In [0]:
#ZADANIE 3
#odpowiednik CREATE TABLE + INSERT
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from decimal import Decimal 

# Get the Spark session for this notebook: reuse an existing one or create it.
spark = (
    SparkSession.builder
    .appName("WindowFunctionsDemo")
    .getOrCreate()
)

# Sample rows for the Transactions table: (AccountId, TranDate as ISO text, TranAmt).
# Amounts are written as text and wrapped in Decimal to fit the DecimalType(8, 2) column.
transactions_data = [
    (account_id, tran_date, Decimal(amount))
    for account_id, tran_date, amount in (
        (1, "2011-01-01", "500.00"),
        (1, "2011-01-15", "50.00"),
        (1, "2011-01-22", "250.00"),
        (1, "2011-01-24", "75.00"),
        (1, "2011-01-26", "125.00"),
        (1, "2011-01-28", "175.00"),
        (2, "2011-01-01", "500.00"),
        (2, "2011-01-15", "50.00"),
        (2, "2011-01-22", "25.00"),
        (2, "2011-01-23", "125.00"),
        (2, "2011-01-26", "200.00"),
        (2, "2011-01-29", "250.00"),
        (3, "2011-01-01", "500.00"),
        (3, "2011-01-15", "50.00"),
        (3, "2011-01-22", "5000.00"),
        (3, "2011-01-25", "550.00"),
        (3, "2011-01-27", "95.00"),
        (3, "2011-01-30", "2500.00"),
    )
]

# TranDate is loaded as text first; it is converted to a date right after loading.
transactions_schema = (
    StructType()
    .add("AccountId", IntegerType())
    .add("TranDate", StringType())
    .add("TranAmt", DecimalType(8, 2))
)

# Load the rows, then replace the text TranDate column with a parsed date column.
transactions_df = (
    spark.createDataFrame(transactions_data, schema=transactions_schema)
    .withColumn("TranDate", to_date(col("TranDate"), "yyyy-MM-dd"))
)

# Sample rows for the Logical table: (RowID, FName, Salary).
# RowID is the 1-based position of each (FName, Salary) pair in the list below.
logical_data = [
    (row_id, first_name, salary)
    for row_id, (first_name, salary) in enumerate(
        [
            ("George", 800),
            ("Sam", 950),
            ("Diane", 1100),
            ("Nicholas", 1250),
            ("Samuel", 1250),
            ("Patricia", 1300),
            ("Brian", 1500),
            ("Thomas", 1600),
            ("Fran", 2450),
            ("Debbie", 2850),
            ("Mark", 2975),
            ("James", 3000),
            ("Cynthia", 3000),
            ("Christopher", 5000),
        ],
        start=1,
    )
]

logical_schema = (
    StructType()
    .add("RowID", IntegerType())
    .add("FName", StringType())
    .add("Salary", IntegerType())
)

logical_df = spark.createDataFrame(logical_data, schema=logical_schema)

# Results: print both source tables (up to 20 rows each, long cell values truncated).
print("Transactions DataFrame:")
transactions_df.show(n=20, truncate=True)

print("\nLogical DataFrame:")
logical_df.show(n=20, truncate=True)

Transactions DataFrame:
+---------+----------+-------+
|AccountId|  TranDate|TranAmt|
+---------+----------+-------+
|        1|2011-01-01| 500.00|
|        1|2011-01-15|  50.00|
|        1|2011-01-22| 250.00|
|        1|2011-01-24|  75.00|
|        1|2011-01-26| 125.00|
|        1|2011-01-28| 175.00|
|        2|2011-01-01| 500.00|
|        2|2011-01-15|  50.00|
|        2|2011-01-22|  25.00|
|        2|2011-01-23| 125.00|
|        2|2011-01-26| 200.00|
|        2|2011-01-29| 250.00|
|        3|2011-01-01| 500.00|
|        3|2011-01-15|  50.00|
|        3|2011-01-22|5000.00|
|        3|2011-01-25| 550.00|
|        3|2011-01-27|  95.00|
|        3|2011-01-30|2500.00|
+---------+----------+-------+


