<a href="https://colab.research.google.com/github/Codilis/Pyspark-Projects/blob/master/SpotifyMusicData/Spotify_Data_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Data Source : https://www.kaggle.com/datasets/kapturovalexander/spotify-data-from-pyspark-course/data
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=990b1a7951436b7488435f609fc95ef166a72642521e377d6fc2e13dbb324de9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [10]:
import pyspark as sp
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from collections import defaultdict


In [51]:
# Global Functions
def rename_columns(df, new_names):
  for k, v in new_names.items():
    df = df.withColumnRenamed(k,v)
  return df


In [3]:
#Create my_spark
spark = SparkSession.builder.getOrCreate()

In [4]:
github_url = "https://raw.githubusercontent.com/Codilis/Pyspark-Projects/master/SpotifyMusicData/spotify-data/{i}.csv"
data_files = [github_url.format(i=i) for i in range(1, 5)]

pd_df = pd.read_csv(data_files[0])

for i in range(1, len(data_files)):
  temp = pd.read_csv(data_files[i], header=None, names=pd_df.columns.tolist())
  pd_df = pd.concat([pd_df, temp], axis=0)

pd_df.head()

Unnamed: 0,id,name,artists,duration_ms,release_date,year,acousticness,danceability,energy,instrumentalness,liveness,loudness,speechiness,tempo,valence,mode,key,popularity,explicit
0,6KbQ3uYMLKb5jDxLF7wYDD,Singende Bataillone 1. Teil,['Carl Woitschach'],158648,1928,1928,0.995,0.708,0.195,0.563,0.151,-12.428,0.0506,118.469,0.779,1,10,0,0
1,6KuQTIu1KoTTkLXKrwlLPV,"Fantasiestücke, Op. 111: Più tosto lento","['Robert Schumann', 'Vladimir Horowitz']",282133,1928,1928,0.994,0.379,0.0135,0.901,0.0763,-28.454,0.0462,83.972,0.0767,1,8,0,0
2,6L63VW0PibdM1HDSBoqnoM,Chapter 1.18 - Zamek kaniowski,['Seweryn Goszczyński'],104300,1928,1928,0.604,0.749,0.22,0.0,0.119,-19.924,0.929,107.177,0.88,0,5,0,0
3,6M94FkXd15sOAOQYRnWPN8,Bebamos Juntos - Instrumental (Remasterizado),['Francisco Canaro'],180760,9/25/28,1928,0.995,0.781,0.13,0.887,0.111,-14.734,0.0926,108.003,0.72,0,1,0,0
4,6N6tiFZ9vLTSOIxkj8qKrd,"Polonaise-Fantaisie in A-Flat Major, Op. 61","['Frédéric Chopin', 'Vladimir Horowitz']",687733,1928,1928,0.99,0.21,0.204,0.908,0.098,-16.829,0.0424,62.149,0.0693,1,11,1,0


