## Three types of window functions:
1) Analytical - lead(),lag(),cume_dist()
2) Aggregate - average, sum, min, max
3) Ranking - row_number(), rank(),dense_rank()


## Analytical FUNCTION


In [2]:
from pyspark.sql.window import Window
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.host", "localhost").appName("SparkByExamples.com").getOrCreate()
conf = pyspark.SparkConf()
spark_context = SparkSession.builder.config(conf=conf).getOrCreate()


In [3]:
sampleData = (("Nitya", 28, "Sales", 3000),
("Abhishek", 33, "Sales", 4600),
("Sandeep", 40, "Sales", 4100),
("Rakesh", 25, "Finance", 3000),
("Ram", 28, "Sales", 3000),
("Srishti", 46, "Management", 3300),
("Arbind", 26, "Finance", 3900),
("Hitesh", 30, "Marketing", 3000),
("Kailash", 29, "Marketing", 2000),
("Sushma", 39, "Sales", 4100)
)
columns = ["Employee_name","Age","Department","Salary"]

In [4]:
df = spark.createDataFrame(data=sampleData, schema=columns)
windowpartition = Window.partitionBy("Department").orderBy("Age")
df.printSchema()

root
 |-- Employee_name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)



In [5]:
df.show(truncate=False)

+-------------+---+----------+------+
|Employee_name|Age|Department|Salary|
+-------------+---+----------+------+
|Nitya        |28 |Sales     |3000  |
|Abhishek     |33 |Sales     |4600  |
|Sandeep      |40 |Sales     |4100  |
|Rakesh       |25 |Finance   |3000  |
|Ram          |28 |Sales     |3000  |
|Srishti      |46 |Management|3300  |
|Arbind       |26 |Finance   |3900  |
|Hitesh       |30 |Marketing |3000  |
|Kailash      |29 |Marketing |2000  |
|Sushma       |39 |Sales     |4100  |
+-------------+---+----------+------+



In [6]:
display(df)

DataFrame[Employee_name: string, Age: bigint, Department: string, Salary: bigint]

cume_dist()

In [7]:
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist",cume_dist().over(windowpartition)).show()

+-------------+---+----------+------+---------+
|Employee_name|Age|Department|Salary|cume_dist|
+-------------+---+----------+------+---------+
|       Rakesh| 25|   Finance|  3000|      0.5|
|       Arbind| 26|   Finance|  3900|      1.0|
|      Srishti| 46|Management|  3300|      1.0|
|      Kailash| 29| Marketing|  2000|      0.5|
|       Hitesh| 30| Marketing|  3000|      1.0|
|        Nitya| 28|     Sales|  3000|      0.4|
|          Ram| 28|     Sales|  3000|      0.4|
|     Abhishek| 33|     Sales|  4600|      0.6|
|       Sushma| 39|     Sales|  4100|      0.8|
|      Sandeep| 40|     Sales|  4100|      1.0|
+-------------+---+----------+------+---------+



lag

In [8]:
from pyspark.sql.functions import  lag
df.withColumn("Lag",lag("Salary",2).over(windowpartition)).show()



+-------------+---+----------+------+----+
|Employee_name|Age|Department|Salary| Lag|
+-------------+---+----------+------+----+
|       Rakesh| 25|   Finance|  3000|null|
|       Arbind| 26|   Finance|  3900|null|
|      Srishti| 46|Management|  3300|null|
|      Kailash| 29| Marketing|  2000|null|
|       Hitesh| 30| Marketing|  3000|null|
|        Nitya| 28|     Sales|  3000|null|
|          Ram| 28|     Sales|  3000|null|
|     Abhishek| 33|     Sales|  4600|3000|
|       Sushma| 39|     Sales|  4100|3000|
|      Sandeep| 40|     Sales|  4100|4600|
+-------------+---+----------+------+----+



lead


In [9]:
from pyspark.sql.functions import lead
df.withColumn("Lead",lead("salary",2).over(windowpartition)).show()

+-------------+---+----------+------+----+
|Employee_name|Age|Department|Salary|Lead|
+-------------+---+----------+------+----+
|       Rakesh| 25|   Finance|  3000|null|
|       Arbind| 26|   Finance|  3900|null|
|      Srishti| 46|Management|  3300|null|
|      Kailash| 29| Marketing|  2000|null|
|       Hitesh| 30| Marketing|  3000|null|
|        Nitya| 28|     Sales|  3000|4600|
|          Ram| 28|     Sales|  3000|4100|
|     Abhishek| 33|     Sales|  4600|4100|
|       Sushma| 39|     Sales|  4100|null|
|      Sandeep| 40|     Sales|  4100|null|
+-------------+---+----------+------+----+



## RANKING FUNCTION

In [10]:
sampleData = ((101, "Ram", "Biology", 80),
(103, "Sita", "Social Science", 78),
(104, "Lakshman", "Sanskrit", 58),
(102, "Kunal", "Phisycs", 89),
(101, "Ram", "Biology", 80),
(106, "Srishti", "Maths", 70),
(108, "Sandeep", "Physics", 75),
(107, "Hitesh", "Maths", 88),
(109, "Kailash", "Maths", 90),
(105, "Abhishek", "Social Science", 84)
)
columns = ["Roll_No", "Student_Name", "Subject", "Marks"]

