# Data Lake (Delta Lake) Analysis with PySpark and EDA

This Jupyter Notebook performs Exploratory Data Analysis (EDA) with PySpark on data stored in Delta Lake format. It analyzes user-item interaction data to surface patterns in user behavior, product popularity, and seasonal activity.

## Environment Setup and Imports

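Before running, make sure the following Python packages are installed. A JDK is also required, and on Windows Spark additionally needs a local Hadoop directory containing `bin\winutils.exe`.

*   `pyspark` (pip install pyspark)
*   `delta-spark` (pip install delta-spark==3.1.0; this provides the `delta.tables` Python module, while the matching Delta JAR is downloaded by Spark at startup via `spark.jars.packages`)
*   `matplotlib` (pip install matplotlib)
*   `seaborn` (pip install seaborn)
*   `pandas` (pip install pandas)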
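Before starting Spark, a quick pre-flight check can confirm that the packages above are importable. The sketch below uses only the standard library; it reports missing modules and whether a `java` executable is on `PATH`, but it does not validate versions, `JAVA_HOME`, or the Hadoop setup. `missing_modules` is a hypothetical helper name.

```python
import importlib.util
import shutil

# Import names of the packages listed above (the delta-spark package is imported as `delta`)
REQUIRED_MODULES = ["pyspark", "delta", "matplotlib", "seaborn", "pandas"]

def missing_modules(modules=REQUIRED_MODULES):
    """Return the module names that the current interpreter cannot find."""
    return [name for name in modules if importlib.util.find_spec(name) is None]

missing = missing_modules()
print("Missing Python packages:", ", ".join(missing) if missing else "none")
# A JDK is needed as well; this only checks that a `java` executable is on PATH
print("java on PATH:", shutil.which("java") is not None)
```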

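The cell below sets the environment variables needed to run Spark on a local machine: `HADOOP_HOME` (with its `bin` directory appended to `PATH`) and `PYSPARK_PYTHON`/`PYSPARK_DRIVER_PYTHON`, which make the Spark driver and workers use the same Python interpreter as the notebook. **Be sure to change the `HADOOP_HOME` path to your actual Hadoop directory!**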

In [None]:
import sys
import os
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Set environment variables for HADOOP and PySpark
# Make sure the paths correspond to your HADOOP installation
os.environ['HADOOP_HOME'] = r'C:\HADOOP' # <--- CHANGE THIS PATH TO YOURS
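# os.pathsep is ';' on Windows and ':' on Linux/macOS
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.join(os.environ['HADOOP_HOME'], 'bin')
# Use the notebook's own Python interpreter for the Spark driver and workers
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable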

# PySpark and Delta Lake imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    count,
    countDistinct,
    when,
    sum as _sum,
    avg,
    hour,
    dayofweek,
    month,
    date_format,
    datediff,
    lit,
    min as _min,
    max as _max
)
from pyspark.sql.types import (IntegerType, FloatType, TimestampType)
from delta.tables import DeltaTable
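On Windows, Spark typically expects `winutils.exe` in `%HADOOP_HOME%\bin` for local file-system access. The following is a sketch of a hypothetical helper, `configure_hadoop_home`, that performs the same environment setup as the cell above but first checks that directory layout and returns warnings instead of failing later inside Spark. The `winutils.exe` check only runs on Windows, and `PATH` is extended with `os.pathsep` so the helper also works on Linux and macOS.

```python
import os
import sys
from pathlib import Path

def configure_hadoop_home(hadoop_home: str) -> list[str]:
    """Set HADOOP_HOME, append its bin directory to PATH, and return any layout warnings."""
    warnings = []
    bin_dir = Path(hadoop_home) / "bin"
    if not bin_dir.is_dir():
        warnings.append(f"{bin_dir} does not exist")
    elif sys.platform.startswith("win") and not (bin_dir / "winutils.exe").is_file():
        # Only Windows needs winutils.exe
        warnings.append(f"winutils.exe not found in {bin_dir}")
    os.environ["HADOOP_HOME"] = str(hadoop_home)
    # Avoid appending the same directory twice when the cell is re-run
    path_entries = os.environ.get("PATH", "").split(os.pathsep)
    if str(bin_dir) not in path_entries:
        os.environ["PATH"] = os.environ.get("PATH", "") + os.pathsep + str(bin_dir)
    return warnings
```

For example, `configure_hadoop_home(r'C:\HADOOP')` would return an empty list when `C:\HADOOP\bin\winutils.exe` exists.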

## Helper Functions

### `create_spark_session`

Initializes a SparkSession configured for Delta Lake: it registers the Delta SQL extension and the Delta catalog, pulls the Delta Lake JAR via `spark.jars.packages`, and explicitly disables Arrow-based conversion to pandas.

In [None]:
def create_spark_session(app_name: str) -> SparkSession:
    """
    Creates and returns a SparkSession with Delta Lake support.
    
    Args:
        app_name (str): Name of the Spark application.
        
    Returns:
        SparkSession: Initialized SparkSession object.
    """
    print("Initializing Spark Session...")
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
        .getOrCreate()
    print("Spark Session successfully initialized.")
    return spark
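Keeping the Delta settings in one dictionary makes the package version easy to find and change. This is a sketch that assumes the JAR version should match the pip-installed `delta-spark` package and be compatible with your Spark release; `delta_spark_conf` and `apply_conf` are hypothetical helper names, and the keys are the same ones `create_spark_session` sets.

```python
DELTA_VERSION = "3.1.0"  # assumed to match the output of `pip show delta-spark`
SCALA_BINARY = "2.12"    # Scala version your Spark distribution was built with

def delta_spark_conf(delta_version: str = DELTA_VERSION,
                     scala_binary: str = SCALA_BINARY) -> dict[str, str]:
    """Spark settings needed to read and write Delta tables."""
    return {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.jars.packages": f"io.delta:delta-spark_{scala_binary}:{delta_version}",
    }

def apply_conf(builder, conf: dict[str, str]):
    """Apply each key/value pair with builder.config(); works with SparkSession.builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder

# Usage (requires pyspark):
# from pyspark.sql import SparkSession
# spark = apply_conf(SparkSession.builder.appName("DataLake_EDA"), delta_spark_conf()).getOrCreate()
```

As an alternative, the `delta-spark` pip package ships `configure_spark_with_delta_pip(builder)` (importable from `delta`), which fills in `spark.jars.packages` from the installed package version.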

### `plot_spark_df`

Utility function for plotting from a Spark DataFrame. It converts the Spark DataFrame to a Pandas DataFrame for use with `seaborn` and `matplotlib`.

**Important Note:** The `.toPandas()` method loads the entire Spark DataFrame into the driver's memory, which can lead to performance issues and OutOfMemory errors when working with very large datasets. Use it cautiously or for aggregated/filtered data.

In [None]:
def plot_spark_df(spark_df, x_col, y_col, title, kind='bar', figsize=(12,6)):
    """
    Plots a graph from a Spark DataFrame, first converting it to a Pandas DataFrame.
    Supports bar and line plots.
    
    Args:
        spark_df (DataFrame): Spark DataFrame to visualize.
        x_col (str): Column name for the X-axis.
        y_col (str): Column name for the Y-axis.
        title (str): Title of the plot.
        kind (str): Type of plot ('bar' or 'line').
        figsize (tuple): Figure size for matplotlib.
    """
    if kind not in ('bar', 'line'):
        # Validate before converting, so no data is collected and no empty figure is created
        print(f"Unknown plot type: {kind}")
        return
    print(f"Plotting graph: {title}...")
    try:
        # Convert Spark DataFrame to Pandas DataFrame (collects all rows to the driver)
        pd_df = spark_df.toPandas()
        plt.figure(figsize=figsize)
        if kind == 'bar':
            # Bars sorted by value, largest first
            sns.barplot(data=pd_df.sort_values(y_col, ascending=False), x=x_col, y=y_col)
        else:
            # Points sorted along the X-axis so the line connects them in order
            sns.lineplot(data=pd_df.sort_values(x_col), x=x_col, y=y_col, marker='o')
        
        plt.title(title)
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.show()
    except Exception as e:
        print(f"Error plotting graph {title}: {e}")
        print("Perhaps the DataFrame is too large for .toPandas(), or another error occurred.")
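One way to act on the `.toPandas()` warning is to cap the number of rows before conversion. This is a sketch with a hypothetical helper, `to_pandas_capped`, that relies only on the Spark DataFrame methods `limit()` and `toPandas()` and fetches one extra row to detect truncation. Apply `orderBy` first if it matters which rows are kept.

```python
import warnings

def to_pandas_capped(spark_df, max_rows: int = 10_000):
    """Convert at most max_rows rows of a Spark DataFrame to pandas, warning if rows were dropped."""
    # Fetch one extra row so truncation can be detected without a separate count() job
    pd_df = spark_df.limit(max_rows + 1).toPandas()
    if len(pd_df) > max_rows:
        warnings.warn(f"DataFrame truncated to {max_rows} rows before plotting")
        pd_df = pd_df.head(max_rows)
    return pd_df
```

Inside `plot_spark_df`, the line `pd_df = spark_df.toPandas()` could then become `pd_df = to_pandas_capped(spark_df)`.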

## Main EDA Function: `run_eda`

This function performs a comprehensive exploratory data analysis of data loaded from Delta Lake. It includes:

*   Schema overview and data preview.
*   NULL value check.
*   Analysis of interaction type distribution.
*   User analysis (loyalty, cities, activity, 'cold' users).
*   Product analysis (popular categories/brands, 'cold' products).
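*   Seasonality analysis (activity by hour, day of week, and calendar day).
*   Funnel and correlation analysis (unique users per interaction type, the interaction mix of purchasing users, and the most common search queries among purchasers).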
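The 'cold' user and 'cold' product counts both use the same rule: group interactions by ID and flag IDs whose interaction count is at or below a threshold (10 in this notebook, compared with `<=`). Here is a plain-Python sketch of that rule on a toy list of IDs, useful for checking the boundary case; `cold_share` is a hypothetical helper and the data is made up.

```python
from collections import Counter

COLD_THRESHOLD = 10  # mirrors the notebook's cut-off: "cold" means at most this many interactions

def cold_share(entity_ids, threshold=COLD_THRESHOLD):
    """Count interactions per ID and return (sorted cold IDs, percentage of IDs that are cold)."""
    counts = Counter(entity_ids)
    cold = sorted(e for e, n in counts.items() if n <= threshold)
    pct = 100.0 * len(cold) / len(counts) if counts else 0.0
    return cold, pct

# Toy interaction log: one entry per interaction, identified by user ID
events = ["u1"] * 12 + ["u2"] * 10 + ["u3"] * 3
cold, pct = cold_share(events)
print(cold, f"{pct:.2f}%")  # prints ['u2', 'u3'] 66.67%
```

User `u2`, with exactly 10 interactions, counts as cold because the comparison is inclusive.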

In [None]:
def run_eda(spark: SparkSession, mart_path: str):
    """
    Performs Exploratory Data Analysis (EDA) for the specified Delta table.
    
    Args:
        spark (SparkSession): Active SparkSession.
        mart_path (str): Path to the Delta table.
    """
    print(f"\nStarting EDA for Delta table: {mart_path}")
    if not DeltaTable.isDeltaTable(spark, mart_path):
        print(f"Error: no Delta table found at path {mart_path}.")
        return
    
    print("Loading data from Data Lake...")
    df = spark.read.format("delta").load(mart_path)
    df.cache() # Cache the DataFrame for improved performance during multiple operations
    total_records = df.count() # First action: materializes the cache
    print(f"Loaded {total_records} records.")

    if total_records == 0:
        print("Table is empty. EDA cannot be performed.")
        df.unpersist() # Release the cache before exiting early
        return
    
    print("\n### Data Overview")
    df.printSchema()
    print("\nSample Data:")
    df.show(5, truncate=False)
    print("\nKey Statistics (for numeric columns):")
    df.select("age", "quantity", "price_at_interaction", "product_current_price").describe().show()

    print("\n### Check for NULL values")
    # Count NULLs in every column with a single aggregation instead of one Spark job per column
    null_row = df.select(
        [_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
    ).collect()[0]
    null_counts = [
        (c, null_row[c], (null_row[c] / total_records) * 100)
        for c in df.columns if null_row[c] > 0
    ]
    if null_counts:
        print("NULL values found:")
        for col_name, n_nulls, percentage in null_counts:
            print(f" - {col_name}: {n_nulls} records ({percentage:.2f}%)")
    else:
        print("No NULL values found. The DataFrame is ready for analysis.")
    
    print("\n### Interaction Analysis")
    interaction_counts = df.groupBy("interaction_type").count() \
                           .orderBy("count", ascending=False)
    print("\nDistribution of Interaction Types:")
    interaction_counts.show()
    plot_spark_df(interaction_counts, "interaction_type", "count", "Distribution of Interaction Types")

    print("\n### User Analysis")
    user_count = df.select("user_id").distinct().count()
    print(f"Total unique users: {user_count}")

    loyalty_dist = df.select("user_id", "loyalty_tier").distinct() \
                     .groupBy("loyalty_tier").count() \
                     .orderBy("count", ascending=False)
    print("\nDistribution of users by loyalty tier:")
    loyalty_dist.show()
    plot_spark_df(loyalty_dist, "loyalty_tier", "count", "Distribution of Users by Loyalty Tier")

    city_dist = df.select("user_id", "city").distinct() \
                  .groupBy("city").count() \
                  .orderBy("count", ascending=False)
    print("\nDistribution of users by city:")
    city_dist.show(10)
    plot_spark_df(city_dist.limit(10), "city", "count", "TOP-10 Cities by User Count")
    
    user_activity = df.groupBy("user_id").count().withColumnRenamed("count", "interaction_count")
    print("\nUser Activity Statistics:")
    user_activity.select(
        avg("interaction_count").alias("avg_interactions"),
        _min("interaction_count").alias("min_interactions"),
        _max("interaction_count").alias("max_interactions")
    ).show()

    COLD_START_THRESHOLD = 10 # 'Cold' users/products have at most this many interactions
    cold_users_count = user_activity.filter(col("interaction_count") <= COLD_START_THRESHOLD).count()
    print(f"\nNumber of 'cold' users (<= {COLD_START_THRESHOLD} interactions): {cold_users_count} ({(cold_users_count / user_count) * 100:.2f}%)")

    print("\n### Product Analysis")
    product_count = df.select("item_id").distinct().count()
    print(f"Total unique products: {product_count}")

    top_categories = df.filter(col("interaction_type") == "purchase") \
                       .groupBy("category").count().orderBy("count", ascending=False).limit(10)
    
    print("\nTOP-10 Categories by Purchases:")
    top_categories.show()
    plot_spark_df(top_categories, "category", "count", "TOP-10 Categories by Purchases")

    top_brands = df.filter(col("interaction_type") == "purchase") \
                   .groupBy("brand").count().orderBy("count", ascending=False).limit(10)
    print("\nTOP-10 Brands by Purchases:")
    top_brands.show()
    plot_spark_df(top_brands, "brand", "count", "TOP-10 Brands by Purchases")

    item_activity = df.groupBy("item_id").count().withColumnRenamed("count", "interaction_count")
    cold_items = item_activity.filter(col("interaction_count") <= COLD_START_THRESHOLD)
    cold_items_count = cold_items.count()
    print(f"\nNumber of 'cold' products (<= {COLD_START_THRESHOLD} interactions): {cold_items_count} ({(cold_items_count / product_count) * 100:.2f}%)")

    # Least-active products first
    cold_items_df = cold_items.orderBy("interaction_count", ascending=True) \
                              .select("item_id", "interaction_count")
    
    if cold_items_count > 0:
        print("\nList of 'cold' products (TOP-20 least popular):")
        cold_items_df.show(20, truncate=False)
    else:
        print(f"\nNo 'cold' products to display (all products have more than {COLD_START_THRESHOLD} interactions).")

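    print("\n### Seasonality Analysis")
    # hour(), dayofweek() and date_format() use the session time zone; pin it to UTC so the labels below are accurate
    spark.conf.set("spark.sql.session.timeZone", "UTC")
    df_ts = df.filter(col("timestamp").isNotNull()) # Skip records with missing timestamps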

    hourly_activity = df_ts.withColumn("hour", hour(col("timestamp"))) \
                         .groupBy("hour").count().orderBy("hour")
    print("\nActivity by Hour (UTC):")
    hourly_activity.show(24)
    plot_spark_df(hourly_activity, "hour", "count", "Activity by Hour (UTC)", kind='line')

    dow_activity = df_ts.withColumn("dow", dayofweek(col("timestamp"))) \
                        .groupBy("dow").count() \
                        .orderBy("dow")

    print("\nActivity by Day of Week (1=Sun, 7=Sat):")
    dow_activity.show()
    plot_spark_df(dow_activity, "dow", "count", "Activity by Day of Week", kind='line')

    daily_activity = df_ts.withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd")) \
                        .groupBy("date").count() \
                        .orderBy("date")
    
    print("\nActivity by Day (first 10 days):")
    daily_activity.show(10)

    plot_spark_df(daily_activity.limit(30), "date", "count", "Activity by Day (up to 30 days)", kind='line', figsize=(15,6))

    print("\n### Funnel and Correlation Analysis")
    funnel_df = df.groupBy("interaction_type").agg(countDistinct("user_id").alias("unique_users"))
    print("\nApproximate Funnel (unique users):")
    funnel_df.orderBy("unique_users", ascending=False).show()

    purchasing_users = df.filter(col("interaction_type") == "purchase") \
                         .select("user_id").distinct()

    purchase_related_interactions = df.join(purchasing_users, on="user_id", how="inner") \
                                      .groupBy("interaction_type").count().orderBy("count", ascending=False)
    
    print("\nDistribution of interactions for users who made a purchase:")
    purchase_related_interactions.show()
    plot_spark_df(purchase_related_interactions, "interaction_type", "count", "Interactions for Purchasers")

    searches = df.filter(col("interaction_type") == "search") \
                 .select("user_id", "search_query").distinct()
    
    search_to_purchase = searches.join(purchasing_users, on="user_id", how="inner") \
                                 .groupBy("search_query").count().orderBy("count", ascending=False).limit(15)
    
    print("\nTOP-15 Search Queries from Users who Made a Purchase:")
    search_to_purchase.show(truncate=False)

    df.unpersist() # Uncache the DataFrame after analysis completion
    print("\nEDA completed")
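The 'Approximate Funnel' table reports unique users per interaction type; dividing each stage by the previous one gives step conversion rates. This is a plain-Python sketch under stated assumptions: you supply the stage order, and `view` below is a hypothetical interaction type (only `search` and `purchase` appear in this notebook). Inside `run_eda`, the counts could be collected with `{row["interaction_type"]: row["unique_users"] for row in funnel_df.collect()}`. Because the per-type user sets are not nested, the rates are only approximate and can even exceed 100%.

```python
def funnel_conversion(unique_users, stages):
    """Return (from_stage, to_stage, percent) for each consecutive pair of stages."""
    rates = []
    for prev, nxt in zip(stages, stages[1:]):
        base = unique_users.get(prev, 0)
        # Guard against division by zero when a stage has no users
        rate = 100.0 * unique_users.get(nxt, 0) / base if base else 0.0
        rates.append((prev, nxt, rate))
    return rates

# Illustrative counts only, not results from the dataset
counts = {"search": 800, "view": 400, "purchase": 100}
for prev, nxt, rate in funnel_conversion(counts, ["search", "view", "purchase"]):
    print(f"{prev} -> {nxt}: {rate:.1f}%")
# prints:
# search -> view: 50.0%
# view -> purchase: 25.0%
```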

## Run EDA

Set `MART_PATH` to the path of your Delta table and run the cell. The Spark session is stopped when the analysis finishes, even if an error occurs.

In [None]:
if __name__ == "__main__":
    print("Script started")
    # Specify the path to your Delta table. Example: "./data/mart/user_item_interactions_delta"
    MART_PATH = "./data/mart/user_item_interactions_delta" # <--- CHANGE THIS PATH TO YOURS
    spark = None # Initialize spark as None to ensure it's defined for the finally block
    try:
        spark = create_spark_session("DataLake_EDA")
        print("Spark session created")
        run_eda(spark, MART_PATH)
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if spark:
            spark.stop()
            print("Spark session stopped")
        else:
            print("Spark session was not created, so there is nothing to stop.")
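Instead of editing `MART_PATH` in the cell for every run, the path can be read from an environment variable, with the notebook's path as the fallback. This is a sketch: the variable name `EDA_MART_PATH` and the helper `resolve_mart_path` are hypothetical.

```python
import os

DEFAULT_MART_PATH = "./data/mart/user_item_interactions_delta"

def resolve_mart_path(env_var: str = "EDA_MART_PATH", default: str = DEFAULT_MART_PATH) -> str:
    """Return the Delta table path from env_var if it is set and non-empty, else the default."""
    return os.environ.get(env_var) or default
```

In the run cell, `MART_PATH = resolve_mart_path()` would then replace the hard-coded assignment.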