In [None]:
from pyspark.sql import SparkSession 
 
# Settings applied to the session builder, in the order they are listed.
# NOTE(review): dynamic allocation is switched off here, yet shuffle tracking
# and the executor idle timeout are dynamic-allocation tuning knobs --
# presumably they have no effect in this setup; confirm the intended mode.
spark_conf = {
    "spark.dynamicAllocation.enabled": False,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.shuffle.service.enabled": False,
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
    "spark.cores.max": 4,
}

# Point the builder at the cluster master and name the application.
session_builder = (
    SparkSession.builder
    .master("spark://192.168.2.129:7077")
    .appName("MSD_analysis_pt1_app")
)
for conf_key, conf_value in spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

# Reuse an already running session if there is one, otherwise start a new one.
spark_session = session_builder.getOrCreate()

In [None]:
# Both frames are read from the same CSV file; they are unioned repeatedly
# further down to scale the data set up.
# header=True takes the column names from the first line of the file. No schema
# and no inferSchema option is passed here.
msd_csv_path = 'hdfs://192.168.2.129:50000/hdfs/data/MSD_Sub_as_CSV.csv'
df1 = spark_session.read.csv(msd_csv_path, header=True)
df2 = spark_session.read.csv(msd_csv_path, header=True)


In [None]:
from functools import reduce
from pyspark.sql import DataFrame

# Number of additional (df1, df2) pairs stacked on top of the initial pair.
# Switch this value to
#   24 for 500k songs
#   49 for 1 million songs
#   99 for 2 million songs
#   149 for 3 million songs
#   199 for 4 million songs
extra_pairs = 24

# The initial pair plus `extra_pairs` repetitions, in the same alternating
# df1, df2, df1, df2, ... order an append loop would produce.
dfs = [df1, df2] * (extra_pairs + 1)

# DataFrame.union replaces the deprecated unionAll alias; like unionAll it
# keeps duplicate rows, which is exactly what the scale-up relies on.
msd_df = reduce(DataFrame.union, dfs)

# Row count of the scaled-up data set (displayed as the cell output).
msd_df.count()


In [None]:
# Number of partitions backing the unioned DataFrame at this point.
partitions_before = msd_df.rdd.getNumPartitions()
partitions_before

In [None]:
# Redistribute the unioned data over a fixed number of partitions.
target_partitions = 16
msd_df = msd_df.repartition(target_partitions)

In [None]:
# Partition count again, to confirm the effect of the repartition call above.
partitions_after = msd_df.rdd.getNumPartitions()
partitions_after

In [None]:
#msd_df.printSchema()

In [None]:
#msd_df.show()

### T1: Famous artists (familiarity above 0.5) with more than 6 fairly successful songs (hotness above 0.5) in the list, ranked by how inconsistent the hotness of their songs is (standard deviation)

In [None]:
%%time
import time

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan
from pyspark.sql.functions import col
start_time = time.time()
msd_df.select('ArtistName','ArtistFamiliarity', 'Title', 'Duration', 'Hotness', 'Year')\
                        .filter(~isnan('Hotness'))\
                        .filter(col('ArtistFamiliarity')>0.5)\
                        .filter(col('Hotness')>0.5)\
                        .groupBy('ArtistName')\
                        .agg(F.mean('Hotness'),\
                             F.max('Hotness'),\
                             F.mean('Duration'),\
                             F.mean('ArtistFamiliarity'),\
                             F.stddev('Hotness'),\
                             F.countDistinct('Title'))\
                        .filter(col('count(Title)')>6)\
                        .orderBy(col('stddev_samp(Hotness)').desc())\
                        .show()
end_time = time.time()
print("Exec-time:", (end_time - start_time))


##### Visualization

In [None]:
import matplotlib.pyplot as plt

# Same thresholds as the T1 query; only the top few artists are plotted.
MIN_FAMILIARITY = 0.5
MIN_HOTNESS = 0.5
MIN_DISTINCT_TITLES = 6
TOP_N_ARTISTS = 3

# The CSV is loaded without a schema, so cast the metric columns to double
# before aggregating; mean/stddev then operate on real numbers.
t1_plot_songs = msd_df.select(
    'ArtistName',
    'Title',
    col('ArtistFamiliarity').cast('double').alias('ArtistFamiliarity'),
    col('Hotness').cast('double').alias('Hotness'),
)

