# PySpark Online Retail II Dataset Analysis

This notebook demonstrates how to load and analyze the Online Retail II dataset using PySpark in Google Colab.


## 1. Install Required Packages

First, install PySpark and related dependencies in Google Colab.


In [1]:
# Install required packages
%pip install pyspark pandas openpyxl


Note: you may need to restart the kernel to use updated packages.


## 2. Import Libraries and Initialize Spark Session


In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum as spark_sum, count, when, isnan, isnull, desc, min as spark_min, max as spark_max
from pyspark.sql import functions as F
import pandas as pd
import os

# Set environment variables for Windows
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'

# Initialize Spark session with enhanced configuration for Windows
spark = SparkSession.builder \
    .appName("OnlineRetailAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true") \
    .config("spark.python.worker.timeout", "1200") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.pythonUDF.arrow.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.network.timeout", "800s") \
    .config("spark.rpc.askTimeout", "800s") \
    .config("spark.rpc.lookupTimeout", "800s") \
    .config("spark.sql.broadcastTimeout", "800s") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to reduce output noise
spark.sparkContext.setLogLevel("WARN")

print("Spark session initialized successfully!")
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")


Spark session initialized successfully!
Spark version: 4.0.1
Spark UI: http://windows10.microdone.cn:4040


## 3. Load Data from GitHub

Since PySpark cannot directly read Excel files, we use pandas to read from GitHub and then convert to Spark DataFrame.


In [3]:
# Use pandas to read Excel file from GitHub (PySpark doesn't support Excel directly)
print("Reading Excel file from GitHub...")

# GitHub repository information
github_user = "Hachi630"
github_repo = "BDAS"
file_path = "online_retail_II.xlsx"

# Construct GitHub raw URL
github_url = f"https://raw.githubusercontent.com/{github_user}/{github_repo}/main/{file_path}"

# Read Excel file with multiple sheets
print("Loading data from both sheets (2009-2010 and 2010-2011)...")
excel_data = pd.read_excel(github_url, sheet_name=None)  # Read all sheets

# Get the two sheets
sheet_2009_2010 = excel_data['Year 2009-2010']
sheet_2010_2011 = excel_data['Year 2010-2011']

print(f"2009-2010 data shape: {sheet_2009_2010.shape}")
print(f"2010-2011 data shape: {sheet_2010_2011.shape}")

# Combine both datasets
pandas_df = pd.concat([sheet_2009_2010, sheet_2010_2011], ignore_index=True)
print(f"Combined data shape: {pandas_df.shape}")

# Convert pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(pandas_df)

# Clean up pandas objects to free memory
del pandas_df, sheet_2009_2010, sheet_2010_2011, excel_data

print("Data successfully loaded from GitHub into Spark DataFrame!")
print("Pandas objects cleaned up to free memory.")


Reading Excel file from GitHub...
Loading data from both sheets (2009-2010 and 2010-2011)...
2009-2010 data shape: (525461, 8)
2010-2011 data shape: (541910, 8)
Combined data shape: (1067371, 8)


  Could not convert 'C489449' with type str: tried to convert to int64
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Data successfully loaded from GitHub into Spark DataFrame!
Pandas objects cleaned up to free memory.


## 4. Check Data Dimensions

Determine the number of rows and columns in the combined dataset.


In [13]:
# Check data dimensions using PySpark with error handling
print("=== Data Dimension Information ===")

try:
    # Get row count with retry mechanism
    print("Counting rows...")
    row_count = df.count()
    print(f"Dataset row count: {row_count:,}")
    
    # Get column count and names
    column_count = len(df.columns)
    column_names = df.columns
    
    print(f"Dataset column count: {column_count}")
    print(f"Column names: {column_names}")
    
    # Additional information
    print(f"\nDataset partitions: {df.rdd.getNumPartitions()}")
    print(f"Dataset storage level: {df.storageLevel}")
    
    print("✅ Data dimension check completed successfully!")
    
except Exception as e:
    print(f"❌ Error during data dimension check: {e}")
    print("\n🔧 Troubleshooting steps:")
    print("1. Restart the kernel and run all cells again")
    print("2. Check if Java is properly installed")
    print("3. Try reducing memory allocation in Spark config")
    print("4. Consider using pandas-only analysis for this dataset")
    
    # Fallback: try to get basic info without count()
    try:
        print("\n🔄 Attempting fallback analysis...")
        column_count = len(df.columns)
        column_names = df.columns
        print(f"Dataset column count: {column_count}")
        print(f"Column names: {column_names}")
        print("Note: Row count unavailable due to Spark connection issues")
    except Exception as e2:
        print(f"❌ Fallback also failed: {e2}")
        print("Please restart the kernel and try again.")


