In [1]:
# Delete whatever currently sits at /FileStore/tables/play.
# NOTE(review): no recurse flag is passed, and later cells list and read this same path —
# confirm this cleanup is meant to run only before a fresh upload, not on every full run.
dbutils.fs.rm("/FileStore/tables/play")

In [2]:
%fs ls /FileStore/tables/play

In [3]:
from pyspark.sql.types import *
# Every column is declared as a nullable string; the column order below is the schema order.
_PLAY_COLUMNS = ["uid", "song_id", "song_name", "singer", "play_time", "song_length", "date"]
schema = StructType([StructField(column, StringType(), True) for column in _PLAY_COLUMNS])

In [4]:
# Load every entry under the upload directory into one DataFrame and expose it as view "play".
PLAY_DIR = "/FileStore/tables/play"

# Read the path via attribute access: unpacking each FileInfo into exactly three names
# fails on runtimes whose FileInfo carries an extra field (e.g. a modification time).
play_paths = [info.path for info in dbutils.fs.ls(PLAY_DIR)]

if play_paths:
  # DataFrameReader.csv accepts a list of paths, so a single read replaces the
  # per-file union chain (and the deprecated unionAll call).
  play = spark.read.csv(path=play_paths, schema=schema, header=True)
else:
  # Empty directory: keep an empty frame with the expected columns so later cells still run.
  play = spark.createDataFrame([], schema=schema)

play.createOrReplaceTempView("play")

In [5]:
%sql
-- Peek at the first rows of the raw "play" view
SELECT *
FROM play
LIMIT 10

In [6]:
%sql
-- Preview of the cleaning query: trim and cast the raw string columns, then keep only the
-- rows whose required fields parsed successfully.
-- The null checks run on the CAST results: checking the raw strings would let blank or
-- non-numeric values through, because they only become NULL after the cast.
with parsed as (
  select
    cast(trim(uid) as bigint) as uid,
    cast(trim(song_id) as bigint) as song_id,
    trim(song_name) as song_name,
    trim(singer) as singer,
    cast(trim(play_time) as bigint) as play_time,
    cast(trim(song_length) as bigint) as song_length,
    TO_DATE(cast(UNIX_TIMESTAMP(date, 'yyyy-MM-dd') as TIMESTAMP)) as date
  from play
)
select
  uid,
  song_id,
  song_name,
  singer,
  play_time,
  song_length,
  date
from parsed
where
  uid is not null and
  song_id is not null and
  play_time is not null and
  song_length is not null and
  date is not null
limit 10;

In [7]:
# Typed, cleaned copy of the raw play log.
# Columns are trimmed and cast first; the null checks then run on the CAST results, so
# values that fail to parse (blank, non-numeric, badly formatted date) are dropped instead
# of surviving as NULLs in the "filtered" data.
play_filtered = spark.sql("""
  with parsed as (
    select
      cast(trim(uid) as bigint) as uid,
      cast(trim(song_id) as bigint) as song_id,
      trim(song_name) as song_name,
      trim(singer) as singer,
      cast(trim(play_time) as bigint) as play_time,
      cast(trim(song_length) as bigint) as song_length,
      TO_DATE(cast(UNIX_TIMESTAMP(date, 'yyyy-MM-dd') as TIMESTAMP)) as date
    from play
  )
  select
    uid,
    song_id,
    song_name,
    singer,
    play_time,
    song_length,
    date
  from parsed
  where
    uid is not null and
    song_id is not null and
    play_time is not null and
    song_length is not null and
    date is not null
""")
# play_filtered.write.saveAsTable("play_filtered")
play_filtered.createOrReplaceTempView("play_filtered")

In [8]:
%sql
-- Total number of rows in play_filtered
SELECT count(*)
FROM play_filtered

In [9]:
%sql
-- Count the play records whose song_length is 0
SELECT count(*)
FROM play_filtered
WHERE song_length = 0;

In [10]:
%sql
-- Number of distinct song ids
SELECT count(DISTINCT song_id)
FROM play_filtered

