In [2]:
from pyspark.sql import SparkSession
# Start (or reuse, if one is already running) the Spark session for this
# notebook, requesting 2g of driver memory.
spark = (
    SparkSession.builder
    .appName("Spotify ETL")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

KeyboardInterrupt: 

Load the CSV and print first 5 rows

In [None]:
# Read the raw export: the first line is treated as the header and column
# types are inferred from the data.
source_csv = "/FileStore/tables/spotify_streaming_data.csv"
df = spark.read.csv(source_csv, header=True, inferSchema=True)

# Preview a few rows, then the schema Spark inferred for them.
df.show(5)
df.printSchema()

+----------+--------+-------+-------------+-------+----+--------------------+-----------------+---------+------------+------+-----+-------+
|    Artist|Category|Streams|Revenue (USD)|Country|Year|   Most Popular Song|Subscription Type|Age Group|Danceability|Energy|Tempo|Valence|
+----------+--------+-------+-------------+-------+----+--------------------+-----------------+---------+------------+------+-----+-------+
|The Weeknd|     Pop| 321147|     13035.11|  Spain|2021|Wake Me Up (feat....|             Free|      45+|        null|  null| null|   null|
|The Weeknd|     Pop| 190560|     12818.14| France|2023|          Cry For Me|             Free|    18-24|        null|  null| null|   null|
|The Weeknd|     Pop| 295077|     16020.41| Mexico|2019|I Can't Fucking Sing|          Premium|      45+|        null|  null| null|   null|
|The Weeknd|     Pop| 557311|     16907.12|Germany|2021|SÃ£o Paulo (feat. ...|             Free|    25-34|        null|  null| null|   null|
|The Weeknd|     Po

We see Danceability, Energy, Tempo, Valence has all NULL values so we drop them

In [None]:
from pyspark.sql.functions import col

# Project only the columns used downstream. Headers containing spaces or
# parentheses are renamed to plain identifiers; the rest keep their names.
unchanged_columns = ["Artist", "Category", "Country", "Year", "Streams"]
renamed_columns = [
    col(source_name).alias(target_name)
    for source_name, target_name in (
        ("Revenue (USD)", "Revenue_USD"),
        ("Most Popular Song", "Track"),
        ("Subscription Type", "Subscription_Type"),
        ("Age Group", "Age_Group"),
    )
]
df_clean = df.select(*unchanged_columns, *renamed_columns)

If there are any null values, we remove them as well.

In [None]:
# Cast the numeric fields first. Under Spark's non-ANSI cast semantics a value
# that cannot be parsed becomes NULL, so the casts must run BEFORE the null
# filter; otherwise those rows would slip past dropna() and end up in the
# aggregates and the exported fact table.
df_clean = (
    df_clean
    .withColumn("Streams", col("Streams").cast("int"))
    .withColumn("Revenue_USD", col("Revenue_USD").cast("double"))
    .withColumn("Year", col("Year").cast("int"))
)

# Remove rows with a null in any core field, including nulls introduced by
# the casts above.
df_clean = df_clean.dropna(
    subset=["Artist", "Category", "Country", "Year", "Streams", "Revenue_USD"]
)

Create a Temporary View

In [None]:
# Register the cleaned DataFrame as a temp view (replacing any previous one
# with the same name) so the %sql cells can query it by name.
view_name = "spotify_clean"
df_clean.createOrReplaceTempView(view_name)

1. ðŸ“ˆ Revenue Trend by Year

In [None]:
%sql
-- Total revenue per year, rounded to two decimals, in chronological order.
SELECT
  Year,
  ROUND(SUM(Revenue_USD), 2) AS Total_Revenue
FROM spotify_clean
GROUP BY Year
ORDER BY Year ASC

Year,Total_Revenue
2019,11210654.61
2020,11467613.15
2021,11800572.09
2022,11876582.6
2023,11219721.17
2024,11674510.15


2. ðŸŽ¤ Top Artists by Streams

In [None]:
%sql
-- The ten artists with the highest summed stream count, largest first.
SELECT
  Artist,
  SUM(Streams) AS Total_Streams
FROM spotify_clean
GROUP BY Artist
ORDER BY Total_Streams DESC
LIMIT 10

Artist,Total_Streams
Taylor Swift,101175170
Drake,98039242
Ed Sheeran,92929668
Post Malone,89838879
Justin Bieber,86768723
Shakira,85001164
Imagine Dragons,84116119
The Weeknd,83337064
Dua Lipa,75815590
BTS,74721574


3. ðŸŒŽ Revenue by Country & Subscription

In [None]:
%sql
-- Revenue for every (country, subscription type) pair, rounded to two
-- decimals and listed from highest to lowest.
SELECT
  Country,
  Subscription_Type,
  ROUND(SUM(Revenue_USD), 2) AS Revenue
FROM spotify_clean
GROUP BY
  Country,
  Subscription_Type
ORDER BY Revenue DESC

Country,Subscription_Type,Revenue
Canada,Premium,2835090.21
Mexico,Premium,2622108.21
Mexico,Free,2619127.81
USA,Premium,2538845.36
Brazil,Premium,2508461.55
Italy,Premium,2492378.96
South Korea,Free,2483896.12
Germany,Premium,2450136.05
France,Free,2449378.25
South Korea,Premium,2441412.45


4. ðŸ‘¥ Audience Segmentation (Age Group)

In [None]:
%sql
-- Revenue (rounded to two decimals) and total streams per age group,
-- highest revenue first.
-- NOTE(review): the captured result for this query contains an Age_Group
-- value of 'Free', which looks like a Subscription Type value; presumably
-- some source rows are column-shifted when the CSV is parsed. TODO confirm
-- against the raw file before relying on these totals.
SELECT
  Age_Group,
  ROUND(SUM(Revenue_USD), 2) AS Revenue,
  SUM(Streams) AS Streams
FROM spotify_clean
GROUP BY Age_Group
ORDER BY Revenue DESC

Age_Group,Revenue,Streams
25-34,14680217.13,289368303
45+,14455140.94,288448115
35-44,13452852.83,270268528
13-17,13395897.42,269882557
18-24,13231556.04,262817650
Free,33989.41,868829


In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number


def _with_surrogate_key(dim_df, id_col):
    """Append a deterministic, 1-based surrogate key column to a dimension.

    The key is ``row_number()`` over a window ordered by every column already
    present in ``dim_df`` (its natural key). Because the rows are distinct,
    that ordering is total, so each natural key gets the same id every time
    the plan is evaluated. This matters because the dimension DataFrames are
    lazy and not cached: the fact join and each dimension export evaluate
    them separately, and partition-dependent ids such as
    ``monotonically_increasing_id()`` are not guaranteed to agree between
    those evaluations.

    Args:
        dim_df: DataFrame holding the distinct natural-key rows.
        id_col: Name of the surrogate key column to append.

    Returns:
        ``dim_df`` with ``id_col`` appended as the last column.
    """
    # A window without partitionBy pulls all rows into one partition; this is
    # acceptable here because each dimension only holds distinct key values.
    natural_key_order = Window.orderBy(*dim_df.columns)
    return dim_df.withColumn(id_col, row_number().over(natural_key_order))


# One row per (Artist, Category) pair.
dim_artist = _with_surrogate_key(
    df_clean.select("Artist", "Category").distinct(), "artist_id"
)

# One row per country.
dim_country = _with_surrogate_key(
    df_clean.select("Country").distinct(), "country_id"
)

# One row per subscription type; the column is renamed so it does not clash
# with the fact-side column name when the two are joined.
dim_subscription = _with_surrogate_key(
    df_clean.select(col("Subscription_Type").alias("SubscriptionType")).distinct(),
    "sub_type_id",
)

# One row per age group, renamed for the same reason as above.
dim_age = _with_surrogate_key(
    df_clean.select(col("Age_Group").alias("AgeGroup")).distinct(),
    "age_group_id",
)

In [None]:
# Build the fact table by swapping each natural key for its dimension id.
#
# Artist/Category and Country are joined by name (they are part of the
# dropna() subset, so they are never null). Subscription_Type and Age_Group
# are NOT part of that subset and may be null; the dimensions are built with
# distinct(), which keeps a NULL member. A plain `==` never matches NULL, so
# those rows would receive a NULL foreign key even though the dimension has a
# row for them. eqNullSafe treats NULL as equal to NULL and closes that gap.
fact = (
    df_clean
    .join(dim_artist, ["Artist", "Category"], "left")
    .join(dim_country, "Country", "left")
    .join(
        dim_subscription,
        df_clean["Subscription_Type"].eqNullSafe(dim_subscription["SubscriptionType"]),
        "left",
    )
    .join(
        dim_age,
        df_clean["Age_Group"].eqNullSafe(dim_age["AgeGroup"]),
        "left",
    )
    .select(
        "artist_id", "Track", "Year", "Streams", "Revenue_USD",
        "country_id", "sub_type_id", "age_group_id"
    )
)

In [None]:
# Export the fact table followed by each dimension table as headered CSV.
# Every table is coalesced to a single partition before writing, and an
# existing export at the same path is overwritten.
export_targets = [
    (fact, "/FileStore/exports/fact_streams"),
    (dim_artist, "/FileStore/exports/dim_artist"),
    (dim_country, "/FileStore/exports/dim_country"),
    (dim_subscription, "/FileStore/exports/dim_subscription"),
    (dim_age, "/FileStore/exports/dim_age_group"),
]

for table_df, target_path in export_targets:
    single_partition = table_df.coalesce(1)
    csv_writer = single_partition.write.option("header", True).mode("overwrite")
    csv_writer.csv(target_path)