In [0]:
# Preview the three raw CSV sources straight from S3.
# header=True makes Spark take column names from each file's first line.
csv_paths = [
    "s3://etl-bucket-ecom/customers/customerscsv.csv",
    "s3://etl-bucket-ecom/orders/orderscsv.csv",
    "s3://etl-bucket-ecom/products/productscsv.csv"
]

dataframes = [spark.read.csv(path, header=True) for path in csv_paths]

# Render each DataFrame with display(), numbering the previews from 1.
for file_number, df in enumerate(dataframes, start=1):
    print(f"CSV File {file_number} Preview:")
    display(df)

In [0]:
# Source CSV locations for the pipeline. These are the same three files previewed above.
orders_path = "s3://etl-bucket-ecom/orders/orderscsv.csv"
customers_path = "s3://etl-bucket-ecom/customers/customerscsv.csv"
products_path = "s3://etl-bucket-ecom/products/productscsv.csv"

# Read each file with its first line treated as the header.
# No schema is supplied and inferSchema is not set, so every column is read as a string.
orders_df, customers_df, products_df = (
    spark.read.option("header", "true").csv(source_path)
    for source_path in (orders_path, customers_path, products_path)
)


In [0]:
# Bronze layer: land the raw CSV DataFrames as Delta tables on S3.
# Use mode("overwrite"), not "append". Every run re-reads the *complete* source CSVs,
# so appending would add one more full copy of every row per run (including
# Restart & Run All). Through the downstream joins, that inflates the gold counts.
orders_df.write.format("delta").mode("overwrite").save("s3://etl-bucket-ecom/bronze/orders")
customers_df.write.format("delta").mode("overwrite").save("s3://etl-bucket-ecom/bronze/customers")
products_df.write.format("delta").mode("overwrite").save("s3://etl-bucket-ecom/bronze/products")

In [0]:
from pyspark.sql.functions import col

# Silver inputs: reload the bronze Delta tables.
# Only orders is cleaned here; rows whose order_id is null are dropped.
bronze_orders = spark.read.format("delta").load("s3://etl-bucket-ecom/bronze/orders")
orders_clean = bronze_orders.where(col("order_id").isNotNull())

# Customers and products are used as-is (no filtering or deduplication).
customers_clean = spark.read.format("delta").load("s3://etl-bucket-ecom/bronze/customers")
products_clean = spark.read.format("delta").load("s3://etl-bucket-ecom/bronze/products")

In [0]:
# Silver layer: enrich each cleaned order with customer and product attributes.
# Left joins keep every order, even when no matching customer/product row exists.
# NOTE(review): duplicate customer_id / product_id values on the right-hand side
# would multiply order rows here.
orders_with_customers = orders_clean.join(customers_clean, on="customer_id", how="left")
silver_df = orders_with_customers.join(products_clean, on="product_id", how="left")


In [0]:
from pyspark.sql.functions import col

# Sanity check on the silver join before aggregating into gold.
# A left join never drops left-side rows. So silver_df can only have more rows
# than orders_clean if customers/products contain duplicate join keys (e.g. a
# bronze table that was appended to more than once). That would silently
# inflate total_orders in the gold table, so fail loudly instead.
orders_row_count = orders_clean.count()
silver_row_count = silver_df.count()
assert silver_row_count == orders_row_count, (
    f"silver join fanned out: {silver_row_count} rows vs {orders_row_count} orders; "
    "check customers/products for duplicate customer_id / product_id keys"
)

In [0]:
# Gold layer: number of silver rows per product_name, exposed as total_orders.
# NOTE(review): presumably product_name comes from products_clean. If so, orders
# with no matching product (left join) fall into a null product_name group. Confirm.
rows_per_product = silver_df.groupBy("product_name").count()
gold_df = rows_per_product.withColumnRenamed("count", "total_orders")

# Replace the previous gold snapshot on every run.
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .save("s3://etl-bucket-ecom/gold/product_metrics")
)


In [0]:
%sql
-- Register the gold Delta output as a table over its existing S3 location.
-- IF NOT EXISTS keeps the cell re-runnable; a plain CREATE TABLE errors once the table exists.
CREATE TABLE IF NOT EXISTS gold_product_metrics
USING DELTA
LOCATION 's3://etl-bucket-ecom/gold/product_metrics';
