# zadanie 3

Wykorzystaj przykłady z notatnika w SQL Windowed Aggregate Functions (cmd 11) 
i przepisz funkcje używając Spark API.  

  

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, DecimalType, StringType, ShortType


transactions_sample = StructType([
    StructField("AccountId", IntegerType(), True),
    StructField("TranDate", StringType(), True),
    StructField("TranAmt", IntegerType(), True)
])

logical_sample = StructType([
    StructField("RowID", IntegerType(), True),
    StructField("FName", StringType(), True),
    StructField("Salary", ShortType(), True)
])



In [0]:
transactions_data = [
( 1, '2011-01-01', 500),
( 1, '2011-01-15', 50),
( 1, '2011-01-22', 250),
( 1, '2011-01-24', 75),
( 1, '2011-01-26', 125),
( 1, '2011-01-28', 175),
( 2, '2011-01-01', 500),
( 2, '2011-01-15', 50),
( 2, '2011-01-22', 25),
( 2, '2011-01-23', 125),
( 2, '2011-01-26', 200),
( 2, '2011-01-29', 250),
( 3, '2011-01-01', 500),
( 3, '2011-01-15', 50 ),
( 3, '2011-01-22', 5000),
( 3, '2011-01-25', 550),
( 3, '2011-01-27', 95 ),
( 3, '2011-01-30', 2500)
]

transactions_df = spark.createDataFrame(transactions_data, transactions_sample)

transactions_df.createOrReplaceTempView("Transactions")


In [0]:
logical_data = [
(1,'George', 800),
(2,'Sam', 950),
(3,'Diane', 1100),
(4,'Nicholas', 1250),
(5,'Samuel', 1250),
(6,'Patricia', 1300),
(7,'Brian', 1500),
(8,'Thomas', 1600),
(9,'Fran', 2450),
(10,'Debbie', 2850),
(11,'Mark', 2975),
(12,'James', 3000),
(13,'Cynthia', 3000),
(14,'Christopher', 5000)
]

logical_df = spark.createDataFrame(logical_data, logical_sample)

transactions_df.createOrReplaceTempView("Logical")
display(logical_df)

RowID,FName,Salary
1,George,800
2,Sam,950
3,Diane,1100
4,Nicholas,1250
5,Samuel,1250
6,Patricia,1300
7,Brian,1500
8,Thomas,1600
9,Fran,2450
10,Debbie,2850


In [0]:
from pyspark.sql.window import Window

window_rows = Window.orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)

window_range = Window.orderBy("Salary").rangeBetween(Window.unboundedPreceding, Window.currentRow)

logical_df = logical_df.withColumn("SumByRows", sum("Salary").over(window_rows)) \
                       .withColumn("SumByRange", sum("Salary").over(window_range))

logical_df.orderBy("RowID").show()

+-----+-----------+------+---------+----------+
|RowID|      FName|Salary|SumByRows|SumByRange|
+-----+-----------+------+---------+----------+
|    1|     George|   800|      800|       800|
|    2|        Sam|   950|     1750|      1750|
|    3|      Diane|  1100|     2850|      2850|
|    4|   Nicholas|  1250|     4100|      5350|
|    5|     Samuel|  1250|     5350|      5350|
|    6|   Patricia|  1300|     6650|      6650|
|    7|      Brian|  1500|     8150|      8150|
|    8|     Thomas|  1600|     9750|      9750|
|    9|       Fran|  2450|    12200|     12200|
|   10|     Debbie|  2850|    15050|     15050|
|   11|       Mark|  2975|    18025|     18025|
|   12|      James|  3000|    21025|     24025|
|   13|    Cynthia|  3000|    24025|     24025|
|   14|Christopher|  5000|    29025|     29025|
+-----+-----------+------+---------+----------+



# zdanie 4

Do tego notatnika dopisz użycie funkcji okienkowych LEAD, LAG, FIRST_VALUE, LAST_VALUE, ROW_NUMBER  

In [0]:
from pyspark.sql.functions import lead, lag, first, last, row_number

window_spec = Window.orderBy("Salary")
logical_df = logical_df \
    .withColumn("LeadSalary", lead("Salary", 1).over(window_spec)) \
    .withColumn("LagSalary", lag("Salary", 1).over(window_spec)) \
    .withColumn("FirstSalary", first("Salary").over(window_spec)) \
    .withColumn("LastSalary", last("Salary").over(window_spec)) \
    .withColumn("RowNumber", row_number().over(window_spec))

logical_df.show()


+-----+-----------+------+----------+---------+-----------+----------+---------+
|RowID|      FName|Salary|LeadSalary|LagSalary|FirstSalary|LastSalary|RowNumber|
+-----+-----------+------+----------+---------+-----------+----------+---------+
|    1|     George|   800|       950|     null|        800|       800|        1|
|    2|        Sam|   950|      1100|      800|        800|       950|        2|
|    3|      Diane|  1100|      1250|      950|        800|      1100|        3|
|    4|   Nicholas|  1250|      1250|     1100|        800|      1250|        4|
|    5|     Samuel|  1250|      1300|     1250|        800|      1250|        5|
|    6|   Patricia|  1300|      1500|     1250|        800|      1300|        6|
|    7|      Brian|  1500|      1600|     1300|        800|      1500|        7|
|    8|     Thomas|  1600|      2450|     1500|        800|      1600|        8|
|    9|       Fran|  2450|      2850|     1600|        800|      2450|        9|
|   10|     Debbie|  2850|  