Logical DataFrame:
+-----+-----------+------+
|RowID|      FName|Salary|
+-----+-----------+------+
|    1|     George|   800|
|    2|        Sam|   950|
|    3|      Diane|  1100|
|    4|   Nicholas|  1250|
|    5|     Samuel|  1250|
|    6|   Patricia|  1300|
|    7|      Brian|  1500|
|  

In [0]:
#zapytanie 2
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col

# Per-account window ordered by transaction date. With an ordering and no explicit
# frame, an aggregate covers the partition start up to the current row (including
# any rows tied with it on TranDate).
window_spec = (
    Window
    .partitionBy("AccountId")
    .orderBy("TranDate")
)

# Append the per-account running total and sort the output for display.
result_df = (
    transactions_df
    .select("*", sum("TranAmt").over(window_spec).alias("RunTotalAmt"))
    .orderBy("AccountId", "TranDate")
)

result_df.show()

+---------+----------+-------+-----------+
|AccountId|  TranDate|TranAmt|RunTotalAmt|
+---------+----------+-------+-----------+
|        1|2011-01-01| 500.00|     500.00|
|        1|2011-01-15|  50.00|     550.00|
|        1|2011-01-22| 250.00|     800.00|
|        1|2011-01-24|  75.00|     875.00|
|        1|2011-01-26| 125.00|    1000.00|
|        1|2011-01-28| 175.00|    1175.00|
|        2|2011-01-01| 500.00|     500.00|
|        2|2011-01-15|  50.00|     550.00|
|        2|2011-01-22|  25.00|     575.00|
|        2|2011-01-23| 125.00|     700.00|
|        2|2011-01-26| 200.00|     900.00|
|        2|2011-01-29| 250.00|    1150.00|
|        3|2011-01-01| 500.00|     500.00|
|        3|2011-01-15|  50.00|     550.00|
|        3|2011-01-22|5000.00|    5550.00|
|        3|2011-01-25| 550.00|    6100.00|
|        3|2011-01-27|  95.00|    6195.00|
|        3|2011-01-30|2500.00|    8695.00|
+---------+----------+-------+-----------+



In [0]:
#zapytanie 3
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Date range: one (AccountId, month) pair for each of the 12 months, per account.
all_months = [
    (account, month)
    for account in (1, 2, 3)
    for month in range(1, 13)
]

# Calendar frame with a TranDate column holding the first day of each 2011 month.
full_range = (
    spark.createDataFrame(all_months, ["AccountId", "month"])
    .withColumn("TranDate", expr("to_date(concat('2011-', month, '-01'))"))
)

# Left-join the transactions onto the calendar; months with no match get amount 0.
# NOTE(review): the join key is the exact TranDate, so only transactions dated on
# the 1st of a month match. Every other row of transactions_df is absent from the
# result, so the running figures below ignore those transactions. Confirm intended.
transactions_with_months = (
    full_range
    .join(transactions_df, on=["AccountId", "TranDate"], how="left")
    .fillna(0, subset=["TranAmt"])
)

# Window function spec: per account, ordered by date (running frame by default).
window_spec = Window.partitionBy("AccountId").orderBy("TranDate")

# Running average, row count, minimum, maximum and total, appended in that order.
result = (
    transactions_with_months
    .select(
        "*",
        avg("TranAmt").over(window_spec).alias("RunAvg"),
        count("*").over(window_spec).alias("RunTranQty"),
        min("TranAmt").over(window_spec).alias("RunSmallAmt"),
        max("TranAmt").over(window_spec).alias("RunLargeAmt"),
        sum("TranAmt").over(window_spec).alias("RunTotalAmt"),
    )
    .orderBy("AccountId", "TranDate")
)

# 3 accounts x 12 months = 36 rows.
result.show(36)

