In [8]:
# Import the required libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [10]:
# Step 1: Initialize a Spark session
spark = SparkSession.builder \
    .appName("Spotify_Read_Files") \
    .getOrCreate()

# Working on spotify_albums_data_2023.csv

## Cleaning the data

In [12]:
# Load the data of spotify-album_data_2023.csv
spotify_album_df=spark.read.format("csv")\
.option("header","true")\
.option("inferschema","true")\
.option("mode","PERMISSIVE")\
.load("C:/Users/vinishkumar.yadav/pyspark/spotify_dataset/data/spotify-albums_data_2023.csv")

In [None]:
# Display the data
spotify_album_df.show(10,False)

In [14]:
# Display the datatype of columns
spotify_album_df.printSchema()

root
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- track_number: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- album_type: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- total_tracks: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- label: string (nullable = true)
 |-- album_popularity: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_0: string (nullable = true)
 |-- artist_1: string (nullable = true)
 |-- artist_2: string (nullable = true)
 |-- artist_3: string (nullable = true)
 |-- artist_4: string (nullable = true)
 |-- artist_5: string (nullable = true)
 |-- artist_6: string (nullable = true)
 |-- artist_7: string (nullable = true)
 |-- artist_8: string (nullable = true)
 |-- artist_9: string (nullable = true)
 |-- artist_10: string (nullable = true)
 |-- art

In [None]:
from IPython.display import display
import pandas as pd

# Convert to Pandas for better display
spotify_album_df_pd = spotify_album_df.toPandas()

# Display in JupyterLab with scroll
display(spotify_album_df_pd.head(10))  # Show first 10 rows


In [None]:
# Display the column names
spotify_album_df.columns

In [None]:
spotify_album_df.describe().show()

In [None]:
# Using pandas for better Display
display(spotify_album_df_pd.describe().head())

In [16]:
# Finding number of rows and columns

row_num_spotify_album=spotify_album_df.count()
column_num_spotify_album=len(spotify_album_df.columns)

print(f"Number of rows : {row_num_spotify_album}")
print(f"Number of columns : {column_num_spotify_album}")

Number of rows : 438973
Number of columns : 26


In [None]:
# Null Count in entire dataFrame
null_count_spotify_album = spotify_album_df.select([sum(col(i).isNull().cast("int")).alias(i) for i in spotify_album_df.columns])
#print(null_count_spotify_album.show())
null_count_spotify_album.show()

In [None]:
# Convert to Pandas for better display
null_count_spotify_album_df_pd = null_count_spotify_album.toPandas()

# Display in JupyterLab with scroll
display(null_count_spotify_album_df_pd.head())  # Show first 10 rows

In [None]:
# Empty Cells in entire dataFrame
empty_cell_count_spotify_album = spotify_album_df.select([sum(when(trim(col(i))=="",1).otherwise(0))\
                                                          .cast("int").alias(i) for i in spotify_album_df.columns])
#print(null_count_spotify_album.show())
empty_cell_count_spotify_album.show()

In [None]:
# Add all artist columns into one single "all_artist" column
new_column_spotify_album=spotify_album_df.withColumn("all_artist", concat(
                                                                    coalesce(col("artists"), lit("")),
                                                                    coalesce(col("artist_0"), lit("")),
                                                                    coalesce(col("artist_1"), lit("")),
                                                                    coalesce(col("artist_2"), lit("")),
                                                                    coalesce(col("artist_3"), lit("")),
                                                                    coalesce(col("artist_4"), lit("")),
                                                                    coalesce(col("artist_5"), lit("")),
                                                                    coalesce(col("artist_6"), lit("")),
                                                                    coalesce(col("artist_7"), lit("")),
                                                                    coalesce(col("artist_8"), lit("")),
                                                                    coalesce(col("artist_9"), lit("")),
                                                                    coalesce(col("artist_10"), lit("")),
                                                                    coalesce(col("artist_11"), lit(""))))

new_column_spotify_album.show()

In [None]:
# Convert to Pandas for better display
new_column_spotify_album_df_pd = new_column_spotify_album.toPandas()

