In [195]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import max, min, col, round, avg, explode, from_json, count, sum, rank, dense_rank
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, ArrayType, StringType

import findspark
findspark.init()

In [2]:
spark = SparkSession.builder \
                    .appName("spotify_data_analysis") \
                    .getOrCreate()

spark

## Questions

Basic Exploration
Display the schema and find how many columns are present.

Show the first 10 rows sorted by popularity.

What are the distinct genres in the dataset?

Which track has the highest danceability?

🧼 Data Cleaning & Transformation
Are there any null values in the dataset? List column-wise null counts.

Create a new column duration_min by converting duration_ms to minutes.

Cast the popularity column to IntegerType and filter tracks with popularity > 80.

📊 Aggregations & Grouping
What is the average loudness and energy per genre?

Which artist has the most tracks in the dataset?

What is the average duration of songs by each artist?

🔗 Joins and Multi-file Work (if more datasets are added)
Join with a mock dataset of top global artists to find their tracks and popularity.

Find the top 5 artists by average track popularity using groupBy.

🪟 Window Functions
For each artist, assign a rank to their songs based on popularity.

Calculate a rolling average of popularity for all tracks within the same genre.

🧠 User-Defined Functions (UDFs)
Create a UDF to classify songs as "Hit" if popularity > 75, else "Normal".

Write a UDF that tags a song as "Short", "Medium", or "Long" based on duration.

🧪 Filtering and Advanced Queries
Find songs that are both explicit and have a popularity over 70.

Find artists whose average track popularity is below 30 — label them "Underrated".

🧠 PySpark SQL
Register the DataFrame as a SQL temp view. Write a query to find:

Top 5 longest songs in each genre.

Using SQL, get track counts per genre and per artist, ordered by count descending.

In [5]:
spotify_df = spark.read.csv('datasets/spotify-data.csv', header = True)
spotify_df.show(4)

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|     158648|        1928|1928|       0.995|       0.708| 0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779|   1| 10|         0|       0|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert Schumann...|     282133|        1928|1928|       0.994|       0.379|

### 1 ) Basic Exploration Display the schema and find how many columns are present.

In [7]:
spotify_df.schema

StructType([StructField('id', StringType(), True), StructField('name', StringType(), True), StructField('artists', StringType(), True), StructField('duration_ms', StringType(), True), StructField('release_date', StringType(), True), StructField('year', StringType(), True), StructField('acousticness', StringType(), True), StructField('danceability', StringType(), True), StructField('energy', StringType(), True), StructField('instrumentalness', StringType(), True), StructField('liveness', StringType(), True), StructField('loudness', StringType(), True), StructField('speechiness', StringType(), True), StructField('tempo', StringType(), True), StructField('valence', StringType(), True), StructField('mode', StringType(), True), StructField('key', StringType(), True), StructField('popularity', StringType(), True), StructField('explicit', StringType(), True)])

In [8]:
spotify_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- key: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- explicit: string (nullable = true)



In [9]:
spotify_df.columns

['id',
 'name',
 'artists',
 'duration_ms',
 'release_date',
 'year',
 'acousticness',
 'danceability',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo',
 'valence',
 'mode',
 'key',
 'popularity',
 'explicit']

In [10]:
spotify_df.count()

169909

In [113]:
len(spotify_df.columns)

20

### 2 ) Show the first 10 rows sorted by popularity.

In [12]:
spotify_df.select('id', 'name', 'popularity') \
        .sort("popularity",ascending=False) \
        .show(10)