=== Data Dimension Information ===
Counting rows...
❌ Error during data dimension check: An error occurred while calling o80.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.nio.file.NoSuchFileException: C:\Users\87190\AppData\Local\Temp\blockmgr-036add69-0553-4f5f-b972-4fc7a2c3463f\05
java.nio.file.NoSuchFileException: C:\Users\87190\AppData\Local\Temp\blockmgr-036add69-0553-4f5f-b972-4fc7a2c3463f\05
	at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
	at java.base/sun.nio.fs.WindowsFileSystemProvider.createDirectory(WindowsFileSystemProvider.java:521)
	at java.base/java.nio.file.Files.createDirectory(Files.java:700)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:110)
	at org.apache

## 5. Data Quality Assessment

This section performs comprehensive data quality checks to identify potential issues in the dataset.


### 5.1 Missing Values Analysis


In [10]:
# Count nulls in each column with robust error handling
print("=== Missing Values Analysis ===")

# Try PySpark first, fallback to pandas if needed
USE_SPARK = True

try:
    # Test Spark connection first
    print("Testing Spark connection...")
    test_count = df.count()
    print(f"✅ Spark connection successful! Dataset has {test_count:,} records")
    
    # Count missing values in each column using PySpark
    print("Counting missing values using PySpark...")
    missing_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
    
    print("Missing values per column:")
    missing_counts.show()
    
    # Get total count for percentage calculation
    total_records = df.count()
    print(f"\nTotal records: {total_records:,}")
    
    # Calculate and display missing percentages
    print("\nMissing values summary:")
    missing_data = missing_counts.collect()[0]
    for col_name in df.columns:
        missing_count = missing_data[col_name]
        missing_pct = (missing_count / total_records) * 100
        print(f"{col_name}: {missing_count:,} ({missing_pct:.2f}%)")
        
        # Add specific comment for CustomerID
        if col_name == "Customer ID" and missing_count > 0:
            print(f"  → CustomerID has many missing entries ({missing_count:,} records)")
        elif missing_count == 0:
            print(f"  → {col_name} is complete (no missing values)")
        else:
            print(f"  → {col_name} has some missing values")
    
    print("✅ Missing values analysis completed successfully with PySpark!")
    
except Exception as e:
    print(f"❌ PySpark analysis failed: {e}")
    print("\n🔄 Switching to pandas analysis...")
    USE_SPARK = False
    
    # Fallback to pandas analysis
    try:
        # Re-read data with pandas for analysis
        print("Re-reading data with pandas for analysis...")
        github_url = "https://raw.githubusercontent.com/Hachi630/BDAS/main/online_retail_II.xlsx"
        excel_data = pd.read_excel(github_url, sheet_name=None)
        sheet_2009_2010 = excel_data['Year 2009-2010']
        sheet_2010_2011 = excel_data['Year 2010-2011']
        pandas_df = pd.concat([sheet_2009_2010, sheet_2010_2011], ignore_index=True)
        
        print("Missing values per column:")
        missing_counts = pandas_df.isnull().sum()
        print(missing_counts)
        
        total_records = len(pandas_df)
        print(f"\nTotal records: {total_records:,}")
        
        # Calculate and display missing percentages
        print("\nMissing values summary:")
        for col_name in missing_counts.index:
            missing_count = missing_counts[col_name]
            missing_pct = (missing_count / total_records) * 100
            print(f"{col_name}: {missing_count:,} ({missing_pct:.2f}%)")
            
            # Add specific comment for CustomerID
            if col_name == "Customer ID" and missing_count > 0:
                print(f"  → CustomerID has many missing entries ({missing_count:,} records)")
            elif missing_count == 0:
                print(f"  → {col_name} is complete (no missing values)")
            else:
                print(f"  → {col_name} has some missing values")
        
        print("✅ Missing values analysis completed successfully with pandas!")
        
    except Exception as e2:
        print(f"❌ Pandas analysis also failed: {e2}")
        print("Please check your internet connection and try again.")


