In [None]:
# Display the version of the Spark session this notebook is attached to.
spark.version

In [None]:
import pandas as pd
import numpy as np

In [None]:
from pyspark.sql.functions import col, desc, lit, udf, struct, collect_list, explode, size, concat_ws, split
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StringType, StructType, StructField
from pyspark.sql import Row

## Some Helpers

In [None]:
import time

class timeit():
    """Context manager that prints the wall-clock runtime of its ``with`` body.

    Usage::

        with timeit():
            do_work()

        with timeit() as t:
            do_work()
        t.elapsed   # runtime in seconds

    Attributes:
        start:   ``time.time()`` value taken when the block was entered.
        elapsed: runtime in seconds, set when the block exits (``None`` before).
    """

    def __enter__(self):
        self.start = time.time()
        self.elapsed = None
        # Return the manager so ``with timeit() as t`` binds something useful.
        return self

    def __exit__(self, *args, **kwargs):
        self.elapsed = time.time() - self.start
        print('{ runtime: %6.2f sec }' % self.elapsed)
        # Implicitly returns None, so an exception raised in the body propagates.


In [None]:
from nvd3_stat import Nvd3
# Charting helper used for every plot further down in this notebook.
nv = Nvd3()
# NOTE(review): presumably (re)loads the NVD3 JavaScript assets into the notebook front end — confirm against nvd3_stat.
nv.reloadNVD3()

## Data
### Artist Data

#### Skip the following steps if the data has already been saved ...

In [None]:
# def intOrNeg(value):
#     try:
#         i = int(value)
#     except:
#         i = -9999999
#     return i


In [None]:
# rawArtistData = sc.textFile("/data/lastfm/artist_data.txt")
# rawArtistData.count()

In [None]:
# rawArtistData.filter(lambda row: "\t" not in row).count() # to be ignored

In [None]:
# artistData = (rawArtistData.filter(lambda row: "\t" in row)
#                            .map(lambda row: row.split("\t"))
#                            .map(lambda row: Row(artistid=intOrNeg(row[0]), artistname=row[1]))
# ).toDF()

In [None]:
# artistData.where(artistData.artistid == -9999999).count()

In [None]:
# artistData = artistData.where(artistData.artistid != -9999999).cache()

In [None]:
# artistData.write.parquet("/data/lastfm/artist_data.parquet")

#### Start here if the data has already been saved: load it from Parquet

In [None]:
# Load the artist table (artistid, artistname) from the Parquet file written by the preparation cells.
artistData = spark.read.parquet("/data/lastfm/artist_data.parquet")

In [None]:
# Print summary statistics of the artist table as a quick sanity check of the load.
artistData.describe().show()

### Artist Alias

In [None]:
# Parse the tab-separated alias file into (artist id, replacement artist id) integer pairs.
rawArtistAlias = sc.textFile("/data/lastfm/artist_alias.txt")
artistAlias = (rawArtistAlias.map(lambda row: row.split("\t"))
                             # Keep only lines that carry BOTH ids: a line without a tab or with an
                             # empty field would otherwise raise IndexError / ValueError in the next
                             # map and fail the whole job.
                             .filter(lambda row: len(row) >= 2 and row[0] != "" and row[1] != "")
                             .map(lambda row: (int(row[0]), int(row[1])))
)

In [None]:
# Pull the (id -> replacement id) pairs to the driver as a plain Python dict.
idmap = artistAlias.collectAsMap()

In [None]:
# Spot-check two artist ids in the artist table.
# NOTE(review): presumably an alias id and its replacement id — confirm against idmap.
artistData.where(artistData.artistid.isin([2097164, 1001134])).show()

In [None]:
# Broadcast the alias dict; the (commented-out) preparation cell reads it via bcidmap.value inside an RDD map.
bcidmap = sc.broadcast(idmap)

### User Artist Data

#### Skip the following steps if the data has already been saved ...

In [None]:
# rawUserArtistData = sc.textFile("/data/lastfm/user_artist_data.txt", 6)

# userArtistData1 = (rawUserArtistData
#                    .map(lambda row: row.split(" "))
#                    .map(lambda row: Row(userid      = int(row[0]),
#                                         rawartistid = int(row[1]), 
#                                         artistid    = bcidmap.value.get(int(row[1]), int(row[1])),
#                                         playcount   = int(row[2])))
# ).toDF()

In [None]:
# userArtistData2 = (userArtistData1.select(["userid","artistid","playcount"])
#                                   .groupBy(["userid","artistid"])
#                                   .sum("playcount")
#                                   .withColumnRenamed("sum(playcount)", "playcount")
# )