In [5]:
spark_df = spark.createDataFrame(pd_df)
schema = ArrayType(StringType())
spark_df = spark_df.withColumn('artists', F.from_json(spark_df["artists"], schema))
spark_df.limit(5).show()

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...|   [Carl Woitschach]|     158648|        1928|1928|       0.995|       0.708| 0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779|   1| 10|         0|       0|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|[Robert Schumann,...|     282133|        1928|1928|       0.994|       0.379|

# Artist Analysis

In [52]:
# Get the names of all the artist from the dataset
distinct_artists = spark_df.select(F.explode(F.col('artists'))).distinct()

# Artists Average Song Duration, acousticness, danceability, energy, instrumentalness, liveness, loudness, speechiness, tempo, valence, popularity
new_names = {'avg(popularity)': 'AveragePopularity', 'avg(duration_ms)': 'AverageDuration_ms', 'avg(acousticness)': 'AverageAcousticness', 'avg(danceability)': 'AverageDanceability',
             'avg(energy)': 'AverageEnergy', 'avg(instrumentalness)': 'AverageInstrumentalness', 'avg(liveness)': 'AverageLiveness', 'avg(loudness)': 'AverageLoudness',
             'avg(speechiness)': 'AverageSpeechiness', 'avg(tempo)': 'AverageTempo', 'avg(valence)': 'AverageValence'}

artist_popularity = spark_df.select(
    F.explode(F.col('artists')).alias('artists'), F.col('popularity'), F.col('duration_ms'), F.col('acousticness'), F.col('danceability'), F.col('energy'), F.col('instrumentalness'),
    F.col('liveness'), F.col('loudness'), F.col('speechiness'), F.col('tempo'), F.col('valence')).groupBy(F.col('artists')).avg()
artist_popularity = rename_columns(artist_popularity, new_names)


# Most Active Year for an artist
most_active_year = spark_df.select(F.explode(F.col('artists')).alias('artists'), F.col('year')).groupBy(F.col('artists'), F.col('year')).count().select(
    F.col('artists'), F.col('year'), F.col('count'), F.row_number().over(Window.partitionBy("artists").orderBy(F.col("count").desc())).alias("RowNumber")).filter(
        "RowNumber = 1").select(F.col("artists"), F.col("year"), F.col("count").alias("SongCount"))

# Number of Explicit Songs By an artist
explicit_song_count = spark_df.select(F.explode(F.col('artists')).alias('artists'), F.col('explicit')).groupBy(F.col('artists')).sum().alias('ExplicitCount')

# Most Popular Year
most_active_year = spark_df.select(F.explode(F.col('artists')).alias('artists'), F.col('year'), F.when(F.col('popularity') == 0, 0).otherwise(1).alias('Popular')).groupBy(
    F.col('artists'), F.col('year')).agg(F.sum("Popular").alias("Count")).select(
    F.col('artists'), F.col('year'), F.col('count'), F.row_number().over(Window.partitionBy("artists").orderBy(F.col("count").desc())).alias("RowNumber")).filter(
        "RowNumber = 1").select(F.col("artists"), F.col("year").alias("PopularYear"), F.col("count").alias("SongsIn100"))

# First and Last Song
artist_last_song = spark_df.select(F.explode(F.col('artists')).alias('artists'), F.col('name'), F.col('year'), F.col('release_date')).select(
                      F.col('artists'), F.col('name'), F.col('year'), F.col('release_date'),
                F.row_number().over(Window.partitionBy("artists").orderBy(F.col("year").desc(), F.col("release_date").desc())).alias("RowNumber")). \
                filter("RowNumber = 1").select(F.col("artists"), F.col("name").alias("LastSong"), F.col("year").alias("LastYear"))

artist_first_song = spark_df.select(F.explode(F.col('artists')).alias('artists'), F.col('name'), F.col('year'), F.col('release_date')).select(
                      F.col('artists'), F.col('name'), F.col('year'), F.col('release_date'),
                F.row_number().over(Window.partitionBy("artists").orderBy(F.col("year"), F.col("release_date"))).alias("RowNumber")). \
                filter("RowNumber = 1").select(F.col("artists"), F.col("name").alias("FirstSong"), F.col("year").alias("FirstYear"))

#distinct_artists.count(), artist_popularity.count(), most_active_year.count(), explicit_song_count.count(), most_active_year.count()


In [48]:
spark_df.select(F.explode(F.col('artists')).alias('artists'), F.col('name'), F.col('year'), F.col('release_date')).select(F.col('artists'), F.col('name'), F.col('year'), F.col('release_date'),
                F.row_number().over(Window.partitionBy("artists").orderBy(F.col("year").desc(), F.col("release_date").desc())).alias("RowNumber")). \
                filter("RowNumber = 1").select(F.col("artists"), F.col("name").alias("LastSong"), F.col("year").alias("LastYear")).show(truncate=False)

+-----------------------------------------------------------------+------------------------------------------------------------+----+------------+---------+
|artists                                                          |name                                                        |year|release_date|RowNumber|
+-----------------------------------------------------------------+------------------------------------------------------------+----+------------+---------+
|"Cats" 1981 Original London Cast                                 |The Rum Tum Tugger                                          |1981|1/1/81      |1        |
|"Cats" 1983 Broadway Cast                                        |The Journey To The Heaviside Layer                          |1983|1/1/83      |1        |
|"Fiddler On The Roof” Motion Picture Chorus                      |Sunrise, Sunset                                             |1971|8/24/71     |1        |
|"Joseph And The Amazing Technicolor Dreamcoat" 1992 Canad

In [7]:
spark_df.agg({"year": "avg", "duration_ms":"avg"}).collect()[0], spark_df.agg({"year": "min", "duration_ms":"min"}).collect()[0], spark_df.agg({"year": "max", "duration_ms":"max"}).collect()[0]

(Row(avg(duration_ms)=231406.1589733328, avg(year)=1977.2232312590857),
 Row(min(duration_ms)=5108, min(year)=1921),
 Row(max(duration_ms)=5403500, max(year)=2020))

In [18]:

# spark_df.select(F.col('popularity')).distinct().sort('popularity').collect()

# spark_df.groupBy(F.col('popularity')).count().show()

'Johannes Brahms'

+----------------+-------+-----+----------+
|          Artist|Average|Count|Popularity|
+----------------+-------+-----+----------+
| Johannes Brahms|    4.1|  313|      1282|
|    Morton Gould|    7.0|    8|        56|
|       B Jayamma|    0.0|   14|         0|
|Mixalis Thomakos|    0.0|   22|         0|
|      Alan Mills|   1.67|   18|        30|
+----------------+-------+-----+----------+



In [None]:
min_year = spark_df.agg({"year": "min"}).collect()[0]['min(year)']
max_year = spark_df.agg({"year": "max"}).collect()[0]['max(year)']
yearwise_artist_song_count = [defaultdict(lambda: 0) for _ in range(min_year, max_year)]
yearwise_most_active = [{'Name':[], 'Count':0, 'Year': i} for i in range(min_year, max_year)]

for row in spark_df.rdd.toLocalIterator():
  if row['artists'] is None:
    print(row)
    continue
  year = row['year']
  index = min_year - year
  for artist in row['artists']:
    yearwise_artist_song_count[index][artist] += 1
    if yearwise_most_active[index]['Count'] < yearwise_artist_song_count[index][artist]:
      yearwise_most_active[index]['Count'] = yearwise_artist_song_count[index][artist]
      yearwise_most_active[index]['Name'] = [artist]
    elif yearwise_most_active[index]['Count'] == yearwise_artist_song_count[index][artist]:
      yearwise_most_active[index]['Name'].append(artist)


Row(id='55oE1XL7HceNY7VR7Nz4yu', name="Sweet '69", artists=None, duration_ms=244507, release_date='1995', year=1995, acousticness=2.74e-05, danceability=0.355, energy=0.928, instrumentalness=0.134, liveness=0.316, loudness=-5.552, speechiness=0.056, tempo=144.858, valence=0.667, mode=0, key=9, popularity=40, explicit=0)
Row(id='0o0fnK2CqZvI3jONRDv6ax', name='Bruise Violet', artists=None, duration_ms=172267, release_date='8/7/92', year=1992, acousticness=3.14e-06, danceability=0.461, energy=0.812, instrumentalness=0.683, liveness=0.247, loudness=-9.686, speechiness=0.055, tempo=141.275, valence=0.629, mode=0, key=11, popularity=33, explicit=1)
Row(id='1Lf9G9klo4xMYimW5nP0Eq', name='Baile en el bosque/Eres t£ el Pr¡ncipe Azul', artists=None, duration_ms=209453, release_date='1/1/58', year=1958, acousticness=0.952, danceability=0.354, energy=0.219, instrumentalness=4.25e-05, liveness=0.219, loudness=-16.141, speechiness=0.0369, tempo=106.019, valence=0.32, mode=1, key=5, popularity=25, ex

In [None]:
yearwise_most_active

[{'Name': ['Lil Baby'], 'Count': 56, 'Year': 1921},
 {'Name': ['Juice WRLD'], 'Count': 30, 'Year': 1922},
 {'Name': ['Juice WRLD'], 'Count': 40, 'Year': 1923},
 {'Name': ['Lil Uzi Vert'], 'Count': 32, 'Year': 1924},
 {'Name': ['Drake'], 'Count': 40, 'Year': 1925},
 {'Name': ['Future'], 'Count': 36, 'Year': 1926},
 {'Name': ['Taylor Swift'], 'Count': 33, 'Year': 1927},
 {'Name': ['Drake'], 'Count': 36, 'Year': 1928},
 {'Name': ['Lana Del Rey'], 'Count': 52, 'Year': 1929},
 {'Name': ['Lil Wayne'], 'Count': 44, 'Year': 1930},
 {'Name': ['Drake'], 'Count': 39, 'Year': 1931},
 {'Name': ['Kid Cudi'], 'Count': 36, 'Year': 1932},
 {'Name': ['Lil Wayne'], 'Count': 36, 'Year': 1933},
 {'Name': ['Five Finger Death Punch'], 'Count': 23, 'Year': 1934},
 {'Name': ['Red Hot Chili Peppers'], 'Count': 30, 'Year': 1935},
 {'Name': ['Eminem'], 'Count': 42, 'Year': 1936},
 {'Name': ['George Strait'], 'Count': 51, 'Year': 1937},
 {'Name': ['50 Cent'], 'Count': 41, 'Year': 1938},
 {'Name': ['Johnny Cash'], 