In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, avg, count, desc, floor, corr
import numpy as np
import time

# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.223:7077")
    .appName("Song Analysis with Decade Tempo")
    # Dynamic allocation relies on shuffle tracking here because the
    # external shuffle service is explicitly switched off.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    # Executors that stay idle for 300s become eligible for removal.
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    # One core per executor.
    .config("spark.executor.cores", 1)
    # Fixed driver / block-manager ports
    # (presumably to match the cluster's firewall rules -- TODO confirm).
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# SQLContext built on the session's SparkContext. Note that this is the
# DataFrame/SQL entry point, not the RDD API (the RDD API is the
# SparkContext itself).
sqlContext = SQLContext(spark_session.sparkContext)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/05 07:35:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Wall-clock reference point; the final timing cell reports
# time.time() - start_time, so everything run after this line is included.
start_time = time.time()

In [3]:
# Read the aggregated song CSV (first row is the header, column types are
# inferred from the data) and mark the resulting DataFrame for caching so
# the later aggregations do not have to re-read the file.
# NOTE(review): the path is absolute and host-specific.
df = (
    sqlContext.read
    .option("header", 'true')
    .option("inferSchema", 'true')
    .csv('/home/ubuntu/length-tempo-by-year/MillionSongSubset_aggregated.csv')
    .cache()
)

# Preview the loaded rows.
df.show()

                                                                                

+----+-------+---------+
|Year|  Tempo| Duration|
+----+-------+---------+
|2008|124.059|148.74077|
|2004| 80.084|252.99546|
|2007|  77.15|163.63057|
|1982| 92.897|  216.842|
|1998| 86.981|312.99873|
|2004|157.715|218.90567|
|2005|146.331|580.70159|
|2004| 84.992|283.48036|
|2001| 99.959| 135.8624|
|1982|104.989|146.33751|
|2000|105.095|208.95302|
|1981|100.042|154.93179|
|2004| 80.893|146.80771|
|1973|  110.1|199.96689|
|2007| 92.971|319.84281|
|2008|117.936|258.16771|
|1963|121.896|207.35955|
|2001| 152.99|258.29832|
|2004|104.465| 158.1971|
|2004|138.512|461.71383|
+----+-------+---------+
only showing top 20 rows



In [4]:
# Keep only songs released after 1970 (1970 itself is excluded) whose tempo
# and duration are both strictly positive. A comparison against a null value
# is not true, so rows with a missing Year, Tempo or Duration are dropped too.
df_filtered = (
    df
    .filter(col("Year") > 1970)
    .filter(col("Tempo") > 0)
    .filter(col("Duration") > 0)
)

In [5]:
# Tag every song with the first year of its decade (e.g. 1987 -> 1980).
df_with_decade = df_filtered.withColumn("Decade", floor(col("Year") / 10) * 10)

# The per-decade and per-year summaries share the same three aggregates:
# mean tempo, mean duration and the number of songs in the group.
_summary_aggs = [
    avg("Tempo").alias("average_tempo"),
    avg("Duration").alias("average_duration"),
    count("*").alias("songs_count"),
]

# Summary per decade, sorted chronologically.
df_decade_tempo = (
    df_with_decade
    .groupBy("Decade")
    .agg(*_summary_aggs)
    .orderBy("Decade")
)

# Summary per individual year, sorted chronologically.
df_nondecade_tempo = (
    df_with_decade
    .groupBy("Year")
    .agg(*_summary_aggs)
    .orderBy("Year")
)

# Discard years backed by 5 or fewer songs, keeping the chronological order.
df_nondecade_tempo_filtered = (
    df_nondecade_tempo
    .where(col("songs_count") > 5)
    .orderBy("Year")
)

In [6]:
# Print the per-decade averages and song counts.
df_decade_tempo.show()

+------+------------------+------------------+-----------+
|Decade|     average_tempo|  average_duration|songs_count|
+------+------------------+------------------+-----------+
|  1970|131.70000444444437|250.65361626666666|        225|
|  1980|126.86822425629293|235.72452128146452|        437|
|  1990| 124.6324845961699|242.58668049958308|       1201|
|  2000|125.99748443042954|239.68609247930607|       2537|
|  2010|118.92348437499999| 238.0424034375001|         64|
+------+------------------+------------------+-----------+



In [7]:
# Print the per-year averages for years with more than 5 songs
# (show() displays at most the first 20 rows by default).
df_nondecade_tempo_filtered.show()

+----+------------------+------------------+-----------+
|Year|     average_tempo|  average_duration|songs_count|
+----+------------------+------------------+-----------+
|1971|         136.16196|259.55428919999997|         25|
|1972|129.17204166666667|238.54539749999995|         24|
|1973|116.35612499999998|294.16444416666667|         24|
|1974|125.08609090909091| 239.4913463636364|         22|
|1975|125.41183333333332| 277.4406354166667|         24|
|1976|137.26139999999998|210.99404933333332|         15|
|1977|139.33685714285716|255.30692800000003|         35|
|1978|134.38385000000002|247.85456749999997|         20|
|1979|137.51694444444448| 226.0566886111111|         36|
|1980|126.89337499999999|210.43873062499998|         32|
|1981|131.61677142857144| 217.7055291428571|         35|
|1982|125.14521999999995|251.23742419999996|         50|
|1983| 126.1436808510638|237.46428063829794|         47|
|1984|127.95625000000003|         239.31873|         32|
|1985|132.86585294117648|242.61

In [8]:
# Pearson correlation of tempo and of duration with the release year,
# computed over the individual filtered songs (not the per-year averages).
# The result is a single-row DataFrame.
correlation_df = df_filtered.agg(
    corr("Tempo", "Year").alias("tempo_year_corr"),
    corr("Duration", "Year").alias("duration_year_corr"),
)

correlation_df.show()

+--------------------+--------------------+
|     tempo_year_corr|  duration_year_corr|
+--------------------+--------------------+
|-0.02507322891847...|-0.01188938532914...|
+--------------------+--------------------+



In [9]:
# Same correlations as above, but against the decade bucket instead of the
# exact year; still computed per song, yielding a single-row DataFrame.
correlation_decade_df = df_with_decade.agg(
    corr("Tempo", "Decade").alias("tempo_decade_corr"),
    corr("Duration", "Decade").alias("duration_decade_corr"),
)

correlation_decade_df.show()

+--------------------+--------------------+
|   tempo_decade_corr|duration_decade_corr|
+--------------------+--------------------+
|-0.02702604138344...|-0.01303098492627...|
+--------------------+--------------------+



In [10]:
# Elapsed wall-clock seconds since start_time was set (just before the CSV
# load), i.e. the total for every cell executed in between.
print("Time taken: ", time.time() - start_time)

Time taken:  14.56863522529602


In [11]:
# Stop the Spark session so the cores held by this application are released
# for other applications on the cluster.
spark_session.stop()