In [1]:
import os
import logging
from itertools import chain
from datetime import datetime
import math

# Point PySpark at a specific Spark install and JDK. Both values are hard-coded
# absolute paths and overwrite whatever the process environment already had.
os.environ.update(
    {
        "SPARK_HOME": "/Applications/Spark",
        "JAVA_HOME": "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home",
    }
)

In [2]:
from pyspark.sql import SparkSession
from pyspark.logger import PySparkLogger
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [3]:
# Reuse the active Spark session if there is one, otherwise start a new one
# with the default builder configuration.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
#spark.sparkContext.setLogLevel("WARN") #To keep the notebook clean for now

{"ts":"2025-09-29T18:19:19.715Z","level":"WARN","msg":"Your hostname, jaguilar.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)","logger":"Utils"}
{"ts":"2025-09-29T18:19:19.718Z","level":"WARN","msg":"Set SPARK_LOCAL_IP if you need to bind to another address","logger":"Utils"}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
{"ts":"2025-09-29T18:19:20.388Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable","logger":"NativeCodeLoader"}


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54676)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/Users/jaguilar/Desktop/Portfolio/data_pipeline/venv/lib/python3.11/site-packages/pyspark/accumulators.py", line 299, in handle
    poll(accum_updates)
  File "/Users/jaguilar/Deskt

In [4]:
# Structured logger for the cleanup run; messages are also appended to cleanup.log.
logger = PySparkLogger.getLogger("spark_logger")

# getLogger returns the same logger object for the same name, so re-running this
# cell would otherwise stack a new FileHandler on it every time and each message
# would be written to the file once per handler. Reuse the handler if it exists.
_log_path = os.path.abspath("cleanup.log")
handler = next(
    (
        existing
        for existing in logger.handlers
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == _log_path
    ),
    None,
)
if handler is None:
    handler = logging.FileHandler("cleanup.log")
    logger.addHandler(handler)

logger.setLevel(logging.INFO)

### BigQuery Raw Data Cleanup

First, we've gotta do a clean up of the "databases" the business already has in place, as if their data platform team does a good job of maintaining it in proper form :sunglasses:

In [5]:
# Load the three reference tables. Each CSV has a header row and Spark infers
# the column types.
csv_options = {"header": True, "inferSchema": True}
users = spark.read.csv("../tables/users_table.csv", **csv_options)
orders = spark.read.csv("../tables/orders_table.csv", **csv_options)
products = spark.read.csv("../tables/products_table.csv", **csv_options)

# Row counts, one Spark action per table.
users_size, orders_size, products_size = users.count(), orders.count(), products.count()

# Record the sizes in the log, both in the message and as structured context.
logger.info(f"Users Database has {users_size} records as of this pipeline run.", users_size=users_size)
logger.info(f"Orders Database has {orders_size} records as of this pipeline run.", orders_size=orders_size)
logger.info(f"Products Database has {products_size} records as of this pipeline run.", products_size=products_size)

{"ts": "2025-09-29 20:19:26.790", "level": "INFO", "logger": "spark_logger", "msg": "Users Database has 100000 records as of this pipeline run.", "context": {"users_size": 100000}}
{"ts": "2025-09-29 20:19:26.791", "level": "INFO", "logger": "spark_logger", "msg": "Orders Database has 125188 records as of this pipeline run.", "context": {"orders_size": 125188}}
{"ts": "2025-09-29 20:19:26.792", "level": "INFO", "logger": "spark_logger", "msg": "Products Database has 29120 records as of this pipeline run.", "context": {"products_size": 29120}}


In [6]:
def count_nulls(df):
    """Return a ``{column_name: null_count}`` dict covering every column of ``df``.

    All counts come from a single aggregation over the DataFrame instead of one
    filter-and-count job per column. ``F.when`` without ``otherwise`` yields null
    for non-matching rows and ``F.count`` skips nulls, so each aggregate counts
    exactly the rows where that column is null.
    """
    null_counts_row = df.select(
        [F.count(F.when(df[name].isNull(), 1)).alias(name) for name in df.columns]
    ).first()
    return null_counts_row.asDict()