In [11]:
%sql
-- How many play records are affected by an inconsistent song_id,
-- i.e. a song_id that maps to more than one distinct song_name?
WITH ambiguous_ids AS (
  SELECT
    song_id,
    count(DISTINCT song_name) AS name_variants
  FROM play_filtered
  GROUP BY song_id
  HAVING name_variants > 1
)
SELECT count(*)
FROM play_filtered AS plays
INNER JOIN ambiguous_ids AS amb
  ON plays.song_id = amb.song_id

In [12]:
%sql
-- Number of distinct users
SELECT count(DISTINCT uid)
FROM play_filtered

In [13]:
%sql
-- Percentiles of song_length over all play records
SELECT
  percentile(song_length, 0.999) AS song_length_p999,
  percentile(song_length, 0.99)  AS song_length_p99,
  percentile(song_length, 0.95)  AS song_length_p95,
  percentile(song_length, 0.90)  AS song_length_p90,
  percentile(song_length, 0.50)  AS song_length_p50
FROM play_filtered

In [14]:
%sql
-- Does anyone listen to songs for more than 24 hours within a single date?
WITH daily_listening AS (
  SELECT
    uid,
    date,
    sum(play_time) AS total_play_time
  FROM play_filtered
  GROUP BY uid, date
  HAVING sum(play_time) > 24 * 60 * 60
)
SELECT count(DISTINCT uid) AS invalid_uid
FROM daily_listening

In [15]:
%sql
-- Distribution of the number of play records per (uid, date)
WITH daily_counts AS (
  SELECT
    uid,
    date,
    count(*) AS cnt
  FROM play_filtered
  GROUP BY uid, date
)
SELECT
  percentile(cnt, 0.999) AS cnt_p999,
  percentile(cnt, 0.99)  AS cnt_p99,
  percentile(cnt, 0.95)  AS cnt_p95,
  percentile(cnt, 0.90)  AS cnt_p90,
  percentile(cnt, 0.50)  AS cnt_p50
FROM daily_counts

In [16]:
%sql
-- For each uid on each date, how many times does the same song_id appear?
WITH song_day_counts AS (
  SELECT
    uid,
    song_id,
    date,
    count(*) AS cnt
  FROM play_filtered
  GROUP BY uid, song_id, date
)
SELECT
  percentile(cnt, 0.999) AS cnt_p999,
  percentile(cnt, 0.99)  AS cnt_p99,
  percentile(cnt, 0.95)  AS cnt_p95,
  percentile(cnt, 0.90)  AS cnt_p90,
  percentile(cnt, 0.50)  AS cnt_p50
FROM song_day_counts

In [17]:
%sql
-- Song ids that appear with more than one distinct song_length, most variants first
SELECT
  song_id,
  count(DISTINCT song_length) AS cnt
FROM play_filtered
GROUP BY song_id
HAVING cnt > 1
ORDER BY cnt DESC

In [18]:
%sql
-- Number of song ids that have more than one distinct song_length
WITH multi_length AS (
  SELECT song_id
  FROM play_filtered
  GROUP BY song_id
  HAVING count(DISTINCT song_length) > 1
)
SELECT count(song_id)
FROM multi_length

In [19]:
# Assign one surrogate id (sid) to every distinct (song_name, singer) pair, numbered in
# (song_name, singer) order, and publish the mapping as temp view "new_song_id".
new_song_id_query = """
select
  song_name,
  singer,
  row_number() over (order by song_name, singer) as sid
from
  play_filtered
group by song_name, singer
"""
new_song_id = sqlContext.sql(new_song_id_query)
new_song_id.createOrReplaceTempView("new_song_id")

In [20]:
# Attach the surrogate sid to every play record by matching on (song_name, singer).
# The join uses plain equality, so rows whose song_name or singer is NULL find no match
# and are not carried over into play_reindexed.
play_reindexed_query = """
select
  uid,
  sid,
  n.song_name,
  n.singer,
  play_time,
  song_length,
  date
from play_filtered p
join new_song_id n
on p.song_name=n.song_name
and p.singer=n.singer
"""
play_reindexed = sqlContext.sql(play_reindexed_query)
play_reindexed.createOrReplaceTempView("play_reindexed")

