In [97]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [98]:
base_order_df = spark.read.format("csv").option("header","true").option("inferSchema","true").option("samplingRatio",.1).load("/public/trendytech/datasets/windowdata.csv")

In [3]:
base_order_df.show()

+--------------+-------+-----------+-------------+------------+
|       country|weeknum|numinvoices|totalquantity|invoicevalue|
+--------------+-------+-----------+-------------+------------+
|         Spain|     49|          1|           67|      174.72|
|       Germany|     48|         11|         1795|     3309.75|
|     Lithuania|     48|          3|          622|     1598.06|
|       Germany|     49|         12|         1852|     4521.39|
|       Bahrain|     51|          1|           54|      205.74|
|       Iceland|     49|          1|          319|      711.79|
|         India|     51|          5|           95|      276.84|
|     Australia|     50|          2|          133|      387.95|
|         Italy|     49|          1|           -2|       -17.0|
|         India|     49|          5|         1280|      3284.1|
|         Spain|     50|          2|          400|     1049.01|
|United Kingdom|     51|        200|        28782|    75103.46|
|        Norway|     49|          1|    

In [17]:
base_order_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- weeknum: integer (nullable = true)
 |-- numinvoices: integer (nullable = true)
 |-- totalquantity: integer (nullable = true)
 |-- invoicevalue: double (nullable = true)



Grouping Aggregation - Find Total invoice value contrywise

In [99]:
from pyspark.sql import *
count_invoice = base_order_df.groupBy("country").agg(sum(expr("invoicevalue * totalquantity")).alias("Countryinvoicevalue"))

In [100]:
count_invoice.show()

+---------------+--------------------+
|        country| Countryinvoicevalue|
+---------------+--------------------+
|         Sweden|   9828358.200000001|
|        Germany|2.6146917930000003E7|
|         France|1.5800077040000001E7|
|        Belgium|           1150919.7|
|        Finland|           1119571.2|
|          India|1.5860418379999999E7|
|          Italy|            120457.9|
|      Lithuania|           995883.32|
|         Norway|          6785852.68|
|          Spain|           679310.24|
|        Denmark|            581801.0|
|        Iceland|  227061.00999999998|
|         Israel|            12736.64|
|Channel Islands|  29082.399999999998|
|         Cyprus|          1458781.94|
|    Switzerland|           638292.08|
|          Japan|2.8842041709999997E7|
|         Poland|             34742.4|
|       Portugal|  1409959.5200000003|
|      Australia|            145334.7|
+---------------+--------------------+
only showing top 20 rows



Window Function - Running Total

In [18]:
mywin = Window.partitionBy("country").orderBy("weeknum").rowsBetween(Window.unboundedPreceding,Window.currentRow)

In [19]:
running_total = base_order_df.withColumn("RunningTotal",sum("invoicevalue").over(mywin))

In [20]:
running_total.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|      RunningTotal|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|     3309.75|           3309.75|
|Germany|     49|         12|         1852|     4521.39|           7831.14|
|Germany|     50|         15|         1973|     5065.79|          12896.93|
|Germany|     51|          5|         1103|     1665.91|          14562.84|
| France|     48|          4|         1299|     2808.16|           2808.16|
| France|     49|          9|         2303|     4527.01|           7335.17|
| France|     50|          6|          529|      537.32|           7872.49|
| France|     51|          5|          847|     1702.87|           9575.36|
|Belgium|     48|          1|          528|       346.1|             346.1|
|Belgium|   

Window Function - Rank the weeknum in each country invoice value wise(Jump rank if duplicate ranks present)

In [28]:
mywin2 = Window.partitionBy("country").orderBy(column("invoicevalue").desc()).rowsBetween(Window.unboundedPreceding,Window.currentRow)

In [29]:
rank_inv_df = base_order_df.withColumn("Rank",rank().over(mywin2))

In [30]:
rank_inv_df.show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|Rank|
+-------+-------+-----------+-------------+------------+----+
| Sweden|     50|          3|         3714|      2646.3|   1|
|Germany|     50|         15|         1973|     5065.79|   1|
|Germany|     49|         12|         1852|     4521.39|   2|
|Germany|     48|         11|         1795|     3309.75|   3|
|Germany|     51|          5|         1103|     1665.91|   4|
| France|     49|          9|         2303|     4527.01|   1|
| France|     48|          4|         1299|     2808.16|   2|
| France|     51|          5|          847|     1702.87|   3|
| France|     50|          6|          529|      537.32|   4|
|Belgium|     51|          2|          942|      838.65|   1|
|Belgium|     50|          2|          285|      625.16|   2|
|Belgium|     48|          1|          528|       346.1|   3|
|Finland|     50|          1|         1254|       892.8|   1|
|  India

Window Function - Dense Rank()

In [42]:
mylst = [('Akash',99),('Vinay',99),('Ashish',98),('Vikram',95)]

In [43]:
base_stud_df = spark.createDataFrame(mylst,schema=["Name","Marks"])

In [44]:
base_stud_df.show()

+------+-----+
|  Name|Marks|
+------+-----+
| Akash|   99|
| Vinay|   99|
|Ashish|   98|
|Vikram|   95|
+------+-----+



In [45]:
base_stud_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Marks: long (nullable = true)



