## Introduction to Data Science Assignment 3
__Parham Javan 810800008<br>__
__Yaser Azad 810800003__

## Install & Import Libraries

In [31]:
import importlib.util

# Check if PySpark is installed
if importlib.util.find_spec("pyspark") is None:
    # Install PySpark
    !pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month
from pyspark.sql.functions import mean, stddev
from pyspark.sql.functions import to_date
from pyspark.sql.functions import min, max, median, avg, format_number
from pyspark.sql.functions import col, when
from pyspark.sql.functions import split, explode


For better visualization of dataframes in jupyter notebooks, since they are rendered to html

In [32]:
from IPython.display import display, HTML
display(HTML('<style>pre { white-space: pre !important; }</style>'))


## Main Task

---
<br>
Step 1: Check the Schema

In [33]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Spotify Analysis") \
    .getOrCreate()

# Read the parquet file into a DataFrame
df = spark.read.parquet("spotify.parquet")

# Print the schema
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- album: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- artist_ids: string (nullable = true)
 |-- track_number: long (nullable = true)
 |-- disc_number: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- year: long (nullable = true)
 |-- release_date: string (nullable = true)



---
<br>
Step 2: Preprocess Columns

In [34]:
# Convert 'release_date' to date type
df = df.withColumn('release_date', to_date(df['release_date']))
column_data_type = df.schema['release_date'].dataType
print("Data type of column '{}': {}".format('release_date', column_data_type))

Data type of column 'release_date': DateType()


---
<br>
Step 3: Aggregation, Filtering, and Transformation

Aggregation

In [35]:
# Aggregate statistics for danceability and energy features by year
aggregate_statistics = df.groupBy('year').agg(
    format_number(min('danceability'), 4).alias('min_danceability'),
    format_number(max('danceability'), 4).alias('max_danceability'),
    format_number(median('danceability'), 4).alias('median_danceability'),
    format_number(avg('danceability'), 4).alias('avg_danceability'),
    format_number(min('energy'), 4).alias('min_energy'),
    format_number(max('energy'), 4).alias('max_energy'),
    format_number(median('energy'), 4).alias('median_energy'),
    format_number(avg('energy'), 4).alias('avg_energy'),
    format_number(avg('duration_ms'), 4).alias('avg_duration_ms')
)
print("\nAverage Characteristics of Songs Per Year:")
aggregate_statistics.show()


Average Characteristics of Songs Per Year:
+----+----------------+----------------+-------------------+----------------+----------+----------+-------------+----------+---------------+
|year|min_danceability|max_danceability|median_danceability|avg_danceability|min_energy|max_energy|median_energy|avg_energy|avg_duration_ms|
+----+----------------+----------------+-------------------+----------------+----------+----------+-------------+----------+---------------+
|1950|          0.0650|          0.7820|             0.3540|          0.3613|    0.0015|    0.9620|       0.1640|    0.2412|   228,289.9317|
|1936|          0.3440|          0.8340|             0.6580|          0.6403|    0.0463|    0.7390|       0.2120|    0.2470|   164,607.3012|
|1951|          0.1350|          0.8260|             0.5230|          0.5097|    0.0108|    0.8630|       0.2940|    0.3305|   193,564.6571|
|1958|          0.0000|          0.8960|             0.4300|          0.4269|    0.0020|    0.9810|       0.30

---
Filtering

In [36]:
# Filter the dataset to exclude the songs with explicit content
non_explicit_songs = df.select('name', 'album', 'artists', 'duration_ms', 'release_date').filter(df['explicit'] == False)
print("\ndataset excluding explicit songs:")
non_explicit_songs.show()


