In [0]:
# Mount Azure Data Lake Gen2 container using OAuth

# Where the raw (bronze) data lives and where it will be mounted in DBFS.
storage_account_name = "dlprojectspotify"
container_name = "bronze"
mount_point = "/mnt/bronze"

# Service-principal credentials, read from the Key Vault-backed secret scope
# (fetched in the order ClientId, tenantid, secretvalue).
secret_scope = "my-secret"
client_id, tenant_id, client_secret = (
    dbutils.secrets.get(scope=secret_scope, key=secret_key)
    for secret_key in ("ClientId", "tenantid", "secretvalue")
)





In [0]:
# Mount the bronze container at `mount_point` unless it is already mounted (safe to re-run).
mount_point = "/mnt/bronze"

# OAuth settings are built here instead of relying on a `configs` variable from a later
# cell, which would raise NameError on a fresh Restart & Run All.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Built from the names defined in the first cell ("bronze" container on "dlprojectspotify").
source_uri = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# Check if already mounted
mounts = [mount.mountPoint for mount in dbutils.fs.mounts()]
if mount_point not in mounts:
    dbutils.fs.mount(
        source=source_uri,
        mount_point=mount_point,
        extra_configs=configs
    )
else:
    print(f"{mount_point} is already mounted.")


In [0]:
# OAuth (service principal) settings used by the ABFS driver for the mount.
configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/"+tenant_id+"/oauth2/token"
    }