In [11]:
df2 = spark.createDataFrame(data = sampleData, schema=columns)
windowpartition = Window.partitionBy("Subject").orderBy("Marks")
df2.printSchema()


root
 |-- Roll_No: long (nullable = true)
 |-- Student_Name: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Marks: long (nullable = true)



In [12]:
df2.show()

+-------+------------+--------------+-----+
|Roll_No|Student_Name|       Subject|Marks|
+-------+------------+--------------+-----+
|    101|         Ram|       Biology|   80|
|    103|        Sita|Social Science|   78|
|    104|    Lakshman|      Sanskrit|   58|
|    102|       Kunal|       Phisycs|   89|
|    101|         Ram|       Biology|   80|
|    106|     Srishti|         Maths|   70|
|    108|     Sandeep|       Physics|   75|
|    107|      Hitesh|         Maths|   88|
|    109|     Kailash|         Maths|   90|
|    105|    Abhishek|Social Science|   84|
+-------+------------+--------------+-----+



rownumber

In [13]:
from pyspark.sql.functions import row_number
df2.withColumn("row_number",row_number().over(windowpartition)).show()

+-------+------------+--------------+-----+----------+
|Roll_No|Student_Name|       Subject|Marks|row_number|
+-------+------------+--------------+-----+----------+
|    101|         Ram|       Biology|   80|         1|
|    101|         Ram|       Biology|   80|         2|
|    106|     Srishti|         Maths|   70|         1|
|    107|      Hitesh|         Maths|   88|         2|
|    109|     Kailash|         Maths|   90|         3|
|    102|       Kunal|       Phisycs|   89|         1|
|    108|     Sandeep|       Physics|   75|         1|
|    104|    Lakshman|      Sanskrit|   58|         1|
|    103|        Sita|Social Science|   78|         1|
|    105|    Abhishek|Social Science|   84|         2|
+-------+------------+--------------+-----+----------+



rank

In [15]:
from pyspark.sql.functions import  rank
df2.withColumn("rank",rank().over(windowpartition)).show()

+-------+------------+--------------+-----+----+
|Roll_No|Student_Name|       Subject|Marks|rank|
+-------+------------+--------------+-----+----+
|    101|         Ram|       Biology|   80|   1|
|    101|         Ram|       Biology|   80|   1|
|    106|     Srishti|         Maths|   70|   1|
|    107|      Hitesh|         Maths|   88|   2|
|    109|     Kailash|         Maths|   90|   3|
|    102|       Kunal|       Phisycs|   89|   1|
|    108|     Sandeep|       Physics|   75|   1|
|    104|    Lakshman|      Sanskrit|   58|   1|
|    103|        Sita|Social Science|   78|   1|
|    105|    Abhishek|Social Science|   84|   2|
+-------+------------+--------------+-----+----+



percent_rank

In [17]:
from pyspark.sql.functions import  percent_rank
df2.withColumn("per_rank",percent_rank().over(windowpartition)).show()

+-------+------------+--------------+-----+--------+
|Roll_No|Student_Name|       Subject|Marks|per_rank|
+-------+------------+--------------+-----+--------+
|    101|         Ram|       Biology|   80|     0.0|
|    101|         Ram|       Biology|   80|     0.0|
|    106|     Srishti|         Maths|   70|     0.0|
|    107|      Hitesh|         Maths|   88|     0.5|
|    109|     Kailash|         Maths|   90|     1.0|
|    102|       Kunal|       Phisycs|   89|     0.0|
|    108|     Sandeep|       Physics|   75|     0.0|
|    104|    Lakshman|      Sanskrit|   58|     0.0|
|    103|        Sita|Social Science|   78|     0.0|
|    105|    Abhishek|Social Science|   84|     1.0|
+-------+------------+--------------+-----+--------+



dense_rank

In [18]:
from pyspark.sql.functions import  dense_rank
df2.withColumn("per_rank",dense_rank().over(windowpartition)).show()

+-------+------------+--------------+-----+--------+
|Roll_No|Student_Name|       Subject|Marks|per_rank|
+-------+------------+--------------+-----+--------+
|    101|         Ram|       Biology|   80|       1|
|    101|         Ram|       Biology|   80|       1|
|    106|     Srishti|         Maths|   70|       1|
|    107|      Hitesh|         Maths|   88|       2|
|    109|     Kailash|         Maths|   90|       3|
|    102|       Kunal|       Phisycs|   89|       1|
|    108|     Sandeep|       Physics|   75|       1|
|    104|    Lakshman|      Sanskrit|   58|       1|
|    103|        Sita|Social Science|   78|       1|
|    105|    Abhishek|Social Science|   84|       2|
+-------+------------+--------------+-----+--------+



Aggregate Function

