In [2]:
from pyspark.sql import SparkSession

# 1️⃣ Start Spark Session
spark = (
    SparkSession.builder
    .appName("Part1_Loading")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)


In [3]:
from pyspark.sql import SparkSession

# ==========================================
# 1. Initialize Spark
# ==========================================
spark = SparkSession.builder \
    .appName("Critical_Analysis_Datasets") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# ==========================================
# 2. Define paths
# ==========================================
path_csv     = r"C:\Users\HP 15\Desktop\big data project\data\nigerian_retail_and_ecommerce_customer_review_and_ratings_data.csv"
path_parquet1 = r"C:\Users\HP 15\Desktop\big data project\data\nigerian_retail_and_ecommerce_ecommerce_order_data.parquet"
path_parquet2 = r"C:\Users\HP 15\Desktop\big data project\data\nigerian_retail_and_ecommerce_purchase_history_records.parquet"

# ==========================================
# 3. Load datasets
# ==========================================
# CSV
df_csv = spark.read.option("header", True).option("inferSchema", True).csv(path_csv)
print("--- CSV Loaded ---")
df_csv.printSchema()
df_csv.show(10, truncate=False)

# Parquet 1
df_parquet1 = spark.read.parquet(path_parquet1)
print("--- Parquet 1 Loaded ---")
df_parquet1.printSchema()
df_parquet1.show(10, truncate=False)

# Parquet 2
df_parquet2 = spark.read.parquet(path_parquet2)
print("--- Parquet 2 Loaded ---")
df_parquet2.printSchema()
df_parquet2.show(10, truncate=False)


--- CSV Loaded ---
root
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- sentiment: string (nullable = true)

+----------+----------+-----------+----------+-----------+------+--------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+-----------------+-------------+---------------+---------+
|review_id |product_id|customer_id|order_id  |review_date|rating|review_title                                      |review_text                  

In [4]:
df_reviews = df_csv
df_orders = df_parquet1
df_purchase = df_parquet2


In [5]:
from pyspark.sql.functions import col, sum

df_reviews.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_reviews.columns
]).show()

df_orders.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_orders.columns
]).show()

df_purchase.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_purchase.columns
]).show()


+---------+----------+-----------+--------+-----------+------+------------+-----------+-----------------+-------------+---------------+---------+
|review_id|product_id|customer_id|order_id|review_date|rating|review_title|review_text|verified_purchase|helpful_votes|sentiment_score|sentiment|
+---------+----------+-----------+--------+-----------+------+------------+-----------+-----------------+-------------+---------------+---------+
|        0|         0|          0|       0|          0|     0|           0|          0|                0|            0|              0|        0|
+---------+----------+-----------+--------+-----------+------+------------+-----------+-----------------+-------------+---------------+---------+

+--------+-----------+----------+--------+---------------+----------------+--------------+-----------------+-------------+---------------+-----------------------+------------+
|order_id|customer_id|order_date|platform|order_value_ngn|shipping_fee_ngn|payment_method|pay

In [6]:
# List of datasets
datasets = {
    "Reviews": df_reviews,
    "Orders": df_orders,
    "Purchases": df_purchase
}

for name, df in datasets.items():
    print(f"=== {name} Dataset ===")
    # Number of rows
    num_rows = df.count()
    # Number of columns
    num_cols = len(df.columns)
    print(f"Rows: {num_rows}, Columns: {num_cols}")
    
    # Column names
    print("Column names:", df.columns)
    
    # Data types
    print("Data types:")
    df.printSchema()
    print("\n")


=== Reviews Dataset ===
Rows: 200000, Columns: 12
Column names: ['review_id', 'product_id', 'customer_id', 'order_id', 'review_date', 'rating', 'review_title', 'review_text', 'verified_purchase', 'helpful_votes', 'sentiment_score', 'sentiment']
Data types:
root
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- sentiment: string (nullable = true)



