In [54]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pyarrow==1.0.0
!pip install pandas
!pip install numpy==1.19.5

In [55]:
import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [56]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("MusicAnalysis") \
    .getOrCreate()


In [57]:
music_df = spark.read.csv("/content/music_streaming.csv", header=True, inferSchema=True)

In [58]:
music_df.printSchema()

root
 |-- Artist Name: string (nullable = true)
 |-- Track Name: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_in min/ms: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- Genre: double (nullable = true)



## Data Processing

### Missing Values

In [59]:
from pyspark.sql.functions import isnan, when, count , col

# Check for missing values
missing_counts = music_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in music_df.columns])
missing_counts.show()

+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|Artist Name|Track Name|Popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|duration_in min/ms|time_signature|Genre|
+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|          0|         0|       394|           0|     0|1743|       0|   0|          0|           0|            3587|       0|      0|    0|                 0|             0|    0|
+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+



In [60]:
# Drop rows where Popularity is missing
music_df = music_df.na.drop(subset=["Popularity"])

In [61]:
# Drop the 'key' column
music_df = music_df.drop("key")

Here we are calculating the mean of the 'instrumentalness' column for each group of rows grouped by 'Artist Name'. Then, we fill the missing (NaN) values in the 'instrumentalness' column with the mean of each artist's instrumentalness.

In [62]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, when
from pyspark.sql.window import Window

In [63]:
# Impute missing values in 'instrumentalness' with the mean of the instrumentalness of each artist
window = Window.partitionBy("Artist Name")
music_df = music_df.withColumn("instrumentalness",
                                when(col("instrumentalness").isNull(),
                                     avg(col("instrumentalness")).over(window))
                                .otherwise(col("instrumentalness")))

I fill missing values in the 'instrumentalness' column of my DataFrame with the mean of all non-missing values in that column and then count how many missing values remain.

In [64]:
 # Impute missing values in 'instrumentalness' with the mean of all instrumentalness values
instrumentalness_mean = music_df.select(avg("instrumentalness")).collect()[0][0]
music_df = music_df.withColumn("instrumentalness",
                                when(col("instrumentalness").isNull(),
                                     instrumentalness_mean)
                                .otherwise(col("instrumentalness")))

In [65]:
# Check the data types of each column
music_df.printSchema()

root
 |-- Artist Name: string (nullable = true)
 |-- Track Name: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_in min/ms: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- Genre: double (nullable = true)



In [66]:
from pyspark.sql.functions import isnan, when, count

# Check for missing values
missing_counts = music_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in music_df.columns])
missing_counts.show()

+-----------+----------+----------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|Artist Name|Track Name|Popularity|danceability|energy|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|duration_in min/ms|time_signature|Genre|
+-----------+----------+----------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|          0|         0|         0|           0|     0|       0|   0|          0|           0|               0|       0|      0|    0|                 0|             0|    0|
+-----------+----------+----------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+



### Duplicates

In [67]:
from pyspark.sql.functions import col

# Check number of duplicate rows before removing duplicates
print("Number of duplicate rows before removing duplicates:", music_df.count() - music_df.dropDuplicates(["Track Name", "Artist Name"]).count())

# Remove duplicate rows based on "Track Name" and "Artist Name" columns
df = music_df.dropDuplicates(["Track Name", "Artist Name"])

# Check number of duplicate rows after removing duplicates
print("Number of duplicate rows after removing duplicates:", df.count())


Number of duplicate rows before removing duplicates: 1583
Number of duplicate rows after removing duplicates: 13540


### Transformation

Transform duration to minutes only

In [68]:
from pyspark.sql.functions import col

# Convert "duration_in min/ms" column to minutes only
music_df = music_df.withColumn("duration_in_min", col("duration_in min/ms") / 60000)

# Drop the original "duration_in min/ms" column
music_df = music_df.drop("duration_in min/ms")