users_null_dictionary = count_nulls(users)
logger.info(f"The users database has the following amount of null values: {users_null_dictionary}",
    users_null_dictionary=users_null_dictionary)

orders_null_dictionary = count_nulls(orders)
logger.info(f"The orders database has the following amount of null values: {orders_null_dictionary}",
    orders_null_dictionary=orders_null_dictionary)

products_null_dictionary = count_nulls(products)
logger.info(f"The products database has the following amount of null values: {products_null_dictionary}",
    products_null_dictionary=products_null_dictionary)

{"ts": "2025-09-29 20:19:31.473", "level": "INFO", "logger": "spark_logger", "msg": "The users database has the following amount of null values: {'id': 0, 'first_name': 0, 'last_name': 0, 'email': 0, 'age': 0, 'gender': 0, 'state': 0, 'street_address': 0, 'postal_code': 0, 'city': 0, 'country': 0, 'latitude': 0, 'longitude': 0, 'traffic_source': 0, 'created_at': 0, 'user_geom': 0}", "context": {"users_null_dictionary": {"id": 0, "first_name": 0, "last_name": 0, "email": 0, "age": 0, "gender": 0, "state": 0, "street_address": 0, "postal_code": 0, "city": 0, "country": 0, "latitude": 0, "longitude": 0, "traffic_source": 0, "created_at": 0, "user_geom": 0}}}
{"ts": "2025-09-29 20:19:32.449", "level": "INFO", "logger": "spark_logger", "msg": "The orders database has the following amount of null values: {'order_id': 0, 'user_id': 0, 'status': 0, 'gender': 0, 'created_at': 0, 'returned_at': 112841, 'shipped_at': 43812, 'delivered_at': 81271, 'num_of_item': 0}", "context": {"orders_null_dicti

- We can see that the users database is good to go :white_check_mark:
- Orders have nulls in the timestamp columns that record an order event: `returned_at`, `shipped_at` (worrying) and `delivered_at`. We'll have to check whether those nulls make sense :eyes:
- Twenty-four of the products don't have a brand, this affects analytics and would be manually handled in real life, so we'll name them and give them a good brand name.

## Orders Cleanup

All orders have values of `created_at`, that's good :thumbsup:.\
 After this we have to make sure they're shipped, then they should be delivered and lastly returned.

1. With `shipped_at` having 81,376 non-null values, we should expect delivered orders to be the same or smaller.
2. We have 43,917 `delivered_at` orders in total.
3. And finally we have 12,347 orders that have values in the column `returned_at`. 

As a business this would be an alarming rate of delivered/shipped orders but as a portfolio project, this is more than interesting to analyze.

In [7]:
# Sanity check: count orders that have a delivery timestamp but no shipping timestamp.
delivered_but_not_shipped = F.col("shipped_at").isNull() & F.col("delivered_at").isNotNull()
orders.where(delivered_but_not_shipped).select("order_id").count()

0

In [8]:
# Sanity check: count orders that have a return timestamp but no delivery timestamp.
returned_but_not_delivered = F.col("delivered_at").isNull() & F.col("returned_at").isNotNull()
orders.where(returned_but_not_delivered).select("order_id").count()

0

Now that we're sure the way those columns were filled makes sense, we can continue with the products table:

## Products Cleanup

In [9]:
# Display the products whose brand is missing (show() prints the first 20 rows).
products.filter(products["brand"].isNull()).show()

+-----+------------------+--------------------+--------------------+-----+------------------+----------+--------------------+----------------------+
|   id|              cost|            category|                name|brand|      retail_price|department|                 sku|distribution_center_id|
+-----+------------------+--------------------+--------------------+-----+------------------+----------+--------------------+----------------------+
|  755| 15.14085034638792|         Tops & Tees|The Very Hungry C...| NULL|28.950000762939453|     Women|CCB0989662211F61E...|                     3|
| 1629| 23.57235048930124|Fashion Hoodies &...|Carhartt Women's ...| NULL|45.950000762939446|     Women|5C50B4DF4B176845C...|                     3|
| 8600| 16.01555072412528|   Outerwear & Coats|Women's Micro Fle...| NULL|  35.9900016784668|     Women|CE840AA9583592E71...|                     3|
| 9482|5.7119999527931204|     Socks & Hosiery|KEEN Women Bellin...| NULL|              16.0|     Women|C5

As filling in individual rows with different values proves almost impossible with Spark (because honestly, just do it in the database, this tool is made to handle a lot of data at the same time), we'll fill in the missing brands with my country's _"We make everything for cheap"_ brand: **Suli**.

In [10]:
# Fill only the missing brands. Without `subset`, fillna with a string value
# replaces nulls in every string column, so any other missing text field would
# silently become "Suli" as well.
products = products.fillna(value="Suli", subset=["brand"])
logger.info("The products database has been cleared of null brands, by filling in the missing data.")

{"ts": "2025-09-29 20:20:05.013", "level": "INFO", "logger": "spark_logger", "msg": "The products database has been cleared of null values, by filling in the missing data.", "context": {}}


In [11]:
# Re-count the nulls per column after the fill, one filter-and-count per column.
products_null_dictionary = {}
for column_name in products.columns:
    missing_rows = products.where(products[column_name].isNull())
    products_null_dictionary[column_name] = missing_rows.count()

logger.info(
    f"The products database has the following amount of null values: {products_null_dictionary}",
    products_null_dictionary=products_null_dictionary,
)

{"ts": "2025-09-29 20:20:09.287", "level": "INFO", "logger": "spark_logger", "msg": "The products database has the following amount of null values: {'id': 0, 'cost': 0, 'category': 0, 'name': 0, 'brand': 0, 'retail_price': 0, 'department': 0, 'sku': 0, 'distribution_center_id': 0}", "context": {"products_null_dictionary": {"id": 0, "cost": 0, "category": 0, "name": 0, "brand": 0, "retail_price": 0, "department": 0, "sku": 0, "distribution_center_id": 0}}}


Now we can continue with the simulated daily batch of orders we'd get as a business.

### Simulated Batches Cleanup

In [12]:
# Explicit schema for the events CSV. Only `id` is declared non-nullable;
# `user_id` is read as a string.
events_schema = (
    StructType()
    .add("id", IntegerType(), nullable=False)
    .add("user_id", StringType(), nullable=True)
    .add("sequence_number", IntegerType(), nullable=True)
    .add("session_id", StringType(), nullable=True)
    .add("created_at", TimestampType(), nullable=True)
    .add("ip_address", StringType(), nullable=True)
    .add("city", StringType(), nullable=True)
    .add("state", StringType(), nullable=True)
    .add("postal_code", StringType(), nullable=True)
    .add("browser", StringType(), nullable=True)
    .add("traffic_source", StringType(), nullable=True)
    .add("uri", StringType(), nullable=True)
    .add("event_type", StringType(), nullable=True)
)

In [13]:
# Read the events batch with the explicit schema and persist it, since several
# actions run against it below.
events = (
    spark.read
    .schema(events_schema)
    .option("header", True)
    .csv("../tables/events_table.csv")
    .persist()
)

# The count is the first action on the persisted DataFrame.
events_size = events.count()
logger.info(
    f"Events data loaded, there are {events_size} records on it.",
    events_size=events_size,
)

{"ts": "2025-09-29 20:22:17.289", "level": "INFO", "logger": "spark_logger", "msg": "Events data loaded, there are 1200000 records on it.", "context": {"events_size": 1200000}}


In [14]:
# Find out which event columns contain nulls, and how many.
Events_null_dictionary = dict(
    (column_name, events.where(events[column_name].isNull()).count())
    for column_name in events.columns
)
logger.info(
    f"The events data has the following amount of null values: {Events_null_dictionary}",
    Events_null_dictionary=Events_null_dictionary,
)

{"ts": "2025-09-29 20:22:19.508", "level": "INFO", "logger": "spark_logger", "msg": "The events data has the following amount of null values: {'id': 0, 'user_id': 561396, 'sequence_number': 0, 'session_id': 0, 'created_at': 0, 'ip_address': 0, 'city': 0, 'state': 0, 'postal_code': 0, 'browser': 0, 'traffic_source': 0, 'uri': 0, 'event_type': 0}", "context": {"Events_null_dictionary": {"id": 0, "user_id": 561396, "sequence_number": 0, "session_id": 0, "created_at": 0, "ip_address": 0, "city": 0, "state": 0, "postal_code": 0, "browser": 0, "traffic_source": 0, "uri": 0, "event_type": 0}}}


In [16]:
# Sadly, the events location columns (such as city and state) are not specific
# enough to map an event back to a single user.
# Next we should check whether the columns hold values that make sense. The
# original note lists status, gender and num of item here.
# NOTE(review): those names are not in events_schema - confirm which columns were meant.
# Date columns are validated when the CSV is loaded: values that don't match the
# format come back as null (Spark's default reading mode).
events.show(n=1, truncate=False)

+-------+-------+---------------+------------------------------------+-------------------+---------------+-------+--------+-----------+-------+--------------+-------+----------+
|id     |user_id|sequence_number|session_id                          |created_at         |ip_address     |city   |state   |postal_code|browser|traffic_source|uri    |event_type|
+-------+-------+---------------+------------------------------------+-------------------+---------------+-------+--------+-----------+-------+--------------+-------+----------+
|2302471|NULL   |3              |6552bb55-8b9e-4eb5-bd1e-3f7f67ca168d|2019-04-02 11:33:00|172.175.102.165|Sapporo|Hokkaido|005-0849   |IE     |Adwords       |/cancel|cancel    |
+-------+-------+---------------+------------------------------------+-------------------+---------------+-------+--------+-----------+-------+--------------+-------+----------+
only showing top 1 row


Come to think of it, maybe the user_id is null due to all of the subsequent rows being part of one session, and thus traceable to one user.

In [17]:
# Show every event of the sampled session, in sequence order, to see whether any
# of its rows carries a user_id.
sample_session_id = "6552bb55-8b9e-4eb5-bd1e-3f7f67ca168d"
(
    events
    .filter(events["session_id"] == sample_session_id)
    .orderBy("sequence_number")
    .show(truncate=False)
)

+-------+-------+---------------+------------------------------------+-------------------+---------------+-------+--------+-----------+-------+--------------+--------------+----------+
|id     |user_id|sequence_number|session_id                          |created_at         |ip_address     |city   |state   |postal_code|browser|traffic_source|uri           |event_type|
+-------+-------+---------------+------------------------------------+-------------------+---------------+-------+--------+-----------+-------+--------------+--------------+----------+
|2302469|NULL   |1              |6552bb55-8b9e-4eb5-bd1e-3f7f67ca168d|2019-04-02 11:15:00|172.175.102.165|Sapporo|Hokkaido|005-0849   |IE     |Adwords       |/product/16284|product   |
|2302470|NULL   |2              |6552bb55-8b9e-4eb5-bd1e-3f7f67ca168d|2019-04-02 11:27:00|172.175.102.165|Sapporo|Hokkaido|005-0849   |IE     |Adwords       |/cart         |cart      |
|2302471|NULL   |3              |6552bb55-8b9e-4eb5-bd1e-3f7f67ca168d|2019-

Well, it isn't the way I suspected: every row of that session has a null `user_id`, so the session can't be traced back to a user. Instead, we can generate fake ids so that the Churn model still gets the totality of the data.

In [18]:
# Distinct session ids that have at least one event without a user_id.
# Filter on user_id before projecting it away: filtering after
# select("session_id").distinct() only works because the analyzer re-adds the
# dropped column, which also turns the distinct into one over
# (session_id, user_id) pairs. Collecting the rows directly avoids a pandas
# round trip for a single column.
sessionids_with_null_userids = [
    row["session_id"]
    for row in (
        events
        .where(F.col("user_id").isNull())
        .select("session_id")
        .distinct()
        .collect()
    )
]

                                                                                

In [None]:
#Check if any user_id is null for a session that does have a user_id record
# cantnameit = list(events.select("id")
#         .where(
#             (F.col("session_id").isin(sessionids_with_null_userids)) &
#             (F.col("user_id").isNotNull())
#         )
#     )

In [None]:
# if sessionids_with_null_userids:
#     #The whole ass query thing and the filling process
#     print("hola")
# elif cantnameit:
#     #drop the rows with the ids in cantnameit
#     print("adios")

In [19]:
# Register the events DataFrame so it can be queried with Spark SQL.
events.createOrReplaceTempView("events_table")

# Average number of distinct sessions per user, over rows that have a user_id.
query = """
SELECT
    AVG(number_of_sessions) as avg_sessions_per_user
FROM
    (SELECT
        user_id,
        COUNT(DISTINCT(session_id)) as number_of_sessions
    FROM
        events_table
    WHERE
        user_id IS NOT NULL
    GROUP BY
        user_id
)
"""

# The query returns a single row with a single column; round it to a whole number.
avg_row = spark.sql(query).first()
avg_sessions_per_user = round(avg_row[0])

avg_sessions_per_user

# Author's note: avoid a CTE here; the filter is applied inside the FROM subquery instead.

                                                                                

2

In [20]:
# One fake id per group of `avg_sessions_per_user` anonymous sessions.
# Round UP: the grouping applied later produces ceil(n / avg) distinct groups, so
# rounding down leaves the trailing partial group without an id and those
# sessions keep a null user_id after the join.
fake_ids_amount = math.ceil(len(sessionids_with_null_userids) / avg_sessions_per_user)

# Compute the batch date once so every id in the batch carries the same suffix,
# even if the cell happens to run across midnight.
batch_date = datetime.now().strftime('%Y-%m-%d')  # These are daily batches

# Ids are numbered from 1: fake_id_1_<date>, fake_id_2_<date>, ...
fake_ids = [f"fake_id_{value}_{batch_date}" for value in range(1, fake_ids_amount + 1)]

In [21]:
# One row per anonymous session. Without distinct() there is one row per EVENT,
# so joining this frame back to `events` on session_id multiplies every event by
# the number of events in its session.
df1 = events.filter(F.col("user_id").isNull()).select("session_id").distinct()

# Global ordering over all anonymous sessions. There is no partitionBy, so Spark
# moves the data to a single partition for this step.
window = Window.orderBy("session_id")

#Given that the window function is over the whole dataset, we persist it
df1 = df1.withColumn("row_number", F.dense_rank().over(window)).persist()

# Map ranks 1..n to zero-based group indexes, `avg_sessions_per_user` sessions
# per group; the cast to int truncates the division result.
df1 = df1.withColumn("fake_index", (((F.col("row_number") - 1)/avg_sessions_per_user).cast("int")))

{"ts":"2025-09-29T18:23:54.981Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}
{"ts":"2025-09-29T18:23:54.981Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}
{"ts":"2025-09-29T18:23:54.981Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}


No data is lost if we generate fake ids for the sessions that have no `user_id`.
Each fake id is assigned to as many sessions as the average user has had historically per batch (this batch, in our project's case).

In [22]:
# Lookup table: group index -> fake id. The schema is given explicitly because
# Spark cannot infer one from an empty list, which is what `fake_ids` is when the
# batch has no anonymous sessions. The types match what inference produced for
# (int, str) tuples.
fake_ids_df = spark.createDataFrame(
    list(enumerate(fake_ids)),
    schema="fake_index BIGINT, fake_id STRING",
)

# Attach a fake id to every anonymous session through its group index.
filled_df = df1.join(fake_ids_df, on="fake_index", how="left")

In [23]:
# Bring the fake id of each anonymous session onto its events, keep the real
# user_id where there is one, then drop the helper column.
fake_id_lookup = filled_df.select("session_id", "fake_id")
events = (
    events
    .join(fake_id_lookup, on="session_id", how="left")
    .withColumn("user_id", F.coalesce(F.col("user_id"), F.col("fake_id")))
    .drop("fake_id")
)
events.show(n=20)

{"ts":"2025-09-29T18:25:03.464Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}
{"ts":"2025-09-29T18:25:03.465Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}
{"ts":"2025-09-29T18:25:03.644Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}
{"ts":"2025-09-29T18:25:03.644Z","level":"WARN","msg":"No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.","logger":"WindowExec"}
                                                                                

+--------------------+-------+--------------------+---------------+-------------------+---------------+------------+--------------------+-----------+-------+--------------+--------------------+----------+
|          session_id|     id|             user_id|sequence_number|         created_at|     ip_address|        city|               state|postal_code|browser|traffic_source|                 uri|event_type|
+--------------------+-------+--------------------+---------------+-------------------+---------------+------------+--------------------+-----------+-------+--------------+--------------------+----------+
|09c046f5-d079-45f...|1153971|               88494|              7|2020-09-04 04:01:27|  75.232.73.112|    Petaluma|          California|      94952| Safari|         Email|/department/men/c...|department|
|19298899-38b0-45d...|1861162|fake_id_18433_202...|              3|2022-03-22 14:39:00|110.148.245.212|     Beijing|Xinjiang Uygur Au...|     841001| Chrome|         Email|        

In [24]:
# Re-count the nulls per column now that user_id has been filled in.
Events_null_dictionary = {}
for column_name in events.columns:
    rows_missing_value = events.where(events[column_name].isNull())
    Events_null_dictionary[column_name] = rows_missing_value.count()

logger.info(
    f"The events data has the following amount of null values: {Events_null_dictionary}",
    Events_null_dictionary=Events_null_dictionary,
)

{"ts": "2025-09-29 20:25:33.838", "level": "INFO", "logger": "spark_logger", "msg": "The events data has the following amount of null values: {'session_id': 0, 'id': 0, 'user_id': 0, 'sequence_number': 0, 'created_at': 0, 'ip_address': 0, 'city': 0, 'state': 0, 'postal_code': 0, 'browser': 0, 'traffic_source': 0, 'uri': 0, 'event_type': 0}", "context": {"Events_null_dictionary": {"session_id": 0, "id": 0, "user_id": 0, "sequence_number": 0, "created_at": 0, "ip_address": 0, "city": 0, "state": 0, "postal_code": 0, "browser": 0, "traffic_source": 0, "uri": 0, "event_type": 0}}}


In [25]:
# Keep a single row per (id, user_id) pair.
dedupe_keys = ["id", "user_id"]
events = events.dropDuplicates(dedupe_keys)

In [None]:
# Allow-lists of accepted categorical values ("cart" was listed twice before).
traffic_sources = ["Organic", "YouTube", "Email", "Adwords", "Facebook"]
event_types = ["cancel", "purchase", "cart", "department", "home", "product"]

# Keep only events whose traffic source AND event type are both accepted.
# Rows with a null in either column are dropped too, because isin() evaluates to
# null for them and filter() only keeps rows where the condition is true.
events = events.filter(
    F.col("traffic_source").isin(traffic_sources)
    & F.col("event_type").isin(event_types)
)

In [30]:
# Write the cleaned batch as Parquet under a folder named after today's date,
# replacing any previous output at the same path.
output_path = f'./cleaned_data/cleaned_events_batch_{datetime.now().strftime("%Y-%m-%d")}'
(
    events.write
    .mode("overwrite")
    .partitionBy("event_type")  #To be defined once we know what to train the model with
    .parquet(output_path)
)

{"ts":"2025-09-29T18:35:26.821Z","level":"WARN","msg":"Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory\nScaling row group sizes to 95.00% for 8 writers","context":{"task_name":"task 0.0 in stage 373.0 (TID 798)"},"logger":"MemoryManager"}
                                                                                

## What's next? Analytics?

In [None]:
# Explicit schema for the order items table as (name, type, nullable) triples.
# Only `id` is declared non-nullable.
_order_item_fields = [
    ("id", IntegerType(), False),
    ("order_id", IntegerType(), True),
    ("user_id", IntegerType(), True),
    ("product_id", IntegerType(), True),
    ("inventory_item_id", IntegerType(), True),
    ("status", StringType(), True),
    ("created_at", TimestampType(), True),
    ("shipped_at", TimestampType(), True),
    ("delivered_at", TimestampType(), True),
    ("returned_at", TimestampType(), True),
    ("sale_price", FloatType(), True),
]
orders_items_schema = StructType(
    [StructField(name, data_type, nullable) for name, data_type, nullable in _order_item_fields]
)