In [21]:
# Alternative kept for reference (disabled): repartition by date and persist as a table.
# play_reindexed = play_reindexed.repartition("date")
# play_reindexed.write.saveAsTable("play_reindexed")
# Request caching both through the DataFrame handle and through the temp view name,
# since later cells read the data via SQL as well as via the Python variable.
play_reindexed.cache()
sqlContext.cacheTable("play_reindexed")

In [22]:
%sql
-- After re-indexing: songs (sid) that still carry several distinct song_length values
SELECT
  sid,
  song_name,
  singer,
  count(DISTINCT song_length) AS count
FROM play_reindexed
GROUP BY sid, song_name, singer
HAVING count > 1
ORDER BY count DESC

In [23]:
%sql
-- Preview: give every sid its most frequently recorded song_length and drop songs whose
-- chosen length is not positive.
-- row_number() with a tie-break picks exactly ONE length per sid. rank() assigns rank 1 to
-- every tied length, which makes the join below emit each play record once per tied length.
with song_length_rank as
(
  select
    sid,
    song_length,
    row_number() over (
      partition by sid
      order by count(*) desc, song_length desc  -- on a tie prefer the larger (non-zero) length
    ) as rn
  from
    play_reindexed
  group by sid, song_length
),
sid_song_length as
(
  select
    sid,
    song_length
  from
    song_length_rank
  where rn = 1
)
select
  uid,
  p.sid,
  song_name,
  singer,
  play_time,
  l.song_length,
  date
from play_reindexed p
join sid_song_length l on p.sid=l.sid
where l.song_length > 0

In [24]:
# For each sid that has multiple song_length values, reassign the song length to the one
# with the highest record count, and drop records whose chosen song length is not positive.
#
# row_number() with a tie-break selects exactly one length per sid. rank() gives rank 1 to
# every tied length, which makes the join emit each play record once per tied length and
# inflates the play counts built from this frame.
#
# Only the Python variable is rebound here: the temp view "play_reindexed" is not
# re-registered, so SQL that reads the view still sees the rows from before this step.
play_reindexed = spark.sql(
"""
with song_length_rank as
(
  select
    sid,
    song_length,
    row_number() over (
      partition by sid
      order by count(*) desc, song_length desc
    ) as rn
  from
    play_reindexed
  group by sid, song_length
),
sid_song_length as
(
  select
    sid,
    song_length
  from
    song_length_rank
  where rn = 1
)
select
  uid,
  p.sid,
  song_name,
  singer,
  play_time,
  l.song_length,
  date
from play_reindexed p
join sid_song_length l on p.sid=l.sid
where l.song_length > 0
"""
)

In [25]:
%sql
-- In this data set, how many times do most people play the same song?
-- (distribution of the record count per (uid, sid) pair)
select
  percentile(count, 0.95) as P95,  -- was mislabelled P99, which duplicated the next column name
  percentile(count, 0.99) as P99,
  percentile(count, 0.999) as P999
from (
  select
    count(*) as count
  from
    play_reindexed
  group by
    uid, sid
) p

In [26]:
%sql
-- For each user, rank the songs by how many play records that user has for them.
-- sid must be part of the GROUP BY because it is selected un-aggregated, and the window is
-- partitioned by uid only: partitioning by (uid, sid) would leave one row per partition
-- and make every rank equal to 1.
select
  uid,
  sid,
  rank() over (partition by uid order by count(*) desc) as rank
from
  play_reindexed
group by uid, sid

