In [None]:
# Environment setup: AWS CLI / botocore.
# `aws configure` stays commented out; credentials are read from Colab secrets further down.
# !pip install awscli
!pip install --upgrade awscli botocore
# !aws configure

In [None]:
# Superseded: the install cell below installs openjdk-11-jdk.
# !apt-get install openjdk-11-jdk -y

In [None]:
# Superseded: the install cell below installs pyspark and boto3.
# !pip install boto3 pyspark

In [None]:
!apt-get install openjdk-11-jdk -y
!pip install pyspark boto3
!wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar
!wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar

In [None]:
# !pip install delta-spark

In [None]:
!pip install -q findspark

In [None]:
# !pip install delta-spark
!pip install delta-spark==2.2.0

# download the required jar files
!wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar
!wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar


In [None]:
import os

# Add the two jars downloaded above (hadoop-aws + AWS SDK bundle) to the Spark submit
# arguments; hadoop-aws presumably supplies the S3AFileSystem configured in the session.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars hadoop-aws-3.3.2.jar,aws-java-sdk-bundle-1.12.262.jar pyspark-shell'
# set the environment variable to include the jar files to the spark session
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars hadoop-aws-3.3.2.jar,aws-java-sdk-bundle-1.12.262.jar pyspark-shell'
# (the commented line above duplicates the active assignment)

In [None]:
import findspark
# Let findspark locate the Spark installation so pyspark can be used from this kernel.
findspark.init()

In [None]:
import sys
from pyspark.sql import SparkSession
# from pyspark.sql.functions import when, col, count, countDistinct, sum, avg, row_number, to_date, date_format, dense_rank, desc, trim, datediff, min, max, unix_timestamp, to_timestamp, round
from pyspark.sql.functions import when, col, count
from pyspark.sql.window import Window
import boto3
import pyspark.sql.functions as F
from delta.tables import DeltaTable

In [None]:
import os
from google.colab import userdata

# Read the AWS credentials from Colab's secrets store rather than hardcoding them.
AWS_ACCESS_KEY = userdata.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = userdata.get('AWS_SECRET_ACCESS_KEY')

# Verify if credentials are loaded: only the first 4 characters of the access key are
# printed, so the full key does not end up in the saved notebook output.
# NOTE(review): the slice raises TypeError if the lookup yields None — confirm how
# userdata.get reports a missing secret.
print(f"AWS_ACCESS_KEY: {AWS_ACCESS_KEY[:4]}********")

