In [1]:
# Imports & Spark setup
import os
import shutil
import glob
import hdf5_getters
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType, StringType
from pyspark.ml.feature import QuantileDiscretizer, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row
from pyspark.sql.functions import broadcast, udf
import numpy as np

from tools import setup_spark_config, read_parquet_files

sc, spark = setup_spark_config("Full Analysis Of Million Song Dataset ")

# add local files to spark so that workers can use them as well
homedir = str(os.getcwd())+'/'
if 'ubuntu' in homedir:
    sc.addPyFile('/home/ubuntu/hdf5_getters.py')
    sc.addPyFile('/home/ubuntu/tools.py')

In [2]:
# Get all subdirs
def get_subdirs(basedir, homedir):
    subdirs = []
    for subdir in next(os.walk(homedir+basedir))[1]:
        subdirs.append(os.path.join(basedir, subdir))
    return subdirs

homedir = str(os.getcwd())+'/'
basedir = 'datasets'
subdirs_rdd = sc.parallelize(get_subdirs(basedir, homedir))
subsubdirs_rdd = subdirs_rdd.map(lambda subdir: get_subdirs(subdir, homedir)).flatMap(lambda x: x)
subsubsubdirs_rdd = subsubdirs_rdd.map(lambda subsubdir: get_subdirs(subsubdir, homedir)).flatMap(lambda x: x)
subsubsubdirs_rdd = subsubsubdirs_rdd.map(lambda subsubsubdir: homedir+subsubsubdir).cache()

In [3]:
print('%d dirs in dataset' % (subsubsubdirs_rdd.count()))
subsubsubdirs_rdd.take(5)

2028 dirs in dataset


['/home/ubuntu/datasets/C/U/U',
 '/home/ubuntu/datasets/C/U/X',
 '/home/ubuntu/datasets/C/U/V',
 '/home/ubuntu/datasets/C/U/M',
 '/home/ubuntu/datasets/C/U/J']

In [4]:
# iterate & get all files (songs)
def count_and_get_files(basedir, ext='.h5'):
    # modified version of: https://labrosa.ee.columbia.edu/millionsong/pages/iterate-over-all-songs
    cnt = 0
    all_files = []
    for root, dirs, files in os.walk(basedir):
        files = glob.glob(os.path.join(root,'*'+ext))
        for file in files:
            all_files.append(file)
        cnt += len(files)
    return cnt, all_files

file_names_rdd = subsubsubdirs_rdd \
                    .map(lambda subsubsubdir: count_and_get_files(subsubsubdir)[1]) \
                    .flatMap(lambda x: x) \
                    .cache()

In [5]:
print('%d files in dataset' % (file_names_rdd.count()))
file_names_rdd.take(5)

115976 files in dataset


['/home/ubuntu/datasets/C/U/U/TRCUUHJ128F932110C.h5',
 '/home/ubuntu/datasets/C/U/U/TRCUUMC12903CFF894.h5',
 '/home/ubuntu/datasets/C/U/U/TRCUUEI128F9329C49.h5',
 '/home/ubuntu/datasets/C/U/U/TRCUUXR12903CDCD7E.h5',
 '/home/ubuntu/datasets/C/U/U/TRCUUSB128F92F4413.h5']

In [6]:
#file_names = file_names_rdd.take(15040) # 3.5gb
#file_names = file_names_rdd.take(30080) # 7gb
file_names = file_names_rdd.take(45120) # 10.5gb
#file_names = file_names_rdd.take(60160) # 14gb
#file_names = file_names_rdd.take(75200) # 17.5gb
#file_names = file_names_rdd.take(90240) # 21gb

file_names_rdd = sc.parallelize(file_names).cache()

In [7]:
# Inspect some sample data
h5 = hdf5_getters.open_h5_file_read(file_names_rdd.take(1)[0])