=== Missing Values Analysis ===
Testing Spark connection...
❌ PySpark analysis failed: An error occurred while calling o80.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.nio.file.NoSuchFileException: C:\Users\87190\AppData\Local\Temp\blockmgr-036add69-0553-4f5f-b972-4fc7a2c3463f\08
java.nio.file.NoSuchFileException: C:\Users\87190\AppData\Local\Temp\blockmgr-036add69-0553-4f5f-b972-4fc7a2c3463f\08
	at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
	at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
	at java.base/sun.nio.fs.WindowsFileSystemProvider.createDirectory(WindowsFileSystemProvider.java:521)
	at java.base/java.nio.file.Files.createDirectory(Files.java:700)
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:110)
	at org.apache.s

### 5.2 Numeric Values Validity Check


In [None]:
# Identify records with non-positive prices or zero quantity with error handling
print("=== Numeric Values Validity Check ===")

if USE_SPARK:
    try:
        print("Using PySpark for numeric validity analysis...")
        
        # Check for non-positive Price values
        invalid_price_count = df.filter(F.col("Price") <= 0).count()
        print(f"Records with Price <= 0: {invalid_price_count:,}")
        
        if invalid_price_count > 0:
            print("\nSample records with invalid prices:")
            df.filter(F.col("Price") <= 0).select("Invoice", "StockCode", "Quantity", "Price").show(5)
        
        # Check for zero Quantity values
        zero_quantity_count = df.filter(F.col("Quantity") == 0).count()
        print(f"\nRecords with Quantity = 0: {zero_quantity_count:,}")
        
        if zero_quantity_count > 0:
            print("\nSample records with zero quantity:")
            df.filter(F.col("Quantity") == 0).show(5)
        
        # Check for negative quantities (returns)
        negative_quantity_count = df.filter(F.col("Quantity") < 0).count()
        print(f"\nRecords with Quantity < 0 (returns): {negative_quantity_count:,}")
        
        print("✅ PySpark numeric validity check completed!")
        
    except Exception as e:
        print(f"❌ PySpark numeric check failed: {e}")
        print("🔄 Switching to pandas analysis...")
        USE_SPARK = False

if not USE_SPARK:
    try:
        print("Using pandas for numeric validity analysis...")
        
        # Check for non-positive Price values
        invalid_price_mask = pandas_df["Price"] <= 0
        invalid_price_count = invalid_price_mask.sum()
        print(f"Records with Price <= 0: {invalid_price_count:,}")
        
        if invalid_price_count > 0:
            print("\nSample records with invalid prices:")
            print(pandas_df[invalid_price_mask][["Invoice", "StockCode", "Quantity", "Price"]].head())
        
        # Check for zero Quantity values
        zero_quantity_mask = pandas_df["Quantity"] == 0
        zero_quantity_count = zero_quantity_mask.sum()
        print(f"\nRecords with Quantity = 0: {zero_quantity_count:,}")
        
        if zero_quantity_count > 0:
            print("\nSample records with zero quantity:")
            print(pandas_df[zero_quantity_mask].head())
        
        # Check for negative quantities (returns)
        negative_quantity_mask = pandas_df["Quantity"] < 0
        negative_quantity_count = negative_quantity_mask.sum()
        print(f"\nRecords with Quantity < 0 (returns): {negative_quantity_count:,}")
        
        print("✅ Pandas numeric validity check completed!")
        
    except Exception as e:
        print(f"❌ Pandas numeric check failed: {e}")

# Summary comments
print("\n=== Validity Summary ===")
if invalid_price_count > 0:
    print(f"⚠️  Found {invalid_price_count:,} records with non-positive prices (possibly freebies)")
else:
    print("✅ All prices are positive")
    
if zero_quantity_count > 0:
    print(f"⚠️  Found {zero_quantity_count:,} records with zero quantity (unusual - could indicate data entry errors)")
else:
    print("✅ No zero quantity records found")
    
if negative_quantity_count > 0:
    print(f"ℹ️  Found {negative_quantity_count:,} return transactions (negative quantities)")


## 6. Exploratory Data Analysis (EDA)

Perform comprehensive exploratory data analysis using PySpark.


### 6.1 Key Dataset Metrics


In [None]:
# Calculate number of distinct products and customers
print("=== Key Dataset Metrics ===")

# Calculate distinct counts using Spark
unique_products = df.select("StockCode").distinct().count()
unique_customers = df.select("Customer ID").distinct().count()

# Calculate total revenue
total_revenue = df.agg(F.sum(F.col("Quantity") * F.col("Price")).alias("TotalRevenue")).collect()[0]["TotalRevenue"]

