# Processing entire 1,000,000 song dataset

*Andrea Soto*  
*MIDS W205 Final Project*  
*Project Name: Graph Model of the Million Song Dataset*

---

# Notebook Overview

This notebook processes the entire Million Song Dataset by running the following three scripts sequentially:

1. Create a list of .h5 files of the Million Song Dataset
2. Create a list of .json files of the Last.fm dataset
3. Read files, extract information, transform, and save as CSV

A short explanation of how to use each script is provided above the code that creates the script.

The output from this notebook will be used to upload the data into Neo4j in the notebook [Step 5 - Import Entire Dataset.ipynb](./Step 5 - Import Entire Dataset.ipynb)

**Requirements**

The volume with the Million Song Dataset should be mounted on /msong_dataset, the Last.fm dataset should have been downloaded, and the file 'sid_mismatches.txt' should already exist in the '/graph/import' folder.

**Updates**

The scripts are based on the work performed in previous notebooks. However, the unique id for songs was changed from 'song-id' to 'track-id' because Neo4j's import tool can only create relationships based on a node's unique id, not on node properties. Since the Last.fm dataset cross-references songs by 'track-id', 'track-id' is used as the unique id of the Song nodes.
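
To make the role of the unique id concrete, the sketch below builds header rows in the style used by Neo4j's CSV import tool, where one node column is marked `:ID(<space>)` and relationship files refer to it through `:START_ID` and `:END_ID`. The helper name `node_header`, the ID-space names `Song` and `Artist`, and the column names are assumptions for illustration; the headers actually used for the import may differ.

```python
def node_header(fields, id_field, id_space):
    """Return a CSV header in which `id_field` is marked as the node's unique id."""
    cols = []
    for name in fields:
        if name == id_field:
            cols.append("%s:ID(%s)" % (name, id_space))
        else:
            cols.append(name)
    return ",".join(cols)

# Column order mirrors the song CSV written by extractData.py
song_fields = ["song_id", "track_id", "title", "dance", "dur", "energy", "loudness"]
song_header = node_header(song_fields, "track_id", "Song")

# Relationship files can only point at columns that were marked as ids
performs_header = ":START_ID(Artist),:END_ID(Song)"

print(song_header)
print(performs_header)
```

With 'track-id' marked as the id, a relationship row that carries a track id resolves directly to a Song node, whereas 'song-id' remains an ordinary property.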

--- 
## Create Lists of Million Song Files (.h5) and Last.fm Files (.json)

In [10]:
%%bash
# python scripts/<name>.py <input_path> <output_path>
time python scripts/list_MDS_files.py    /msong_dataset/data /graph/import
time python scripts/list_LastFM_files.py /graph/lastfm/data /graph/import

File '/graph/import/list_hdf5_files.txt' successfully created
File '/graph/import/list_lastfm_files.txt' successfully created



real	31m49.765s
user	0m4.842s
sys	0m1.536s

real	0m4.906s
user	0m3.938s
sys	0m0.960s


In [15]:
!ls -l /graph/import

total 92020
-rw-rw-r-- 1 asoto asoto 47999999 Dec 14 10:43 list_hdf5_files.txt
-rw-rw-r-- 1 asoto asoto 46223365 Dec 14 10:43 list_lastfm_files.txt
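
The file sizes above double as a quick check on how many paths were listed. The sketch below assumes that every path has the same length (the input folder, three one-letter directories, and an 18-character track id plus extension) and that paths are joined with a newline without a trailing newline, as the listing scripts do. The helper name `count_paths` and the placeholder track ids are made up for illustration.

```python
def count_paths(file_size, path_len):
    """Number of equal-length paths in a newline-joined list without a trailing newline."""
    total = file_size + 1            # add back the missing final newline
    if total % (path_len + 1) != 0:
        raise ValueError("size is not a whole number of %d-character paths" % path_len)
    return total // (path_len + 1)

# Placeholder ids: 'TR' followed by 16 characters
h5_path = "/msong_dataset/data/A/B/C/" + "TR" + "X" * 16 + ".h5"
json_path = "/graph/lastfm/data/A/B/C/" + "TR" + "X" * 16 + ".json"

n_h5 = count_paths(47999999, len(h5_path))
n_json = count_paths(46223365, len(json_path))
print(n_h5)
print(n_json)
```

Under these assumptions, the 47,999,999 bytes of 'list_hdf5_files.txt' correspond to exactly 1,000,000 paths of 47 characters each, which matches the size of the dataset.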


## Extract data and save in CSV format

The command was run from a terminal, and its run time was copied into the cell below.

In [None]:
time /usr/bin/spark-submit --master local[$(nproc)] \
scripts/extractData.py /graph/import /graph/import/tmp /graph/import $(nproc)

In [6]:
print 'Run Time'
print 'real\t774m15.972s\nuser\t 15m30.488s\nsys \t  8m22.516s\n'

Run Time
real	774m15.972s
user	 15m30.488s
sys 	  8m22.516s
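
The durations printed by `time` are easier to compare once converted to seconds or hours. A minimal sketch of that conversion, where the helper name `to_seconds` is hypothetical:

```python
import re

def to_seconds(stamp):
    """Convert a duration such as '774m15.972s' (as printed by `time`) to seconds."""
    match = re.match(r"^\s*(\d+)m([\d.]+)s\s*$", stamp)
    if match is None:
        raise ValueError("unrecognised duration: %r" % stamp)
    return int(match.group(1)) * 60 + float(match.group(2))

real = to_seconds("774m15.972s")
user = to_seconds("15m30.488s")
sys_time = to_seconds("8m22.516s")

print("wall-clock hours: %.1f" % (real / 3600))
print("CPU seconds (user + sys): %.3f" % (user + sys_time))
```

The extraction therefore took roughly 12.9 hours of wall-clock time.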



**List generated directories**

In [7]:
!ls -l /graph/import/tmp

total 52
drwxrwxr-x 2 asoto asoto 4096 Dec 20 05:31 nodes_albums
drwxrwxr-x 2 asoto asoto 4096 Dec 20 03:19 nodes_artists
drwxrwxr-x 2 asoto asoto 4096 Dec 20 04:14 nodes_songs
drwxrwxr-x 2 asoto asoto 4096 Dec 20 15:48 nodes_tags
drwxrwxr-x 2 asoto asoto 4096 Dec 20 06:48 nodes_years
drwxrwxr-x 2 asoto asoto 4096 Dec 20 10:39 rel_artist_has_album
drwxrwxr-x 2 asoto asoto 4096 Dec 20 11:57 rel_artist_has_tag
drwxrwxr-x 2 asoto asoto 4096 Dec 20 09:22 rel_performs
drwxrwxr-x 2 asoto asoto 4096 Dec 20 08:07 rel_similar_artists
drwxrwxr-x 2 asoto asoto 4096 Dec 20 15:54 rel_similar_songs
drwxrwxr-x 2 asoto asoto 4096 Dec 20 15:54 rel_song_has_tag
drwxrwxr-x 2 asoto asoto 4096 Dec 20 14:31 rel_song_in_album
drwxrwxr-x 2 asoto asoto 4096 Dec 20 13:14 rel_song_year


**Size of each directory**

In [8]:
!du -h /graph/import/tmp

371M	/graph/import/tmp/rel_song_has_tag
162M	/graph/import/tmp/rel_similar_artists
79M	/graph/import/tmp/nodes_songs
36M	/graph/import/tmp/rel_performs
2.6G	/graph/import/tmp/rel_similar_songs
264K	/graph/import/tmp/nodes_years
3.4M	/graph/import/tmp/nodes_albums
64M	/graph/import/tmp/rel_artist_has_tag
5.8M	/graph/import/tmp/nodes_artists
12M	/graph/import/tmp/rel_song_year
9.1M	/graph/import/tmp/nodes_tags
8.9M	/graph/import/tmp/rel_artist_has_album
38M	/graph/import/tmp/rel_song_in_album
3.4G	/graph/import/tmp


---
# Load CSV files to Neo4j

Loading the data into Neo4j was done in the notebook [Step 5 - Import Entire Dataset.ipynb](./Step 5 - Import Entire Dataset.ipynb).

---
# Script Creation

# scripts/list_MDS_files.py

**Script to create a list of HDF5 files**

**USAGE:**
> `python scripts/list_MDS_files.py <path_to_msdataset> <output_path> <true/false to overwrite (default:false)>`

>`<path_to_msdataset>` : path to the Million Song Dataset folder called 'data' where the directory structure /\*/\*/\*/trackID.h5 begins

>`<output_path>` : where the file **'list_hdf5_files.txt'** with the list of the files will be saved

> `<true/false to overwrite (default:false)>` : optional parameter to overwrite **'list_hdf5_files.txt'** if it exists. Default is set to false
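
The directory structure described above maps directly onto a glob pattern with three wildcard directory levels. The sketch below builds a throwaway tree and shows which files the pattern picks up; the file names are placeholders, not real track ids.

```python
import glob
import os
import shutil
import tempfile

# Build a throwaway tree that mimics <root>/A/B/C/<trackID>.h5
root = tempfile.mkdtemp()
try:
    leaf = os.path.join(root, "A", "B", "C")
    os.makedirs(leaf)
    for name in ("TRHYPOTHETICAL0001.h5", "TRHYPOTHETICAL0002.h5", "notes.txt"):
        open(os.path.join(leaf, name), "w").close()
    # A file that sits too high in the tree is not matched by the pattern
    open(os.path.join(root, "A", "TRHYPOTHETICAL0003.h5"), "w").close()

    matches = sorted(glob.glob(root + "/*/*/*/*.h5"))
    names = [os.path.basename(p) for p in matches]
    print(names)
finally:
    # Remove the temporary tree whether or not the listing succeeded
    shutil.rmtree(root)
```

Only the two `.h5` files that sit exactly three directories below the root are listed, which is why the input path must point at the folder where that structure begins.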

In [6]:
%%writefile scripts/list_MDS_files.py
#!/usr/bin/env python
import os
import glob
import sys
import shutil

def main(inDir, outDir, overwrite = False):
    
    # os.path.exists() returns False instead of raising, so test its result
    if not os.path.exists(inDir):
        print "Input path: '%s' does not exist"%(inDir)
        return
    
    outFile = outDir + '/list_hdf5_files.txt'
    if not os.path.exists(outFile) or overwrite:
        # List all paths of songs
        get_song_paths = glob.glob(inDir+'/*/*/*/*.h5')
        
        if not get_song_paths:
            print "No HDF5 (.h5) files found in '%s'"%(inDir)
            print "Check that the file structure under '%s' is /*/*/*/song_files.h5"%(inDir)
        else:
            # Write one path per line; the with block closes the file
            with open(outFile,'w') as f:
                f.write('\n'.join(get_song_paths))
            print "File '%s' successfully created"%(outFile)
    else:
        print "File '%s' already exists"%(outFile)
    
if __name__ == '__main__':
    '''
    Creates the file 'list_hdf5_files.txt' with the list of HDF5 files
    
    USE:
    python list_MDS_files.py <path to songs> <save list path> <OPTIONAL overwrite>
    
    Paths should NOT include '/' at the end
    If the file already exists, it will not be overwritten. Send 'True' to overwrite
    '''
    
    input_path = sys.argv[1]  
    output_path = sys.argv[2]
    
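    # Option to overwrite current file
    # Arguments are strings and any non-empty string is truthy, so compare explicitly
    overwrite = False
    if len(sys.argv) > 3:
        overwrite = sys.argv[3].lower() == 'true'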
    
    main(input_path, output_path, overwrite)

Overwriting scripts/list_MDS_files.py


# scripts/list_LastFM_files.py

**Script to create a list of JSON files**

**USAGE:**
> `python scripts/list_LastFM_files.py <path_to_lastfm> <output_path> <true/false to overwrite (default:false)>`

>`<path_to_lastfm>` : path to the Last.fm dataset folder where the directory structure /\*/\*/\*/trackID.json begins

>`<output_path>` : where the file **'list_lastfm_files.txt'** with the list of the files will be saved

> `<true/false to overwrite (default:false)>` : optional parameter to overwrite **'list_lastfm_files.txt'** if it exists. Default is set to false
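
Because command-line arguments arrive as strings, and any non-empty string is truthy in Python, the optional flag needs an explicit comparison before it is used as a boolean. A minimal sketch, where the helper name `parse_overwrite` is hypothetical and not part of the scripts:

```python
def parse_overwrite(argv):
    """Return True only when the optional third argument spells 'true'."""
    if len(argv) > 3:
        return argv[3].strip().lower() == "true"
    return False

# A non-empty string is truthy, even when it spells 'false'
print(bool("false"))
print(parse_overwrite(["script.py", "in", "out", "false"]))
print(parse_overwrite(["script.py", "in", "out", "True"]))
print(parse_overwrite(["script.py", "in", "out"]))
```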

In [7]:
%%writefile scripts/list_LastFM_files.py
#!/usr/bin/env python
import os
import glob
import sys
import shutil

def main(inDir, outDir, overwrite = False):
    
    # os.path.exists() returns False instead of raising, so test its result
    if not os.path.exists(inDir):
        print "Input path: '%s' does not exist"%(inDir)
        return
    
    outFile = outDir + '/list_lastfm_files.txt'
    if not os.path.exists(outFile) or overwrite:
        # List all paths of songs
        get_song_paths = glob.glob(inDir+'/*/*/*/*.json')
        
        if not get_song_paths:
            print "No JSON files found in '%s'"%(inDir)
            print "Check that the file structure under '%s' is /*/*/*/song_files.json"%(inDir)
        else:
            # Write one path per line; the with block closes the file
            with open(outFile,'w') as f:
                f.write('\n'.join(get_song_paths))
            print "File '%s' successfully created"%(outFile)
    else:
        print "File '%s' already exists"%(outFile)
    
if __name__ == '__main__':
    '''
    Creates the file 'list_lastfm_files.txt' with the list of JSON files
    
    USE:
    python list_LastFM_files.py <path to songs> <save list path> <OPTIONAL overwrite>
    
    Paths should NOT include '/' at the end
    If the file already exists, it will not be overwritten. Send 'True' to overwrite
    '''
    input_path = sys.argv[1]  
    output_path = sys.argv[2]
    
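    # Option to overwrite current file
    # Arguments are strings and any non-empty string is truthy, so compare explicitly
    overwrite = False
    if len(sys.argv) > 3:
        overwrite = sys.argv[3].lower() == 'true'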
    
    main(input_path, output_path, overwrite)

Overwriting scripts/list_LastFM_files.py


### Test listing scripts

In [40]:
%%bash
python scripts/list_MDS_files.py MillionSongSubset/data test

File 'test/list_hdf5_files.txt' successfully created


In [56]:
%%bash
python scripts/list_LastFM_files.py MillionSongSubset/lastfm_subset test

File 'test/list_lastfm_files.txt' successfully created


In [59]:
%%bash
ls -l test

total 1068
-rw-rw-r-- 1 asoto asoto 509999 Dec 14 07:01 list_hdf5_files.txt
-rw-rw-r-- 1 asoto asoto 578459 Dec 14 07:13 list_lastfm_files.txt


# scripts/extractData.py 

**Script to process data and create part-00x files that have CSV format for all nodes and relations**

**USAGE:**

> Used with `spark-submit`  

> `/usr/bin/spark-submit --master local[<CPUs>] \
scripts/extractData.py <path_to_lists> <output_path> <path_to_mismatches> <CPUs>`


>`<path_to_lists>` : path to **'list_hdf5_files.txt'** and **'list_lastfm_files.txt'** files. Both files must be placed in the same directory.

>`<output_path>` : where all the CSV files for the nodes and relationships will be saved. Sub-folders will be created for the following outputs:

> 1. nodes_artists  
> 1. nodes_tags   
> 1. nodes_albums   
> 1. nodes_songs    
> 1. nodes_years  
> 1. rel_performs         
> 1. rel_artist_has_album  
> 1. rel_song_in_album
> 1. rel_similar_artists  
> 1. rel_similar_songs  
> 1. rel_artist_has_tag   
> 1. rel_song_has_tag   
> 1. rel_song_year

> `<path_to_mismatches>` : path to the directory that contains **'sid_mismatches.txt'**. Songs whose song id and track id appear in this file are excluded from the output.

> `<CPUs>` : Number of CPUs available for parallel processing. This will also be used to partition files to ensure all the CPUs are used by Spark.

**Sample Usage**

>`time /usr/bin/spark-submit --master local[$(nproc)] \
scripts/extractData.py /graph/import /graph/import/tmp /graph/import $(nproc)`
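
`extractData.py` reads the song id and track id from each line of 'sid_mismatches.txt' with a fixed-offset slice, `line[8:45]`. The sketch below shows why those offsets work, assuming each line starts with `ERROR: <`, followed by an 18-character song id, a space, and an 18-character track id. The sample line and its ids are placeholders, not real records.

```python
def parse_mismatches(line):
    """Return [songID, trackID] from one line of the mismatches file."""
    return line[8:45].split()

# Placeholder line in the assumed layout: 'ERROR: <' takes 8 characters,
# then two 18-character ids separated by one space take the next 37
sample = ("ERROR: <" + "SO" + "X" * 16 + " " + "TR" + "X" * 16 +
          "> Artist A  -  Title A  !=  Artist B  -  Title B")

song_id, track_id = parse_mismatches(sample)
print(song_id)
print(track_id)
```

If the file used a different prefix or id length, the slice would cut through the ids, so the offsets are worth checking against a few real lines before a long run.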

In [2]:
!echo $(nproc)

32


In [18]:
%%writefile scripts/extractData.py
#!/usr/bin/env python
import os
import glob
import sys
import shutil
from pyspark import SparkContext
import numpy as np
import h5py
import json

def parse_mismatches(line):
    '''
    This function extracts the songID and trackID of the mismatched records.
    Returned value: ['songID', 'trackID'] (a two-element list)
    '''
    return line[8:45].split()


def get_h5_info(path):
    '''
    Takes a path to a song stored as an HDF5 file and returns a dictionary with the 
    information that will be included in the graph
    ''' 
    d = {}
    with h5py.File(path, 'r') as f:
        song_id = f['metadata']['songs']['song_id'][0]
        track_id = f['analysis']['songs']['track_id'][0]
        
        if [song_id, track_id] not in songsToRemove.value:

            # --- Artist Info -----------------------------
            d.setdefault('artist_id', f['metadata']['songs']['artist_id'][0])
            d.setdefault('artist_mbid', f['metadata']['songs']['artist_mbid'][0])
            d.setdefault('artist_7did', f['metadata']['songs']['artist_7digitalid'][0])
            d.setdefault('artist_name', f['metadata']['songs']['artist_name'][0])

            # --- Song Info -----------------------------
            d.setdefault('song_id', song_id)
            d.setdefault('track_id', track_id)
            d.setdefault('title', f['metadata']['songs']['title'][0])
            d.setdefault('dance', f['analysis']['songs']['danceability'][0])
            d.setdefault('dur', f['analysis']['songs']['duration'][0])
            d.setdefault('energy', f['analysis']['songs']['energy'][0])
            d.setdefault('loudness', f['analysis']['songs']['loudness'][0])

            # --- Year -----------------------------
            d.setdefault('year', f['musicbrainz']['songs']['year'][0])

            # --- Album -----------------------------
            d.setdefault('album', f['metadata']['songs']['release'][0])

            # --- Similar Artist -----------------------------
            d.setdefault('a_similar', np.array(f['metadata']['similar_artists']))

            # --- Artist Terms -----------------------------
            d.setdefault('a_terms', np.array(f['metadata']['artist_terms']))
            d.setdefault('a_tfrq', np.array(f['metadata']['artist_terms_freq']))
            d.setdefault('a_tw', np.array(f['metadata']['artist_terms_weight']))

            return d
        else: 
            pass

def get_json_info(path):
    with open(path) as data_file:    
        return json.load(data_file)

def makeCSVline(line):
    return ','.join(str(line[f]) for f in fieldsBrC.value)
    
def artistToTags(record):
    '''
    Concatenate artist with each tag
    Normalize tag frequency and weight
    '''
    normalize_frq = record['a_tfrq'] / sum(record['a_tfrq'])
    normalize_w = record['a_tw'] / sum(record['a_tw'])
    terms = record['a_terms']
    artist = record['artist_id']
    
    result = []
    for i in range(len(terms)):
        result.append( artist +","+ terms[i] +","+ str(normalize_frq[i]) +","+ str(normalize_w[i]))
    
    return result

def songToTags(record):
    '''
    Concatenate song with each tag
    '''
    tags = record['tags']
    total_weight = sum(float(w[1]) for w in tags)
    track_id = record['track_id']
    
    result = []
    for i in range(len(tags)):
        if (tags[i][0] is not None) and (tags[i][0] != ''):
            result.append( track_id +","+ tags[i][0] +","+ str(float(tags[i][1])/total_weight))
    
    return result

if __name__ == '__main__':
    '''
    input_path: path to where the list of hdf5 and json files was created
    output_path: a temporary directory where the Spark CSV files separated as part-000xx files will be stored
    mismatch_path: path to where the mismatches file is located
    
    DO NOT INCLUDE '/' AT THE END OF PATH
    Cannot change file names
    '''
    
    inDir = sys.argv[1]  
    outDir = sys.argv[2]
    mismatch_path = sys.argv[3]
    cpus = int(sys.argv[4])
    
    #main(input_path, output_path)
    # === Start Spark Context ===
    sc = SparkContext(appName="SparkProcessing")
    
    # =====================================================================
    # === Load mismatches ===
    toRemoveRDD = sc.textFile('file://'+mismatch_path+'/sid_mismatches.txt',cpus).map(parse_mismatches)
    #songsToRemove = toRemoveRDD.collect()
    songsToRemove = sc.broadcast(toRemoveRDD.collect())
    
    
    # =====================================================================
    # === Load list of files ===
    song_pathsRDD   = sc.textFile('file://' + inDir + '/list_hdf5_files.txt',cpus)
    lastfm_pathsRDD = sc.textFile('file://' + inDir + '/list_lastfm_files.txt',cpus)
    
    # === Extract Song Data ===
    songsRDD = song_pathsRDD.map(get_h5_info).filter(lambda x: x is not None).cache()
    
    
    # =====================================================================
    # == Delete Sub-Folders ===
    folders = ['/nodes_artists','/nodes_songs','/nodes_albums','/nodes_years','/nodes_tags',
              '/rel_similar_artists', '/rel_performs','/rel_artist_has_album','/rel_artist_has_tag',
               '/rel_song_in_album','/rel_similar_songs', '/rel_song_has_tag', '/rel_song_year']
    for p in folders:
        if os.path.exists(outDir+p):
            shutil.rmtree(outDir+p)
    
    
    # === ARTISTS ===
    # CSV Format: artist_id, artist_mb_id, artist_7d_id, artist_name
    fields = ['artist_id', 'artist_mbid', 'artist_7did', 'artist_name']
    fieldsBrC = sc.broadcast(fields)
    songsRDD.map(makeCSVline).distinct().saveAsTextFile('file://'+outDir+'/nodes_artists')
    
    # === SONGS ===
    # CSV Format: song_id, track_id, song_title, danceability, duration, energy, loudness
    fields = ['song_id', 'track_id', 'title', 'dance', 'dur', 'energy','loudness']
    fieldsBrC = sc.broadcast(fields)
    songsRDD.map(makeCSVline).distinct().saveAsTextFile('file://'+outDir+'/nodes_songs')
    
    # === ALBUMS ===
    # CSV Format: album_name
    songsRDD.map(lambda x: x['album']).distinct().saveAsTextFile('file://'+outDir+'/nodes_albums')
    
    # === YEAR ===
    # CSV Format: year
    songsRDD.map(lambda x: x['year']).filter(
        lambda x: int(x) > 0).distinct().saveAsTextFile('file://'+outDir+'/nodes_years')

    
    # === SIMILAR_TO relationship between artist and artist ===
    # CSV Format: from_artist_id, to_artist_id
    # Similar Artist to Artist (directional, no properties)
    similarArtistsRDD = songsRDD.map(lambda x: (x['artist_id'],x['a_similar'])).flatMapValues(lambda x: x)
    similarArtistsRDD.distinct().map(lambda x: x[0]+","+x[1]).saveAsTextFile('file://'+outDir+'/rel_similar_artists')
    
    # === PERFORMS relationship between artist and song ===
    # CSV Format: artist_id, track_id
    # Artist Performs Song (directional, no properties)
    songsRDD.map(lambda x: x['artist_id']+","+x['track_id']).distinct().saveAsTextFile('file://'+outDir+'/rel_performs')
    
    # === HAS_ALBUM relationship between artist and album  ===
    # CSV Format: artist_id, album_name
    # Artist Has Album (directional, no properties)
    songsRDD.map(lambda x: x['artist_id']+","+x['album']).distinct().saveAsTextFile('file://'+outDir+'/rel_artist_has_album')

    # === HAS_TAG relationship between artist and tags ===
    # CSV Format: artist_id, tag_name, tag_frequency, tag_weight
    # Artist Has Tags (directional, has properties frequency and weight)
    songsRDD.flatMap(artistToTags).distinct().saveAsTextFile('file://'+outDir+'/rel_artist_has_tag')
    
    # === RELEASED_ON relationship between song and year
    # CSV Format: track_id, year
    # Song Released in Year (directional, no properties)
    songsRDD.filter(lambda x: int(x['year']) != 0).map(
        lambda x: x['track_id']+","+str(x['year'])).saveAsTextFile('file://'+outDir+'/rel_song_year')
    
    # === IN_ALBUM relationship between song and album ===
    # CSV Format: track_id, album_name
    # Song In Album (directional, no properties)
    songsRDD.map(lambda x: x['track_id']+","+x['album']).distinct().saveAsTextFile('file://'+outDir+'/rel_song_in_album')
    
    # === Extract Lastfm Song Data ===
    lastfmRDD = lastfm_pathsRDD.map(get_json_info).cache()
    
    # === TAGS ===
    # CSV Format: tag_name
    artistTags = songsRDD.flatMap(lambda x: x['a_terms']).distinct()
    songTags = lastfmRDD.flatMap(lambda x: x['tags']).map(lambda x: x[0]).distinct()
    allTags = songTags.union(artistTags).distinct()
    allTags.saveAsTextFile('file://'+outDir+'/nodes_tags')
    
    # === SIMILAR_TO relationship between song and song ===
    #CSV Format: from_track_id, to_track_id, similarity_measure
    # Similar Song to Song (directional, with property similarity measure)   
    similarSongsRDD = lastfmRDD.filter(
        lambda x: x['similars'] != []).map(
        lambda x: (x['track_id'],x['similars'])).flatMapValues(lambda x: x)
    similarSongsRDD.map(
        lambda x: x[0]+","+x[1][0]+","+str(x[1][1])).saveAsTextFile('file://'+outDir+'/rel_similar_songs')
    
    # === HAS_TAG relationship between song and tags
    # CSV Format: track_id, tag_name, tag_weight
    lastfmRDD.flatMap(songToTags).saveAsTextFile('file://'+outDir+'/rel_song_has_tag')
    
    # === Stop Spark Context ===
    sc.stop()

Overwriting scripts/extractData.py
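
The song-to-tag relationship stores each tag weight divided by the total weight of that song's tags. A minimal sketch of this normalization, using a made-up record in the shape `songToTags` expects (`tags` as a list of `[name, weight]` pairs); the function name `normalized_tag_rows` and the sample values are hypothetical. Unlike the script, the sketch returns an empty list when the total weight is zero instead of dividing by zero.

```python
def normalized_tag_rows(record):
    """Return 'track_id,tag,weight' rows with weights divided by the song's total."""
    tags = record["tags"]
    total = sum(float(weight) for _, weight in tags)
    if total == 0:
        return []                      # nothing to normalize
    rows = []
    for name, weight in tags:
        if name:                       # skip None and empty tag names
            rows.append("%s,%s,%s" % (record["track_id"], name, float(weight) / total))
    return rows

# Placeholder record; weights are strings, as they are passed to float() in the script
record = {"track_id": "TRHYPOTHETICAL0001",
          "tags": [["rock", "100"], ["indie", "50"], ["pop", "50"]]}
rows = normalized_tag_rows(record)
for row in rows:
    print(row)
```

Here the three weights 100, 50, and 50 become 0.5, 0.25, and 0.25, so tags can be compared across songs regardless of how many tags each song has.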
