In [0]:
%sql
-- Provision the Unity Catalog namespace used by the Python ingestion cells:
-- catalog -> schema -> volume for raw files.
-- Every statement uses IF NOT EXISTS, so re-running this cell is a no-op once
-- the objects exist.
CREATE CATALOG IF NOT EXISTS ecommerce_analytics_dev;

-- Bronze-layer schema inside the catalog.
CREATE SCHEMA IF NOT EXISTS ecommerce_analytics_dev.bronze_layer;

-- Volume holding the raw CSV files; the Python cells access it through the
-- path /Volumes/ecommerce_analytics_dev/bronze_layer/raw_data.
CREATE VOLUME IF NOT EXISTS ecommerce_analytics_dev.bronze_layer.raw_data;


In [0]:
# ======================================
# E-COMMERCE DATASET INGESTION - TEST
# ======================================
# Downloads the Kaggle e-commerce behavior dataset with KaggleHub and copies
# every CSV it contains into the raw_data Unity Catalog volume (created by the
# preceding %sql cell). Re-running the cell skips files whose copy in the
# volume already has the same byte size; set FORCE_COPY = True to always
# overwrite.

import importlib
import os
import shutil
import subprocess
import sys

try:
    import kagglehub
except ModuleNotFoundError:
    # Install with the interpreter that runs this kernel: a bare `!pip` can
    # resolve to a different environment's pip, leaving the import broken.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "kagglehub"])
    # Make the import system notice the freshly installed package.
    importlib.invalidate_caches()
    import kagglehub

# Not used in this cell; presumably needed by later cells -- TODO confirm.
from kagglehub import KaggleDatasetAdapter

# ------------------------
# CONFIG
# ------------------------
VOLUME_RAW_PATH = "/Volumes/ecommerce_analytics_dev/bronze_layer/raw_data"
DATASET_NAME = "mkechinov/ecommerce-behavior-data-from-multi-category-store"
# When False, a CSV already present in the volume with the same byte size as
# the downloaded one is not copied again.
FORCE_COPY = False

# Ensure volume folder exists
os.makedirs(VOLUME_RAW_PATH, exist_ok=True)

# ------------------------
# DOWNLOAD DATASET (cached)
# ------------------------
print("Downloading dataset via KaggleHub...")
dataset_path = kagglehub.dataset_download(DATASET_NAME)
print("Dataset downloaded to KaggleHub cache:", dataset_path)

# ------------------------
# LIST ALL FILES
# ------------------------
# Sorted so listing and copy order are identical on every run.
all_files = sorted(
    os.path.join(root, file)
    for root, _dirs, files in os.walk(dataset_path)
    for file in files
    if file.endswith(".csv")
)
if not all_files:
    # Fail loudly instead of "succeeding" with nothing ingested.
    raise FileNotFoundError(f"No CSV files found under {dataset_path}")

print(f"CSV files found ({len(all_files)}):")
for f in all_files:
    print(" ", f)

# Files are copied into the volume by basename only, so two CSVs with the same
# name in different sub-folders would silently overwrite each other.
source_by_name = {}
for fpath in all_files:
    fname = os.path.basename(fpath)
    if fname in source_by_name:
        raise ValueError(
            f"Duplicate CSV name {fname!r}: {source_by_name[fname]} and {fpath}"
        )
    source_by_name[fname] = fpath

# ------------------------
# COPY FILES INTO UC VOLUME
# ------------------------
print("\nCopying CSV files to Unity Catalog volume...")
for fname, fpath in source_by_name.items():
    target_file = os.path.join(VOLUME_RAW_PATH, fname)
    already_copied = (
        not FORCE_COPY
        and os.path.isfile(target_file)
        and os.path.getsize(target_file) == os.path.getsize(fpath)
    )
    if already_copied:
        print(f"Skipped (same size already in volume): {fname}")
        continue
    shutil.copy2(fpath, target_file)
    print(f"Copied: {fname} → {VOLUME_RAW_PATH}")

# ------------------------
# VERIFY
# ------------------------
print("\nFiles in volume:")
for f in sorted(os.listdir(VOLUME_RAW_PATH)):
    print(" ", f)


In [0]:
# Profile every raw CSV in the volume: file size, row count and per-column
# null counts.
from pyspark.sql.functions import col, count, lit, sum as spark_sum
import os

# Path to your raw data volume
VOLUME_RAW_PATH = "/Volumes/ecommerce_analytics_dev/bronze_layer/raw_data"

# List all CSV files in the volume. The FileInfo entries returned here already
# carry each file's size, so no extra per-file dbutils.fs.ls call is needed.
csv_infos = [fi for fi in dbutils.fs.ls(VOLUME_RAW_PATH) if fi.name.endswith(".csv")]
files = [fi.path for fi in csv_infos]

print(f"Found {len(files)} CSV files:\n")
for fi in csv_infos:
    print(f"File: {fi.path}, Size: {fi.size / (1024*1024):.2f} MB")

print("\n--- Processing each CSV with Spark ---\n")

for f in files:
    print(f"📄 File: {f}")

    # Read CSV using Spark. NOTE: inferSchema costs an extra pass over the
    # file to infer column types; pass an explicit schema to avoid it.
    df = spark.read.option("header", True).option("inferSchema", True).csv(f)

    # Row count and per-column null counts in ONE aggregation, i.e. a single
    # scan of the data instead of a separate df.count() job.
    stats = df.select(
        count(lit(1)),
        *[spark_sum(col(c).isNull().cast("int")) for c in df.columns],
    ).first()
    n_rows = stats[0]
    # Spark's sum() over zero rows is NULL; report 0 for empty files instead.
    nulls = {c: (v if v is not None else 0) for c, v in zip(df.columns, stats[1:])}

    print(f"Rows: {n_rows}")
    print("Nulls per column:")
    for col_name, null_count in nulls.items():
        print(f"  {col_name}: {null_count}")
    print("\n" + "-"*50 + "\n")
