# Mailchimp Transformed Data ETL

This notebook processes CSV files stored in the Silver container of Azure Data Lake Storage (ADLS). It reads the already-cleaned Mailchimp data from the Silver layer (`mailchimp_clean`), applies standard cleaning and schema enforcement, and writes the transformed data to Delta tables.

Below is a detailed breakdown of each section in the notebook:

In [0]:
from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core.exceptions import ResourceExistsError

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, udf, expr, lit
from pyspark.sql.types import *

import json



# Reuse the active Spark session if one exists, otherwise create one.
spark = SparkSession.builder.getOrCreate()


STORAGE_ACCOUNT_NAME = "mailchimpspnetwork"  # Make sure this is all lowercase
CONTAINER_NAME = "silver"
INPUT_PREFIX = "mailchimp_clean"
OUTPUT_PREFIX = "mailchimp_transformed"

# ADLS base path for Spark
base_path = f"abfss://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net"


# Build the ADLS Gen2 clients used later to list the input files.
# NOTE(review): building the clients may not perform a network round trip, so
# the success message below may only confirm construction — confirm.
try:
    credential = DefaultAzureCredential()
    service_client = DataLakeServiceClient(
        account_url=f"https://{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net",
        credential=credential
    )
    fs_client = service_client.get_file_system_client(file_system=CONTAINER_NAME)
    print(f"Connected to ADLS Gen2 container: '{CONTAINER_NAME}'")
except Exception as e:
    print(f"Failed to connect to ADLS: {e}")
    # Re-raise: without `fs_client` the following cells cannot work and would
    # otherwise fail later with an unrelated NameError.
    raise


### Schema Definition

- **Standard Schema:**  
  Defines the expected schema (`standard_schema`) as a `StructType` for the cleaned/transformed data.  
  This schema includes fields such as contact details, email attributes, and flattened address fields (extracted from nested fields in the raw data).

---

In [0]:
# 2. Schema Definition (including flattened address fields -- these are
#    extracted from the nested address struct during cleaning)

# (column name, Spark type class) pairs, in output column order.
_STANDARD_FIELDS = [
    ("list_name", StringType),
    ("consents_to_one_to_one_messaging", BooleanType),
    ("contact_id", StringType),
    ("email_address", StringType),
    ("email_client", StringType),
    ("email_type", StringType),
    ("full_name", StringType),
    ("id", StringType),
    ("ip_opt", StringType),
    ("ip_signup", StringType),
    ("language", StringType),
    ("last_changed", StringType),
    ("list_id", StringType),
    ("location_country_code", StringType),
    ("location_dstoff", IntegerType),
    ("location_gmtoff", IntegerType),
    ("location_latitude", DoubleType),
    ("location_longitude", DoubleType),
    ("location_region", StringType),
    ("location_timezone", StringType),
    ("member_rating", IntegerType),
    ("merge_FNAME", StringType),
    ("merge_LNAME", StringType),
    ("merge_MMERGE5", StringType),
    ("merge_PHONE", StringType),
    ("address_addr1", StringType),
    ("address_addr2", StringType),
    ("address_city", StringType),
    ("address_state", StringType),
    ("address_zip", StringType),
    ("address_country", StringType),
    ("sms_phone_number", StringType),
    ("sms_subscription_last_updated", StringType),
    ("sms_subscription_status", StringType),
    ("source", StringType),
    ("stats_avg_click_rate", DoubleType),
    ("stats_avg_open_rate", DoubleType),
    ("status", StringType),
    ("tags_count", IntegerType),
    ("timestamp_opt", StringType),
    ("timestamp_signup", StringType),
    ("unique_email_id", StringType),
    ("vip", BooleanType),
    ("web_id", IntegerType),
]

# Every field is declared nullable.
standard_schema = StructType(
    [
        StructField(field_name, type_cls(), True)
        for field_name, type_cls in _STANDARD_FIELDS
    ]
)

---

### Standard Cleaning Function

- **Function `standard_cleaning(df)`:**  
  - Checks for a nested field (`merge_ADDRESS`) and, if present as a struct, extracts subfields (address components) into individual columns.  
  - Drops the original nested field.
  - Drops additional nested fields (_links, interests, tags, marketing_permissions, last_note) that are not needed in the final output.
  - Ensures the DataFrame includes all fields from the defined standard schema: missing fields are added as nulls, and every field is cast to its declared data type.
  - Finally, returns a DataFrame with columns in the order specified by the schema.

---

