<b>Data Loading and Schema Understanding</b>


In [14]:
#Start Spark Session

from pyspark.sql import SparkSession
from pyspark.sql.functions import col          # Import col function
from pyspark.sql.functions import avg          # Import avg function

spark = SparkSession.builder.appName('lab3').getOrCreate()

In [15]:
#loading the data

path = 'gs://dsa_lab3/spotify_songs.csv'
file_type = 'csv'

#csv Options
infer_schema = 'true'
first_row_is_header = 'true'
delimiter = ','

#import csv

df = spark.read.format(file_type)\
    .option('inferSchema', infer_schema)\
    .option('header', first_row_is_header)\
    .option('sep', delimiter)\
    .load(path)

                                                                                

In [16]:
df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- track_album_release_date: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)


In [17]:
#change datatypes of 'danceability', 'energy', 'loudness'

columns = ['danceability', 'energy', 'loudness']

for column in columns:
    df = df.withColumn(column, col(column).cast('double'))

df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- track_album_release_date: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)


<b>Data Aggregation</b>

In [18]:
#Calculate the average danceability, energy, and tempo of tracks by artist

average_by_artist = df.groupBy('track_artist').agg(
    avg('danceability').alias('avg_danceability'),
    avg('energy').alias('avg_energy'),
    avg('tempo').alias('avg_tempo')
)

# Show the result
average_by_artist.show()


[Stage 6:>                                                          (0 + 1) / 1]

+---------------+-------------------+------------------+------------------+
|   track_artist|   avg_danceability|        avg_energy|         avg_tempo|
+---------------+-------------------+------------------+------------------+
|      Lil Nas X| 0.7744999999999999| 0.618590909090909|145.49290909090908|
|   INTERSECTION| 0.6513333333333334|0.7153333333333333|           110.282|
|       CHUNG HA|             0.6915|0.8160000000000001|          117.4565|
|      Henri PFR|              0.688|             0.693|           114.051|
|         *NSYNC|              0.581|0.8040000000000002|           132.168|
|      TheLavish|              0.699|             0.803|           106.025|
|         Grimes|             0.5555|0.7853333333333333| 141.9238333333333|
|     Eurythmics|  0.668142857142857|0.7522857142857143|126.45485714285714|
|   Shawn Desman|              0.767|             0.633|           120.031|
|     Snoop Dogg| 0.7402777777777779|0.7422500000000002| 98.20511111111111|
|     Chroma

                                                                                

In [19]:
#Identify the top 5 artists with the highest average track popularity.

# Change track_popularity to double data type
df = df.withColumn('track_popularity', col('track_popularity').cast('double'))

# Calculate average track popularity by artist
average_popularity_by_artist = df.groupBy('track_artist').agg(
    avg('track_popularity').alias('avg_track_popularity')
)

# Identify the top 5 artists with the highest average track popularity
top_artists = average_popularity_by_artist.orderBy(col('avg_track_popularity').desc()).limit(5)

# Show the result
top_artists.show()


[Stage 9:>                                                          (0 + 1) / 1]

+-------------+--------------------+
| track_artist|avg_track_popularity|
+-------------+--------------------+
|Trevor Daniel|                97.0|
|          Y2K|                91.0|
|  Don Toliver|   90.71428571428571|
|  Roddy Ricch|   88.21052631578948|
|       DaBaby|   87.85714285714286|
+-------------+--------------------+



                                                                                

<b>Data Transformation</b>

In [20]:
from pyspark.sql.functions import when

# Define a new column 'energy_level' based on energy values
df = df.withColumn('energy_level', when(col('energy') > 0.8, 'High Energy').otherwise('Regular Energy'))


In [21]:
# Calculate average popularity and loudness for each energy_level
average_by_energy_level = df.groupBy('energy_level').agg(
    avg('track_popularity').alias('avg_popularity'),
    avg('loudness').alias('avg_loudness')
)

# Show the result
average_by_energy_level.show()




+--------------+-----------------+------------------+
|  energy_level|   avg_popularity|      avg_loudness|
+--------------+-----------------+------------------+
|   High Energy|38.10813030385984|-4.875180217173095|
|Regular Energy|44.66595044344884|-7.636742925067416|
+--------------+-----------------+------------------+



                                                                                

<b>Data Exporting</b>

In [27]:
# Filter the DataFrame to get only 'High Energy' songs
df_high_energy = df.filter(col('energy_level') == 'High Energy')

# Save the filtered DataFrame to the specified path with header
df_high_energy.coalesce(1).write.csv("gs://dsa_lab3/high_energy_songs.csv", header = True)

print("High Energy songs have been written")

                                                                                

High Energy songs have been written