# Calculate total transactions
total_transactions = df.count()

print(f"Total Transactions: {total_transactions:,}")
print(f"Unique Products (StockCode): {unique_products:,}")
print(f"Unique Customers: {unique_customers:,}")
print(f"Total Revenue: £{total_revenue:,.2f}")

# Additional metrics
avg_order_value = df.agg(F.avg(F.col("Quantity") * F.col("Price")).alias("AvgOrderValue")).collect()[0]["AvgOrderValue"]
print(f"Average Order Value: £{avg_order_value:.2f}")

# Data preview
print("\n=== Data Preview ===")
df.show(5, truncate=False)

print("\n=== Data Schema ===")
df.printSchema()


## 7. Data Quality Summary Report


In [None]:
# Data Quality Summary Report
print("=" * 60)
print("           DATA QUALITY ASSESSMENT SUMMARY")
print("=" * 60)

print("\n📊 DATASET OVERVIEW:")
total_records = df.count()
total_columns = len(df.columns)

print(f"   • Total Records: {total_records:,}")
print(f"   • Total Columns: {total_columns}")
print(f"   • Analysis Engine: PySpark")

print("\n🔍 DATA QUALITY ISSUES IDENTIFIED:")

# Missing Values Summary
print("\n1. MISSING VALUES:")
missing_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
missing_data = missing_counts.collect()[0]
for col_name in df.columns:
    missing_count = missing_data[col_name]
    missing_pct = (missing_count / total_records) * 100
    if missing_count > 0:
        print(f"   • {col_name}: {missing_count:,} ({missing_pct:.2f}%)")

# Numeric Validity Summary
print("\n2. NUMERIC VALIDITY:")
invalid_price_count = df.filter(F.col("Price") <= 0).count()
zero_quantity_count = df.filter(F.col("Quantity") == 0).count()
negative_quantity_count = df.filter(F.col("Quantity") < 0).count()

if invalid_price_count > 0:
    print(f"   • Non-positive prices: {invalid_price_count:,}")
if zero_quantity_count > 0:
    print(f"   • Zero quantities: {zero_quantity_count:,}")
if negative_quantity_count > 0:
    print(f"   • Negative quantities (returns): {negative_quantity_count:,}")

# Returns and Cancellations Summary
print("\n3. RETURNS & CANCELLATIONS:")
num_cancelled = df.filter(F.col("Invoice").startswith("C")).count()
num_returns = df.filter(F.col("Quantity") < 0).count()

print(f"   • Cancelled invoices: {num_cancelled:,}")
print(f"   • Return transactions: {num_returns:,}")

# Duplicates Summary
print("\n4. DUPLICATE RECORDS:")
unique_rows = df.dropDuplicates().count()
num_duplicates = total_records - unique_rows
if num_duplicates > 0:
    print(f"   • Duplicate records: {num_duplicates:,}")
else:
    print("   • No duplicate records found")

# Date Range Summary
print("\n5. DATE RANGE CONSISTENCY:")
date_range = df.select(F.min("InvoiceDate").alias("MinDate"), F.max("InvoiceDate").alias("MaxDate"))
date_info = date_range.collect()[0]
min_date = date_info["MinDate"]
max_date = date_info["MaxDate"]

print(f"   • Date range: {min_date} to {max_date}")
print(f"   • Expected range: 2009-12-01 to 2011-12-09")

print("\n📋 RECOMMENDATIONS:")
print("   • CustomerID missing values: Consider impact on customer analysis")
print("   • Return transactions: Account for net vs gross sales calculations")
print("   • Date range: Verify business context for partial years")
if num_duplicates > 0:
    print("   • Duplicates: Consider removing for accurate analysis")
if invalid_price_count > 0 or zero_quantity_count > 0:
    print("   • Invalid values: Review business rules for data cleaning")

print("\n" + "=" * 60)
print("           END OF DATA QUALITY ASSESSMENT")
print("=" * 60)


## 8. Troubleshooting: Restart Spark Session

If you encounter connection timeout errors, run this cell to restart the Spark session.


In [None]:
# Restart Spark session if needed
print("🔄 Restarting Spark session...")

try:
    # Stop current session
    spark.stop()
    print("✅ Previous Spark session stopped")
except:
    print("ℹ️  No previous session to stop")

# Wait a moment
import time
time.sleep(2)

