In [1]:
%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window

# Attach to the Spark session that is already running, or start one if none exists.
spark = (
    SparkSession.builder
    .getOrCreate()
)

<IPython.core.display.Javascript object>

In [2]:
# Toy employee table: one (id, dept, salary) tuple per person.
# Some salaries repeat inside a department (ids 3/4 in sales, ids 9/10 in dev).
l = [
    (1, "sales", 4200),
    (2, "admin", 3100),
    (3, "sales", 4000),
    (4, "sales", 4000),
    (5, "admin", 2700),
    (6, "dev", 3400),
    (7, "dev", 5200),
    (8, "dev", 3700),
    (9, "dev", 4400),
    (10, "dev", 4400),
]

# The column names are handed over positionally as the schema argument.
data = spark.createDataFrame(l, ["id", "dept", "salary"])
data.show()

+---+-----+------+
| id| dept|salary|
+---+-----+------+
|  1|sales|  4200|
|  2|admin|  3100|
|  3|sales|  4000|
|  4|sales|  4000|
|  5|admin|  2700|
|  6|  dev|  3400|
|  7|  dev|  5200|
|  8|  dev|  3700|
|  9|  dev|  4400|
| 10|  dev|  4400|
+---+-----+------+



<IPython.core.display.Javascript object>

In [3]:
# Collapse the table to one row per department. Each aggregate is written as
# a SQL snippet for F.expr and then given an explicit output column name.
df = (
    data.groupBy("dept")
    .agg(
        F.expr("collect_list(salary)").alias("list_salary"),
        F.expr("avg(salary)").alias("average_salary"),
        F.expr("sum(salary)").alias("total_salary"),
    )
)

df.show()