# in byte format --> decode to string
print('Sample artist: %s, \
      \nsong: %s, \
      \nartist familiarity: %0.2f, \
      \nartist hotness: %0.2f, \
      \nsong hotness: %0.2f, \
      \nkey: %d, \
      \ntempo: %0.2f \
      \nyear: %d' % \
      (hdf5_getters.get_artist_name(h5).decode('UTF-8'), \
       hdf5_getters.get_title(h5).decode('UTF-8'), \
       float(hdf5_getters.get_artist_familiarity(h5)), \
       float(hdf5_getters.get_artist_hotttnesss(h5)), \
       float(hdf5_getters.get_song_hotttnesss(h5)), \
       int(hdf5_getters.get_key(h5)), \
       float(hdf5_getters.get_tempo(h5)), \
       int(hdf5_getters.get_year(h5))))

h5.close()

Sample artist: Cesare Cremonini,       
song: Dev'essere cosi (unplugged),       
artist familiarity: 0.60,       
artist hotness: 0.43,       
song hotness: 0.35,       
key: 4,       
tempo: 187.40       
year: 2008


In [8]:
# get artist name for each song
def get_artist_name(filename):
    file = hdf5_getters.open_h5_file_read(filename)
    artist_name = hdf5_getters.get_artist_name(file).decode('UTF-8')
    file.close()
    return artist_name

artist_names_rdd = file_names_rdd.map(get_artist_name)

In [9]:
artist_names_rdd.take(5)

['Cesare Cremonini',
 'Adam Richmond',
 'The Mother Truckers',
 'utopia:banished',
 'California Oranges']

In [10]:
# get all artists and songs in format: (artist, song)
def get_artist_and_song(filename):
    file = hdf5_getters.open_h5_file_read(filename)
    artist = hdf5_getters.get_artist_name(file).decode('UTF-8')
    song = hdf5_getters.get_title(file).decode('UTF-8')
    file.close()
    return artist, song

artist_song_rdd = file_names_rdd.map(lambda x: get_artist_and_song(x)).cache()

In [11]:
# group songs by their artist in format: (artist, [song1, song2, song3...])
# very slow operation --> if printing songs is uninteresting, use reduceByKey instead
grouped_artist_song_rdd = artist_song_rdd.groupByKey().mapValues(list).cache()

In [12]:
n_unique_artists = grouped_artist_song_rdd.count()
print('%d unique artists in dataset' % (n_unique_artists))

for artist_songs in grouped_artist_song_rdd.take(5):
    print('\n%s has %d songs:' % (artist_songs[0], len(artist_songs[1])))
    songs = ''
    for song in artist_songs[1]:
        print('- %s' % (song))

23560 unique artists in dataset

Gloria Estefan has 3 songs:
- Me Odio
- No Me Vuelvo A Enamorar
- Cuts Both Ways

Lulu has 5 songs:
- Every Woman Knows (Think Twice Remix)
- Saved (2007 Remastered LP Version)
- Sois libre et ris
- I'll Come Running Over
- Take Good Care Of Yourself (2007 Remastered LP Version)

POCAHONTAS has 1 songs:
- Hijo De La Luna

Los Lobos has 3 songs:
- I Got To Let You Know [Live at The Paradiso_ Amsterdam 1987]
- Estoy Sentado Aquí
- Come On Let's Go

Cesare Basile has 1 songs:
- Waltz # 4


In [13]:
# check if any artist has no songs -- shouldn't be possible
print('%d artists have no songs' % (grouped_artist_song_rdd.filter(lambda x: len(x[1]) < 1).count()))

0 artists have no songs


In [14]:
# get song data to use for analysis
def get_song_data(filename):
    # round floats to 2 decimals
    file = hdf5_getters.open_h5_file_read(filename)
    loudness = float(hdf5_getters.get_loudness(file))
    song_hotness = float(hdf5_getters.get_song_hotttnesss(file))
    year = int(hdf5_getters.get_year(file))    
    artist_familiarity = float(hdf5_getters.get_artist_familiarity(file))
    artist_hotness = float(hdf5_getters.get_artist_hotttnesss(file))
    key = int(hdf5_getters.get_key(file))
    tempo = float(hdf5_getters.get_tempo(file))
    file.close()
    return loudness, song_hotness, year, artist_familiarity, artist_hotness, key, tempo