In [None]:
# Create a Spark session with S3 (S3A) and Delta Lake support.
# Delta Lake comes only from spark.jars.packages (delta-core 2.2.0), matching the
# pip-pinned delta-spark==2.2.0. Do not also add a delta-core jar through spark.jars:
# a different version (e.g. 2.4.0) next to 2.2.0 puts two Delta versions on the classpath.
# NOTE(review): the bucket lives in eu-west-1 (see AWS_REGION); if S3A requests fail with
# region/redirect errors, try the regional endpoint "s3.eu-west-1.amazonaws.com".
spark = (
    SparkSession.builder
    .appName("LakehouseEcommerce")
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2,org.apache.hadoop:hadoop-common:3.3.2")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # The Delta key is "...autoMerge.enabled"; a misspelled key ("...autoMerge.enable")
    # is accepted by Spark but has no effect.
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

# spark = SparkSession.builder.appName("VehicleLocationMetrics").getOrCreate()

In [None]:
# Initialize S3 Hook
# s3_hook = S3Hook(aws_conn_id="aws_default")
# (commented out and unused; the boto3 client in the next cell is used instead)

# S3 layout for the lakehouse. The folder constants end with "/".
S3_BUCKET_NAME = "lakehouse-e-commerce"
AWS_REGION = "eu-west-1"
DATA_FOLDER = "raw-data/"  # raw CSV inputs read by the ETL jobs
PROCESSED_DATA_FOLDER = "processed/"  # not referenced in the visible cells
ARCHIVED_DATA_FOLDER = "archive/"  # not referenced in the visible cells
DELTA_LAKE_FOLDER = "lakehouse-dwh/"  # Delta tables written by the ETL jobs

In [None]:
# boto3 S3 client (same credentials as the Spark session) used for direct object uploads.
s3 = boto3.client('s3',
                  aws_access_key_id=AWS_ACCESS_KEY,
                  aws_secret_access_key=AWS_SECRET_KEY,
                  region_name=AWS_REGION)

### Merging the sheets of the orders and order_items workbooks (April 2025) and uploading them to S3

In [None]:
# Loading the xlsx files
# Colab upload widget. The next cell reads orders_apr_2025.xlsx and
# order_items_apr_2025.xlsx by relative path, so those are the files to pick here.
# `uploaded` (returned by files.upload) is not used in the visible cells.
from google.colab import files
uploaded = files.upload()

# loading xlsx file through file path
# orders_df = pd.read_excel('orders_apr_2025.xlsx')
# order_items_df = pd.read_excel('order_items_apr_2025.xlsx')

In [None]:
# combining the sheets into one dataframe
import pandas as pd

def read_all_sheets(xlsx_file):
    """Read every sheet of an Excel workbook and stack them into one DataFrame.

    Sheets are concatenated in workbook order and the index is rebuilt
    (``ignore_index=True``), so the result has a single 0..n-1 index. Columns
    missing from some sheets are filled with NaN by ``pd.concat``.

    Args:
        xlsx_file: Path or file-like object accepted by ``pd.ExcelFile``.

    Returns:
        pandas.DataFrame containing the rows of all sheets.
    """
    # Use the workbook as a context manager so its file handle is closed once
    # all sheets have been parsed.
    with pd.ExcelFile(xlsx_file) as workbook:
        frames = [workbook.parse(sheet) for sheet in workbook.sheet_names]
    return pd.concat(frames, ignore_index=True)

# Combine every sheet of each uploaded workbook into one pandas DataFrame per table.
orders_df = read_all_sheets('orders_apr_2025.xlsx')
order_items_df = read_all_sheets('order_items_apr_2025.xlsx')

In [None]:
# Spot-check the last rows (they come from the workbook's last sheet).
orders_df.tail()

In [None]:
# saving dataframes in csv files in colab (local copies only; the S3 upload below
# serializes the DataFrames again in memory rather than reading these files)
orders_df.to_csv('orders.csv', index=False)
order_items_df.to_csv('order_items.csv', index=False)

In [None]:
from io import StringIO

def upload_df_to_s3(df, bucket, key, client=None):
    """Serialize a pandas DataFrame to CSV and upload it as a single S3 object.

    Args:
        df: pandas DataFrame to upload; the index is not written.
        bucket: Target S3 bucket name.
        key: Object key inside the bucket (e.g. "raw-data/orders.csv").
        client: Optional boto3 S3 client. Defaults to the notebook-level ``s3``
            client, so existing three-argument calls behave as before.
    """
    s3_client = s3 if client is None else client
    # to_csv without a path returns the CSV text directly; no buffer needed.
    csv_text = df.to_csv(index=False)
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        # Explicit UTF-8 bytes plus a content type, so non-ASCII values round-trip
        # and the object is labelled as CSV.
        Body=csv_text.encode("utf-8"),
        ContentType="text/csv",
    )

In [None]:
# Upload the combined pandas frames under DATA_FOLDER, the prefix the Spark jobs read from
# (DATA_FOLDER already ends with "/").
upload_df_to_s3(orders_df, S3_BUCKET_NAME, f"{DATA_FOLDER}orders.csv")
upload_df_to_s3(order_items_df, S3_BUCKET_NAME, f"{DATA_FOLDER}order_items.csv")

### Loading the raw data from S3 and checking data quality

In [None]:
# # List files in bucket
# response = s3.list_objects_v2(Bucket=S3_BUCKET_NAME)
# for obj in response.get('Contents', []):
#     print(obj['Key'])  # Printing the file names

In [None]:
# Defining S3 folder paths
# The folder constants end with "/", so strip it before joining; otherwise the URIs
# contain an empty path segment (e.g. "raw-data//products.csv").
RAW_DATA_URI = f"s3a://{S3_BUCKET_NAME}/{DATA_FOLDER.strip('/')}"
LAKEHOUSE_URI = f"s3a://{S3_BUCKET_NAME}/{DELTA_LAKE_FOLDER.strip('/')}"

