In [0]:
%spark.pyspark
from pyspark.sql import SQLContext
# Configure the s3a connector (access key, secret key, endpoint) on the
# Hadoop configuration attached to the running SparkContext.
# NOTE(review): ACCESS_KEY and SECRET_KEY are not defined in the visible
# cells -- confirm they are injected before this cell runs.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")


In [1]:
%spark.pyspark
# Read Triplets
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
def _parse_triplet(line):
    """Split one tab-separated line into (user_id, song_id, playcount as int)."""
    fields = line.split("\t")
    return (fields[0], fields[1], int(fields[2]))


# Explicit schema: two string ids plus an integer play count, all nullable.
schemaT = StructType([
    StructField("user_id", StringType(), True),
    StructField("song_id", StringType(), True),
    StructField("playcount", IntegerType(), True),
])

# Parse the local triplets file line by line, then attach the schema.
rddTriplets = sc.textFile("file:///home/hadoop/train_triplets.txt").map(_parse_triplet)
sqlContext = SQLContext(sc)
triplets = sqlContext.createDataFrame(rddTriplets, schema=schemaT)

In [2]:
%spark.pyspark
from pyspark.sql import functions as F
# Read the mismatch list as raw text and slice the two ids out of fixed
# character positions on each line: 18 characters starting at position 9
# for song_id, 18 characters starting at position 28 for track_id.
raw_line = F.col('value')
mismatches = spark.read.text('file:///home/hadoop/sid_mismatches.txt').select(
    F.trim(raw_line.substr(9, 18)).cast(StringType()).alias('song_id'),
    F.trim(raw_line.substr(28, 18)).cast(StringType()).alias('track_id'),
)

In [3]:
%spark.pyspark
# Drop every triplet whose song_id appears in the mismatch list
# (left anti join keeps only rows with no match on song_id).
mismatched_song_ids = mismatches.select('song_id')
triplets = triplets.join(mismatched_song_ids, on='song_id', how='left_anti')

In [4]:
%spark.pyspark
# Number of distinct user_id values in the triplets.
triplets.select('user_id').distinct().count()

In [5]:
%spark.pyspark
# Number of distinct song_id values in the triplets.
triplets.select('song_id').distinct().count()

In [6]:
%spark.pyspark
# Most active user: the user with the highest total playcount, together with
# the list of song_ids appearing in that user's triplets.
active = (
    triplets
    .groupBy('user_id')
    .agg(
        F.sum('playcount').alias('playcount'),
        F.collect_list('song_id').alias('songs'),
    )
    .orderBy(F.col('playcount').desc())
    # first() returns the top Row directly (None when there are no rows),
    # replacing the limit(1) -> rdd -> take(1)[0] detour.
    .first()
)
# Index the Row by field name rather than calling __getattr__ by hand.
# Displays (user_id, number of songs collected for that user).
active['user_id'], len(active['songs'])

In [7]:
%spark.pyspark
# Song popularity: total playcount per song_id.
song_popularity = triplets.groupBy('song_id').agg(F.sum('playcount').alias('playcount'))

# Minimum, quartiles and maximum of the per-song totals; the relative error
# of 0.0 asks approxQuantile for exact quantiles.
song_quantile_probs = [0.0, 0.25, 0.5, 0.75, 1.0]
n = song_popularity.approxQuantile('playcount', song_quantile_probs, 0.0)
# Recorded result: [1.0, 8.0, 31.0, 130.0, 726885.0]


In [8]:
%spark.pyspark
# User activity: number of song_id entries per user_id.
user_activity = triplets.groupBy('user_id').agg(F.count('song_id').alias('songcount'))

# Minimum, quartiles and maximum of the per-user counts; the relative error
# of 0.0 asks approxQuantile for exact quantiles.
user_quantile_probs = [0.0, 0.25, 0.5, 0.75, 1.0]
m = user_activity.approxQuantile('songcount', user_quantile_probs, 0.0)
# Recorded result: [3.0, 15.0, 26.0, 53.0, 4316.0]

In [9]:
%spark.pyspark
# Index 1 of n and m is the 0.25 quantile requested from approxQuantile.
song_cutoff = n[1]
user_cutoff = m[1]

# Inactive songs: total playcount strictly below the song cutoff.
inactive_song = song_popularity.where(F.col('playcount') < song_cutoff).select('song_id')

# Inactive users: song count strictly below the user cutoff.
inactive_user = user_activity.where(F.col('songcount') < user_cutoff).select('user_id')

# Anti-join both lists away from the triplets, songs first and then users.
without_inactive_songs = triplets.join(inactive_song, on='song_id', how='left_anti')
triplets = without_inactive_songs.join(inactive_user, on='user_id', how='left_anti')

In [10]:
%spark.pyspark
# Row count of the current `triplets` DataFrame (when the cells run in order,
# this is the count after inactive songs and users were filtered out).
triplets.count()


In [11]:
%spark.pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, QuantileDiscretizer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics

# ALS recommender preparation: map the string ids to numeric index columns.
# user_id -> 'user'
indexer_user = StringIndexer(
    inputCol='user_id',
    outputCol='user',
)
# song_id -> 'item'
indexer_song = StringIndexer(
    inputCol='song_id',
    outputCol='item',
)

In [12]:
%spark.pyspark
# Fit both indexers on the triplets in one pipeline, then apply the fitted
# pipeline to the same triplets to add the 'user' and 'item' columns.
index_stages = [indexer_user, indexer_song]
pipeline = Pipeline(stages=index_stages)
pipelineModel = pipeline.fit(triplets)
dataset = pipelineModel.transform(triplets)