+---------+----------+-----+-------+----------+----------+-----------+-----------+-----------+
|AccountId|  TranDate|month|TranAmt|    RunAvg|RunTranQty|RunSmallAmt|RunLargeAmt|RunTotalAmt|
+---------+----------+-----+-------+----------+----------+-----------+-----------+-----------+
|        1|2011-01-01|    1| 500.00|500.000000|         1|     500.00|     500.00|     500.00|
|        1|2011-02-01|    2|   0.00|250.000000|         2|       0.00|     500.00|     500.00|
|        1|2011-03-01|    3|   0.00|166.666667|         3|       0.00|     500.00|     500.00|
|        1|2011-04-01|    4|   0.00|125.000000|         4|       0.00|     500.00|     500.00|
|        1|2011-05-01|    5|   0.00|100.000000|         5|       0.00|     500.00|     500.00|
|        1|2011-06-01|    6|   0.00| 83.333333|         6|       0.00|     500.00|     500.00|
|        1|2011-07-01|    7|   0.00| 71.428571|         7|       0.00|     500.00|     500.00|
|        1|2011-08-01|    8|   0.00| 62.500000|   

In [0]:
# zapytanie 4
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Calendar of month-start dates for 2011: one (AccountId, 'YYYY-MM-01') pair per
# account and month.
months = [(a, f'2011-{m:02d}-01') for a in [1, 2, 3] for m in range(1, 13)]

# Parse TranDate into a real date so the join key has the same type as
# transactions_df.TranDate, instead of relying on implicit string/date coercion
# (which also left TranDate as a string in the result).
full_df = (
    spark.createDataFrame(months, ['AccountId', 'TranDate'])
    .withColumn('TranDate', to_date(col('TranDate'), 'yyyy-MM-dd'))
)

# Left-join the transactions onto the calendar; months with no match get amount 0.
# NOTE(review): the join key is the exact TranDate, so only transactions dated on
# the 1st of a month match; all other transactions are absent from the result.
# Confirm this is intended.
transactions_full = (
    full_df
    .join(transactions_df, ['AccountId', 'TranDate'], 'left')
    .fillna(0, ['TranAmt'])
)

# Shared partitioning/ordering, defined once and reused for the row number and
# for the sliding frame.
account_by_date = Window.partitionBy('AccountId').orderBy('TranDate')

# Sliding frame: the current row plus the two rows before it within the account.
window = account_by_date.rowsBetween(-2, Window.currentRow)

result = (
    transactions_full
    .withColumn('RN', row_number().over(account_by_date))
    .withColumn('SlideAvg', avg('TranAmt').over(window))
    .withColumn('SlideQty', count('*').over(window))
    .withColumn('SlideMin', min('TranAmt').over(window))
    .withColumn('SlideMax', max('TranAmt').over(window))
    .withColumn('SlideTotal', sum('TranAmt').over(window))
    .orderBy('AccountId', 'TranDate')
)

# 3 accounts x 12 months = 36 rows.
result.show(36)

+---------+----------+-------+---+----------+--------+--------+--------+----------+
|AccountId|  TranDate|TranAmt| RN|  SlideAvg|SlideQty|SlideMin|SlideMax|SlideTotal|
+---------+----------+-------+---+----------+--------+--------+--------+----------+
|        1|2011-01-01| 500.00|  1|500.000000|       1|  500.00|  500.00|    500.00|
|        1|2011-02-01|   0.00|  2|250.000000|       2|    0.00|  500.00|    500.00|
|        1|2011-03-01|   0.00|  3|166.666667|       3|    0.00|  500.00|    500.00|
|        1|2011-04-01|   0.00|  4|  0.000000|       3|    0.00|    0.00|      0.00|
|        1|2011-05-01|   0.00|  5|  0.000000|       3|    0.00|    0.00|      0.00|
|        1|2011-06-01|   0.00|  6|  0.000000|       3|    0.00|    0.00|      0.00|
|        1|2011-07-01|   0.00|  7|  0.000000|       3|    0.00|    0.00|      0.00|
|        1|2011-08-01|   0.00|  8|  0.000000|       3|    0.00|    0.00|      0.00|
|        1|2011-09-01|   0.00|  9|  0.000000|       3|    0.00|    0.00|    

In [0]:
#zapytanie 5
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as _sum

