In [16]:
# cell_1_install_pyspark.py
!pip install pyspark
!pip install findspark



In [31]:
# generate_retail_data.py
import csv
import random
from datetime import datetime, timedelta
import uuid

# Configuration
NUM_TRANSACTIONS = 10000
NUM_CUSTOMERS = 500
NUM_PRODUCTS = 200
NUM_STORES = 50

# Generate product categories
categories = ['Electronics', 'Clothing', 'Food', 'Home & Garden', 'Sports', 'Books', 'Toys', 'Beauty']
products = []

for i in range(1, NUM_PRODUCTS + 1):
    category = random.choice(categories)
    price = round(random.uniform(5.99, 499.99), 2)
    products.append({
        'product_id': f'P{str(i).zfill(5)}',
        'product_name': f'Product_{i}',
        'category': category,
        'unit_price': price,
        'supplier_id': f'SUP{random.randint(1, 50):03d}'
    })

# Generate customers
customers = []
for i in range(1, NUM_CUSTOMERS + 1):
    customers.append({
        'customer_id': f'C{str(i).zfill(6)}',
        'loyalty_tier': random.choice(['Bronze', 'Silver', 'Gold', 'Platinum']),
        'join_date': (datetime.now() - timedelta(days=random.randint(1, 1000))).strftime('%Y-%m-%d'),
        'region': random.choice(['North', 'South', 'East', 'West', 'Central'])
    })

# Generate stores
stores = []
regions = ['North', 'South', 'East', 'West', 'Central']
for i in range(1, NUM_STORES + 1):
    region = random.choice(regions)
    stores.append({
        'store_id': f'S{str(i).zfill(3)}',
        'region': region,
        'size': random.choice(['Small', 'Medium', 'Large']),
        'online_store': random.choice([True, False])
    })

# Generate transactions
transactions = []
start_date = datetime(2023, 1, 1)
end_date = datetime(2024, 1, 1)

for i in range(1, NUM_TRANSACTIONS + 1):
    transaction_date = start_date + timedelta(
        days=random.randint(0, (end_date - start_date).days),
        hours=random.randint(0, 23),
        minutes=random.randint(0, 59)
    )
    
    customer = random.choice(customers)
    store = random.choice(stores)
    num_items = random.randint(1, 5)
    
    transaction_total = 0
    for item_num in range(num_items):
        product = random.choice(products)
        quantity = random.randint(1, 3)
        line_total = product['unit_price'] * quantity
        transaction_total += line_total
        
        transactions.append({
            'transaction_id': f'T{str(i).zfill(8)}-{item_num+1}',
            'transaction_date': transaction_date.strftime('%Y-%m-%d %H:%M:%S'),
            'customer_id': customer['customer_id'],
            'store_id': store['store_id'],
            'product_id': product['product_id'],
            'quantity': quantity,
            'unit_price': product['unit_price'],
            'line_total': round(line_total, 2),
            'payment_method': random.choice(['Credit Card', 'Debit Card', 'Cash', 'Mobile Payment'])
        })