In [27]:
# NOTE(review): this cell was labelled as dropping bot users based on play count, but the
# query contains no per-user play-count filter. It performs the song-length normalisation:
# each sid gets its most frequently recorded song_length, and records whose chosen length
# is not positive are dropped. TODO: add the intended bot filter or remove this cell.
#
# row_number() with a tie-break selects exactly one length per sid. rank() gives rank 1 to
# every tied length, which makes the join emit each play record once per tied length.
play_reindexed = spark.sql(
"""
with song_length_rank as
(
  select
    sid,
    song_length,
    row_number() over (
      partition by sid
      order by count(*) desc, song_length desc
    ) as rn
  from
    play_reindexed
  group by sid, song_length
),
sid_song_length as
(
  select
    sid,
    song_length
  from
    song_length_rank
  where rn = 1
)
select
  uid,
  p.sid,
  song_name,
  singer,
  play_time,
  l.song_length,
  date
from play_reindexed p
join sid_song_length l on p.sid=l.sid
where l.song_length > 0
"""
)

In [28]:
def createUtility(df, play_threashold):
  """
  Make the utility matrix from a play log.

  A record counts as a play only when play_time / song_length is strictly greater than
  play_threashold; qualifying records are counted per (uid, sid, date).

  :param df: DataFrame providing uid, sid, date, play_time and song_length columns;
             it is registered as temp view "raw" as a side effect
  :param play_threashold: exclusive lower bound on the played fraction of a song
  :return: DataFrame with columns uid, sid, date, count
  """
  # float() rejects non-numeric input before the value is interpolated into the SQL text.
  threshold = float(play_threashold)
  df.createOrReplaceTempView("raw")
  utility = spark.sql("""
    select
      uid,
      sid,
      date,
      count(*) as count
    from raw
    where play_time * 1.0 / song_length > {threshold} and play_time is not null
    group by uid, sid, date
  """.format(threshold=threshold))

  return utility

util = createUtility(play_reindexed, 0.6)
util.createOrReplaceTempView("util")

In [29]:
# Print a single row of the utility table as a sanity check.
util.show(n=1)

In [30]:
%sql
-- Number of utility rows per date, in chronological order
SELECT
  date,
  count(*)
FROM util
GROUP BY date
ORDER BY date

In [31]:
# Hold-out split: utility rows dated 2017-05-06 or later.
test_query = """
select
  *
from util
where date >= "2017-05-06"
"""
test = sqlContext.sql(test_query)

In [32]:
# Training split: utility rows from 2017-04-06 (inclusive) up to 2017-05-06 (exclusive).
training_query = """
select
  *
from util
where date >= "2017-04-06" and
  date < "2017-05-06"
"""
training = sqlContext.sql(training_query)

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# Implicit-feedback ALS over the (uid, sid, count) columns of the utility table.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="uid",
    itemCol="sid",
    ratingCol="count",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=True,
)

In [34]:
from pyspark.ml.evaluation import Evaluator

class TestEvaluator(Evaluator):
  """
  Count-weighted mean percentile rank of the observed (uid, sid) rows.

  Within each user the rows are ordered by descending prediction and given a percentile
  rank in [0, 1] (0 = highest prediction). The metric is sum(count * rank) / sum(count),
  so a LOWER value means frequently played songs are predicted nearer the top.
  NOTE(review): this matches the "expected percentile ranking" used for implicit-feedback
  recommenders — confirm that is the intended metric.
  """

  def _evaluate(self, dataset):
    """
    Compute the metric for a DataFrame with uid, sid, count and prediction columns.

    :param dataset: predictions to score; registered as temp view "dataset" as a side effect
    :return: the metric as a float
    :raises ValueError: if the metric is undefined (no rows, or the counts sum to zero)
    """
    dataset.createOrReplaceTempView("dataset")
    result = spark.sql(
    """
    with r as (
      select
        uid,
        sid,
        count,
        -- percent_rank() = (rank - 1) / (rows in partition - 1), and 0 for a single-row
        -- partition, so users with one row no longer cause a division by zero
        percent_rank() over (partition by uid order by prediction desc) as score
      from dataset
    )

    select
      -- weight each row by its play count so the numerator matches the sum(count) denominator
      sum(count * score) / sum(count)
    from r
    """
    ).collect()[0][0]

    if result is None:
      # Aggregating zero rows (or dividing by a zero total) yields NULL; fail with a clear
      # message instead of the opaque TypeError that float(None) would raise.
      raise ValueError("TestEvaluator: metric is undefined for an empty prediction set")

    return float(result)

  def isLargerBetter(self):
    """Lower percentile rank is better, so tuners must minimise this metric."""
    return False

