## Databricks ETL Pipeline using PySpark and Delta Lake Architecture

#### Introduction

This project implements a complete ETL pipeline on Azure Databricks using PySpark and the Delta Lake architecture. The goal is to ingest raw datasets from Kaggle through the Kaggle API, store them in Azure Data Lake Storage Gen2 (ADLS) as the Bronze layer, transform the data with PySpark, and organize the cleaned and enriched data into Silver and Gold layers following the medallion architecture.

##### Key Steps in the Workflow:

**Data Ingestion** – Download datasets from Kaggle using the Kaggle API.

**Data Access Setup** – Configure OAuth-based connection between Databricks and ADLS.

**Load into Bronze Layer** – Store the raw data files in ADLS (Bronze folder).

**Data Transformation** – Use PySpark to clean, filter, aggregate, and enrich the data.

**Silver & Gold Layers** – Save processed data back to ADLS in Silver (cleaned) and Gold (business-ready) folders using Delta format.
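The folder layout implied by these steps can be sketched as a small path helper. This is only an illustration: the mount point `/mnt/ADLSmount` and the `bronze/movies_dataset` folder appear later in this notebook, while the `silver` and `gold` folder names and the helper `layer_path` itself are assumptions.

```python
from posixpath import join

# Layer folder names; "silver" and "gold" are assumed to mirror "bronze".
LAYERS = ("bronze", "silver", "gold")


def layer_path(mount_point: str, layer: str, dataset: str) -> str:
    """Return the folder for `dataset` inside the given medallion layer."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer {layer!r}; expected one of {LAYERS}")
    return join(mount_point, layer, dataset)


print(layer_path("/mnt/ADLSmount", "bronze", "movies_dataset"))
# prints /mnt/ADLSmount/bronze/movies_dataset
```

Keeping the layer names in one place makes it harder for a typo in a path to silently create a fourth folder next to the three layers.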



#### Data Ingestion

**Step 1**: Install the Kaggle CLI package and restart Python to apply changes.<br>
**Step 2**: Upload the kaggle.json file (generated from the API section of your Kaggle account settings) to the DBFS FileStore and verify the upload by listing the directory.<br>
**Step 3**: Ensure the JSON file is readable, then set the Kaggle configuration directory using the KAGGLE_CONFIG_DIR environment variable.<br>
**Step 4**: Download the required dataset using the Kaggle CLI and unzip the contents for further processing.

In [0]:
%pip install kaggle

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


In [0]:
dbutils.library.restartPython()

In [0]:
# Check if JSON file is present
dbutils.fs.ls('/FileStore/tables/kaggle/')

[FileInfo(path='dbfs:/FileStore/tables/kaggle/kaggle.json', name='kaggle.json', size=73, modificationTime=1755096345000),
 FileInfo(path='dbfs:/FileStore/tables/kaggle/movies-dataset-for-feature-extracion-prediction.zip', name='movies-dataset-for-feature-extracion-prediction.zip', size=1102493, modificationTime=1755096656000)]

In [0]:
# Check that the file is readable.
# Note: cat prints the API key, so clear this cell's output before sharing the notebook.
!cat /dbfs/FileStore/tables/kaggle/kaggle.json

{"username":"dilipkumarramella","key":"<redacted>"}
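A readability check does not have to echo the credentials. The following is a minimal sketch, assuming the file has the `username` and `key` fields that the Kaggle CLI expects; the function name `check_kaggle_credentials` is hypothetical.

```python
import json


def check_kaggle_credentials(path: str) -> bool:
    """Return True if `path` holds JSON with non-empty 'username' and 'key' strings.

    The values themselves are never printed or returned.
    """
    with open(path, "r", encoding="utf-8") as fh:
        creds = json.load(fh)
    if not isinstance(creds, dict):
        return False
    return all(
        isinstance(creds.get(field), str) and len(creds[field]) > 0
        for field in ("username", "key")
    )


# In this notebook the call would be (path taken from the cell above):
# check_kaggle_credentials("/dbfs/FileStore/tables/kaggle/kaggle.json")
```

The function raises if the file is missing or is not valid JSON, which covers the "is it readable" question without showing the secret.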

In [0]:
import os
os.environ['KAGGLE_CONFIG_DIR'] = '/dbfs/FileStore/tables/kaggle/'

In [0]:
!kaggle datasets download -d bharatnatrayn/movies-dataset-for-feature-extracion-prediction -p /dbfs/FileStore/tables/kaggle/

Dataset URL: https://www.kaggle.com/datasets/bharatnatrayn/movies-dataset-for-feature-extracion-prediction
License(s): CC0-1.0
Downloading movies-dataset-for-feature-extracion-prediction.zip to /dbfs/FileStore/tables/kaggle
  0%|                                               | 0.00/1.05M [00:00<?, ?B/s]
100%|██████████████████████████████████████| 1.05M/1.05M [00:00<00:00, 12.0MB/s]


In [0]:
import zipfile

zip_path = "/dbfs/FileStore/tables/kaggle/movies-dataset-for-feature-extracion-prediction.zip"
extract_path = "/dbfs/FileStore/tables/kaggle/movies_dataset"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)
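A slightly more defensive variant of the extraction step is sketched below. It checks the archive for corrupt members before extracting and returns the extracted CSV files so the next step can pick them up. The function name `extract_and_list` and the `.csv` default are illustrative assumptions.

```python
import zipfile
from pathlib import Path
from typing import List


def extract_and_list(zip_path: str, extract_path: str, suffix: str = ".csv") -> List[str]:
    """Extract `zip_path` into `extract_path` and return the extracted files ending in `suffix`."""
    target = Path(extract_path)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # testzip() returns the name of the first corrupt member, or None if all are fine.
        bad_member = zip_ref.testzip()
        if bad_member is not None:
            raise zipfile.BadZipFile(f"corrupt member in archive: {bad_member}")
        zip_ref.extractall(target)
    # Search recursively, since archives may nest their files in subfolders.
    return sorted(str(p) for p in target.rglob(f"*{suffix}"))


# With the paths used in the cell above:
# extract_and_list(zip_path, extract_path)
```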


#### Data Access Setup

**Step 1:** Register a Service Principal in Azure AD and generate a client secret. Note down the Application (client) ID and Tenant ID.  
**Step 2:** Grant the Service Principal access to your ADLS account by assigning the *Storage Blob Data Contributor* role.  
**Step 3:** Store the client secret securely in Azure Key Vault.  
**Step 4:** Create an Azure Key Vault-backed secret scope in Databricks and link it to your workspace.  
**Step 5:** Configure OAuth in Databricks using the secret scope and Spark configuration settings to enable secure ADLS access.  

Reference: [Connect to Azure Data Lake Storage – Azure Databricks](https://learn.microsoft.com/en-us/azure/databricks/connect/storage/tutorial-azure-storage)  


In [0]:
# Read the service principal's client secret from the Key Vault-backed secret scope
service_credential = dbutils.secrets.get(scope="myscope", key="ADLStoDatabricks")

spark.conf.set("fs.azure.account.auth.type.deltaetlstorage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.deltaetlstorage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.deltaetlstorage.dfs.core.windows.net", "96151c83-cb41-427c-9983-db06a024c0eb")
spark.conf.set("fs.azure.account.oauth2.client.secret.deltaetlstorage.dfs.core.windows.net", service_credential)
# Token endpoint format: https://login.microsoftonline.com/<tenant-id>/oauth2/token
spark.conf.set("fs.azure.account.oauth2.client.endpoint.deltaetlstorage.dfs.core.windows.net", "https://login.microsoftonline.com/0e46b9cd-f9de-44e6-8720-1e792845704b/oauth2/token")
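Because all five keys share the same storage-account suffix, the configuration can be built in one place to avoid copy-paste slips. The sketch below is one possible way to do it; the function name `oauth_conf` and its parameters are hypothetical, and the endpoint format follows the one used in the mount configuration in this notebook.

```python
from typing import Dict


def oauth_conf(storage_account: str, client_id: str, client_secret: str, tenant_id: str) -> Dict[str, str]:
    """Build the account-scoped Spark settings for OAuth access to ADLS Gen2."""
    suffix = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# In a Databricks notebook, where `spark` and `dbutils` are predefined:
# secret = dbutils.secrets.get(scope="myscope", key="ADLStoDatabricks")
# for name, value in oauth_conf("deltaetlstorage", "<client-id>", secret, "<tenant-id>").items():
#     spark.conf.set(name, value)
```

The function only builds a dictionary, so it can be checked outside Databricks before any setting is applied to the cluster.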

In [0]:
# Check connectivity by listing the container root
container_name = "delta-lakehouse"
storage_account = "deltaetlstorage"

try:
    files = dbutils.fs.ls(f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/")
    print("Connected to ADLS")
    
except Exception as e:
    print("Failed to connect to ADLS:", e)


Connected to ADLS
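The same check can be wrapped in a reusable function that returns a boolean, which is easier to use as a guard in later cells. This is a sketch under assumptions: `abfss_uri` and `can_list` are hypothetical helper names, and `dbutils` is passed in explicitly so the logic can also be exercised outside Databricks.

```python
def abfss_uri(container: str, storage_account: str, path: str = "") -> str:
    """Build an abfss:// URI for a path inside an ADLS Gen2 container."""
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{path.lstrip('/')}"


def can_list(dbutils, uri: str) -> bool:
    """Return True if dbutils.fs.ls succeeds on `uri`, otherwise print the error and return False."""
    try:
        dbutils.fs.ls(uri)
        return True
    except Exception as exc:  # any failure here means the location is not reachable
        print(f"Failed to list {uri}: {exc}")
        return False


# In the notebook:
# can_list(dbutils, abfss_uri("delta-lakehouse", "deltaetlstorage"))
```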


### Mount ADLS to Databricks Using OAuth Configuration

##### Why Mount After Testing the Connection
I first connected to ADLS directly using OAuth to confirm that the service principal, permissions, and networking were all set up correctly. This isolates authentication and access issues before mounting.

After confirming connectivity, I mounted the ADLS container to Databricks so I could work with short paths like /mnt/ADLSmount instead of full abfss:// URIs. A mount also lets multiple notebooks and jobs in the workspace reuse the same storage location without repeating the OAuth configuration.

Reference: [Mounting cloud object storage on Azure Databricks](https://learn.microsoft.com/en-us/azure/databricks/dbfs/mounts)

In [0]:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "96151c83-cb41-427c-9983-db06a024c0eb",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="myscope",key="ADLStoDatabricks"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/0e46b9cd-f9de-44e6-8720-1e792845704b/oauth2/token"
}


dbutils.fs.mount(
    source = "abfss://delta-lakehouse@deltaetlstorage.dfs.core.windows.net/",
    mount_point = "/mnt/ADLSmount",
    extra_configs = configs
)

True
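Calling `dbutils.fs.mount` again on a mount point that already exists typically fails, so re-running the notebook can break at this cell. One possible guard is sketched below; `mount_if_absent` is a hypothetical helper, and `dbutils` is passed in as an argument so the function has no hidden dependency on the notebook globals.

```python
def mount_if_absent(dbutils, source: str, mount_point: str, extra_configs: dict) -> bool:
    """Mount `source` at `mount_point` unless it is already mounted.

    Returns True if a new mount was created, False if the mount point already existed.
    """
    # dbutils.fs.mounts() returns entries that expose a `mountPoint` attribute.
    existing = {m.mountPoint for m in dbutils.fs.mounts()}
    if mount_point in existing:
        return False
    dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=extra_configs)
    return True


# In the notebook, reusing the `configs` dictionary defined above:
# mount_if_absent(
#     dbutils,
#     "abfss://delta-lakehouse@deltaetlstorage.dfs.core.windows.net/",
#     "/mnt/ADLSmount",
#     configs,
# )
```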

In [0]:
display(dbutils.fs.mounts())

| mountPoint | source | encryptionType |
| --- | --- | --- |
| /databricks-datasets | databricks-datasets | |
| /Volumes | UnityCatalogVolumes | |
| /databricks/mlflow-tracking | databricks/mlflow-tracking | |
| /databricks-results | databricks-results | |
| /databricks/mlflow-registry | databricks/mlflow-registry | |
| /mnt/ADLSmount | abfss://delta-lakehouse@deltaetlstorage.dfs.core.windows.net/ | |
| /Volume | DbfsReserved | |
| /volumes | DbfsReserved | |
| / | DatabricksRoot | |
| /volume | DbfsReserved | |



In [0]:
dbutils.fs.ls("dbfs:/mnt/ADLSmount/bronze/movies_dataset/")


[FileInfo(path='dbfs:/mnt/ADLSmount/bronze/movies_dataset/movies.csv', name='movies.csv', size=3112190, modificationTime=1755097078000)]
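The notebook does not show the cell that placed movies.csv in the Bronze folder. One possible way to do it, assuming the CSV sits at the top level of the extracted archive, is a single-file copy with `dbutils.fs.cp`; the helper name `copy_to_bronze` is hypothetical.

```python
def copy_to_bronze(dbutils, src: str, bronze_dir: str) -> str:
    """Copy one file into the Bronze folder and return the destination path."""
    # Keep the original file name so the Bronze copy stays traceable to its source.
    file_name = src.rstrip("/").rsplit("/", 1)[-1]
    dest = f"{bronze_dir.rstrip('/')}/{file_name}"
    dbutils.fs.cp(src, dest)
    return dest


# In the notebook (the source path is an assumption about the archive layout):
# copy_to_bronze(
#     dbutils,
#     "dbfs:/FileStore/tables/kaggle/movies_dataset/movies.csv",
#     "dbfs:/mnt/ADLSmount/bronze/movies_dataset",
# )
```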

In [0]:
# Verify that the screenshot uploaded to DBFS is present

display(dbutils.fs.ls("/FileStore/tables/"))


| path | name | size | modificationTime |
| --- | --- | --- | --- |
| dbfs:/FileStore/tables/ADLSsnipBronze.png | ADLSsnipBronze.png | 151719 | 1755496848000 |
| dbfs:/FileStore/tables/kaggle/ | kaggle/ | 0 | 1755096332000 |


##### Bronze dataset is present in ADLS
![Bronze Layer in ADLS](https://adb-582130891499017.17.azuredatabricks.net/files/tables/ADLSsnipBronze.png)
