In [1]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
[K     |████████████████████████████████| 204.7MB 76kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 37.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=2e34ebe9344f3dddd066f7db772ae752ffad89ba064fd46bae8396216a5d0865
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [2]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession as spark

In [3]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [33]:
from operator import *
import os
import sys
import re
import random
from pyspark.mllib.recommendation import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *

import pandas as pd
import numpy as np
from time import time

sqlContext = SQLContext(sc)

In [5]:
# Load user_artist_data
userArtistDataSchema = StructType([ \
    StructField("userId", LongType(), True), \
    StructField("artistId", LongType(), True), \
    StructField("playCount", IntegerType(), True)])

userArtistDf = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter=' ') \
    .load("user_artist_data.txt", schema = userArtistDataSchema) \
    .cache()

userArtistDf.show()

+-------+--------+---------+
| userId|artistId|playCount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
|1000002| 1000009|      144|
|1000002| 1000010|      314|
|1000002| 1000013|        8|
|1000002| 1000014|       42|
|1000002| 1000017|       69|
|1000002| 1000024|      329|
|1000002| 1000025|        1|
|1000002| 1000028|       17|
|1000002| 1000031|       47|
|1000002| 1000033|       15|
|1000002| 1000042|        1|
|1000002| 1000045|        1|
|1000002| 1000054|        2|
|1000002| 1000055|       25|
|1000002| 1000056|        4|
|1000002| 1000059|        2|
|1000002| 1000062|       71|
+-------+--------+---------+
only showing top 20 rows



In [6]:
# Number of unique users
uniqueUsers = userArtistDf.select("userId").distinct().count()
print("Total number of users: ", uniqueUsers)

Total number of users:  2553


In [7]:
# Number of unique artists
uniqueArtists = userArtistDf.select("artistId").distinct().count()
print("Total number of artists: ", uniqueArtists)

Total number of artists:  131887


In [8]:
# Compute users' activity
userActivity = userArtistDf.groupBy("userId").sum("playCount").collect()
print('\n', '5 samples of the result: ','\n',userActivity[0:5])


 5 samples of the result:  
 [Row(userId=1000061, sum(playCount)=244), Row(userId=1000070, sum(playCount)=20200), Row(userId=1000313, sum(playCount)=201), Row(userId=1000832, sum(playCount)=1064), Row(userId=1000905, sum(playCount)=214)]


In [9]:
# Load artist_data
customSchemaArtist = StructType([ \
    StructField("artistID", LongType(), True), \
    StructField("name", StringType(), True)])

artistDf = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t', mode='DROPMALFORMED')\
    .load("artist_data.txt", schema = customSchemaArtist ) \
    .cache()     

print('10 samples of artists')   
artistDf.show(10)