dataset excluding explicit songs:
+--------------------+--------------------+--------------------+-----------+------------+
|                name|               album|             artists|duration_ms|release_date|
+--------------------+--------------------+--------------------+-----------+------------+
|             Testify|The Battle Of Los...|['Rage Against Th...|     210133|  1999-11-02|
|    Calm Like a Bomb|The Battle Of Los...|['Rage Against Th...|     298893|  1999-11-02|
|Sleep Now In the ...|The Battle Of Los...|['Rage Against Th...|     205600|  1999-11-02|
|Born of a Broken Man|The Battle Of Los...|['Rage Against Th...|     280960|  1999-11-02|
|      Born As Ghosts|The Battle Of Los...|['Rage Against Th...|     202040|  1999-11-02|
|               Maria|The Battle Of Los...|['Rage Against Th...|     228093|  1999-11-02|
|Voice of the Voic...|The Battle Of Los...|['Rage Against Th...|     151573|  1999-11-02|
|New Millennium Homes|The Battle Of Los...|['Rage Against Th...| 

---
Transformation

In [37]:
# Convert duration from milliseconds to minutes
df_added = df.select('name', 'album', 'artists', 'duration_ms', 'release_date').withColumn('song_length_minutes', format_number(col('duration_ms') / 60000, 4))
df_added.show()

+--------------------+--------------------+--------------------+-----------+------------+-------------------+
|                name|               album|             artists|duration_ms|release_date|song_length_minutes|
+--------------------+--------------------+--------------------+-----------+------------+-------------------+
|             Testify|The Battle Of Los...|['Rage Against Th...|     210133|  1999-11-02|             3.5022|
|     Guerrilla Radio|The Battle Of Los...|['Rage Against Th...|     206200|  1999-11-02|             3.4367|
|    Calm Like a Bomb|The Battle Of Los...|['Rage Against Th...|     298893|  1999-11-02|             4.9816|
|           Mic Check|The Battle Of Los...|['Rage Against Th...|     213640|  1999-11-02|             3.5607|
|Sleep Now In the ...|The Battle Of Los...|['Rage Against Th...|     205600|  1999-11-02|             3.4267|
|Born of a Broken Man|The Battle Of Los...|['Rage Against Th...|     280960|  1999-11-02|             4.6827|
|      Bor

---
Transformation

In [39]:
# Create a new binary feature 'long_song' indicating whether the song duration is above a certain threshold (e.g., 15 minutes).
threshold = 15 * 60 * 1000  # 5 minutes in milliseconds
df_added = df_added.withColumn('long_song', when(df['duration_ms'] > threshold, 1).otherwise(0))

print("\ndataset with added \'song_length_minutes\' and \'long_song\' columns:")
df_added.show()

# Count the number of long songs in the DataFrame
count_long_songs = df_added.filter(df_added['long_song'] == 1).count()
print("Number of long songs:", count_long_songs)

# Count the total number of songs in the DataFrame
total_songs_count = df_added.count()
print("Total number of songs:", total_songs_count)

print("Percetage of long song:", (count_long_songs/total_songs_count)*100 , "%")


dataset with added 'song_length_minutes' and 'long_song' columns:
+--------------------+--------------------+--------------------+-----------+------------+-------------------+---------+
|                name|               album|             artists|duration_ms|release_date|song_length_minutes|long_song|
+--------------------+--------------------+--------------------+-----------+------------+-------------------+---------+
|             Testify|The Battle Of Los...|['Rage Against Th...|     210133|  1999-11-02|             3.5022|        0|
|     Guerrilla Radio|The Battle Of Los...|['Rage Against Th...|     206200|  1999-11-02|             3.4367|        0|
|    Calm Like a Bomb|The Battle Of Los...|['Rage Against Th...|     298893|  1999-11-02|             4.9816|        0|
|           Mic Check|The Battle Of Los...|['Rage Against Th...|     213640|  1999-11-02|             3.5607|        0|
|Sleep Now In the ...|The Battle Of Los...|['Rage Against Th...|     205600|  1999-11-02|    

---
<br>
Step 4: Dealing with Array Columns

In [40]:
# For example, if we want to explode the 'artists' array to get each artist in a separate row
# Split the 'artists' string by comma and create an array
df_with_array = df.withColumn('artists_array', split(df['artists'], ', '))

# Explode the array to get each artist in a separate row
df_exploded = df_with_array.withColumn('artist', explode(df_with_array['artists_array']))

# Drop the intermediate 'artists_array' column
df_exploded = df_exploded.drop('artists_array')

# Show the resulting DataFrame
print("\nExploded DataFrame with Each Artist in a Separate Row:")
df_exploded = df_exploded.select('name', 'album', 'artist', 'duration_ms', 'release_date')
df_exploded.show()


Exploded DataFrame with Each Artist in a Separate Row:
+--------------------+--------------------+--------------------+-----------+------------+
|                name|               album|              artist|duration_ms|release_date|
+--------------------+--------------------+--------------------+-----------+------------+
|             Testify|The Battle Of Los...|['Rage Against Th...|     210133|  1999-11-02|
|     Guerrilla Radio|The Battle Of Los...|['Rage Against Th...|     206200|  1999-11-02|
|    Calm Like a Bomb|The Battle Of Los...|['Rage Against Th...|     298893|  1999-11-02|
|           Mic Check|The Battle Of Los...|['Rage Against Th...|     213640|  1999-11-02|
|Sleep Now In the ...|The Battle Of Los...|['Rage Against Th...|     205600|  1999-11-02|
|Born of a Broken Man|The Battle Of Los...|['Rage Against Th...|     280960|  1999-11-02|
|      Born As Ghosts|The Battle Of Los...|['Rage Against Th...|     202040|  1999-11-02|
|               Maria|The Battle Of Los...|[

---
<br>
Step 5: Top-K Records

In [41]:
# Step 5: Top-K Records

# For example, let's find the top 20 songs based on valence
top_songs = df.select('name', 'album', 'artists', 'duration_ms', 'release_date').orderBy(df['valence'].desc()).limit(20)
print("\nTop 20 Songs based on Valence:")
top_songs.show()

# Stop the SparkSession
spark.stop()



Top 20 Songs based on Valence:
+--------------------+--------------------+--------------------+-----------+------------+
|                name|               album|             artists|duration_ms|release_date|
+--------------------+--------------------+--------------------+-----------+------------+
|         Breakbeat 4|Best Of Neverendi...|        ['Dj Swamp']|      32000|  2005-08-18|
|           Crag Lake|          Buck Fever|   ['Estradasphere']|      48147|  2001-01-01|
|The Drunken Landlady|    The Family Album|   ['The McCarthys']|      79960|  2010-05-08|
|  The Whistling Song|        Wild Animals|['The Pinker Tones']|     232507|  2008-06-03|
|Let It Snoki Doki...|         8-Bit Jesus|  ['Doctor Octoroc']|      66960|  2008-12-20|
|               Genie|History: Mission ...|  ['The Gravel Pit']|      25867|  1997-01-01|
|       In This World|            Shine On|             ['Lea']|     379173|  2005-08-11|
|         La Petacona|Country a la Mexi...|['Cuhamileros de ...|    