# Explicit aliases keep the filter, the sort and the row lookups below
# independent of Spark's auto-generated aggregate column names.
# result_df is a list of Row objects (the output of collect()), not a DataFrame.
result_df = (
    t1_plot_songs
    .filter(~isnan('Hotness'))
    .filter(col('ArtistFamiliarity') > MIN_FAMILIARITY)
    .filter(col('Hotness') > MIN_HOTNESS)
    .groupBy('ArtistName')
    .agg(
        F.mean('Hotness').alias('avg_hotness'),
        F.stddev('Hotness').alias('stddev_hotness'),
        F.countDistinct('Title').alias('distinct_titles'),
    )
    .filter(col('distinct_titles') > MIN_DISTINCT_TITLES)
    .orderBy(col('stddev_hotness').desc())
    .limit(TOP_N_ARTISTS)
    .collect()
)

artists = [row['ArtistName'] for row in result_df]
hotness_means = [row['avg_hotness'] for row in result_df]
hotness_stdevs = [row['stddev_hotness'] for row in result_df]

# One bar per artist: height = mean hotness, error bar = standard deviation.
x_pos = list(range(len(artists)))
fig, ax = plt.subplots()
ax.bar(x_pos, hotness_means, color='tab:blue', alpha=0.5, yerr=hotness_stdevs)

# Print the standard deviation just above the tip of each error bar.
for i, v in enumerate(hotness_stdevs):
    ax.text(i-0.1, hotness_means[i] + v + 0.02, "{:.2f}".format(v), fontsize=10)

plt.xticks(x_pos, artists)
plt.ylabel('avg(Hotness)')
plt.xlabel('Artist')
plt.title('Famous Artists with highest inconsistency of Song Hotness')
plt.show()


### T2: Development of song duration and tempo (BPM) over the decades

In [None]:
%%time

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan
from pyspark.sql.functions import col


msd_df = msd_df.withColumn("Duration", msd_df["Duration"].cast(DoubleType()))
msd_df = msd_df.withColumn("Tempo", msd_df["Tempo"].cast(DoubleType()))


df_result = msd_df.select('Year', 'Tempo', 'Duration', 'ArtistFamiliarity')\
    .filter(msd_df.Year.between(1950,2009))\
    .groupBy(F.concat(F.floor((msd_df.Year - 1960) / 10) * 10 + 1960, F.lit('s')).alias('Decade'))\
    .agg(F.mean('Duration'),\
         F.min('Duration'),\
         F.mean('Tempo'),\
         F.max('Tempo'),\
         F.mean('ArtistFamiliarity'),\
         F.count('Year'))\
    .orderBy(col("Decade").desc())\
    .show()

In [None]:
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan
from pyspark.sql.functions import col
import pandas as pd

# Cast the metric columns locally; the notebook-wide msd_df is left untouched
# so this cell can be re-run without side effects.
t2_plot_songs = msd_df.select(
    'Year',
    col('Tempo').cast(DoubleType()).alias('Tempo'),
    col('Duration').cast(DoubleType()).alias('Duration'),
    col('ArtistFamiliarity').cast(DoubleType()).alias('ArtistFamiliarity'),
)

# Decade label such as "1990s": round the year down to a multiple of ten
# and append an "s".
decade_label = F.concat(
    (F.floor(col('Year') / 10) * 10).cast('string'),
    F.lit('s'),
).alias('Decade')

# Aggregates carry explicit aliases so the pandas column names below come from
# the query itself instead of a separately maintained, position-dependent list.
decade_stats_df = (
    t2_plot_songs
    .filter(col('Year').between(1950, 2009))
    .groupBy(decade_label)
    .agg(
        F.mean('Duration').alias('avg(Duration)'),
        F.min('Duration').alias('min(Duration)'),
        F.mean('Tempo').alias('avg(Tempo)'),
        F.max('Tempo').alias('max(Tempo)'),
        F.mean('ArtistFamiliarity').alias('avg(ArtistFamiliarity)'),
        F.count('Year').alias('count(Year)'),
    )
    # Oldest decade first, which is the left-to-right order wanted on the plot.
    .orderBy(col("Decade").asc())
)

# df_result is the collected list of Row objects; df_pandas is the plot input.
df_result = decade_stats_df.collect()
df_pandas = pd.DataFrame(df_result, columns=decade_stats_df.columns)

# Two y-axes sharing one x-axis: duration on the left, tempo on the right.
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

df_pandas.plot(x='Decade', y='avg(Duration)', ax=ax1, label='Duration', color='tab:blue')
df_pandas.plot(x='Decade', y='avg(Tempo)', ax=ax2, label='Tempo', color='tab:red')

ax1.set_xlabel('Decade')
ax1.set_ylabel('Duration')
ax2.set_ylabel('Tempo')
ax2.legend(loc = 'center right')
plt.title('Trendlines of Duration and Tempo over the Decades')


plt.show()



In [None]:
# Stop the Spark session created at the top of the notebook; run this last,
# since every cell above needs the session to be alive.
spark_session.stop()