# Pyspark EDA

In [0]:
from pyspark.sql.functions import col, avg, sum, count, desc, isnan, when, mean, stddev

### Loading in the **files**

In [0]:
# Replace these with your values
storage_account_name = "apaarblob2"
container_name = "project"

sas_token = "sp=r&st=2025-07-10T11:14:06Z&se=2025-07-10T19:14:06Z&spr=https&sv=2024-11-04&sr=c&sig=ej%2Be2KFI7n9fNaD1Z8OAfReYmKsDdFVuVd%2Bl07Bd%2FSI%3D"


file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/csv_files/transactions.csv"


spark.conf.set(
  f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
  sas_token
)


product_df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/csv_files/products.csv")
product_df.show()

transactions_df = spark.read.option("header", True).csv(file_path)
transactions_df.show()


+----------+------------+--------------+------+--------------------+
|product_id|product_name|      category| price|         description|
+----------+------------+--------------+------+--------------------+
|      P001|         One|   Electronics|159.33|Detail should act...|
|      P002|     Citizen|         Books|330.27|Detail respond ri...|
|      P003|   Beautiful|   Electronics|442.61|Mission pattern g...|
|      P004|      Father|       Apparel|  87.2|Another produce t...|
|      P005|    Research|       Apparel|183.51|Eight role above ...|
|      P006|        From|       Apparel|474.18|Dark major seat w...|
|      P007|       Power|   Electronics|190.35|Four base easy mi...|
|      P008|         Win|Home & Kitchen|382.67|Camera already ov...|
|      P009|      Center|   Electronics| 350.8|Senior trip whose...|
|      P010|       Movie|Home & Kitchen|272.24|Single amount wif...|
|      P011|       Occur|       Apparel|174.13|    Term join catch.|
|      P012|         Yet|         

## Schema Validation

In [0]:
expected_columns_transactions = [
    'product_id', 'transaction_id', 'customer_id', 'channel', 'order_date',
    'quantity', 'unit_price', 'campaign_applied',
]

expected_columns_products = [ 'product_name', 'category',
    'price', 'description'
]

missing_cols_transactions = [col for col in expected_columns_transactions if col not in transactions_df.columns]

missing_cols_products = [col for col in expected_columns_products if col not in product_df.columns]

miss=0

if missing_cols_transactions:

    print("These columns are missing from the transactions DataFrame:", missing_cols_transactions)
    miss=1

if missing_cols_products:
    print("These columns are missing from the products DataFrame:", missing_cols_products)
    miss=1

if not miss:
    print("All columns are present in the files.")

All columns are present in the files.


### Combining product with the transaction file

In [0]:
combined_df = transactions_df.join(product_df, on="product_id", how="left")

### Checking Data Quality

Null count per column

In [0]:
combined_df.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c + "_nulls")
    for c in combined_df.columns
]).show()

+----------------+--------------------+-----------------+-------------+----------------+--------------+----------------+----------------------+------------------+--------------+-----------+-----------------+
|product_id_nulls|transaction_id_nulls|customer_id_nulls|channel_nulls|order_date_nulls|quantity_nulls|unit_price_nulls|campaign_applied_nulls|product_name_nulls|category_nulls|price_nulls|description_nulls|
+----------------+--------------------+-----------------+-------------+----------------+--------------+----------------+----------------------+------------------+--------------+-----------+-----------------+
|               0|                   0|                0|            0|               0|             0|               0|                     0|                 0|             0|          0|                0|
+----------------+--------------------+-----------------+-------------+----------------+--------------+----------------+----------------------+------------------+------

Outlier Detection

In [0]:
combined_df = combined_df.withColumn("order_value", col("quantity") * col("unit_price"))

# Compute mean and stddev of order_value
stats = combined_df.select(mean("order_value"), stddev("order_value")).first()
mean_val = stats[0]
stddev_val = stats[1]

# Detect outliers using z-score
zscore_outliers = combined_df.filter(
    (col("order_value") > mean_val + 3 * stddev_val) |
    (col("order_value") < mean_val - 3 * stddev_val)
)

zscore_outliers.show()


+----------+--------------+-----------+-------+----------+--------+----------+----------------+------------+--------+-----+-----------+-----------+
|product_id|transaction_id|customer_id|channel|order_date|quantity|unit_price|campaign_applied|product_name|category|price|description|order_value|
+----------+--------------+-----------+-------+----------+--------+----------+----------------+------------+--------+-----+-----------+-----------+
+----------+--------------+-----------+-------+----------+--------+----------+----------------+------------+--------+-----+-----------+-----------+



