In [1]:
sc

<pyspark.context.SparkContext at 0xb1f38f0c>

In [2]:
rawUserArtistData = sc.textFile('audio_data/user_artist_data.txt')

In [3]:
rawUserArtistData.getNumPartitions()

13

In [41]:
# Take the first whitespace-separated field of every line (the user ID) and
# parse it as a float so the resulting RDD can be summarised with .stats().
userIDs = rawUserArtistData.map(lambda line: float(line.split()[0]))

In [5]:
# Take the second whitespace-separated field of every line (the artist ID) and
# parse it as a float so the resulting RDD can be summarised with .stats().
artistIDs = rawUserArtistData.map(lambda line: float(line.split()[1]))

### Maximum IDs match the values given in the book

In [6]:
userIDs.stats()

(count: 24296858, mean: 1947573.26535, stdev: 496000.544975, max: 2443548.0, min: 90.0)

In [7]:
artistIDs.stats()

(count: 24296858, mean: 1718704.09376, stdev: 2539389.04017, max: 10794401.0, min: 1.0)

### Construct ID to Artist Name Map

In [3]:
rawArtistData = sc.textFile('audio_data/artist_data.txt')

In [4]:
def getArtistIDAndName(line):
    """Parse one line of artist_data.txt into ``[artist_id, artist_name]``.

    The line is expected to look like ``<integer id><TAB><name>``. Only the
    first tab is treated as the separator, so a name that itself contains a
    tab is kept whole instead of being cut off at its second tab.

    Args:
        line: A single raw text line.

    Returns:
        A two-element list ``[int_id, stripped_name]``, or ``None`` when the
        line has no tab or its id field is not an integer.
    """
    # Split on the first tab only: everything after it is the name.
    tokens = line.split('\t', 1)

    try:
        return [int(tokens[0]), tokens[1].strip()]
    except (ValueError, IndexError):
        # ValueError: the id field is not an integer.
        # IndexError: the line contains no tab, so there is no name field.
        # Any other exception is a genuine bug and is allowed to surface
        # instead of being silently turned into None.
        return None

In [45]:
# Parse every raw line; lines that cannot be parsed come back as None.
id2name = rawArtistData.map(getArtistIDAndName)

In [32]:
# id2name.count()

1848707

In [46]:
id2name.take(5)

[[1134999, u'06Crazy Life'],
 [6821360, u'Pang Nakarin'],
 [10113088, u'Terfel, Bartoli- Mozart: Don'],
 [10151459, u'The Flaming Sidebur'],
 [6826647, u'Bodenstandig 3000']]

#### Remove None elements for invalid lines

In [62]:
# Keep only the lines that were parsed successfully (the parser yields None otherwise).
id2name = id2name.filter(lambda parsed: parsed is not None)

#### No invalid elements

In [35]:
# none_count = id2name.filter(lambda ele: ele is None)

In [36]:
# none_count.count()

0

#### Sanity Check

In [16]:
# id2name.lookup(1134999)

[u'06Crazy Life']

#### Construct Alias ID to Canonical Artist ID mapping

In [5]:
rawArtistAlias = sc.textFile('audio_data/artist_alias.txt')

In [6]:
def MapAliasToCanonical(line):
    """Parse one line of artist_alias.txt into ``[alias_id, canonical_id]``.

    The line is expected to hold two tab-separated integer artist ids.

    Args:
        line: A single raw text line.

    Returns:
        A two-element list ``[int_alias_id, int_canonical_id]``, or ``None``
        when either field is missing or is not an integer (for example when
        the first field is empty).
    """
    tokens = line.split('\t')

    try:
        return [int(tokens[0]), int(tokens[1])]
    except (ValueError, IndexError):
        # ValueError: a field is empty or not an integer.
        # IndexError: the line has fewer than two tab-separated fields.
        # Any other exception is a genuine bug and is allowed to surface
        # instead of being silently turned into None.
        return None

In [7]:
# Parse every raw alias line; lines that cannot be parsed come back as None.
artistAlias = rawArtistAlias.map(MapAliasToCanonical)

#### Remove None elements

In [8]:
# Keep only the alias lines that were parsed successfully.
artistAlias = artistAlias.filter(lambda parsed: parsed is not None)

In [21]:
# artistAlias.count()

190892

In [22]:
# artistAlias.take(10)

[[1092764, 1000311],
 [1095122, 1000557],
 [6708070, 1007267],
 [10088054, 1042317],
 [1195917, 1042317],
 [1112006, 1000557],
 [1187350, 1294511],
 [1116694, 1327092],
 [6793225, 1042317],
 [1079959, 1000557]]

#### No invalid elements

In [23]:
# none_count = artistAlias.filter(lambda ele: ele is None)

In [24]:
# none_count.count()

0

#### Sanity Check

In [25]:
# print(artistAlias.lookup(6803336))

[1000010]


In [37]:
# del id2name, userIDs, artistIDs