songs_rdd = file_names_rdd.map(get_song_data).cache()

In [15]:
songs_rdd.take(5)

[(-16.346,
  0.34911996465177997,
  2008,
  0.5997560759410356,
  0.42955964045297707,
  4,
  187.402),
 (-15.124, nan, 0, 0.16951676160053092, 0.27417477953702857, 2, 110.2),
 (-8.663, nan, 0, 0.5820459775659689, 0.3619682702757134, 7, 120.1),
 (-8.206, nan, 0, 0.522553237646043, 0.33196871960348096, 10, 169.054),
 (-5.571,
  0.21204540548371908,
  0,
  0.4242028280284693,
  0.2554103094339133,
  11,
  154.868)]

In [16]:
# move song data from RDD to DF & table view for optimization & Spark-SQL queries
fields = [StructField("loudness", FloatType()), \
          StructField("song_hotness", FloatType()), \
          StructField("year", IntegerType()), \
          StructField("artist_familiarity", FloatType()), \
          StructField("artist_hotness", FloatType()), \
          StructField("key", IntegerType()), \
          StructField("tempo", FloatType())]

schema = StructType(fields)

songs_df = spark.createDataFrame(songs_rdd, schema)
songs_df.createOrReplaceTempView("songs")

In [17]:
songs_df.show()

+--------+------------+----+------------------+--------------+---+-------+
|loudness|song_hotness|year|artist_familiarity|artist_hotness|key|  tempo|
+--------+------------+----+------------------+--------------+---+-------+
| -16.346|  0.34911996|2008|        0.59975606|    0.42955965|  4|187.402|
| -15.124|         NaN|   0|        0.16951676|    0.27417478|  2|  110.2|
|  -8.663|         NaN|   0|          0.582046|    0.36196828|  7|  120.1|
|  -8.206|         NaN|   0|        0.52255327|    0.33196872| 10|169.054|
|  -5.571|   0.2120454|   0|        0.42420283|     0.2554103| 11|154.868|
|  -9.749|  0.52683705|   0|        0.70708585|    0.45426837|  4|110.516|
|  -3.975|  0.53237844|2007|         0.8513696|     0.5836399|  7|150.114|
| -11.614|         NaN|   0|         0.6089661|     0.4037475|  7|179.416|
|  -9.094|         0.0|1993|         0.4612176|    0.32213077| 11|127.772|
|  -7.559|         NaN|   0|         0.5565503|     0.3414383|  2|148.831|
| -13.309|         0.0|  

In [18]:
n_songs = file_names_rdd.count() # each file corresponds to one song
print("There are %d songs in total" % (n_songs))

There are 45120 songs in total


In [19]:
# filter out songs with NaN values and no year
filtered_songs_df = spark.sql("SELECT * FROM songs WHERE \
                                  isNaN(loudness) = false AND \
                                  isNaN(song_hotness) = false AND \
                                  isNaN(year) = false AND \
                                  year > 0 AND \
                                  isNaN(artist_familiarity) = false AND \
                                  isNaN(artist_hotness) = false AND \
                                  isNaN(key) = false AND \
                                  isNaN(tempo) = false")
filtered_songs_df.createOrReplaceTempView("songs")

In [20]:
filtered_songs_df.show()

+--------+------------+----+------------------+--------------+---+-------+
|loudness|song_hotness|year|artist_familiarity|artist_hotness|key|  tempo|
+--------+------------+----+------------------+--------------+---+-------+
| -16.346|  0.34911996|2008|        0.59975606|    0.42955965|  4|187.402|
|  -3.975|  0.53237844|2007|         0.8513696|     0.5836399|  7|150.114|
|  -9.094|         0.0|1993|         0.4612176|    0.32213077| 11|127.772|
|  -7.548|   0.2998775|2005|         0.5636851|    0.32525325|  5|  156.5|
|  -7.688|  0.42870227|2009|         0.4956395|    0.37228364|  4|119.288|
| -15.216|   0.4518259|1996|        0.48260397|    0.29409504|  8| 97.388|
| -18.329|    0.541552|1976|        0.67577136|     0.4354689|  7|151.361|
|  -8.325|   0.4381202|2003|        0.54568464|      0.396207|  6|133.635|
| -17.636|   0.7462104|2006|           0.70012|     0.5343837|  7|120.632|
|  -9.352|   0.2291441|2001|         0.5058246|    0.34626162|  1| 96.005|
|  -8.017|         0.0|20

