In [6]:
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

In [7]:
# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("pyspark_window")
    .getOrCreate()
)

In [17]:
# In-memory equivalent of test4.csv (same ten rows as the preview below),
# kept for reference / running without the file:
#
# sampleData = (("Ram", 28, "Sales", 3000),
#               ("Meena", 33, "Sales", 4600),
#               ("Robin", 40, "Sales", 4100),
#               ("Kunal", 25, "Finance", 3000),
#               ("Ram", 28, "Sales", 3000),
#               ("Srishti", 46, "Management", 3300),
#               ("Jeny", 26, "Finance", 3900),
#               ("Hitesh", 30, "Marketing", 3000),
#               ("Kailash", 29, "Marketing", 2000),
#               ("Sharad", 39, "Sales", 4100)
#               )
# columns = ["Employee_Name", "Age", "Department", "Salary"]
# df = spark.createDataFrame(data=sampleData, schema=columns)

# Load the employee table: the first line of the file is the header and the
# column types are inferred from the data (Age and Salary come back as integers).
df = spark.read.options(header=True, inferSchema=True).csv('test4.csv')

In [18]:
# Inspect the inferred schema, then preview the rows
# (n=20 / truncate=True are show()'s defaults, spelled out).
df.printSchema()
df.show(n=20, truncate=True)

root
 |-- Employee_Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)

+-------------+---+----------+------+
|Employee_Name|Age|Department|Salary|
+-------------+---+----------+------+
|          Ram| 28|     Sales|  3000|
|        Meena| 33|     Sales|  4600|
|        Robin| 40|     Sales|  4100|
|        Kunal| 25|   Finance|  3000|
|          Ram| 28|     Sales|  3000|
|      Srishti| 46|Management|  3300|
|         Jeny| 26|   Finance|  3900|
|       Hitesh| 30| Marketing|  3000|
|      Kailash| 29| Marketing|  2000|
|       Sharad| 39|     Sales|  4100|
+-------------+---+----------+------+



In [19]:
# Window for the analytic functions below: rows are grouped by Department
# and ordered by Age (ascending) within each group.
windowPartition = Window.orderBy("Age").partitionBy("Department")

In [20]:
from pyspark.sql.functions import cume_dist

# cume_dist(): for each row, the fraction of rows in its Department whose Age
# is <= the current row's Age. Tied rows share one value (the two Sales rows
# at Age 28 both get 2/5 = 0.4), and the last row of each partition gets 1.0.
df.withColumn(
    "cume_dist",
    cume_dist().over(windowPartition),
).show()

+-------------+---+----------+------+---------+
|Employee_Name|Age|Department|Salary|cume_dist|
+-------------+---+----------+------+---------+
|        Kunal| 25|   Finance|  3000|      0.5|
|         Jeny| 26|   Finance|  3900|      1.0|
|      Srishti| 46|Management|  3300|      1.0|
|      Kailash| 29| Marketing|  2000|      0.5|
|       Hitesh| 30| Marketing|  3000|      1.0|
|          Ram| 28|     Sales|  3000|      0.4|
|          Ram| 28|     Sales|  3000|      0.4|
|        Meena| 33|     Sales|  4600|      0.6|
|       Sharad| 39|     Sales|  4100|      0.8|
|        Robin| 40|     Sales|  4100|      1.0|
+-------------+---+----------+------+---------+



In [21]:
from pyspark.sql.functions import lag, when, isnull

# lag(col, offset): the value of `col` from `offset` rows earlier in the same
# window partition -- here the Salary two rows back by Age within each
# Department. Rows without enough predecessors get null.
df.withColumn(
    "Lag",
    lag("Salary", 2).over(windowPartition),
).show()

+-------------+---+----------+------+----+
|Employee_Name|Age|Department|Salary| Lag|
+-------------+---+----------+------+----+
|        Kunal| 25|   Finance|  3000|null|
|         Jeny| 26|   Finance|  3900|null|
|      Srishti| 46|Management|  3300|null|
|      Kailash| 29| Marketing|  2000|null|
|       Hitesh| 30| Marketing|  3000|null|
|          Ram| 28|     Sales|  3000|null|
|          Ram| 28|     Sales|  3000|null|
|        Meena| 33|     Sales|  4600|3000|
|       Sharad| 39|     Sales|  4100|3000|
|        Robin| 40|     Sales|  4100|4600|
+-------------+---+----------+------+----+



In [22]:
# A window with no partitionBy: the whole DataFrame is a single sequence
# ordered by Age, so the "previous row" may belong to another Department.
# NOTE(review): unpartitioned windows are typically evaluated on a single
# Spark partition -- fine for this demo, avoid on large data.
windowOrder = Window.orderBy("Age")
df_op = df.withColumn(
    "Lag",
    lag("Salary", 1).over(windowOrder),
)
df_op.show()

