In [0]:
transactions = spark.read.table("Sample.Transactions")
logical = spark.read.table("Sample.Logical")

display(transactions)
display(logical)

AccountId,TranDate,TranAmt
1,2011-01-01,500.0
1,2011-01-15,50.0
1,2011-01-22,250.0
1,2011-01-24,75.0
1,2011-01-26,125.0
1,2011-01-28,175.0
2,2011-01-01,500.0
2,2011-01-15,50.0
2,2011-01-22,25.0
2,2011-01-23,125.0


RowID,FName,Salary
1,George,800
2,Sam,950
3,Diane,1100
4,Nicholas,1250
5,Samuel,1250
6,Patricia,1300
7,Brian,1500
8,Thomas,1600
9,Fran,2450
10,Debbie,2850


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_spec = Window.partitionBy("AccountId").orderBy("TranDate")

df = transactions.withColumn("RunTotalAmt", F.sum("TranAmt").over(window_spec))

df.show()

+---------+----------+-------+-----------+
|AccountId|  TranDate|TranAmt|RunTotalAmt|
+---------+----------+-------+-----------+
|        1|2011-01-01| 500.00|    1000.00|
|        1|2011-01-01| 500.00|    1000.00|
|        1|2011-01-15|  50.00|    1100.00|
|        1|2011-01-15|  50.00|    1100.00|
|        1|2011-01-22| 250.00|    1600.00|
|        1|2011-01-22| 250.00|    1600.00|
|        1|2011-01-24|  75.00|    1750.00|
|        1|2011-01-24|  75.00|    1750.00|
|        1|2011-01-26| 125.00|    2000.00|
|        1|2011-01-26| 125.00|    2000.00|
|        1|2011-01-28| 175.00|    2350.00|
|        1|2011-01-28| 175.00|    2350.00|
|        2|2011-01-01| 500.00|    1000.00|
|        2|2011-01-01| 500.00|    1000.00|
|        2|2011-01-15|  50.00|    1100.00|
|        2|2011-01-15|  50.00|    1100.00|
|        2|2011-01-22|  25.00|    1150.00|
|        2|2011-01-22|  25.00|    1150.00|
|        2|2011-01-23| 125.00|    1400.00|
|        2|2011-01-23| 125.00|    1400.00|
+---------+

In [0]:
window_spec = Window.partitionBy("AccountId").orderBy("TranDate").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = transactions.withColumn("RunAvg", F.avg("TranAmt").over(window_spec)) \
                 .withColumn("RunTranQty", F.count("TranAmt").over(window_spec)) \
                 .withColumn("RunSmallAmt", F.min("TranAmt").over(window_spec)) \
                 .withColumn("RunLargeAmt", F.max("TranAmt").over(window_spec)) \
                 .withColumn("RunTotalAmt", F.sum("TranAmt").over(window_spec))

df.show()

+---------+----------+-------+----------+----------+-----------+-----------+-----------+
|AccountId|  TranDate|TranAmt|    RunAvg|RunTranQty|RunSmallAmt|RunLargeAmt|RunTotalAmt|
+---------+----------+-------+----------+----------+-----------+-----------+-----------+
|        1|2011-01-01| 500.00|500.000000|         1|     500.00|     500.00|     500.00|
|        1|2011-01-01| 500.00|500.000000|         2|     500.00|     500.00|    1000.00|
|        1|2011-01-15|  50.00|350.000000|         3|      50.00|     500.00|    1050.00|
|        1|2011-01-15|  50.00|275.000000|         4|      50.00|     500.00|    1100.00|
|        1|2011-01-22| 250.00|270.000000|         5|      50.00|     500.00|    1350.00|
|        1|2011-01-22| 250.00|266.666667|         6|      50.00|     500.00|    1600.00|
|        1|2011-01-24|  75.00|239.285714|         7|      50.00|     500.00|    1675.00|
|        1|2011-01-24|  75.00|218.750000|         8|      50.00|     500.00|    1750.00|
|        1|2011-01-26

In [0]:
sliding_window_spec = Window.partitionBy("AccountId").orderBy("TranDate").rowsBetween(-2, Window.currentRow)

df = transactions.withColumn("SlideAvg", F.avg("TranAmt").over(sliding_window_spec)) \
                 .withColumn("SlideQty", F.count("TranAmt").over(sliding_window_spec)) \
                 .withColumn("SlideMin", F.min("TranAmt").over(sliding_window_spec)) \
                 .withColumn("SlideMax", F.max("TranAmt").over(sliding_window_spec)) \
                 .withColumn("SlideTotal", F.sum("TranAmt").over(sliding_window_spec)) \
                 .withColumn("RN", F.row_number().over(Window.partitionBy("AccountId").orderBy("TranDate")))

