# ## **Mount the Bronze Container and Preview the Data**

In [0]:
# ADLS Gen2 storage account, container and DBFS mount point for the Bronze layer.
storage_account_name = "dbsql"
container_name = "bronze"
mount_point = "/mnt/bronze"

# Service-principal credentials come from the Key Vault-backed secret scope
# instead of being hard-coded in the notebook.
# NOTE(review): the tenant key is spelled "tanentId"; it must match the secret
# name stored in the scope exactly, so verify before "fixing" the spelling.
secret_scope = "adls-secrets"
client_id, tenant_id, client_secret = (
    dbutils.secrets.get(scope=secret_scope, key=secret_key)
    for secret_key in ("ClientId", "tanentId", "Value")
)

In [0]:
# OAuth (client-credentials) settings the ABFS driver uses to authenticate the
# service principal against Azure AD for this mount.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Build the source URI from the configured account/container instead of
# repeating them as a literal.
source_uri = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# dbutils.fs.mount raises if the mount point is already in use, so skip the
# mount on re-runs to keep the notebook runnable top-to-bottom.
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
        source=source_uri,
        mount_point=mount_point,
        extra_configs=configs,
    )

# ## **Display CSV Data**

In [0]:
# Raw Bronze CSV export; the first line of the file holds the column names.
bronze_csv_path = "/mnt/bronze/bronze/bronze/spotify_songs.csv"
df = spark.read.csv(bronze_csv_path, header=True)
display(df)


In [0]:
# Structure and column types (inferSchema is not set when reading the CSV).
df.printSchema()

# Plain-text preview of the first five rows.
df.show(n=5)

# Row count triggers a full Spark job; the column list is metadata only.
total_rows = df.count()
print(f"Total rows: {total_rows}")
print(f"Columns: {df.columns}")


# ## **Display Columns**

In [0]:
# Column names of the raw Bronze DataFrame (rendered as the cell output).
column_names = df.columns
column_names


In [0]:
# Step 1: Re-read the raw Bronze CSV so this cell does not depend on the
# exploration cells above.
df = spark.read.option("header", True).csv("/mnt/bronze/bronze/bronze/spotify_songs.csv")

# Step 2: Drop columns that are not used by the Silver/Gold layers.
columns_to_drop = ['snapshot_date', 'is_explicit', 'key', 'speechiness', 'acousticness']
df_cleaned = df.drop(*columns_to_drop)

# Step 3: Cap the working set at ROW_LIMIT rows (1,900,000).
# NOTE: limit() without an orderBy is not guaranteed to return any particular
# subset of rows, so this is a cap on size rather than a "top N".
ROW_LIMIT = 1_900_000
df_limited = df_cleaned.limit(ROW_LIMIT)

# Optional: display the resulting DataFrame to verify.
df_limited.display()



In [0]:

# Optional: plain-text preview (default 20 rows, truncated cells) of the capped data.
df_limited.show(n=20, truncate=True)

# ## **Total Rows and Columns**

In [0]:
# From here on, `df` refers to the column-trimmed, row-capped data.
df = df_limited

# count() runs a Spark job; len(df.columns) reads only the schema.
row_count, column_count = df.count(), len(df.columns)
print(f"Total Rows: {row_count}")
print(f"Total Columns: {column_count}")


# ## **Show Null Values**

In [0]:
from pyspark.sql.functions import col, isnan, when, count

# Count nulls and NaNs in each column in a single aggregation pass.
# NOTE(review): the CSV columns are strings, so isnan() relies on Spark's
# implicit cast to double; confirm this behaves as expected (e.g. under ANSI mode).
null_counts = df.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in df.columns
])

# The aggregation yields exactly one row. Print one aligned "column  count"
# line per column (nothing is converted to pandas).
for col_name, null_count in null_counts.first().asDict().items():
    print(f"{col_name:<20} {null_count}")


# ## **Describe the Data**