In [19]:
sampleData = (("Ram", "Sales", 3000),
("Meena", "Sales", 4600),
("Abhishek", "Sales", 4100),
("Kunal", "Finance", 3000),
("Ram", "Sales", 3000),
("Srishti", "Management", 3300),
("Sandeep", "Finance", 3900),
("Hitesh", "Marketing", 3000),
("Kailash", "Marketing", 2000),
("Shyam", "Sales", 4100)
)
columns = ["Employee_Name", "Department", "Salary"]


In [20]:
df3 = spark.createDataFrame(data=sampleData,schema=columns)
df3.printSchema()


root
 |-- Employee_Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)



In [21]:
df3.show()

+-------------+----------+------+
|Employee_Name|Department|Salary|
+-------------+----------+------+
|          Ram|     Sales|  3000|
|        Meena|     Sales|  4600|
|     Abhishek|     Sales|  4100|
|        Kunal|   Finance|  3000|
|          Ram|     Sales|  3000|
|      Srishti|Management|  3300|
|      Sandeep|   Finance|  3900|
|       Hitesh| Marketing|  3000|
|      Kailash| Marketing|  2000|
|        Shyam|     Sales|  4100|
+-------------+----------+------+



In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col,avg,sum,min,max,row_number

In [23]:
windowPartitionAgg = Window.partitionBy("Department")

In [24]:
df3.withColumn("avg",avg(col("salary")).over(windowPartitionAgg)).show()

+-------------+----------+------+------+
|Employee_Name|Department|Salary|   avg|
+-------------+----------+------+------+
|        Kunal|   Finance|  3000|3450.0|
|      Sandeep|   Finance|  3900|3450.0|
|      Srishti|Management|  3300|3300.0|
|       Hitesh| Marketing|  3000|2500.0|
|      Kailash| Marketing|  2000|2500.0|
|          Ram|     Sales|  3000|3760.0|
|        Meena|     Sales|  4600|3760.0|
|     Abhishek|     Sales|  4100|3760.0|
|          Ram|     Sales|  3000|3760.0|
|        Shyam|     Sales|  4100|3760.0|
+-------------+----------+------+------+



In [25]:
df3.withColumn("Sum",sum(col("salary")).over(windowPartitionAgg)).show()
df3.withColumn("Min",min(col("salary")).over(windowPartitionAgg)).show()
df3.withColumn("Max",max(col("salary")).over(windowPartitionAgg)).show()

+-------------+----------+------+-----+
|Employee_Name|Department|Salary|  Sum|
+-------------+----------+------+-----+
|        Kunal|   Finance|  3000| 6900|
|      Sandeep|   Finance|  3900| 6900|
|      Srishti|Management|  3300| 3300|
|       Hitesh| Marketing|  3000| 5000|
|      Kailash| Marketing|  2000| 5000|
|          Ram|     Sales|  3000|18800|
|        Meena|     Sales|  4600|18800|
|     Abhishek|     Sales|  4100|18800|
|          Ram|     Sales|  3000|18800|
|        Shyam|     Sales|  4100|18800|
+-------------+----------+------+-----+

+-------------+----------+------+----+
|Employee_Name|Department|Salary| Min|
+-------------+----------+------+----+
|        Kunal|   Finance|  3000|3000|
|      Sandeep|   Finance|  3900|3000|
|      Srishti|Management|  3300|3300|
|       Hitesh| Marketing|  3000|2000|
|      Kailash| Marketing|  2000|2000|
|          Ram|     Sales|  3000|3000|
|        Meena|     Sales|  4600|3000|
|     Abhishek|     Sales|  4100|3000|
|         

In [29]:
from pyspark.sql.functions import current_timestamp, to_timestamp
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, DateType, BooleanType
df = spark.createDataFrame([["1", "2019-07-01 12:01:19.000"],
 ["2", "2019-06-24 12:01:19.000"]], ["id", "input_timestamp"])
df.printSchema()


root
 |-- id: string (nullable = true)
 |-- input_timestamp: string (nullable = true)



# Converting string datatype to timestamp


In [30]:
df1 = df.withColumn("timestamptype",to_timestamp("input_timestamp"))

In [32]:
df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- input_timestamp: string (nullable = true)
 |-- timestamptype: timestamp (nullable = true)



# selecting only necessary column and renaming

In [33]:
df2 = df1.select("id","timestamptype").withColumnRenamed("timestamptype","input_timestamp")
df2.printSchema()

root
 |-- id: string (nullable = true)
 |-- input_timestamp: timestamp (nullable = true)



# using cast to convert timestamp to DataType

In [35]:
df3=df2.select(col("id"), col("input_timestamp").cast('string'))
df3.printSchema()

root
 |-- id: string (nullable = true)
 |-- input_timestamp: string (nullable = true)



# timestamp type to datetype


In [37]:
df4 = df2.select(col("id"), to_date(col("input_timestamp")))
df4.printSchema()

root
 |-- id: string (nullable = true)
 |-- to_date(input_timestamp): date (nullable = true)

