# Spotify charts - Exploratory Data Analysis (EDA)

## Preparing PySpark

In [1]:
# Installing and importing packages

!pip install pyspark==3.2.1
import pyspark
import pandas as pd

# Importing important classes of Spark SQL and DataFrames

from pyspark.sql import SparkSession # Main entry point for DataFrame and SQL functionality
import pyspark.sql.types as t # List of data types avaiable
import pyspark.sql.functions as f # List of built-in functions avaiable for DataFrame 
from pyspark.sql.functions import date_format

Defaulting to user installation because normal site-packages is not writeable


In this project, we're going to use Spark (large dataset, with more than 28 milions lines) and SQL to explore the dataset.

The first step working with Spark, is create a instance of class "Spark Session". 

In [2]:
# Building a Spark session - entry point to underlying Spark funtionality
spark = (SparkSession.builder.config("spark.driver.memory","4g").config("spark.driver.maxResultSize", "4g").getOrCreate())

In [3]:
# Importing dataset (with spark)
data_spk = spark.read.csv(path = 'charts.csv', inferSchema = True, header = True)

In [None]:
# Looking into the data types, all of then are 'string'. We need to use the correct type to perform our analysis.
# This are the data types supported by Spark DataFrames and SQL: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
data_spk

In [4]:
# Duplicating column 'date', to use two different date formats
data_spk = data_spk.withColumn('date_complete', data_spk.date)

In [5]:
# Apllying data typ
data_spk = data_spk.withColumn('rank', f.col('rank').cast(t.IntegerType())).withColumn('date', f.col('date').cast(t.DateType())).withColumn('date', date_format(f.col("date"), "yyyy-MM").alias("date_format")).withColumn('streams', f.col('streams').cast(t.IntegerType()))

In [None]:
# Verifying the data types of 'rank', 'date' and 'streams'
data_spk

To save our intermediare queries and results, we create a temp table (process data much faster).
It's important understand that temporary tables are available just within the current session.

In [6]:
# Setting a temp table
data_spk.registerTempTable('data_temp')



# Exploratory Data Analysis with PySpark - SQL

**Period of analysis - Spotify Charts**

In [7]:
spark.sql('''
SELECT MIN(date) begin, MAX(date) end 
FROM data_temp 
WHERE chart = 'top200';
''').toPandas()

Unnamed: 0,begin,end
0,2017-01,2021-12


## Artists analysis

**Artist with most different tracks on Top 200 - by day**