In [21]:
n_songs_left = filtered_songs_df.count()
n_songs_left_frac = n_songs_left / n_songs * 100
print("There are %d songs left after removing songs with NaN values, corresponding to %0.2f%% of the total amount of songs" \
       % (n_songs_left, n_songs_left_frac))

There are 15320 songs left after removing songs with NaN values, corresponding to 33.95% of the total amount of songs


In [22]:
# write songs to parquet (better than CSV)
dst_dir = homedir+'parsed-'+basedir
if os.path.isdir(dst_dir):
    shutil.rmtree(dst_dir)
filtered_songs_df.write.parquet(dst_dir)

In [23]:
"""
# write songs to CSV (for comparison purposes)
dst_dir = homedir+'parsed-'+basedir+'-csv'
if os.path.isdir(dst_dir):
    shutil.rmtree(dst_dir)
filtered_songs_df.write.csv(dst_dir, header=True)
"""

"\n# write songs to CSV (for comparison purposes)\ndst_dir = homedir+'parsed-'+basedir+'-csv'\nif os.path.isdir(dst_dir):\n    shutil.rmtree(dst_dir)\nfiltered_songs_df.write.csv(dst_dir, header=True)\n"

In [24]:
# read songs data from parquet files
basedir = homedir+'parsed-'+basedir
songs_df = read_parquet_files(basedir, spark)

In [25]:
# group songs by year
grouped_year_songs_df = spark.sql("SELECT AVG(loudness), \
                              AVG(song_hotness), AVG(artist_familiarity), \
                              AVG(key), AVG(tempo), \
                              AVG(artist_hotness), year FROM songs \
                              GROUP BY year")

In [26]:
n_years = grouped_year_songs_df.count()
print("There are songs from %d different years in the dataset" % (n_years))

There are songs from 61 different years in the dataset


In [27]:
grouped_year_songs_df.orderBy("year", ascending=False).show()

+-------------------+-------------------+-----------------------+------------------+------------------+-------------------+----+
|      avg(loudness)|  avg(song_hotness)|avg(artist_familiarity)|          avg(key)|        avg(tempo)|avg(artist_hotness)|year|
+-------------------+-------------------+-----------------------+------------------+------------------+-------------------+----+
| -3.993000030517578|  0.644711971282959|     0.6272614598274231|               2.0| 143.0050048828125| 0.5325385928153992|2011|
| -8.028590893590605| 0.5416193984739193|     0.6237667761501167|5.0811688311688314| 124.9831883690574| 0.4522572713238852|2010|
| -7.964498000171132|0.49272277063416864|     0.6169314381263095| 5.351297405189621|122.80484237214048| 0.4377286074582688|2009|
| -8.195168193681694|0.46935270025209275|     0.6181123493906703| 5.221206581352834|124.39913437614929|0.43039013528464265|2008|
| -8.015104006290436|0.44904645688533784|     0.6212394068658352|            5.2808|124.104260839

In [28]:
# see linear correlation between years and the other features
loudness_correlation = 100*float(songs_df.stat.corr("loudness", "year"))
song_hotness_correlation = 100*float(songs_df.stat.corr("song_hotness", "year"))
artist_familiarity_correlation = 100*float(songs_df.stat.corr("artist_familiarity", "year"))
key_correlation = 100*float(songs_df.stat.corr("key", "year"))
tempo_correlation = 100*float(songs_df.stat.corr("tempo", "year"))
artist_hotness_correlation = 100*float(songs_df.stat.corr("artist_hotness", "year"))