# Display 
display(new_column_spotify_album_df_pd.head(10))  # Show first 10 rows


In [None]:
# Count the number of null values in "all_artist" column

selected_columns = ["all_artist"]
null_count_new_column_spotify_album_df = new_column_spotify_album.select([sum(col(i).isNull().cast("int")).alias(i) for i in selected_columns])
null_count_new_column_spotify_album_df.show()

In [None]:
# drop the unwanted columns
new_column_spotify_album = new_column_spotify_album.drop("artists","artist_0", "artist_1", "artist_2","artist_3",
                                                        "artist_4","artist_5","artist_6","artist_7","artist_8",
                                                        "artist_9","artist_10","artist_11","duration_ms")
new_column_spotify_album.show()

In [301]:
print(new_column_spotify_album.count())
print(len(new_column_spotify_album.columns))

438973
13


In [None]:
# Splitting release_date column into date only.
# Casting the datatype of release_date column from string into date.
new_column_spotify_album=new_column_spotify_album.withColumn('release_date',substring('release_date',1,10))\
                        .withColumn('release_date',col('release_date').cast('date'))

new_column_spotify_album.printSchema()

In [None]:
new_column_spotify_album.show(5)

In [313]:
unique_rows = new_column_spotify_album.dropDuplicates()

In [315]:
print(unique_rows.count())
print(len(unique_rows.columns))

438973
13


In [None]:
# Null Count in entire dataFrame
new_column_spotify_album.select([sum(col(i).isNull().cast("int"))
                                 .alias(i) for i in new_column_spotify_album.columns]).show()


## Transformation on data

In [None]:
# renaming the dataframe
cleaned_spotify_album_df=new_column_spotify_album

In [None]:
# display the column name
cleaned_spotify_album_df.columns

#### Find all the album_types

In [None]:
# Find all the albumns in album_types column
unique_album_types_count=cleaned_spotify_album_df.select("album_type").distinct()
unique_album_types_count.show(truncate=False)
print(f"There are {unique_album_types_count.count()} types of albums")

#### Find total tracks in each album type.

In [None]:
# count of total_tracks for each album_type
# Group by 'album_type' and sum the 'total_tracks'
album_type_sum = cleaned_spotify_album_df.groupBy('album_type').agg(
    sum('total_tracks').alias('sum_total_tracks'))

# Show the result
album_type_sum.show()

In [None]:
# Count the number of null values in "sum_total_tracks" column
selected_columns = ["sum_total_tracks"]
null_count2 = album_type_sum.select([sum(col(i).isNull().cast("int")).alias(i) for i in selected_columns])
null_count2.show()

#### Highest number of tracks released in album type(album,single,compilation)

In [None]:
# count of top 3 total_tracks for album_type
top_3_counts1 = album_type_sum.orderBy(col('sum_total_tracks').desc())\
                    .where(col('album_type').isin('album','single','compilation'))                                                                                                                 

# Show the result
top_3_counts1.show()

#### Find the number of labels in the dataset.

In [None]:
# Find the distinct lables in lable column
unique_lables=cleaned_spotify_album_df.select("label").distinct()
unique_lables.show(truncate=False)
print(f"There are {unique_lables.count()} types of labels")

In [None]:
# show data of 'album', 'single',"compilation" in album_type column
label_df = cleaned_spotify_album_df.filter(cleaned_spotify_album_df.album_type.isin('album', 'single',"compilation"))
label_df.show()

#### Labels which has released single,album and compilation tracks and their count.

In [None]:
# Find those labels which has release single, albums and compilation songs in album_type column
# Also find the count of tracks released by those labels
labels_album_type_count = label_df.groupBy("album_type",'label').count().orderBy(col('album_type').desc())

# Display the result
labels_album_type_count.show(10,False)

#### Display the artist_name, track_name,label wrt popularity.

In [None]:
# Sorting the track_name,album_name,label,all_artist by album_popularity