+-------------+---+----------+------+----+
|Employee_Name|Age|Department|Salary| Lag|
+-------------+---+----------+------+----+
|        Kunal| 25|   Finance|  3000|null|
|         Jeny| 26|   Finance|  3900|3000|
|          Ram| 28|     Sales|  3000|3900|
|          Ram| 28|     Sales|  3000|3000|
|      Kailash| 29| Marketing|  2000|3000|
|       Hitesh| 30| Marketing|  3000|2000|
|        Meena| 33|     Sales|  4600|3000|
|       Sharad| 39|     Sales|  4100|4600|
|        Robin| 40|     Sales|  4100|4100|
|      Srishti| 46|Management|  3300|4100|
+-------------+---+----------+------+----+



In [23]:
# Salary minus the previous row's salary (ordered by Age). The first row has
# no predecessor, so its Lag -- and therefore its diff -- is null.
df_op.withColumn("diff", df_op["Salary"] - df_op["Lag"]).show()

+-------------+---+----------+------+----+-----+
|Employee_Name|Age|Department|Salary| Lag| diff|
+-------------+---+----------+------+----+-----+
|        Kunal| 25|   Finance|  3000|null| null|
|         Jeny| 26|   Finance|  3900|3000|  900|
|          Ram| 28|     Sales|  3000|3900| -900|
|          Ram| 28|     Sales|  3000|3000|    0|
|      Kailash| 29| Marketing|  2000|3000|-1000|
|       Hitesh| 30| Marketing|  3000|2000| 1000|
|        Meena| 33|     Sales|  4600|3000| 1600|
|       Sharad| 39|     Sales|  4100|4600| -500|
|        Robin| 40|     Sales|  4100|4100|    0|
|      Srishti| 46|Management|  3300|4100| -800|
+-------------+---+----------+------+----+-----+



In [24]:
# Same Age-ordered, unpartitioned lag, but nulls in lag_sal (here only the
# first row's) are replaced by 0 so `diff` is never null. Consequence: the
# first row's diff equals its full salary rather than "no previous row".
wind = Window.orderBy("Age")
df_ = (
    df.withColumn("lag_sal", lag("Salary", 1).over(wind))
    .na.fill(0, ['lag_sal'])
)

df_ = df_.withColumn("diff", df_["Salary"] - df_["lag_sal"])
df_.show()

+-------------+---+----------+------+-------+-----+
|Employee_Name|Age|Department|Salary|lag_sal| diff|
+-------------+---+----------+------+-------+-----+
|        Kunal| 25|   Finance|  3000|      0| 3000|
|         Jeny| 26|   Finance|  3900|   3000|  900|
|          Ram| 28|     Sales|  3000|   3900| -900|
|          Ram| 28|     Sales|  3000|   3000|    0|
|      Kailash| 29| Marketing|  2000|   3000|-1000|
|       Hitesh| 30| Marketing|  3000|   2000| 1000|
|        Meena| 33|     Sales|  4600|   3000| 1600|
|       Sharad| 39|     Sales|  4100|   4600| -500|
|        Robin| 40|     Sales|  4100|   4100|    0|
|      Srishti| 46|Management|  3300|   4100| -800|
+-------------+---+----------+------+-------+-----+



### Ranking Functions

In [25]:
# Student marks table used by the ranking examples (header row, inferred types).
df2 = spark.read.options(header=True, inferSchema=True).csv('test5.csv')

In [26]:
# Inspect the inferred schema, then preview the rows
# (n=20 / truncate=True are show()'s defaults, spelled out).
df2.printSchema()
df2.show(n=20, truncate=True)

root
 |-- Roll_No: integer (nullable = true)
 |-- Student_Name: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Marks: integer (nullable = true)

+-------+------------+--------------+-----+
|Roll_No|Student_Name|       Subject|Marks|
+-------+------------+--------------+-----+
|    101|         Ram|       Biology|   80|
|    103|       Meena|Social Science|   78|
|    104|       Robin|      Sanskrit|   58|
|    102|       Kunal|       Phisycs|   89|
|    101|         Ram|       Biology|   80|
|    106|     Srishti|         Maths|   70|
|    108|        Jeny|       Physics|   75|
|    107|      Hitesh|         Maths|   70|
|    109|     Kailash|         Maths|   90|
|    105|      Sharad|Social Science|   84|
+-------+------------+--------------+-----+



