In [0]:
# Connection settings for the ADLS Gen2 landing zone. Replace the placeholders before running;
# better yet, load them from a Databricks secret scope / Azure Key Vault instead of hardcoding them.
STORAGE_ACCOUNT = "<your-storage-account-name>"
CLIENT_ID       = "<your-client-id>"
CLIENT_SECRET   = "<your-client-secret>"
TENANT_ID       = "<your-tenant-id>"

# Service Principal (OAuth client credentials) authentication, scoped to this one storage account:
# every Spark setting below is applied as "<setting>.<account>.dfs.core.windows.net".
_account_host = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"
_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": CLIENT_ID,
    "fs.azure.account.oauth2.client.secret": CLIENT_SECRET,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
}
for _setting, _value in _oauth_settings.items():
    # dicts preserve insertion order, so settings are applied in the order listed above
    spark.conf.set(f"{_setting}.{_account_host}", _value)
# drop the helper names so the notebook namespace only gains the public constants
del _account_host, _oauth_settings, _setting, _value

# Root URI of the landing-zone-2 container; the raw folders below are read relative to it.
BASE_PATH = f"abfss://landing-zone-2@{STORAGE_ACCOUNT}.dfs.core.windows.net"

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# getOrCreate() hands back the already-active SparkSession when one exists;
# otherwise it starts a new one with the app name "EcomDataPipeline".
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
# Echo the active SparkSession as this cell's output, as a quick sanity check.
spark

In [0]:
## Manually create the `to_processed` and `processed` folders in `landing-zone-2/users-raw-2` before running the cells below

In [0]:

# Read every parquet file currently staged in landing-zone-2/users-raw-2/to_processed.
# Parquet files embed their own schema, so the CSV-only reader options "header" and
# "inferSchema" (silently ignored by the parquet source) are not passed.
userDF = spark.read.parquet(f"{BASE_PATH}/users-raw-2/to_processed")

In [0]:
# Optional preview of the first 5 rows of userDF; uncomment to print them.
# userDF.show(5)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+-

In [0]:

# Check the current catalog and schema. Two-part names used below (e.g. "bronze.users")
# resolve against the current catalog; one-part names would also use the current schema.
print(f"Current catalog: {spark.catalog.currentCatalog()}")
print(f"Current schema: {spark.catalog.currentDatabase()}")


Current catalog: ecom_db_shubham


In [0]:
%sql
-- Make sure the bronze schema exists in the ecom_db_shubham catalog before bronze.* tables are written.
CREATE SCHEMA IF NOT EXISTS
  ecom_db_shubham.bronze;

In [0]:
# Add this batch of users to bronze.users; mode="append" keeps previously loaded rows.
# NOTE(review): re-running this cell against the same files appends them a second time.
userDF.write.saveAsTable("bronze.users", mode="append")

In [0]:
source_dir = f"{BASE_PATH}/users-raw-2/to_processed"
target_dir = f"{BASE_PATH}/users-raw-2/processed"

# Archive only the files that userDF actually read. userDF's file list is presumably fixed
# when the DataFrame was created, so anything that landed in to_processed afterwards was never
# appended to bronze.users. Moving it blindly would drop it from the pipeline, so such files
# are left in place for the next run.
# NOTE(review): Spark's file listing normally ignores names starting with "_" or "."
# (e.g. _SUCCESS), so those are reported as skipped rather than moved.
ingested_paths = {p.rstrip("/") for p in userDF.inputFiles()}

files = dbutils.fs.ls(source_dir)

for f in files:
    source_path = f.path
    target_path = f"{target_dir}/{f.name}"

    if source_path.rstrip("/") not in ingested_paths:
        print(f"Skipped (not ingested): {f.name}")
        continue

    dbutils.fs.mv(source_path, target_path)
    print(f"Moved: {f.name}")


In [0]:
# Load every parquet file under landing-zone-2/buyers-raw-2. Parquet embeds its own schema, so
# the CSV-only reader options "header"/"inferSchema" (ignored by the parquet source) are dropped.
buyerDF = spark.read.parquet(f"{BASE_PATH}/buyers-raw-2")