In [0]:
# describe() returns a new DataFrame of summary statistics (count, mean,
# stddev, min, max). Left bare, the cell shows only its schema repr, so display it.
display(df.describe())


In [0]:
# Remove every row that has a null in any column (how="any" over all columns).
df = df.na.drop()

In [0]:
from pyspark.sql.functions import col, isnan, when, count

# Re-check null/NaN counts after dropna(); nulls should now be zero.
# NOTE(review): isnan() on these string columns relies on an implicit cast to
# double, so a literal "NaN" string value may still be counted here.
null_counts = df.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in df.columns
])

# The aggregation yields exactly one row. Print one aligned "column  count"
# line per column (nothing is converted to pandas).
for col_name, null_count in null_counts.first().asDict().items():
    print(f"{col_name:<20} {null_count}")


In [0]:
# Number of rows remaining after dropping nulls.
rows_after_dropna = df.count()
rows_after_dropna

In [0]:
# Persist the cleaned data as a Delta table (replacing any previous contents).
# NOTE(review): this "silver" table is stored under the bronze mount.
silver_staging_path = "/mnt/bronze/silver/spotify_songs"
df.write.mode("overwrite").format("delta").save(silver_staging_path)

# ## **Silver Layer: Clean and Transform Data**

In [0]:
# Read back the Silver staging Delta table (stored under the bronze mount at
# /mnt/bronze/silver) as the starting point for Silver-layer cleaning.
df_silver = spark.read.format("delta").load("/mnt/bronze/silver/spotify_songs")


In [0]:
# Quick text preview of the first five Silver rows.
df_silver.show(n=5)

# ## **Step 1: Remove Duplicate Records**


In [0]:
# Drop rows that are identical across every column.
# NOTE(review): the following cell reassigns df_deduped from df_silver, so this
# result is not used downstream.
df_deduped = df_silver.distinct()


In [0]:
# Keep one row per (spotify_id, name) pair; which duplicate survives is not guaranteed.
# NOTE(review): the data also has country/daily_rank columns, so this likely
# collapses per-country chart entries for the same song. Confirm that is intended.
dedup_keys = ["spotify_id", "name"]
df_deduped = df_silver.dropDuplicates(subset=dedup_keys)


# ## **Step 2: Handle Null Values**

In [0]:
# Drop rows that have a null in any column.
# NOTE(review): a subset-based dropna(subset=['name', 'artists', 'popularity', 'duration_ms'])
# used to precede this line but was immediately overwritten, so it had no effect
# and was removed. Switch to the subset form if only those fields must be non-null.
df_nonull = df_deduped.dropna()



# ## **Step 3: Validate Data**

In [0]:
from pyspark.sql.functions import col

# Cast the numeric fields used for validation. Values that cannot be parsed
# become null and therefore fail the range checks below.
df_typed = (
    df_nonull
    .withColumn("popularity", col("popularity").cast("int"))
    .withColumn("duration_ms", col("duration_ms").cast("int"))
)

# Keep rows with popularity in [0, 100] and a positive duration.
df_validated = (
    df_typed
    .where(col("popularity").between(0, 100))
    .where(col("duration_ms") > 0)
)


In [0]:
# Inspect the validated Silver data: text preview, interactive table, then schema.
df_validated.show(n=5)
df_validated.display()
df_validated.printSchema()



# ## **Rename the columns**

In [0]:
# Give the song and artist columns descriptive names.
# NOTE(review): this renames df_silver (pre-dedup/validation), not df_validated.
# The later transform cell does its own renaming, so this frame is for inspection only.
column_renames = {"name": "song_name", "artists": "artists_name"}
df_silver_renamed = df_silver
for old_name, new_name in column_renames.items():
    df_silver_renamed = df_silver_renamed.withColumnRenamed(old_name, new_name)


In [0]:
# Confirm the renamed columns in the schema, then preview the data as text and as a table.
df_silver_renamed.printSchema()
df_silver_renamed.show(n=5)
df_silver_renamed.display()