In [None]:
# How many artists do users listen to

# userGroups = (userArtistData2.select(["artistid", "userId"])
#                              .groupBy(col("userid"))
#                              .count()
#              ).cache()

In [None]:
# hist = userGroups.rdd.values().histogram([0, 5, 10, 20,40,80,160,320,640,1280,2560,5120,10240,20480,450000])
# hist

In [None]:
# db = nv.discreteBarChart()

# db.plot({"Bucket":hist[0][1:], "Count":hist[1]}, "Bucket", "Count")

Remove "outliers": keep only users who listened to between 10 and 1500 artists

In [None]:
# userArtistData = (userGroups.where(col("count") >= 10).where(col("count") <= 1500)
#                             .select(["userid"])
#                             .join(userArtistData2, on="userid"))

In [None]:
# (userArtistData.groupby("artistid")
#                .count()
#                .join(artistData, on="artistid")
#                .sort(desc("count"))
# ).show(10)

Remove artist 1034635 ( [unknown] )

In [None]:
# userArtistData = userArtistData.where(userArtistData.artistid != 1034635)

In [None]:
# !hdfs dfs -rm -r /data/lastfm/user_artist_data.parquet

In [None]:
# userArtistData.write.parquet("/data/lastfm/user_artist_data.parquet")

#### Start here if the data has already been saved: load it from Parquet

In [None]:
# Load the prepared user/artist play counts from the Parquet file written by the preparation cells.
userArtistData = spark.read.parquet("/data/lastfm/user_artist_data.parquet")

In [None]:
# Peek at the first rows to confirm the expected columns are present.
userArtistData.show(5)

In [None]:
# Number of rows (artists) per user. The column is referenced as "userid" throughout:
# mixing "userId" and "userid" only resolves while Spark's analyzer is case-insensitive.
userGroups = (userArtistData.select(["artistid", "userid"])
                            .groupBy(col("userid"))
                            .count())

# values() keeps the second field of every (userid, count) row, i.e. the per-user count.
# With explicit bucket boundaries, counts outside [20, 1500] are not part of any bucket.
hist = userGroups.rdd.values().histogram([20,40,80,160,240,320,480,560,640,800, 960,1280, 1500])
db = nv.discreteBarChart()

# hist is (bucket boundaries, counts per bucket); each bar is labelled with its bucket's upper bound.
db.plot({"Bucket":hist[0][1:], "Count":hist[1]}, "Bucket", "Count")

## The model

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

def alsModel(df, rank, maxIter, regParam, alpha, seed=None):
    """Fit an implicit-feedback ALS model on user/artist play counts.

    Args:
        df:       DataFrame with the columns ``userid``, ``artistid`` and ``playcount``.
        rank:     number of latent factors.
        maxIter:  number of ALS iterations.
        regParam: regularization parameter.
        alpha:    confidence scaling applied to the implicit feedback.
        seed:     optional random seed; pass a fixed value to make the fit
                  reproducible. ``None`` (default) keeps ALS's own default seed.

    Returns:
        The fitted ALS model.
    """
    params = dict(rank=rank, maxIter=maxIter, regParam=regParam, alpha=alpha,
                  implicitPrefs=True,
                  userCol="userid", itemCol="artistid", ratingCol="playcount")
    # Only forward the seed when the caller supplied one, so the default call
    # builds exactly the same estimator as before.
    if seed is not None:
        params["seed"] = seed
    return ALS(**params).fit(df)

## Model Evaluation

**Note: Fast, but can lead to NaN predictions when the algorithm runs out of training or test data.**

In [None]:
# Hold out roughly 10% of the rows for evaluation.
# NOTE(review): randomSplit is called without a seed, so the split (and every metric below) differs between runs.
training, test = userArtistData.randomSplit([0.9, 0.1])
# Both splits are reused by several later cells, so mark them for caching.
training.cache()
test.cache()

In [None]:
# Alternative hyper-parameters (disabled):
# model = alsModel(training, rank=10, maxIter=5, regParam=0.01, alpha=1.0)
# Fit the ALS model on the training split only; `test` stays unseen for the evaluation below.
model = alsModel(training, rank=10, maxIter=5, regParam=1.0, alpha=40.0)

### Manually examining some example users