=== Orders Dataset ===
Rows: 1000000, Columns: 12
Column names: ['order_id', 'customer_id', 'order_date', 'platform', 'order_value_ngn', 'shipping_fee_ngn', 'payment_method', 'payment_processor', 'delivery_city', 'deli

In [7]:
from pyspark.sql import SparkSession, functions as F

# Start Spark session
spark = SparkSession.builder.appName("RetailDataPreprocessing").getOrCreate()

# Load Reviews CSV
path_reviews = r"C:\Users\HP 15\Desktop\big data project\data\nigerian_retail_and_ecommerce_customer_review_and_ratings_data.csv"
df_reviews = spark.read.csv(path_reviews, header=True, inferSchema=True)

# Select useful columns
df_reviews_clean = df_reviews.select(
    "review_id",
    "product_id",
    "customer_id",
    "order_id",
    "rating",
    "review_date",
    "sentiment_score",
    "sentiment"
)

# Convert types
df_reviews_clean = df_reviews_clean.withColumn(
    "review_date", F.to_date("review_date", "yyyy-MM-dd")
).withColumn(
    "rating", F.col("rating").cast("double")
).withColumn(
    "sentiment_score", F.col("sentiment_score").cast("double")
)

# Aggregate per customer
customer_reviews = df_reviews_clean.groupBy("customer_id").agg(
    F.count("review_id").alias("total_reviews"),
    F.avg("rating").alias("avg_rating"),
    F.avg("sentiment_score").alias("avg_sentiment_score")
)

# Aggregate per product
product_reviews = df_reviews_clean.groupBy("product_id").agg(
    F.count("review_id").alias("total_reviews"),
    F.avg("rating").alias("avg_rating"),
    F.avg("sentiment_score").alias("avg_sentiment_score")
)

# Quick check
df_reviews_clean.show(5)
customer_reviews.show(5)
product_reviews.show(5)


+----------+----------+-----------+----------+------+-----------+---------------+---------+
| review_id|product_id|customer_id|  order_id|rating|review_date|sentiment_score|sentiment|
+----------+----------+-----------+----------+------+-----------+---------------+---------+
|REV0000000|  PRD61090| CUST589087|ORD6894115|   2.0| 2024-07-23|           0.39| negative|
|REV0000001|  PRD58456| CUST858117|ORD2919126|   4.0| 2024-02-15|          -0.74| positive|
|REV0000002|  PRD69222| CUST356613|ORD4782383|   2.0| 2024-09-19|           0.95| negative|
|REV0000003|  PRD74234| CUST399561|ORD9263265|   4.0| 2024-10-01|          -0.29| positive|
|REV0000004|  PRD81059| CUST736138|ORD6268224|   2.0| 2024-06-07|          -0.55| negative|
+----------+----------+-----------+----------+------+-----------+---------------+---------+
only showing top 5 rows
+-----------+-------------+----------+-------------------+
|customer_id|total_reviews|avg_rating|avg_sentiment_score|
+-----------+-------------+---

In [8]:
from pyspark.sql import SparkSession, functions as F

# -----------------------
# Start Spark Session
# -----------------------
spark = SparkSession.builder.appName("OrderDataPreprocessing").getOrCreate()

# -----------------------
# Load Orders Parquet
# -----------------------
path_orders = r"C:\Users\HP 15\Desktop\big data project\data\nigerian_retail_and_ecommerce_ecommerce_order_data.parquet"
df_orders = spark.read.parquet(path_orders)

# -----------------------
# Select useful columns (from your parquet schema)
# -----------------------
df_orders_clean = df_orders.select(
    "order_id",
    "customer_id",
    "order_date",
    "order_value_ngn",
    "shipping_fee_ngn",
    "payment_method",
    "delivery_status"
)

# -----------------------
# Convert types safely
# -----------------------
df_orders_clean = df_orders_clean.withColumn(
    "order_date", F.to_date(F.col("order_date").cast("string"))
).withColumn(
    "order_value_ngn", F.col("order_value_ngn").cast("double")
).withColumn(
    "shipping_fee_ngn", F.col("shipping_fee_ngn").cast("double")
)

# -----------------------
# Optional: add year/month columns
# -----------------------
df_orders_clean = df_orders_clean.withColumn("order_year", F.year("order_date")) \
                                 .withColumn("order_month", F.month("order_date"))

# -----------------------
# Aggregations
# -----------------------

# Per Customer
customer_orders = df_orders_clean.groupBy("customer_id").agg(
    F.count("order_id").alias("total_orders"),
    F.sum("order_value_ngn").alias("total_spent"),
    F.avg("order_value_ngn").alias("avg_order_value")
)

# Quick Check
df_orders_clean.show(5)
customer_orders.show(5)


+----------+-----------+----------+---------------+----------------+----------------+---------------+----------+-----------+
|  order_id|customer_id|order_date|order_value_ngn|shipping_fee_ngn|  payment_method|delivery_status|order_year|order_month|
+----------+-----------+----------+---------------+----------------+----------------+---------------+----------+-----------+
|ORD5652303| CUST758743|2024-08-04|      258310.46|         2308.78|      debit_card|      delivered|      2024|          8|
|ORD2597035| CUST826917|2024-01-12|      394151.94|         3037.56|      debit_card|        shipped|      2024|          1|
|ORD7235642| CUST754399|2024-09-24|      329270.06|         2389.34|cash_on_delivery|        shipped|      2024|          9|
|ORD1985887| CUST338982|2024-07-06|       106058.4|         3323.29|            ussd|        pending|      2024|          7|
|ORD4942606| CUST671418|2024-09-12|       85543.45|          2648.4|cash_on_delivery|        pending|      2024|          9|


In [9]:
# -----------------------
# Load Purchase History Parquet
# -----------------------
path_purchase = r"C:\Users\HP 15\Desktop\big data project\data\nigerian_retail_and_ecommerce_purchase_history_records.parquet"
df_purchase = spark.read.parquet(path_purchase)

# -----------------------
# Select useful columns
# -----------------------
df_purchase_clean = df_purchase.select(
    "customer_id",
    "product_category",
    "product_subcategory",
    "purchase_date",
    "quantity",
    "unit_price_ngn",
    "total_amount_ngn"
)

# -----------------------
# Convert types safely
# -----------------------
df_purchase_clean = df_purchase_clean.withColumn(
    "purchase_date", F.to_date(F.col("purchase_date").cast("string"))
).withColumn(
    "quantity", F.col("quantity").cast("integer")
).withColumn(
    "unit_price_ngn", F.col("unit_price_ngn").cast("double")
).withColumn(
    "total_amount_ngn", F.col("total_amount_ngn").cast("double")
)

# -----------------------
# Aggregations
# -----------------------

# Per Customer
customer_purchase = df_purchase_clean.groupBy("customer_id").agg(
    F.count("*").alias("total_items"),
    F.sum("total_amount_ngn").alias("total_spent"),
    F.avg("unit_price_ngn").alias("avg_price_per_item")
)

# Per Product Category
product_purchase = df_purchase_clean.groupBy("product_category").agg(
    F.count("*").alias("total_sold"),
    F.sum("total_amount_ngn").alias("total_revenue"),
    F.avg("unit_price_ngn").alias("avg_price")
)

# Quick Check
df_purchase_clean.show(5)
customer_purchase.show(5)
product_purchase.show(5)


+-----------+-----------------+-------------------+-------------+--------+--------------+----------------+
|customer_id| product_category|product_subcategory|purchase_date|quantity|unit_price_ngn|total_amount_ngn|
+-----------+-----------------+-------------------+-------------+--------+--------------+----------------+
| CUST738296|Sports & Outdoors|   Sports Equipment|   2023-03-07|       5|     136977.89|       684889.45|
| CUST566795|           Health|        Supplements|   2024-06-17|       2|     297407.47|       594814.94|
| CUST238819|      Baby & Kids|           Clothing|   2024-06-13|       5|     283936.94|       1419684.7|
| CUST974466|          Fashion|            Jewelry|   2023-09-27|       2|     235910.56|       471821.12|
| CUST686049|           Health|        Supplements|   2024-05-16|       2|     327559.25|        655118.5|
+-----------+-----------------+-------------------+-------------+--------+--------------+----------------+
only showing top 5 rows
+-----------+

In [10]:
from pyspark.sql import functions as F

# -------------------------
# 1. Reviews Data
# -------------------------
print("=== Reviews Data ===")
df_reviews_clean.printSchema()       # Column names and data types
print(f"Total rows: {df_reviews_clean.count()}")
df_reviews_clean.show(5)             # Preview first 5 rows
df_reviews_clean.describe().show()   # Basic statistics for numeric columns

# -------------------------
# 2. Orders Data
# -------------------------
print("=== Orders Data ===")
df_orders_clean.printSchema()
print(f"Total rows: {df_orders_clean.count()}")
df_orders_clean.show(5)
df_orders_clean.describe().show()

# -------------------------
# 3. Purchase Data
# -------------------------
print("=== Purchase Data ===")
df_purchase_clean.printSchema()
print(f"Total rows: {df_purchase_clean.count()}")
df_purchase_clean.show(5)
df_purchase_clean.describe().show()


=== Reviews Data ===
root
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_date: date (nullable = true)
 |-- sentiment_score: double (nullable = true)
 |-- sentiment: string (nullable = true)

Total rows: 200000
+----------+----------+-----------+----------+------+-----------+---------------+---------+
| review_id|product_id|customer_id|  order_id|rating|review_date|sentiment_score|sentiment|
+----------+----------+-----------+----------+------+-----------+---------------+---------+
|REV0000000|  PRD61090| CUST589087|ORD6894115|   2.0| 2024-07-23|           0.39| negative|
|REV0000001|  PRD58456| CUST858117|ORD2919126|   4.0| 2024-02-15|          -0.74| positive|
|REV0000002|  PRD69222| CUST356613|ORD4782383|   2.0| 2024-09-19|           0.95| negative|
|REV0000003|  PRD74234| CUST399561|ORD9263265|   4.0| 2024-10-01|    

In [11]:
from pyspark.sql import functions as F

# -------------------------
# 1. Aggregate Reviews by Customer
# -------------------------
df_reviews_agg = df_reviews_clean.groupBy("customer_id").agg(
    F.count("review_id").alias("total_reviews"),
    F.avg("rating").alias("avg_rating"),
    F.avg("sentiment_score").alias("avg_sentiment_score")
)

# -------------------------
# 2. Aggregate Orders by Customer
# -------------------------
df_orders_agg = df_orders_clean.groupBy("customer_id").agg(
    F.count("order_id").alias("total_orders"),
    F.sum("order_value_ngn").alias("total_order_value"),
    F.avg("order_value_ngn").alias("avg_order_value"),
    F.sum("shipping_fee_ngn").alias("total_shipping_fee")
)

# -------------------------
# 3. Aggregate Purchases by Customer
# -------------------------
df_purchase_agg = df_purchase_clean.groupBy("customer_id").agg(
    F.count("purchase_date").alias("total_purchases"),  # use purchase_date to count rows
    F.sum("total_amount_ngn").alias("total_purchase_value"),
    F.avg("total_amount_ngn").alias("avg_purchase_value"),
    F.sum("quantity").alias("total_quantity")
)

# -------------------------
# 4. Replace NULLs with 0
# -------------------------
df_reviews_agg = df_reviews_agg.fillna(0)
df_orders_agg = df_orders_agg.fillna(0)
df_purchase_agg = df_purchase_agg.fillna(0)


In [12]:
from pyspark.sql import SparkSession, functions as F

# Assuming Spark session is already created and the aggregated datasets exist:
# df_reviews_agg, df_orders_agg, df_purchase_agg

# -------------------------
# 1. Merge Reviews and Orders
# -------------------------
df_merged = df_reviews_agg.join(df_orders_agg, on="customer_id", how="full_outer")

# -------------------------
# 2. Merge the result with Purchases
# -------------------------
df_merged = df_merged.join(df_purchase_agg, on="customer_id", how="full_outer")

# -------------------------
# 3. Replace NULLs with 0
# -------------------------
df_merged = df_merged.fillna(0)

# -------------------------
# 4. Check the merged dataset
# -------------------------
print("=== Merged Dataset ===")
df_merged.printSchema()               # Schema & data types
print(f"Total rows: {df_merged.count()}")  # Total unique customers
df_merged.show(10)                    # Preview top 10 rows

# -------------------------
# 5. Optional: summary statistics
# -------------------------
df_merged.describe().show()


=== Merged Dataset ===
root
 |-- customer_id: string (nullable = true)
 |-- total_reviews: long (nullable = true)
 |-- avg_rating: double (nullable = false)
 |-- avg_sentiment_score: double (nullable = false)
 |-- total_orders: long (nullable = true)
 |-- total_order_value: double (nullable = false)
 |-- avg_order_value: double (nullable = false)
 |-- total_shipping_fee: double (nullable = false)
 |-- total_purchases: long (nullable = true)
 |-- total_purchase_value: double (nullable = false)
 |-- avg_purchase_value: double (nullable = false)
 |-- total_quantity: long (nullable = true)

Total rows: 822085
+-----------+-------------+----------+--------------------+------------+-----------------+---------------+------------------+---------------+--------------------+------------------+--------------+
|customer_id|total_reviews|avg_rating| avg_sentiment_score|total_orders|total_order_value|avg_order_value|total_shipping_fee|total_purchases|total_purchase_value|avg_purchase_value|total_qua

In [13]:
from pyspark.sql import functions as F

# =============================
# 1. Show schema and first rows
# =============================
print("=== Merged Dataset Schema ===")
df_merged.printSchema()

print("=== First 10 Rows ===")
df_merged.show(10, truncate=False)

# =============================
# 2. Count total rows
# =============================
total_rows = df_merged.count()
print(f"Total rows in merged dataset: {total_rows}")

# =============================
# 3. Check for duplicates based on customer_id
# =============================
duplicate_count = df_merged.groupBy("customer_id").count().filter(F.col("count") > 1).count()
print(f"Number of duplicated customer_id rows: {duplicate_count}")

# =============================
# 4. Check for missing values per column
# =============================
print("=== Missing values per column ===")
missing_counts = df_merged.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_merged.columns
])
missing_counts.show()