In [69]:
from pyspark.sql.functions import col

# Convert specified columns to double data type
music_df = music_df.withColumn("Popularity", col("Popularity").cast("double"))
music_df = music_df.withColumn("danceability", col("danceability").cast("double"))
music_df = music_df.withColumn("energy", col("energy").cast("double"))
music_df = music_df.na.drop(subset=["Popularity","danceability","energy"])


In [70]:
music_df.printSchema()

root
 |-- Artist Name: string (nullable = true)
 |-- Track Name: string (nullable = true)
 |-- Popularity: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- Genre: double (nullable = true)
 |-- duration_in_min: double (nullable = true)



a) Which genre has the highest average popularity?

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc

In [72]:
genre_avg_popularity = music_df.groupBy('Genre').agg(avg(col('Popularity')).alias('AvgPopularity'))
highest_avg_popularity_genre = genre_avg_popularity.orderBy(desc('AvgPopularity')).first()['Genre']
print("Genre with highest average popularity:", highest_avg_popularity_genre)


Genre with highest average popularity: 4.0


b) Display which artists have recorded the most number of songs with a duration of more than 5 minutes

In [73]:
artists_most_songs_gt_5mins = music_df.filter(col('duration_in_min') > 5).groupBy('Artist Name').count().orderBy(desc('count'))
print("Artists with most songs longer than 5 minutes:")
artists_most_songs_gt_5mins.show()

Artists with most songs longer than 5 minutes:
+--------------------+-----+
|         Artist Name|count|
+--------------------+-----+
|           Metallica|   21|
|                TOOL|   13|
|         Arcade Fire|   12|
|        Led Zeppelin|   11|
|           Pearl Jam|    9|
|           Aerosmith|    9|
|               Kyuss|    6|
|Kenny Wayne Shepherd|    6|
|       Wooden Shjips|    6|
|         Sonic Youth|    6|
|             Pantera|    6|
|       Joe Bonamassa|    6|
|               Opeth|    5|
|                  U2|    5|
|         Patti Smith|    5|
|        Dire Straits|    5|
|   Avenged Sevenfold|    5|
|      Monster Magnet|    5|
|     Bernard Allison|    5|
| Derek & The Dominos|    5|
+--------------------+-----+
only showing top 20 rows



c) How many songs are included in every Genre?


In [74]:
songs_count_per_genre = music_df.groupBy('Genre').count().orderBy('Genre')
print("Number of songs per Genre:")
songs_count_per_genre.show()

Number of songs per Genre:
+-----+-----+
|Genre|count|
+-----+-----+
|  0.0|  517|
|  1.0| 1241|
|  2.0| 1169|
|  3.0|  358|
|  4.0|  357|
|  5.0| 1189|
|  6.0| 2223|
|  7.0|  440|
|  8.0| 1675|
|  9.0| 1768|
| 10.0| 4185|
+-----+-----+



d) Which artists dominated the charts?

In [75]:
from pyspark.sql.functions import col, count, desc
# Find the dominant artists based on the count of their songs
dominant_artists = music_df.groupBy('Artist Name').agg(count('Track Name').alias('SongCount')).orderBy(desc('SongCount'))
print("Dominant artists based on song count:")
dominant_artists.show()

Dominant artists based on song count:
+--------------------+---------+
|         Artist Name|SongCount|
+--------------------+---------+
|     Backstreet Boys|       66|
|      Britney Spears|       43|
|  The Rolling Stones|       32|
|                  U2|       29|
|           Metallica|       27|
|            Westlife|       27|
|     Lata Mangeshkar|       23|
|               AC/DC|       23|
|      The Black Keys|       22|
|       Fleetwood Mac|       22|
|             Nirvana|       22|
|        Led Zeppelin|       22|
|       Mohammed Rafi|       21|
|            Coldplay|       20|
|       Kishore Kumar|       20|
|           Pearl Jam|       18|
|The Smashing Pump...|       17|
|           Van Halen|       17|
|Creedence Clearwa...|       17|
|           Aerosmith|       17|
+--------------------+---------+
only showing top 20 rows