evaluator = TestEvaluator()

In [35]:
from pyspark.ml.tuning import TrainValidationSplit, TrainValidationSplitModel
import numpy as np
class CustomTrainValidationSplit(TrainValidationSplit):
    """
    Train/validation tuner that uses caller-supplied training and validation DataFrames
    instead of splitting the dataset passed to fit().
    """
    def __init__(self, validation, training, estimator=None, estimatorParamMaps=None, evaluator=None):
        """
        Store the tuning inputs as plain instance attributes.

        :param validation: DataFrame each candidate model is evaluated on
        :param training: DataFrame each candidate model is fitted on
        :param estimator: estimator to tune
        :param estimatorParamMaps: list of param maps to try
        :param evaluator: evaluator producing one metric per candidate
        """
        super(CustomTrainValidationSplit, self).__init__()
        self.estimator = estimator
        self.evaluator = evaluator
        self.estimatorParamMaps = estimatorParamMaps
        self.validation = validation
        self.training = training

    def _fit(self, dataset):
        """
        Fit one model per param map on self.training, score each on self.validation,
        then refit the best param map on `dataset`.

        :param dataset: DataFrame used only for the final refit with the best param map
        :return: TrainValidationSplitModel wrapping the refitted model and the metric list
        """
        est = self.estimator
        epm = self.estimatorParamMaps
        numModels = len(epm)
        eva = self.evaluator
        metrics = [0.0] * numModels
        validation = self.validation
        train = self.training
        for j in range(numModels):
            model = est.fit(train, epm[j])
            metric = eva.evaluate(model.transform(validation, epm[j]))
            metrics[j] += metric
        # The evaluator decides the direction: maximise when larger is better, else minimise.
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset, epm[bestIndex])
        return self._copyValues(TrainValidationSplitModel(bestModel, metrics))
      
    def _copyValues(self, to, extra=None):
        """
        Copies param values from this instance to another instance for
        params shared by them.

        :param to: the target instance
        :param extra: extra params to be copied
        :return: the target instance with param values copied
        """
        paramMap = self._paramMap.copy()
        if extra is not None:
            paramMap.update(extra)
        for param in self.params:
            # copy default params
            if param in self._defaultParamMap and to.hasParam(param.name):
                to._defaultParamMap[to.getParam(param.name)] = self._defaultParamMap[param]
            # copy explicitly set params
            if param in paramMap and to.hasParam(param.name):
                to._set(**{param.name: paramMap[param]})
        return to

In [36]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.recommendation import ALS
# Implicit-feedback ALS. The seed is fixed so that factor initialisation, and therefore
# the metric of every candidate, is repeatable across runs.
als = ALS(regParam=0.01, userCol="uid", itemCol="sid", ratingCol="count",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=True, seed=42)

# Candidates differ only in the number of ALS iterations.
paramGrid = ParamGridBuilder().addGrid(als.maxIter, [2,5,10]).build()

# NOTE(review): `test` is used as the validation set here and is scored again afterwards,
# so the reported metric is optimistic — consider carving a separate validation window.
tvs = CustomTrainValidationSplit(estimator=als,
                           estimatorParamMaps=paramGrid,
                           evaluator=TestEvaluator(),
                           validation=test,
                           training=training)
result = tvs.fit(training)

In [37]:
# Score the selected model on the test rows with the custom evaluator.
best_model = result.bestModel
prediction = best_model.transform(test)
evaluator.evaluate(prediction)

In [38]:
# Top-3 recommended songs for every user known to the selected model.
user_recs = result.bestModel.recommendForAllUsers(numItems=3)