products_path = f"{RAW_DATA_URI}/products.csv"
orders_path = f"{RAW_DATA_URI}/orders.csv"
order_items_path = f"{RAW_DATA_URI}/order_items.csv"
products_lakehouse_path = f"{LAKEHOUSE_URI}/products"
orders_lakehouse_path = f"{LAKEHOUSE_URI}/orders"
order_items_lakehouse_path = f"{LAKEHOUSE_URI}/order_items"

In [None]:
# Loading the data from S3. header=True takes column names from the first row; without
# inferSchema every column is read as a string.
# Note: this rebinds orders_df / order_items_df, which held the pandas frames built above,
# to Spark DataFrames.
products_df = spark.read.option("header", True).csv(products_path)
orders_df = spark.read.option("header", True).csv(orders_path)
order_items_df = spark.read.option("header", True).csv(order_items_path)

In [None]:
# Preview the first rows of each table.
products_df.show()
orders_df.show()
order_items_df.show()

In [None]:
products_df.printSchema()
orders_df.printSchema()
order_items_df.printSchema()

In [None]:
# Rows missing product_id (the key the products ETL job validates and deduplicates on).
products_df.filter(col("product_id").isNull()).show()

In [None]:
# Orders with no order_timestamp.
orders_df.filter(col("order_timestamp").isNull()).show()

In [None]:
# check for missing values in each column
# when(isNull) is null for populated cells and count() ignores nulls, so each output
# column holds that column's null count.
orders_df.select([count(when(col(c).isNull(), c)).alias(c) for c in orders_df.columns]).show()

In [None]:
# Per-column null counts for order_items.
order_items_df.select([count(when(col(c).isNull(), c)).alias(c) for c in order_items_df.columns]).show()

In [None]:
# Per-column null counts for products.
products_df.select([count(when(col(c).isNull(), c)).alias(c) for c in products_df.columns]).show()

### Glue-style ETL jobs: shared helpers, then one job per table (products, orders, order_items)

In [None]:
from pyspark.sql import functions as F

# Common helper functions
def validate_schema(df, required_columns):
    """Ensure ``df`` contains every column named in ``required_columns``.

    Args:
        df: DataFrame (anything exposing a ``columns`` sequence of names).
        required_columns: Column names that must be present.

    Returns:
        ``df`` unchanged, so the result can be reassigned/chained.

    Raises:
        ValueError: If any required column is missing. The message lists the
            missing names in the order they were requested. ``ValueError`` is a
            subclass of ``Exception``, so existing ``except Exception`` handlers
            still catch it.
    """
    # Set lookup instead of scanning the column list once per required name;
    # a distinct loop name also avoids shadowing the notebook's imported `col`.
    present = set(df.columns)
    missing = [name for name in required_columns if name not in present]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df

def deduplicate(df, primary_keys):
    """Return ``df`` with duplicate rows removed, comparing only ``primary_keys``.

    Delegates to ``DataFrame.dropDuplicates``. When several rows share the same
    key values, one of them is kept; Spark does not document which one.
    """
    key_subset = primary_keys
    return df.dropDuplicates(key_subset)

def validate_nulls(df, columns):
    """Fail if any of ``columns`` contains a null value.

    All null counts are computed in one aggregation (a single Spark job) rather
    than one job per column.

    Args:
        df: Spark DataFrame to check.
        columns: Names of columns that must be fully populated (e.g. primary keys).

    Returns:
        ``df`` unchanged, so the result can be reassigned/chained.

    Raises:
        ValueError: For the first column (in ``columns`` order) that has nulls,
            with the message "Nulls found in column <name>". ``ValueError`` is a
            subclass of ``Exception``, so existing handlers still catch it.
    """
    if not columns:
        # Nothing to check; also avoids issuing an empty select().
        return df
    # count(when(isNull)) counts only the rows where the column is null.
    null_counts = df.select(
        [F.count(F.when(F.col(name).isNull(), name)).alias(name) for name in columns]
    ).collect()[0]
    # A Row is a tuple, so pair counts with the requested names by position
    # (works even if the same name is listed twice).
    for name, null_count in zip(columns, null_counts):
        if null_count > 0:
            raise ValueError(f"Nulls found in column {name}")
    return df

