In [1]:
from pyspark import RDD
from pyspark import SparkContext

# Reuse the SparkContext injected by a PySpark shell/kernel if there is one,
# otherwise create it, so this cell also works after "Restart & Run All"
# in a plain Python kernel where `sc` is not predefined.
sc = SparkContext.getOrCreate()

# Each RDD is loaded from a pickle file under ../data.
# NOTE(review): pickleFile unpickles the file contents; only load trusted data.
rdd_analysis_songs = sc.pickleFile("../data/analysis-songs")
rdd_musicbrainz_songs = sc.pickleFile("../data/musicbrainz-songs")
rdd_metadata_songs = sc.pickleFile("../data/metadata-songs")

In [2]:
# Show the first element of the analysis RDD to inspect its layout:
# a (track id, NumPy structured array) pair, as the output below shows.
rdd_analysis_songs.first()

('TRAAAAW128F429D538',
 array([ (22050, 'a222795e07cd65b7a530f1346f520649',  0.,  218.93179,  0.247,  0., 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,  0.736, -11.197, 0,  0.636,  218.932,  92.198, 4,  0.778, 'TRAAAAW128F429D538')],
       dtype=[('analysis_sample_rate', '<i4'), ('audio_md5', 'S32'), ('danceability', '<f8'), ('duration', '<f8'), ('end_of_fade_in', '<f8'), ('energy', '<f8'), ('idx_bars_confidence', '<i4'), ('idx_bars_start', '<i4'), ('idx_beats_confidence', '<i4'), ('idx_beats_start', '<i4'), ('idx_sections_confidence', '<i4'), ('idx_sections_start', '<i4'), ('idx_segments_confidence', '<i4'), ('idx_segments_loudness_max', '<i4'), ('idx_segments_loudness_max_time', '<i4'), ('idx_segments_loudness_start', '<i4'), ('idx_segments_pitches', '<i4'), ('idx_segments_start', '<i4'), ('idx_segments_timbre', '<i4'), ('idx_tatums_confidence', '<i4'), ('idx_tatums_start', '<i4'), ('key', '<i4'), ('key_confidence', '<f8'), ('loudness', '<f8'), ('mode', '<i4'), ('mode_confidence',

In [3]:
# Show the first element of the metadata RDD; it is keyed by the same
# kind of track id as the analysis RDD (see the output below).
rdd_metadata_songs.first()

('TRAAAAW128F429D538',
 array([ ('', 165270,  0.58179377,  0.40199754, 'ARD7TVE1187B99BFB1',  nan, 'California - LA',  nan, 'e77e51a5-4761-45b3-9847-2051f811e366', 'Casual', 4479, '', 0, 0, 'Fear Itself', 300848,  0.60211999, 'SOMZWCG12A8C13C480', "I Didn't Mean To", 3401791)],
       dtype=[('analyzer_version', 'S32'), ('artist_7digitalid', '<i4'), ('artist_familiarity', '<f8'), ('artist_hotttnesss', '<f8'), ('artist_id', 'S32'), ('artist_latitude', '<f8'), ('artist_location', 'S1024'), ('artist_longitude', '<f8'), ('artist_mbid', 'S40'), ('artist_name', 'S1024'), ('artist_playmeid', '<i4'), ('genre', 'S1024'), ('idx_artist_terms', '<i4'), ('idx_similar_artists', '<i4'), ('release', 'S1024'), ('release_7digitalid', '<i4'), ('song_hotttnesss', '<f8'), ('song_id', 'S32'), ('title', 'S1024'), ('track_7digitalid', '<i4')]))

In [4]:
import numpy as np
import numpy.lib.recfunctions as rfn

def merge_array_parameters(row):
    """Combine the two structured arrays of a joined row into one flat record.

    Args:
        row: ``(key, (left, right))`` as produced by ``RDD.join``; ``left``
            and ``right`` are NumPy structured arrays.

    Returns:
        ``(key, merged)`` where ``merged`` holds the fields of ``left``
        followed by the fields of ``right`` in a single flattened dtype.
    """
    key = row[0]
    joined_values = row[1]
    left_record = joined_values[0]
    right_record = joined_values[1]
    merged = rfn.merge_arrays([left_record, right_record], flatten=True)
    return (key, merged)

# Join analysis and metadata on their shared key, then flatten each joined
# pair of structured arrays into one combined record per song.
big_rdd = (
    rdd_analysis_songs
    .join(rdd_metadata_songs)
    .map(merge_array_parameters)
)

# Display only the merged record (index 1) of the first joined element.
big_rdd.first()[1]

array([ (22050, '1bca3d3174c84c8e385c8d612d85af2a',  0.,  230.60853,  0.073,  0., 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8,  0.913, -4.438, 1,  0.7,  226.667,  124.144, 4,  0.352, 'TRAOOGD128F92E4768', '', 49992,  0.79633706,  0.58292158, 'ARSW5F51187FB4CFC9',  nan, 'Seattle, WA',  nan, '4bd95eea-b9f6-4d70-a36c-cfea77431553', 'Alice In Chains', 65, '', 0, 0, 'Music Bank', 385038,  0.81839438, 'SOPANEB12A8C13E81E', 'God Smack', 4274865)],
      dtype=[('analysis_sample_rate', '<i4'), ('audio_md5', 'S32'), ('danceability', '<f8'), ('duration', '<f8'), ('end_of_fade_in', '<f8'), ('energy', '<f8'), ('idx_bars_confidence', '<i4'), ('idx_bars_start', '<i4'), ('idx_beats_confidence', '<i4'), ('idx_beats_start', '<i4'), ('idx_sections_confidence', '<i4'), ('idx_sections_start', '<i4'), ('idx_segments_confidence', '<i4'), ('idx_segments_loudness_max', '<i4'), ('idx_segments_loudness_max_time', '<i4'), ('idx_segments_loudness_start', '<i4'), ('idx_segments_pitches', '<i4'), ('idx_segments_

In [5]:
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors
import math

def map_features_to_vectors(input_row):
    """Build a dense feature vector from the numeric fields of one song row.

    Args:
        input_row: ``(song_id, record)`` pair where ``record`` is a NumPy
            structured array; only element 0 of every field is read.

    Returns:
        A 1-tuple ``(vector,)`` where ``vector`` is ``Vectors.dense`` of the
        selected values, in the order of ``record.dtype.names``, with NaN
        replaced by 0.
    """
    record = input_row[1]

    features = []
    for field_name in record.dtype.names:
        # This field is deliberately left out of the feature vector.
        if field_name == 'artist_7digitalid':
            continue

        value = record[field_name][0]

        # take only number values
        # NOTE(review): NumPy int32 scalars are not `int` instances, so in
        # practice only the float columns pass this check (the recorded
        # output has 15 float features). Confirm whether integer columns
        # were meant to be included before widening this test.
        if not isinstance(value, (int, float)):
            continue

        # replace nan by 0
        features.append(0 if math.isnan(value) else value)

    # Pass a real list: on Python 3 `map()` is lazy and Vectors.dense cannot
    # build a vector from a map object.
    return (Vectors.dense(features),)

In [6]:
# Convert every merged song record into a 1-tuple holding its feature vector.
data = big_rdd.map(map_features_to_vectors)
# Preview the first feature vector.
data.first()

(DenseVector([0.0, 230.6085, 0.073, 0.0, 0.913, -4.438, 0.7, 226.667, 124.144, 0.352, 0.7963, 0.5829, 0.0, 0.0, 0.8184]),)

In [None]:
# Wrap the 1-tuples in a DataFrame whose single column is named "features",
# the input column the PCA estimator below is configured to read.
# NOTE(review): `sqlContext` is not defined in this notebook; presumably it is
# injected by the PySpark kernel — confirm.
pca_arr = sqlContext.createDataFrame(data, ["features"])

In [None]:
# Fit PCA on the "features" column and write the projection to "pcaFeatures".
# NOTE(review): k=15 equals the length of the feature vector previewed
# earlier in this notebook, so no dimensions are dropped — confirm this is
# intended.
pca_ml = PCA(k=15, inputCol="features", outputCol="pcaFeatures")
model = pca_ml.fit(pca_arr)
transformed = model.transform(pca_arr)

# Preview a handful of rows only; collect() would pull every projected row
# to the driver and dump it into the notebook output.
transformed.select("pcaFeatures").take(5)