In [3]:
%pip install pandas pycountry

Collecting pandas
  Using cached pandas-2.2.3-cp313-cp313-win_amd64.whl.metadata (19 kB)
Collecting pycountry
  Using cached pycountry-24.6.1-py3-none-any.whl.metadata (12 kB)
Collecting numpy>=1.26.0 (from pandas)
  Using cached numpy-2.2.3-cp313-cp313-win_amd64.whl.metadata (60 kB)
Collecting pytz>=2020.1 (from pandas)
  Using cached pytz-2025.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2025.1-py2.py3-none-any.whl.metadata (1.4 kB)
Using cached pandas-2.2.3-cp313-cp313-win_amd64.whl (11.5 MB)
Using cached pycountry-24.6.1-py3-none-any.whl (6.3 MB)
Using cached numpy-2.2.3-cp313-cp313-win_amd64.whl (12.6 MB)
Using cached pytz-2025.1-py2.py3-none-any.whl (507 kB)
Using cached tzdata-2025.1-py2.py3-none-any.whl (346 kB)
Installing collected packages: pytz, tzdata, pycountry, numpy, pandas
Successfully installed numpy-2.2.3 pandas-2.2.3 pycountry-24.6.1 pytz-2025.1 tzdata-2025.1
Note: you may need to restart the kernel to use upda

In [1]:
import time
import pandas as pd
import pycountry

In [10]:
# Start timing the entire process
start_time = time.time()

# Step 1: Load datasets
load_start = time.time()
spotify_df = pd.read_csv("universal_top_spotify_songs.csv")
age_df = pd.read_csv("MedianAge.csv")
load_end = time.time()
print(f"Time to load datasets: {load_end - load_start:.2f} seconds")

# Step 2: Convert country codes to names using pycountry
convert_start = time.time()
def code_to_name(code):
    try:
        return pycountry.countries.get(alpha_2=code).name
    except:
        return None  # Handle invalid or missing codes

spotify_df["country_name"] = spotify_df["country"].apply(code_to_name)
convert_end = time.time()
print(f"Time to convert country codes: {convert_end - convert_start:.2f} seconds")

# Step 3: Calculate average tempo per country
tempo_start = time.time()
avg_tempo_per_country = spotify_df.groupby("country_name")["tempo"].mean().reset_index()
tempo_end = time.time()
print(f"Time to calculate average tempo: {tempo_end - tempo_start:.2f} seconds")

# Step 4: Extract most recent average age (2025)
age_start = time.time()
age_df["average_age"] = age_df["2025"]
age_df = age_df[["Country", "average_age"]]
age_end = time.time()
print(f"Time to extract average age: {age_end - age_start:.2f} seconds")

# Step 5: Merge datasets
merge_start = time.time()
merged_df = pd.merge(avg_tempo_per_country, age_df, left_on="country_name", right_on="Country")
merge_end = time.time()
print(f"Time to merge datasets: {merge_end - merge_start:.2f} seconds")

# Step 6: Calculate deviation (difference between tempo and age)
deviation_start = time.time()
merged_df["deviation"] = merged_df["tempo"] - merged_df["average_age"]
deviation_end = time.time()
print(f"Time to calculate deviation: {deviation_end - deviation_start:.2f} seconds")

# End timing the entire process (before plots are displayed)
end_time = time.time()
print(f"\nTotal execution time: {end_time - start_time:.2f} seconds")


Time to load datasets: 7.21 seconds
Time to convert country codes: 1.82 seconds
Time to calculate average tempo: 0.11 seconds
Time to extract average age: 0.01 seconds
Time to merge datasets: 0.02 seconds
Time to calculate deviation: 0.01 seconds

Total execution time: 9.17 seconds


In [8]:
!pip install -q findspark
import findspark
findspark.init()


[notice] A new release of pip is available: 23.1.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) #  This will format our output tables a bit nicer when not using the show() method
spark

In [10]:
import multiprocessing
print(multiprocessing.cpu_count())

12


In [7]:
!pip install pyspark
!pip install pycountry




[notice] A new release of pip is available: 23.1.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.1.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [11]:
# Load the CSV file
file_path = "new_spotify.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Inspect the schema and data
df.printSchema()
df.show(5)


# Step 3: Convert DataFrame to RDD and process
spotify_rdd = df.rdd

# Step 4: Inspect the processed data
for row in spotify_rdd.take(5):
    print(row)