print("%0.2f%% correlation between loudness and year" % (loudness_correlation))
print("%0.2f%% correlation between song_hotness and year" % (song_hotness_correlation))
print("%0.2f%% correlation between artist_familiarity and year" % (artist_familiarity_correlation))
print("%0.2f%% correlation between key and year" % (key_correlation))
print("%0.2f%% correlation between tempo and year" % (tempo_correlation))
print("%0.2f%% correlation between artist_hotness and year" % (artist_hotness_correlation))

27.09% correlation between loudness and year
7.95% correlation between song_hotness and year
5.47% correlation between artist_familiarity and year
1.51% correlation between key and year
-1.09% correlation between tempo and year
3.48% correlation between artist_hotness and year


In [29]:
# see linear correlation between years and the average of other features
loudness_correlation = 100*float(grouped_year_songs_df.stat.corr("avg(loudness)", "year"))
song_hotness_correlation = 100*float(grouped_year_songs_df.stat.corr("avg(song_hotness)", "year"))
artist_familiarity_correlation = 100*float(grouped_year_songs_df.stat.corr("avg(artist_familiarity)", "year"))
key_correlation = 100*float(grouped_year_songs_df.stat.corr("avg(key)", "year"))
tempo_correlation = 100*float(grouped_year_songs_df.stat.corr("avg(tempo)", "year"))
artist_hotness_correlation = 100*float(grouped_year_songs_df.stat.corr("avg(artist_hotness)", "year"))

print("%0.2f%% correlation between avg(loudness) and year" % (loudness_correlation))
print("%0.2f%% correlation between avg(song_hotness) and year" % (song_hotness_correlation))
print("%0.2f%% correlation between avg(artist_familiarity) and year" % (artist_familiarity_correlation))
print("%0.2f%% correlation between avg(key) and year" % (key_correlation))
print("%0.2f%% correlation between avg(tempo) and year" % (tempo_correlation))
print("%0.2f%% correlation between avg(artist_hotness) and year" % (artist_hotness_correlation))

79.57% correlation between avg(loudness) and year
26.40% correlation between avg(song_hotness) and year
50.81% correlation between avg(artist_familiarity) and year
-7.27% correlation between avg(key) and year
50.23% correlation between avg(tempo) and year
50.76% correlation between avg(artist_hotness) and year


In [30]:
# discretize hotness to 10 different values
discretizer = QuantileDiscretizer(numBuckets=10, inputCol="song_hotness", outputCol="discrete_song_hotness")
discretized_df = discretizer.fit(songs_df).transform(songs_df)
discretized_df.createOrReplaceTempView("discrete_hotness_songs")

In [31]:
discretized_df.show()

+--------+------------+----+------------------+--------------+---+-------+---------------------+
|loudness|song_hotness|year|artist_familiarity|artist_hotness|key|  tempo|discrete_song_hotness|
+--------+------------+----+------------------+--------------+---+-------+---------------------+
| -16.346|  0.34911996|2008|        0.59975606|    0.42955965|  4|187.402|                  3.0|
|  -3.975|  0.53237844|2007|         0.8513696|     0.5836399|  7|150.114|                  6.0|
|  -9.094|         0.0|1993|         0.4612176|    0.32213077| 11|127.772|                  1.0|
|  -7.548|   0.2998775|2005|         0.5636851|    0.32525325|  5|  156.5|                  2.0|
|  -7.688|  0.42870227|2009|         0.4956395|    0.37228364|  4|119.288|                  4.0|
| -15.216|   0.4518259|1996|        0.48260397|    0.29409504|  8| 97.388|                  4.0|
| -18.329|    0.541552|1976|        0.67577136|     0.4354689|  7|151.361|                  6.0|
|  -8.325|   0.4381202|2003|  

In [32]:
# group songs by hotness
grouped_hotness_songs_df = spark.sql("SELECT AVG(loudness), \
                              AVG(artist_familiarity), \
                              AVG(key), AVG(tempo), \
                              AVG(artist_hotness), discrete_song_hotness FROM discrete_hotness_songs \
                              GROUP BY discrete_song_hotness")

