In [1]:
%sql
-- Distribution of the number of play_sid_sl rows per user (uid),
-- reported at selected percentiles.
WITH plays_per_user AS (
  SELECT uid, count(*) AS cnt
  FROM play_sid_sl
  GROUP BY uid
)
SELECT
  percentile(cnt, 0.25)  AS cnt_25,
  percentile(cnt, 0.5)   AS cnt_50,
  percentile(cnt, 0.75)  AS cnt_75,
  percentile(cnt, 0.9)   AS cnt_90,
  percentile(cnt, 0.95)  AS cnt_95,
  percentile(cnt, 0.99)  AS cnt_99,
  percentile(cnt, 0.995) AS cnt_995,
  percentile(cnt, 0.999) AS cnt_999
FROM plays_per_user


In [2]:
# Flag "robot" accounts: every uid whose number of play_sid_sl rows exceeds a
# fixed cutoff.
# NOTE(review): 2489 is presumably read off the per-user play-count
# percentiles computed earlier in the notebook -- confirm which percentile.
robots=sqlContext.sql("""
select uid
from play_sid_sl
group by uid
having count(*)>2489
""")
# Overwrite an existing table so the cell survives a re-run; with the default
# save mode saveAsTable raises once "robots" already exists.
robots.write.mode("overwrite").saveAsTable("robots")

In [3]:
# Keep only the play-log rows whose uid is not listed in the robots table.
# The subquery drops NULL uids on purpose: `x NOT IN (subquery)` evaluates to
# NULL for every row as soon as the subquery returns a single NULL, which
# would silently filter out the whole table.
play_filtered=sqlContext.sql("""
select *
from play_sid_sl
where uid not in (
 select uid
 from robots
 where uid is not null
 )
""")
# Overwrite an existing table so the cell survives a re-run; with the default
# save mode saveAsTable raises once "play_filtered" already exists.
play_filtered.write.mode("overwrite").saveAsTable("play_filtered")
sqlContext.cacheTable("play_filtered")

In [4]:
# Keep only the download-log rows whose uid is not listed in the robots table.
# The subquery drops NULL uids on purpose: `x NOT IN (subquery)` evaluates to
# NULL for every row as soon as the subquery returns a single NULL, which
# would silently filter out the whole table.
down_filtered=sqlContext.sql("""
select *
from down_sid
where uid not in (
 select uid
 from robots
 where uid is not null
 )
""")
# Overwrite an existing table so the cell survives a re-run; with the default
# save mode saveAsTable raises once "down_filtered" already exists.
down_filtered.write.mode("overwrite").saveAsTable("down_filtered")
# Caching is left disabled for this table:
#sqlContext.cacheTable("down_filtered")

In [5]:
%sql
-- Size of the filtered play log: total rows, distinct users, distinct songs.
SELECT
  count(*)            AS num_rows,
  count(DISTINCT uid) AS num_users,
  count(DISTINCT sid) AS num_songs
FROM play_filtered

In [6]:
%sql
-- Size of the filtered download log: total rows, distinct users, distinct songs.
SELECT
  count(*)            AS num_rows,
  count(DISTINCT uid) AS num_users,
  count(DISTINCT sid) AS num_songs
FROM down_filtered

In [7]:
%sql
-- For each week-of-year value: the earliest and latest date present and the
-- number of play rows, ordered by week.
-- Observation recorded with this query: week 13-18 are whole weeks.
SELECT weekofyear(date), min(date), max(date), count(*)
FROM play_filtered
GROUP BY weekofyear(date)
ORDER BY weekofyear(date)

In [8]:
%sql
-- Percentiles of the played fraction of a song (play_time / song_length) over
-- play rows dated strictly after 2017-03-28 and strictly before 2017-05-08.
-- NOTE(review): both bounds are exclusive -- confirm the lower bound matches
-- the intended week window.
WITH played_fraction AS (
  SELECT play_time/song_length*1.0 AS frac
  FROM play_filtered
  WHERE date<"2017-05-08" AND date>"2017-03-28"
)
SELECT
  percentile(frac, 0.25) AS times_25,
  percentile(frac, 0.5)  AS times_50,
  percentile(frac, 0.75) AS times_75,
  percentile(frac, 0.9)  AS times_90,
  percentile(frac, 0.95) AS times_95
FROM played_fraction

