# Spark Data Analysis Demo
This notebook demonstrates how to use Spark for exploratory data analysis (EDA).

## 1. Why this EDA exists

In modern data platforms, data constantly crosses system boundaries:
from applications to data distributors, from distributors to data lakes,
from data lakes to warehouses, and from warehouses into analytics
and machine-learning systems.

At each of these transitions, data can become incomplete
(e.g. broken exports), duplicated (e.g. repeated ingestion), or
misunderstood (e.g. timestamps ingested as strings, numbers parsed as text).
These issues often propagate silently and eventually break reports,
analytics, and machine-learning models.

This EDA exists to detect those risks early, before the data is used
in production pipelines.

This EDA does not cover ingestion errors (i.e. corrupted files or transmissions); it assumes data parsing completed without errors.
It focuses on silent data corruption and on checks that can be automated in production pipelines.


In [1]:
# To explore data we need a Spark session: an object used to read data,
# explore it (including, but not limited to, SQL), and access Spark configuration.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark-dq-demo").getOrCreate()

In [2]:
# Let's read a CSV file into a Spark DataFrame. inferSchema detects data types
# (if values are consistent); header uses the first row as column names.
df = spark.read.csv(
    "../data/retail_personalization_dataset.csv",
    inferSchema=True,
    header=True)


In [3]:
# let's print schema of the data frame
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- session_id: string (nullable = true)
 |-- interaction_type: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- price: double (nullable = true)
 |-- discount: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- user_age: integer (nullable = true)
 |-- user_gender: string (nullable = true)
 |-- loyalty_score: integer (nullable = true)
 |-- previous_purchase_count: integer (nullable = true)
 |-- avg_purchase_value: double (nullable = true)
 |-- search_keywords: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- purchase: integer (nullable = true)



Because the source is a CSV file, there is no enforced schema or
nullability contract, and all constraints must be validated in Spark.

Headers were read correctly and key fields such as timestamp, price,
discount, user_age, loyalty_score, previous_purchase_count,
avg_purchase_value, rating, and purchase were correctly inferred as
numeric or temporal types and not silently downgraded to strings due to
inconsistent formatting.
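
Because nothing enforces the schema, one way to make the type check above repeatable is to compare `df.dtypes` against an expected mapping. The following is a minimal sketch, not part of the original analysis: `EXPECTED_TYPES` and `schema_mismatches` are hypothetical names, and the expected types are simply copied from the `printSchema()` output for a few key fields.

```python
# Hypothetical expectations for a few critical columns. df.dtypes reports
# Spark's short type names, so an integer column appears as "int".
EXPECTED_TYPES = {
    "timestamp": "timestamp",
    "price": "double",
    "discount": "int",
    "rating": "double",
    "purchase": "int",
}


def schema_mismatches(actual_dtypes, expected=EXPECTED_TYPES):
    """Return {column: (expected_type, actual_type)} for every mismatch.

    actual_dtypes is a list of (name, type_string) pairs, as returned by
    df.dtypes. A column missing from the data is reported with actual None.
    """
    actual = dict(actual_dtypes)
    return {
        col: (want, actual.get(col))
        for col, want in expected.items()
        if actual.get(col) != want
    }


# In the notebook this would be called as: schema_mismatches(df.dtypes)
```

An empty result means the inferred types match the expectations; any entry points at a column that was inferred differently or is absent.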

## 3. Event-level uniqueness and duplicate risk

Before aggregating, joining, or building features, we must understand
what a single row represents and whether rows are duplicated.

In this dataset, each row is assumed to represent a single user
interaction with a product at a given point in time.

To validate this assumption, we perform two checks:

- **Full-row duplicates**  
  We check whether identical rows appear more than once. The presence
  of full duplicates would indicate upstream ingestion or replay issues.

- **Event-level duplicates**  
  We check for duplicates using a proxy event identifier:
  `(user_id, product_id, session_id, timestamp)`.  
  If this combination is not unique, it suggests that the same real-world
  interaction has been recorded more than once.

Duplicates at either level would cause overcounting in reports,
distort aggregated metrics, and introduce bias into machine-learning
features.

In [4]:
# count all rows, including any duplicates
total_rows = df.count()
total_rows

150002

In [5]:
# count rows that remain after removing full-row duplicates
distinct_rows = df.dropDuplicates().count()
distinct_rows

150000

In [6]:
duplicate_rows = total_rows - distinct_rows
duplicate_rows

2

In [7]:
# count distinct values of the proxy event identifier
rows_keys = df.select("user_id", "product_id", "timestamp", "session_id").dropDuplicates().count()
rows_keys

150000
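
The counts above show how many rows are surplus, but not which events are repeated. A minimal sketch for listing the repeated keys, assuming the same proxy identifier; `EVENT_KEY` and `duplicated_keys` are hypothetical names introduced here for illustration:

```python
# Proxy event identifier used in this section (hypothetical constant name).
EVENT_KEY = ("user_id", "product_id", "session_id", "timestamp")


def duplicated_keys(df, key_cols=EVENT_KEY):
    """Return a DataFrame of key combinations that occur more than once.

    groupBy(...).count() adds a column named "count"; the filter keeps
    only the keys that appear in more than one row.
    """
    return df.groupBy(*key_cols).count().filter("count > 1")


# In the notebook this could be inspected with:
#   duplicated_keys(df).show(truncate=False)
```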

### Results

The dataset contains:
- 2 fully duplicated rows
- 2 duplicated event records when using the proxy
  `(user_id, product_id, session_id, timestamp)`

Both checks leave 150,000 of the 150,002 rows, so every repeated key
belongs to a fully identical row: the same real-world interaction has
been recorded more than once.

While the number of duplicates is small, their presence confirms that
duplicate events are a realistic failure mode and must be handled by
the pipeline.

### Implications for data pipelines

Because duplicate events exist and the dataset contains sufficient
identifiers to detect them, this data can be made safe for downstream
use by applying deterministic deduplication during ingestion.

The following checks should be automated in production:
- detection of fully duplicated rows
- detection of duplicated event identifiers

Ingestion should either:
- drop duplicates, or
- fail when duplicate rates exceed an acceptable threshold.
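
The two ingestion options above can be combined: drop duplicates when the rate is low, and fail when it is not. This is a minimal sketch under assumptions: `duplicate_rate`, `dedupe_or_fail`, and the default threshold of 0.1% are illustrative choices, not values derived from this analysis.

```python
def duplicate_rate(total_rows, distinct_rows):
    """Fraction of rows that are surplus copies (0.0 for an empty dataset)."""
    if total_rows == 0:
        return 0.0
    return (total_rows - distinct_rows) / total_rows


def dedupe_or_fail(df, key_cols, max_duplicate_rate=0.001):
    """Drop duplicate events, or raise if too many rows are duplicates.

    df is expected to be a Spark DataFrame; only count() and
    dropDuplicates() are used. The default threshold is an assumption.
    """
    total = df.count()
    deduped = df.dropDuplicates(list(key_cols))
    rate = duplicate_rate(total, deduped.count())
    if rate > max_duplicate_rate:
        raise ValueError(
            f"duplicate rate {rate:.4%} exceeds threshold {max_duplicate_rate:.4%}"
        )
    return deduped


# With the counts from this notebook: 2 surplus rows out of 150002.
print(f"{duplicate_rate(150002, 150000):.6%}")
```

When rows that share a key differ in other columns, `dropDuplicates` on a key subset keeps an arbitrary one of them. Here both checks return 150,000 distinct rows, so the repeated keys are full-row copies and the choice does not matter.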

## 4. Completeness and missing data

Missing values (NULLs and NaNs) can silently corrupt aggregations, joins,
and machine-learning features. This section evaluates how complete the
dataset is and whether critical fields are missing.

In [24]:
# get list of columns that are numeric types to later check NaN (Not a Number)
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType
numeric_cols=[f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

In [37]:
# get dict of columns and their nan counts
nan_count=df.select([F.count(F.when(F.isnan(F.col(c)),1)).alias(c) for c in numeric_cols])
nan_count.first().asDict()

{'price': 0,
 'discount': 0,
 'user_age': 0,
 'loyalty_score': 0,
 'previous_purchase_count': 0,
 'avg_purchase_value': 0,
 'rating': 0,
 'purchase': 0}

In [38]:
# get dict of columns and their null counts
null_counts=df.select([F.count(F.when(F.col(c).isNull(),1)).alias(c) for c in df.columns])
null_counts.first().asDict()

{'user_id': 0,
 'product_id': 0,
 'timestamp': 0,
 'session_id': 0,
 'interaction_type': 0,
 'device_type': 0,
 'location': 0,
 'price': 0,
 'discount': 0,
 'product_category': 0,
 'brand': 0,
 'user_age': 0,
 'user_gender': 0,
 'loyalty_score': 0,
 'previous_purchase_count': 0,
 'avg_purchase_value': 0,
 'search_keywords': 0,
 'rating': 74881,
 'purchase': 0}

In [57]:
# break down rating values (including NULL) by the purchase flag
df.groupBy("purchase","rating").count().orderBy("purchase","rating").show()

+--------+------+-----+
|purchase|rating|count|
+--------+------+-----+
|       0|  NULL|71121|
|       0|   1.0| 7113|
|       0|   2.0|14426|
|       0|   3.0|21129|
|       0|   4.0|21439|
|       0|   5.0| 7195|
|       1|  NULL| 3760|
|       1|   1.0|  368|
|       1|   2.0|  764|
|       1|   3.0| 1183|
|       1|   4.0| 1145|
|       1|   5.0|  359|
+--------+------+-----+



In [52]:
# cross-tabulate interaction_type against the purchase flag
df.groupBy("interaction_type").pivot("purchase").count().show()

+----------------+-----+----+
|interaction_type|    0|   1|
+----------------+-----+----+
|        purchase| NULL|7579|
|     add_to_cart|22217|NULL|
|            view|75217|NULL|
|           click|44989|NULL|
+----------------+-----+----+



### Results

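One column, `rating`, is NULL in 74,881 of 150,002 rows (about 50%), while
the remaining columns are fully populated. Numeric columns do not contain
NaN values. NULL ratings appear at a similar rate for purchased
(3,760 of 7,579) and non-purchased (71,121 of 142,423) interactions.

This indicates that missing data is concentrated in a specific field,
rather than being randomly distributed across the dataset.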

### Implications for data pipelines

Because NULLs are present, downstream pipelines must explicitly decide
how to handle missing values for this column.

Depending on how the column is used:
- reporting pipelines may need to filter or impute NULLs
- machine learning pipelines may need explicit imputation or masking
- feature engineering must not assume the field is always present

The following checks should be automated in production:
- NULL and NaN rates per column
- NULL rates for critical identifiers and target variables

Pipelines should fail or alert when NULL or NaN rates exceed acceptable
thresholds, rather than silently propagating missing values.
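
The threshold check described above can be sketched on top of the per-column counts already computed in this section. This is an illustration under assumptions: `null_rates`, `columns_over_threshold`, and the 60% tolerance for `rating` are hypothetical, and an acceptable threshold would have to come from the consumers of the data.

```python
def null_rates(null_counts, total_rows):
    """Convert per-column NULL counts into rates between 0 and 1."""
    if total_rows == 0:
        return {col: 0.0 for col in null_counts}
    return {col: n / total_rows for col, n in null_counts.items()}


def columns_over_threshold(rates, thresholds, default_threshold=0.0):
    """Return {column: rate} for columns whose rate exceeds their threshold.

    Columns without an explicit entry use default_threshold, so by default
    any NULL in an unlisted column is reported.
    """
    return {
        col: rate
        for col, rate in rates.items()
        if rate > thresholds.get(col, default_threshold)
    }


# Counts taken from the output above (only two columns shown for brevity).
# In the notebook: null_rates(null_counts.first().asDict(), total_rows)
rates = null_rates({"user_id": 0, "rating": 74881}, total_rows=150002)

# Hypothetical policy: tolerate up to 60% missing ratings, none elsewhere.
violations = columns_over_threshold(rates, {"rating": 0.60})
if violations:
    raise ValueError(f"NULL rate too high: {violations}")
```

The same two functions apply unchanged to the NaN counts, since those are produced in the same dictionary shape.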

In [10]:
spark.stop()