# Mount the container. The guard matters because dbutils.fs.mount raises an error
# when the mount point already exists (e.g. the previous cell already mounted it).
if any(existing.mountPoint == mount_point for existing in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted.")
else:
    dbutils.fs.mount(
      source = "abfss://bronze@dlprojectspotify.dfs.core.windows.net/",
      mount_point = mount_point,
      extra_configs = configs
    )


In [0]:
# List the entries of the "Bronze" folder inside the mounted container.
a=dbutils.fs.ls("/mnt/bronze/Bronze")
print(a)

In [0]:

# Load the raw chart CSV from the bronze mount, treating the first line as the header.
# No inferSchema option is passed, so columns are expected to come in as strings.
raw_csv_path = "/mnt/bronze/Bronze/universal_top_spotify_songs.csv"
df = spark.read.csv(raw_csv_path, header=True)
display(df)

In [0]:
# Print the schema of the raw CSV frame (read with header only, no schema inference).
df.printSchema()

In [0]:
# Summary statistics (count/mean/stddev/min/max) for every column of the raw frame.
df.describe().display()

In [0]:
# Show the raw column names read from the CSV header.
print(df.columns)

In [0]:
from pyspark.sql.functions import col, isnan, when, count

# Count null-or-NaN values for every column of the raw frame in ONE aggregation pass,
# rather than launching a separate Spark job (full scan) per column.
# `when(cond, True)` is null unless cond is true, and `count` skips nulls, so each
# aggregate equals df.filter(cond).count() for that column.
null_count_row = df.select([
    count(when(col(c).isNull() | isnan(col(c)), True)).alias(c)
    for c in df.columns
]).first()

# Dictionary of column name -> number of null/NaN rows (keeps df.columns order)
null_counts = null_count_row.asDict()

# Display the results
for col_name, count_value in null_counts.items():
    print(f"{col_name}: {count_value}")

In [0]:
# Number of columns in the raw frame.
len(df.columns)

In [0]:
# Number of rows in the raw frame (runs a Spark job over the whole CSV).
df.count()

In [0]:
# Remove columns that are not carried into the later layers.
cols_to_drop = ["key", "mode", "time_signature", "daily_movement", "weekly_movement"]
df_cleaned = df.select([name for name in df.columns if name not in cols_to_drop])

In [0]:
# Drop every row that has a null in any remaining column.
dftransform = df_cleaned.na.drop(how="any")

In [0]:
from pyspark.sql.functions import col, isnan, when, count

# Re-check null-or-NaN counts per column after dropna(). dropna() removed nulls, so any
# non-zero count here would come from NaN values.
# All columns are aggregated in ONE pass instead of one Spark job per column;
# count(when(cond, True)) equals dftransform.filter(cond).count().
null_count_row = dftransform.select([
    count(when(col(c).isNull() | isnan(col(c)), True)).alias(c)
    for c in dftransform.columns
]).first()

# Dictionary of column name -> number of null/NaN rows (keeps dftransform.columns order)
null_counts = null_count_row.asDict()

# Display the results
for col_name, count_value in null_counts.items():
    print(f"{col_name}: {count_value}")

In [0]:
# Row count after dropping columns and rows containing nulls.
dftransform.count()

In [0]:
# Count fully distinct rows to gauge how many exact duplicates exist.
# NOTE(review): duplicates are only counted here; dftransform itself is never
# de-duplicated downstream — confirm that is intended.
df_dup = dftransform.dropDuplicates()
df_dup.count()

In [0]:
from pyspark.sql.functions import col, to_date

# daily_rank was read from the CSV without schema inference; cast it to int so
# rank comparisons are numeric rather than string-based.
dftransform = dftransform.withColumn("daily_rank", col("daily_rank").cast("int"))

# Parse snapshot_date using the "yyyy-MM-dd" pattern into a DateType column.
dftransform = dftransform.withColumn("snapshot_date", to_date("snapshot_date", "yyyy-MM-dd"))

# A bare expression in the middle of a cell is never displayed, so both resulting
# types are returned together as the cell's final value.
(dftransform.schema["daily_rank"].dataType, dftransform.schema["snapshot_date"].dataType)


In [0]:
%python
from pyspark.sql.functions import col, to_date

dftransform = dftransform.withColumn("popularity", col("popularity").cast("int")) \
       .withColumn("duration_ms", col("duration_ms").cast("double")) \
       .withColumn("album_release_date", to_date("album_release_date", "yyyy-MM-dd")) \
       .withColumn("danceability", col("danceability").cast("double")) \
       .withColumn("energy", col("energy").cast("double")) \
       .withColumn("loudness", col("loudness").cast("double")) \
       .withColumn("speechiness", col("speechiness").cast("double")) \
       .withColumn("acousticness", col("acousticness").cast("double")) \
       .withColumn("instrumentalness", col("instrumentalness").cast("double")) \
       .withColumn("liveness", col("liveness").cast("double")) \
       .withColumn("valence", col("valence").cast("double")) \
       .withColumn("tempo", col("tempo").cast("double"))

In [0]:
# Print the schema after the type casts; without the call parentheses this only
# showed the bound-method repr instead of the schema tree.
dftransform.printSchema()

In [0]:
# Derive track length in minutes from the millisecond duration.
ms_per_minute = 60000
minutes_expr = (col("duration_ms") / ms_per_minute).cast("double")
dftransform = dftransform.withColumn("duration_minutes", minutes_expr)


In [0]:
from pyspark.sql.functions import trim

# Text columns whose leading/trailing whitespace should be stripped.
string_columns = ["spotify_id", "name", "artists", "country", "album_name"]

for text_column in string_columns:
    trimmed = trim(col(text_column))
    dftransform = dftransform.withColumn(text_column, trimmed)


In [0]:
from pyspark.sql.functions import when

# Relabel the explicit flag: "1" -> "Yes", "0" -> "No"; any other value is kept as-is.
# NOTE(review): confirm the CSV encodes this flag as "1"/"0"; other encodings
# (e.g. "True"/"False") would pass through unchanged.
explicit_label = (
    when(col("is_explicit") == "1", "Yes")
    .when(col("is_explicit") == "0", "No")
    .otherwise(col("is_explicit"))
)
dftransform = dftransform.withColumn("is_explicit", explicit_label)


In [0]:
# Keep tracks with popularity of at least this threshold; rows with a null
# popularity also fail the comparison and are dropped.
MIN_POPULARITY = 10
dftransform = dftransform.where(col("popularity") >= MIN_POPULARITY)


In [0]:
# Column names of the transformed frame at this stage.
dftransform.columns

In [0]:
# Summary statistics for key audio features on the cleaned, typed frame. The raw `df`
# still holds these as strings, so its min/max would be lexicographic, not numeric.
dftransform.describe(["danceability", "energy", "loudness"]).show()


In [0]:
# Target column order for the cleaned dataset, assembled from logical groups.
identity_cols = ["spotify_id", "name", "artists", "album_name"]
date_cols = ["album_release_date", "snapshot_date"]
chart_cols = ["country", "daily_rank", "popularity", "is_explicit"]
length_cols = ["duration_ms", "duration_minutes"]
audio_cols = [
    "danceability", "energy", "loudness", "speechiness",
    "acousticness", "instrumentalness", "liveness", "valence", "tempo",
]
ordered_columns = identity_cols + date_cols + chart_cols + length_cols + audio_cols


In [0]:
from pyspark.sql.functions import col, count, when

# 1. Reorder columns using `ordered_columns` from the previous cell instead of a second
#    hand-maintained copy of the same list that could drift out of sync.
dftransform = dftransform.select(*ordered_columns)

# 2. Validate columns

# List of columns that should be between 0 and 1
columns_0_1 = ["danceability", "energy", "speechiness", "acousticness", "instrumentalness", "liveness", "valence"]

# Each check is (condition marking a row INVALID, problem message with an `{n}` placeholder).
# Null values make the condition null, so they are not counted — same as filter(...).count().
checks = [
    ((col(c) < 0) | (col(c) > 1), c + " has {n} invalid rows outside [0,1]")
    for c in columns_0_1
]
checks += [
    # loudness (usually between -60 and 0)
    ((col("loudness") < -60) | (col("loudness") > 0), "loudness has {n} values outside [-60, 0] range"),
    # tempo (positive, usually 20-300 bpm)
    ((col("tempo") <= 0) | (col("tempo") > 300), "tempo has {n} invalid values (should be >0 and reasonable)"),
    # durations should be positive
    (col("duration_ms") <= 0, "duration_ms has {n} non-positive values"),
    (col("duration_minutes") <= 0, "duration_minutes has {n} non-positive values"),
    # daily_rank should be a positive integer
    (col("daily_rank") <= 0, "daily_rank has {n} non-positive values"),
    # popularity should be between 0 and 100
    ((col("popularity") < 0) | (col("popularity") > 100), "popularity has {n} invalid values (should be 0-100)"),
]

# Evaluate every check in ONE aggregation pass instead of one Spark job per check.
# count(when(cond, True)) counts exactly the rows where cond is true.
invalid_counts = dftransform.select([
    count(when(condition, True)).alias(f"check_{i}")
    for i, (condition, _) in enumerate(checks)
]).first()

# List of problems found (in the same order as the checks above)
problems = [
    message.format(n=invalid_counts[i])
    for i, (_, message) in enumerate(checks)
    if invalid_counts[i] > 0
]

# Show results
if problems:
    print("Found issues:")
    for p in problems:
        print("-", p)
else:
    print("✅ All validations passed successfully!")


In [0]:
from pyspark.sql.functions import col

# Keep only rows whose features fall inside their valid ranges. A row failing any
# check, or with a null in a checked column, is removed by the filter.
columns_0_1 = ["danceability", "energy", "speechiness", "acousticness", "instrumentalness", "liveness", "valence"]

keep_checks = [col(name).between(0, 1) for name in columns_0_1]               # 0-1 scaled features
keep_checks.append(col("loudness").between(-60, 0))                           # loudness range
keep_checks.append((col("tempo") > 0) & (col("tempo") <= 300))                # tempo range
keep_checks.append((col("duration_ms") > 0) & (col("duration_minutes") > 0))  # positive durations

# AND all checks together, in the order listed above.
keep_condition = keep_checks[0]
for extra_check in keep_checks[1:]:
    keep_condition = keep_condition & extra_check

dftransform = dftransform.filter(keep_condition)

print("✅ Invalid rows dropped successfully.")
print(f"🧮 New number of rows: {dftransform.count()}")


In [0]:
from pyspark.sql.functions import year

# Year component of the parsed album release date (null when the date is null).
release_year_expr = year(col("album_release_date"))
dftransform = dftransform.withColumn("album_release_year", release_year_expr)

print("✅ Created new column 'album_release_year'.")


In [0]:
# Give ambiguous columns clearer names (units appended where relevant);
# the remaining columns already have clean names.
column_renames = {
    'name': 'track_name',
    'artists': 'artist_name',
    'loudness': 'loudness_db',
    'tempo': 'tempo_bpm',
}
for old_name, new_name in column_renames.items():
    dftransform = dftransform.withColumnRenamed(old_name, new_name)

print("✅ Columns renamed for better readability.")


In [0]:
# Final Silver column order: chart context first, then album info, then audio features.
chart_part = ['spotify_id', 'track_name', 'artist_name', 'daily_rank', 'country', 'snapshot_date',
              'popularity', 'is_explicit', 'duration_ms', 'duration_minutes']
album_part = ['album_name', 'album_release_date', 'album_release_year']
audio_part = ['danceability', 'energy', 'loudness_db', 'speechiness', 'acousticness',
              'instrumentalness', 'liveness', 'valence', 'tempo_bpm']
desired_order = chart_part + album_part + audio_part

dftransform = dftransform.select(desired_order)

print("✅ Columns rearranged successfully!")


In [0]:
# Preview the first 10 rows of the cleaned data; limit() is the documented way to cap
# the rows shown (display() does not take a row-count argument).
display(dftransform.limit(10))

In [0]:
# Persist the cleaned dataset as the Silver layer (Delta), replacing any previous run.
silver_path = "/mnt/bronze/Silver/spotify_tracks"
dftransform.write.save(silver_path, format="delta", mode="overwrite")

print("✅ Data successfully saved to Silver layer in Delta format!")



In [0]:
# Reload the Silver Delta table written by the previous cell and show it.
dfsilver = spark.read.load(silver_path, format="delta")
display(dfsilver)


In [0]:
# Print the Silver table schema; the call parentheses were missing, which only
# displayed the bound-method repr.
dfsilver.printSchema()

In [0]:
# Import Required Libraries
from pyspark.sql.functions import col, year, month, dayofmonth, monotonically_increasing_id, dense_rank
from pyspark.sql.window import Window

# Build a star schema (5 dimensions + 1 fact table) from the Silver data and
# persist every table to the Gold layer.
df = dfsilver


def _with_surrogate_key(source_df, key_cols, id_col):
    """Return the distinct `key_cols` rows of `source_df` plus a deterministic key `id_col`.

    The key is dense_rank() over the (unique) key columns, cast to long. Unlike
    monotonically_increasing_id(), whose values depend on partition layout and are
    recomputed on every action, identical input always yields identical ids. This
    keeps the ids written to a dimension consistent with the ids joined into the
    fact table. An un-partitioned window moves the distinct keys to one partition,
    which is acceptable for dimension-sized data.
    """
    distinct_keys = source_df.select(*key_cols).dropDuplicates()
    surrogate = dense_rank().over(Window.orderBy(*key_cols)).cast("long")
    return distinct_keys.withColumn(id_col, surrogate)


def _attach_key(fact_df, dim_df, key_cols, id_col):
    """Left-join `dim_df[id_col]` onto `fact_df`, matching `key_cols` with null-safe equality.

    Null-safe matching gives a fact row whose key contains a null (e.g. a date that
    to_date could not parse) the id of the matching dimension row instead of a null id.
    The dimension's key columns are renamed before joining so no name is ambiguous.
    """
    aliases = {k: f"__dim_{k}" for k in key_cols}
    dim_keys = dim_df.select(*[col(k).alias(a) for k, a in aliases.items()], id_col)
    match = [col(k).eqNullSafe(col(a)) for k, a in aliases.items()]
    return fact_df.join(dim_keys, on=match, how="left").drop(*aliases.values())


# ============================
# 1. Create Dimension Tables
# ============================

# 1.1 dim_track
dim_track = _with_surrogate_key(df, ["spotify_id", "track_name", "is_explicit"], "track_id")

# 1.2 dim_artist
dim_artist = _with_surrogate_key(df, ["artist_name"], "artist_id")

# 1.3 dim_album
dim_album = _with_surrogate_key(df, ["album_name", "album_release_date", "album_release_year"], "album_id")

# 1.4 dim_date (calendar parts derived from the snapshot date; column order kept stable)
dim_date = _with_surrogate_key(df, ["snapshot_date"], "date_id").select(
    "snapshot_date",
    year("snapshot_date").alias("year"),
    month("snapshot_date").alias("month"),
    dayofmonth("snapshot_date").alias("day"),
    "date_id",
)

# 1.5 dim_country
dim_country = _with_surrogate_key(df, ["country"], "country_id")

# ============================
# 2. Create Fact Table
# ============================

# Attach each dimension's surrogate key to the Silver rows.
dimension_keys = [
    (dim_track, ["spotify_id", "track_name", "is_explicit"], "track_id"),
    (dim_artist, ["artist_name"], "artist_id"),
    (dim_album, ["album_name", "album_release_date", "album_release_year"], "album_id"),
    (dim_date, ["snapshot_date"], "date_id"),
    (dim_country, ["country"], "country_id"),
]
fact = df
for dim_df, key_cols, id_col in dimension_keys:
    fact = _attach_key(fact, dim_df, key_cols, id_col)

# Select the final fact columns
fact_track_rank = fact.select(
    "track_id",
    "artist_id",
    "album_id",
    "date_id",
    "country_id",
    "daily_rank",
    "popularity",
    "duration_ms",
    "duration_minutes",
    "danceability",
    "energy",
    "loudness_db",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo_bpm"
)

# ============================
# 3. Save Tables to Gold Layer
# ============================

# Set your Gold Layer path
gold_path = "/mnt/bronze/Gold/"

gold_tables = {
    "dim_track": dim_track,
    "dim_artist": dim_artist,
    "dim_album": dim_album,
    "dim_date": dim_date,
    "dim_country": dim_country,
    "fact_track_rank": fact_track_rank,
}
for table_name, table_df in gold_tables.items():
    table_df.write.format("delta").mode("overwrite").save(gold_path + table_name)


In [0]:
# Reload the Gold star-schema tables from their Delta locations.
gold_root = "/mnt/bronze/Gold"

# Dimension tables
dim_track = spark.read.load(f"{gold_root}/dim_track", format="delta")
dim_artist = spark.read.load(f"{gold_root}/dim_artist", format="delta")
dim_album = spark.read.load(f"{gold_root}/dim_album", format="delta")
dim_date = spark.read.load(f"{gold_root}/dim_date", format="delta")
dim_country = spark.read.load(f"{gold_root}/dim_country", format="delta")

# Fact table
fact_track_rank = spark.read.load(f"{gold_root}/fact_track_rank", format="delta")


In [0]:
# Expose every Gold table to SQL under a temp view of the same name.
gold_views = [
    ("fact_track_rank", fact_track_rank),
    ("dim_track", dim_track),
    ("dim_artist", dim_artist),
    ("dim_album", dim_album),
    ("dim_country", dim_country),
    ("dim_date", dim_date),
]
for view_name, view_df in gold_views:
    view_df.createOrReplaceTempView(view_name)


In [0]:
from pyspark.sql import functions as F

# Top 10 songs globally based on avg popularity across all chart rows.
# track_id is a secondary sort key so ties at 10th place are broken deterministically;
# otherwise limit(10) may keep a different subset of tied tracks on each run.
top_songs_global = (
    fact_track_rank.groupBy("track_id")
    .agg(F.avg("popularity").alias("avg_popularity"))
    .orderBy(F.col("avg_popularity").desc(), F.col("track_id").asc())
    .limit(10)
)

# Save to Gold
top_songs_global.write.format("delta").mode("overwrite").save("/mnt/bronze/Gold/top_songs_global")


In [0]:
# Reload the persisted top-songs table, register it for SQL, and render it.
top_songs_global_df = spark.read.load("/mnt/bronze/Gold/top_songs_global", format="delta")
top_songs_global_df.createOrReplaceTempView("top_songs_global")
display(top_songs_global_df)


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql import functions as F

# Top 10 artists globally based on avg popularity across all chart rows.
# artist_id is a secondary sort key so ties at 10th place are broken deterministically.
top_artists_global = (
    fact_track_rank.groupBy("artist_id")
    .agg(F.avg("popularity").alias("avg_popularity"))
    .orderBy(F.col("avg_popularity").desc(), F.col("artist_id").asc())
    .limit(10)
)

# Save to Gold
top_artists_global.write.format("delta").mode("overwrite").save("/mnt/bronze/Gold/top_artists_global")


In [0]:
# Reload the persisted top-artists table, register it for SQL, and render it.
top_artists_global_df = spark.read.load("/mnt/bronze/Gold/top_artists_global", format="delta")
top_artists_global_df.createOrReplaceTempView("top_artists_global")
display(top_artists_global_df)

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql import functions as F

# Join fact_track_rank with dim_track to access 'is_explicit'. Joining on the column
# name keeps a single `track_id` column; an expression join keeps both copies, which
# makes any later `track_id` reference on track_data ambiguous.
track_data = fact_track_rank.join(dim_track, on="track_id", how="inner")

# Group by 'is_explicit' and calculate the average popularity
explicit_popularity = (
    track_data.groupBy("is_explicit")
    .agg(F.avg("popularity").alias("avg_popularity"))
    .orderBy("is_explicit")
)

# Save to Gold
explicit_popularity.write.format("delta").mode("overwrite").save("/mnt/bronze/Gold/explicit_popularity")


In [0]:
# Reload the explicit-vs-clean popularity table, register it for SQL, and render it.
explicit_popularity_df = spark.read.load("/mnt/bronze/Gold/explicit_popularity", format="delta")
explicit_popularity_df.createOrReplaceTempView("explicit_popularity")
display(explicit_popularity_df)

Databricks visualization. Run in Databricks to view.