In [None]:
def compareFavs(user, count=20):
    """Compare a user's favourites with the model's ranking, side by side.

    Reads the notebook globals ``training``, ``test``, ``model`` and ``artistData``.

    Args:
        user:  user id to inspect.
        count: maximum number of rows taken from each of the three rankings.

    Returns:
        pandas DataFrame, joined on the row index (i.e. on rank), with the columns
        ``training_playcount``, ``training_artist``, ``prediction``,
        ``recommended_artist``, ``test_playcount`` and ``test_artist``.
        Both joins are left joins, so the result has as many rows as the
        training favourites; missing ranks on the right-hand sides are NaN.
    """
    test_user = test.where(col("userid") == user).cache()
    training_user = training.where(col("userid") == user)

    try:
        # 1) Favourites in the training set, ordered by play count
        training_favs = (training_user.join(artistData, on="artistid")
                                      .sort(desc("playcount"))
                                      .select(col("playcount").alias("training_playcount"),
                                              col("artistname").alias("training_artist"))
                        ).limit(count).toPandas()

        # 2) Favourites in the test set (cond = True), ordered by play count
        test_favs = (test_user.join(artistData, on="artistid")
                              .sort(desc("playcount"))
                              .select(col("playcount").alias("test_playcount"),
                                      col("artistname").alias("test_artist"))
                    ).limit(count).toPandas()

        # 3) Score the user's test-set artists with the model; NaN predictions are dropped
        predictions = (model.transform(test_user.select(["userid", "artistid"]))
                            .dropna())

        # Sort first, then limit: limiting before the sort would rank an
        # arbitrary subset instead of returning the highest predictions.
        recommendations = (predictions.join(artistData, on="artistid")
                                      .sort(desc("prediction"))
                                      .select(col("prediction"),
                                              col("artistname").alias("recommended_artist"))
                          ).limit(count).toPandas()
    finally:
        # Release the cached per-user slice even if one of the Spark jobs fails.
        test_user.unpersist()

    return training_favs.join(recommendations.join(test_favs))

#### User 1

In [None]:
# First example user: show the top 15 rows of the side-by-side comparison.
user = 2268277
compareFavs(user).head(15)

#### User 2

In [None]:
# Second example user: show the top 15 rows of the side-by-side comparison.
user = 2294531
compareFavs(user).head(15)

### By calculating the AUC for one example user

In [None]:
import numpy as np
import pandas as pd

# Driver-side numpy array holding every distinct artist id that occurs in the play-count data.
allArtists = np.array([row[0]
                       for row in userArtistData.select("artistid").distinct().collect()])

#### Sample a random set from all unique artists of same length as input, however without elements of input

In [None]:
def sample(positive, all):
    """Draw random negative examples: ids from ``all`` that are not in ``positive``.

    The previous implementation truncated the *sorted* output of
    ``np.setdiff1d``, which always kept the smallest ids of the draw (a biased
    sample), and it could return fewer ids than requested because it drew
    with replacement.

    Args:
        positive: array-like of ids to exclude.
        all:      array-like of all candidate ids (the name shadows the builtin
                  but is kept because it is part of the existing signature).

    Returns:
        numpy array of distinct ids, none of them in ``positive``. Its length is
        ``len(positive)``, or the number of available candidates if that is smaller.
    """
    candidates = np.setdiff1d(all, positive)
    wanted = min(np.size(positive), candidates.size)
    if wanted == 0:
        # Nothing requested or nothing left to choose from.
        return candidates[:0]
    # Uniform draw without replacement: no duplicates and no ordering bias.
    return np.random.choice(candidates, size=wanted, replace=False)

In [None]:
# Example user for the single-user AUC walkthrough (alternative id kept for quick switching).
# user = 2294531
user = 2268277

#### Let's take the artists listened to in the test set as "condition true" (what the user really listened to) ...

In [None]:
# Positive examples: the (userid, artistid) pairs of this user that landed in the test split.
testPositive = (test.where(col("userid") == user)
                    .select(["userid", "artistid"]))
                    
testPositive.show(3)

#### ... and randomly select a list of artists from all artists this user never listened to

In [None]:
# Collect only the artist ids. Flattening the full (userid, artistid) rows would
# interleave the user id into the list and double its length, so twice as many
# negatives as positives would be sampled.
testPositiveList = np.array(testPositive.select("artistid").rdd.flatMap(lambda x: x).collect())
testNegativeList = sample(testPositiveList, allArtists)

# Negative examples: the same user paired with the sampled artist ids.
testNegative = (spark.createDataFrame(pd.DataFrame({"artistid":testNegativeList,
                                                    "userid":[user]*len(testNegativeList)})))

testNegative.show(3)

#### Create the predictions for the positive and negative test cases

In [None]:
# Score both sets; pairs the model cannot score (NaN prediction) are dropped.
positivePredictions = model.transform(testPositive).dropna().withColumn("cond", lit(1))

negativePredictions = model.transform(testNegative).dropna().withColumn("cond", lit(0))