In [18]:
artist_tracks_appear_day = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT artist, region, date_complete as date, count(DISTINCT title) as n
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY region, date_complete, artist
        ORDER BY n DESC
    )
    SELECT artist, region, n, date, ROW_NUMBER() OVER (PARTITION BY region, date Order by n DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_tracks_appear_day.to_csv('data_tableau/artist_tracks_appear_day.csv')

**Artist with most different tracks on Top 200 - by month**

In [16]:
artist_tracks_appear_month = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT artist, region, date, count(DISTINCT title) as n
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY region, date, artist
        ORDER BY n DESC
    )
    SELECT artist, region, n, date, ROW_NUMBER() OVER (PARTITION BY region, date Order by n DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_tracks_appear_month.to_csv('data_tableau/artist_tracks_appear_month.csv')

**Artist with most different tracks on Top 200 - by year**

In [20]:
artist_tracks_appear_year = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT artist, region, YEAR(date) as year, count(DISTINCT title) as n
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY region, artist, YEAR(date)
        ORDER BY n DESC
    )
    SELECT artist, region, n, year, ROW_NUMBER() OVER (PARTITION BY region, year Order by n DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_tracks_appear_year.to_csv('data_tableau/artist_tracks_appear_year.csv')

**Artist appearances on Top 200 - by year**

In [22]:
artist_appear_year = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT artist, region, YEAR(date) as year, count(artist) as n
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY artist, YEAR(date), region
        ORDER BY n DESC
    )
    SELECT artist, region, n, year, ROW_NUMBER() OVER (PARTITION BY region, year Order by n DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_appear_year.to_csv('data_tableau/artist_appear_year.csv')

**Artist with the most number of music streams - by year (1 stream  = 1 song that has been played over 30 sec)**

In [26]:
artist_stream_year = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT artist, region, YEAR(date) as year, sum(streams) as stream
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY artist, region, YEAR(date)
        ORDER BY stream DESC
    )
    SELECT artist, region, stream, year, ROW_NUMBER() OVER (PARTITION BY region, year Order by stream DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_stream_year.to_csv('data_tableau/artist_stream_year.csv')

**Artists to achieve Top 1 the most times**

In [28]:
artist_first_rank = spark.sql('''
SELECT artist, region, YEAR(date) as date, count(artist) as n
FROM data_temp
WHERE chart = 'top200'
AND rank = 1
GROUP BY artist, region, YEAR(date)
ORDER BY n DESC
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_first_rank.to_csv('data_tableau/artist_first_rank.csv')

**Artists to achieve Top 1 the most times (region independent)**

In [None]:
artist_first_rank_indep_region = spark.sql('''
SELECT artist, date, count(artist) as n
FROM data_temp
WHERE chart = 'top200'
AND rank = 1
AND region <> "Global"
GROUP BY artist, date
ORDER BY n DESC
''').toPandas()

In [None]:
# Saving the dataset as a csv file
artist_first_rank_indep_region.to_csv('data_tableau/artist_first_rank_indep_region.csv')

## Song analysis

**Songs with the most appearances by year**

In [30]:
song_appear_year = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT title, artist, region, YEAR(date) as year, count(title) as n
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY title, artist, region, YEAR(date)
        ORDER BY n DESC
    )
    SELECT title, artist, region, year, n, ROW_NUMBER() OVER (PARTITION BY region, year Order by n DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 20;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
song_appear_year.to_csv('data_tableau/song_appear_year.csv')

**Songs with the most number of streams on single year**

In [36]:
song_stream_year = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT title, artist, region, YEAR(date) as year, sum(streams) as stream
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY title, artist, region, YEAR(date)
        ORDER BY stream DESC
    )
    SELECT title, artist, region, year, stream, ROW_NUMBER() OVER (PARTITION BY region, year Order by stream DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
song_stream_year.to_csv('data_tableau/song_stream_year.csv')

**Songs with the most number of streams on single month**

In [34]:
song_stream_month = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT title, artist, region, date, sum(streams) as stream
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY title, artist, region, date
        ORDER BY stream DESC
    )
    SELECT title, artist, region, date, stream, ROW_NUMBER() OVER (PARTITION BY region, date Order by stream DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
song_stream_month.to_csv('data_tableau/song_stream_month.csv')

**Songs with the most number of streams on a single day**

In [32]:
song_stream_day = spark.sql('''
WITH CTE AS
(
    WITH CTE2 AS 
    (
        SELECT  title, artist, region, sum(streams) as stream, date_complete as date
        FROM data_temp 
        WHERE chart = 'top200'
        GROUP BY date_complete, title, artist, region
        ORDER BY stream DESC
    )
    SELECT title, artist, region, stream, date, ROW_NUMBER() OVER (PARTITION BY region, date Order by stream DESC) AS Rank FROM CTE2 
)
SELECT * FROM CTE WHERE Rank <= 10;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
song_stream_day.to_csv('data_tableau/song_stream_day.csv')

**Songs that stayed at Top 1 the most times**

In [38]:
songs_first_rank = spark.sql('''
SELECT  title, artist, region, date, count(title) as n
FROM data_temp 
WHERE chart = 'top200'
AND rank = 1
GROUP BY title, artist, region, date
ORDER BY n DESC;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
songs_first_rank.to_csv('data_tableau/songs_first_rank.csv')

**Songs that stayed at Top 1 the most times (region independent)**

In [None]:
songs_first_rank_indep_region = spark.sql('''
SELECT  title, artist, date, count(title) as n
FROM data_temp 
WHERE chart = 'top200'
AND rank = 1
AND region <> 'Global'
GROUP BY title, artist, date
ORDER BY n DESC;
''').toPandas()

In [None]:
# Saving the dataset as a csv file
songs_first_rank_indep_region.to_csv('data_tableau/songs_first_rank_indep_region.csv')

**KPI - Streams/population (map plot)**

In [40]:
streams_population = spark.sql('''
SELECT region, YEAR(date), sum(streams) as stream
FROM data_temp
WHERE chart = 'top200'
AND region <> "Global"
GROUP BY region, YEAR(date)
ORDER BY stream DESC
''').toPandas()

In [42]:
# Uploading a dataset with values of the population of each country
country_population = pd.read_excel('country_population.xlsx', dtype = {'Country Name': str, 'Population': str})

In [43]:
# Changing variable names
country_population = country_population.rename(columns={'Country Name':'region', 'Population':'population'})

In [44]:
# Applying left join on both dataframes
streams_population = pd.merge(streams_population, country_population, on = 'region', how = 'left')

In [45]:
streams_population.head(10)

Unnamed: 0,region,year(date),stream,population
0,United States,2018,30785180823,331893745
1,United States,2019,29791300658,331893745
2,United States,2020,28685986160,331893745
3,United States,2021,27093009816,331893745
4,United States,2017,25763351668,331893745
5,Brazil,2021,16426316020,214326223
6,Brazil,2020,12523039097,214326223
7,Mexico,2020,11678777603,126705138
8,Mexico,2021,11229884263,126705138
9,Brazil,2019,11105842661,214326223


In [None]:
# Saving the dataset as a csv file
streams_population.to_csv('data_tableau/streams_population.csv')