In [9]:
%sql --cumulative play times
-- For every (uid, sid) pair, cnt is the number of play rows in which at least
-- 80% of the song was played (play_time/song_length >= 0.8). The outer query
-- reports percentiles of cnt across all pairs.
-- The 0.925 percentile is labelled cnt_925 (it was cnt_93) so that every alias
-- spells out its percentile the same way as cnt_975 / cnt_999.
select percentile(cnt,0.25) as cnt_25,
       percentile(cnt,0.5) as cnt_50,
       percentile(cnt,0.75) as cnt_75,
       percentile(cnt,0.90) as cnt_90,
       percentile(cnt,0.925) as cnt_925,
       percentile(cnt,0.95) as cnt_95,
       percentile(cnt,0.975) as cnt_975,
       percentile(cnt,0.99) as cnt_99,
       percentile(cnt,0.999) as cnt_999
from (
  select uid, sid, sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end) as cnt
  from play_filtered
  group by uid, sid
  )

In [10]:
%sql
-- Histogram of the 0-3 score assigned to each (uid, sid) pair.
-- cnt   = number of play rows with play_time/song_length >= 0.8
-- score = 0 if cnt < 2, 1 if cnt < 4, 2 if cnt < 7, otherwise 3
WITH full_plays AS (
  SELECT uid, sid,
         sum(CASE WHEN play_time/song_length*1.0>=0.8 THEN 1 ELSE 0 END) AS cnt
  FROM play_filtered
  GROUP BY uid, sid
),
scored AS (
  SELECT uid, sid,
         CASE WHEN cnt<2 THEN 0
              WHEN cnt<4 THEN 1
              WHEN cnt<7 THEN 2
              ELSE 3 END AS score
  FROM full_plays
)
SELECT score, count(*)
FROM scored
GROUP BY score
ORDER BY score

In [11]:
from pyspark.ml.evaluation import Evaluator

class eprEvaluator(Evaluator):
  """Expected percentile ranking (EPR) of a predictions DataFrame.

  Within each uid the rows are ranked by ``prediction`` (highest first) and the
  rank is scaled to a percentile: 0 for the top row, 1 for the last one. The
  metric is the rating-weighted mean of these percentiles,
  ``sum(prank * rating) / sum(rating)``, so lower values are better.

  The dataset must provide the columns uid, sid, rating and prediction. It is
  registered as the temp view "predictions" and queried through the
  notebook-level ``spark`` session.
  """

  def _evaluate(self, predictions):
    """Return the EPR as a float, or NaN when it is undefined.

    The metric is undefined (SQL NULL) when ``predictions`` has no rows or the
    ratings sum to zero.
    """
    predictions.createOrReplaceTempView("predictions")
    # A uid with a single row has count-1 = 0 as the divisor. nullif/coalesce
    # give that row percentile 0 explicitly instead of relying on how the
    # engine treats division by zero (NULL, or an error in ANSI mode).
    result = spark.sql("""
      select sum(prank*rating)/sum(rating) as expected_prank
      from (
         select uid, sid, rating,
                coalesce((rank() over (partition by uid order by prediction desc)-1)*1.0
                         /nullif(count(sid) over (partition by uid)-1, 0), 0.0) as prank
         from predictions
          )
    """).collect()[0][0]
    if result is None:
      # float(None) would raise TypeError; report "undefined" as NaN instead.
      return float("nan")
    return float(result)

  def isLargerBetter(self):
    """Lower EPR is better, so tuning helpers must minimise this metric."""
    return False
  

In [12]:
def pop_fit(ratings_mat):
    """Build the popularity baseline from a ratings DataFrame.

    Registers ``ratings_mat`` as the temp view "ratings_mat" and returns a
    DataFrame with one row per sid whose ``prediction`` column is the number
    of rows that sid has in ``ratings_mat``.
    """
    ratings_mat.createOrReplaceTempView("ratings_mat")
    popularity_query = """
      select sid, count(*) as prediction
      from ratings_mat
      group by sid
    """
    return spark.sql(popularity_query)
      
def pop_transform(model, test):
    """Attach the popularity prediction to every row of ``test``.

    Registers ``test`` and ``model`` as the temp views "test" and "model" and
    left-joins them on sid, so a test row whose sid is absent from ``model``
    is kept with a NULL ``prediction``.
    """
    test.createOrReplaceTempView("test")
    model.createOrReplaceTempView("model")
    join_query = """
      select t.*, m.prediction
      from test t left join
           model m
           on t.sid=m.sid
    """
    return spark.sql(join_query)