# ROWS frame: running sum over physical rows up to the current one.
# Salary contains ties (1250 and 3000). Ordering by Salary alone leaves the
# relative order of tied rows unspecified, so which tied row receives which
# running sum could differ between runs. RowID is added as a tie-breaker to make
# SumByRows deterministic.
rows_window = (
    Window.orderBy("Salary", "RowID")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# RANGE frame: rows with the same Salary are peers and share one sum, which is
# the contrast this cell demonstrates. It is deliberately ordered by Salary only;
# adding a tie-breaker here would remove the peer groups.
range_window = (
    Window.orderBy("Salary")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# Neither window is partitioned, so each is evaluated over the whole table.
result = (
    logical_df
    .withColumn("SumByRows", _sum("Salary").over(rows_window))
    .withColumn("SumByRange", _sum("Salary").over(range_window))
    .orderBy("RowID")
)

result.show()

+-----+-----------+------+---------+----------+
|RowID|      FName|Salary|SumByRows|SumByRange|
+-----+-----------+------+---------+----------+
|    1|     George|   800|      800|       800|
|    2|        Sam|   950|     1750|      1750|
|    3|      Diane|  1100|     2850|      2850|
|    4|   Nicholas|  1250|     4100|      5350|
|    5|     Samuel|  1250|     5350|      5350|
|    6|   Patricia|  1300|     6650|      6650|
|    7|      Brian|  1500|     8150|      8150|
|    8|     Thomas|  1600|     9750|      9750|
|    9|       Fran|  2450|    12200|     12200|
|   10|     Debbie|  2850|    15050|     15050|
|   11|       Mark|  2975|    18025|     18025|
|   12|      James|  3000|    21025|     24025|
|   13|    Cynthia|  3000|    24025|     24025|
|   14|Christopher|  5000|    29025|     29025|
+-----+-----------+------+---------+----------+



In [0]:
#funkcji okienkowych
from pyspark.sql.window import Window
from pyspark.sql.functions import col, expr, first, lag, last, lead, row_number

# Define the window once and use it for every function below (previously it was
# defined but unused, and the OVER clause was repeated in each SQL string).
# Salary contains ties, so RowID is added as a tie-breaker: without it the order
# of tied rows, and therefore RowNum / NextSalary / PrevSalary, is not guaranteed.
# The window has no partitioning, so it spans the whole table.
window_spec = Window.orderBy("Salary", "RowID")

# Computations using window functions.
result = logical_df.select(
    "RowID",
    "FName",
    "Salary",
    # Consecutive row numbers in window order, starting at 1.
    row_number().over(window_spec).alias("RowNum"),
    # Salary from the next row in window order (null on the last row).
    lead("Salary", 1).over(window_spec).alias("NextSalary"),
    # Salary from the previous row in window order (null on the first row).
    lag("Salary", 1).over(window_spec).alias("PrevSalary"),
    # First Salary in the frame; the default frame starts at the first row.
    first("Salary").over(window_spec).alias("FirstSalary"),
    # Last Salary needs an explicit full frame: the default frame ends at the
    # current row, which would just return the current row's Salary.
    last("Salary").over(
        window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ).alias("LastSalary"),
).orderBy("RowID")

result.show()

+-----+-----------+------+------+----------+----------+-----------+----------+
|RowID|      FName|Salary|RowNum|NextSalary|PrevSalary|FirstSalary|LastSalary|
+-----+-----------+------+------+----------+----------+-----------+----------+
|    1|     George|   800|     1|       950|      null|        800|      5000|
|    2|        Sam|   950|     2|      1100|       800|        800|      5000|
|    3|      Diane|  1100|     3|      1250|       950|        800|      5000|
|    4|   Nicholas|  1250|     4|      1250|      1100|        800|      5000|
|    5|     Samuel|  1250|     5|      1300|      1250|        800|      5000|
|    6|   Patricia|  1300|     6|      1500|      1250|        800|      5000|
|    7|      Brian|  1500|     7|      1600|      1300|        800|      5000|
|    8|     Thomas|  1600|     8|      2450|      1500|        800|      5000|
|    9|       Fran|  2450|     9|      2850|      1600|        800|      5000|
|   10|     Debbie|  2850|    10|      2975|      24