In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pandas

In [61]:
import findspark
findspark.init()

In [62]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr,when

In [None]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [63]:
spark

Loading data into a Pandas Dataframe then into a Spark DataFrame

In [64]:
music_streaming = pd.read_csv('music_streaming.csv')
sdf = spark.createDataFrame(music_streaming) 

In [65]:
sdf.printSchema()
sdf.show(10)
print('Rows',sdf.count()) #15517 before cleaning

root
 |-- Artist Name: string (nullable = true)
 |-- Track Name: string (nullable = true)
 |-- Popularity: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_in min/ms: double (nullable = true)
 |-- time_signature: long (nullable = true)
 |-- Genre: long (nullable = true)

+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+------------------+--------------+-----+
|         Artist Name|          Track Name|Popularity|danceability|energy| key|loudness|mode|sp

In [66]:
#data cleaning 

# handling invalid rows and Null Values this is provided by spark
sdf1=spark.read.option("header",True).option("mode",'DROPMALFORMED').csv('music_streaming.csv')

#showing missing values in all columns
from pyspark.sql.functions import col,isnan, when, count
sdf1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sdf1.columns]
   ).show()

#the mising values didnt exceed  even 10% inn any of the 3 columns that have them so 
#we wont remove them just fill with 0
sdf1 = sdf1.fillna(0)


# droping Duplicates
sdf1 = sdf1.dropDuplicates()  # Drop duplicate rows
sdf1=sdf1.dropDuplicates(['Track Name'])



+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|Artist Name|Track Name|Popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|duration_in min/ms|time_signature|Genre|
+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+
|          0|         0|       394|           0|     0|1743|       0|   0|          0|           0|            3587|       0|      0|    0|                 0|             0|    0|
+-----------+----------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-----+------------------+--------------+-----+



In [67]:
sdf1.count() #13021 rows after droping duplicates

13021

In [68]:
# Import required libraries
from pyspark.sql.types import StringType



# Convert milliseconds to minutes
#if i understand right the values of duration have some in mins and some in ms
#So my logic is that if the value bigger than 1000 then definetly thats ms so i transform it to min else we leave it as it is

#so here i put a new column that if the value of duration_in min/ms bigger than 100O we put the old value/60000 
#otherwise we put the same old value
sdf1 = sdf1.withColumn("duration_in min", when(col("duration_in min/ms") > 10000, col("duration_in min/ms") / 60000).otherwise(col("duration_in min/ms")))

#we drop the old column as we dont need it anymore
sdf1 =sdf1.drop('duration_in min/ms')
sdf1.show(10)

# Classification

#mapping = {   0: "Rock",1: "Indie",2: "Alt",3: "Pop",4: "Metal",5: "HipHop",6: "Alt_Music",7: "Blues",8: "Acoustic/Folk",9: "Instrumental",10: "Country"}

sdf1 = sdf1.withColumn("Genre", when(col("Genre") == 0, "Rock").when(col("Genre") == 1, "Indie").when(col("Genre") == 2, "Alt").when(col("Genre") == 3, "Pop")
.when(col("Genre") == 4, "Metal").when(col("Genre") == 5, "HipHop").when(col("Genre") == 6, "Alt_Music").when(col("Genre") == 7, "Blues").when(col("Genre") == 8, "Acoustic/Folk")
.when(col("Genre") == 9, "Instrumental").when(col("Genre") == 10, "Country").otherwise(col("Genre")))



sdf1.show(10)


+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----+------------------+
|         Artist Name|          Track Name|Popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|Genre|   duration_in min|
+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----+------------------+
|Shaarib Toshi, Ar...|"Bandeya (feat. A...|        60|       0.465| 0.551|   7|   -6.58|   1|     0.0362|       0.619|        6.03E-06|   0.109|  0.415| 89.459|             4|    3|       3.077766667|
|B Praak, Tanishk ...|"Baras Baras (Fro...|        54|       0.493| 0.402|   2|  -6.629|   1|     0.0309|       0.619|            null|  0.0629|  0.431| 155.57|             4|    3|       3.335916

In [69]:
#b

#removing songs that exceeds 5 mins using the filter function where the column duration is smaller than 5 so we neglect all
#those who exceed 5
Duration = sdf1.filter(col('duration_in min/ms') <= 5)
Duration.show(10)