### Building the Model

In [9]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [10]:
# collectAsMap() returns the (alias id -> canonical id) pairs to the driver as a
# dict; broadcasting that dict lets functions run inside RDD transformations
# read it through bArtistAlias.value.
bArtistAlias = sc.broadcast(artistAlias.collectAsMap())

#### Sanity Check the broadcast map

In [40]:
# artistAlias.take(1)

[[1092764, 1000311]]

In [41]:
# print(bArtistAlias.value.get(3990103102312))

None


In [42]:
# print(bArtistAlias.value.get(1092764))

1000311


In [11]:
def replaceAlias(line):
    """Turn one user/artist/play-count line into a Rating with a canonical artist id.

    The line is split on whitespace into exactly three integer fields. The
    artist id is looked up in the broadcast alias map; when an entry exists it
    replaces the artist id, otherwise the id is used unchanged.
    """
    userID, artistID, play_count = map(int, line.split())

    alias_target = bArtistAlias.value.get(artistID)
    # No alias entry means the id is already canonical.
    resolved_artist = artistID if alias_target is None else alias_target

    return Rating(userID, resolved_artist, play_count)

In [12]:
# Build the Rating RDD once and keep it cached for later use.
trainData = rawUserArtistData.map(replaceAlias).cache()

In [45]:
# trainData.take(1)

[Rating(user=1000002, product=1, rating=55.0)]

In [13]:
# Train an implicit-feedback ALS model on the play-count ratings.
# ALS starts from randomly initialised factors, so a fixed seed is passed to make
# the trained model (and the recommendations derived from it) repeatable between
# runs. Outputs recorded from an earlier, unseeded run may differ.
model = ALS.trainImplicit(
    ratings=trainData,
    rank=10,
    iterations=5,
    lambda_=0.01,
    alpha=1.0,
    seed=42)

In [24]:
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0xb07a118c>

#### Save the model

In [66]:
# Persist the trained model under the relative path 'collaborative_model'.
# NOTE(review): Spark usually refuses to write to a path that already exists, so
# re-running this cell may fail until the old directory is removed — confirm.
model.save(sc, 'collaborative_model')

In [30]:
model.userFeatures().first()

(90,
 array('d', [0.13551528751850128, -0.050239574164152145, 0.08770570158958435, -0.774518609046936, 0.9834627509117126, 0.09070051461458206, 0.9237454533576965, -0.2517358362674713, -0.1647377461194992, -0.15752772986888885]))

#### Verify that #latent features = 10

In [34]:
len(model.userFeatures().first()[1])

10

In [89]:
# Rows of the raw play data that belong to user 2093760.
# split() with no argument is used, as in the other cells that parse this file,
# so repeated or leading/trailing whitespace cannot produce empty tokens.
rawArtistsForUser = rawUserArtistData.map(lambda line: line.split()) \
                                     .filter(lambda tokens: int(tokens[0]) == 2093760)

### Get unique artist IDs from the above RDD

In [90]:
# Second field of each row is the artist id; collect to the driver and de-duplicate.
uniqueArtists = set(rawArtistsForUser.map(lambda fields: int(fields[1])).collect())

In [91]:
uniqueArtists

{378, 813, 942, 1180, 1255340}

In [92]:
# Names of the artists this user has played.
# Each id2name element is an [artist_id, artist_name] pair; it is indexed rather
# than unpacked in the lambda signature because tuple parameters are a syntax
# error in Python 3.
uniqueArtistsNames = id2name.filter(lambda pair: pair[0] in uniqueArtists) \
                            .map(lambda pair: pair[1])

#### Same artists as in the book

In [93]:
uniqueArtistsNames.collect()

[u'David Gray', u'Blackalicious', u'Jurassic 5', u'The Saw Doctors', u'Xzibit']

In [94]:
# Ask the model for 5 recommendations for user 2093760 by invoking the
# "recommendProducts" method by name through the model wrapper's call().
# NOTE(review): newer PySpark releases appear to expose this directly as
# model.recommendProducts(user, num) — confirm against the installed version.
recommendations = model.call("recommendProducts", 2093760, 5)

In [95]:
# Distinct product (artist) ids among the recommendations.
uniqueRecAristIDs = {rec.product for rec in recommendations}

In [96]:
uniqueRecAristIDs

{2814, 4605, 1001819, 1007614, 1300642}

#### Get artist names from recommended Artist IDs

In [97]:
# Names of the recommended artists.
# Each id2name element is an [artist_id, artist_name] pair; it is indexed rather
# than unpacked in the lambda signature because tuple parameters are a syntax
# error in Python 3.
uniqueRecArtistNames = id2name.filter(lambda pair: pair[0] in uniqueRecAristIDs) \
                              .map(lambda pair: pair[1])

In [98]:
uniqueRecArtistNames.collect()

[u'50 Cent', u'Snoop Dogg', u'Jay-Z', u'2Pac', u'The Game']