In [13]:
def bestepr(test):
    """Return the expected percentile ranking obtained by ranking on the true rating.

    Each uid's rows are ranked by ``rating`` itself (highest first), the rank
    is scaled to a percentile in [0, 1], and the rating-weighted mean
    ``sum(prank * rating) / sum(rating)`` is returned. This is the value a
    predictor that orders items exactly by their rating would score.

    ``test`` must provide the columns uid, sid and rating; it is registered as
    the temp view "test". Returns NaN when the metric is undefined (no rows,
    or ratings summing to zero).
    """
    test.createOrReplaceTempView("test")
    # A uid with a single row has count-1 = 0 as the divisor. nullif/coalesce
    # give that row percentile 0 explicitly instead of relying on how the
    # engine treats division by zero (NULL, or an error in ANSI mode).
    result = spark.sql("""
      select sum(prank*rating)/sum(rating) as expected_prank
      from (
         select uid, sid, rating,
                coalesce((rank() over (partition by uid order by rating desc)-1)*1.0
                         /nullif(count(sid) over (partition by uid)-1, 0), 0.0) as prank
         from test
          )
    """).collect()[0][0]
    if result is None:
        # float(None) would raise TypeError; report "undefined" as NaN instead.
        return float("nan")
    return float(result)

In [14]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [15]:
# Build the recommendation model on week13-17 data
# rating = number of plays of the song by the user with play_time/song_length
# >= 0.8, plus 1 if the user also downloaded the song in the same weeks.
# The download side is reduced to distinct (uid, sid) pairs: joining against
# the raw download rows would emit one copy of the rating per download row and
# so duplicate (uid, sid) entries in the training set.
rating_im_train=spark.sql("""
select p.uid, p.sid, (case when (d.uid is not null and d.sid is not null) then cnt+1 else cnt end) as rating
from (
  select uid, sid, sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end) as cnt
  from play_filtered
  where weekofyear(date)<18 and weekofyear(date)>12
  group by uid, sid
      )p left join
      (
      select distinct uid, sid
      from down_filtered
      where weekofyear(date)<18 and weekofyear(date)>12
      )d
      on p.uid=d.uid and p.sid=d.sid
""")

In [16]:
# Evaluate the model by computing the expected percentile ranking on week18 data
# rating = number of plays of the song by the user with play_time/song_length
# >= 0.8, plus 1 if the user also downloaded the song in week 18.
# The download side is reduced to distinct (uid, sid) pairs: joining against
# the raw download rows would emit one copy of the rating per download row and
# so count the same (uid, sid) pair several times in the evaluation.
rating_im_test=spark.sql("""
select p.uid, p.sid, (case when (d.uid is not null and d.sid is not null) then cnt+1 else cnt end) as rating
from (
  select uid, sid, sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end) as cnt
  from play_filtered
  where weekofyear(date)=18
  group by uid, sid
      )p left join
      (
      select distinct uid, sid
      from down_filtered
      where weekofyear(date)=18
      )d
      on p.uid=d.uid and p.sid=d.sid
""")

In [17]:
# Popularity baseline on the week-based split: fit on the training ratings,
# score the test ratings, then report the expected percentile ranking.
model_pop = pop_fit(ratings_mat=rating_im_train)
predictions_pop = pop_transform(model=model_pop, test=rating_im_test)

epr_evaluator = eprEvaluator()
epr_pop = epr_evaluator.evaluate(dataset=predictions_pop)
print("Expected percentile ranking for popularity recommendation = " + str(epr_pop))

In [18]:
# Expected percentile ranking obtained when the test items are ordered by
# their true rating.
best_epr = bestepr(test=rating_im_test)
print("The best percentile ranking possible =" + str(best_epr))

In [19]:
# Implicit-feedback ALS (alpha=30, 5 iterations, regParam=0.1) fitted on the
# training ratings and scored on the test ratings.
als_im = ALS(
    userCol="uid",
    itemCol="sid",
    ratingCol="rating",
    implicitPrefs=True,
    alpha=30,
    rank=50,
    maxIter=5,
    regParam=0.1,
    nonnegative=False,
    coldStartStrategy="drop",
)
model_im = als_im.fit(dataset=rating_im_train)
predictions_im = model_im.transform(dataset=rating_im_test)

epr_evaluator = eprEvaluator()
epr_im = epr_evaluator.evaluate(dataset=predictions_im)
print("Expected percentile ranking for implicit rating = " + str(epr_im))

In [20]:
# Implicit-feedback ALS with a different setting (alpha=200, 7 iterations,
# regParam=0.08); rebinds als_im, model_im, predictions_im and epr_im.
als_im = ALS(
    userCol="uid",
    itemCol="sid",
    ratingCol="rating",
    implicitPrefs=True,
    alpha=200,
    rank=50,
    maxIter=7,
    regParam=0.08,
    nonnegative=False,
    coldStartStrategy="drop",
)
model_im = als_im.fit(dataset=rating_im_train)
predictions_im = model_im.transform(dataset=rating_im_test)

