### Create Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from delta import *

In [2]:
import warnings ## importing warnings library 
warnings.filterwarnings('ignore') ## Ignore warning

In [3]:
# Describe the session: an application name plus the two settings that
# switch on Delta Lake's SQL extension and catalog implementation.
builder = (
    SparkSession.builder
    .appName("Delta Table Example")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Let the delta helper finish configuring the builder, then start the
# session (or reuse one that is already running).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/anastasiiatrofymova/.ivy2/cache
The jars for the packages stored in: /Users/anastasiiatrofymova/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2b569abc-dd81-4a53-9b43-2cf127b23e58;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 220ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|    

### Create a Delta Table or Load a CSV

In [4]:
# Explicit schema for the impressions CSV. Every column is declared nullable.
# Note that impression_time is read as a plain string, not a timestamp.
schema = (
    StructType()
    .add("impression_id", StringType(), True)
    .add("impression_time", StringType(), True)
    .add("user_id", IntegerType(), True)
    .add("app_code", IntegerType(), True)
    .add("os_version", StringType(), True)
    .add("is_4G", IntegerType(), True)
    .add("is_click", IntegerType(), True)
)

# Read the CSV (first row is the header) using the schema above rather
# than letting Spark infer the column types.
path = "impressions.csv"
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv(path)
)

# Preview the loaded rows.
df.show()

+--------------------+-------------------+-------+--------+------------+-----+--------+
|       impression_id|    impression_time|user_id|app_code|  os_version|is_4G|is_click|
+--------------------+-------------------+-------+--------+------------+-----+--------+
|c4ca4238a0b923820...|2018-11-15 00:00:00|  87862|     422|         old|    0|       0|
|45c48cce2e2d7fbde...|2018-11-15 00:01:00|  63410|     467|      latest|    1|       1|
|70efdf2ec9b086079...|2018-11-15 00:02:00|  71748|     259|intermediate|    1|       0|
|8e296a067a3756337...|2018-11-15 00:02:00|  69209|     244|      latest|    1|       0|
|182be0c5cdcd5072b...|2018-11-15 00:02:00|  62873|     473|      latest|    0|       0|
|3416a75f4cea91095...|2018-11-15 00:03:00|  67352|     409|      latest|    1|       0|
|f457c545a9ded88f1...|2018-11-15 00:03:00|  64356|     190|intermediate|    0|       0|
|72b32a1f754ba1c09...|2018-11-15 00:04:00|  27329|     481|      latest|    0|       0|
|fc490ca45c00b1249...|2018-11-15

In [5]:
# Store the raw impressions in Delta format; "overwrite" replaces whatever
# was previously saved at this location.
df.write.save("data/impressions_delta_table", format="delta", mode="overwrite")

# Read the table back so the following steps work from the stored Delta data.
delta_df = spark.read.load("data/impressions_delta_table", format="delta")
delta_df.show()

24/09/27 11:57:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/09/27 11:57:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+-------------------+-------+--------+------------+-----+--------+
|       impression_id|    impression_time|user_id|app_code|  os_version|is_4G|is_click|
+--------------------+-------------------+-------+--------+------------+-----+--------+
|e9ed0a38ae765e5c7...|2018-12-10 17:00:00|  37755|     386|      latest|    1|       0|
|83aa8115e0c04ddb6...|2018-12-10 17:01:00|  13949|     386|         old|    1|       0|
|46c529721b46a7461...|2018-12-10 17:02:00|  71261|     249|intermediate|    0|       0|
|49b2e97d32e87b86a...|2018-12-10 17:03:00|  64600|     283|      latest|    0|       0|
|a1dd77ee7f3501f1d...|2018-12-10 17:03:00|  86711|     318|      latest|    1|       0|
|797ccff2ab7ec91ee...|2018-12-10 17:04:00|   1277|     504|         old|    1|       0|
|911be73e46fe41d58...|2018-12-10 17:05:00|  35063|     207|      latest|    0|       0|
|c7c3eda2bae16bbcf...|2018-12-10 17:06:00|  45027|     207|      latest|    0|       0|
|39340ef0b0e146881...|2018-12-10

### Data Transformation Pipeline

In [6]:
# 1. Filter: keep only impressions from users whose OS version is 'latest'.
# `F` (pyspark.sql.functions) is imported together with the notebook's other
# dependencies in the first cell instead of here, so every dependency is
# visible in one place.
latest_os_df = delta_df.filter(F.col("os_version") == 'latest')
latest_os_df.show()

