## Amazon Sales Analysis - Data Loading and Cleaning

Load the raw sales data, check its quality, clean out invalid records, add derived features, and save the cleaned result as a new table.

In [0]:
# Configuration
# The catalog, schema and table names are kept as separate constants so they
# can be recombined into other fully qualified table names.
CATALOG = "db_ecom_project"
SCHEMA = "amazon_sales_schema"
TABLE = "amazon_sales"

# Fully qualified "catalog.schema.table" identifier of the raw sales table.
FULL_TABLE_PATH = f"{CATALOG}.{SCHEMA}.{TABLE}"

# The emoji was previously stored as mis-decoded UTF-8 bytes (mojibake).
print(f"📊 Working with: {FULL_TABLE_PATH}")

### 1. Load Data

In [0]:
# Read the raw sales table as a Spark DataFrame.
df = spark.table(FULL_TABLE_PATH)

# Report the size and width of the raw data.
row_total = df.count()
column_total = len(df.columns)
print(f"Total records: {row_total:,}")
print(f"Columns: {column_total}")

# Column names and types as Spark sees them.
print("\nSchema:")
df.printSchema()

# Preview only a handful of rows instead of rendering the whole table.
print("\nFirst 5 rows:")
preview_rows = df.limit(5)
display(preview_rows)

### 2. Data Quality Check
Check for missing values and invalid data.

In [0]:
from pyspark.sql.functions import col, count as spark_count, when

# Null counts: one aggregate per source column. `when` without `otherwise`
# yields NULL for non-matching rows and `count` ignores NULLs, so each aggregate
# counts exactly the rows in which that column is null.
quality_check = df.select([
    spark_count(when(col(c).isNull(), 1)).alias(f"{c}_nulls")
    for c in df.columns
])

print("Null counts:")
display(quality_check)

# Invalid checks: both invalid-row counts and the total row count are computed
# in one aggregation, so the table is scanned once instead of once per metric.
# Rows whose price/quantity is NULL are not counted here, because NULL <= 0
# evaluates to NULL rather than true; the null counts above cover those rows.
validity = df.agg(
    spark_count(when(col("price") <= 0, 1)).alias("invalid_price"),
    spark_count(when(col("quantity_sold") <= 0, 1)).alias("invalid_quantity"),
    spark_count("*").alias("total_rows"),
).first()

invalid_price = validity["invalid_price"]
invalid_quantity = validity["invalid_quantity"]
total_rows = validity["total_rows"]

print(f"\nRows with zero/negative price: {invalid_price}")
print(f"Rows with zero/negative quantity: {invalid_quantity}")
print(f"Total rows before cleaning: {total_rows:,}")

### 3. Data Cleaning
Parse `order_date` into a date and keep only rows with a positive price, a positive quantity, and a non-null parsed order date.

In [0]:
from pyspark.sql.functions import to_date

# Parse date: replace order_date with the result of parsing it using the
# yyyy-MM-dd pattern. The cell always starts from the raw `df`, so re-running it
# gives the same result.
df_clean = df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))

# Filter valid records: keep rows with a strictly positive price and quantity
# and a non-null parsed order_date. A comparison against NULL is not true, so
# rows with a null price or quantity are dropped as well.
df_clean = df_clean.filter(
    (col("price") > 0) &
    (col("quantity_sold") > 0) &
    (col("order_date").isNotNull())
)

# The check mark was previously stored as mis-decoded UTF-8 bytes (mojibake).
print("✅ Data cleaned")
print(f"Records after cleaning: {df_clean.count():,}")
display(df_clean.limit(5))

### 4. Feature Engineering
Create date features and calculated metrics.

In [0]:
from pyspark.sql.functions import year, month, quarter, dayofweek

# Date features derived from the parsed order_date column.
# Note: Spark's dayofweek numbers days 1 (Sunday) through 7 (Saturday).
# withColumn replaces a column of the same name, so re-running this cell does
# not create duplicate columns.
df_clean = df_clean \
    .withColumn("Year", year(col("order_date"))) \
    .withColumn("Month", month(col("order_date"))) \
    .withColumn("Quarter", quarter(col("order_date"))) \
    .withColumn("Day_of_Week", dayofweek(col("order_date")))

# Calculated metrics: revenue per unit sold for each row.
# NOTE(review): the division relies on quantity_sold having been filtered to
# positive values before this point - confirm the cleaning step ran first.
df_clean = df_clean.withColumn("Revenue_Per_Unit", col("total_revenue") / col("quantity_sold"))

# The check mark was previously stored as mis-decoded UTF-8 bytes (mojibake).
print("✅ Features created")
display(df_clean.limit(5))

### 5. Save Cleaned Data

In [0]:
# Save
# The cleaned data is written to a table in the same catalog and schema as the
# raw table; "overwrite" mode replaces any existing contents of that table.
cleaned_table = f"{CATALOG}.{SCHEMA}.amazon_sales_cleaned"
df_clean.write.mode("overwrite").saveAsTable(cleaned_table)

# The check mark was previously stored as mis-decoded UTF-8 bytes (mojibake).
print(f"✅ Saved to: {cleaned_table}")

# Count rows from the table that was just written rather than from df_clean:
# this reports what was actually persisted and avoids re-running the whole
# cleaning pipeline against the source table.
saved_rows = spark.table(cleaned_table).count()
print(f"Total records: {saved_rows:,}")