In [0]:
from pyspark.sql.functions import col

# Silver projection: keep the analysis columns in this order, give the song and
# artist columns descriptive names, and make popularity / duration_ms integers.
# All other columns keep the types they were read with.
column_renames = {"name": "song_name", "artists": "artists_name"}
integer_columns = {"popularity", "duration_ms"}
ordered_source_columns = [
    "spotify_id", "name", "artists", "daily_rank", "daily_movement",
    "weekly_movement", "country", "popularity", "duration_ms", "album_name",
    "album_release_date", "danceability", "energy", "loudness", "mode",
    "instrumentalness", "liveness", "valence", "tempo", "time_signature",
]

projection = []
for source_name in ordered_source_columns:
    column_expr = col(source_name)
    if source_name in integer_columns:
        column_expr = column_expr.cast("int").alias(source_name)
    elif source_name in column_renames:
        column_expr = column_expr.alias(column_renames[source_name])
    projection.append(column_expr)

df_transformed = df_nonull.select(*projection)

# Keep only rows whose popularity is within [0, 100] and whose duration is positive.
is_valid_row = col("popularity").between(0, 100) & (col("duration_ms") > 0)
df_validated = df_transformed.filter(is_valid_row)


# ## **Step 4: Write Cleaned Data to the Silver Layer in Delta Format**

In [0]:
# Overwrite the cleaned Silver Delta table. mergeSchema lets new columns be
# merged into an existing table's schema.
# NOTE(review): this notebook only mounts /mnt/bronze. Confirm /mnt/silver is
# mounted elsewhere; otherwise this path presumably lands on DBFS root storage.
silver_cleaned_path = "/mnt/silver/spotify_songs_cleaned"
silver_writer = df_validated.write.format("delta").mode("overwrite").option("mergeSchema", "true")
silver_writer.save(silver_cleaned_path)

# ## **Optional: Verify the Silver Output (a Delta table can then be registered on this path for easy access)**

In [0]:
# List the Silver root to confirm the output directory exists
# (dbutils.fs.ls presumably raises if the path is missing).
silver_listing = dbutils.fs.ls("dbfs:/mnt/silver/")
silver_listing


# ## **Step 1: Define and Create the Star Schema in the Gold Layer**
A star schema consists of a central fact table connected to multiple dimension tables. Here's how you can structure it for your Spotify dataset:

1.1 Identify Dimension Tables
Artists Dimension: Contains unique artist information.

Albums Dimension: Contains unique album details.

Countries Dimension: Contains country-specific information.

Dates Dimension: Contains date attributes (year, month, day) derived from each album's release date.

In [0]:
%python
from pyspark.sql.functions import col, monotonically_increasing_id, to_date, year, month, dayofmonth

# Artists Dimension
artists_dim = df_validated.select("artists_name").distinct() \
    .withColumn("artist_id", monotonically_increasing_id())

# Albums Dimension
albums_dim = df_validated.select("album_name", "album_release_date").distinct() \
    .withColumn("album_id", monotonically_increasing_id())

# Countries Dimension
countries_dim = df_validated.select("country").distinct() \
    .withColumn("country_id", monotonically_increasing_id())

# Dates Dimension
dates_dim = df_validated.select(to_date(col("album_release_date")).alias("date")).distinct() \
    .withColumn("date_id", monotonically_increasing_id()) \
    .withColumn("year", year(col("date"))) \
    .withColumn("month", month(col("date"))) \
    .withColumn("day", dayofmonth(col("date")))

display(artists_dim)
display(albums_dim)
display(countries_dim)
display(dates_dim)

# ## **Create Fact Table**