## Data Analysis

Average Order Value Per Customer

In [0]:
avg_order_value = combined_df.groupBy("customer_id") \
    .agg(avg("order_value").alias("avg_order_value"))

avg_order_value.show()

+-----------+------------------+
|customer_id|   avg_order_value|
+-----------+------------------+
|       C006| 548.4466666666667|
|       C010|           911.135|
|       C038|           886.198|
|       C031|           913.625|
|       C044| 611.0274999999999|
|       C007|            805.86|
|       C042| 893.3766666666667|
|       C018|             884.8|
|       C040|           2213.05|
|       C029|            348.26|
|       C032| 922.7149999999998|
|       C043| 805.4124999999999|
|       C025|1226.3220000000001|
|       C030|1115.0266666666666|
|       C012| 585.1840000000001|
|       C048| 590.0333333333334|
|       C049|1114.3957142857143|
|       C050|           811.962|
|       C023|          1115.095|
|       C003|            483.98|
+-----------+------------------+
only showing top 20 rows



Top 10 Popular Products

In [0]:
popular_products_df = combined_df.groupBy("product_name") \
    .agg(count("transaction_id").alias("purchase_count")) \
    .orderBy(desc("purchase_count")) \
    .limit(10)

popular_products_df.show()

+------------+--------------+
|product_name|purchase_count|
+------------+--------------+
|       After|            18|
|     Citizen|            16|
|       Occur|            15|
|         Win|            13|
|    Business|            13|
|       Power|            12|
|         Yes|            12|
|       Movie|            11|
|   Beautiful|            11|
|Organization|            10|
+------------+--------------+



Sales by category

In [0]:
category_sales_df = combined_df.groupBy("category") \
    .agg(sum("order_value").alias("total_sales")) \
    .orderBy(desc("total_sales"))

category_sales_df.show()

+--------------+------------------+
|      category|       total_sales|
+--------------+------------------+
|       Apparel| 43969.35999999999|
|   Electronics|           41922.9|
|Home & Kitchen|          38638.84|
|         Books|36975.340000000004|
+--------------+------------------+



Marketing Capaign Impact

In [0]:
campaign_impact_df = combined_df.groupBy("campaign_applied") \
    .agg(sum("order_value").alias("total_sales"))

campaign_impact_df.show()


+----------------+-----------------+
|campaign_applied|      total_sales|
+----------------+-----------------+
|               0|82612.13999999997|
|               1|78894.29999999999|
+----------------+-----------------+



Sales By Channels

In [0]:
channel_sales_df = combined_df.groupBy("channel") \
    .agg(sum("order_value").alias("channel_sales"))

channel_sales_df.show()

+--------+------------------+
| channel|     channel_sales|
+--------+------------------+
|  mobile| 50720.76999999999|
|in-store|56645.249999999985|
|     web|          54140.42|
+--------+------------------+



# Delta table and optimizing data storage

Save analysis as delta table

In [0]:
def save_as_delta_tables(df_table_list):
    for df, table_name in df_table_list:
        df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        print(f"Saved Delta table: {table_name}")

save_as_delta_tables([
    (avg_order_value, "avg_order_value"),
    (popular_products_df, "popular_products"),
    (category_sales_df, "category_sales"),
    (campaign_impact_df, "campaign_impact"),
    (channel_sales_df, "channel_sales")
])

Saved Delta table: avg_order_value
Saved Delta table: popular_products
Saved Delta table: category_sales
Saved Delta table: campaign_impact
Saved Delta table: channel_sales


Optmizing Data Storage

In [0]:
spark.sql("OPTIMIZE avg_order_value")
spark.sql("OPTIMIZE popular_products")
spark.sql("OPTIMIZE category_sales")
spark.sql("OPTIMIZE campaign_impact")
spark.sql("OPTIMIZE channel_sales")

spark.sql("DESCRIBE HISTORY avg_order_value").show(truncate=False)


+-------+-------------------+---------------+--------------------+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+------------------------------------------------------------+------------+------------------------------------------+
|version|timestamp          |userId         |userName            |operation                        |operationParameters                                                                                                                                     |job |notebook          |clusterId           |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                            |userMetadata|engineInfo                                |
+-------+-------------------+---------------+--------------------+--------------