+-----+--------------------+------------------+------------+
| dept|         list_salary|    average_salary|total_salary|
+-----+--------------------+------------------+------------+
|  dev|[3400, 5200, 3700...|            4220.0|       21100|
|sales|  [4200, 4000, 4000]|4066.6666666666665|       12200|
|admin|        [3100, 2700]|            2900.0|        5800|
+-----+--------------------+------------------+------------+



<IPython.core.display.Javascript object>

In [4]:
# Unordered window: every row is evaluated against all rows of its department,
# so the aggregates are repeated on each row instead of collapsing the table.
windowSpec = Window.partitionBy("dept")

# Keep all original columns and append the three windowed aggregates.
df = data.select(
    "*",
    F.collect_list("salary").over(windowSpec).alias("list_salary"),
    F.avg("salary").over(windowSpec).alias("average_salary"),
    F.sum("salary").over(windowSpec).alias("total_salary"),
)

df.show()


+---+-----+------+--------------------+------------------+------------+
| id| dept|salary|         list_salary|    average_salary|total_salary|
+---+-----+------+--------------------+------------------+------------+
|  6|  dev|  3400|[3400, 5200, 3700...|            4220.0|       21100|
|  7|  dev|  5200|[3400, 5200, 3700...|            4220.0|       21100|
|  8|  dev|  3700|[3400, 5200, 3700...|            4220.0|       21100|
|  9|  dev|  4400|[3400, 5200, 3700...|            4220.0|       21100|
| 10|  dev|  4400|[3400, 5200, 3700...|            4220.0|       21100|
|  1|sales|  4200|  [4200, 4000, 4000]|4066.6666666666665|       12200|
|  3|sales|  4000|  [4200, 4000, 4000]|4066.6666666666665|       12200|
|  4|sales|  4000|  [4200, 4000, 4000]|4066.6666666666665|       12200|
|  2|admin|  3100|        [3100, 2700]|            2900.0|        5800|
|  5|admin|  2700|        [3100, 2700]|            2900.0|        5800|
+---+-----+------+--------------------+------------------+------------+

<IPython.core.display.Javascript object>

In [11]:
# Ordered window with no explicit frame. The aggregates become running values
# up to the current row, and rows that tie on salary are treated as peers:
# they receive the same list / average / total.
windowSpec = Window.partitionBy("dept").orderBy(F.col("salary").asc())

df = (
    data
    .withColumn("list_salary", F.collect_list("salary").over(windowSpec))
    .withColumn("average_salary", F.avg("salary").over(windowSpec))
    .withColumn("total_salary", F.sum("salary").over(windowSpec))
)

df.show()

+---+-----+------+--------------------+------------------+------------+
| id| dept|salary|         list_salary|    average_salary|total_salary|
+---+-----+------+--------------------+------------------+------------+
|  6|  dev|  3400|              [3400]|            3400.0|        3400|
|  8|  dev|  3700|        [3400, 3700]|            3550.0|        7100|
|  9|  dev|  4400|[3400, 3700, 4400...|            3975.0|       15900|
| 10|  dev|  4400|[3400, 3700, 4400...|            3975.0|       15900|
|  7|  dev|  5200|[3400, 3700, 4400...|            4220.0|       21100|
|  3|sales|  4000|        [4000, 4000]|            4000.0|        8000|
|  4|sales|  4000|        [4000, 4000]|            4000.0|        8000|
|  1|sales|  4200|  [4000, 4000, 4200]|4066.6666666666665|       12200|
|  5|admin|  2700|              [2700]|            2700.0|        2700|
|  2|admin|  3100|        [2700, 3100]|            2900.0|        5800|
+---+-----+------+--------------------+------------------+------------+

<IPython.core.display.Javascript object>

In [12]:
# show() abbreviates long arrays, so fetch the rows and read the third one's list in full.
third_row = df.collect()[2]
third_row["list_salary"]

[3400, 3700, 4400, 4400]

<IPython.core.display.Javascript object>

In [13]:
# Running aggregates over an explicit ROWS frame: each row sees the rows that
# physically precede it in the ordered partition, plus itself. Rows that tie
# on salary therefore get different running values.
#
# A ROWS frame depends on the physical row order. Ordering by salary alone
# leaves the relative order of tied rows (ids 3/4 and 9/10) up to Spark, so
# their running values could change between runs. "id" is added as a
# tie-breaker to make the result deterministic.
windowSpec = (
    Window.partitionBy("dept")
    .orderBy(F.asc("salary"), F.asc("id"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df = (
    data.withColumn("list_salary", F.collect_list(F.col("salary")).over(windowSpec))
    .withColumn("average_salary", F.avg(F.col("salary")).over(windowSpec))
    .withColumn("total_salary", F.sum(F.col("salary")).over(windowSpec))
)

df.show()

+---+-----+------+--------------------+------------------+------------+
| id| dept|salary|         list_salary|    average_salary|total_salary|
+---+-----+------+--------------------+------------------+------------+
|  6|  dev|  3400|              [3400]|            3400.0|        3400|
|  8|  dev|  3700|        [3400, 3700]|            3550.0|        7100|
|  9|  dev|  4400|  [3400, 3700, 4400]|3833.3333333333335|       11500|
| 10|  dev|  4400|[3400, 3700, 4400...|            3975.0|       15900|
|  7|  dev|  5200|[3400, 3700, 4400...|            4220.0|       21100|
|  3|sales|  4000|              [4000]|            4000.0|        4000|
|  4|sales|  4000|        [4000, 4000]|            4000.0|        8000|
|  1|sales|  4200|  [4000, 4000, 4200]|4066.6666666666665|       12200|
|  5|admin|  2700|              [2700]|            2700.0|        2700|
|  2|admin|  3100|        [2700, 3100]|            2900.0|        5800|
+---+-----+------+--------------------+------------------+------------+

<IPython.core.display.Javascript object>

In [15]:
# show() abbreviates long arrays, so fetch the rows and read the fourth one's list in full.
fourth_row = df.collect()[3]
fourth_row["list_salary"]

[3400, 3700, 4400, 4400]

<IPython.core.display.Javascript object>

In [16]:
# Two-row sliding frame: the previous row (offset -1) and the current row.
#
# "The previous row" is only well defined when the ordering is unique. Salary
# has ties inside a department, so "id" is added as a tie-breaker to keep the
# frame contents deterministic.
windowSpec = (
    Window.partitionBy("dept")
    .orderBy(F.asc("salary"), F.asc("id"))
    .rowsBetween(-1, Window.currentRow)
)

<IPython.core.display.Javascript object>

In [17]:
# Evaluate the three aggregates over the currently defined windowSpec,
# keeping every original column and appending one column per aggregate.
df = data.select(
    "*",
    F.collect_list("salary").over(windowSpec).alias("list_salary"),
    F.avg("salary").over(windowSpec).alias("average_salary"),
    F.sum("salary").over(windowSpec).alias("total_salary"),
)

df.show()

+---+-----+------+------------+--------------+------------+
| id| dept|salary| list_salary|average_salary|total_salary|
+---+-----+------+------------+--------------+------------+
|  6|  dev|  3400|      [3400]|        3400.0|        3400|
|  8|  dev|  3700|[3400, 3700]|        3550.0|        7100|
|  9|  dev|  4400|[3700, 4400]|        4050.0|        8100|
| 10|  dev|  4400|[4400, 4400]|        4400.0|        8800|
|  7|  dev|  5200|[4400, 5200]|        4800.0|        9600|
|  3|sales|  4000|      [4000]|        4000.0|        4000|
|  4|sales|  4000|[4000, 4000]|        4000.0|        8000|
|  1|sales|  4200|[4000, 4200]|        4100.0|        8200|
|  5|admin|  2700|      [2700]|        2700.0|        2700|
|  2|admin|  3100|[2700, 3100]|        2900.0|        5800|
+---+-----+------+------------+--------------+------------+



<IPython.core.display.Javascript object>

In [18]:
# Ordered window per department; salary ties are intentional here because
# they are what distinguishes the ranking functions below.
windowSpec = Window.partitionBy("dept").orderBy(F.col("salary").asc())

df = data.select(
    "*",
    F.avg("salary").over(windowSpec).alias("average_salary"),
    F.sum("salary").over(windowSpec).alias("total_salary"),
    # rank leaves a gap after tied rows (1, 2, 3, 3, 5) ...
    F.rank().over(windowSpec).alias("rank"),
    # ... dense_rank does not (1, 2, 3, 3, 4).
    F.dense_rank().over(windowSpec).alias("dense_rank"),
    F.percent_rank().over(windowSpec).alias("perc_rank"),
)


df.show()

+---+-----+------+------------------+------------+----+----------+---------+
| id| dept|salary|    average_salary|total_salary|rank|dense_rank|perc_rank|
+---+-----+------+------------------+------------+----+----------+---------+
|  6|  dev|  3400|            3400.0|        3400|   1|         1|      0.0|
|  8|  dev|  3700|            3550.0|        7100|   2|         2|     0.25|
|  9|  dev|  4400|            3975.0|       15900|   3|         3|      0.5|
| 10|  dev|  4400|            3975.0|       15900|   3|         3|      0.5|
|  7|  dev|  5200|            4220.0|       21100|   5|         4|      1.0|
|  3|sales|  4000|            4000.0|        8000|   1|         1|      0.0|
|  4|sales|  4000|            4000.0|        8000|   1|         1|      0.0|
|  1|sales|  4200|4066.6666666666665|       12200|   3|         2|      1.0|
|  5|admin|  2700|            2700.0|        2700|   1|         1|      0.0|
|  2|admin|  3100|            2900.0|        5800|   2|         2|      1.0|
+---+-----+------+------------------+------------+----+----------+---------+

<IPython.core.display.Javascript object>

In [19]:
# lag("salary", 1) returns the salary of the row one position earlier in the
# ordered partition; the first row of each department has no predecessor and
# gets null.
#
# The result depends on the physical row order. Ordering by salary alone
# leaves tied rows (ids 3/4 and 9/10) in an arbitrary order, so which of them
# receives which lag value could change between runs. "id" is added as a
# tie-breaker to make it deterministic.
windowSpec = Window.partitionBy("dept").orderBy(F.asc("salary"), F.asc("id"))
df = data.withColumn("lag", F.lag("salary", 1).over(windowSpec))

df.show()

+---+-----+------+----+
| id| dept|salary| lag|
+---+-----+------+----+
|  6|  dev|  3400|null|
|  8|  dev|  3700|3400|
|  9|  dev|  4400|3700|
| 10|  dev|  4400|4400|
|  7|  dev|  5200|4400|
|  3|sales|  4000|null|
|  4|sales|  4000|4000|
|  1|sales|  4200|4000|
|  5|admin|  2700|null|
|  2|admin|  3100|2700|
+---+-----+------+----+



<IPython.core.display.Javascript object>

In [20]:
# Gap between each salary and the lagged one; rows whose lag is null get a null diff.
df.select("*", (F.col("salary") - F.col("lag")).alias("diff")).show()

+---+-----+------+----+----+
| id| dept|salary| lag|diff|
+---+-----+------+----+----+
|  6|  dev|  3400|null|null|
|  8|  dev|  3700|3400| 300|
|  9|  dev|  4400|3700| 700|
| 10|  dev|  4400|4400|   0|
|  7|  dev|  5200|4400| 800|
|  3|sales|  4000|null|null|
|  4|sales|  4000|4000|   0|
|  1|sales|  4200|4000| 200|
|  5|admin|  2700|null|null|
|  2|admin|  3100|2700| 400|
+---+-----+------+----+----+



<IPython.core.display.Javascript object>