In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os
import sys

# Make the directory two levels above the current working directory
# importable, so the project-local `utils` package can be resolved.
project_path = os.path.join(os.getcwd(), '..', '..')

# Guard the append: notebook cells get re-run, and an unconditional append
# would add the same entry to sys.path again on every execution.
if project_path not in sys.path:
    sys.path.append(project_path)

from utils.transformations import reusable

### Bronze

In [None]:
# Batch-read the bronze DimUser parquet files and render them for inspection.
bronze_dim_user_path = "abfss://bronze@dinethazurestorage.dfs.core.windows.net/DimUser"
df = spark.read.parquet(bronze_dim_user_path)
display(df)

### Transformations

In [None]:
# DimUser: bronze -> silver.
# Incrementally read the bronze parquet files with Auto Loader, upper-case
# user_name, drop the rescued-data column, de-duplicate on user_id, and append
# the result to the silver Delta table.
df_user = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimUser/schema"
    )
    # Auto Loader options live under the "cloudFiles." namespace; the bare
    # "schemaEvolutionMode" key is not the documented option name.
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load("abfss://bronze@dinethazurestorage.dfs.core.windows.net/DimUser")
)

df_user_upper = df_user.withColumn(
    "user_name",
    upper(col("user_name"))
)

df_user_obj = reusable()
df_user_clean = df_user_obj.dropColumns(df_user_upper, ['_rescued_data'])
df_user_clean = df_user_clean.dropDuplicates(['user_id'])

# Write the *cleaned* stream. Writing `df_user` here would discard every
# transformation above and persist the raw bronze rows instead.
# NOTE(review): if the checkpoint below was created by a run that wrote the
# raw stream, adding the stateful dropDuplicates step may require a fresh
# checkpoint -- confirm before running against existing state.
(
    df_user_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimUser/checkpoint"
    )
    .trigger(once=True)
    .option(
        "path",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimUser/data"
    )
    .toTable("spotify_catalog.silver.DimUser")
)


In [None]:
# DimArtist: bronze -> silver.
# Incrementally read the bronze parquet files with Auto Loader, drop the
# rescued-data column, de-duplicate on artist_id, and append the result to the
# silver Delta table.
# NOTE(review): the silver schema/checkpoint/data paths use "DimArt" while the
# bronze source and the target table use "DimArtist". The paths are left as-is
# because relocating the checkpoint would reset the stream's progress.
df_art_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimArt/schema"
    )
    # Auto Loader options live under the "cloudFiles." namespace; the bare
    # "schemaEvolutionMode" key is not the documented option name.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@dinethazurestorage.dfs.core.windows.net/DimArtist")
)

df_art_obj = reusable()

# Keep the raw stream under its own name so re-running any line of this cell
# always starts from the same input.
df_art = df_art_obj.dropColumns(df_art_raw, ['_rescued_data'])
df_art = df_art.dropDuplicates(['artist_id'])

(
    df_art.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimArt/checkpoint"
    )
    .trigger(once=True)
    .option(
        "path",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimArt/data"
    )
    .toTable("spotify_catalog.silver.DimArtist")
)

In [None]:
# DimTrack: bronze -> silver.
# Incrementally read the bronze parquet files with Auto Loader, derive a
# duration bucket, replace hyphens in track_name with spaces, drop the
# rescued-data column, and append the result to the silver Delta table.
df_track_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimTrack/schema"
    )
    # Auto Loader options live under the "cloudFiles." namespace; the bare
    # "schemaEvolutionMode" key is not the documented option name.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@dinethazurestorage.dfs.core.windows.net/DimTrack")
)

# Bucket tracks by duration_sec: < 150 -> "low", < 300 -> "medium", else "high".
# NOTE: a NULL duration_sec satisfies neither comparison and therefore falls
# through to "high".
duration_flag = (
    when(col('duration_sec') < 150, "low")
    .when(col('duration_sec') < 300, "medium")
    .otherwise("high")
)

df_track = (
    df_track_raw
    .withColumn("durationFlag", duration_flag)
    .withColumn("track_name", regexp_replace(col("track_name"), '-', ' '))
)

df_track = reusable().dropColumns(df_track, ['_rescued_data'])

(
    df_track.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimTrack/checkpoint"
    )
    .trigger(once=True)
    .option(
        "path",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimTrack/data"
    )
    .toTable("spotify_catalog.silver.DimTrack")
)

In [None]:
# DimDate: bronze -> silver.
# Incrementally read the bronze parquet files with Auto Loader, drop the
# rescued-data column, and append the result to the silver Delta table.
df_date_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimDate/schema"
    )
    # Auto Loader options live under the "cloudFiles." namespace; the bare
    # "schemaEvolutionMode" key is not the documented option name.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@dinethazurestorage.dfs.core.windows.net/DimDate")
)

# Keep the raw stream under its own name so this line can be re-run safely.
df_date = reusable().dropColumns(df_date_raw, ['_rescued_data'])

(
    df_date.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimDate/checkpoint"
    )
    .trigger(once=True)
    .option(
        "path",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/DimDate/data"
    )
    .toTable("spotify_catalog.silver.DimDate")
)

In [None]:
# FactStream: bronze -> silver.
# Incrementally read the bronze parquet files with Auto Loader, drop the
# rescued-data column, and append the result to the silver Delta table.
df_fact_raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/FactStream/schema"
    )
    # Auto Loader options live under the "cloudFiles." namespace; the bare
    # "schemaEvolutionMode" key is not the documented option name.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@dinethazurestorage.dfs.core.windows.net/FactStream")
)

# Keep the raw stream under its own name so this line can be re-run safely.
df_fact = reusable().dropColumns(df_fact_raw, ['_rescued_data'])

(
    df_fact.writeStream
    .format("delta")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/FactStream/checkpoint"
    )
    .trigger(once=True)
    .option(
        "path",
        "abfss://silver@dinethazurestorage.dfs.core.windows.net/FactStream/data"
    )
    .toTable("spotify_catalog.silver.FactStream")
)