e) Recommend at least 5 fun/not-boring songs that can be played at a party, you can use features like
energy, danceability etc.. to represent cheerfulness.

In [76]:
party_songs_recommendation = music_df.filter((col('energy') > 0.7) & (col('danceability') > 0.7)).orderBy(desc('Popularity')).limit(5)
print("Recommended party songs:")
party_songs_recommendation.show()

Recommended party songs:
+-------------+--------------------+----------+------------+------+--------+----+-----------+------------+-----------------+--------+-------+-------+--------------+-----+--------------------+
|  Artist Name|          Track Name|Popularity|danceability|energy|loudness|mode|speechiness|acousticness| instrumentalness|liveness|valence|  tempo|time_signature|Genre|     duration_in_min|
+-------------+--------------------+----------+------------+------+--------+----+-----------+------------+-----------------+--------+-------+-------+--------------+-----+--------------------+
|     Måneskin|             Beggin'|     100.0|       0.714|   0.8|  -4.808| 0.0|     0.0504|       0.127|0.160956687886367|   0.359|  0.589|134.002|           4.0|  9.0|5.876666666666666E-5|
|     Doja Cat|Kiss Me More (fea...|      98.0|       0.762| 0.701|  -3.541| 1.0|     0.0286|       0.235|          1.58E-4|   0.123|  0.742|110.968|           4.0|  5.0|   3.481116666666667|
|   Ed Sheeran|

# ML MODELS

In [77]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Select the relevant columns from the DataFrame
selected_df = music_df.select('Popularity', 'danceability', 'energy', 'duration_in_min', 'Genre')

# Split the data into training and testing sets
train_data, test_data = selected_df.randomSplit([0.7, 0.3], seed=42)

# Assemble features vector
assembler = VectorAssembler(inputCols=['Popularity', 'danceability', 'energy', 'duration_in_min'], outputCol='features')
assembled_train_data = assembler.transform(train_data)
assembled_test_data = assembler.transform(test_data)

# Logistic Regression classifier
lr = LogisticRegression(labelCol='Genre', featuresCol='features', maxIter=10)
lr_pipeline = Pipeline(stages=[lr])
lr_model = lr_pipeline.fit(assembled_train_data)
lr_predictions = lr_model.transform(assembled_test_data)

# Random Forest classifier
rf = RandomForestClassifier(labelCol='Genre', featuresCol='features', numTrees=10)
rf_pipeline = Pipeline(stages=[rf])
rf_model = rf_pipeline.fit(assembled_train_data)
rf_predictions = rf_model.transform(assembled_test_data)

# Decision Tree classifier
dt = DecisionTreeClassifier(labelCol='Genre', featuresCol='features')
dt_pipeline = Pipeline(stages=[dt])
dt_model = dt_pipeline.fit(assembled_train_data)
dt_predictions = dt_model.transform(assembled_test_data)

# Evaluation
evaluator = MulticlassClassificationEvaluator(labelCol='Genre', predictionCol='prediction', metricName='accuracy')

lr_accuracy = evaluator.evaluate(lr_predictions)
rf_accuracy = evaluator.evaluate(rf_predictions)
dt_accuracy = evaluator.evaluate(dt_predictions)

print("Logistic Regression Accuracy:", lr_accuracy)
print("Random Forest Accuracy:", rf_accuracy)
print("Decision Tree Accuracy:", dt_accuracy)

Logistic Regression Accuracy: 0.4020828616708173
Random Forest Accuracy: 0.4077428118632556
Decision Tree Accuracy: 0.39687570749377404


## Insights

Random Forest achieved the highest accuracy (42%) in predicting song genres based on provided features, outperforming Logistic Regression (41%) and Decision Tree (40%) models.