In [33]:
grouped_hotness_songs_df.orderBy("discrete_song_hotness", ascending=False).show()

+-------------------+-----------------------+-----------------+------------------+-------------------+---------------------+
|      avg(loudness)|avg(artist_familiarity)|         avg(key)|        avg(tempo)|avg(artist_hotness)|discrete_song_hotness|
+-------------------+-----------------------+-----------------+------------------+-------------------+---------------------+
| -8.264136008105673|     0.7182547533578691|5.366622864651774|124.61489686702774| 0.5150832961192269|                  9.0|
|  -8.61522301401565|     0.6790808589663055|5.180349062702004|125.24948229478419|0.47486551014469297|                  8.0|
| -8.868828756591073|     0.6474130619012939|5.363398692810457|126.70131234063042|0.45046668712025373|                  7.0|
| -8.949385912450227|     0.6242417906820191|5.431551499348109|125.74190221585071|0.43281927654298685|                  6.0|
| -9.615878017195206|     0.6009626929612985|5.395303326810176|124.90745083397692| 0.4169134982780779|                  5.0|


In [34]:
# see linear correlation between discrete_hotness and the other features
loudness_correlation = 100*float(discretized_df.stat.corr("loudness", "discrete_song_hotness"))
artist_hotness_correlation = 100*float(discretized_df.stat.corr("artist_hotness", "discrete_song_hotness"))
artist_familiarity_correlation = 100*float(discretized_df.stat.corr("artist_familiarity", "discrete_song_hotness"))
key_correlation = 100*float(discretized_df.stat.corr("key", "discrete_song_hotness"))
tempo_correlation = 100*float(discretized_df.stat.corr("tempo", "discrete_song_hotness"))
year_correlation = 100*float(discretized_df.stat.corr("year", "discrete_song_hotness"))

print("%0.2f%% correlation between loudness and discrete_song_hotness" % (loudness_correlation))
print("%0.2f%% correlation between artist_hotness and discrete_song_hotness" % (artist_hotness_correlation))
print("%0.2f%% correlation between artist_familiarity and discrete_song_hotness" % (artist_familiarity_correlation))
print("%0.2f%% correlation between key and discrete_song_hotness" % (key_correlation))
print("%0.2f%% correlation between tempo and discrete_song_hotness" % (tempo_correlation))
print("%0.2f%% correlation between year and discrete_song_hotness" % (artist_hotness_correlation))

14.39% correlation between loudness and discrete_song_hotness
45.49% correlation between artist_hotness and discrete_song_hotness
48.42% correlation between artist_familiarity and discrete_song_hotness
-0.04% correlation between key and discrete_song_hotness
2.29% correlation between tempo and discrete_song_hotness
45.49% correlation between year and discrete_song_hotness


In [35]:
# see linear correlation between discrete_hotness and the average of the other features
loudness_correlation = 100*float(grouped_hotness_songs_df.stat.corr("avg(loudness)", "discrete_song_hotness"))
song_hotness_correlation = 100*float(grouped_hotness_songs_df.stat.corr("avg(artist_hotness)", "discrete_song_hotness"))
artist_familiarity_correlation = 100*float(grouped_hotness_songs_df.stat.corr("avg(artist_familiarity)", "discrete_song_hotness"))
key_correlation = 100*float(grouped_hotness_songs_df.stat.corr("avg(key)", "discrete_song_hotness"))
tempo_correlation = 100*float(grouped_hotness_songs_df.stat.corr("avg(tempo)", "discrete_song_hotness"))

print("%0.2f%% correlation between avg(loudness) and discrete_song_hotness" % (loudness_correlation))
print("%0.2f%% correlation between avg(artist_hotness) and discrete_song_hotness" % (song_hotness_correlation))
print("%0.2f%% correlation between avg(artist_familiarity) and discrete_song_hotness" % (artist_familiarity_correlation))
print("%0.2f%% correlation between avg(key) and discrete_song_hotness" % (key_correlation))
print("%0.2f%% correlation between avg(tempo) and discrete_song_hotness" % (tempo_correlation))

