## AUTOLOADER


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os
import sys

# Put the directory two levels above the current working directory on the
# import path (presumably where the `utils` package imported below lives --
# TODO confirm against the repository layout).
two_levels_up = ('..', '..')
project_path = os.path.join(os.getcwd(), *two_levels_up)
sys.path.append(project_path)

from utils.transformations import reusable

### DimUser

In [0]:
# Incrementally read the DimUser parquet files from the bronze container with
# Auto Loader (the "cloudFiles" streaming source).
# Both Auto Loader settings use their documented `cloudFiles.`-prefixed keys;
# a bare `schemaEvolutionMode` key is not an Auto Loader option.
df_user = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Inferred schema is tracked in the same directory the writer uses as its checkpoint.
    .option("cloudFiles.schemaLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimUser/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storageazureprojectvarad.dfs.core.windows.net/DimUser")
)

In [0]:
# display(df_user, checkpointLocation = "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimUser/temp_display")

In [0]:
# Print the schema of the streaming DataFrame. This only shows the
# DataFrame's structure; it does not start the stream.
df_user.printSchema()

In [0]:
# Helper object from utils.transformations that provides dropColumns().
df_user_obj = reusable()

# Remove `_rescued_data` (this column is automatically created), then keep a
# single row per `user_id`, and show the resulting schema.
df_user = (
    df_user_obj
    .dropColumns(df_user, ['_rescued_data'])
    .dropDuplicates(['user_id'])
)
df_user.printSchema()


In [0]:
# Write the DimUser stream to the silver layer as a Delta table in append mode.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True).
(
    df_user.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimUser/checkpoint")
    .option("path", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimUser/data")
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimUser")
)

### DimArtist

In [0]:
# Incrementally read the DimArtist parquet files from the bronze container
# with Auto Loader (the "cloudFiles" streaming source).
# Both Auto Loader settings use their documented `cloudFiles.`-prefixed keys;
# a bare `schemaEvolutionMode` key is not an Auto Loader option.
df_art = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Inferred schema is tracked in the same directory the writer uses as its checkpoint.
    .option("cloudFiles.schemaLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimArtist/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storageazureprojectvarad.dfs.core.windows.net/DimArtist")
)

In [0]:
# Print the schema of the DimArtist streaming DataFrame as loaded above.
df_art.printSchema()

In [0]:
# Helper object from utils.transformations that provides dropColumns().
df_art_obj = reusable()

# Remove `_rescued_data` (this column is automatically created), then keep a
# single row per `artist_id`, and show the resulting schema.
df_art = (
    df_art_obj
    .dropColumns(df_art, ['_rescued_data'])
    .dropDuplicates(['artist_id'])
)
df_art.printSchema()

In [0]:
# Write the DimArtist stream to the silver layer as a Delta table in append mode.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True).
(
    df_art.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimArtist/checkpoint")
    .option("path", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimArtist/data")
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimArtist")
)

### DimTrack

In [0]:
# Incrementally read the DimTrack parquet files from the bronze container
# with Auto Loader (the "cloudFiles" streaming source).
# Both Auto Loader settings use their documented `cloudFiles.`-prefixed keys;
# a bare `schemaEvolutionMode` key is not an Auto Loader option.
df_track = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Inferred schema is tracked in the same directory the writer uses as its checkpoint.
    .option("cloudFiles.schemaLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimTrack/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storageazureprojectvarad.dfs.core.windows.net/DimTrack")
)

In [0]:
# Print the schema of the DimTrack streaming DataFrame as loaded above.
df_track.printSchema()

In [0]:
# Bucket each track by `duration_sec`:
#   below 150 -> "low", below 300 -> "medium", anything else -> "high".
# NOTE(review): rows with a null `duration_sec` match neither comparison and
# therefore land in "high" -- confirm that is intended.
duration_flag = (
    when(col('duration_sec') < 150, "low")
    .when(col('duration_sec') < 300, "medium")
    .otherwise("high")
)
df_track = df_track.withColumn("durationFlag", duration_flag)
df_track.printSchema()

In [0]:
# Replace every hyphen in `track_name` with a space (the pattern '-' is a
# regex that matches a literal hyphen).
df_track = df_track.withColumn(
    "track_name",
    regexp_replace("track_name", '-', ' '),
)

In [0]:
# Drop the `_rescued_data` column from the DimTrack stream before writing it.
df_track = reusable().dropColumns(
    df_track,
    ['_rescued_data'],
)

In [0]:
# Write the DimTrack stream to the silver layer as a Delta table in append mode.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True).
(
    df_track.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimTrack/checkpoint")
    .option("path", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimTrack/data")
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimTrack")
)

### DimDate

In [0]:
# Incrementally read the DimDate parquet files from the bronze container
# with Auto Loader (the "cloudFiles" streaming source).
# Both Auto Loader settings use their documented `cloudFiles.`-prefixed keys;
# a bare `schemaEvolutionMode` key is not an Auto Loader option.
df_date = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Inferred schema is tracked in the same directory the writer uses as its checkpoint.
    .option("cloudFiles.schemaLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimDate/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storageazureprojectvarad.dfs.core.windows.net/DimDate")
)

In [0]:
# Drop the `_rescued_data` column from the DimDate stream before writing it.
# The input must be `df_date` (the DimDate source); passing `df_track` here
# would send track rows into the DimDate table.
df_date = reusable().dropColumns(df_date, ['_rescued_data'])

In [0]:
# Write the DimDate stream to the silver layer as a Delta table in append mode.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True).
(
    df_date.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimDate/checkpoint")
    .option("path", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/DimDate/data")
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.DimDate")
)

### FactStream

In [0]:
# Incrementally read the FactStream parquet files from the bronze container
# with Auto Loader (the "cloudFiles" streaming source).
# Both Auto Loader settings use their documented `cloudFiles.`-prefixed keys;
# a bare `schemaEvolutionMode` key is not an Auto Loader option.
df_fact = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Inferred schema is tracked in the same directory the writer uses as its checkpoint.
    .option("cloudFiles.schemaLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/FactStream/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storageazureprojectvarad.dfs.core.windows.net/FactStream")
)

In [0]:
    df_fact.printSchema()

In [0]:
# Drop the `_rescued_data` column from the FactStream stream before writing it.
df_fact = reusable().dropColumns(
    df_fact,
    ['_rescued_data'],
)

In [0]:
# Write the FactStream stream to the silver layer as a Delta table in append mode.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True).
(
    df_fact.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/FactStream/checkpoint")
    .option("path", "abfss://silver@storageazureprojectvarad.dfs.core.windows.net/FactStream/data")
    .trigger(availableNow=True)
    .toTable("spotify_cata.silver.FactStream")
)