# =============================
# 5. Summary statistics for numeric columns
# =============================
print("=== Summary statistics ===")
df_merged.describe().show()

# =============================
# 6. Count of unique customers
# =============================
unique_customers = df_merged.select("customer_id").distinct().count()
print(f"Number of unique customers: {unique_customers}")


=== Merged Dataset Schema ===
root
 |-- customer_id: string (nullable = true)
 |-- total_reviews: long (nullable = true)
 |-- avg_rating: double (nullable = false)
 |-- avg_sentiment_score: double (nullable = false)
 |-- total_orders: long (nullable = true)
 |-- total_order_value: double (nullable = false)
 |-- avg_order_value: double (nullable = false)
 |-- total_shipping_fee: double (nullable = false)
 |-- total_purchases: long (nullable = true)
 |-- total_purchase_value: double (nullable = false)
 |-- avg_purchase_value: double (nullable = false)
 |-- total_quantity: long (nullable = true)

=== First 10 Rows ===
+-----------+-------------+----------+---------------------+------------+-----------------+---------------+------------------+---------------+--------------------+------------------+--------------+
|customer_id|total_reviews|avg_rating|avg_sentiment_score  |total_orders|total_order_value|avg_order_value|total_shipping_fee|total_purchases|total_purchase_value|avg_purchase_val