In [27]:
# Window for the ranking functions: one partition per Subject, rows ordered
# by Marks ascending (the lowest mark is ranked first).
windowPartition2 = Window.partitionBy("Subject").orderBy("Marks")
# To rank the highest marks first, order the window descending. Unlike
# DataFrame.orderBy, WindowSpec.orderBy does NOT accept an `ascending=`
# keyword (it raises TypeError); use a descending column expression instead:
# from pyspark.sql.functions import desc
# windowPartition2 = Window.partitionBy("Subject").orderBy(desc("Marks"))

In [28]:
from pyspark.sql.functions import row_number

# row_number(): consecutive integers 1, 2, 3, ... within each Subject, in
# window order. Unlike rank(), tied rows still get distinct numbers, so which
# tied row receives the lower number is not guaranteed unless the ordering has
# a tie-breaker (Maths has two 70s; Biology has a duplicated row). Roll_No is
# added as a secondary sort key: WindowSpec.orderBy replaces the ordering
# while keeping windowPartition2's Subject partitioning.
df2.withColumn("row_number",
               row_number().over(windowPartition2.orderBy("Marks", "Roll_No"))).show()

+-------+------------+--------------+-----+----------+
|Roll_No|Student_Name|       Subject|Marks|row_number|
+-------+------------+--------------+-----+----------+
|    101|         Ram|       Biology|   80|         1|
|    101|         Ram|       Biology|   80|         2|
|    106|     Srishti|         Maths|   70|         1|
|    107|      Hitesh|         Maths|   70|         2|
|    109|     Kailash|         Maths|   90|         3|
|    102|       Kunal|       Phisycs|   89|         1|
|    108|        Jeny|       Physics|   75|         1|
|    104|       Robin|      Sanskrit|   58|         1|
|    103|       Meena|Social Science|   78|         1|
|    105|      Sharad|Social Science|   84|         2|
+-------+------------+--------------+-----+----------+



In [29]:
from pyspark.sql.functions import rank

# rank(): tied rows share a rank and the following rank is skipped,
# e.g. Maths marks 70, 70, 90 -> ranks 1, 1, 3.
df2.withColumn(
    "rank",
    rank().over(windowPartition2),
).show()

+-------+------------+--------------+-----+----+
|Roll_No|Student_Name|       Subject|Marks|rank|
+-------+------------+--------------+-----+----+
|    101|         Ram|       Biology|   80|   1|
|    101|         Ram|       Biology|   80|   1|
|    106|     Srishti|         Maths|   70|   1|
|    107|      Hitesh|         Maths|   70|   1|
|    109|     Kailash|         Maths|   90|   3|
|    102|       Kunal|       Phisycs|   89|   1|
|    108|        Jeny|       Physics|   75|   1|
|    104|       Robin|      Sanskrit|   58|   1|
|    103|       Meena|Social Science|   78|   1|
|    105|      Sharad|Social Science|   84|   2|
+-------+------------+--------------+-----+----+



In [30]:
from pyspark.sql.functions import dense_rank

# dense_rank(): like rank(), tied rows share a rank, but no ranks are skipped
# after a tie, e.g. Maths marks 70, 70, 90 -> 1, 1, 2 (rank() gives 1, 1, 3).
df2.withColumn(
    "dense_rank",
    dense_rank().over(windowPartition2),
).show()

+-------+------------+--------------+-----+----------+
|Roll_No|Student_Name|       Subject|Marks|dense_rank|
+-------+------------+--------------+-----+----------+
|    101|         Ram|       Biology|   80|         1|
|    101|         Ram|       Biology|   80|         1|
|    106|     Srishti|         Maths|   70|         1|
|    107|      Hitesh|         Maths|   70|         1|
|    109|     Kailash|         Maths|   90|         2|
|    102|       Kunal|       Phisycs|   89|         1|
|    108|        Jeny|       Physics|   75|         1|
|    104|       Robin|      Sanskrit|   58|         1|
|    103|       Meena|Social Science|   78|         1|
|    105|      Sharad|Social Science|   84|         2|
+-------+------------+--------------+-----+----------+



In [31]:
from pyspark.sql.functions import percent_rank

# percent_rank(): rank() rescaled to [0, 1] as
# (rank - 1) / (rows_in_partition - 1), e.g. Maths 70, 70, 90 -> 0.0, 0.0, 1.0.
# A single-row partition (such as Physics here) gets 0.0.
df2.withColumn(
    "percent_rank",
    percent_rank().over(windowPartition2),
).show()

