## DimUser

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *


import os
import sys

# Make the directory two levels above the current working directory importable,
# so the `utils.transformations` import below can be resolved.
# Guarded so that re-running this cell does not keep appending duplicate
# entries to sys.path.
project_path = os.path.join(os.getcwd(), "..", "..")
if project_path not in sys.path:
    sys.path.append(project_path)

from utils.transformations import *

In [0]:
# Incrementally read DimUser parquet files from the bronze container with
# Auto Loader (format "cloudFiles"); the inferred schema is tracked under
# schemaLocation.
# Auto Loader options carry the `cloudFiles.` prefix, so schema evolution is
# configured via `cloudFiles.schemaEvolutionMode` (the unprefixed key does not
# match the documented Auto Loader option name).
df_user = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimUser/checkpointlocation") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("abfss://bronze@spotifydatalakeproject.dfs.core.windows.net/DimUser")


In [0]:
# Replace user_name with its upper-cased value; the name split that follows
# operates on this normalised column.
df_user = df_user.withColumn(
    "user_name",
    upper(col("user_name")),
)

In [0]:
# Derive first_name / last_name from the first two space-separated tokens of
# user_name; any tokens after the second are not kept.
# NOTE(review): a single-token name has no index-1 element -> null by default,
# but this may raise if ANSI mode is enabled; confirm against the data.
user_name_parts = split(col("user_name"), " ")
df_user = (
    df_user
    .withColumn("first_name", user_name_parts.getItem(0))
    .withColumn("last_name", user_name_parts.getItem(1))
)

In [0]:
# Reorder columns so first_name and last_name sit directly after the first
# column; all other columns keep their relative order.
name_cols = ["first_name", "last_name"]

cols = df_user.columns
for name_col in name_cols:
    cols.remove(name_col)
cols[1:1] = name_cols

df_user = df_user.select(cols)


In [0]:
# Drop the `_rescued_data` column (presumably the one added by Auto Loader's
# schema inference) before the data is written to silver. A direct drop is all
# that is needed here, matching the other dimension sections.
df_user = df_user.drop('_rescued_data')

In [0]:
# Append the DimUser stream to the Delta table spotify_cata.silver.dimuser,
# with its files stored at the given `path`. availableNow=True processes all
# data available at start and then stops; it replaces the deprecated once=True
# trigger. The returned StreamingQuery is the cell's output.
df_user.writeStream.format("delta") \
    .outputMode('append') \
    .option('checkpointLocation', 'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimUser/checkpointlocation') \
    .trigger(availableNow=True) \
    .option("path",'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimUser/data') \
    .toTable("spotify_cata.silver.dimuser")

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f08ce995dc0>

## DimArtist

In [0]:
# Incrementally read DimArtist parquet files from the bronze container with
# Auto Loader (format "cloudFiles"); the inferred schema is tracked under
# schemaLocation.
# Auto Loader options carry the `cloudFiles.` prefix, so schema evolution is
# configured via `cloudFiles.schemaEvolutionMode`.
df_artist = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimArtist/checkpointlocation") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("abfss://bronze@spotifydatalakeproject.dfs.core.windows.net/DimArtist")


In [0]:
# Replace artist_name with its upper-cased value.
df_artist = df_artist.withColumn(
    "artist_name",
    upper(col("artist_name")),
)

In [0]:
from pyspark.sql.functions import split, col

# Derive first_name / last_name from the first two space-separated tokens of
# artist_name; any tokens after the second are not kept.
# NOTE(review): a single-token name has no index-1 element -> null by default,
# but this may raise if ANSI mode is enabled; confirm against the data.
artist_name_parts = split(col("artist_name"), " ")
df_artist = (
    df_artist
    .withColumn("first_name", artist_name_parts.getItem(0))
    .withColumn("last_name", artist_name_parts.getItem(1))
)


In [0]:
# Remove the `_rescued_data` column before writing to silver.
df_artist = df_artist.drop("_rescued_data")

In [0]:
# Append the DimArtist stream to the Delta table spotify_cata.silver.dimartist,
# with its files stored at the given `path`. availableNow=True processes all
# data available at start and then stops; it replaces the deprecated once=True
# trigger. The returned StreamingQuery is the cell's output.
df_artist.writeStream.format("delta") \
    .outputMode('append') \
    .option('checkpointLocation', 'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimArtist/checkpointlocation') \
    .trigger(availableNow=True) \
    .option("path",'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimArtist/data') \
    .toTable("spotify_cata.silver.dimartist")

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f08c5f93800>

## DimTrack