# union() matches columns by POSITION, and the two frames were built with a
# different column order. Project both sides onto the same columns first
# instead of relying on "prediction" and "cond" happening to line up.
predictions = (positivePredictions.select(["cond", "prediction"])
               .union(negativePredictions.select(["cond", "prediction"])))

##### a) Spark BinaryClassificationEvaluator

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Python UDF that converts a value to double. No longer used by spark_metric,
# but retained so that any other cell referencing `toDouble` keeps working.
toDouble = udf(lambda x: float(x), DoubleType())

def spark_metric(predictions, roc=True):
    """Compute a ranking metric with Spark's BinaryClassificationEvaluator.

    Args:
        predictions: DataFrame with a label column ``cond`` and a score column ``prediction``.
        roc:         ``True`` for the area under the ROC curve,
                     ``False`` for the area under the precision-recall curve.

    Returns:
        The metric value returned by the evaluator.
    """
    metricName="areaUnderROC" if roc else "areaUnderPR"
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="cond", metricName=metricName)
    # Native cast instead of a Python UDF: same widening to double, but the rows
    # are not serialized to a Python worker and back just for a type change.
    scored = predictions.select(["cond", col("prediction").cast(DoubleType()).alias("prediction")])
    return evaluator.evaluate(scored)
    

In [None]:
# Time both Spark-side metrics (ROC AUC and PR AUC) for the example user.
with timeit():
    auc1 = spark_metric(predictions)
    aucpr1 = spark_metric(predictions, roc=False)

print(auc1, aucpr1)

##### b) Spark + Scikit Learn Metrics

In [None]:
import sklearn.metrics as skm

In [None]:
def skm_metrics(predictions, roc=True):
    """Compute a ranking metric with scikit-learn on the driver.

    The Spark DataFrame is converted to pandas first, so it has to fit into
    driver memory.

    Args:
        predictions: Spark DataFrame with the columns ``cond`` (label) and ``prediction`` (score).
        roc:         truthy for ``roc_auc_score``, falsy for ``average_precision_score``.

    Returns:
        The score computed by the selected scikit-learn function.
    """
    frame = predictions.toPandas()
    labels, scores = frame["cond"], frame["prediction"]
    if not roc:
        return skm.average_precision_score(labels, scores)
    return skm.roc_auc_score(labels, scores)

In [None]:
# Same two metrics as above, computed with scikit-learn on the driver for comparison.
with timeit():
    auc2   = skm_metrics(predictions)
    aucpr2 = skm_metrics(predictions, False)

print(auc2, aucpr2)

In [None]:
def rocChart(positivePredictions, negativePredictions):
    """Plot the ROC curve of the combined predictions together with a diagonal baseline.

    Args:
        positivePredictions: Spark DataFrame with ``prediction`` and ``cond`` columns (positive cases).
        negativePredictions: Spark DataFrame with ``prediction`` and ``cond`` columns (negative cases).
    """
    columns = ("prediction", "cond")
    combined = positivePredictions.select(*columns).union(negativePredictions.select(*columns))
    scored = combined.toPandas()
    fpr, tpr, _thresholds = skm.roc_curve(scored["cond"], scored["prediction"])

    chart = nv.lineChart()

    # Both axes show rates in [0, 1]; the y domain leaves a little head room.
    config = {
        "width": 600,
        "height": 500,
        "color": nv.c20(3,5,7,8,1),
        "xDomain": [0,1],
        "yDomain": [0,1.05],
        "xAxis": {"axisLabel":"False Positive Rate", "tickFormat":",.2f"},
        "yAxis": {"axisLabel":"True Positive Rate", "tickFormat":",.2f"},
    }

    # The model's curve first, then the dotted diagonal as the baseline.
    chart.addLine({"FPR":fpr, "TPR":tpr}, "FPR", "TPR")
    chart.addLine({"X":[0,1], "Baseline":[0,1]}, "X", "Baseline", lineAttributes={"style":"dotted"})

    chart.plot(config=config)

rocChart(positivePredictions, negativePredictions)


### By a "mean AUC"

#### Make a broadcast variable out of allArtists

In [None]:
from pyspark.sql.types import ArrayType, IntegerType, DoubleType
from pyspark.sql.functions import collect_list, explode

In [None]:
# Broadcast the artist id array; sample2 reads it via bcAllArtists.value inside a UDF.
bcAllArtists = sc.broadcast(allArtists)