artist_name = cleaned_spotify_album_df.withColumn("album_popularity",col("album_popularity").cast("integer"))
# Filter to exclude NULL values in 'album_popularity', then select and order
filtered_df = (artist_name
                .filter(cleaned_spotify_album_df.album_popularity.isNotNull())  # Exclude rows where album_popularity is NULL
                .select('track_name', 'album_name', 'label', 'all_artist', 'album_popularity')  # Select desired columns
                .orderBy(col('album_popularity').desc()))  # Order by album_popularity in descending order


# Show the result
filtered_df.show()


#### Album release over the year.

In [None]:
# Extract the year from the release_date column
album_releases_by_year = cleaned_spotify_album_df.withColumn('release_year', year(col('release_date')))

# Group by 'release_year' and count the number of albums released each year
album_releases_count = album_releases_by_year.groupBy('release_year').count().orderBy('release_year')

# Show the result
album_releases_count.show(truncate=False)


In [None]:
# Filter the dataset to include only 'album', 'single', and 'compilation'
album_releases_filtered = cleaned_spotify_album_df.filter(col('album_type').isin('album', 'single', 'compilation'))

album_releases_by_year = album_releases_filtered.withColumn('release_year',year(col('release_date')))

# Group by 'album_type' and collect the 'release_date' for each type
album_releases_by_type = album_releases_by_year.groupBy('album_type','release_date').count().orderBy("release_date")

# Show the result
album_releases_by_type.show(truncate=False)


# Working on spotify_artist_data_2023.csv

## Cleaning the data

In [18]:
# Load the data of spotify-album_data_2023.csv
spotify_artist_df=spark.read.format("csv")\
.option("header","true")\
.option("inferschema","true")\
.option("mode","PERMISSIVE")\
.load("C:/Users/vinishkumar.yadav/pyspark/spotify_dataset/data/spotify_artist_data_2023.csv")

In [None]:
spotify_artist_df.show(10)

In [20]:
# Display the datatype of columns
spotify_artist_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artist_popularity: integer (nullable = true)
 |-- artist_genres: string (nullable = true)
 |-- followers: string (nullable = true)
 |-- genre_0: string (nullable = true)
 |-- genre_1: string (nullable = true)
 |-- genre_2: string (nullable = true)
 |-- genre_3: string (nullable = true)
 |-- genre_4: string (nullable = true)
 |-- genre_5: string (nullable = true)
 |-- genre_6: string (nullable = true)



In [None]:
# Display the column names
spotify_artist_df.columns

In [None]:
# Display the column names
spotify_artist_df.describe().show()

In [None]:
# Finding number of rows and columns
row_num_spotify_artist=spotify_artist_df.count()
column_num_spotify_artist=len(spotify_artist_df.columns)

print(f"Number of rows : {row_num_spotify_artist}")
print(f"Number of columns : {column_num_spotify_artist}")

In [None]:
# Null Count in entire dataFrame
null_count_spotify_artist = spotify_artist_df.select([sum(col(i).isNull().cast("int")).alias(i) for i in spotify_artist_df.columns])
null_count_spotify_artist.show()

In [None]:
# Empty Cells in entire dataFrame
empty_cell_count_spotify_artist = spotify_artist_df.select([sum(when(trim(col(i))=="",1).otherwise(0))\
                                                          .cast("int").alias(i) for i in spotify_artist_df.columns])
empty_cell_count_spotify_artist.show()

In [None]:
# Proportion of each column that is a null value
proportion = spotify_artist_df.select(
    [(sum(col(i).isNull().cast("int"))/37012).alias(i) for i in spotify_artist_df.columns])

proportion.show()

The proportion of null values among the "genre" type variables is too high that makes these columns not useful 
for further analysis, therefore, we will drop them

In [None]:
# drop the unwanted columns
new_column_spotify_artist = spotify_artist_df.drop("genre_0","genre_1", "genre_2", "genre_3","genre_4",
                                                        "genre_5","genre_6")

new_column_spotify_artist.show()

In [None]:
# Change the datatype of followers column frong string to integer.
new_column_spotify_artist=new_column_spotify_artist.withColumn("followers",col("followers").cast("int"))