df.show()

+---------+----------+-------+----------+--------+--------+--------+----------+---+
|AccountId|  TranDate|TranAmt|  SlideAvg|SlideQty|SlideMin|SlideMax|SlideTotal| RN|
+---------+----------+-------+----------+--------+--------+--------+----------+---+
|        1|2011-01-01| 500.00|500.000000|       1|  500.00|  500.00|    500.00|  1|
|        1|2011-01-01| 500.00|500.000000|       2|  500.00|  500.00|   1000.00|  2|
|        1|2011-01-15|  50.00|350.000000|       3|   50.00|  500.00|   1050.00|  3|
|        1|2011-01-15|  50.00|200.000000|       3|   50.00|  500.00|    600.00|  4|
|        1|2011-01-22| 250.00|116.666667|       3|   50.00|  250.00|    350.00|  5|
|        1|2011-01-22| 250.00|183.333333|       3|   50.00|  250.00|    550.00|  6|
|        1|2011-01-24|  75.00|191.666667|       3|   75.00|  250.00|    575.00|  7|
|        1|2011-01-24|  75.00|133.333333|       3|   75.00|  250.00|    400.00|  8|
|        1|2011-01-26| 125.00| 91.666667|       3|   75.00|  125.00|    275.

In [0]:
window_spec_rows = Window.orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_spec_range = Window.orderBy("Salary").rangeBetween(Window.unboundedPreceding, Window.currentRow)

df = logical.withColumn("SumByRows", F.sum("Salary").over(window_spec_rows)) \
            .withColumn("SumByRange", F.sum("Salary").over(window_spec_range))

df.show()

+-----+-----------+------+---------+----------+
|RowID|      FName|Salary|SumByRows|SumByRange|
+-----+-----------+------+---------+----------+
|    1|     George|   800|      800|       800|
|    2|        Sam|   950|     1750|      1750|
|    3|      Diane|  1100|     2850|      2850|
|    4|   Nicholas|  1250|     4100|      5350|
|    5|     Samuel|  1250|     5350|      5350|
|    6|   Patricia|  1300|     6650|      6650|
|    7|      Brian|  1500|     8150|      8150|
|    8|     Thomas|  1600|     9750|      9750|
|    9|       Fran|  2450|    12200|     12200|
|   10|     Debbie|  2850|    15050|     15050|
|   11|       Mark|  2975|    18025|     18025|
|   12|      James|  3000|    21025|     24025|
|   13|    Cynthia|  3000|    24025|     24025|
|   14|Christopher|  5000|    29025|     29025|
+-----+-----------+------+---------+----------+



ZADANIE 1

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_spec = Window.partitionBy("AccountId").orderBy("TranDate")

df = transactions.withColumn("NextTranAmt", F.lead("TranAmt").over(window_spec)) \
                 .withColumn("PrevTranAmt", F.lag("TranAmt").over(window_spec)) \
                 .withColumn("FirstTranAmt", F.first("TranAmt").over(window_spec)) \
                 .withColumn("LastTranAmt", F.last("TranAmt").over(window_spec)) \
                 .withColumn("RowNumber", F.row_number().over(window_spec))

df.show()


+---------+----------+-------+-----------+-----------+------------+-----------+---------+
|AccountId|  TranDate|TranAmt|NextTranAmt|PrevTranAmt|FirstTranAmt|LastTranAmt|RowNumber|
+---------+----------+-------+-----------+-----------+------------+-----------+---------+
|        1|2011-01-01| 500.00|     500.00|       null|      500.00|     500.00|        1|
|        1|2011-01-01| 500.00|      50.00|     500.00|      500.00|     500.00|        2|
|        1|2011-01-15|  50.00|      50.00|     500.00|      500.00|      50.00|        3|
|        1|2011-01-15|  50.00|     250.00|      50.00|      500.00|      50.00|        4|
|        1|2011-01-22| 250.00|     250.00|      50.00|      500.00|     250.00|        5|
|        1|2011-01-22| 250.00|      75.00|     250.00|      500.00|     250.00|        6|
|        1|2011-01-24|  75.00|      75.00|     250.00|      500.00|      75.00|        7|
|        1|2011-01-24|  75.00|     125.00|      75.00|      500.00|      75.00|        8|
|        1