In [0]:
def standard_cleaning(df):
    """Flatten, prune and conform a DataFrame to ``standard_schema``.

    Steps:
      1. If a ``merge_ADDRESS`` column exists and is a struct, copy its
         address subfields into flat ``address_*`` columns; the original
         column is dropped either way.
      2. Drop the nested columns that are not part of the output.
      3. Project exactly the fields of ``standard_schema``, in schema order,
         casting existing columns to the declared type and filling absent
         ones with typed nulls.

    Args:
        df: Input Spark DataFrame.

    Returns:
        A DataFrame whose columns are exactly those of ``standard_schema``.
    """
    if "merge_ADDRESS" in df.columns:
        address_type = df.schema["merge_ADDRESS"].dataType
        if isinstance(address_type, StructType):
            # Only extract subfields the struct actually has; asking for an
            # absent one fails analysis. Absent ones become nulls in step 3.
            available = set(address_type.fieldNames())
            for sub in ("addr1", "addr2", "city", "state", "zip", "country"):
                if sub in available:
                    df = df.withColumn(
                        f"address_{sub}", col("merge_ADDRESS").getItem(sub)
                    )
        # Struct or not, the nested column itself is never kept.
        df = df.drop("merge_ADDRESS")

    # Drop nested fields not in schema
    to_drop = ["_links", "interests", "tags", "marketing_permissions", "last_note"]
    df = df.drop(*[col_name for col_name in to_drop if col_name in df.columns])

    # Enforce the schema in a single projection. One select keeps the query
    # plan flat, whereas one withColumn call per field grows it every time.
    present = set(df.columns)
    projection = [
        (col(field.name) if field.name in present else lit(None))
        .cast(field.dataType)
        .alias(field.name)
        for field in standard_schema.fields
    ]
    return df.select(projection)


In [0]:
def list_csvs(path_prefix):
    """Return the paths of all ``.csv`` files found under ``path_prefix``.

    Uses the module-level ``fs_client`` to list the container.

    Args:
        path_prefix: Directory path, relative to the container root.

    Returns:
        A list of file paths (directories excluded) ending in ``.csv``, or an
        empty list when the prefix does not exist.
    """
    # Imported here so this cell brings its own dependency into scope.
    from azure.core.exceptions import ResourceNotFoundError

    try:
        paths = fs_client.get_paths(path_prefix)
        # The listing is consumed inside the try so that errors raised while
        # iterating the results are handled as well.
        return [p.name for p in paths if not p.is_directory and p.name.endswith(".csv")]
    except ResourceNotFoundError as e:
        # A missing prefix is "not found"; the previous handler caught
        # ResourceExistsError, which does not match this message.
        print(f"Error: Path not found {e}")
        return []


csv_file_paths = list_csvs(INPUT_PREFIX)



### Processing Each CSV File

For each CSV file found:
- **Path Setup:**  
  - Extracts the file name and constructs full input and output paths using the ADLS base path.
  
- **Data Reading:**  
  - Reads the CSV file into a Spark DataFrame (`df_raw`), and logs its schema and row count.
  
- **Data Cleaning:**  
  - Applies the `standard_cleaning` function to transform and enforce the schema on the raw data.
  - Logs the number of rows after cleaning and identifies any missing columns compared to the expected schema.
  
- **Data Writing:**  
  - Writes the cleaned DataFrame to a Delta table under `OUTPUT_PREFIX`, in a folder named after the source file (without its `.csv` extension), using `overwrite` mode.
  - Prints success messages upon completion or error messages if any exceptions occur during processing.

---

In [0]:
from pyspark.sql import SparkSession

# Assuming spark is already instantiated
spark = SparkSession.builder.getOrCreate()

# Loop-invariant: the column names every file is expected to provide.
expected_cols = {f.name for f in standard_schema.fields}

# Convert each listed CSV into its own Delta table under OUTPUT_PREFIX.
# A failure on one file is reported and does not stop the remaining files.
for path in csv_file_paths:
    file_name = path.split("/")[-1]
    # Strip only the trailing extension; str.replace would also remove any
    # ".csv" occurring in the middle of the name.
    table_name = file_name[:-len(".csv")] if file_name.endswith(".csv") else file_name
    input_path = f"{base_path}/{path}"
    output_path = f"{base_path}/{OUTPUT_PREFIX}/{table_name}"

    print(f"\nProcessing: **{file_name}**")

    # Tracked outside the try so the finally block can release whatever was
    # cached, even when a later step fails.
    df_raw = None
    df_clean = None

    try:
        # Read CSV with the predefined schema and cache the DataFrame to avoid repeated scans.
        df_raw = spark.read \
            .option("header", True) \
            .option("multiLine", True) \
            .schema(standard_schema) \
            .csv(input_path) \
            .cache()

        raw_count = df_raw.count()
        print(f"Raw schema: {[f.name for f in df_raw.schema.fields]}")
        print(f"Raw row count: {raw_count}")

        # Conform to the standard schema; cached because it is counted and then written.
        df_clean = standard_cleaning(df_raw).cache()
        cleaned_count = df_clean.count()

        # Validate missing expected columns against the file's real header.
        # df_raw always carries the enforced schema, so comparing against
        # df_raw.columns could never report anything.
        header_cols = set(
            spark.read
            .option("header", True)
            .option("multiLine", True)
            .csv(input_path)
            .columns
        )
        missing_cols = expected_cols - header_cols

        if missing_cols:
            print(f"Missing expected columns: {sorted(missing_cols)}")

        print(f"Cleaned row count: {cleaned_count}")

        # Optionally adjust partitions to control output file size/number.
        df_to_write = df_clean.coalesce(1)

        print(f"Saving to Delta: {output_path}")
        df_to_write.write.format("delta").mode("overwrite").save(output_path)

        print(f"Success: {file_name} → {OUTPUT_PREFIX}")

    except Exception as e:
        print(f"Error processing {file_name}: {e}")

    finally:
        # Free up the cached DataFrames on success and on failure alike.
        for cached_df in (df_raw, df_clean):
            if cached_df is not None:
                cached_df.unpersist()