In [0]:
%python
# Join with Artists Dimension
fact_table = df_validated.join(artists_dim, on="artists_name", how="left") \
    .join(albums_dim, on=["album_name", "album_release_date"], how="left") \
    .join(countries_dim, on="country", how="left") \
    .join(dates_dim, df_validated.album_release_date == dates_dim.date, how="left") \
    .select(
        "spotify_id",
        "song_name",  # Include song_name here
        "daily_rank",
        "daily_movement",
        "weekly_movement",
        "popularity",
        "duration_ms",
        "danceability",
        "energy",
        "mode",
        "artist_id",
        "album_id",
        "country_id",
        "date_id"
    )

# Save Fact Table
fact_table.write.format("delta").mode("overwrite").save("/mnt/bronze/gold/fact_songs")

# ## **Save Fact and Dimension Tables to the Gold Layer**

In [0]:
# Persist every dimension and then the fact table as Delta under the bronze
# mount's gold/ folder. mergeSchema lets new columns be merged into an existing
# table's schema. Writes happen in dictionary insertion order.
gold_root = "/mnt/bronze/gold"
gold_tables = {
    "dim_artists": artists_dim,
    "dim_albums": albums_dim,
    "dim_countries": countries_dim,
    "dim_dates": dates_dim,
    "fact_songs": fact_table,
}
for table_name, table_df in gold_tables.items():
    (
        table_df.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .save(f"{gold_root}/{table_name}")
    )

# ## **Derive KPIs and Create Data Marts**

In [0]:
from pyspark.sql.functions import avg, col

# Bring artists_name onto the fact rows via the artist surrogate key.
fact_with_artists = fact_table.join(artists_dim, on="artist_id", how="left")

# Top 10 by average popularity, with the other metrics averaged alongside.
# NOTE(review): grouping by song_name as well makes this a ranking of
# (artist, song) pairs rather than of artists. Confirm that is intended.
metric_columns = ["popularity", "duration_ms", "danceability", "energy", "mode"]
top_artists = (
    fact_with_artists
    .groupBy("artist_id", "artists_name", "song_name")
    .agg(*[avg(metric).alias(f"avg_{metric}") for metric in metric_columns])
    .orderBy(col("avg_popularity").desc())
    .limit(10)
)

display(top_artists)


In [0]:
%python
# Average Danceability by Country
avg_danceability = fact_table.groupBy("country_id") \
    .agg(avg("danceability").alias("avg_danceability")) \
    .orderBy(col("avg_danceability").desc())

display(avg_danceability)

In [0]:
from pyspark.sql.functions import avg

# Popularity trend per song, ordered chronologically.
# date_id is a surrogate key whose values carry no time ordering, so the actual
# date is joined back from dates_dim and used for sorting. date_id -> date is
# one-to-one, so the groups are unchanged.
# NOTE(review): the only date on the fact table is album_release_date
# (snapshot_date was dropped earlier), so this shows popularity by release date
# rather than a chart trend over time. Confirm that is what's wanted.
popularity_trends = (
    fact_table.join(dates_dim.select("date_id", "date"), on="date_id", how="left")
    .groupBy("song_name", "date_id", "date")
    .agg(avg("popularity").alias("avg_popularity"))
    .orderBy("date", "date_id")
)

display(popularity_trends)


In [0]:
from pyspark.sql.functions import avg, col

# top_artists already uses avg_* column names, so it is saved as-is
# (a former self-rename of avg_popularity was a no-op and was removed).

# Average danceability per song for the data mart. This replaces any earlier
# avg_danceability value in the notebook session.
avg_danceability = (
    fact_table.groupBy("song_name")
    .agg(avg("danceability").alias("avg_danceability"))
    .orderBy(col("avg_danceability").desc())
)

# Save Data Marts.
# NOTE(review): this notebook only mounts /mnt/bronze, and the Gold dimension and
# fact tables go to /mnt/bronze/gold. Confirm /mnt/gold is mounted elsewhere;
# otherwise these marts presumably land on DBFS root storage.
top_artists.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/mnt/gold/mart_top_artists")
avg_danceability.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/mnt/gold/mart_avg_danceability")