# Write to CSV files
# Products
with open('products.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['product_id', 'product_name', 'category', 'unit_price', 'supplier_id'])
    writer.writeheader()
    writer.writerows(products)

# Customers
with open('customers.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['customer_id', 'loyalty_tier', 'join_date', 'region'])
    writer.writeheader()
    writer.writerows(customers)

# Stores
with open('stores.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['store_id', 'region', 'size', 'online_store'])
    writer.writeheader()
    writer.writerows(stores)

# Transactions
with open('transactions.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['transaction_id', 'transaction_date', 'customer_id', 'store_id', 
                                          'product_id', 'quantity', 'unit_price', 'line_total', 'payment_method'])
    writer.writeheader()
    writer.writerows(transactions)

print("Data generation complete!")
print(f"Generated {len(products)} products")
print(f"Generated {len(customers)} customers")
print(f"Generated {len(stores)} stores")
print(f"Generated {len(transactions)} transaction lines")

Sample data generated


In [None]:
-- setup_retailchain_hive.sql

-- Create database
CREATE DATABASE IF NOT EXISTS retailchain_analytics
COMMENT 'RetailChain Big Data Analytics Database'
LOCATION '/user/hive/warehouse/retailchain_analytics.db';

USE retailchain_analytics;

-- Set Hive optimization properties
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions=1000;
SET hive.exec.max.dynamic.partitions.pernode=100;
SET hive.enforce.bucketing=true;
SET hive.mapred.mode=nonstrict;
SET hive.optimize.sort.dynamic.partition=true;

-- Create external tables for raw data
CREATE EXTERNAL TABLE IF NOT EXISTS raw_products (
    product_id STRING,
    product_name STRING,
    category STRING,
    unit_price FLOAT,
    supplier_id STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/retailchain_analytics.db/raw_products'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE EXTERNAL TABLE IF NOT EXISTS raw_customers (
    customer_id STRING,
    loyalty_tier STRING,
    join_date STRING,
    region STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/retailchain_analytics.db/raw_customers'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE EXTERNAL TABLE IF NOT EXISTS raw_stores (
    store_id STRING,
    region STRING,
    size STRING,
    online_store STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/retailchain_analytics.db/raw_stores'
TBLPROPERTIES ("skip.header.line.count"="1");

CREATE EXTERNAL TABLE IF NOT EXISTS raw_transactions (
    transaction_id STRING,
    transaction_date STRING,
    customer_id STRING,
    store_id STRING,
    product_id STRING,
    quantity INT,
    unit_price FLOAT,
    line_total FLOAT,
    payment_method STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/retailchain_analytics.db/raw_transactions'
TBLPROPERTIES ("skip.header.line.count"="1");

-- Load data into external tables
LOAD DATA LOCAL INPATH 'products.csv' OVERWRITE INTO TABLE raw_products;
LOAD DATA LOCAL INPATH 'customers.csv' OVERWRITE INTO TABLE raw_customers;
LOAD DATA LOCAL INPATH 'stores.csv' OVERWRITE INTO TABLE raw_stores;
LOAD DATA LOCAL INPATH 'transactions.csv' OVERWRITE INTO TABLE raw_transactions;

-- Create optimized tables for analytics
-- Partitioned and bucketed fact table for transactions
CREATE TABLE IF NOT EXISTS fact_transactions (
    transaction_id STRING,
    customer_id STRING,
    store_id STRING,
    product_id STRING,
    quantity INT,
    unit_price FLOAT,
    line_total FLOAT,
    payment_method STRING,
    transaction_hour INT
)
PARTITIONED BY (transaction_year INT, transaction_month INT, transaction_day INT)
CLUSTERED BY (customer_id) INTO 8 BUCKETS
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');

-- Dimension tables optimized for querying
CREATE TABLE IF NOT EXISTS dim_customer (
    customer_id STRING,
    loyalty_tier STRING,
    join_date DATE,
    region STRING,
    customer_tenure_days INT
)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');

CREATE TABLE IF NOT EXISTS dim_product (
    product_id STRING,
    product_name STRING,
    category STRING,
    unit_price FLOAT,
    supplier_id STRING,
    price_range STRING
)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');

CREATE TABLE IF NOT EXISTS dim_store (
    store_id STRING,
    region STRING,
    size STRING,
    online_store BOOLEAN
)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');

-- Transform and load data into optimized tables
-- Populate dim_customer
INSERT OVERWRITE TABLE dim_customer
SELECT 
    customer_id,
    loyalty_tier,
    TO_DATE(join_date) as join_date,
    region,
    DATEDIFF(CURRENT_DATE(), TO_DATE(join_date)) as customer_tenure_days
FROM raw_customers;

-- Populate dim_product with derived columns
INSERT OVERWRITE TABLE dim_product
SELECT 
    product_id,
    product_name,
    category,
    unit_price,
    supplier_id,
    CASE 
        WHEN unit_price < 50 THEN 'Budget'
        WHEN unit_price >= 50 AND unit_price < 200 THEN 'Mid-Range'
        ELSE 'Premium'
    END as price_range
FROM raw_products;

-- Populate dim_store
INSERT OVERWRITE TABLE dim_store
SELECT 
    store_id,
    region,
    size,
    CASE WHEN online_store = 'True' THEN true ELSE false END as online_store
FROM raw_stores;

-- Populate fact_transactions with partitioning
INSERT OVERWRITE TABLE fact_transactions 
PARTITION (transaction_year, transaction_month, transaction_day)
SELECT 
    t.transaction_id,
    t.customer_id,
    t.store_id,
    t.product_id,
    t.quantity,
    t.unit_price,
    t.line_total,
    t.payment_method,
    HOUR(t.transaction_date) as transaction_hour,
    YEAR(t.transaction_date) as transaction_year,
    MONTH(t.transaction_date) as transaction_month,
    DAY(t.transaction_date) as transaction_day
FROM raw_transactions t;

In [22]:
-- retailchain_analytics_queries.sql

USE retailchain_analytics;

-- Query 1: Sales Performance by Region and Category
-- This helps identify top-performing regions and product categories
SELECT 
    ds.region,
    dp.category,
    COUNT(DISTINCT ft.transaction_id) as num_transactions,
    SUM(ft.quantity) as total_units_sold,
    ROUND(SUM(ft.line_total), 2) as total_revenue,
    ROUND(AVG(ft.line_total), 2) as avg_transaction_value,
    RANK() OVER (PARTITION BY ds.region ORDER BY SUM(ft.line_total) DESC) as category_rank_in_region
FROM fact_transactions ft
JOIN dim_store ds ON ft.store_id = ds.store_id
JOIN dim_product dp ON ft.product_id = dp.product_id
WHERE ft.transaction_year = 2023
GROUP BY ds.region, dp.category
ORDER BY ds.region, total_revenue DESC;

-- Query 2: Customer Loyalty Analysis
-- Analyze purchasing patterns by loyalty tier
SELECT 
    dc.loyalty_tier,
    COUNT(DISTINCT dc.customer_id) as num_customers,
    ROUND(AVG(ft.line_total), 2) as avg_purchase_amount,
    SUM(ft.quantity) as total_items_purchased,
    COUNT(DISTINCT ft.transaction_id) as total_transactions,
    ROUND(SUM(ft.line_total) / COUNT(DISTINCT ft.customer_id), 2) as revenue_per_customer,
    ROUND(COUNT(DISTINCT ft.transaction_id) / COUNT(DISTINCT ft.customer_id), 2) as avg_transactions_per_customer
FROM dim_customer dc
LEFT JOIN fact_transactions ft ON dc.customer_id = ft.customer_id
WHERE ft.transaction_year = 2023
GROUP BY dc.loyalty_tier
ORDER BY avg_purchase_amount DESC;

-- Query 3: Time-based Sales Patterns
-- Analyze sales by hour, day, and month for better staffing and inventory
SELECT 
    ft.transaction_hour,
    COUNT(DISTINCT ft.transaction_id) as num_transactions,
    ROUND(AVG(ft.line_total), 2) as avg_sale_value,
    SUM(ft.quantity) as total_items_sold,
    ROUND(SUM(ft.line_total), 2) as total_revenue,
    -- Compare with average to identify peak hours
    ROUND(SUM(ft.line_total) / AVG(SUM(ft.line_total)) OVER (), 2) as revenue_index
FROM fact_transactions ft
WHERE ft.transaction_year = 2023
GROUP BY ft.transaction_hour
ORDER BY ft.transaction_hour;

-- Query 4: Monthly Sales Trends and YoY Comparison
-- Track business growth and seasonal patterns
WITH monthly_sales AS (
    SELECT 
        ft.transaction_year,
        ft.transaction_month,
        COUNT(DISTINCT ft.transaction_id) as transactions,
        SUM(ft.line_total) as revenue,
        SUM(ft.quantity) as units_sold
    FROM fact_transactions ft
    GROUP BY ft.transaction_year, ft.transaction_month
)
SELECT 
    ms.transaction_year,
    ms.transaction_month,
    ms.transactions,
    ROUND(ms.revenue, 2) as revenue,
    ROUND(ms.units_sold, 2) as units_sold,
    ROUND(LAG(ms.revenue) OVER (PARTITION BY ms.transaction_month ORDER BY ms.transaction_year), 2) as prev_year_revenue,
    ROUND(((ms.revenue - LAG(ms.revenue) OVER (PARTITION BY ms.transaction_month ORDER BY ms.transaction_year)) / 
           LAG(ms.revenue) OVER (PARTITION BY ms.transaction_month ORDER BY ms.transaction_year)) * 100, 2) as yoy_growth_percentage
FROM monthly_sales ms
ORDER BY ms.transaction_year, ms.transaction_month;

-- Query 5: Product Performance Analysis
-- Identify best and worst performing products for inventory optimization
SELECT 
    dp.category,
    dp.product_name,
    dp.price_range,
    SUM(ft.quantity) as total_units_sold,
    ROUND(SUM(ft.line_total), 2) as total_revenue,
    COUNT(DISTINCT ft.transaction_id) as num_transactions,
    ROUND(AVG(ft.quantity), 2) as avg_units_per_transaction,
    -- Ranking within category
    RANK() OVER (PARTITION BY dp.category ORDER BY SUM(ft.line_total) DESC) as rank_in_category
FROM fact_transactions ft
JOIN dim_product dp ON ft.product_id = dp.product_id
WHERE ft.transaction_year = 2023
GROUP BY dp.category, dp.product_name, dp.price_range
HAVING SUM(ft.quantity) > 0
ORDER BY dp.category, total_revenue DESC;

-- Query 6: Store Performance Comparison
-- Compare store performance for resource allocation
SELECT 
    ds.store_id,
    ds.region,
    ds.size,
    ds.online_store,
    COUNT(DISTINCT ft.transaction_id) as transaction_count,
    COUNT(DISTINCT ft.customer_id) as unique_customers,
    ROUND(SUM(ft.line_total), 2) as total_revenue,
    ROUND(AVG(ft.line_total), 2) as avg_transaction_value,
    ROUND(SUM(ft.line_total) / COUNT(DISTINCT ft.transaction_id), 2) as revenue_per_transaction,
    -- Performance percentile
    ROUND(PERCENT_RANK() OVER (ORDER BY SUM(ft.line_total)), 2) as revenue_percentile
FROM fact_transactions ft
JOIN dim_store ds ON ft.store_id = ds.store_id
WHERE ft.transaction_year = 2023
GROUP BY ds.store_id, ds.region, ds.size, ds.online_store
ORDER BY total_revenue DESC;

-- Query 7: Customer Segmentation (RFM Analysis)
-- Identify valuable customers for targeted marketing
WITH customer_metrics AS (
    SELECT 
        ft.customer_id,
        dc.loyalty_tier,
        dc.region,
        DATEDIFF(MAX(ft.transaction_date), MIN(ft.transaction_date)) as customer_lifetime_days,
        COUNT(DISTINCT ft.transaction_id) as frequency,
        SUM(ft.line_total) as monetary_value,
        AVG(ft.line_total) as avg_transaction_value
    FROM fact_transactions ft
    JOIN dim_customer dc ON ft.customer_id = dc.customer_id
    WHERE ft.transaction_year = 2023
    GROUP BY ft.customer_id, dc.loyalty_tier, dc.region
)
SELECT 
    customer_id,
    loyalty_tier,
    region,
    frequency,
    ROUND(monetary_value, 2) as total_spent,
    ROUND(avg_transaction_value, 2) as avg_spent,
    CASE 
        WHEN monetary_value > 1000 AND frequency > 10 THEN 'VIP'
        WHEN monetary_value > 500 AND frequency > 5 THEN 'Regular High-Value'
        WHEN monetary_value > 100 THEN 'Occasional'
        ELSE 'Low Activity'
    END as customer_segment
FROM customer_metrics
ORDER BY monetary_value DESC;

-- Query 8: Payment Method Analysis
-- Understand customer payment preferences
SELECT 
    ft.payment_method,
    COUNT(DISTINCT ft.transaction_id) as num_transactions,
    ROUND(SUM(ft.line_total), 2) as total_processed,
    ROUND(AVG(ft.line_total), 2) as avg_transaction_amount,
    ROUND(COUNT(DISTINCT ft.transaction_id) * 100.0 / SUM(COUNT(DISTINCT ft.transaction_id)) OVER(), 2) as percentage_of_transactions,
    -- Analyze by region
    ds.region,
    COUNT(DISTINCT ft.transaction_id) as region_transactions
FROM fact_transactions ft
JOIN dim_store ds ON ft.store_id = ds.store_id
WHERE ft.transaction_year = 2023
GROUP BY ft.payment_method, ds.region
WITH CUBE
ORDER BY ft.payment_method, ds.region;

NameError: name 'spark' is not defined

In [10]:
# cell_4_create_hive_tables.py
# Convert pandas to spark dataframes
spark_products = spark.createDataFrame(products_df)
spark_customers = spark.createDataFrame(customers_df)
spark_transactions = spark.createDataFrame(transactions_df)

# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS retailchain_db")
spark.sql("USE retailchain_db")
print("Using database: retailchain_db")

# Create and populate tables
spark_products.write.mode("overwrite").saveAsTable("products")
spark_customers.write.mode("overwrite").saveAsTable("customers")
spark_transactions.write.mode("overwrite").saveAsTable("transactions")

print("Tables created successfully")

NameError: name 'spark' is not defined

In [23]:
# cell_5_run_analytics.py
# Q1: Sales by category
print("="*50)
print("SALES BY CATEGORY")
print("="*50)
result1 = spark.sql("""
    SELECT 
        p.category,
        COUNT(DISTINCT t.transaction_id) as num_transactions,
        SUM(t.quantity) as total_units,
        ROUND(SUM(t.line_total), 2) as total_revenue,
        ROUND(AVG(t.line_total), 2) as avg_transaction_value
    FROM transactions t
    JOIN products p ON t.product_id = p.product_id
    GROUP BY p.category
    ORDER BY total_revenue DESC
""")
result1.show()

# Q2: Customer loyalty analysis
print("="*50)
print("CUSTOMER LOYALTY ANALYSIS")
print("="*50)
result2 = spark.sql("""
    SELECT 
        c.loyalty_tier,
        COUNT(DISTINCT t.customer_id) as num_customers,
        ROUND(AVG(t.line_total), 2) as avg_purchase,
        SUM(t.quantity) as total_items,
        COUNT(DISTINCT t.transaction_id) as total_transactions,
        ROUND(SUM(t.line_total) / COUNT(DISTINCT t.customer_id), 2) as revenue_per_customer
    FROM transactions t
    JOIN customers c ON t.customer_id = c.customer_id
    GROUP BY c.loyalty_tier
    ORDER BY avg_purchase DESC
""")
result2.show()

# Q3: Monthly sales trends
print("="*50)
print("MONTHLY SALES TRENDS")
print("="*50)
result3 = spark.sql("""
    SELECT 
        YEAR(transaction_date) as year,
        MONTH(transaction_date) as month,
        COUNT(DISTINCT transaction_id) as transactions,
        ROUND(SUM(line_total), 2) as revenue,
        COUNT(DISTINCT customer_id) as unique_customers
    FROM transactions
    GROUP BY YEAR(transaction_date), MONTH(transaction_date)
    ORDER BY year, month
""")
result3.show()

# Q4: Top products
print("="*50)
print("TOP 10 PRODUCTS BY REVENUE")
print("="*50)
result4 = spark.sql("""
    SELECT 
        p.product_name,
        p.category,
        SUM(t.quantity) as units_sold,
        ROUND(SUM(t.line_total), 2) as revenue,
        COUNT(DISTINCT t.transaction_id) as times_purchased
    FROM transactions t
    JOIN products p ON t.product_id = p.product_id
    GROUP BY p.product_name, p.category
    ORDER BY revenue DESC
    LIMIT 10
""")
result4.show()

SALES BY CATEGORY


NameError: name 'spark' is not defined

In [24]:
# cell_6_create_optimized_tables.py
# Create partitioned table for better performance
spark.sql("""
    CREATE TABLE IF NOT EXISTS transactions_partitioned
    PARTITIONED BY (transaction_year INT, transaction_month INT)
    STORED AS PARQUET
    AS
    SELECT 
        transaction_id,
        customer_id,
        product_id,
        quantity,
        unit_price,
        line_total,
        payment_method,
        transaction_date,
        YEAR(transaction_date) as transaction_year,
        MONTH(transaction_date) as transaction_month
    FROM transactions
""")
print("Partitioned table created")

# Create bucketed table for optimized joins
spark.sql("""
    CREATE TABLE IF NOT EXISTS transactions_bucketed
    CLUSTERED BY (customer_id) INTO 8 BUCKETS
    STORED AS PARQUET
    AS
    SELECT * FROM transactions
""")
print("Bucketed table created")

NameError: name 'spark' is not defined

In [25]:
# cell_7_export_for_visualization.py
# Export results to CSV for visualization
results_to_export = spark.sql("""
    SELECT 
        p.category,
        c.region,
        DATE_FORMAT(t.transaction_date, 'yyyy-MM') as month,
        SUM(t.line_total) as revenue,
        SUM(t.quantity) as quantity
    FROM transactions t
    JOIN products p ON t.product_id = p.product_id
    JOIN customers c ON t.customer_id = c.customer_id
    GROUP BY p.category, c.region, DATE_FORMAT(t.transaction_date, 'yyyy-MM')
    ORDER BY month, category, region
""")

# Convert to pandas and save
results_to_export_pd = results_to_export.toPandas()
results_to_export_pd.to_csv('retailchain_analytics_export.csv', index=False)
print("Data exported to retailchain_analytics_export.csv")

NameError: name 'spark' is not defined