# Step-by-Step Guide: Testing the ESMA Auto Loans Pipeline in a Notebook

This guide allows you to test the **Spark modules** of the ESMA Auto Loans pipeline directly from a notebook (Colab, Dataproc, Jupyter).  
The goal is to simulate a pipeline run on real data by reading and writing from GCS, without going through Airflow.

If you want to run the pipeline on local files instead, go to [Simulation](#simulation).

---

## **1. Prerequisites**
 
- Python ≥ 3.8
- Libraries: `pyspark`, `google-cloud-storage`, `pandas`, `cerberus`, `delta-spark`
- GCP service account with permissions on GCS buckets
- Input files uploaded to GCS (e.g., CSV files in `dl_data/downloaded-data/AUT/<dl_code>/`)
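Before running anything, you can check that the libraries listed above are installed. This sketch uses the standard `importlib.metadata` module (Python 3.8+) and the PyPI distribution names from the `pip install` command; `installed_versions` is a hypothetical helper, not part of the project.

```python
import sys
from importlib.metadata import PackageNotFoundError, version

# PyPI distribution names, as used in the pip install command
REQUIRED = ["pyspark", "delta-spark", "google-cloud-storage", "pandas", "cerberus"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = version(name)
        except PackageNotFoundError:
            versions[name] = None
    return versions


print("Python", sys.version.split()[0], "(need >= 3.8)")
for name, ver in installed_versions(REQUIRED).items():
    print(f"{name:22} {ver or 'MISSING'}")
```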

---

## **2. Set Up the Environment**

```python
# Install dependencies (Colab/Dataproc)
!pip install pyspark==3.3.1 delta-spark==2.1.0 google-cloud-storage cerberus pandas

# Colab only: authenticate the notebook with your Google account
from google.colab import auth
auth.authenticate_user()
```

**Or**, outside Colab:  
make sure the `GOOGLE_APPLICATION_CREDENTIALS` environment variable points to your service account JSON key file.

```python
import os

# Path to the JSON key of a service account with access to the buckets
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/your/service-account.json"
```
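A wrong key path usually surfaces later as an opaque authentication error, so it can pay to check it up front. The sketch below only inspects the file named by `GOOGLE_APPLICATION_CREDENTIALS` and does not contact GCP; it assumes a standard service account key file, which contains `"type": "service_account"` and a `client_email` field. `check_service_account_key` is a hypothetical helper.

```python
import json
import os


def check_service_account_key(env_var="GOOGLE_APPLICATION_CREDENTIALS"):
    """Return the client_email of the key file named by env_var, or raise if it looks wrong."""
    path = os.environ.get(env_var)
    if not path:
        raise RuntimeError(f"{env_var} is not set")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"{env_var} points to a missing file: {path}")
    with open(path, encoding="utf-8") as fh:
        key = json.load(fh)
    # Service account key files downloaded from GCP carry "type": "service_account"
    if key.get("type") != "service_account":
        raise ValueError(f"{path} does not look like a service account key")
    return key.get("client_email")
```

Calling `check_service_account_key()` returns the service account e-mail, which you can compare with the account that was granted permissions on the buckets.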

## **3. Set Up a SparkSession with Delta and GCS**

```python
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("esma_test")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.gs.impl", "io.delta.storage.GCSLogStore")
    .config("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
)

# configure_spark_with_delta_pip sets spark.jars.packages to the delta-core artifact
# matching the installed delta-spark (io.delta:delta-core_2.12:2.1.0 here), so it is
# not set by hand above. Outside Dataproc, gs:// paths also need the Cloud Storage
# connector, which can be added through the extra_packages argument.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```
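Spark expects each entry in `spark.jars.packages` to be a Maven coordinate of the form `groupId:artifactId:version`, and Delta's artifact id carries the Scala binary version. A value like `io.delta:delta-core_2.1.0` has only two parts, so it is not a valid coordinate. The illustrative helpers below assemble and sanity-check such strings, assuming Scala 2.12 (the build used by the PySpark 3.3 pip packages); they are not part of the project.

```python
def delta_core_coordinate(delta_version, scala_binary="2.12"):
    """Maven coordinate of Delta Lake's delta-core artifact for a given Scala build."""
    # groupId : artifactId_<Scala binary version> : version
    return f"io.delta:delta-core_{scala_binary}:{delta_version}"


def is_valid_coordinate(coordinate):
    """spark.jars.packages entries need exactly three non-empty parts."""
    parts = coordinate.split(":")
    return len(parts) == 3 and all(parts)


print(delta_core_coordinate("2.1.0"))  # io.delta:delta-core_2.12:2.1.0
```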

## **4. Test Parameters Setup**

```python
# Replace with real values
PROJECT_ID = "your_project_id"
RAW_BUCKET = "your_raw_bucket"
DATA_BUCKET = "your_data_bucket"
DL_CODE = "test_deal_code"  # example: 'AUT1234'
INGESTION_DATE = "2025-05-31"
SOURCE_PREFIX = f"dl_data/downloaded-data/AUT/{DL_CODE}"
TARGET_BRONZE_PREFIX = "AUT/bronze/assets"
TARGET_SILVER_PREFIX = "AUT/silver/assets"
FILE_KEY = "Loan_Data"
```
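The checks in the following steps build `gs://` paths from these parameters by hand. A small helper avoids doubled or missing slashes, and a date check catches a malformed `INGESTION_DATE` before any Spark job starts. Both helpers are illustrative, not part of the project; the `DL_CODE` rule (non-empty, no `/`) is an assumption based on `DL_CODE` being used as a single path segment in `SOURCE_PREFIX`.

```python
import re
from datetime import date

_ISO_DATE = re.compile(r"\d{4}-\d{2}-\d{2}")


def gcs_uri(bucket, *parts):
    """Join a bucket and path segments into a gs:// URI without doubled slashes."""
    path = "/".join(p.strip("/") for p in parts if p.strip("/"))
    return f"gs://{bucket}/{path}" if path else f"gs://{bucket}"


def check_params(ingestion_date, dl_code):
    """Raise ValueError for a malformed INGESTION_DATE or DL_CODE."""
    if not _ISO_DATE.fullmatch(ingestion_date):
        raise ValueError(f"INGESTION_DATE must be YYYY-MM-DD, got {ingestion_date!r}")
    date.fromisoformat(ingestion_date)  # also rejects impossible dates like 2025-02-30
    # Assumption: DL_CODE is a single path segment, so it must not contain "/"
    if not dl_code or "/" in dl_code:
        raise ValueError(f"Unexpected DL_CODE: {dl_code!r}")
```

For example, `gcs_uri(DATA_BUCKET, TARGET_SILVER_PREFIX, "lease_info_table")` gives `gs://your_data_bucket/AUT/silver/assets/lease_info_table`, and `check_params(INGESTION_DATE, DL_CODE)` raises before a long run if either value is malformed.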

## **5. Bronze Profiling Test**

```python
from src.aut_etl_pipeline.profile_bronze_data import profile_bronze_data

# Run the profiling: writes the clean_dump/ and dirty_dump/ CSV files to GCS
profile_bronze_data(
    raw_bucketname=RAW_BUCKET,
    data_bucketname=DATA_BUCKET,
    source_prefix=SOURCE_PREFIX,
    file_key=FILE_KEY,
    data_type="assets",
    ingestion_date=INGESTION_DATE
)
```

**Check on GCS:**  
You should find the CSV files in `gs://<DATA_BUCKET>/clean_dump/assets/` and `gs://<DATA_BUCKET>/dirty_dump/assets/`.
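To check from the notebook instead of the Cloud Console, you can list the two dump folders. This sketch assumes the `clean_dump/<data_type>/` and `dirty_dump/<data_type>/` layout described above; `list_dump_files` is a hypothetical helper, and the client is passed in, so it can be a `google.cloud.storage.Client` or any object with a compatible `list_blobs(bucket_name, prefix=...)` method.

```python
def list_dump_files(client, bucket_name, data_type="assets"):
    """Return {"clean": [...], "dirty": [...]} with the object names under each dump prefix."""
    found = {}
    for kind in ("clean", "dirty"):
        prefix = f"{kind}_dump/{data_type}/"
        # list_blobs pages through results lazily; collect just the object names
        found[kind] = sorted(blob.name for blob in client.list_blobs(bucket_name, prefix=prefix))
    return found
```

With a real client: `from google.cloud import storage`, then `list_dump_files(storage.Client(project=PROJECT_ID), DATA_BUCKET)`. An empty `"clean"` list alongside a non-empty `"dirty"` list means every row failed validation.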

## **6. Test Bronze Table**

```python
from src.aut_etl_pipeline.generate_bronze_tables import generate_bronze_tables

generate_bronze_tables(
    spark=spark,
    data_bucketname=DATA_BUCKET,
    source_prefix=SOURCE_PREFIX,
    target_prefix=TARGET_BRONZE_PREFIX,
    data_type="assets",
    ingestion_date=INGESTION_DATE
)
```

**Check on GCS:**  
You should find the Delta tables in `gs://<DATA_BUCKET>/AUT/bronze/assets/`.
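To confirm the bronze output is really a Delta table rather than loose Parquet files, look for commit files in `_delta_log/`: Delta names each commit `<version>.json`, with the version zero-padded to 20 digits. The hypothetical helper below works on any list of object names, so it can be fed from a GCS listing or a local directory walk. With the Spark session from section 3, `DeltaTable.isDeltaTable(spark, path)` from `delta.tables` answers the same question for a single path.

```python
import re

# Delta commit files: <table root>/_delta_log/<20-digit zero-padded version>.json
_COMMIT_RE = re.compile(r"^(?P<root>.+)/_delta_log/(?P<version>\d{20})\.json$")


def find_delta_tables(object_names):
    """Map each Delta table root found in a listing to its latest commit version."""
    tables = {}
    for name in object_names:
        match = _COMMIT_RE.match(name)
        if match:
            root, version = match.group("root"), int(match.group("version"))
            tables[root] = max(version, tables.get(root, -1))
    return tables
```

For example, `find_delta_tables(b.name for b in client.list_blobs(DATA_BUCKET, prefix=TARGET_BRONZE_PREFIX))` with a `storage.Client`; an empty dict means no Delta commit was written under that prefix.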

## **7. Test Silver Table**

```python
from src.aut_etl_pipeline.generate_asset_silver import generate_asset_silver

generate_asset_silver(
    spark=spark,
    bucket_name=DATA_BUCKET,
    source_prefix=TARGET_BRONZE_PREFIX,
    target_prefix=TARGET_SILVER_PREFIX,
    dl_code=DL_CODE,
    ingestion_date=INGESTION_DATE
)
```

**Check on GCS:**  
You should find the Parquet files in `gs://<DATA_BUCKET>/AUT/silver/assets/lease_info_table/`.

## **8. Read and Verify Output**

```python
# Read the silver table to verify the result
df = spark.read.parquet(f"gs://{DATA_BUCKET}/AUT/silver/assets/lease_info_table/")
df.show(5)
df.printSchema()
```
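`show()` and `printSchema()` are visual checks. To make the check fail loudly, compare `df.columns` (a plain list of column names in PySpark) with the columns you expect. `compare_columns` is a hypothetical helper, and `EXPECTED_COLUMNS` in the usage line is a placeholder to fill with the real `lease_info_table` columns.

```python
def compare_columns(actual, expected):
    """Return (missing, unexpected): expected columns absent from actual, and extra ones."""
    actual_set, expected_set = set(actual), set(expected)
    missing = [c for c in expected if c not in actual_set]
    unexpected = [c for c in actual if c not in expected_set]
    return missing, unexpected
```

Usage: `missing, unexpected = compare_columns(df.columns, EXPECTED_COLUMNS)`, then `assert not missing, missing` and `assert df.count() > 0`.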

## **9. (Optional) Test Deal Details**
 
Follow the same steps for the deal details, changing `data_type`, the prefixes and the functions called (use `generate_deal_details_bronze` and `generate_deal_details_silver`).

## **10. Debug**
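- If a step fails, read the full error message and the pipeline logs, then check the service account's GCS permissions and that the input files exist under `SOURCE_PREFIX`.
- If you get dependency errors, check that the installed versions are compatible with each other (for example, `delta-spark` 2.1.0 targets Spark 3.3).
- To debug faster, adjust the parameters or run on a single `DL_CODE`.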
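For more detail from the pipeline itself, raise the log level of its loggers. In the sample run, the module loggers are named `src.aut_etl_pipeline.<module>` and print `<time> - <logger> - <LEVEL> - <message>`, so this sketch reuses that format and sets the parent `src.aut_etl_pipeline` logger to DEBUG. It assumes the modules do not set their own levels and that nothing else has configured the root logger yet (otherwise `basicConfig` does nothing). Spark's own verbosity is separate: use `spark.sparkContext.setLogLevel("INFO")`.

```python
import logging

# Matches the pipeline's log lines: "<time> - <logger> - <LEVEL> - <message>"
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def enable_pipeline_debug_logs(package_logger="src.aut_etl_pipeline"):
    """Send log records to stderr and let the pipeline package log at DEBUG level."""
    # No-op if the root logger already has handlers (force=True would replace them)
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
    logger = logging.getLogger(package_logger)
    # Child loggers such as src.aut_etl_pipeline.profile_bronze_data inherit
    # this level unless they set their own
    logger.setLevel(logging.DEBUG)
    return logger
```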

## **11. Cleanup**

To remove the test output from the data bucket:

```python
from google.cloud import storage

client = storage.Client(project=PROJECT_ID)
bucket = client.get_bucket(DATA_BUCKET)
# Warning: this deletes EVERY object under AUT/ in DATA_BUCKET, not only test files
for blob in bucket.list_blobs(prefix="AUT/"):
    print("Deleting", blob.name)
    blob.delete()
```
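A more cautious variant deletes one prefix at a time and defaults to a dry run, which also makes it easy to clean the `clean_dump/` and `dirty_dump/` files written in step 5. `delete_prefix` is a hypothetical helper; `client` can be the `storage.Client` created above.

```python
def delete_prefix(client, bucket_name, prefix, dry_run=True):
    """List (dry_run=True) or delete every object under prefix; return the object names."""
    # Guard against wiping the whole bucket with an empty prefix
    if not prefix.strip("/"):
        raise ValueError("Refusing to operate on the whole bucket")
    names = []
    for blob in client.list_blobs(bucket_name, prefix=prefix):
        names.append(blob.name)
        if not dry_run:
            blob.delete()
    return names
```

Review the output of `delete_prefix(client, DATA_BUCKET, f"{TARGET_BRONZE_PREFIX}/")` first, then rerun it with `dry_run=False`; repeat for `TARGET_SILVER_PREFIX`, `clean_dump/assets/` and `dirty_dump/assets/`.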

## **Simulation**

```python
import os
import sys

from pyspark.sql import SparkSession

# Make modules under src/ importable; run the notebook from the project root so
# that the `src.aut_etl_pipeline` imports below resolve as well
sys.path.append("src")

# 1. Set up a local Spark session (getOrCreate() reuses an already running session)
spark = SparkSession.builder.appName("esma_local_test").getOrCreate()

# 2. Test parameters (modify as needed)
LOCAL_RAW_PATH = "ETL-Pipelines/ESMA-Loan-level-data-templates/Auto_Loans_ESMA/Auto_Loans_Synthetic_Dataset.csv"  # Local input file
LOCAL_OUTPUT_PATH = "./output"
BRONZE_PATH = os.path.join(LOCAL_OUTPUT_PATH, "bronze_table")
SILVER_PATH = os.path.join(LOCAL_OUTPUT_PATH, "silver_table")
DL_CODE = "test_deal_code"
INGESTION_DATE = "2025-06-01"

# 3. Import the real functions from the project
from src.aut_etl_pipeline.profile_bronze_data import profile_bronze_data
from src.aut_etl_pipeline.generate_bronze_tables import generate_bronze_tables
from src.aut_etl_pipeline.generate_asset_silver import generate_asset_silver

# 4. The calls below assume the functions accept local_mode=True (and, for profiling,
#    local_output_path). If your version reads/writes GCS directly, add such a flag or
#    temporarily copy its code into this notebook and replace GCS with local I/O.

# 5. Run the real pipeline on local files

# Bronze profiling
profile_bronze_data(
    raw_bucketname="",
    data_bucketname="",
    source_prefix=LOCAL_RAW_PATH,
    file_key="Loan_Data",
    data_type="assets",
    ingestion_date=INGESTION_DATE,
    local_mode=True,
    local_output_path=LOCAL_OUTPUT_PATH,
)
# Generate the bronze table
generate_bronze_tables(
    spark=spark,
    data_bucketname="",
    source_prefix=LOCAL_OUTPUT_PATH,
    target_prefix=BRONZE_PATH,
    data_type="assets",
    ingestion_date=INGESTION_DATE,
    local_mode=True,
)
# Generate the silver table
generate_asset_silver(
    spark=spark,
    bucket_name="",
    source_prefix=BRONZE_PATH,
    target_prefix=SILVER_PATH,
    dl_code=DL_CODE,
    ingestion_date=INGESTION_DATE,
    local_mode=True,
)

print(f"Pipeline completed! Check the {LOCAL_OUTPUT_PATH} folder for the results.")
```

```text
2025-06-03 10:33:02,214 - src.aut_etl_pipeline.profile_bronze_data - INFO - Start ASSETS BRONZE PROFILING job.
2025-06-03 10:33:02,224 - src.aut_etl_pipeline.profile_bronze_data - INFO - Running in LOCAL mode.
2025-06-03 10:33:02,224 - src.aut_etl_pipeline.profile_bronze_data - INFO - Processing local file: /Users/hp2/Deeploans/deeploans/ETL-Pipelines/ESMA-Loan-level-data-templates/Auto_Loans_ESMA/Auto_Loans_Synthetic_Dataset.csv
2025-06-03 10:33:02,452 - src.aut_etl_pipeline.profile_bronze_data - INFO - Profilazione completata (LOCAL): 0 clean, 84 dirty
2025-06-03 10:33:02,452 - src.aut_etl_pipeline.profile_bronze_data - INFO - End ASSETS BRONZE PROFILING job.
2025-06-03 10:33:02,453 - src.aut_etl_pipeline.generate_bronze_tables - INFO - Start ASSETS BRONZE job.
2025-06-03 10:33:02,453 - src.aut_etl_pipeline.generate_bronze_tables - INFO - Running in LOCAL mode.
2025-06-03 10:33:04,541 - src.aut_etl_pipeline.generate_bronze_tables - INFO - Bronze table (LOCAL) scritta in ./output/bron
```

### Results

```python
# Show the result
import pandas as pd

df_dirty = pd.read_csv("ETL-Pipelines/ESMA-Loan-level-data-templates/Auto_Loans_ESMA/output/dirty_dump/assets/dirty_Loan_Data_2025-06-01.csv")

print("First rows of dirty data:")
df_dirty.head()
```

First rows of dirty data:


| Unnamed: 0 | FIELD CODE | FIELD NAME | CONTENT TO REPORT | LOAN 1 | LOAN 2 | LOAN 3 | LOAN 4 | LOAN 5 | `__errors__` |
|---|---|---|---|---|---|---|---|---|---|
| 0 | AUTL1 | Unique Identifier | The unique identifier assigned by the reportin... | AUT-2024-001-A | AUT-2024-002-B | AUT-2024-003-C | AUT-2024-004-D | AUT-2024-005-E | `{'CONTENT TO REPORT': ['unknown field'], 'FIEL...` |
| 1 | AUTL2 | Original Underlying Exposure Identifier | Unique underlying exposure identifier | ORIG-EXP-001 | ORIG-EXP-002 | ORIG-EXP-003 | ORIG-EXP-004 | ORIG-EXP-005 | `{'CONTENT TO REPORT': ['unknown field'], 'FIEL...` |
| 2 | AUTL3 | New Underlying Exposure Identifier | New identifier if changed from AUTL2 | ORIG-EXP-001 | ORIG-EXP-002 | ORIG-EXP-003 | ORIG-EXP-004 | ORIG-EXP-005 | `{'CONTENT TO REPORT': ['unknown field'], 'FIEL...` |
| 3 | AUTL4 | Original Obligor Identifier | Original unique obligor identifier | OBLG-2024-001 | OBLG-2024-002 | OBLG-2024-003 | OBLG-2024-004 | OBLG-2024-005 | `{'CONTENT TO REPORT': ['unknown field'], 'FIEL...` |
| 4 | AUTL5 | New Obligor Identifier | New identifier if changed from AUTL4 | OBLG-2024-001 | OBLG-2024-002 | OBLG-2024-003 | OBLG-2024-004 | OBLG-2024-005 | `{'CONTENT TO REPORT': ['unknown field'], 'FIEL...` |
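In this run every row lands in the dirty dump (the log reports `0 clean, 84 dirty`), and `__errors__` flags columns such as `CONTENT TO REPORT` as `unknown field`. That is consistent with the synthetic CSV being laid out as one row per field (`FIELD CODE`, `FIELD NAME`, ...) with one column per loan (`LOAN 1` ... `LOAN 5`). If, as seems plausible but is not confirmed here, the validation schema expects one row per loan keyed by field code (`AUTL1`, `AUTL2`, ...), the file can be pivoted first. The sketch below does that with the standard `csv` module; `pivot_field_rows` is a hypothetical helper, and the column names are taken from the output above.

```python
import csv


def pivot_field_rows(in_path, out_path, code_col="FIELD CODE", loan_prefix="LOAN "):
    """Turn a one-row-per-field CSV into a one-row-per-loan CSV keyed by field code."""
    # utf-8-sig reads files with or without a byte-order mark
    with open(in_path, newline="", encoding="utf-8-sig") as fh:
        reader = csv.DictReader(fh)
        loan_cols = [c for c in reader.fieldnames if c.startswith(loan_prefix)]
        loans = {col: {} for col in loan_cols}
        codes = []
        for row in reader:
            code = row[code_col]
            codes.append(code)
            for col in loan_cols:
                loans[col][code] = row[col]
    with open(out_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=codes)
        writer.writeheader()
        for col in loan_cols:
            writer.writerow(loans[col])
    return len(loan_cols), len(codes)  # (loans written, fields per loan)
```

For example, `pivot_field_rows(LOCAL_RAW_PATH, "Auto_Loans_by_loan.csv")` (output name is illustrative), then point `LOCAL_RAW_PATH` at the new file and rerun the simulation. Whether the profiling step then accepts the rows depends on its schema, which is not shown in this guide.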


```python
import os

import pandas as pd

# Same locations the simulation cell passed as target_prefix
bronze_path = os.path.join(LOCAL_OUTPUT_PATH, "bronze_table")
silver_path = os.path.join(LOCAL_OUTPUT_PATH, "silver_table")

# If the bronze output is a Delta table, pandas ignores _delta_log/ and reads every
# Parquet data file, including files that a later overwrite has logically removed
df_bronze = pd.read_parquet(bronze_path)
df_silver = pd.read_parquet(silver_path)

print("Bronze table:")
print(df_bronze.head())

print("Silver table:")
print(df_silver.head())
```

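If this cell raises `FileNotFoundError: [Errno 2] No such file or directory`, the path does not match where the simulation wrote its output: the bronze and silver steps write to their `target_prefix` (`./output/bronze_table` and `./output/silver_table`), relative to the working directory the simulation ran in. Also check that the silver step actually completed.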
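When a local read fails with `FileNotFoundError`, it helps to see where the output directories actually ended up. This sketch walks a directory tree with `os.walk` and collects directories named after the simulation's outputs (`bronze_table`, `silver_table`, `clean_dump`, `dirty_dump`); `find_dirs` is a hypothetical helper.

```python
import os


def find_dirs(root, names=("bronze_table", "silver_table", "clean_dump", "dirty_dump")):
    """Return {name: [paths]} for every directory under root whose basename is in names."""
    hits = {name: [] for name in names}
    for dirpath, dirnames, _ in os.walk(root):
        for d in dirnames:
            if d in hits:
                hits[d].append(os.path.join(dirpath, d))
        # Skip hidden and internal folders (e.g. .git, _delta_log) to keep the walk fast
        dirnames[:] = [d for d in dirnames if not d.startswith((".", "_"))]
    return hits
```

Run `find_dirs(".")` from the notebook's working directory and use the reported paths for `bronze_path` and `silver_path`.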