epr_evaluator = eprEvaluator()
epr_im = epr_evaluator.evaluate(dataset=predictions_im)
print("Expected percentile ranking for implicit rating = " + str(epr_im))

In [21]:
# Top 10 song recommendations for every user, from the implicit model.
userRecs_im = model_im.recommendForAllUsers(numItems=10)
# Top 10 user recommendations for every song, from the implicit model.
songRecs_im = model_im.recommendForAllItems(numUsers=10)

In [22]:
# Implicit ratings over the whole filtered play log:
# rating = number of plays of the song by the user with play_time/song_length
# >= 0.8, plus 1 if the user downloaded the song.
# The download side is reduced to distinct (uid, sid) pairs: joining against
# the raw download rows would emit one copy of the rating per download row and
# so duplicate (uid, sid) entries before the random split.
rating_im=spark.sql("""
   select p.uid, p.sid, (case when (d.uid is not null and d.sid is not null) then cnt+1 else cnt end) as rating
   from (
         select uid, sid, sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end) as cnt
         from play_filtered
         group by uid, sid
         )p left join
         (
         select distinct uid, sid
         from down_filtered
         )d
         on p.uid=d.uid and p.sid=d.sid
      """)
# Random 80/20 train/test split with a fixed seed.
(rating_im_tr, rating_im_te) = rating_im.randomSplit([0.8, 0.2],seed=50)

In [23]:
# Popularity baseline on the random split: fit on the training part, score the
# test part, then report the expected percentile ranking.
model_pop = pop_fit(ratings_mat=rating_im_tr)
predictions_pop = pop_transform(model=model_pop, test=rating_im_te)

epr_evaluator = eprEvaluator()
epr_pop = epr_evaluator.evaluate(dataset=predictions_pop)
print("Expected percentile ranking for popularity recommendation = " + str(epr_pop))

# Value obtained when the test items are ordered by their true rating.
best_epr = bestepr(test=rating_im_te)
print("The best percentile ranking possible =" + str(best_epr))

In [24]:
# Implicit-feedback ALS (alpha=30, 5 iterations, regParam=0.1) on the random
# 80/20 split.
als_im= ALS(alpha=30, maxIter=5, rank=50, regParam=0.1, userCol="uid", itemCol="sid", ratingCol="rating", coldStartStrategy="drop",                         implicitPrefs=True, nonnegative=False)

model_im_r = als_im.fit(rating_im_tr)

# Score the held-out part with the model fitted just above. This previously
# called model_im.transform, i.e. a different model that was fitted on
# rating_im_train, so the reported number did not describe model_im_r at all.
predictions_im_r = model_im_r.transform(rating_im_te)
epr_evaluator = eprEvaluator()
epr_im_r = epr_evaluator.evaluate(predictions_im_r)
print("Expected percentile ranking for implicit rating - random split = " + str(epr_im_r))

In [25]:
# Implicit-feedback ALS (alpha=100, 7 iterations, regParam=0.1) on the random
# 80/20 split.
als_im= ALS(alpha=100, maxIter=7, rank=50, regParam=0.1, userCol="uid", itemCol="sid", ratingCol="rating", coldStartStrategy="drop",                         implicitPrefs=True, nonnegative=False)

model_im_r = als_im.fit(rating_im_tr)

# Score the held-out part with the model fitted just above. This previously
# called model_im.transform, i.e. a different model that was fitted on
# rating_im_train, so the reported number did not describe model_im_r at all.
predictions_im_r = model_im_r.transform(rating_im_te)
epr_evaluator = eprEvaluator()
epr_im_r = epr_evaluator.evaluate(predictions_im_r)
print("Expected percentile ranking for implicit rating - random split = " + str(epr_im_r))

In [26]:
# Explicit ratings on a 0-3 scale.
# score  = 0 / 1 / 2 / 3 when the number of plays with play_time/song_length
#          >= 0.8 is <2 / <4 / <7 / otherwise.
# rating = score + 1 if the user downloaded the song, capped at 3.
# The download side is reduced to distinct (uid, sid) pairs: joining against
# the raw download rows would emit one copy of the rating per download row and
# so duplicate (uid, sid) entries before the random split.
rating_ex=spark.sql("""
  select p.uid, p.sid, least(case when (d.uid is not null and d.sid is not null) then score+1 else score end, 3) as rating
  from (
    select uid, sid,
           case when sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end)<2 then 0
                when sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end)<4 then 1
                when sum(case when play_time/song_length*1.0>=0.8 then 1 else 0 end)<7 then 2
                else 3 end as score
    from play_filtered
    group by uid, sid
        )p left join
        (
        select distinct uid, sid
        from down_filtered
        )d
        on p.uid=d.uid and p.sid=d.sid
        """)