10 samples of artists
+--------+--------------------+
|artistID|                name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
|10186265|Jota Quest e Ivet...|
| 6828986|       Toto_XX (1977|
|10236364|         U.S Bombs -|
| 1135000|artist formaly kn...|
|10299728|Kassierer - Musik...|
+--------+--------------------+
only showing top 10 rows



In [10]:
#Loading artist_alias_data
customSchemaArtistAlias = StructType([ \
    StructField("MisspelledArtistID", LongType(), True), \
    StructField("StandardArtistID", LongType(), True)])

artistAliasDf = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t', mode='DROPMALFORMED')\
    .load("artist_alias.txt", schema = customSchemaArtistAlias ) \
    .cache()

print('10 samples of mispelled artists')
artistAliasDf.show(10)

10 samples of mispelled artists
+------------------+----------------+
|MisspelledArtistID|StandardArtistID|
+------------------+----------------+
|           1092764|         1000311|
|           1095122|         1000557|
|           6708070|         1007267|
|          10088054|         1042317|
|           1195917|         1042317|
|           1112006|         1000557|
|           1187350|         1294511|
|           1116694|         1327092|
|           6793225|         1042317|
|           1079959|         1000557|
+------------------+----------------+
only showing top 10 rows



In [11]:
# Create alias dictionary
artistAlias = artistAliasDf.rdd.map(lambda row: ( row[0] , row[1])).collectAsMap()

In [12]:
# Improve of the dictionary
for k in artistAlias.keys():
    v = artistAlias[k]
    old_v = v

    while v in artistAlias.keys():
        # the standard ID is not an entry in the dictionary anymore
        v = artistAlias[v]
        if(v == old_v):
            # avoid the infinite loop
            break
        old_v = v
    # remap the alias value
    artistAlias[k] = v

In [13]:
bArtistAlias = sc.broadcast(artistAlias)

def replaceMispelledIds(fields):
    finalId = bArtistAlias.value.get(fields[1], fields[1]) 
    return (fields[0], finalId, fields[2])


newUserArtistDf = sqlContext.createDataFrame(
    userArtistDf.rdd.map(replaceMispelledIds), 
    userArtistDataSchema)

newUserArtistDf.show(10)

+-------+--------+---------+
| userId|artistId|playCount|
+-------+--------+---------+
|1000002|       1|       55|
|1000002| 1000006|       33|
|1000002| 1000007|        8|
|1000002| 1000009|      144|
|1000002| 1000010|      314|
|1000002| 1000013|        8|
|1000002| 1000014|       42|
|1000002| 1000017|       69|
|1000002| 1000024|      329|
|1000002| 1000025|        1|
+-------+--------+---------+
only showing top 10 rows



In [14]:
uniqueArtists = newUserArtistDf.select("artistId").distinct().count()
print("Total number of artists: ", uniqueArtists)

Total number of artists:  127090


# **Collaborative Recommendation**

In [15]:
rawArtistData = sc.textFile("artist_data.txt")

def xtractFields(s):
    line = re.split("\s|\t",s,1)
    if (len(line) > 1):
        try:
            return (int(line[0]), str(line[1].strip()))
        except ValueError:
            return (-1,"")
    else: 
        return (-1,"")

artistByID = rawArtistData.map(xtractFields).filter(lambda x: x[0] > 0)

In [16]:
rawUserArtistData = sc.textFile("user_artist_data.txt")

def disambiguate(line):
    [userId, artistId, count] = line.split(' ')
    finalArtistId = bArtistAlias.value.get(artistId, artistId)
    # we recontruct the triple
    return (userId, finalArtistId, count)


userArtistDataRDD = rawUserArtistData.map(disambiguate)
userArtistDataRDD.take(10)

[('1000002', '1', '55'),
 ('1000002', '1000006', '33'),
 ('1000002', '1000007', '8'),
 ('1000002', '1000009', '144'),
 ('1000002', '1000010', '314'),
 ('1000002', '1000013', '8'),
 ('1000002', '1000014', '42'),
 ('1000002', '1000017', '69'),
 ('1000002', '1000024', '329'),
 ('1000002', '1000025', '1')]

In [17]:
allData = userArtistDataRDD.map(lambda x : ((x[0],x[1]),x[2]) ) \
            .reduceByKey(lambda x,y : x+y)\
            .map(lambda r: Rating(r[0][0],r[0][1],r[1])) \
            .repartition(4).cache()

In [22]:
allData.toDF().columns

['user', 'product', 'rating']

In [23]:
train, test = allData.randomSplit([0.8,0.2])
train.cache()
test.cache()

PythonRDD[110] at RDD at PythonRDD.scala:53

In [24]:
# Get all unique artistId, and broadcast them
allItemIDs = np.array(allData.map(lambda x: x[1]).distinct().collect())
bAllItemIDs = sc.broadcast(allItemIDs)

In [25]:
from random import randint

# Depend on the number of item in userIDAndPosItemIDs,
# create a set of "negative" products for each user. These are randomly chosen
# from among all of the other items, excluding those that are "positive" for the user.
# NOTE 1: mapPartitions operates on many (user,positive-items) pairs at once
# NOTE 2: flatMap breaks the collections above down into one big set of tuples
def extractNegative(userIDAndPosItemIDs):
    def pickEnoughNegatives(line):
        userID = line[0]
        posItemIDSet = set(line[1])
        #posItemIDSet = line[1]
        negative = []
        allItemIDs = bAllItemIDs.value
        # Keep about as many negative examples per user as positive. Duplicates are OK.
        i = 0
        while (i < len(allItemIDs) and len(negative) < len(posItemIDSet)):
            itemID = allItemIDs[randint(0,len(allItemIDs)-1)]
            if itemID not in posItemIDSet:
                negative.append(itemID)
            i += 1
        
        # Result is a collection of (user,negative-item) tuples
        return map(lambda itemID: (userID, itemID), negative)

    # Init an RNG and the item IDs set once for partition
    # allItemIDs = bAllItemIDs.value
    return map(pickEnoughNegatives, userIDAndPosItemIDs)

def ratioOfCorrectRanks(positiveRatings, negativeRatings):
    
    # find number elements in arr that has index >= start and has value smaller than x
    # arr is a sorted array
    def findNumElementsSmallerThan(arr, x, start=0):
        left = start
        right = len(arr) -1
        # if x is bigger than the biggest element in arr
        if start > right or x > arr[right]:
            return right + 1
        mid = -1
        while left <= right:
            mid = (left + right) // 2
            if arr[mid] < x:
                left = mid + 1
            elif arr[mid] > x:
                right = mid - 1
            else:
                while mid-1 >= start and arr[mid-1] == x:
                    mid -= 1
                return mid
        return mid if arr[mid] > x else mid + 1
    
    ## AUC may be viewed as the probability that a random positive item scores
    ## higher than a random negative one. Here the proportion of all positive-negative
    ## pairs that are correctly ranked is computed. The result is equal to the AUC metric.
    correct = 0 ## L
    total = 0 ## L
    
    # sorting positiveRatings array needs more cost
    #positiveRatings = np.array(map(lambda x: x.rating, positiveRatings))

    negativeRatings = list(map(lambda x:x.rating, negativeRatings))
    
    #np.sort(positiveRatings)
    negativeRatings.sort()# = np.sort(negativeRatings)
    total = len(positiveRatings)*len(negativeRatings)
    
    for positive in positiveRatings:
        # Count the correctly-ranked pairs
        correct += findNumElementsSmallerThan(negativeRatings, positive.rating)
        
    ## Return AUC: fraction of pairs ranked correctly
    return float(correct) / total

def calculateAUC(positiveData, bAllItemIDs, predictFunction):
    # Take held-out data as the "positive", and map to tuples
    positiveUserProducts = positiveData.map(lambda r: (r[0], r[1]))
    # Make predictions for each of them, including a numeric score, and gather by user
    positivePredictions = predictFunction(positiveUserProducts).groupBy(lambda r: r.user)
    
    # Create a set of "negative" products for each user. These are randomly chosen 
    # from among all of the other items, excluding those that are "positive" for the user. 
    negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions(extractNegative).flatMap(lambda x: x)
    # Make predictions on the rest
    negativePredictions = predictFunction(negativeUserProducts).groupBy(lambda r: r.user)
    
    return (
            positivePredictions.join(negativePredictions)
                .values()
                .map(
                    lambda positive_negativeRatings: ratioOfCorrectRanks(positive_negativeRatings[0], positive_negativeRatings[1])
                )
                .mean()
            )

In [36]:
evaluations = []

for rank in [10, 50]:
    for lambda_ in [1.0, 0.0001]:
        for alpha in [1.0, 40.0]:
            print("Train model with rank=%d lambda_=%f alpha=%f" % (rank, lambda_, alpha))
            # with each combination of params, we should run multiple times and get avg
            # for simple, we only run one time.
            model = ALS.trainImplicit(train, rank=rank, iterations =5 , lambda_=lambda_, alpha=alpha )

            auc = calculateAUC( test , bAllItemIDs, model.predictAll)

            evaluations.append(((rank, lambda_, alpha), auc))

            model.userFeatures().unpersist()
            
            model.productFeatures().unpersist()

Train model with rank=10 lambda_=1.000000 alpha=1.000000
Train model with rank=10 lambda_=1.000000 alpha=40.000000
Train model with rank=10 lambda_=0.000100 alpha=1.000000
Train model with rank=10 lambda_=0.000100 alpha=40.000000
Train model with rank=50 lambda_=1.000000 alpha=1.000000
Train model with rank=50 lambda_=1.000000 alpha=40.000000
Train model with rank=50 lambda_=0.000100 alpha=1.000000
Train model with rank=50 lambda_=0.000100 alpha=40.000000


In [37]:
evaluations.sort(key = lambda x:x[1] , reverse = True)

evalDataFrame = pd.DataFrame(data=evaluations)
evalDataFrame.columns = ['Parameters', 'AUC']
print(evalDataFrame)

train.unpersist()
test.unpersist()

           Parameters       AUC
0     (10, 1.0, 40.0)  0.938372
1      (10, 1.0, 1.0)  0.934456
2  (10, 0.0001, 40.0)  0.933470
3      (50, 1.0, 1.0)  0.932215
4   (10, 0.0001, 1.0)  0.930334
5     (50, 1.0, 40.0)  0.928285
6  (50, 0.0001, 40.0)  0.912797
7   (50, 0.0001, 1.0)  0.904820


PythonRDD[110] at RDD at PythonRDD.scala:53

In [39]:
def artistNames(line):
#     [artistID, name]
    if (line[0] in recArtist ): #look for the recomended artist IDs in artistByID
    
        return True
    else:
        return False

In [40]:
model = ALS.trainImplicit(train, rank= evaluations[0][0][0], iterations =5 , lambda_= evaluations[0][0][1], alpha= evaluations[0][0][2] )
allData.unpersist()

userID = 1000002
recommendations = model.recommendProducts(userID, 5)

recArtist = set(map(lambda x: x.product, recommendations) )

recList = artistByID.filter(artistNames).values().collect()
print(recList)

unpersist(model)

['Every Little Thing', '松浦亜弥', 'Aus-Rotten', 'sonar', 'Aikawa Nanase']
