In [1]:
from pyspark.sql import SparkSession
import csv
from io import StringIO
from pyspark import SparkFiles
from pyspark.sql.functions import col as spark_col, mean, stddev, max, min, sum, desc, when, lit, monotonically_increasing_id, abs, collect_list, size
from pyspark.sql.types import *
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from functools import reduce
from operator import add

In [2]:
spark = SparkSession.builder \
    .master("spark://192.168.2.156:7077") \
    .appName("group_30_project") \
    .config("spark.dynamicAllocation.enabled", True) \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
    .config("spark.shuffle.service.enabled", False) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
    .config("spark.executor.cores", 4) \
    .config("spark.dynamicAllocation.maxExecutors", 1) \
    .config("spark.sql.shuffle.partitions", 100) \
    .config("spark.driver.port",9999)\
    .config("spark.blockManager.port",10005)\
    .getOrCreate()
# RDD API
spark_context = spark.sparkContext
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/20 00:25:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Add the file to SparkFiles
spark_context.addFile("/home/ubuntu/millionsong_subset.csv")

In [4]:
# Define a function to read the file on the driver node
def func_read_csv(iterator):
    # This will read the file from the driver's local filesystem
    with open(SparkFiles.get("millionsong_subset.csv"), 'r') as f:
        # Read the whole file as a string
        csv_data = f.read()
    return [csv_data]  # Return as a list with one item

In [5]:
# Read the CSV file on the driver
csv_content = spark_context.parallelize([1]).mapPartitions(func_read_csv).collect()[0]

# Convert the CSV string to a pandas DataFrame
# Use StringIO to make the string act like a file
pandas_df = pd.read_csv(StringIO(csv_content), header=0)

# Convert pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(pandas_df)

                                                                                

In [6]:
# Display schema and sample data
df.printSchema()
df.show(5)

root
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tempo: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- duration: double (nullable = true)



                                                                                

+------------------+------------------+----------------+----------------+-------+--------+------------+------+---------+
|           song_id|         artist_id|     artist_name|           title|  tempo|loudness|danceability|energy| duration|
+------------------+------------------+----------------+----------------+-------+--------+------------+------+---------+
|SOMZWCG12A8C13C480|ARD7TVE1187B99BFB1|          Casual|I Didn't Mean To| 92.198| -11.197|         0.0|   0.0|218.93179|
|SOCIWDW12A8C13D406|ARMJAGH1187FB546F3|    The Box Tops|       Soul Deep|121.274|  -9.843|         0.0|   0.0|148.03546|
|SOXVLOJ12AB0189215|ARKRRTF1187B9984DA|Sonora Santanera| Amor De Cabaret| 100.07|  -9.689|         0.0|   0.0|177.47546|
|SONHOTT12A8C13493C|AR7G5I41187FB4CE6C|        Adam Ant| Something Girls|119.293|  -9.013|         0.0|   0.0|233.40363|
|SOFSOCN12A8C143F5D|ARXR32B1187FB57099|             Gob|  Face the Ashes|129.738|  -4.501|         0.0|   0.0|209.60608|
+------------------+------------

In [7]:
def duplicate_spark_df(df, repetitions=10):
    """Duplicates a Spark DataFrame a specified number of times."""
    duplicated_dfs = [df] * repetitions
    result_df = duplicated_dfs[0]
    for i in range(1, len(duplicated_dfs)):
        result_df = result_df.union(duplicated_dfs[i])
    return result_df

In [8]:
duplicated_df = duplicate_spark_df(df, repetitions=30)
print(f"Original DataFrame count: {df.count()}")
print(f"Duplicated DataFrame count: {duplicated_df.count()}")

Original DataFrame count: 10000




Duplicated DataFrame count: 300000


                                                                                

In [9]:
# Convert relevant columns to numeric if needed
# numeric_columns = ["tempo", "loudness", "danceability", "energy", "duration"]
# for col_name in numeric_columns:
#     df = df.withColumn(col_name, df[col_name].cast("double"))
numeric_columns = ["tempo", "loudness", "danceability", "energy", "duration"]
for col_name in numeric_columns:
    duplicated_df = duplicated_df.withColumn(col_name, duplicated_df[col_name].cast("double"))