+--------------------+--------------------+----------+
|                  id|                name|popularity|
+--------------------+--------------------+----------+
|42OhQBFNIbMyxRGxy...|"Not Tonight (fea...|    99.429|
|7ytR5pFWmSjzHJIeQ...|ROCKSTAR (feat. R...|        99|
|7zNOcAn34AVywtf2p...|South Pacific (19...|    97.724|
|7eJMfftS33KTjuF7l...|death bed (coffee...|        97|
|14RtX37onWzSyGDxr...|I Ain't Superstit...|    96.361|
|39Yp9wwQiSRIDOvrV...|          THE SCOTTS|        96|
|62aP9fBQKYKxi7PDX...|ily (i love you b...|        95|
|0nbXyq5TXYPCO7pr3...|             The Box|        95|
|4nK5YrxbMGZstTLbv...|          Supalonely|        95|
|127QTOFJsJQp5LbJb...|        Toosie Slide|        95|
+--------------------+--------------------+----------+
only showing top 10 rows



In [13]:
spotify_df.select('id', 'name', 'artists','duration_ms','popularity','year')\
            .orderBy(['popularity', 'year'],ascending = [False, False])\
            .show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|                  id|                name|             artists|         duration_ms|popularity|                year|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|42OhQBFNIbMyxRGxy...|"Not Tonight (fea...|      "[""Lil' Kim""|    'Angie Martinez'|    99.429|           'Da Brat'|
|7ytR5pFWmSjzHJIeQ...|ROCKSTAR (feat. R...|['DaBaby', 'Roddy...|              181733|        99|                2020|
|7zNOcAn34AVywtf2p...|South Pacific (19...|"['Richard Rodger...|        'Ezio Pinza'|    97.724|     'Jim Hawthorne'|
|7eJMfftS33KTjuF7l...|death bed (coffee...|['Powfu', 'beabad...|              173333|        97|                2020|
|14RtX37onWzSyGDxr...|I Ain't Superstit...|  "[""Howlin' Wolf""|      'Eric Clapton'|    96.361|        'Bill Wyman'|
|39Yp9wwQiSRIDOvrV...|          THE SCOTTS|['THE SCOTTS'

### 3) What are the distinct year in the dataset?

In [15]:
spotify_df.select('year')\
            .distinct()\
            .show()

+-----------------+
|             year|
+-----------------+
|           164466|
|           1/1/53|
| 'Georges Prêtre'|
|           1/1/08|
|            28600|
|   'Justin Bohon'|
|             1953|
|           7/1/53|
|          Laura]"|
|           232613|
|           1/1/56|
|             1957|
|             1987|
|       Raimondo)"|
|           338907|
|             1956|
|           160200|
|       Marchesa]"|
|             1936|
|             2016|
+-----------------+
only showing top 20 rows



### 4) Which track has the highest danceability?

In [17]:
spotify_df.select(max('danceability')).show()

+--------------------+
|   max(danceability)|
+--------------------+
|['Wolfgang Amadeu...|
+--------------------+



In [18]:
spotify_df.select(min('danceability')).show()

+--------------------+
|   min(danceability)|
+--------------------+
| ""Orchestra Sinf...|
+--------------------+



In [21]:
# spotify_df = spotify_df.withColumn("danceability",col("danceability").cast("double"))

In [22]:
# col_list = ['acousticness', 'danceability', 'energy', 'popularity']
# for column in col_list:
#     spotify_df = spotify_df.withColumn(column, col(column).cast("double"))

# spotify_df.printSchema()


from pyspark.sql.functions import col

cols_to_cast = {
    "duration_ms": "double",
    "year": "double",
    "acousticness": "double",
    "danceability": "double",
    "energy": "double",
    "instrumentalness": "double",
    "liveness": "double",
    "loudness": "double",
    "speechiness": "double",
    "tempo": "double",
    "valence": "double",
    "mode": "double",
    "key": "double",
    "popularity": "double",
    "explicit": "double"
}

for column, dtype in cols_to_cast.items():
    spotify_df = spotify_df.withColumn(column, col(column).cast(dtype))
spotify_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- year: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- key: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- explicit: double (nullable = true)



In [23]:
spotify_df.select(max('danceability')).show()

+-----------------+
|max(danceability)|
+-----------------+
|         668333.0|
+-----------------+



#### EG

In [25]:
from pyspark.sql import Row
data = [Row(value="0.86"), Row(value="not_a_number"), Row(value="1.23")]

df = spark.createDataFrame(data)
df.show()


+------------+
|       value|
+------------+
|        0.86|
|not_a_number|
|        1.23|
+------------+



In [26]:
df_casted = df.withColumn("value_double", col("value").cast("double"))

df_casted.show()

+------------+------------+
|       value|value_double|
+------------+------------+
|        0.86|        0.86|
|not_a_number|        NULL|
|        1.23|        1.23|
+------------+------------+



In [27]:
# ## this problem can directly solve by 
# spotify_df = spark.read.csv("datasets/spotify-data.csv",header = True, inferSchema=True)
# spotify_df.show(4)
# spotify_df.printSchema()

## Data Cleaning & Transformation 

### 5) Are there any null values in the dataset? List column-wise null counts.

In [30]:
a = spotify_df.filter(col('danceability').isNull()).count()
a

138

In [31]:
for column in spotify_df.columns:
    null_cnt = spotify_df.filter(col(column).isNull()).count()
    print(f'{column}--> {null_cnt}')

id--> 0
name--> 0
artists--> 0
duration_ms--> 1447
release_date--> 0
year--> 692
acousticness--> 285
danceability--> 138
energy--> 70
instrumentalness--> 44
liveness--> 23
loudness--> 8
speechiness--> 8
tempo--> 4
valence--> 2
mode--> 2
key--> 2
popularity--> 1
explicit--> 2


### 6 )Create a new column duration_min by converting duration_ms to minutes.

In [33]:
spotify_df = spotify_df.withColumn('duration_min',round(col('duration_ms')/3600,2))
spotify_df.show()

+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+
|                  id|                name|             artists|duration_ms|release_date|  year|acousticness|danceability| energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode| key|popularity|explicit|duration_min|
+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|   158648.0|        1928|1928.0|       0.995|       0.708|  0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779| 1.0|10.0|       0.0|     0.0|       44.07|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert 

### 7 ) Cast the popularity column to IntegerType and filter tracks with popularity > 80.

In [66]:
spotify_df = spotify_df.withColumn('popularity', col('popularity').cast(IntegerType())) 
spotify_df.show()

+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+
|                  id|                name|             artists|duration_ms|release_date|  year|acousticness|danceability| energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode| key|popularity|explicit|duration_min|
+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|   158648.0|        1928|1928.0|       0.995|       0.708|  0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779| 1.0|10.0|         0|     0.0|       44.07|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert 

In [68]:
spotify_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- year: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- key: double (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- explicit: double (nullable = true)
 |-- duration_min: double (nullable = true)



In [70]:
spotify_df.select(max('popularity')).show()

+---------------+
|max(popularity)|
+---------------+
|         538760|
+---------------+



In [72]:
spotify_df.select(min('popularity')).show()

+---------------+
|min(popularity)|
+---------------+
|            -20|
+---------------+



In [78]:
spotify_df.filter(col("popularity")>80).count()

518

## 📊 Aggregations & Grouping 

### 9 )What is the average loudness and energy per genre?

In [86]:
spotify_df.select(avg("loudness")).show()

+-----------------+
|    avg(loudness)|
+-----------------+
|3.800375127691626|
+-----------------+



In [90]:
spotify_df.select(avg("energy")).show()

+-----------------+
|      avg(energy)|
+-----------------+
|70.88351548936592|
+-----------------+



In [102]:
spotify_df.groupby('year')\
    .agg(avg("loudness").alias('avg_loudness'),\
         avg('energy').alias('avg_energy')
    )\
    .orderBy(['avg_loudness'], ascending = [False])\
    .show()

+--------+-----------------+------------------+
|    year|     avg_loudness|        avg_energy|
+--------+-----------------+------------------+
|    NULL|3733.574104137426|19221.435661897107|
|200427.0|            0.985|             0.927|
| 98240.0|             0.96|             0.865|
|169907.0|            0.938|             0.858|
|415747.0|            0.924|             0.995|
|653800.0|              0.9|             0.987|
|264853.0|            0.896|             0.886|
|339733.0|            0.887|             0.992|
|461427.0|             0.88|             0.887|
|209600.0|            0.864|             0.995|
|214667.0|            0.837|            0.0252|
|201400.0|            0.831|             0.983|
|671733.0|            0.826|              0.96|
|145253.0|            0.796|             0.984|
|184000.0|            0.791|             0.988|
|214840.0|            0.779|             0.323|
|277960.0|            0.774|             0.875|
|114533.0|            0.759|            

In [110]:
spotify_df.filter(col('year') == 200427.0).show()

+--------------------+--------------------+--------------------+-----------+-------------------+--------+------------+------------+------+----------------+--------+--------+-----------+-------+-------+-----+------+----------+--------+------------+
|                  id|                name|             artists|duration_ms|       release_date|    year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence| mode|   key|popularity|explicit|duration_min|
+--------------------+--------------------+--------------------+-----------+-------------------+--------+------------+------------+------+----------------+--------+--------+-----------+-------+-------+-----+------+----------+--------+------------+
|1syEt258kzY0ApsMF...|"Madama Butterfly...|"['Giacomo Puccin...|       NULL| 'Tullio Serafin']"|200427.0|        NULL|      1958.0| 0.927|          0.0952| 0.00375|   0.985|      0.104|-41.957| 0.0405|73.06|0.0499|         1|    10.0|        NULL|
+-------

### 10 ) Which artist has the most tracks in the dataset?

In [125]:
spotify_df.show(n=2)

+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+
|                  id|                name|             artists|duration_ms|release_date|  year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode| key|popularity|explicit|duration_min|
+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|   158648.0|        1928|1928.0|       0.995|       0.708| 0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779| 1.0|10.0|         0|     0.0|       44.07|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert Schu

In [137]:
spotify_df = spotify_df.withColumn('cleaned_artists', from_json(col('artists'),ArrayType(StringType())))

In [149]:
dummy_df = spotify_df.withColumn('explode_artist', explode(col("cleaned_artists")))
dummy_df.show(3)

+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+--------------------+-----------------+
|                  id|                name|             artists|duration_ms|release_date|  year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode| key|popularity|explicit|duration_min|     cleaned_artists|   explode_artist|
+--------------------+--------------------+--------------------+-----------+------------+------+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+----+----------+--------+------------+--------------------+-----------------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|   158648.0|        1928|1928.0|       0.995|       0.708| 0.195|           0.563|   0.151| -12.428|     0.0506

In [168]:
dummy_df.groupby('explode_artist')\
        .agg(
            count(col('id')).alias('cnt_album')
        )\
        .orderBy(['cnt_album'], ascending = [False])\
        .select('cnt_album')\
        .show(1)

+---------+
|cnt_album|
+---------+
|     2234|
+---------+
only showing top 1 row



### 11) Find the top 5 artists by average track popularity using groupBy.

In [174]:
dummy_df.groupby('explode_artist')\
        .agg(
            avg(col('popularity')).alias("avg_popul")
        )\
        .orderBy(["avg_popul"], ascending = [False])\
        .show(5)

+--------------+---------+
|explode_artist|avg_popul|
+--------------+---------+
|        Emilee|     95.0|
|StaySolidRocky|     94.0|
|         Topic|     92.0|
|           A7S|     92.0|
|   J. Rey Soul|     90.0|
+--------------+---------+
only showing top 5 rows



## 🪟 Window Functions

### 12) For each artist, assign a rank to their songs based on popularity.

In [193]:
window_spec = Window.partitionBy(col("explode_artist")).orderBy(col("popularity").desc())
dummy_df.withColumn("_rank", rank().over(window_spec))\
        .select('id', 'name', 'explode_artist', 'popularity', '_rank')\
        .filter(col('explode_artist') == '$uicideBoy$')\
        .show(50)

+--------------------+--------------------+--------------+----------+-----+
|                  id|                name|explode_artist|popularity|_rank|
+--------------------+--------------------+--------------+----------+-----+
|41LhQUkElADQ5YUbp...|...And To Those I...|   $uicideBoy$|        82|    1|
|5XAPpyIoYF3QXP34H...|Kill Yourself (Pa...|   $uicideBoy$|        77|    2|
|4irYeuAi87yyGHcI4...|               Paris|   $uicideBoy$|        74|    3|
|70nmZhHZLNVYWP4NO...|Runnin' Thru The ...|   $uicideBoy$|        72|    4|
|4yORBk6ZyYsJpnJch...|            2nd Hand|   $uicideBoy$|        72|    4|
|09kvhnq2ONdNo4NRt...|My Flaws Burn Thr...|   $uicideBoy$|        70|    6|
|7wPHEEGn0rWMZEDrD...|          Carrollton|   $uicideBoy$|        70|    6|
|4CwA4CdTQqNv18QSA...|        Putrid Pride|   $uicideBoy$|        70|    6|
|4VsMart0lkjUtyQnm...|  $outh $ide $uicide|   $uicideBoy$|        69|    9|
|3ljehcUqmUI7BHXpQ...|All Dogs Go To He...|   $uicideBoy$|        68|   10|
|2I12vOWeJU5

In [199]:
window_spec = Window.partitionBy(col("explode_artist")).orderBy(col("popularity").desc())
dummy_df.withColumn('_dense_rank', dense_rank().over(window_spec))\
        .select('id', 'name', 'explode_artist', 'popularity', '_dense_rank')\
        .filter(col('explode_artist') == '$uicideBoy$')\
        .show()

+--------------------+--------------------+--------------+----------+-----------+
|                  id|                name|explode_artist|popularity|_dense_rank|
+--------------------+--------------------+--------------+----------+-----------+
|41LhQUkElADQ5YUbp...|...And To Those I...|   $uicideBoy$|        82|          1|
|5XAPpyIoYF3QXP34H...|Kill Yourself (Pa...|   $uicideBoy$|        77|          2|
|4irYeuAi87yyGHcI4...|               Paris|   $uicideBoy$|        74|          3|
|70nmZhHZLNVYWP4NO...|Runnin' Thru The ...|   $uicideBoy$|        72|          4|
|4yORBk6ZyYsJpnJch...|            2nd Hand|   $uicideBoy$|        72|          4|
|09kvhnq2ONdNo4NRt...|My Flaws Burn Thr...|   $uicideBoy$|        70|          5|
|7wPHEEGn0rWMZEDrD...|          Carrollton|   $uicideBoy$|        70|          5|
|4CwA4CdTQqNv18QSA...|        Putrid Pride|   $uicideBoy$|        70|          5|
|4VsMart0lkjUtyQnm...|  $outh $ide $uicide|   $uicideBoy$|        69|          6|
|3ljehcUqmUI7BHX

## 🧠 User-Defined Functions (UDFs)

### 13) Create a UDF to classify songs as "Hit" if popularity > 75, else "Normal".

In [None]:
def fn_song_pop(popularity):
    return "Hit" if popularity > 75 else "Normal"