In [13]:
%spark.pyspark
# Shape the indexed data for ALS: integer user/item ids and the playcount
# converted to a double-valued rating.
# The previous cast of playcount to IntegerType was a no-op (the triplets
# schema already declares playcount as an integer) and contradicted the
# stated intent of producing a double rating; 'double' is given by name so
# no extra type import is required.
dataset = (
    dataset
    .withColumn('user', F.col('user').cast(IntegerType()))
    .withColumn('item', F.col('item').cast(IntegerType()))
    .withColumn('rating', F.col('playcount').cast('double'))
    .select(['user', 'item', 'rating'])
)

In [14]:
%spark.pyspark
# Build a {user: 0.3} map so sampleBy draws about 30% of each user's rows.
# The distinct users are collected straight to the driver; this replaces the
# dummy-column groupBy + collect_list + take(1)[0] construction, which forced
# every user into a single aggregated row and raised IndexError when the
# dataset was empty.
f = [row['user'] for row in dataset.select('user').distinct().collect()]

fractions = {user: 0.3 for user in f}

# Sample the test set per user with a fixed seed.
test = dataset.sampleBy('user', fractions, seed=1)

# Training set: every (user, item) row of the dataset that was not sampled
# into the test set, obtained with a left anti join on both keys.
held_out_pairs = test.select(['user', 'item'])
training = dataset.join(held_out_pairs, on=['user', 'item'], how='left_anti')

In [15]:
%spark.pyspark
# Show the ten test rows with the highest rating, without truncating columns.
# NOTE(review): presumably used to pick the three users inspected later -- confirm.
test.orderBy(F.col('rating').desc()).show(10, truncate=False)

In [16]:
%spark.pyspark
# ALS estimator with a fixed seed; the column names are written out even
# though they equal the ALS defaults, to make the dependency on the
# 'user' / 'item' / 'rating' columns visible.
# NOTE(review): ratings are derived from play counts; consider whether
# implicitPrefs=True is the intended setting -- confirm.
als = ALS(userCol='user', itemCol='item', ratingCol='rating', seed=1)

# Fit the model on the training set only.
model = als.fit(training)

In [17]:
%spark.pyspark
from pyspark.sql.types import ArrayType
# Indexed ids of the users to inspect. This list is the single source of
# truth: the DataFrame below is derived from it, so the ids can no longer
# drift apart between `u` and `users`.
u = [525941, 519321, 208949]
users = sqlContext.createDataFrame(
    [(user_id,) for user_id in u],
    StructType([StructField('user', IntegerType())])
)

# Top-10 item recommendations for just those users.
recommends = model.recommendForUserSubset(users, 10)

# get prediction and labels
def recommend(recommendations):
    """Return the item of every (item, rating) pair, in the given order.

    recommendations: iterable of two-element (item, rating) pairs.
    Returns a list holding only the first element of each pair.
    """
    return [item for item, _rating in recommendations]

# Wrap recommend() as a UDF that returns an array of integer item ids.
udf_recommend = F.udf(recommend, ArrayType(IntegerType()))

# Reduce each user's (item, rating) recommendations to the item ids and keep
# only the integer user id alongside them.
recommended_items = udf_recommend(F.col('recommendations'))
recommends = recommends.withColumn('recommends', recommended_items).select(
    F.col('user').cast(IntegerType()),
    F.col('recommends'),
)

recommends.show(3, truncate=False)

In [18]:
%spark.pyspark
# Ground-truth items per selected user, ordered by rating (highest first).
# An orderBy placed before groupBy does not fix the order of collect_list,
# because the aggregation shuffles rows. Instead, (rating, item) structs are
# collected and sorted inside each group, and only the item field is kept.
# Ties on rating are broken by item, also descending.
ground_truths = (
    test
    .filter(F.col('user').isin(u))
    .groupBy('user')
    .agg(
        F.sort_array(
            F.collect_list(F.struct('rating', 'item')),
            asc=False,
        ).alias('ranked')
    )
    # Selecting a struct field through an array column yields an array of
    # that field, here the item ids in ranked order.
    .select('user', F.col('ranked.item').alias('ground_truths'))
)

ground_truths.show(3, False)

In [19]:
%spark.pyspark
# Pair each user's recommended items with that user's held-out items as an
# RDD of (predictions, labels) tuples, the input expected by RankingMetrics.
compare_df = recommends.join(ground_truths, on='user', how='left')
# The left join leaves ground_truths null for a user with no test rows;
# substitute an empty list so the pair stays well-formed. Mapping the
# DataFrame's RDD directly also avoids collecting to the driver and
# re-parallelizing.
compare = compare_df.rdd.map(
    lambda row: (row['recommends'], row['ground_truths'] or [])
)

In [20]:
%spark.pyspark
# Ranking metrics over the (recommendations, ground truths) pairs.
metrics = RankingMetrics(compare)

# Precision within the top 10 recommendations.
precision_at_10 = metrics.precisionAt(10)
print(precision_at_10)

# NDCG within the top 10 recommendations.
ndcg_at_10 = metrics.ndcgAt(10)
print(ndcg_at_10)

# Mean average precision.
mean_avg_precision = metrics.meanAveragePrecision
print(mean_avg_precision)


# Predict ratings for the test set and report RMSE against the true ratings.
predict = model.transform(test)
# Drop rows without a usable prediction. dropna removes both null and NaN
# values in the listed column, which states the intent directly instead of
# comparing against float('nan') (a comparison that only filters anything
# because Spark SQL treats NaN as equal to NaN).
predict = predict.dropna(subset=['prediction'])
reg_eval = RegressionEvaluator(predictionCol='prediction', labelCol='rating', metricName='rmse')
reg_eval.evaluate(predict)
