In [7]:
# Part 1: Data Exploration with Apache Spark (10%) 
#Data Loading and Schema Understanding (2%) Load the dataset into a Spark DataFrame. 
#Print the schema and verify the data types of each column
#starting spark session
from pyspark.sql import SparkSession
spark =SparkSession.builder.appName("spark-spotify").getOrCreate()

In [8]:
#loading the data
#data is in the bucket
path = 'gs://hive-buckets/spotify_songs.csv'
file_type = "csv"

#csv file
infer_schema = 'true'
first_row_is_header = 'true'
delimiter = ','

#import csv
df= spark.read.format(file_type)\
    .option("inferSchema",infer_schema)\
    .option("header",first_row_is_header)\
    .option("sep",delimiter)\
    .load(path)

In [9]:
#total number of rows in spotify data
total_rows=df.count()
print("total number of rows in spotify data:",total_rows) 

total number of rows in spotify data: 32833


In [10]:
df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- track_album_release_date: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)


In [38]:
df.head() #grab the first row of your data

Row(track_id='6f807x0ima9a1j3VPbc7VN', track_name="I Don't Care (with Justin Bieber) - Loud Luxury Remix", track_artist='Ed Sheeran', track_popularity=66, track_album_id='2oCs0DGTsRO98Gh5ZSl2Cx', track_album_name="I Don't Care (with Justin Bieber) [Loud Luxury Remix]", track_album_release_date='2019-06-14', playlist_name='Pop Remix', playlist_id='37i9dQZF1DXcZDD7cfEKhW', playlist_genre='pop', playlist_subgenre='dance pop', danceability=0.748, energy=0.916, key='6', loudness='-2.634', mode='1', speechiness=0.0583, acousticness=0.102, instrumentalness=0.0, liveness=0.0653, valence=0.518, tempo=122.036, duration_ms=194754.0)

In [45]:
#2.
#Data Aggregation (3%)

#Calculate the average danceability, energy, and tempo of tracks by artist.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.types import DoubleType
df = df.withColumn("danceability", df["danceability"].cast(DoubleType()))
df = df.withColumn("energy", df["energy"].cast(DoubleType()))
#average_ = (df.groupBy("track_artist").avg("danceability","energy", "tempo"))
average_D_E_Tstats = (df.agg(avg("danceability"), avg("energy"), avg("tempo")))
average_D_E_Tstats.show()

+------------------+------------------+------------------+
| avg(danceability)|       avg(energy)|        avg(tempo)|
+------------------+------------------+------------------+
|0.6548413911824773|0.6987159026254852|120.82878928517026|
+------------------+------------------+------------------+



In [63]:
#Identify the top 5 artists with the highest average track popularity
from pyspark.sql.functions import avg, col
avg_pop = (
    df.groupBy("track_artist")
    .agg(avg("track_popularity"))
)
top5 = avg_pop.orderBy(col("avg(track_popularity)").desc()).limit(5)
top5.show()

[Stage 35:>                                                         (0 + 2) / 2]

+-------------+---------------------+
| track_artist|avg(track_popularity)|
+-------------+---------------------+
|Trevor Daniel|                 97.0|
|          Y2K|                 91.0|
|  Don Toliver|    90.71428571428571|
|  Roddy Ricch|    88.21052631578948|
|       DaBaby|    87.85714285714286|
+-------------+---------------------+



                                                                                

In [None]:
#3. Data Transformation (3%)• Create a new column called “energy_level” that classifies tracks as'High Energy' (energy > 0.8) or 'Regular Energy' (energy ≤ 0.8)
#Group the data by this new energy classification and calculate theaverage popularity and loudness for each energy_level

In [74]:
from pyspark.sql.functions import when,avg, col
df = df.withColumn("energy_level", when(col("energy") > 0.8, "High Energy").otherwise("Regular Energy"))
group_energy_level = (df.groupBy("energy_level").agg(avg("track_popularity"), avg("loudness")))
group_energy_level.show()

[Stage 38:>                                                         (0 + 2) / 2]

+--------------+---------------------+------------------+
|  energy_level|avg(track_popularity)|     avg(loudness)|
+--------------+---------------------+------------------+
|   High Energy|    38.10813030385984|-4.875180217173095|
|Regular Energy|    44.66595044344884|-7.636742925067416|
+--------------+---------------------+------------------+



                                                                                

In [77]:
# Data Exporting (2%)• Export the data that have been classified as 'High Energy'
from pyspark.sql.functions import when,avg, col
high_energy=df.filter(col("energy_level") == "High Energy")
high_energy.show
high_energy.write.csv("/path/to/high_energy", header=True)

AnalysisException: path hdfs://cluster-nov-m/path/to/high_energy already exists.