# Randomly split the dataset to train:test as 0.8:0.2; random seed=20
(rating_ex_train, rating_ex_test) = rating_ex.randomSplit([0.8, 0.2],seed=20)

In [27]:
# Popularity baseline on the explicit-rating split: fit on the training part,
# score the test part, then report the expected percentile ranking.
model_pop = pop_fit(ratings_mat=rating_ex_train)
predictions_pop = pop_transform(model=model_pop, test=rating_ex_test)

epr_evaluator = eprEvaluator()
epr_pop = epr_evaluator.evaluate(dataset=predictions_pop)
print("Expected percentile ranking for popularity recommendation = " + str(epr_pop))

# Value obtained when the test items are ordered by their true rating.
best_epr = bestepr(test=rating_ex_test)
print("The best percentile ranking possible =" + str(best_epr))

In [28]:
# Explicit-feedback ALS (rank=50, 7 iterations, regParam=0.06), evaluated on
# the test split with both RMSE and expected percentile ranking.
als_ex = ALS(
    userCol="uid",
    itemCol="sid",
    ratingCol="rating",
    implicitPrefs=False,
    rank=50,
    maxIter=7,
    regParam=0.06,
    nonnegative=False,
    coldStartStrategy="drop",
)
model_ex = als_ex.fit(dataset=rating_ex_train)
predictions_ex = model_ex.transform(dataset=rating_ex_test)

rmse_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse_ex = rmse_evaluator.evaluate(dataset=predictions_ex)

epr_evaluator = eprEvaluator()
epr_ex = epr_evaluator.evaluate(dataset=predictions_ex)

print("RMSE for explicit rating = " + str(rmse_ex))
print("Expected percentile ranking for explicit rating = " + str(epr_ex))

In [29]:
# Explicit-feedback ALS with regParam=0.1 (rank=50, 7 iterations), evaluated on
# the test split with both RMSE and expected percentile ranking.
als_ex = ALS(
    userCol="uid",
    itemCol="sid",
    ratingCol="rating",
    implicitPrefs=False,
    rank=50,
    maxIter=7,
    regParam=0.1,
    nonnegative=False,
    coldStartStrategy="drop",
)
model = als_ex.fit(dataset=rating_ex_train)
predictions = model.transform(dataset=rating_ex_test)

rmse_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = rmse_evaluator.evaluate(dataset=predictions)

epr_evaluator = eprEvaluator()
epr = epr_evaluator.evaluate(dataset=predictions)

print("RMSE for explicit rating = " + str(rmse))
print("Expected percentile ranking = " + str(epr))

In [30]:
# Grid search with TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Tune regParam for explicit-feedback ALS with one 80/20 train/validation
# split scored by RMSE. rank and maxIter are fixed on the estimator and are not
# part of the grid.
als_ex= ALS(userCol="uid", itemCol="sid", ratingCol="rating", implicitPrefs=False, coldStartStrategy="drop", maxIter=1, rank=5)
grid=ParamGridBuilder().addGrid(als_ex.regParam, [0.03,0.06]).build()
#.addGrid(als_ex.regParam, [0.03,0.06,0.09])
rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

tvs = TrainValidationSplit(estimator=als_ex, estimatorParamMaps=grid, evaluator=rmse_evaluator,trainRatio=0.8,seed=100)
tvsModel = tvs.fit(rating_ex_train)
predictions = tvsModel.transform(rating_ex_test)

best_model= tvsModel.bestModel
print ("The rank for best model is " + str(best_model.rank))
print ("The Max number of iteration for best model is " + str(best_model._java_obj.parent().getMaxIter()))
# regParam is the only hyper-parameter the grid varies, so it is the value the
# search actually selects; report it alongside the fixed settings above.
print ("The regParam for best model is " + str(best_model._java_obj.parent().getRegParam()))

In [31]:
# Top 10 song recommendations for every user, from the explicit model.
userRecs_ex = model_ex.recommendForAllUsers(numItems=10)
# Top 10 user recommendations for every song, from the explicit model.
songRecs_ex = model_ex.recommendForAllItems(numUsers=10)