In [1]:
from pyspark.sql import SparkSession
import os

# Maven coordinates of the Delta Lake build to load. The `_2.12` suffix is
# presumably the Scala version the artifact targets and must match the Scala
# build of the installed PySpark -- verify when upgrading either one.
DELTA_PACKAGE = "io.delta:delta-spark_2.12:3.1.0"

# Build (or reuse) a local Spark session with Delta Lake enabled.
# The Delta jar is requested through `spark.jars.packages` instead of
# overwriting PYSPARK_SUBMIT_ARGS, so the process environment is left alone and
# any submit arguments already exported by the user are preserved.
spark = (
    SparkSession.builder
    .appName("FeatureStore-Feature-Engineering")
    .master("local[*]")
    .config("spark.jars.packages", DELTA_PACKAGE)
    # Register Delta's SQL extension and catalog implementation with Spark.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Last expression of the cell: display the session summary.
spark


25/12/28 08:02:51 WARN Utils: Your hostname, RidhiGuntur resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/28 08:02:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ridhi/feature-store-delta/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ridhi/.ivy2/cache
The jars for the packages stored in: /home/ridhi/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a324be24-c978-4aaa-93ea-48583df0f4e9;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 222ms :: artifacts dl 7ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   

In [2]:
# Root of the feature-store project. It can be overridden through the
# FEATURE_STORE_ROOT environment variable so the notebook is not tied to one
# machine; the default keeps the original location.
feature_store_root = os.environ.get(
    "FEATURE_STORE_ROOT", "/home/ridhi/feature-store-delta"
)
bronze_path = os.path.join(feature_store_root, "delta", "bronze", "user_events")

# Load the bronze user-events Delta table and preview its rows.
bronze_df = spark.read.format("delta").load(bronze_path)
bronze_df.show()


25/12/28 08:03:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 9:>                                                          (0 + 1) / 1]

+-------+----------+-----------+-------------------+
|user_id|event_type|event_value|         event_time|
+-------+----------+-----------+-------------------+
|      1|     click|          5|2024-01-01 10:00:00|
|      1|     click|          3|2024-01-02 11:00:00|
|      2|      view|          1|2024-01-01 09:30:00|
|      2|     click|          2|2024-01-03 14:00:00|
|      3|      view|          4|2024-01-02 16:45:00|
+-------+----------+-----------+-------------------+



                                                                                

In [3]:
from pyspark.sql.functions import count, avg, sum, col

# Per-user aggregate expressions. `count`, `sum` and `avg` are the Spark column
# functions imported from pyspark.sql.functions above (this `sum` is not the
# Python built-in).
is_click = (col("event_type") == "click").cast("int")  # comparison cast to an int flag
user_aggregates = [
    count("*").alias("total_events"),
    sum(is_click).alias("total_clicks"),
    avg("event_value").alias("avg_event_value"),
]

# Collapse the bronze events to one row per user_id.
features_df = bronze_df.groupBy("user_id").agg(*user_aggregates)

features_df.show()


+-------+------------+------------+---------------+
|user_id|total_events|total_clicks|avg_event_value|
+-------+------------+------------+---------------+
|      1|           2|           2|            4.0|
|      3|           1|           0|            4.0|
|      2|           2|           1|            1.5|
+-------+------------+------------+---------------+



In [4]:
# Root of the feature-store project. It can be overridden through the
# FEATURE_STORE_ROOT environment variable so the notebook is not tied to one
# machine; the default keeps the original location.
feature_store_root = os.environ.get(
    "FEATURE_STORE_ROOT", "/home/ridhi/feature-store-delta"
)
features_path = os.path.join(
    feature_store_root, "delta", "features", "user_features"
)

# Persist the aggregated features as a Delta table, replacing any existing
# contents at that location.
(
    features_df.write
    .format("delta")
    .mode("overwrite")
    .save(features_path)
)


                                                                                

In [5]:
# Read the features table back from storage and print it to confirm the write.
spark.read.load(features_path, format="delta").show()


+-------+------------+------------+---------------+
|user_id|total_events|total_clicks|avg_event_value|
+-------+------------+------------+---------------+
|      1|           2|           2|            4.0|
|      3|           1|           0|            4.0|
|      2|           2|           1|            1.5|
+-------+------------+------------+---------------+



In [6]:
# Show the Delta commit history of the features table, with untruncated columns.
history_sql = f"DESCRIBE HISTORY delta.`{features_path}`"
spark.sql(history_sql).show(truncate=False)


+-------+----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp             |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|0      |2025-12-28 08:03:31.92|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |false        |{numFiles -> 1, numOut

In [7]:
# Derive a modified copy of the feature set: avg_event_value is shifted up by 1,
# every other column is carried over unchanged.
shifted_avg = col("avg_event_value") + 1
updated_features_df = features_df.withColumn("avg_event_value", shifted_avg)

updated_features_df.show()


+-------+------------+------------+---------------+
|user_id|total_events|total_clicks|avg_event_value|
+-------+------------+------------+---------------+
|      1|           2|           2|            5.0|
|      3|           1|           0|            5.0|
|      2|           2|           1|            2.5|
+-------+------------+------------+---------------+



In [8]:
# Overwrite the features Delta table with the updated feature values.
updated_features_df.write.save(features_path, format="delta", mode="overwrite")


                                                                                

In [9]:
# Query the table's Delta commit history again and print it untruncated.
table_history_df = spark.sql(f"DESCRIBE HISTORY delta.`{features_path}`")
table_history_df.show(truncate=False)


+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|1      |2025-12-28 08:04:22.889|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |0          |Serializable  |false        |{numFiles -> 1, nu