In [10]:
# Check for missing values
# print("Missing values per column:")
# for col_name in df.columns:
#     missing_count = df.filter(spark_col(col_name).isNull()).count()
#     print(f"{col_name}: {missing_count}")
print("Missing values per column:")
for col_name in duplicated_df.columns:
    missing_count = duplicated_df.filter(spark_col(col_name).isNull()).count()
    print(f"{col_name}: {missing_count}")

Missing values per column:


                                                                                

song_id: 0


                                                                                

artist_id: 0


                                                                                

artist_name: 0


                                                                                

title: 0


                                                                                

tempo: 0


                                                                                

loudness: 0


                                                                                

danceability: 0


                                                                                

energy: 0




duration: 0


                                                                                

In [11]:
# Remove rows with missing values (if required)
# df_clean = df.dropna()
df_clean = duplicated_df.dropna()

In [12]:
# Z-score method for individual features
def find_outliers_zscore(df, numeric_cols, threshold=2.5):
    # from pyspark.sql.functions import col as spark_col, abs, when, lit
    
    outlier_df = df
    for col_name in numeric_cols:
        # Calculate statistics
        stats = df.select(
            mean(col_name).alias("mean"),
            stddev(col_name).alias("stddev")
        ).collect()[0]
        
        # Skip if standard deviation is zero
        if stats["stddev"] == 0:
            continue
        
        # Calculate z-scores
        outlier_df = outlier_df.withColumn(
            f"{col_name}_zscore",
            abs((spark_col(col_name) - stats["mean"]) / stats["stddev"])
        )
        
        # Flag outliers
        outlier_df = outlier_df.withColumn(
            f"{col_name}_is_outlier",
            when(spark_col(f"{col_name}_zscore") > threshold, 1).otherwise(0)
        )
    
    # Count outlier flags per row
    for col_name in numeric_cols:
        if f"{col_name}_is_outlier" in outlier_df.columns:
            if "outlier_count" not in outlier_df.columns:
                outlier_df = outlier_df.withColumn("outlier_count", spark_col(f"{col_name}_is_outlier"))
            else:
                outlier_df = outlier_df.withColumn(
                    "outlier_count", 
                    spark_col("outlier_count") + spark_col(f"{col_name}_is_outlier")
                )
    
    if "outlier_count" not in outlier_df.columns:
        outlier_df = outlier_df.withColumn("outlier_count", lit(0))
    
    return outlier_df

In [13]:
# Find Outliers
start_time = time.time()
df_outliers_zscore = find_outliers_zscore(df_clean, numeric_columns)

# Find the most unusual songs
unusual_songs = df_outliers_zscore.orderBy(desc("outlier_count")).limit(20)
print("Most unusual songs:")
unusual_songs.select("song_id", "artist_name", "title", "outlier_count").show()

# Find which features are most often causing outliers
feature_outlier_counts = {}
for col_name in numeric_columns:
    if f"{col_name}_is_outlier" in df_outliers_zscore.columns:
        outlier_count = df_outliers_zscore.filter(spark_col(f"{col_name}_is_outlier") == 1).count()
        feature_outlier_counts[col_name] = outlier_count

print("Features most often causing outliers:")
for feature, count in sorted(feature_outlier_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"{feature}: {count}")

processing_time = time.time() - start_time
print(f"Processing time for outlier detection: {processing_time:.2f} seconds")

                                                                                

Most unusual songs:


                                                                                