+--------------------+--------------------+----------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+------------+---------------+
|         Artist Name|          Track Name|Popularity|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|       Genre|duration_in min|
+--------------------+--------------------+----------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+------------+---------------+
|Shaarib Toshi, Ar...|"Bandeya (feat. A...|        60|       0.465| 0.551|  7|   -6.58|   1|     0.0362|       0.619|        6.03E-06|   0.109|  0.415| 89.459|             4|         Pop|    3.077766667|
|B Praak, Tanishk ...|"Baras Baras (Fro...|        54|       0.493| 0.402|  2|  -6.629|   1|     0.0309|       0.619|            null|  0.0629|  0.431| 155.57|             4|         P

In [70]:
#c

#filtering with column Artist Name in here isin() loops on the array 
#and checks if the value of the Name is one of those 
Names = sdf1.filter(col('Artist Name').isin(['J. Cole', 'Novo Amor', 'Anson Seabra']))
Names.show(10)



+------------+--------------------+----------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+------------+------------------+
| Artist Name|          Track Name|Popularity|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|       Genre|   duration_in min|
+------------+--------------------+----------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+------------+------------------+
|     J. Cole|     9 5 . s o u t h|        84|       0.713| 0.793|  2|  -5.277|   1|      0.397|       0.271|            null|   0.727|  0.203| 71.724|             4|      HipHop|3.2824333333333335|
|     J. Cole|           a m a r i|        86|       0.725| 0.713|  4|  -6.173|   0|      0.187|       0.127|            null|   0.617|  0.207| 65.988|             4|Instrumental|2.4736833333333332|
|    

In [71]:
#d

#group by the Genre(category) and .counts to get how many occurences per this category
Category = sdf1.groupBy('Genre').count()
Category.show()


+-------------+-----+
|        Genre|count|
+-------------+-----+
|          Alt| 1066|
|      117.017|    1|
| Instrumental| 1529|
|    Alt_Music| 1757|
|        Metal|  355|
|         Rock|  529|
|        Blues|  439|
|Acoustic/Folk| 1504|
|          Pop|  370|
|      Country| 3514|
|       HipHop| 1121|
|        Indie|  836|
+-------------+-----+



In [72]:
#e

from pyspark.sql.functions import desc


#displaying the artists with the most songs for example from the results Britney spears has 52 songs
#so we group ny the Artist Name and the agg.count to count how many times it occured the we sort 
#descendingly to take the top 5 limit

TopArtists = sdf1.groupby(['Artist Name'])\
.agg({"Artist Name": "count"})\
.sort("count(Artist Name)", ascending=False)\
.limit(5).show()


+------------------+------------------+
|       Artist Name|count(Artist Name)|
+------------------+------------------+
|   Backstreet Boys|                62|
|    Britney Spears|                48|
|          Westlife|                45|
|The Rolling Stones|                30|
|                U2|                27|
+------------------+------------------+



In [73]:
#F

#ordering the dataframe by Popularity then taking the 1St 10 songs 
Top_10_Songs = sdf1.orderBy(desc('Popularity')).limit(10)
#selecting just the 'Artist Name' and the 'Track Name'from it
Top_10_Songs.select('Artist Name','Track Name').show()


#Top_10_Songs.select('Artist Name','Track Name','Popularity').show()


+--------------------+--------------------+
|         Artist Name|          Track Name|
+--------------------+--------------------+
|      Olivia Rodrigo|            good 4 u|
|            Doja Cat|Kiss Me More (fea...|
|The Kid LAROI, Ju...|STAY (with Justin...|
|     Los Legendarios|                Fiel|
|Justin Bieber, Da...|Peaches (feat. Da...|
|                 BTS| Permission to Dance|
|      Rauw Alejandro|          Todo De Ti|
|      Olivia Rodrigo|             deja vu|
|               Riton|Friday (feat. Muf...|
|          The Weeknd|Save Your Tears (...|
+--------------------+--------------------+



In [74]:
#g

#just same as F but here with column danceability instead of popularity
party = sdf1.orderBy(desc('danceability')).limit(5)
party.select('Track Name', 'Artist Name').show()

#party.select('Artist Name','Track Name','danceability').show()



+--------------------+---------------+
|          Track Name|    Artist Name|
+--------------------+---------------+
|      Gucci Umbrella|Whookilledkenny|
|Divine Gosa - Swi...|      Radioclit|
|           kawamurra|        sunflwr|
|  Dancing in My Room|       347aidan|
|    OG Bobby Johnson|           QUE.|
+--------------------+---------------+