# Restart with optimized configuration
spark = SparkSession.builder \
    .appName("OnlineRetailAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.memory", "1g") \
    .config("spark.python.worker.timeout", "1800") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.network.timeout", "1200s") \
    .config("spark.rpc.askTimeout", "1200s") \
    .config("spark.rpc.lookupTimeout", "1200s") \
    .config("spark.sql.broadcastTimeout", "1200s") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .master("local[2]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("✅ New Spark session initialized!")
print(f"Spark version: {spark.version}")

# Test the connection
try:
    test_df = spark.range(10)
    test_count = test_df.count()
    print(f"✅ Connection test successful: {test_count} rows")
except Exception as e:
    print(f"❌ Connection test failed: {e}")
    print("Please check your Java installation and try again.")


## 9. Alternative: Pandas-Only Analysis

If PySpark continues to have connection issues, use this pandas-only analysis as a complete alternative.


In [None]:
# Complete pandas-only analysis as fallback
print("=== Complete Pandas Analysis ===")
print("This analysis uses pandas only, bypassing PySpark completely.")

# Load data with pandas
print("Loading data with pandas...")
github_url = "https://raw.githubusercontent.com/Hachi630/BDAS/main/online_retail_II.xlsx"
excel_data = pd.read_excel(github_url, sheet_name=None)
sheet_2009_2010 = excel_data['Year 2009-2010']
sheet_2010_2011 = excel_data['Year 2010-2011']
df_pandas = pd.concat([sheet_2009_2010, sheet_2010_2011], ignore_index=True)

print(f"✅ Data loaded successfully!")
print(f"Dataset shape: {df_pandas.shape}")
print(f"Columns: {list(df_pandas.columns)}")

# Basic info
print("\n=== Dataset Overview ===")
print(f"Total records: {len(df_pandas):,}")
print(f"Total columns: {len(df_pandas.columns)}")
print(f"Memory usage: {df_pandas.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Data preview
print("\n=== Data Preview ===")
print(df_pandas.head())

# Data types
print("\n=== Data Types ===")
print(df_pandas.dtypes)

# Missing values
print("\n=== Missing Values ===")
missing_counts = df_pandas.isnull().sum()
missing_pct = (missing_counts / len(df_pandas)) * 100
missing_df = pd.DataFrame({
    'Missing Count': missing_counts,
    'Missing Percentage': missing_pct
})
print(missing_df)

# Basic statistics
print("\n=== Basic Statistics ===")
print(df_pandas.describe())

# Key metrics
print("\n=== Key Metrics ===")
unique_products = df_pandas['StockCode'].nunique()
unique_customers = df_pandas['Customer ID'].nunique()
total_revenue = (df_pandas['Quantity'] * df_pandas['Price']).sum()
avg_order_value = (df_pandas['Quantity'] * df_pandas['Price']).mean()

print(f"Unique Products: {unique_products:,}")
print(f"Unique Customers: {unique_customers:,}")
print(f"Total Revenue: £{total_revenue:,.2f}")
print(f"Average Order Value: £{avg_order_value:.2f}")

# Data quality checks
print("\n=== Data Quality Checks ===")
invalid_prices = (df_pandas['Price'] <= 0).sum()
zero_quantities = (df_pandas['Quantity'] == 0).sum()
negative_quantities = (df_pandas['Quantity'] < 0).sum()
cancelled_invoices = df_pandas['Invoice'].str.startswith('C').sum()

print(f"Invalid prices (≤0): {invalid_prices:,}")
print(f"Zero quantities: {zero_quantities:,}")
print(f"Negative quantities (returns): {negative_quantities:,}")
print(f"Cancelled invoices: {cancelled_invoices:,}")

# Date range
print("\n=== Date Range ===")
min_date = df_pandas['InvoiceDate'].min()
max_date = df_pandas['InvoiceDate'].max()
print(f"Date range: {min_date} to {max_date}")

# Top countries
print("\n=== Top 10 Countries by Revenue ===")
country_revenue = df_pandas.groupby('Country').agg({
    'Quantity': 'sum',
    'Price': lambda x: (df_pandas.loc[x.index, 'Quantity'] * x).sum(),
    'Invoice': 'count'
}).rename(columns={'Price': 'TotalRevenue', 'Invoice': 'TransactionCount'})
country_revenue = country_revenue.sort_values('TotalRevenue', ascending=False)
print(country_revenue.head(10))

print("\n✅ Complete pandas analysis finished!")
print("All analysis completed successfully using pandas.")
