IMPORT PYSPARK AND OTHER LIBRARIES

In [0]:
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum, avg, row_number  # NOTE: this `sum` shadows Python's builtin sum()
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType, FloatType
from pyspark.sql.window import Window


In [0]:
# Create the Spark session (getOrCreate reuses an already-running session, e.g. on Databricks).
# The app name now matches what this notebook actually analyses (shopping-trends data),
# replacing a "World Bank Data Analysis" label copied from another project.
spark = (
    SparkSession.builder
    .appName("Shopping Trends Analysis")
    .getOrCreate()
)
     

In [0]:
# Show the active SparkSession's summary as a quick check that the session exists.
spark

DATABRICKS MEDALLION ARCHITECTURE

In [0]:
# Print the catalog that unqualified table/schema names currently resolve against.
current_catalog_df = spark.sql("SELECT current_catalog()")
current_catalog_df.show()


In [0]:
# Create the three medallion-layer schemas explicitly inside the `brian_work` catalog.
# Qualifying the names ensures they land in `brian_work` even when the session's current
# catalog is something else; the later cells in this notebook all address these schemas
# as `brian_work.<layer>`.
MEDALLION_CATALOG = "brian_work"
for layer in ("bronze_layer", "silver_layer", "gold_layer"):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {MEDALLION_CATALOG}.{layer}")

In [0]:
# List the schemas in the `brian_work` catalog to confirm the layer schemas exist.
schemas_df = spark.sql("SHOW SCHEMAS IN brian_work")
schemas_df.show()

In [0]:
# 📁 Catalog: `brian_work`
#
# ├── Schema (Database): `bronze_layer`
# │   └── Table: `bronze_shopping_data`
# │
# ├── Schema (Database): `silver_layer`
# │   └── Table: `silver_shopping_data`
# │
# └── Schema (Database): `gold_layer`
#     └── Table: `gold_shopping_data`
 


BRONZE LAYER PROCESSING

In [0]:
# List the tables currently registered in the bronze schema.
bronze_tables_df = spark.sql("SHOW TABLES IN brian_work.bronze_layer")
bronze_tables_df.show()

In [0]:
# Helper: make column names safe for Delta tables (applied after the CSV has been read)
def clean_column_names(df):
    """
    Cleans column names by replacing invalid characters with underscores.

    Each occurrence of a space, comma, semicolon, curly brace, parenthesis,
    newline, tab or '=' is replaced with "_"; every other character is kept.
    Columns are renamed positionally with ``DataFrame.toDF`` instead of
    ``col(name).alias(...)``, so source names that contain dots or backticks
    are not misparsed as nested-field references.

    Parameters:
        df (pyspark.sql.DataFrame): The input Spark DataFrame
    Returns:
        pyspark.sql.DataFrame: A new DataFrame with cleaned column names.
    Raises:
        ValueError: if two distinct source columns would end up with the same
            cleaned name (e.g. "a b" and "a_b"), which would make the result
            ambiguous and fail later on a Delta write.
    """
    invalid_chars = re.compile(r"[ ,;{}()\n\t=]")

    original_names = list(df.columns)
    cleaned_names = [invalid_chars.sub("_", name) for name in original_names]

    # Detect collisions introduced by cleaning (pre-existing duplicates are passed through).
    sources_by_cleaned = {}
    for original, cleaned in zip(original_names, cleaned_names):
        sources_by_cleaned.setdefault(cleaned, set()).add(original)
    collisions = {
        cleaned: sorted(sources)
        for cleaned, sources in sources_by_cleaned.items()
        if len(sources) > 1
    }
    if collisions:
        raise ValueError(
            f"Cleaning column names would merge distinct columns: {collisions}"
        )

    # Positional rename: no column-name parsing involved.
    return df.toDF(*cleaned_names)


I. LOAD DATA INTO BRONZE LAYER

1. LOAD SHOPPING DATA WITH PYSPARK

In [0]:
# Explicit schema for shopping_trends.csv. All pyspark types used here come from the
# import cell at the top of the notebook (no scattered re-imports).
# Supplying a schema pins the column types instead of reading every column as a string.
# Field names are already Delta-safe (underscores instead of spaces/parentheses).
# NOTE(review): with an explicit schema, Spark takes column names from the schema rather
# than from the CSV header, so the field order must match the file's column order — confirm.
dataset_schema = StructType([
    StructField("Customer_ID", IntegerType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Item_Purchased", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Purchase_Amount__USD_", IntegerType(), True),
    StructField("Location", StringType(), True),
    StructField("Size", StringType(), True),
    StructField("Color", StringType(), True),
    StructField("Season", StringType(), True),
    StructField("Review_Rating", FloatType(), True),
    StructField("Subscription_Status", StringType(), True),
    StructField("Payment_Method", StringType(), True),
    StructField("Shipping_Type", StringType(), True),
    StructField("Discount_Applied", StringType(), True),
    StructField("Promo_Code_Used", StringType(), True),
    StructField("Previous_Purchases", IntegerType(), True),
    StructField("Preferred_Payment_Method", StringType(), True),
    StructField("Frequency_of_Purchases", StringType(), True)
])


In [0]:
# Extract: load the raw shopping-trends CSV from S3 with `dataset_schema`.
# header=True makes Spark skip the file's first line instead of treating it as a data row.
bronze_shopping_df = (
    spark.read
    .option("header", True)
    .schema(dataset_schema)
    .csv("s3://brian-work-dataset/shopping_trends.csv")
)

bronze_shopping_df.show(2)  # quick sanity check that rows were actually loaded

In [0]:
# Normalise column names to Delta-safe identifiers (a no-op when the names are already clean).
bronze_shopping_df = bronze_shopping_df.transform(clean_column_names)


2. LOAD SHOPPING DATA INTO DELTA TABLE

In [0]:
# Write the bronze data to a managed Delta table so other departments can query it.
# mode("overwrite") makes this cell idempotent: re-running the notebook replaces the table
# contents with the current CSV instead of appending a duplicate copy of every row.
# overwriteSchema is only meaningful together with overwrite mode; it lets the table schema
# follow changes to `dataset_schema`.
# If an append-only raw history is wanted instead, switch to mode("append") with
# option("mergeSchema", "true").
(
    bronze_shopping_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("brian_work.bronze_layer.bronze_shopping_data")
)

In [0]:
# Preview the first 10 rows of the bronze Delta table; display() renders a rich table
# in Databricks, which is easier to read than DataFrame.show().
bronze_preview_df = spark.sql("SELECT * FROM brian_work.bronze_layer.bronze_shopping_data LIMIT 10")
display(bronze_preview_df)