+--------------------+-------------------+-------+--------+----------+-----+--------+
|       impression_id|    impression_time|user_id|app_code|os_version|is_4G|is_click|
+--------------------+-------------------+-------+--------+----------+-----+--------+
|39af3153ae383ed71...|2018-11-24 01:35:00|  66498|     508|    latest|    1|       0|
|50b581cac0dcc1672...|2018-11-24 01:37:00|  77000|     244|    latest|    1|       0|
|277c3d058cb0490fa...|2018-11-24 01:39:00|  46028|     190|    latest|    0|       0|
|bde2abe34297af7e0...|2018-11-24 01:44:00|  84806|     190|    latest|    0|       0|
|bdc3472e51887357a...|2018-11-24 01:47:00|  73076|     473|    latest|    0|       0|
|9a14ec361fce610fe...|2018-11-24 01:49:00|  85634|      32|    latest|    1|       0|
|b2ad0e581e0195bcb...|2018-11-24 01:50:00|  82257|       3|    latest|    1|       0|
|a2b490ed6409a7b13...|2018-11-24 01:53:00|  64412|      38|    latest|    0|       0|
|298ba172091b07c23...|2018-11-24 01:54:00|  34412|    

In [7]:
# 2. Add a derived column: the day of the week of each impression.
# Spark numbers the days 1 (Sunday) through 7 (Saturday).
# impression_time is declared as a string in the schema, so this relies on
# Spark casting it to a date implicitly.
newcolumn_df = latest_os_df.withColumn(
    "dayofweek",
    F.dayofweek(F.col("impression_time")),
)
newcolumn_df.show()

+--------------------+-------------------+-------+--------+----------+-----+--------+---------+
|       impression_id|    impression_time|user_id|app_code|os_version|is_4G|is_click|dayofweek|
+--------------------+-------------------+-------+--------+----------+-----+--------+---------+
|39af3153ae383ed71...|2018-11-24 01:35:00|  66498|     508|    latest|    1|       0|        7|
|50b581cac0dcc1672...|2018-11-24 01:37:00|  77000|     244|    latest|    1|       0|        7|
|277c3d058cb0490fa...|2018-11-24 01:39:00|  46028|     190|    latest|    0|       0|        7|
|bde2abe34297af7e0...|2018-11-24 01:44:00|  84806|     190|    latest|    0|       0|        7|
|bdc3472e51887357a...|2018-11-24 01:47:00|  73076|     473|    latest|    0|       0|        7|
|9a14ec361fce610fe...|2018-11-24 01:49:00|  85634|      32|    latest|    1|       0|        7|
|b2ad0e581e0195bcb...|2018-11-24 01:50:00|  82257|       3|    latest|    1|       0|        7|
|a2b490ed6409a7b13...|2018-11-24 01:53:0

In [8]:
# 3. Aggregate: add up the is_click flags within each day of the week.
total_clicks_df = (
    newcolumn_df
    .groupBy("dayofweek")
    .agg(F.sum("is_click").alias("total_clicks"))
)
total_clicks_df.show()

+---------+------------+
|dayofweek|total_clicks|
+---------+------------+
|        1|         798|
|        6|         696|
|        3|         931|
|        5|         770|
|        4|         654|
|        7|         743|
|        2|         790|
+---------+------------+



### Write the Transformed Data

In [9]:
# Persist the per-day click totals in Delta format, replacing any earlier
# contents at this location.
# NOTE(review): "tranformation" in the path looks like a typo for
# "transformation"; it is left as-is because the read-back step loads the
# same path, so both would have to change together.
(
    total_clicks_df.write
    .format("delta")
    .mode("overwrite")
    .save("data/tranformation_impressions_delta_table")
)

                                                                                

### Read the Final Output

In [10]:
# Read the saved aggregate back from storage and display it.
# From here on, total_clicks_df refers to the stored copy rather than the
# in-memory aggregation it was written from.
total_clicks_df = (
    spark.read
    .format("delta")
    .load("data/tranformation_impressions_delta_table")
)
total_clicks_df.show()

+---------+------------+
|dayofweek|total_clicks|
+---------+------------+
|        1|         798|
|        6|         696|
|        3|         931|
|        5|         770|
|        4|         654|
|        7|         743|
|        2|         790|
+---------+------------+

