In [4]:
pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [5]:

import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, month, year

In [12]:
def create_spark_session():
    return SparkSession.builder \
        .appName("Food Delivery Data Exploration") \
        .getOrCreate()

def load_parquet_files(spark, path):
    return spark.read.parquet(path)

def basic_statistics(df, column):
    return df.select(column).summary().toPandas()

def plot_distribution(df, column, title):
    plt.figure(figsize=(10, 6))
    sns.histplot(data=df, x=column, kde=True)
    plt.title(title)
    plt.savefig(f"{column}_distribution.png")
    plt.close()

def plot_time_series(df, time_column, value_column, title):
    df = df.sort_values(time_column)
    plt.figure(figsize=(12, 6))
    plt.plot(df[time_column], df[value_column])
    plt.title(title)
    plt.xlabel("Time")
    plt.ylabel(value_column)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    plt.savefig(f"{value_column}_time_series.png")
    plt.close()

def analyze_order_placed_events(spark, path):
    df = load_parquet_files(spark, os.path.join(path, "order_placed_events"))
    
    # Basic statistics
    print(basic_statistics(df, "totalAmount"))
    
    # Distribution of order amounts
    pandas_df = df.select("totalAmount").toPandas()
    plot_distribution(pandas_df, "totalAmount", "Distribution of Order Amounts")
    
    # Time series of order frequency
    time_df = df.groupBy(hour("timestamp").alias("hour")).count().toPandas()
    plot_time_series(time_df, "hour", "count", "Order Frequency by Hour")
    
    # Most popular restaurants
    top_restaurants = df.groupBy("restaurantId").count().orderBy(col("count").desc()).limit(10).toPandas()
    print("Top 10 Restaurants by Order Count:")
    print(top_restaurants)

def analyze_order_delivery_events(spark, path):
    df = load_parquet_files(spark, os.path.join(path, "order_delivery_events"))
    
    # Calculate delivery time
    df = df.withColumn("delivery_time", (col("actualDeliveryTime").cast("long") - col("timestamp").cast("long")) / 60)
    
    # Basic statistics of delivery time
    print(basic_statistics(df, "delivery_time"))
    
    # Distribution of delivery times
    pandas_df = df.select("delivery_time").toPandas()
    plot_distribution(pandas_df, "delivery_time", "Distribution of Delivery Times (minutes)")
    
    # Average delivery time by hour of day
    time_df = df.groupBy(hour("timestamp").alias("hour")).avg("delivery_time").orderBy("hour").toPandas()
    plot_time_series(time_df, "hour", "avg(delivery_time)", "Average Delivery Time by Hour")

def analyze_review_events(spark, path):
    df = load_parquet_files(spark, os.path.join(path, "review_events"))
    
    # Basic statistics of ratings
    print("Food Rating Statistics:")
    print(basic_statistics(df, "foodRating"))
    print("Delivery Rating Statistics:")
    print(basic_statistics(df, "deliveryRating"))
    print("Overall Rating Statistics:")
    print(basic_statistics(df, "overallRating"))
    
    # Distribution of ratings
    pandas_df = df.select("foodRating", "deliveryRating", "overallRating").toPandas()
    plot_distribution(pandas_df, "foodRating", "Distribution of Food Ratings")
    plot_distribution(pandas_df, "deliveryRating", "Distribution of Delivery Ratings")
    plot_distribution(pandas_df, "overallRating", "Distribution of Overall Ratings")
    
    # Correlation between ratings
    correlation_matrix = pandas_df.corr()
    plt.figure(figsize=(10, 8))
    sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm")
    plt.title("Correlation between Ratings")
    plt.show()
    plt.savefig("rating_correlation.png")
    plt.close()

In [17]:
    spark = create_spark_session()
    parquet_path = "output/events"  # Update this path
    
    analyze_order_placed_events(spark, parquet_path)
    # analyze_order_delivery_events(spark, parquet_path)
    # analyze_review_events(spark, parquet_path)
    
    spark.stop()

AnalysisException: Unable to infer schema for Parquet at . It must be specified manually.