In [14]:
# Import display function
from pyspark.sql import functions as F

# Total Orders Distribution
df_merged.groupBy("total_orders").count().orderBy("total_orders").show(20)

# Total Purchases Distribution
df_merged.groupBy("total_purchases").count().orderBy("total_purchases").show(20)

# Total Reviews Distribution
df_merged.groupBy("total_reviews").count().orderBy("total_reviews").show(20)


+------------+------+
|total_orders| count|
+------------+------+
|           0|218603|
|           1|329004|
|           2|182682|
|           3| 67750|
|           4| 18921|
|           5|  4207|
|           6|   779|
|           7|   125|
|           8|    12|
|           9|     2|
+------------+------+

+---------------+------+
|total_purchases| count|
+---------------+------+
|              0|218597|
|              1|328852|
|              2|182935|
|              3| 67798|
|              4| 18742|
|              5|  4248|
|              6|   747|
|              7|   138|
|              8|    24|
|              9|     4|
+---------------+------+

+-------------+------+
|total_reviews| count|
+-------------+------+
|            0|642633|
|            1|160344|
|            2| 17735|
|            3|  1306|
|            4|    67|
+-------------+------+



In [15]:
pip install duckdb


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [16]:
import duckdb


In [17]:
# File-based (saved to disk)
con = duckdb.connect('big_data_project.duckdb')

