In [0]:
import pyspark.sql.functions as f

# Sample employee records as (employee_name, department, salary) tuples.
# The ("James", "Sales", 3000) record is duplicated, so the ranking cells
# below show how tied salaries are numbered.
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# NOTE(review): `sc` is not created in this notebook; it is assumed to be the
# SparkContext predefined by the notebook environment.
rdd1 = sc.parallelize(simpleData)
df = rdd1.toDF(["employee_name", "department", "salary"])
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Shared window: one partition per department, rows ordered by salary.
# The rank / dense_rank / lag / lead cells below reuse it.
windowSpec = Window.partitionBy("department").orderBy("salary")

# row_number() numbers rows 1, 2, 3, ... inside each department; tied
# salaries (e.g. the two James/3000 rows) still receive distinct numbers.
res = df.withColumn(
    "row_number",
    row_number().over(windowSpec),
)
res.show()

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+



In [0]:
from pyspark.sql.functions import rank

# rank(): tied salaries share a rank and the following rank is skipped,
# giving sequences like 1, 1, 3 within a department.
res1 = df.withColumn(
    "rank",
    rank().over(windowSpec),
)
res1.show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



In [0]:
from pyspark.sql.functions import dense_rank

# dense_rank(): like rank(), but no gap follows a tie (1, 1, 2, ...).
res3 = df.withColumn(
    "dense_rank",
    dense_rank().over(windowSpec),
)
res3.show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+



In [0]:
from pyspark.sql.functions import lag

# lag("salary", 1): the salary of the previous row in the same department
# (ordered by salary); the first row of each department gets null.
df.withColumn("lag", lag("salary", 1).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|3000|
|          Jen|   Finance|  3900|3300|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|2000|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|3000|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|4100|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [0]:
from pyspark.sql.functions import lead

# lead("salary", 1): the salary of the next row in the same department
# (ordered by salary); the last row of each department gets null.
(
    df
    .withColumn("lead", lead("salary", 1).over(windowSpec))
    .show()
)

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3300|
|        Scott|   Finance|  3300|3900|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|3000|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|3000|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4100|
|         Saif|     Sales|  4100|4600|
|      Michael|     Sales|  4600|null|
+-------------+----------+------+----+



In [0]:
import pyspark.sql.functions as f

# Window without an ordering: each aggregate below spans the whole department.
windowSpecAgg = Window.partitionBy("department")

# Aggregates are called through the module alias `f` rather than imported by
# name: `from pyspark.sql.functions import sum, min, max` would shadow the
# Python builtins of the same name in the notebook namespace.
# Every row of a department carries identical aggregate values, so keeping
# only row_number == 1 collapses the result to one summary row per department.
(
    df.withColumn("row", f.row_number().over(windowSpec))
    .withColumn("avg", f.avg(f.col("salary")).over(windowSpecAgg))
    .withColumn("sum", f.sum(f.col("salary")).over(windowSpecAgg))
    .withColumn("min", f.min(f.col("salary")).over(windowSpecAgg))
    .withColumn("max", f.max(f.col("salary")).over(windowSpecAgg))
    .where(f.col("row") == 1)
    .select("department", "avg", "sum", "min", "max")
    .show()
)

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+



In [0]:
# 3. PySpark Window Analytic functions

# 3.1 lag Window Function
# (lead is demonstrated in the earlier `lead` cell.)
from pyspark.sql.functions import lag

windowSpec = Window.partitionBy("department").orderBy("salary")

# NOTE: this rebinds `df`, so every later cell sees the extra "lag" column;
# the final cell of the notebook rebuilds `df` from the raw tuples to drop it.
df = df.withColumn("lag", lag("salary", 1).over(windowSpec))
df.show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|3000|
|          Jen|   Finance|  3900|3300|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|2000|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|3000|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|4100|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [0]:
# NOTE(review): importing sum/min/max by name shadows the Python builtins;
# kept because the following cell relies on the bare `sum` and `col` names.
from pyspark.sql.functions import col, avg, sum, min, max, row_number

# Window without an ordering: the sum covers the whole department.
windowSpecAgg = Window.partitionBy("department")

# Attach each row's position within its department and the department's
# total salary to every row.
(
    df
    .withColumn("row", row_number().over(windowSpec))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .show()
)

+-------------+----------+------+----+---+-----+
|employee_name|department|salary| lag|row|  sum|
+-------------+----------+------+----+---+-----+
|        Maria|   Finance|  3000|null|  1|10200|
|        Scott|   Finance|  3300|3000|  2|10200|
|          Jen|   Finance|  3900|3300|  3|10200|
|        Kumar| Marketing|  2000|null|  1| 5000|
|         Jeff| Marketing|  3000|2000|  2| 5000|
|        James|     Sales|  3000|null|  1|18800|
|        James|     Sales|  3000|3000|  2|18800|
|       Robert|     Sales|  4100|3000|  3|18800|
|         Saif|     Sales|  4100|4100|  4|18800|
|      Michael|     Sales|  4600|4100|  5|18800|
+-------------+----------+------+----+---+-----+



In [0]:
# Department salary total on every row; the result is assigned, not displayed.
df_sum = df.withColumn(
    "sal_sum",
    sum(col("salary")).over(windowSpecAgg),
)

In [0]:
# NOTE(review): this cell repeats the earlier row/sum cell and prints the
# same table; consider removing one of the two copies.
from pyspark.sql.functions import col, avg, sum, min, max, row_number

windowSpecAgg = Window.partitionBy("department")

(
    df.withColumn("row", row_number().over(windowSpec))
      .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
      .show()
)

+-------------+----------+------+----+---+-----+
|employee_name|department|salary| lag|row|  sum|
+-------------+----------+------+----+---+-----+
|        Maria|   Finance|  3000|null|  1|10200|
|        Scott|   Finance|  3300|3000|  2|10200|
|          Jen|   Finance|  3900|3300|  3|10200|
|        Kumar| Marketing|  2000|null|  1| 5000|
|         Jeff| Marketing|  3000|2000|  2| 5000|
|        James|     Sales|  3000|null|  1|18800|
|        James|     Sales|  3000|3000|  2|18800|
|       Robert|     Sales|  4100|3000|  3|18800|
|         Saif|     Sales|  4100|4100|  4|18800|
|      Michael|     Sales|  4600|4100|  5|18800|
+-------------+----------+------+----+---+-----+



In [0]:
import pyspark.sql.functions as f

# Rebuild `df` from the raw tuples. This discards the "lag" column that the
# lag section attached by rebinding `df`.
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# NOTE(review): `sc` is assumed to be the SparkContext predefined by the
# notebook environment; it is never created in this notebook.
rdd1 = sc.parallelize(simpleData)
df = rdd1.toDF(["employee_name", "department", "salary"])
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+