+-------+------------+--------------+-----+------------+
|Roll_No|Student_Name|       Subject|Marks|percent_rank|
+-------+------------+--------------+-----+------------+
|    101|         Ram|       Biology|   80|         0.0|
|    101|         Ram|       Biology|   80|         0.0|
|    106|     Srishti|         Maths|   70|         0.0|
|    107|      Hitesh|         Maths|   70|         0.0|
|    109|     Kailash|         Maths|   90|         1.0|
|    102|       Kunal|       Phisycs|   89|         0.0|
|    108|        Jeny|       Physics|   75|         0.0|
|    104|       Robin|      Sanskrit|   58|         0.0|
|    103|       Meena|Social Science|   78|         0.0|
|    105|      Sharad|Social Science|   84|         1.0|
+-------+------------+--------------+-----+------------+



In [39]:
# NOTE: importing sum/min/max from pyspark.sql.functions shadows Python's
# built-in sum/min/max for every later cell of this notebook.
from pyspark.sql.functions import col, avg, sum, min, max, row_number

# Window covering all rows of each Department. With no orderBy, every row
# sees its whole partition, so each aggregate is the department-wide value.
windowPartitionAgg = Window.partitionBy("Department")

# Attach the department's average, total, minimum and maximum salary to every
# row of df (rows are kept, not collapsed as groupBy().agg() would do).
# The column is referenced as "Salary", exactly as in the schema; the previous
# lowercase "salary" only resolved while spark.sql.caseSensitive is false.
(
    df.withColumn("Avg", avg(col("Salary")).over(windowPartitionAgg))
    .withColumn("Sum", sum(col("Salary")).over(windowPartitionAgg))
    .withColumn("Min", min(col("Salary")).over(windowPartitionAgg))
    .withColumn("Max", max(col("Salary")).over(windowPartitionAgg))
    .show()
)

+-------------+---+----------+------+------+-----+----+----+
|Employee_Name|Age|Department|Salary|   Avg|  Sum| Min| Max|
+-------------+---+----------+------+------+-----+----+----+
|        Kunal| 25|   Finance|  3000|3450.0| 6900|3000|3900|
|         Jeny| 26|   Finance|  3900|3450.0| 6900|3000|3900|
|      Srishti| 46|Management|  3300|3300.0| 3300|3300|3300|
|       Hitesh| 30| Marketing|  3000|2500.0| 5000|2000|3000|
|      Kailash| 29| Marketing|  2000|2500.0| 5000|2000|3000|
|          Ram| 28|     Sales|  3000|3760.0|18800|3000|4600|
|        Meena| 33|     Sales|  4600|3760.0|18800|3000|4600|
|        Robin| 40|     Sales|  4100|3760.0|18800|3000|4600|
|          Ram| 28|     Sales|  3000|3760.0|18800|3000|4600|
|       Sharad| 39|     Sales|  4100|3760.0|18800|3000|4600|
+-------------+---+----------+------+------+-----+----+----+



In [6]:
# Generator for test6.csv from a small pandas frame; presumably left commented
# out so re-running the notebook does not overwrite the file.
# NOTE(review): this code does not reproduce the test6.csv loaded in the next
# cell. to_csv() here writes the pandas index as an unnamed first column and
# yyyy-mm-dd dates under columns date/id/group/amount, whereas the loaded file
# has columns ID0/date/id2/group/amount with dd-mm-yyyy dates -- the file
# appears to have been edited by hand. TODO confirm and update this snippet.
# import pandas as pd
# data1 = {'date': {0: '2018-04-03', 1: '2018-04-04', 2: '2018-04-05', 3: '2018-04-06', 4: '2018-04-07'},
#          'id': {0: 'id1', 1: 'id2', 2: 'id1', 3: 'id3', 4: 'id2'},
#          'group': {0: '1', 1: '1', 2: '1', 3: '2', 4: '1'},
#          'amount': {0: 50, 1: 40, 2: 50, 3: 55, 4: 20}}
# df1_pd = pd.DataFrame(data1, columns=data1.keys())
# df1_pd.to_csv('test6.csv')

# df1 = spark.createDataFrame(df1_pd)
# df1.show()

In [14]:
# Small id/group/amount table loaded from test6.csv (header row, inferred types).
# NOTE(review): dates are stored as dd-mm-yyyy text, so `date` is likely
# inferred as a string -- parse with to_date(..., "dd-MM-yyyy") before any
# date arithmetic or date-ordered windows. TODO confirm with printSchema().
df3 = spark.read.options(header=True, inferSchema=True).csv("test6.csv")
df3.show(n=20, truncate=True)

+---+----------+---+-----+------+
|ID0|      date|id2|group|amount|
+---+----------+---+-----+------+
|  0|03-04-2018|id1|    1|    50|
|  1|04-04-2018|id2|    1|    40|
|  2|05-04-2018|id1|    1|    50|
|  3|06-04-2018|id3|    2|    55|
|  4|07-04-2018|id2|    1|    20|
+---+----------+---+-----+------+