# OR in-memory (temporary)
# con = duckdb.connect(':memory:')


In [18]:
import duckdb

# Connect to DuckDB (file-based or in-memory)
con = duckdb.connect(database='merged_data.duckdb', read_only=False)


In [19]:
df_merged.count()
df_merged.show(5)


+-----------+-------------+----------+--------------------+------------+-----------------+---------------+------------------+---------------+--------------------+------------------+--------------+
|customer_id|total_reviews|avg_rating| avg_sentiment_score|total_orders|total_order_value|avg_order_value|total_shipping_fee|total_purchases|total_purchase_value|avg_purchase_value|total_quantity|
+-----------+-------------+----------+--------------------+------------+-----------------+---------------+------------------+---------------+--------------------+------------------+--------------+
| CUST100001|            0|       0.0|                 0.0|           4|       1058640.66|     264660.165| 6482.049999999999|              0|                 0.0|               0.0|             0|
| CUST100010|            0|       0.0|                 0.0|           1|        376963.68|      376963.68|           3157.44|              1|           831664.54|         831664.54|             2|
| CUST100039|  

In [21]:
# Convert Spark DataFrame to Pandas (safe for local writing)
df_merged_pd = df_merged.toPandas()
print("✅ Converted to Pandas DataFrame:", df_merged_pd.shape)