In [0]:
# Full refresh: overwrite bronze.buyers with the contents of buyerDF.
buyerDF.write.saveAsTable("bronze.buyers", mode="overwrite")

In [0]:
# Load every parquet file under landing-zone-2/sellers-raw-2. Parquet embeds its own schema, so
# the CSV-only reader options "header"/"inferSchema" (ignored by the parquet source) are dropped.
sellersDF = spark.read.parquet(f"{BASE_PATH}/sellers-raw-2")

# Full refresh: overwrite bronze.sellers with the contents of sellersDF.
sellersDF.write.mode("overwrite").saveAsTable("bronze.sellers")

In [0]:
# Load every parquet file under landing-zone-2/countries-raw-2. Parquet embeds its own schema, so
# the CSV-only reader options "header"/"inferSchema" (ignored by the parquet source) are dropped.
countriesDF = spark.read.parquet(f"{BASE_PATH}/countries-raw-2")

# Full refresh: overwrite bronze.countries with the contents of countriesDF.
countriesDF.write.mode("overwrite").saveAsTable("bronze.countries")

In [0]:
%sql
-- Inspect the freshly written bronze.countries table.
SELECT *
FROM bronze.countries;


country,sellers,topsellers,topsellerratio,femalesellersratio,topfemalesellersratio,femalesellers,malesellers,topfemalesellers,topmalesellers,countrysoldratio,bestsoldratio,toptotalproductssold,totalproductssold,toptotalproductslisted,totalproductslisted,topmeanproductssold,topmeanproductslisted,meanproductssold,meanproductslisted,meanofflinedays,topmeanofflinedays,meanfollowers,meanfollowing,topmeanfollowers,topmeanfollowing
Taiwan,1,1,100.0,100.0,100.0,1,0,1,0,1.02,1.02,57,57,56,56,57.0,56.0,57.0,56.0,11.0,11.0,83.0,8.0,83.0,8.0
Slovaquie,2,1,50.0,0.0,0.0,0,2,0,1,2.0,1.93,27,28,14,14,27.0,14.0,14.0,7.0,17.0,15.0,10.5,8.5,15.0,8.0
Lettonie,4,2,50.0,100.0,100.0,4,0,2,0,2.31,2.25,81,83,36,36,40.5,18.0,20.75,9.0,120.3,11.5,21.0,52.3,38.0,98.5
Bulgarie,9,4,44.4,66.7,100.0,6,3,4,0,2.07,2.1,145,170,69,82,36.25,17.25,18.88888888888889,9.111111111111112,98.3,19.0,28.6,31.6,46.3,19.0
Chypre,4,1,25.0,100.0,100.0,4,0,1,0,0.69,0.62,41,56,66,81,41.0,66.0,14.0,20.25,17.3,11.0,21.3,10.3,39.0,17.0
Monaco,5,1,20.0,100.0,100.0,5,0,1,0,7.31,8.95,170,190,19,26,170.0,19.0,38.0,5.2,51.6,12.0,39.6,8.0,167.0,8.0
Roumanie,13,2,15.4,76.9,50.0,10,3,1,1,0.88,1.26,49,68,39,77,24.5,19.5,5.230769230769231,5.923076923076923,121.6,11.0,10.9,11.5,30.0,32.0
Luxembourg,7,1,14.3,85.7,100.0,6,1,1,0,5.38,,30,43,0,8,30.0,0.0,6.142857142857143,1.1428571428571428,73.6,11.0,15.9,8.4,52.0,3.0
Espagne,119,13,10.9,81.5,76.9,97,22,10,3,1.67,2.02,607,990,301,594,46.692307692307686,23.153846153846157,8.319327731092438,4.991596638655462,202.4,30.5,16.1,14.6,53.2,14.5
Italie,347,35,10.1,71.5,65.7,248,99,23,12,1.27,1.29,1389,2820,1077,2218,39.68571428571429,30.77142857142857,8.126801152737752,6.39193083573487,141.8,26.5,16.1,54.6,63.4,429.3