def write_delta(df, output_path, partition_by=None, mode="overwrite"):
    """Write ``df`` to ``output_path`` in Delta format.

    Args:
        df: Spark DataFrame to write.
        output_path: Target table location (e.g. an ``s3a://`` URI).
        partition_by: Optional column name (or list of names) to partition by.
            Falsy values (``None``, ``""``, ``[]``) write an unpartitioned table.
        mode: Spark save mode. Defaults to ``"overwrite"``, the previous fixed
            behaviour; pass e.g. ``"append"`` for incremental loads.
    """
    # Build the writer once instead of duplicating the chain per branch.
    writer = df.write.format("delta").mode(mode)
    if partition_by:
        writer = writer.partitionBy(partition_by)
    writer.save(output_path)

def merge_upsert(spark, delta_path, df, primary_keys):
    """Upsert ``df`` into the existing Delta table at ``delta_path``.

    Source rows whose ``primary_keys`` match a target row update it
    (``whenMatchedUpdateAll``); all other source rows are inserted
    (``whenNotMatchedInsertAll``).

    Args:
        spark: Active SparkSession.
        delta_path: Location of an existing Delta table.
        df: Source Spark DataFrame; presumably has the target table's columns
            (UpdateAll/InsertAll copy all columns) — confirm for each caller.
        primary_keys: Column name, or non-empty list of names, forming the join key.

    Raises:
        ValueError: If ``primary_keys`` is empty/None (the merge condition would be blank).
    """
    if not primary_keys:
        raise ValueError("merge_upsert requires at least one primary key column")
    if isinstance(primary_keys, str):
        # A bare string would otherwise be iterated character by character.
        primary_keys = [primary_keys]

    from delta.tables import DeltaTable

    delta_table = DeltaTable.forPath(spark, delta_path)
    # Backtick-quote names so keys with spaces/special characters still parse.
    cond = " AND ".join(f"target.`{pk}` = source.`{pk}`" for pk in primary_keys)

    delta_table.alias("target").merge(
        source=df.alias("source"),
        condition=cond,
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


In [None]:
# --- Products ETL Job ---

# Read from S3
p_df = spark.read.csv(products_path, header=True)

# Validate schema
required_columns = ["product_id", "department_id", "department", "product_name"]
p_df = validate_schema(p_df, required_columns)

# Check for null primary keys
p_df = validate_nulls(p_df, ["product_id"])

# Deduplicate
p_df = deduplicate(p_df, ["product_id"])

# # Ordering by product_id before writing to the delta lake tables
# p_df = p_df.orderBy("product_id")

# Write to Delta
# write_delta(p_df, products_lakehouse_path, partition_by="department_id")

print("Products ETL job finished successfully.")

In [None]:
write_delta(p_df, products_lakehouse_path, partition_by="department_id")

In [None]:
p_df.show(50)

In [None]:
# show values with product_id of 2
p_df.filter(col("product_id") == 3).show()

In [None]:
# --- Orders ETL Job ---

# Read from S3 (no inferSchema, so every column is a string)
o_df = spark.read.csv(orders_path, header=True)

# Validate schema
required_columns = ["order_num", "order_id", "user_id", "order_timestamp", "total_amount", "date"]
o_df = validate_schema(o_df, required_columns)

# Check for null primary keys
o_df = validate_nulls(o_df, ["order_id"])

# Deduplicate on the primary key
o_df = deduplicate(o_df, ["order_id"])

# Write to Delta, partitioned by the "date" column
write_delta(o_df, orders_lakehouse_path, partition_by="date")

print("Orders ETL job finished successfully.")

In [None]:
# --- Order_items ETL Job ---

# Read from S3
oi_df = spark.read.csv(order_items_path, header=True)

# Validate schema
required_columns = ["id", "order_id", "user_id", "days_since_prior_order", "product_id", "add_to_cart_order", "reordered", "order_timestamp", "date"]
oi_df = validate_schema(oi_df, required_columns)

# Check for null primary keys
oi_df = validate_nulls(oi_df, ["id"])

# Deduplicate
oi_df = deduplicate(oi_df, ["id"])

# Write to Delta
write_delta(oi_df, order_items_lakehouse_path, partition_by="date")

print("Products ETL job finished successfully.")

In [None]:
write_delta(oi_df, order_items_lakehouse_path, partition_by="date")