In [1]:
# Configure the necessary Spark environment
import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")

# Add the py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

# Initialize PySpark to predefine the SparkContext variable 'sc'
#with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as f:
#    code = f.read()
#    exec(code)#, global_vars, local_vars)

In [2]:
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf

import sqlite3
import pandas as pd
from sqlalchemy import create_engine

import sys

def printUsage():
    print("""
    cbRecommender.py <full/path/to/artist_similarity.db>
                     <path/to/lastfmJSON/in/hdfs (no hdfs://)>
                     <path/to/track_metadata.db>
                     <path/to/echonest_analysis_data.csv>
    """)


######### Global variabls ######### (gross)
# The following variables are broadcast to the spark
# cluster and can be used in the functions below
songTable = 'song_data'
s_conf = (SparkConf()
    .setMaster('local[*]')
    .setAppName('content_rec')
    .set("spark.executor.memory", "16g"))
sc = SparkContext()
sqlContext = SQLContext(sc)



In [3]:

### Set up database connections for metadata and similar artists
### This is starting to get really ugly.
### broadcasting this data is probably not a good idea
artist_engine = create_engine('sqlite:///' + "./data/artist_similarity.db")
sims = pd.read_sql_query(
    'SELECT * FROM similarity', artist_engine)
# broadcsasting these variables is probably a bad idea since 
# they ar quite big
similars = sc.broadcast(sims.similar)
similar_groups = sc.broadcast(sims.groupby('target').groups)

tagFile = open('lastfm_unique_tags.txt', 'r')
# make tag dictionary available across the cluster.
tags = [tagstr[0] for tagstr in map(lambda ts: ts.split('\t'),
                                    [next(tagFile) for x in range(500)])]
tagDictionary = sc.broadcast(tags)
tagFile.close()

######## Functions for feature extraction #########

# make a "vector" with indices corresoinding to values in 
# tagDictionary
def getTagVector(track):
    return {tagDictionary.value[tag]:1 for [tag, f] in track.tags
            if tag in tagDictionary.value}

# Actually... it isn't really necessary to represent the tags as a vector...
# we can use sets
def getTagSet(track):
    return {'track_id':track.track_id,
            'track_tags':[tag for [tag, f] in track.tags
                          if tag in tagDictionary.value]}

def getArtistID(track):
    return track.artist_id

# use the similar artists db to make a similar artists vector
# the set this returns is not integers, it is the actual artist_ids.
def getSimilarArtistsSet(track):
    artist_id = getArtistID(track)
    # if no similars are defined then return an empty list
    sims = similar_groups.value.get(artist_id, [])
    sim_ids = map(lambda r: similars.value[r], sims) + [artist_id]
    return {'track_id':track.track_id,
            'similar_artists':list(set(sim_ids))}

def jaccardSimilarity(setA, setB):
    i = len(setA.intersection(setB))
    u = len(setA.union(setB))
    return i/u

# Note, the elements of tags must be distinct from the elements of
# artists this works now because artists is a set of strings and tags
# is a set of integers.
def combineSets(tags, artists):
    return tags.union(artists)




In [4]:
    fullJSON   = './data/lastfm_full.json'

    metadata_engine = create_engine('sqlite:///'+ "./track_metadata.db")
    artistIDs = sqlContext.createDataFrame(
        pd.read_sql_query('SELECT track_id, artist_id FROM songs',
                          metadata_engine))
    print("reading json")
    trackDF   = sqlContext.read.json(fullJSON)
    print("Joining artist with track")
    completeDF = trackDF.join(artistIDs, trackDF.track_id == artistIDs.track_id)
    # cache the complete dataframe since we will be accessing it twice
    completeDF.cache()

 

reading json
Joining artist with track


DataFrame[artist: string, similars: array<array<string>>, tags: array<array<string>>, timestamp: string, title: string, track_id: string, track_id: string, artist_id: string]

In [5]:
tagSets = sqlContext.createDataFrame(completeDF.map(getTagSet))



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 178, localhost): java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
	at org.apache.spark.sql.columnar.NullableColumnBuilder$class.build(NullableColumnBuilder.scala:76)
	at org.apache.spark.sql.columnar.ComplexColumnBuilder.build(ColumnBuilder.scala:81)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
	at org.apache.spark.sql.columnar.NullableColumnBuilder$class.build(NullableColumnBuilder.scala:76)
	at org.apache.spark.sql.columnar.ComplexColumnBuilder.build(ColumnBuilder.scala:81)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)


In [None]:
# artistSets = sqlContext.createDataFrame(completeDF.map(getSimilarArtistsSet))



In [None]:

    # create and RDD with all track features.
    trackFeatures = tagSets.join(artistSets,
                                 tagSets.track_id == artistSets.track_id)

    echoNest = sqlContext.createDataFrame(pd.read_csv(sys.argv[4]))
    allFeatures = trackFeatures.join(echoNest,
                                     allFeatures.track_id == echoNest.track_id)
    # TODO:
    # 1. function that takes a user and constructs a feature set (ie. tag set)
    #    from their top N songs (or all songs with a normalized play count above
    #    some threshold
    #    a. read in the triplets as a pandas DataFrame
    # 2. function that finds the most similar songs to a given song vector
    
    # save the sets so we can use them again...  also just as a test
    # to make sure that everything works correctly
    #tagsFile = 'hdfs:///users/wfvining/challenge2/tagSets.rdd'
    #tagSets.saveAsTextFile(tagsFile)
    #artistsFile = 'hdfs:///users/wfvining/challenge2/artistSets.rdd'
    #artistSets.saveAsTextFile(artistsFile)