In [0]:
%python
# Save Data Marts with Correct Column Names
top_artists = top_artists.withColumnRenamed("avg_popularity", "avg_popularity")  # Rename the correct column

# Calculate Average Danceability for Data Mart (if not already calculated)
avg_danceability = fact_table.groupBy("song_name") \
    .agg(avg("danceability").alias("avg_danceability")) \
    .orderBy(col("avg_danceability").desc())

# Save Data Marts
top_artists.write.format("delta").mode("overwrite").save("/mnt/gold/mart_top_artists")
avg_danceability.write.format("delta").mode("overwrite").save("/mnt/gold/mart_avg_danceability")

# Display KPIs
display(top_artists)
display(avg_danceability)



Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks data profile. Run in Databricks to view.

In [0]:
# Attach artist names to the fact rows via the artist surrogate key.
fact_with_artists = fact_table.join(artists_dim, on="artist_id", how="left")

# Top 10 (artist_id, song_name, artists_name) groups by average popularity,
# with the other metrics averaged alongside.
kpi_metrics = ("popularity", "duration_ms", "danceability", "energy")
top_artists = (
    fact_with_artists
    .groupBy("artist_id", "song_name", "artists_name")
    .agg(*(avg(metric).alias("avg_" + metric) for metric in kpi_metrics))
    .orderBy(col("avg_popularity").desc())
    .limit(10)
)

# Display KPIs
display(top_artists)


In [0]:
%python
from pyspark.sql.functions import avg, col

# Join fact_table with artists_dim on artist_id
fact_with_artists = fact_table.join(artists_dim, on="artist_id", how="left")

# 10 Artists by Popularity
top_artists = fact_with_artists.groupBy("artist_id", "artists_name", "song_name") \
    .agg(
        avg("popularity").alias("avg_popularity"),
        avg("duration_ms").alias("avg_duration_ms"),
        avg("danceability").alias("avg_danceability"),
        avg("energy").alias("avg_energy")
    ) \
    .orderBy(col("avg_popularity").desc()) \
    .limit(10)

# Cache the result
top_artists.cache()

# Display KPIs
display(top_artists)

In [0]:
%python
from pyspark.sql.functions import col, avg

# Join fact_table with artists_dim on artist_id
fact_with_artists = fact_table.join(artists_dim, on="artist_id", how="left")

# 10 Artists by Popularity
top_artists = fact_with_artists.groupBy("artist_id", "artists_name", "song_name") \
    .agg(
        avg("popularity").alias("avg_popularity"),
        avg("duration_ms").alias("avg_duration_ms"),
        avg("danceability").alias("avg_danceability"),
        avg("energy").alias("avg_energy")
    ) \
    .orderBy(col("avg_popularity").desc()) \
    .limit(10)

# Cache the result
top_artists.cache()

# Display KPIs
display(top_artists)

In [0]:
%python
# Save Data Marts
top_artists = top_artists.withColumnRenamed("avg(popularity)", "avg_popularity") \
                         .withColumnRenamed("avg(duration_ms)", "avg_duration_ms") \
                         .withColumnRenamed("avg(danceability)", "avg_danceability") \
                         .withColumnRenamed("avg(energy)", "avg_energy")

# Save Data Marts with schema evolution enabled
top_artists.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/mnt/gold/mart_top_artists")
avg_danceability.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/mnt/gold/mart_avg_danceability")

# Display KPIs
display(top_artists)
display(avg_danceability)

Databricks data profile. Run in Databricks to view.

# ## **Visualization**

In [0]:
%python
# Display the top_artists DataFrame
## Bar Chart

display(top_artists)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Line Chart: render avg_danceability; the chart is configured in the display widget.
avg_danceability.display()

Databricks visualization. Run in Databricks to view.

In [0]:
# Scatter plot, Pie Chart, Bubble Chart:
# render top_artists again as the source for these display-widget charts.
top_artists.display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.