# Working on spotify_tracks_data_2023.csv

## Cleaning the data

In [22]:
spotify_track_df=spark.read.format("csv")\
                     .option("header","true")\
                     .option("mode","PERMISSIVE")\
                     .option("inferschema","true")\
                     .load("C:/Users/vinishkumar.yadav/pyspark/spotify_dataset/data/spotify_tracks_data_2023.csv")

In [None]:
spotify_track_df.show()

In [None]:
# Display the columns of dataset
spotify_track_df.columns

In [24]:
# Display the datatype of columns
spotify_track_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- explicit: boolean (nullable = true)



In [None]:
# Find rows and columns
row_count_spotify_track=spotify_track_df.count()
column_count_spotify_track=len(spotify_track_df.columns)

print(f"Number of rows    : {row_count_spotify_track}")
print(f"Number of columns : {column_count_spotify_track}")

In [None]:
# Find the null values in the entire dataframe
null_count_spotify_track=spotify_track_df.select([sum(col(i).isNull().cast("int")).alias(i) for i in spotify_track_df.columns])
null_count_spotify_track.show()

# Working on spotify_feature_data_2023.csv

## Cleaning the data

In [26]:
spotify_feature_df=spark.read.format("csv")\
                     .option("header","true")\
                     .option("mode","PERMISSIVE")\
                     .option("inferschema","true")\
                     .load("C:/Users/vinishkumar.yadav/pyspark/spotify_dataset/data/spotify_features_data_2023.csv")

In [None]:
spotify_feature_df.show()

In [None]:
# Display the columns of dataset
spotify_feature_df.columns

In [None]:
# Find rows and columns
row_count_spotify_feature=spotify_feature_df.count()
column_count_spotify_feature=len(spotify_feature_df.columns)

print(f"Number of rows    : {row_count_spotify_feature}")
print(f"Number of columns : {column_count_spotify_feature}")

In [28]:
# Display the datatype of columns
spotify_feature_df.printSchema()

root
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- uri: string (nullable = true)
 |-- track_href: string (nullable = true)
 |-- analysis_url: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- time_signature: integer (nullable = true)



In [None]:
# Find the null values in the entire dataframe
null_count_spotify_feature=spotify_feature_df.select([sum(col(i).isNull().cast("int")).alias(i) for i in spotify_feature_df.columns])
null_count_spotify_feature.show()

In [None]:
# drop the unwanted columns
new_column_spotify_feature = spotify_feature_df.drop("type","uri", "track_href", "analysis_url")

new_column_spotify_feature.show()

# Transformation on spotify_artist_data_2023.csv

#### Top 5 artists based on total number of followers.

In [None]:
top_5_artist=new_column_spotify_artist.select('followers','name')\
                                      .orderBy(col("followers").desc()).limit(5)
top_5_artist.show()

#### Top 5 artists based on average popularity rating.

In [None]:
top_5_artist_avg = new_column_spotify_artist.groupBy("name").agg(avg("artist_popularity").alias("avg_popularity"))\
                                            .orderBy(col("avg_popularity").desc()).limit(5)
top_5_artist_avg.show()

####  Who are the artists in the top 50 and how many songs to they have in this list?

In [None]:
new_df_join = new_column_spotify_artist.join(new_column_spotify_album,new_column_spotify_album.artist_id==new_column_spotify_artist.id,"inner")

In [None]:
top_50_artist = new_df_join.groupBy("artist_id","name").agg(count("track_id").alias("total_tracks")).orderBy(col("total_tracks").desc()).limit(50)
top_50_artist.show()

# Transformation on spotify_track_data_2023.csv

#### How has the proportion of explicit content behaved over the years?

In [None]:
spotify_track_df
new_column_spotify_album

In [215]:
spotify_track_join = spotify_track_df.join(new_column_spotify_album, new_column_spotify_album.track_id==spotify_track_df.id,"inner")

In [110]:
explicit_behaviour = spotify_track_join.withColumn("release_year", year("release_date"))\
                                            .groupBy("explicit","release_year").agg(count(col("explicit")).alias("explicit_count"))\
                                            .orderBy(col("release_year"))