root
 |-- spotify_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- daily_rank: string (nullable = true)
 |-- daily_movement: string (nullable = true)
 |-- weekly_movement: string (nullable = true)
 |-- country: string (nullable = true)
 |-- snapshot_date: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- is_explicit: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- album_release_date: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = t

In [22]:
import time
from pyspark.sql import SparkSession
import pycountry

# Reuse the existing SparkSession and SparkContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext  # Get the existing SparkContext

# Start timing the entire process
start_time = time.time()

# Step 1: Load datasets
load_start = time.time()
spotify_rdd = sc.textFile("new_spotify.csv")
age_rdd = sc.textFile("MedianAge.csv")
load_end = time.time()
print(f"Time to load datasets: {load_end - load_start:.2f} seconds")

# Step 2: Convert country codes to names using pycountry
convert_start = time.time()
def code_to_name(code):
    try:
        return pycountry.countries.get(alpha_2=code).name
    except:
        return None  # Handle invalid or missing codes

# Remove header and map Spotify data to (country_code, tempo)
header = spotify_rdd.first()
spotify_rdd = spotify_rdd.filter(lambda line: line != header).map(lambda line: line.split(","))
spotify_rdd = spotify_rdd.map(lambda x: (x[6], float(x[23])))  # (country_code, tempo)

# Convert country codes to names
spotify_rdd = spotify_rdd.map(lambda x: (code_to_name(x[0]), x[1]))  # (country_name, tempo)

# Filter out null values
spotify_rdd = spotify_rdd.filter(lambda x: x[0] is not None and x[1] is not None)
convert_end = time.time()
print(f"Time to convert country codes: {convert_end - convert_start:.2f} seconds")

# Step 3: Calculate average tempo per country using MapReduce
tempo_start = time.time()
# Map: Emit (country_name, (tempo, 1)) for each song
tempo_mapped = spotify_rdd.map(lambda x: (x[0], (x[1], 1)))

# Reduce: Sum tempo and count for each country
tempo_reduced = tempo_mapped.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Calculate average tempo
avg_tempo_rdd = tempo_reduced.mapValues(lambda x: x[0] / x[1])  # (country_name, avg_tempo)
tempo_end = time.time()
print(f"Time to calculate average tempo: {tempo_end - tempo_start:.2f} seconds")

# Step 4: Extract most recent average age (2025) using MapReduce
age_start = time.time()
# Remove header and map age data to (country_name, average_age)
header = age_rdd.first()
age_rdd = age_rdd.filter(lambda line: line != header).map(lambda line: line.split(","))
age_rdd = age_rdd.map(lambda x: (x[0], float(x[-1])))  # (country_name, average_age)

# Filter out null values
age_rdd = age_rdd.filter(lambda x: x[0] is not None and x[1] is not None)
age_end = time.time()
print(f"Time to extract average age: {age_end - age_start:.2f} seconds")

# Step 5: Join datasets using MapReduce
join_start = time.time()
# Join on country_name
joined_rdd = avg_tempo_rdd.join(age_rdd)  # (country_name, (avg_tempo, average_age))
join_end = time.time()
print(f"Time to join datasets: {join_end - join_start:.2f} seconds")

# Step 6: Calculate deviation (difference between tempo and age)
deviation_start = time.time()
# Map: Calculate deviation
deviation_rdd = joined_rdd.mapValues(lambda x: x[0] - x[1])  # (country_name, deviation)

# Filter out invalid deviations
deviation_rdd = deviation_rdd.filter(lambda x: x[1] is not None and not isinstance(x[1], float))
deviation_end = time.time()
print(f"Time to calculate deviation: {deviation_end - deviation_start:.2f} seconds")

# Step 7: Collect results and print
collect_start = time.time()
collect_end = time.time()
print(f"Time to collect results: {collect_end - collect_start:.2f} seconds")

# End timing the entire process
end_time = time.time()
print(f"\nTotal execution time: {end_time - start_time:.2f} seconds")

# Stop Spark (optional, as it will be stopped automatically when the session ends)
spark.stop()

Time to load datasets: 0.22 seconds
Time to convert country codes: 1.01 seconds
Time to calculate average tempo: 0.05 seconds
Time to extract average age: 0.72 seconds
Time to join datasets: 0.03 seconds
Time to calculate deviation: 0.00 seconds
Time to collect results: 0.00 seconds

Total execution time: 2.03 seconds