In [0]:
# Incrementally read DimTrack parquet files from the bronze container with
# Auto Loader (format "cloudFiles"); the inferred schema is tracked under
# schemaLocation.
# Auto Loader options carry the `cloudFiles.` prefix, so schema evolution is
# configured via `cloudFiles.schemaEvolutionMode`.
df_track = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",
            "abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimTrack/checkpointlocation") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("abfss://bronze@spotifydatalakeproject.dfs.core.windows.net/DimTrack")


In [0]:
# Upper-case both name columns of the track dimension, in this order.
for text_col in ("track_name", "album_name"):
    df_track = df_track.withColumn(text_col, upper(col(text_col)))


In [0]:
# Bucket tracks by duration_sec:
#   Short  : duration_sec < 180
#   Medium : 180 <= duration_sec < 240
#   Long   : duration_sec >= 240
#   Unknown: anything that matches none of the above (e.g. null duration_sec)
# The Medium lower bound is stated as 180 to match the Short cut-off. The
# earlier branch wins in a CASE WHEN, so values below 180 could never reach
# Medium anyway; the previous 160 bound was misleading, not effective.
df_track = df_track.withColumn(
    "durationFlag",
    when(col("duration_sec") < 180, "Short")
    .when((col("duration_sec") >= 180) & (col("duration_sec") < 240), "Medium")
    .when(col("duration_sec") >= 240, "Long")
    .otherwise("Unknown"),
)

In [0]:
# Remove the `_rescued_data` column before writing to silver.
df_track = df_track.drop("_rescued_data")

In [0]:
# Append the DimTrack stream to the Delta table spotify_cata.silver.dimtrack,
# with its files stored at the given `path`. availableNow=True processes all
# data available at start and then stops; it replaces the deprecated once=True
# trigger. The returned StreamingQuery is the cell's output.
df_track.writeStream.format("delta") \
    .outputMode('append') \
    .option('checkpointLocation','abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimTrack/checkpointlocation') \
    .trigger(availableNow=True) \
    .option("path",'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimTrack/data') \
    .toTable("spotify_cata.silver.dimtrack")

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f08c6229580>

## DimDate

In [0]:
# Incrementally read DimDate parquet files from the bronze container with
# Auto Loader (format "cloudFiles"); the inferred schema is tracked under
# schemaLocation.
# Auto Loader options carry the `cloudFiles.` prefix, so schema evolution is
# configured via `cloudFiles.schemaEvolutionMode`.
df_date = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation",'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimDate/checkpointlocation') \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("abfss://bronze@spotifydatalakeproject.dfs.core.windows.net/DimDate")

In [0]:
# Remove the `_rescued_data` column before writing to silver.
df_date = df_date.drop("_rescued_data")

In [0]:
# Append the DimDate stream to the Delta table spotify_cata.silver.dimdate,
# with its files stored at the given `path`. availableNow=True processes all
# data available at start and then stops; it replaces the deprecated once=True
# trigger. The returned StreamingQuery is the cell's output.
df_date.writeStream.format("delta") \
    .outputMode('append') \
    .option('checkpointLocation','abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimDate/checkpointlocation') \
    .trigger(availableNow=True) \
    .option("path",'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/DimDate/data') \
    .toTable("spotify_cata.silver.dimdate")

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f08cdabdf70>


## FactStream

In [0]:
# Incrementally read FactStream parquet files from the bronze container with
# Auto Loader (format "cloudFiles"), tracking the inferred schema under the
# silver checkpoint directory and allowing new columns to be added.
df_fact = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://silver@spotifydatalakeproject.dfs.core.windows.net/FactStream/checkpointlocation",
    )
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@spotifydatalakeproject.dfs.core.windows.net/FactStream")
)

In [0]:
# Remove the `_rescued_data` column before writing to silver.
df_fact = df_fact.drop("_rescued_data")

In [0]:
# Append the FactStream stream to the Delta table
# spotify_cata.silver.factstream, with its files stored at the given `path`.
# availableNow=True processes all data available at start and then stops; it
# replaces the deprecated once=True trigger. The returned StreamingQuery is the
# cell's output.
df_fact.writeStream.format("delta") \
    .outputMode('append') \
    .option('checkpointLocation','abfss://silver@spotifydatalakeproject.dfs.core.windows.net/FactStream/checkpointlocation') \
    .trigger(availableNow=True) \
    .option("path",'abfss://silver@spotifydatalakeproject.dfs.core.windows.net/FactStream/data') \
    .toTable("spotify_cata.silver.factstream")

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f08cdac2660>