mywin3 = Window.orderBy(column("Marks").desc())

In [51]:
base_stud_df.withColumn("Rank",dense_rank().over(mywin3)).show()

+------+-----+----+
|  Name|Marks|Rank|
+------+-----+----+
| Akash|   99|   1|
| Vinay|   99|   1|
|Ashish|   98|   2|
|Vikram|   95|   3|
+------+-----+----+



In [53]:
mywin4 = Window.orderBy(column("Name"))

In [54]:
stud_df = base_stud_df.withColumn("Roll No",row_number().over(mywin4))

In [55]:
stud_df.show()

+------+-----+-------+
|  Name|Marks|Roll No|
+------+-----+-------+
| Akash|   99|      1|
|Ashish|   98|      2|
|Vikram|   95|      3|
| Vinay|   99|      4|
+------+-----+-------+



Lead - Lag 

In [75]:
mylst2 = [("Iphone12",1,400,"2018-01-01"),
         ("Iphone12",2,450,"2019-01-01"),
         ("Iphone13",1,700,"2020-01-01"),
         ("Iphone13",2,710,"2021-01-01"),
         ("Iphone14",1,800,"2022-01-01"),
         ("Iphone15",1,1200,"2023-01-01"),]

In [76]:
base_iphone_df = spark.createDataFrame(mylst2,schema = ["Iphone","UpgradeVersion","Price","UpdateLaunchDate"])

In [77]:
base_iphone_df.show()

+--------+--------------+-----+----------------+
|  Iphone|UpgradeVersion|Price|UpdateLaunchDate|
+--------+--------------+-----+----------------+
|Iphone12|             1|  400|      2018-01-01|
|Iphone12|             2|  450|      2019-01-01|
|Iphone13|             1|  700|      2020-01-01|
|Iphone13|             2|  710|      2021-01-01|
|Iphone14|             1|  800|      2022-01-01|
|Iphone15|             1| 1200|      2023-01-01|
+--------+--------------+-----+----------------+



Add a new column in dataframe VersionEndDate. validity of current version will end when next version Update date is launched

In [78]:
mywin5 = Window.partitionBy("Iphone").orderBy("UpgradeVersion")

In [79]:
final_iphone_df = base_iphone_df.withColumn("VersionEndDate",lead("UpdateLaunchDate").over(mywin5))

In [80]:
final_iphone_df.show()

+--------+--------------+-----+----------------+--------------+
|  Iphone|UpgradeVersion|Price|UpdateLaunchDate|VersionEndDate|
+--------+--------------+-----+----------------+--------------+
|Iphone12|             1|  400|      2018-01-01|    2019-01-01|
|Iphone12|             2|  450|      2019-01-01|          null|
|Iphone13|             1|  700|      2020-01-01|    2021-01-01|
|Iphone13|             2|  710|      2021-01-01|          null|
|Iphone14|             1|  800|      2022-01-01|          null|
|Iphone15|             1| 1200|      2023-01-01|          null|
+--------+--------------+-----+----------------+--------------+



In [81]:
mywin6 = Window.partitionBy("Iphone").orderBy("UpgradeVersion")

In [82]:
final_iphone_df1 = base_iphone_df.withColumn("PreviosPrice",lag("Price").over(mywin6))

In [83]:
final_iphone_df1.show()

+--------+--------------+-----+----------------+------------+
|  Iphone|UpgradeVersion|Price|UpdateLaunchDate|PreviosPrice|
+--------+--------------+-----+----------------+------------+
|Iphone12|             1|  400|      2018-01-01|        null|
|Iphone12|             2|  450|      2019-01-01|         400|
|Iphone13|             1|  700|      2020-01-01|        null|
|Iphone13|             2|  710|      2021-01-01|         700|
|Iphone14|             1|  800|      2022-01-01|        null|
|Iphone15|             1| 1200|      2023-01-01|        null|
+--------+--------------+-----+----------------+------------+



In [84]:
mylst3 = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

In [85]:
base_fruits_df = spark.createDataFrame(mylst3,schema = ["Fruit","Price","Country"])

In [86]:
base_fruits_df.show()

+-------+-----+-------+
|  Fruit|Price|Country|
+-------+-----+-------+
| Banana| 1000|    USA|
|Carrots| 1500|    USA|
|  Beans| 1600|    USA|
| Orange| 2000|    USA|
| Orange| 2000|    USA|
| Banana|  400|  China|
|Carrots| 1200|  China|
|  Beans| 1500|  China|
| Orange| 4000|  China|
| Banana| 2000| Canada|
|Carrots| 2000| Canada|
|  Beans| 2000| Mexico|
+-------+-----+-------+



In [94]:
pivot_fruits_df = base_fruits_df.groupBy("Fruit").pivot("Country").sum("Price")

In [95]:
pivot_fruits_df.show()

+-------+------+-----+------+----+
|  Fruit|Canada|China|Mexico| USA|
+-------+------+-----+------+----+
| Orange|  null| 4000|  null|4000|
|  Beans|  null| 1500|  2000|1600|
| Banana|  2000|  400|  null|1000|
|Carrots|  2000| 1200|  null|1500|
+-------+------+-----+------+----+



In [96]:
spark.stop()