✅ Converted to Pandas DataFrame: (822085, 12)


In [23]:
output_csv = r"C:\big_data_processed\processed\customer_analytics_final.csv"
df_merged_pd.to_csv(output_csv, index=False)
print("✅ CSV saved successfully:", output_csv)


✅ CSV saved successfully: C:\big_data_processed\processed\customer_analytics_final.csv


In [24]:
output_csv = r"C:\temp\customer_analytics.csv"
os.makedirs(r"C:\temp", exist_ok=True)
df_merged_pd.to_csv(output_csv, index=False)
print("✅ CSV saved successfully:", output_csv)


✅ CSV saved successfully: C:\temp\customer_analytics.csv


In [29]:
# Convert Spark DataFrame to Pandas
df_merged_pd = df_merged.toPandas()

# Save as a single CSV file
csv_file_path = r"C:\big_data_processed\processed\customer_analytics_final.csv"
df_merged_pd.to_csv(csv_file_path, index=False)
print("✅ CSV saved as single file:", csv_file_path)


✅ CSV saved as single file: C:\big_data_processed\processed\customer_analytics_final.csv


In [30]:
import duckdb

duckdb_path = r"C:\big_data_processed\processed\customer_analytics.duckdb"

con = duckdb.connect(database=duckdb_path, read_only=False)

