# Million Song Database
IS622 Final Project  
Aaron Palumbo | December 2015

## About the Data

The <a href="http://labrosa.ee.columbia.edu/millionsong/tasteprofile">data</a> are provided by The Echo Nest.

From the website:

> Welcome to the Taste Profile subset, the official user dataset of the Million Song Dataset.

> The Echo Nest is committed to giving back to the research community (for instance by creating the MSD!), and they prove it again by releasing the Taste Profile dataset. The dataset contains real user - play counts from undisclosed partners, all songs already matched to the MSD. if you were looking for the right collaborative filtering dataset with audio features, this might be for you! Plus, you can link that user data to lyrics, tags and Last.fm's similar songs, thus you have many viewpoint for explaining the data.

The Million Song Dataset Challenge, B. McFee, T. Bertin-Mahieux, D. Ellis and G. Lanckriet, AdMIRe '12

The listening data from The Echo Nest comes as one big text file. Each line contains three tab-separated fields: user, song, play count.

We can see the file on disk:

In [4]:
%ls -lh ../data/train_triplets.txt

-rw-r--r-- 1 apalumbo apalumbo 2.8G Dec 19  2011 ../data/train_triplets.txt


We can copy this to HDFS with the command line tool:

    hdfs dfs -put {{ fileLoc }} {{ fileHDFS }}

Here `{{ fileLoc }}` and `{{ fileHDFS }}` are placeholders, written in <a href="http://jinja.pocoo.org/Jinja">Jinja2</a> syntax, for the local path and the HDFS path.
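
As a minimal sketch, the same command can be assembled from Python variables. The helper name `build_put_command` is hypothetical, and the two paths are the ones that appear elsewhere in this notebook; the actual copy is left commented out because it needs a working Hadoop installation.

```python
import subprocess

def build_put_command(local_path, hdfs_path):
    """Return the argument list for `hdfs dfs -put <local> <remote>`."""
    return ["hdfs", "dfs", "-put", local_path, hdfs_path]

# Values standing in for {{ fileLoc }} and {{ fileHDFS }}
fileLoc = "../data/train_triplets.txt"
fileHDFS = "/user/apalumbo/final/train_triplets.txt"

cmd = build_put_command(fileLoc, fileHDFS)
print(" ".join(cmd))

# To run the copy for real (requires Hadoop on the PATH):
# subprocess.check_call(cmd)
```

Passing the command as a list rather than a single string avoids shell quoting problems if a path contains spaces.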

In [5]:
# show file in hadoop
import pydoop.hdfs as hdfs
hdfs.lsl("/user/apalumbo/final/train_triplets.txt")

[{'block_size': 134217728,
  'group': 'supergroup',
  'kind': 'file',
  'last_access': 1450542763,
  'last_mod': 1450543445,
  'name': u'hdfs://localhost:9000/user/apalumbo/final/train_triplets.txt',
  'owner': 'apalumbo',
  'path': u'hdfs://localhost:9000/user/apalumbo/final/train_triplets.txt',
  'permissions': 420,
  'replication': 1,
  'size': 3001659271L}]
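
The listing can be cross-checked against the `ls` output with a little arithmetic. The sketch below only uses the numbers printed above: the permissions value 420 is decimal for octal 644 (that is, `-rw-r--r--`), the size is about 2.8 GiB, and with 128 MiB blocks the file spans 23 HDFS blocks.

```python
import math

# Values copied from the hdfs.lsl() listing above
permissions = 420
size_bytes = 3001659271
block_size = 134217728

perm_octal = oct(permissions)                 # decimal 420 written as an octal mode
size_gib = size_bytes / float(2 ** 30)        # bytes -> GiB
block_mib = block_size // 2 ** 20             # bytes -> MiB
num_blocks = int(math.ceil(size_bytes / float(block_size)))

print(perm_octal, round(size_gib, 2), block_mib, num_blocks)
```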

## Objective

The data consists of:

* 1,019,318 unique users
* 384,546 unique MSD songs
* 48,373,586 user - song - play count triplets
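
These three counts already say something about the shape of the problem. As a rough sketch, assuming each triplet is a distinct user-song pair, the averages and the fill rate of the user-by-song matrix follow directly: roughly 47 songs per user, roughly 126 users per song, and a matrix that is only about 0.012% full.

```python
# Counts taken from the list above
n_users = 1019318
n_songs = 384546
n_triplets = 48373586

songs_per_user = n_triplets / float(n_users)
users_per_song = n_triplets / float(n_songs)
# Fraction of all possible (user, song) pairs that actually occur
density = n_triplets / (float(n_users) * n_songs)

print("songs per user: {:.1f}".format(songs_per_user))
print("users per song: {:.1f}".format(users_per_song))
print("matrix density: {:.4%}".format(density))
```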

Our goal is to compare three tools for analyzing this data:

* pandas
* Spark
* Hadoop


We will make this comparison based on typical tasks encountered while working with data of this type and try to draw some conclusions about the appropriateness of each tool. The first criterion is feasibility. Assuming a task is feasible in all three tools, we then compare complexity and time. Complexity is somewhat subjective, while time is more objective. In our conclusions we will also discuss how each of these methods will scale.

> _Notes_
>
> * We will be using Apache Spark 1.5.1.
> * Hadoop 2.7.1 is accessed from Python with pydoop 1.1.0.
> * pandas 0.17.1
> * We will exercise each tool sequentially and confirm that memory has been released in between, so that the resources of the machine are dedicated to the tool at hand.

## Spark

### Setup

In [6]:
# Reset the namespace
%reset -f

In [7]:
%%bash
cat /proc/meminfo | grep Mem

MemTotal:       14361144 kB
MemFree:         7252340 kB
MemAvailable:   12531740 kB
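
The same check can be done from Python, which makes it easier to compare the numbers before and after each tool runs. This is a minimal sketch: `parse_meminfo` is a hypothetical helper, and the sample text is simply the output shown above.

```python
def parse_meminfo(text):
    """Parse /proc/meminfo-style text into a dict of {field: value in kB}."""
    info = {}
    for line in text.splitlines():
        if ":" not in line:
            continue
        key, value = line.split(":", 1)
        parts = value.split()
        if parts:
            info[key.strip()] = int(parts[0])
    return info

sample = """MemTotal:       14361144 kB
MemFree:         7252340 kB
MemAvailable:   12531740 kB"""

mem = parse_meminfo(sample)
available_gib = mem["MemAvailable"] / float(2 ** 20)  # kB -> GiB
print("available: {:.1f} GiB".format(available_gib))
```

On a Linux machine the live values can be read with `parse_meminfo(open("/proc/meminfo").read())`.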


In [8]:
import os
import sys
from pyechonest import song

In [9]:
spark_home = "/home/apalumbo/workspace/cuny_msda_is622/spark-1.5.1-bin-hadoop2.6/"

# Path for Spark source folder
os.environ['SPARK_HOME'] = spark_home

# Append pyspark to Python Path
sys.path.append(spark_home + "python/")

# Append py4j to Python Path
sys.path.append(spark_home + "python/lib/py4j-0.8.2.1-src.zip")

# Launch Spark
execfile(spark_home + "python/pyspark/shell.py")

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:08:32)
SparkContext available as sc, HiveContext available as sqlContext.


### Dependencies

In [10]:
# Libraries 
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

import IPython.display as dis
from pyechonest import config
import json

sqlCtx = SQLContext(sc)

# Paths
fileHDFS = "hdfs:///user/apalumbo/final/train_triplets.txt"
# use for testing
# fileHDFS = "hdfs:///user/apalumbo/final/train_triplets_100.txt"

### Loading Data

Using Spark to load the data and look at the first few records is fast and easy.

First we need a split function:

In [11]:
def splitFun(line):
    row = []
    for field in line.split("\t"):
        try:
            row.append(int(field))
        except ValueError:
            row.append(str(field))
    return row

In [12]:
%%time
songs_spark_ref = sc.textFile(fileHDFS)

songs_spark = songs_spark_ref.map(lambda line: splitFun(line))

dis.display(songs_spark.take(10))

[['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOAKIMP12A8C130995', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOAPDEY12A81C210A9', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBBMDR12A8C13253B', 2],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBFNSP12AF72A0E22', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBFOVM12A58A7D494', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBNZDC12A6D4FC103', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBSUJE12A6D4F8CF5', 2],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBVFZR12A6D4F8AE3', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBXALG12A8C13C108', 1],
 ['b80344d063b5ccb3212f76538f3d9e43d87dca9e', 'SOBXHDL12A81C204C0', 1]]

CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 2.53 s


We can use the %%timeit magic to measure how fast this operation is.

In [13]:
%%timeit
songs_spark_ref.map(lambda line: splitFun(line)).take(10)

10 loops, best of 3: 57.7 ms per loop
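
Outside IPython, a similar measurement can be made with the standard `timeit` module. The sketch below times only the parsing function on a single sample line (the first record shown above), not the Spark job, so its numbers are not comparable with the `%%timeit` result.

```python
import timeit

def splitFun(line):
    """Split a tab-separated line, converting numeric fields to int."""
    row = []
    for field in line.split("\t"):
        try:
            row.append(int(field))
        except ValueError:
            row.append(str(field))
    return row

sample_line = "b80344d063b5ccb3212f76538f3d9e43d87dca9e\tSOAKIMP12A8C130995\t1"

# Best of 3 repeats of 10000 calls each, similar to the "best of 3" that %%timeit reports
n_calls = 10000
best = min(timeit.repeat(lambda: splitFun(sample_line), repeat=3, number=n_calls))
print("{:.2f} microseconds per call".format(best / n_calls * 1e6))
```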


### Dataset statistics

The first thing we would like to do is to determine some basic information about the data. We can start with the overall size.

Spark's SQLContext provides a convenient tool for this: the DataFrame.

First we create a schema and convert the RDD to a DataFrame:

In [14]:
schema = StructType([StructField("user", StringType()), 
                     StructField("song", StringType()), 
                     StructField("playCount", IntegerType())])

# Convert the RDD to a spark DataFrame
sdf = songs_spark.toDF(schema)
sdf.show(10)

+--------------------+------------------+---------+
|                user|              song|playCount|
+--------------------+------------------+---------+
|b80344d063b5ccb32...|SOAKIMP12A8C130995|        1|
|b80344d063b5ccb32...|SOAPDEY12A81C210A9|        1|
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|        2|
|b80344d063b5ccb32...|SOBFNSP12AF72A0E22|        1|
|b80344d063b5ccb32...|SOBFOVM12A58A7D494|        1|
|b80344d063b5ccb32...|SOBNZDC12A6D4FC103|        1|
|b80344d063b5ccb32...|SOBSUJE12A6D4F8CF5|        2|
|b80344d063b5ccb32...|SOBVFZR12A6D4F8AE3|        1|
|b80344d063b5ccb32...|SOBXALG12A8C13C108|        1|
|b80344d063b5ccb32...|SOBXHDL12A81C204C0|        1|
+--------------------+------------------+---------+
only showing top 10 rows



In [None]:
%%time
print "Num Rows: {}".format(sdf.select("user").count())

In [None]:
%%time
print "Num Unique Users: {}".format(sdf.select("user").distinct().count())

In [15]:
%%time
print "Num Unique Songs: {}".format(sdf.select("song").distinct().count())

Num Rows: 48373586
Num Unique Users: 1019318
Num Unique Songs: 384546
CPU times: user 1.08 s, sys: 676 ms, total: 1.76 s
Wall time: 35min 43s


The overall size, number of unique users, and number of unique songs are shown above.
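
To make explicit what `count()` and `distinct().count()` compute, here is the same logic in plain Python on a toy sample. The rows are made up for illustration and are not taken from the dataset.

```python
# Toy sample in the same (user, song, playCount) layout; values are made up
rows = [
    ("u1", "s1", 1),
    ("u1", "s2", 2),
    ("u2", "s1", 5),
    ("u3", "s3", 1),
]

num_rows = len(rows)                                 # like sdf.count()
num_users = len({user for user, _, _ in rows})       # like distinct().count() on "user"
num_songs = len({song for _, song, _ in rows})       # like distinct().count() on "song"

print("Num Rows: {}".format(num_rows))
print("Num Unique Users: {}".format(num_users))
print("Num Unique Songs: {}".format(num_songs))
```

The in-memory sets are what stops this approach from scaling; Spark performs the equivalent de-duplication across partitions.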

### Most Popular Songs

One of our objectives with this data set is to build a recommendation engine. A simple baseline is to recommend the most popular artists or songs. Let's see how to do this in Spark.

We group by song and sum the play counts to get a measure of the most popular songs (this took about 12 minutes in the recorded run):

In [16]:
%%time
groupedSongs = sdf.groupBy('song')
songsByPlayCount = groupedSongs.sum('playCount') \
        .sort('sum(playCount)', ascending=False)
topSongs = songsByPlayCount.take(10)
dis.display([list(row) for row in topSongs])

[[u'SOBONKR12A58A7A7E0', 726885],
 [u'SOAUWYT12A81C206F1', 648239],
 [u'SOSXLTC12AF72A7F54', 527893],
 [u'SOFRQTD12A81C233C0', 425463],
 [u'SOEGIYH12A6D4FC0E3', 389880],
 [u'SOAXGDH12A8C13F8A1', 356533],
 [u'SONYKOW12AB01849C9', 292642],
 [u'SOPUCYA12A8C13A694', 274627],
 [u'SOUFTBI12AB0183F65', 268353],
 [u'SOVDSJC12A58A7A271', 244730]]

CPU times: user 380 ms, sys: 228 ms, total: 608 ms
Wall time: 12min 6s
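
The group-sum-sort pattern used here can be sketched in plain Python with `collections.Counter`. The rows below are made up for illustration; only the logic mirrors the Spark query.

```python
from collections import Counter

# Toy (user, song, playCount) rows; values are made up
rows = [
    ("u1", "s1", 1),
    ("u1", "s2", 2),
    ("u2", "s1", 5),
    ("u3", "s3", 1),
    ("u3", "s2", 1),
]

# groupBy('song').sum('playCount')
play_totals = Counter()
for user, song, play_count in rows:
    play_totals[song] += play_count

# sort(..., ascending=False) followed by take(2)
top_songs = play_totals.most_common(2)
print(top_songs)
```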


In [17]:
# Use echonest API to look up user/song information
# (read the API key from a local JSON file and close the file afterwards)
with open("../echonest_info.json", "rb") as f:
    echonestAPI = json.load(f)
config.ECHO_NEST_API_KEY = echonestAPI['api_key']

for i in topSongs:
    try:
        s = song.Song(i.song)
        print "artist: {}, title: {}".format(s.artist_name, s.title)
    except IndexError:
        print "{} not found".format(i)
    

Row(song=u'SOBONKR12A58A7A7E0', sum(playCount)=726885) not found


UnicodeEncodeError: 'ascii' codec can't encode character u'\xf6' in position 2: ordinal not in range(128)
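
The first lookup found no match, and the loop then stopped on the second song. The most likely cause of the `UnicodeEncodeError` is that, in Python 2, calling `format` on a byte-string template forces the unicode artist name or title through the ASCII codec, which fails on `u'\xf6'`. A minimal sketch of one way around this is to build the message from a unicode template instead. The helper name `describe_song` and the sample metadata are hypothetical, not output from the API.

```python
def describe_song(artist_name, title):
    """Format song metadata using a unicode template, so non-ASCII text is kept."""
    return u"artist: {}, title: {}".format(artist_name, title)

# Hypothetical metadata containing the character from the traceback (u'\xf6')
line = describe_song(u"Mot\xf6rhead", u"Example Title")

# Encode explicitly when the text has to be written to a byte stream
encoded = line.encode("utf-8")
print(repr(line))
```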

## Hadoop

Now let's see how we would do the same tasks with Hadoop.

### Setup

In [None]:
# Clear the namespace
%reset -f

In [None]:
%%bash
cat /proc/meminfo | grep Mem

### Dependencies

In [None]:
import pydoop.hdfs as hdfs
import pydoop.mapreduce.api as api
import os
import sys
from pyechonest import song
from subprocess import call

# Paths
fileHDFS = "hdfs:///user/apalumbo/final/train_triplets.txt"
# use for testing
# fileHDFS = "hdfs:///user/apalumbo/final/train_triplets_100.txt"
fileOutput = "hdfs:///user/apalumbo/final/hadoop_output.txt"

In [None]:
hdfs.lsl(fileHDFS)

In [None]:
colnames = ["user", "song", "playCount"]

### Loading Data

In [None]:
def splitFun(line):
    row = []
    for field in line.split("\t"):
        try:
            row.append(int(field))
        except ValueError:
            row.append(str(field))
    return row

In [None]:
def hadoop_take(file_path, take_lines):
    output = []
    i = 0
    with hdfs.open(file_path, "r") as f:
        for line in f:
            output.append(line)
            i += 1
            if i >= take_lines:
                break
    return output

songs_hadoop = hadoop_take(fileHDFS, 10)
[splitFun(x) for x in songs_hadoop]

In [None]:
%%timeit
songs_hadoop = hadoop_take(fileHDFS, 10)
[splitFun(x) for x in songs_hadoop]

Although reading the file this way takes more code, it was faster than Spark for fetching the first ten lines.
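
The counting loop in `hadoop_take` can also be written with `itertools.islice`, which stops reading as soon as enough lines have been returned. This sketch uses an in-memory `io.StringIO` object as a stand-in for the HDFS file handle, and `take_lines` is a hypothetical name.

```python
import io
from itertools import islice

def take_lines(lines, n):
    """Return the first n lines from any iterable of lines."""
    return list(islice(lines, n))

# In-memory stand-in for the file handle; the rows are made up
sample = io.StringIO(u"u1\ts1\t1\nu1\ts2\t2\nu2\ts1\t5\n")
first_two = take_lines(sample, 2)
print(first_two)
```

With a real handle the call would look like `take_lines(f, 10)` inside the `with hdfs.open(...)` block.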

### Dataset statistics

I spent time trying to use the Python libraries mrjob and pydoop but was unable to get either working, and I have not been able to isolate the problem. Instead, I will do the Hadoop part in R.
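
For reference, the per-song play-count total can be expressed as a map step and a reduce step. The sketch below only simulates that locally in plain Python; it is not the R implementation and does not use mrjob or pydoop. The function names and the sample lines are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    """Emit (song, playCount) for one tab-separated triplet line."""
    user, song, play_count = line.rstrip("\n").split("\t")
    return song, int(play_count)

def reducer(song, counts):
    """Sum the play counts emitted for one song."""
    return song, sum(counts)

# Made-up input lines in the user<TAB>song<TAB>playCount layout
lines = ["u1\ts1\t1", "u1\ts2\t2", "u2\ts1\t5"]

# Local stand-in for the shuffle phase: sort by key, then group by key
mapped = sorted(mapper(line) for line in lines)
totals = [reducer(song, [count for _, count in group])
          for song, group in groupby(mapped, key=itemgetter(0))]
print(totals)
```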


## Pandas

In [None]:
# Clear the namespace
%reset -f

In [None]:
import pandas as pd

In [None]:
colnames = ["user", "song", "playCount"]

fileLoc  = "file:///home/apalumbo/is622/final_project/data/train_triplets.txt"

### Loading the Data

In [None]:
%%time
songs_pandas = pd.read_csv(fileLoc, sep="\t", header=None, names=colnames)

In [None]:
%%timeit
songs_pandas.head(10)
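
Once the data is loaded, the statistics computed earlier with Spark could be reproduced along the following lines. This is a sketch on a tiny in-memory sample with made-up rows, not a timed run on the full file, and `sort_values` assumes pandas 0.17 or later.

```python
import io
import pandas as pd

colnames = ["user", "song", "playCount"]

# Made-up rows in the same tab-separated layout as train_triplets.txt
sample = io.StringIO(u"u1\ts1\t1\nu1\ts2\t2\nu2\ts1\t5\nu3\ts3\t1\n")
df = pd.read_csv(sample, sep="\t", header=None, names=colnames)

# Dataset statistics
num_rows = len(df)
num_users = df["user"].nunique()
num_songs = df["song"].nunique()

# Most popular songs: group by song, sum play counts, sort descending
top_songs = (df.groupby("song")["playCount"].sum()
               .sort_values(ascending=False)
               .head(2))
print(num_rows, num_users, num_songs)
print(top_songs)
```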

## Appendix

In [None]:
# used to connect a console to the notebook
%connect_info