In [None]:
explicit_behaviour.show(5,False)

#### Create a new variable to track the proportion of tracks with explicit content, among the total tracks per year.

In [None]:
total_tracks_per_year = spotify_track_join.withColumn("release_year", year("release_date"))\
                                        .filter(col("release_year").isNotNull())\
                                        .groupBy("release_year","explicit").agg(sum(col("total_tracks")).alias("total_tracks")
                                                                                ,count(col("explicit")).alias("explicit_count"))\
                                        .orderBy(col("release_year"))

# Add a new column for the proportion of explicit tracks
total_tracks_per_year1 = total_tracks_per_year.withColumn("explicit_proportion",
    round(when((col("total_tracks")>0) & (col("explicit")==1), col("explicit_count")/col("total_tracks"))\
    .otherwise((col("total_tracks")-col("explicit_count"))/col("total_tracks")),2))

total_tracks_per_year1.show()

#### How has the duration of tracks changed over the years?

In [None]:
# Extracting the year from yyyy-mm-dd
df_of_year = new_column_spotify_album.withColumn("release_year", year("release_date"))

# Filter out those records which does not contain any null value
df_of_year= df_of_year.filter(col("duration_sec").isNotNull() & col("release_year").isNotNull())

average_duration = df_of_year.groupBy("release_year").agg(round(avg("duration_sec"),2)
                                                          .alias("average_duration"))\
                                                          .orderBy(col("release_year"))
average_duration.show()

In [None]:
window_track= Window.orderBy("release_year")
average_duration_lag = average_duration.withColumn("previous_year_avg_duration(sec)", (lag(col("average_duration"),1)
                                                  .over(window_track)))
average_duration_lag.show()

In [None]:
track_changed=average_duration_lag\
    .withColumn("%loss_gain", round(((col("average_duration")-col("previous_year_avg_duration(sec)"))/col("average_duration"))*100,2))
track_changed.show()

#### Let's check the trends from the 60's!(	Year	id	duration_sec	explicit	% explicit)

In [None]:
spotify_track_join.printSchema()

In [None]:
explicit_count_60s = spotify_track_join.withColumn("year", year("release_date"))\
                                       .where((col("year")>=1960) & (col("year")<=1969))\
                                       .groupBy("year").agg(count("explicit").alias("explicit_count"))
explicit_count_60s.show(10)

In [283]:
trend_from_60s=spotify_track_join.withColumn("year", year("release_date"))\
                                 .where((col("year")>=1960) & (col("year")<=1969))\
                                 .join(explicit_count_60s ,on="year", how="inner")\
                                 .select("year","track_id","duration_sec","explicit_count")\
                                 .orderBy(col("year"))
trend_from_60s.show()

+----+--------------------+------------+--------------+
|year|            track_id|duration_sec|explicit_count|
+----+--------------------+------------+--------------+
|1960|2HjGXzYim1CmvBzrZ...|      144.36|           357|
|1960|78Au1HXTWKHdXzbbK...|     267.266|           357|
|1960|3PiFCNfm0IIwaBRkd...|     307.834|           357|
|1960|5dJEMlIHCUqWiCS4D...|      273.04|           357|
|1960|3L4bHmQjrCbDeT5X6...|     204.892|           357|
|1960|4Vg6HLivX0drxFGqg...|     231.106|           357|
|1960|5VdHiODOghO0bAM1u...|     128.639|           357|
|1960|3yGzvEA8cWSkpMeHA...|     260.322|           357|
|1960|1tI5GZsOxD3o3UvD6...|     220.093|           357|
|1960|0PPROHFTk7JWRfyN8...|     217.901|           357|
|1960|67oPAbYskbUApqr0C...|     173.501|           357|
|1960|1ICJNLZSsYl450ojf...|     323.266|           357|
|1960|7vdCM2IJiFOiVoBH5...|      70.333|           357|
|1960|6ypuIs8m35PRm9Ps3...|      108.96|           357|
|1960|2DPjsMnKG6qsYDPeU...|     200.933|        