+------------------+--------------------+--------------------+-------------+
|           song_id|         artist_name|               title|outlier_count|
+------------------+--------------------+--------------------+-------------+
|SOITEVV12A6D4F5969|The American Boyc...|Stille nacht_ hei...|            2|
|SOABYBF12A8C138A09|        Klaus Badelt|     Severe Severing|            2|
|SORJPQM12A8C1338BB|David Zinman;Pitt...|Concerto for Orch...|            2|
|SODANYE12AB0182941|     R. Carlos Nakai|    Red Wind (World)|            2|
|SOOIZES12AB018A80F|             L.A.V.I|     Mui Mal_ Animal|            2|
|SOCIALB12AB01811A2|       Scott Glasgow|              Hunted|            2|
|SOSZOXP12A6D4FB9A9|      Roger Reynolds|Binaural Presenta...|            2|
|SOZZBDC12A8C146917|        Morton Gould|Fall River Legend...|            2|
|SOITEVV12A6D4F5969|The American Boyc...|Stille nacht_ hei...|            2|
|SOAPOVV12AB018CFC8|                Taal|              Noises|            2|



Features most often causing outliers:
loudness: 8130
duration: 5970
tempo: 4860
Processing time for outlier detection: 55.50 seconds


                                                                                

In [14]:
# Calculate the range for each artist
start_time = time.time()
artist_ranges = df_clean.groupBy("artist_id", "artist_name") \
    .agg(
        *[
            (max(spark_col(c)) - min(spark_col(c))).alias(f"{c}_range")
            for c in numeric_columns
        ]
    )

# Calculate song_count using collect_list and size()
song_counts = df_clean.groupBy("artist_id").agg(collect_list("artist_id").alias("artist_list"))
song_counts = song_counts.withColumn("song_count", size(spark_col("artist_list")))
song_counts = song_counts.drop("artist_list")

# Join song_counts back to artist_ranges
artist_ranges = artist_ranges.join(song_counts, "artist_id", "left")

# Calculate a combined range score (normalized by feature)
feature_max_ranges = df_clean.groupBy() \
    .agg(*[max(spark_col(c)).alias(f"{c}_max_range") for c in numeric_columns]) \
    .collect()[0]

# Normalize ranges and create a combined score
for c in numeric_columns:
    max_range = feature_max_ranges[f"{c}_max_range"]
    if max_range > 0:  # Avoid division by zero
        artist_ranges = artist_ranges.withColumn(
            f"{c}_normalized_range",
            spark_col(f"{c}_range") / max_range
        )
    else:
        artist_ranges = artist_ranges.withColumn(
            f"{c}_normalized_range",
            lit(0)
        )

# Sum up normalized ranges for overall diversity score
range_cols = [f"{c}_normalized_range" for c in numeric_columns]
artist_ranges = artist_ranges.withColumn(
    "style_diversity_score",
    reduce(add, [spark_col(c) for c in range_cols])
)

# Filter out artists with too few songs
min_songs = 5
diverse_artists = artist_ranges.filter(spark_col("song_count") >= min_songs) \
    .orderBy(desc("style_diversity_score"))

print("Artists with the widest range of musical styles:")
diverse_artists.select("artist_name", "song_count", "style_diversity_score").show(20)

processing_time = time.time() - start_time
print(f"Processing time for artist diversity analysis: {processing_time:.2f} seconds")

                                                                                

Artists with the widest range of musical styles:


                                                                                

+--------------------+----------+---------------------+
|         artist_name|song_count|style_diversity_score|
+--------------------+----------+---------------------+
|      Diamanda Galas|        60|    68.89089872927948|
|Penguin Café Orch...|       180|    58.38209720604238|
| James Newton Howard|       150|     49.7904450817003|
|               Hinge|        60|    49.48610802627586|
|            Bare Jr.|        90|   47.762491463587075|
|            Galactic|       120|   46.287844535115774|
|       Janet Jackson|       210|    45.86887870850419|
|              Neviss|        60|    45.59803987456597|
|             Cam'Ron|       240|    45.09548335619707|
|    Mario Rosenstock|       390|     43.4246367619808|
|          The Nelons|       150|    42.35694032485055|
|              Eminem|       240|    40.85020994062773|
|           Broadcast|       180|    40.68665576030034|
|         Steve Smith|        60|    40.23185638596018|
|            Schizoid|       180|     39.9221127

                                                                                

In [15]:
# Stop the SparkSession
spark.stop()