In [40]:
# Import any necessary libraries / modules
import os
import sys
import timeit

import pandas as pd # we only use pandas as examples to compare to
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, when, split
from pyspark.sql.types import IntegerType, FloatType


In [24]:
# Create a path to the dataset and read into our Spark Session

DATA_PATH = os.path.join("..", "datasets", "tracks_features.csv")
print(DATA_PATH)

spark = SparkSession.builder.appName("SongRecommendation").getOrCreate()

'''
First exercise is to just compare execution time between Pandas and PySpark...

There is already a meaningful difference due to dataset size of 1.2million rows, but remember 
that Spark is being run locally (single cluster) and our dataset is still relatively small 
(i.e. 350mb vs hundreds of gigs or tb in real-world applications)
'''
# Function to read with PySpark
def read_with_spark():
    df = spark.read.csv(DATA_PATH)

# Function to read with Pandas
def read_with_pandas():
    df = pd.read_csv(DATA_PATH)

# Measure execution time
spark_time = timeit.timeit(read_with_spark, number=1)  # Run once
pandas_time = timeit.timeit(read_with_pandas, number=1)  # Run once

# Print results
print(f"Time taken with PySpark: {spark_time:.4f} seconds")
print(f"Time taken with Pandas: {pandas_time:.4f} seconds")

../datasets/tracks_features.csv
Time taken with PySpark: 1.3483 seconds
Time taken with Pandas: 3.2269 seconds


In [46]:
# Let's ready our data in memory and get ready for data processing
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
# Show the first 5 tracks
df.show(5)

                                                                                

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-----------+--------+------------+------------------+---+-------------------+----+-----------+------------+----------------+-------------------+-------+-----------------+-----------+--------------+----+------------+
|                  id|                name|               album|            album_id|             artists|          artist_ids|track_number|disc_number|explicit|danceability|            energy|key|           loudness|mode|speechiness|acousticness|instrumentalness|           liveness|valence|            tempo|duration_ms|time_signature|year|release_date|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-----------+--------+------------+------------------+---+-------------------+----+-----------+------------+----------------+-------------------+----

In [None]:
# Collect general information on original dataset
nrows, ncols = df.count(), len(df.columns)
print(f"Data contains {nrows} rows and {ncols} cols")
df.printSchema() # <-- uh-oh, all data types are string...


Data contains 1204025 rows and 24 cols
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- album: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- artist_ids: string (nullable = true)
 |-- track_number: string (nullable = true)
 |-- disc_number: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- time_signature: string (nullable = true)
 |-- year: string (nullable = true)
 |-- release_date: string (nulla

In [48]:
'''
Drop rows with missing values and drop duplicate rows w/ matching artist_ids and case-insensitive names

NOTE: .count() requires computation across whole cluster, so probably inefficient in real 
world to query often 
'''

df = (df
      .dropna()
      .withColumn("name_lower", lower(col("name")))
      .dropDuplicates(["name_lower", "artist_ids"])
      .drop("name_lower")
)

nrows, ncols = df.count(), len(df.columns)            
print(f"Data contains {nrows} rows and {ncols} cols")




Data contains 1138490 rows and 24 cols


                                                                                

In [None]:
# We were unable to infer the schema when reading the CSV with PySpark
# Let's cast these columns to their correct types

# Cast explicit to an IntType
bool_col = "explicit"
df = df.withColumn(bool_col, when(col(bool_col) == "True", 1)
                                    .when(col(bool_col) == "False", 0)
                                    .otherwise(0)) 

# Convert string representation of list into array of strings
df = df.withColumn("artists", split(col("artists"), ",\s*")) \
       .withColumn("artist_ids", split(col("artist_ids"), ",\s*"))

# Cast the other numerical columns
int_cols = ["track_number", "disc_number", "key", "mode", "duration_ms", "year"]
float_cols = ["danceability", "energy", "loudness", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "time_signature"]

for col_name in int_cols:
    df = df.withColumn(col_name, col(col_name).cast(IntegerType()))

for col_name in float_cols:
    df = df.withColumn(col_name, col(col_name).cast(FloatType()))

df.printSchema()
df.show(5)