# Load CSV into DuckDB table
con.execute(f"""
    CREATE OR REPLACE TABLE customer_analytics AS
    SELECT * FROM read_csv_auto('{csv_file_path}')
""")

# Verify
row_count = con.execute("SELECT COUNT(*) FROM customer_analytics").fetchone()[0]
print(f"✅ Total rows in DuckDB table: {row_count}")


✅ Total rows in DuckDB table: 822085


In [31]:
preview = con.execute("SELECT * FROM customer_analytics LIMIT 5").fetchdf()
print(preview)


  customer_id  total_reviews  avg_rating  avg_sentiment_score  total_orders  \
0  CUST100001              0         0.0                 0.00             4   
1  CUST100010              0         0.0                 0.00             1   
2  CUST100039              0         0.0                 0.00             1   
3  CUST100070              2         1.5                -0.04             1   
4  CUST100093              1         4.0                -0.75             3   

   total_order_value  avg_order_value  total_shipping_fee  total_purchases  \
0         1058640.66       264660.165             6482.05                0   
1          376963.68       376963.680             3157.44                1   
2          499357.29       499357.290             1290.09                1   
3          327502.07       327502.070             2126.32                2   
4          596773.23       198924.410             9301.84                0   

   total_purchase_value  avg_purchase_value  total_quant

In [32]:
stats = con.execute("""
    SELECT 
        COUNT(*) AS total_customers,
        AVG(total_order_value) AS avg_order_value,
        MAX(total_order_value) AS max_order_value,
        MIN(total_order_value) AS min_order_value
    FROM customer_analytics
""").fetchdf()
print(stats)


   total_customers  avg_order_value  max_order_value  min_order_value
0           822085    307167.057393       2855488.87              0.0


In [51]:
high_value_customers = con.execute("""
    SELECT customer_id, total_order_value 
    FROM customer_analytics
    WHERE total_order_value > 1_000_000
    ORDER BY total_order_value DESC
    LIMIT 10
""").fetchdf()
print(high_value_customers)


  customer_id  total_order_value
0  CUST507899         2855488.87
1  CUST752131         2715618.63
2  CUST531000         2523450.99
3  CUST149378         2522018.58
4  CUST541204         2509108.56
5  CUST690896         2490225.56
6  CUST448218         2487683.53
7  CUST627816         2477454.82
8  CUST522634         2454485.22
9  CUST780518         2419085.29


In [1]:
pip install prefect pyspark duckdb pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import os
import urllib.request

# 1. Create directory C:\hadoop\bin
hadoop_bin_dir = r"C:\hadoop\bin"
os.makedirs(hadoop_bin_dir, exist_ok=True)

# 2. URLs for the missing Windows files (Hadoop 3.3.5)
base_url = "https://github.com/cdarlint/winutils/raw/master/hadoop-3.3.5/bin/"
files = ["hadoop.dll", "winutils.exe"]

print(f"⬇️ Downloading files to {hadoop_bin_dir}...")

for filename in files:
    url = base_url + filename
    dest_path = os.path.join(hadoop_bin_dir, filename)
    try:
        urllib.request.urlretrieve(url, dest_path)
        print(f"✅ Successfully saved: {filename}")
    except Exception as e:
        print(f"❌ Failed to download {filename}: {e}")

print("\n🎉 Download complete! Now update your orchestration script.")

⬇️ Downloading files to C:\hadoop\bin...
✅ Successfully saved: hadoop.dll
✅ Successfully saved: winutils.exe

🎉 Download complete! Now update your orchestration script.