98.54% correlation between avg(loudness) and discrete_song_hotness
97.77% correlation between avg(artist_hotness) and discrete_song_hotness
98.50% correlation between avg(artist_familiarity) and discrete_song_hotness
4.82% correlation between avg(key) and discrete_song_hotness
67.34% correlation between avg(tempo) and discrete_song_hotness


In [36]:
# transform data to vector to use for clustering
input_cols = ["loudness", \
              "song_hotness", \
              "artist_familiarity", \
              "artist_hotness", \
              "key", \
              "tempo"]
vecAssembler = VectorAssembler(inputCols=input_cols, \
                               outputCol="features")
vec_df = vecAssembler.transform(songs_df)

In [37]:
vec_df.show()

+--------+------------+----+------------------+--------------+---+-------+--------------------+
|loudness|song_hotness|year|artist_familiarity|artist_hotness|key|  tempo|            features|
+--------+------------+----+------------------+--------------+---+-------+--------------------+
| -16.346|  0.34911996|2008|        0.59975606|    0.42955965|  4|187.402|[-16.346000671386...|
|  -3.975|  0.53237844|2007|         0.8513696|     0.5836399|  7|150.114|[-3.9749999046325...|
|  -9.094|         0.0|1993|         0.4612176|    0.32213077| 11|127.772|[-9.0939998626708...|
|  -7.548|   0.2998775|2005|         0.5636851|    0.32525325|  5|  156.5|[-7.5479998588562...|
|  -7.688|  0.42870227|2009|         0.4956395|    0.37228364|  4|119.288|[-7.6880002021789...|
| -15.216|   0.4518259|1996|        0.48260397|    0.29409504|  8| 97.388|[-15.215999603271...|
| -18.329|    0.541552|1976|        0.67577136|     0.4354689|  7|151.361|[-18.329000473022...|
|  -8.325|   0.4381202|2003|        0.54

In [38]:
# fit a KMeans model to the vector transform of the grouped (by year) data
kmeans = KMeans(k=len(input_cols), seed=1)
model = kmeans.fit(vec_df.select('features'))

In [39]:
# cluster the vector transform of the grouped (by year) data
transformed_df = model.transform(vec_df)

In [40]:
transformed_df.show()

+--------+------------+----+------------------+--------------+---+-------+--------------------+----------+
|loudness|song_hotness|year|artist_familiarity|artist_hotness|key|  tempo|            features|prediction|
+--------+------------+----+------------------+--------------+---+-------+--------------------+----------+
| -16.346|  0.34911996|2008|        0.59975606|    0.42955965|  4|187.402|[-16.346000671386...|         5|
|  -3.975|  0.53237844|2007|         0.8513696|     0.5836399|  7|150.114|[-3.9749999046325...|         0|
|  -9.094|         0.0|1993|         0.4612176|    0.32213077| 11|127.772|[-9.0939998626708...|         3|
|  -7.548|   0.2998775|2005|         0.5636851|    0.32525325|  5|  156.5|[-7.5479998588562...|         2|
|  -7.688|  0.42870227|2009|         0.4956395|    0.37228364|  4|119.288|[-7.6880002021789...|         3|
| -15.216|   0.4518259|1996|        0.48260397|    0.29409504|  8| 97.388|[-15.215999603271...|         4|
| -18.329|    0.541552|1976|        0

In [41]:
# create dataframe for each centroid
centroids = model.clusterCenters()
centroids = np.array(centroids).T.tolist()
centroids.append([i for i in range(len(input_cols))])

R = Row("loudness", \
        "song_hotness", \
        "artist_familiarity", \
        "artist_hotness", \
        "key", \
        "tempo", \
        "centroid")
centroids_df = sc.parallelize([R(*r) for r in zip(*centroids)]).toDF()

In [42]:
centroids_df.show()

+-------------------+------------------+------------------+-------------------+------------------+------------------+--------+
|           loudness|      song_hotness|artist_familiarity|     artist_hotness|               key|             tempo|centroid|
+-------------------+------------------+------------------+-------------------+------------------+------------------+--------+
| -8.858958260339122|0.4325800949433544|0.6063084503764415| 0.4208683091039593|5.3645645645645645|139.63880275233728|       0|
|-13.855989993140755| 0.398096589350581| 0.594022916191641|  0.416415683831903| 5.160200250312891|  60.7642752023155|       1|
| -8.680848375059838| 0.437697964220546|0.6100420256270919| 0.4220462501049042| 5.261860465116279|166.68070898011675|       2|
| -9.499035311602103|0.4234504753298597|0.6035877241560436| 0.4188378432803185| 5.392938209331652|118.15619771988665|       3|
|  -9.63685176697683|0.4296833192086197|0.6069181274227154|0.42129169335256766| 5.267635843660629| 92.844669476

In [43]:
# add fictional genre to each centroid
genres = ["hot&loud", "plain", "mellow&soft", "mainstream", "hip&hop", "R&B"]

def add_genre(centroid):
    print(centroid)
    return genres[int(centroid)]

udf_add_genre = udf(add_genre, StringType())
genres_df = centroids_df.withColumn("genre", udf_add_genre("centroid")).select("centroid", "genre")

In [44]:
# add centroid genre to songs
song_genres_df = transformed_df.join(broadcast(genres_df), transformed_df.prediction == genres_df.centroid) \
    .select("loudness", \
            "song_hotness", \
            "artist_familiarity", \
            "artist_hotness", \
            "key", \
            "tempo", \
            "genre")

In [45]:
song_genres_df.show()

+--------+------------+------------------+--------------+---+-------+-----------+
|loudness|song_hotness|artist_familiarity|artist_hotness|key|  tempo|      genre|
+--------+------------+------------------+--------------+---+-------+-----------+
| -16.346|  0.34911996|        0.59975606|    0.42955965|  4|187.402|        R&B|
|  -3.975|  0.53237844|         0.8513696|     0.5836399|  7|150.114|   hot&loud|
|  -9.094|         0.0|         0.4612176|    0.32213077| 11|127.772| mainstream|
|  -7.548|   0.2998775|         0.5636851|    0.32525325|  5|  156.5|mellow&soft|
|  -7.688|  0.42870227|         0.4956395|    0.37228364|  4|119.288| mainstream|
| -15.216|   0.4518259|        0.48260397|    0.29409504|  8| 97.388|    hip&hop|
| -18.329|    0.541552|        0.67577136|     0.4354689|  7|151.361|   hot&loud|
|  -8.325|   0.4381202|        0.54568464|      0.396207|  6|133.635|   hot&loud|
| -17.636|   0.7462104|           0.70012|     0.5343837|  7|120.632| mainstream|
|  -9.352|   0.2

In [46]:
song_genres_df.createOrReplaceTempView("songs_with_genres")

In [47]:
n_mainstream_songs = spark.sql("SELECT COUNT(*) FROM songs_with_genres WHERE genre = \"mainstream\"").collect()[0][0]
n_hotnloud_songs = spark.sql("SELECT COUNT(*) FROM songs_with_genres WHERE genre = \"hot&loud\"").collect()[0][0]
n_mellownsoft_songs = spark.sql("SELECT COUNT(*) FROM songs_with_genres WHERE genre = \"mellow&soft\"").collect()[0][0]
n_plain_songs = spark.sql("SELECT COUNT(*) FROM songs_with_genres WHERE genre = \"plain\"").collect()[0][0]

print("There are %d mainstream songs in the dataset" % (n_mainstream_songs))
print("There are %d hot&loud songs in the dataset" % (n_hotnloud_songs))
print("There are %d mellow&soft songs in the dataset" % (n_mellownsoft_songs))
print("There are %d plain songs in the dataset" % (n_plain_songs))

There are 3965 mainstream songs in the dataset
There are 3330 hot&loud songs in the dataset
There are 2150 mellow&soft songs in the dataset
There are 799 plain songs in the dataset


In [48]:
sc.stop()