def sample2(positive):
    """Draw random negative artist ids for one user.

    Reads the artist id array from the broadcast variable ``bcAllArtists``.

    The previous implementation truncated the *sorted* output of
    ``np.setdiff1d``, which always kept the smallest ids of the draw (a biased
    sample) and gave up if the draw contained too few usable ids.

    Args:
        positive: list of artist ids to exclude.

    Returns:
        List of distinct artist ids, none of them in ``positive``. Its length is
        ``len(positive)``, or the number of remaining artists if that is smaller.
    """
    all_artists = bcAllArtists.value
    plen = len(positive)
    # Cheap path: oversample by a factor of two, then drop positives and duplicates.
    drawn = np.random.choice(all_artists, size=2*plen)
    candidates = np.setdiff1d(drawn, positive)
    if candidates.size < plen:
        # Too many collisions in the draw: fall back to the exact candidate set.
        candidates = np.setdiff1d(all_artists, positive)
    # setdiff1d returns sorted ids, so shuffle before truncating; otherwise the
    # smallest ids would always be the ones that are kept.
    return np.random.permutation(candidates)[:plen].tolist()

# Expose sample2 as a Spark UDF that returns an array of integer artist ids.
sampleUdf = udf(sample2, ArrayType(IntegerType()))

In [None]:
# Positive cases: every (userid, artistid) pair in the test split, labelled cond = 1.
positiveTest = test.select(["userid", "artistid"]).withColumn("cond", lit(1))
positiveTest.cache()

# Negative cases: per user, collect the test artists, sample negatives with the UDF
# and explode the returned array back into one (userid, artistid) row per artist.
# NOTE(review): only the user's TEST artists are excluded, so a sampled "negative"
# may still be an artist the user listened to in the training split — confirm this is intended.
negativeTest = (test.groupBy("userid")
                    .agg(collect_list("artistid").alias("positiveAtists"))
                    .withColumn("negativeArtists", sampleUdf('positiveAtists'))
                    .select(["userid", explode("negativeArtists").alias("artistid")])
)
negativeTest.cache()

In [None]:
# Score all positive and negative pairs; rows with a NaN prediction are dropped.
# NOTE: this rebinds positivePredictions / negativePredictions / predictions, which the
# single-user cells above also define — re-run those cells before reusing the single-user results.
positivePredictions = (model.transform(positiveTest).dropna().withColumn("cond", lit(1))).cache()
negativePredictions = (model.transform(negativeTest).dropna().withColumn("cond", lit(0))).cache()

# Both sides are projected onto the same columns before the (positional) union.
predictions = positivePredictions.select(["userid", "cond", "prediction"]).union(
                negativePredictions.select(["userid", "cond", "prediction"])
              )

In [None]:
def metrics(conds, predictions, roc=True):
    """Score one user's predictions with scikit-learn.

    Args:
        conds:       sequence of labels.
        predictions: sequence of prediction scores.
        roc:         truthy for ``roc_auc_score``, falsy for ``average_precision_score``.

    Returns:
        The score as a plain Python number, or ``None`` when either sequence has
        fewer than two entries or all labels are identical.
    """
    if len(conds) < 2 or len(predictions) < 2 or len(set(conds)) == 1:
        return None

    scorer = skm.roc_auc_score if roc else skm.average_precision_score
    # .item() turns the numpy scalar into a Python number for the DoubleType UDF.
    return scorer(conds, predictions).item()


def auc(conds, predictions):
    """Return ``metrics(conds, predictions, True)``, i.e. the ROC-AUC variant."""
    return metrics(conds, predictions, True)
    
def aucpr(conds, predictions):
    """Return ``metrics(conds, predictions, False)``, i.e. the average-precision variant."""
    return metrics(conds, predictions, False)

   
# Spark UDFs around the two per-user metrics; both are declared to return a double.
aucUdf = udf(auc, DoubleType())
aucprUdf = udf(aucpr, DoubleType())


### Mean AUC

In [None]:
# Gather each user's labels and scores into two lists and compute one AUC per user.
# Users for which the UDF returns None (see metrics) get a null "auc".
allAucs = (predictions.groupby("userid")
                      .agg(collect_list("cond").alias("conds"), 
                           collect_list("prediction").alias("predictions"))
                      .withColumn("auc", aucUdf("conds", "predictions"))
)

# Summary statistics of the per-user AUC values.
allAucs.describe("auc").show()


### Mean Average Precision

In [None]:
# Same per-user aggregation as for the AUC, scored with average precision instead.
# Users for which the UDF returns None (see metrics) get a null "aucpr".
allAucprs = (predictions.groupby("userid")
                        .agg(collect_list("cond").alias("conds"), 
                             collect_list("prediction").alias("predictions"))
                        .withColumn("aucpr", aucprUdf("conds", "predictions"))
)

# Summary statistics of the per-user average-precision values.
allAucprs.describe("aucpr").show()