# Collaborative Filtering Recommendation

Use pyspark and sql in databricks to build a recommendation system and predict the rating scores for users in the test and recommend songs with top rate

## Bring in data and EDA

In [3]:
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import round
from pyspark.sql.functions import when
import pyspark.sql.functions as func
spark = SparkSession \
    .builder \
    .appName("music player") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark.catalog.clearCache()

schema_all_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True), StructField("date", StringType(), True)])

schema_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True)])

all_play = spark.createDataFrame([], schema = schema_all_play)

  
for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/'):
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/gz/'): 
  df = spark.read.csv(path, sep = "\t", schema = schema_play)
  date = name.split("_")[0].strip()
  df = df.withColumn("date",lit(date))

  all_play = all_play.unionAll(df)
# all_play = all_play.filter(all_play['play_time'] > 0.8 * all_play['song_length'] )
# all_play.cache()
# print(all_play.count())
all_play.createOrReplaceTempView("all_play")

In [4]:
all_play.select('uid').distinct().count()

In [5]:
all_play.cache()

In [6]:
all_play.select('song_id').distinct().count()

In [7]:
all_play.select('song_name').distinct().count()

In [8]:
n = spark.sql("""
SELECT cast(uid as Integer), cast(song_id as Integer), (case when rk >= 0.8 then 3 when rk <= 0.3 then 1 else 2 end)  as rate
FROM 
(
SELECT  uid, song_id, cnt_song_uid,  percent_rank(cnt_song_uid) over(partition by uid order by cnt_song_uid) as rk , count(uid) over(partition by song_id) as cnt_uid
FROM (
SELECT uid, song_id, count(*) as cnt_song_uid  FROM 
(
SELECT uid, song_id,  play_time,  date, first_value(song_length) over(order by cnt_song desc) as song_length, first_value(cnt_date) over(partition by uid order by cnt_date desc) as max
FROM 
(
SELECT uid, song_id,  play_time, song_length, date, count(*) over(partition by song_id, song_length) as cnt_song, dense_rank() over(order by uid, date)  as cnt_date
FROM ALL_PLAY
WHERE singer > 'z' and substring(singer, -1, 1) > 'z' and substring(singer, -1, 1) not in ('手', '曲', '谦', '妍', '刚', '阳', '乐', '纬', '伦', '星', '雷',  '声',  '然', '歌', '团', '事', '物') and play_time > 0.8*song_length and uid not in (1685126, 751824, 37025504, 1791497, 1062806, 497685, 751824, 37025504, 1791497, 1062806, 497685, 736305, 0, 1749320, 1679121, 46532274, 28638487, 533817, 155948236, 637650, 32166203, 64268006, 26036032, 32104145, 398309, 154539052, 16517426, 27954505, 168095004, 1883192, 1710083, 1963913, 924065, 167615175, 167848561, 168565798, 168510160) and song_id > 9999 and song_id not in (5237384, 13273544, 15807836)
) t1
) t2 WHERE max > 25 and song_length > 100 and song_length < 480 and ( (play_time > 999 and play_time/1000>0.8*song_length)  or play_time < 1.1*song_length)
GROUP BY uid, song_id
)h
) t4
WHERE cnt_uid > 1
""")

In [9]:
all_play.cache()
all_play.count()

In [10]:
n.cache()
n.count()

## Collaborative Filtering

In [11]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
training, testing = n.randomSplit([0.8, 0.2])
als = ALS(maxIter = 5, regParam = 0.01, userCol = 'uid', itemCol = 'song_id', ratingCol = 'rate')
model = als.fit(training)
predictions = model.transform(testing)
evaluator = RegressionEvaluator(metricName =  'rmse', labelCol = 'rate', predictionCol = 'prediction')


In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName =  'rmse', labelCol = 'rate', predictionCol = 'prediction')

predictions = predictions.where("prediction != 'NaN'")
rmse = evaluator.evaluate(predictions)

In [13]:
evaluator.evaluate(predictions)

In [None]:
# predict songs' rate for users in the prediction dataset 
# songs with top rating scores are recommended  
# other data explorations are as follows

## Other EDA

In [14]:
# other data exploration
dbutils.fs.rm("/FileStore/tables/play/20170301_play.log")
dbutils.fs.rm("/FileStore/tables/play/20170339_1_play.log")
for i in range(1,10):
  print(dbutils.fs.rm("/FileStore/tables/play/2017030" + str(i) + "_1_play.log") )

In [18]:
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import round
from pyspark.sql.functions import when
import pyspark.sql.functions as func
spark = SparkSession \
    .builder \
    .appName("music player") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark.catalog.clearCache()

schema_all_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True), StructField("date", StringType(), True)])

schema_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True)])

all_play = spark.createDataFrame([], schema = schema_all_play)

  
for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/'):
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/gz/'): 
  df = spark.read.csv(path, sep = "\t", schema = schema_play)
  date = name.split("_")[0].strip()
  df = df.withColumn("date",lit(date))

  all_play = all_play.unionAll(df)

all_play.createOrReplaceTempView("all_play")

In [19]:
%sql
SELECT COUNT(*)
FROM all_play
WHERE play_time > 0.8*song_length

count(1)
88402275


In [20]:
all_play.count()
# with 0.8 rule dropping

In [21]:
all_play.groupby(func.substring(all_play['uid'],1, 1)).count().show()
# another test 4.09min 1.06 min, why so fast?

In [22]:
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import round
from pyspark.sql.functions import when
import pyspark.sql.functions as func
spark = SparkSession \
    .builder \
    .appName("music player") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark.catalog.clearCache()

schema_all_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True), StructField("date", StringType(), True)])

schema_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True)])

all_play = spark.createDataFrame([], schema = schema_all_play)

  
for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/'):
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/gz/'): 
  df = spark.read.csv(path, sep = "\t", schema = schema_play)
  date = name.split("_")[0].strip()
  df = df.withColumn("date",lit(date))

  all_play = all_play.unionAll(df)
all_play = all_play.filter(all_play['play_time'] > 0.8 * all_play['song_length'] )
# all_play.cache()
# print(all_play.count())
all_play.createOrReplaceTempView("all_play")

In [23]:
all_play = all_play.filter( (all_play.uid != 1685126) \
                    &(all_play.uid != 751824) \
                    &(all_play.uid != 37025504) \
                    &(all_play.uid != 1791497) \
                    &(all_play.uid != 1062806) \
                    &(all_play.uid != 497685) \
                    &(all_play.uid != 736305) \
                    &(all_play.uid > 99999) \
                    &(all_play.uid != 1749320) \
                    &(all_play.uid != 1679121) \
                    &(all_play.uid != 46532274) \
                    &(all_play.uid != 28638487) \
                    &(all_play.uid != 533817) \
                    &(all_play.uid != 155948236) \
                    &(all_play.uid != 637650) \
                    &(all_play.uid != 32166203) \
                    &(all_play.uid != 64268006) \
                    &(all_play.uid != 26036032) \
                    &(all_play.uid != 32104145) \
                    &(all_play.uid != 398309) \
                    &(all_play.uid != 154539052)\
                    & (all_play.uid != 16517426)\
                    & (all_play.uid != 27954505)\
                    & (all_play.uid != 168095004)\
                    & (all_play.uid != 1710083)\
                    & (all_play.uid != 1963913)\
                    & (all_play.uid != 924065)\
                    & (all_play.uid != 167615175)\
                    & (all_play.uid != 167848561)\
                    & (all_play.uid != 168565798)\
                    & (all_play.uid != 168510160)  )

In [24]:
all_play.cache()

In [25]:
all_play.groupby('uid').agg(func.countDistinct('song_id').alias('count')).orderBy('count', ascending = False).show(1000)

In [26]:
all_play.groupby('uid').agg(func.countDistinct('song_id').alias('count')).orderBy('count', ascending = False).show(1000)

In [27]:
# coding=utf-8
# HEADUP!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!the thing i want to change, don't with column but change while filter
# also play time only need to use 1 or 1000 cutoff no need to change but the song length still need..(count how many needs the change, maybe not too many..then maybe want to only change the one need(only partition part of the sons) or maybe can use join, try jiahui's whether faster both due to gz and due to the song_length strategy, is she only using the ones need change?)
# uid thing, first abnormal uid, very close, then day of uid and then count songs, hopefully each uid with lots of songs, then by song count uid, drop songs with less thant 10 uid
# then partiontoin by song id to have three cutoff for ratings, maybe this can be joined with the above above step, both by uid count songs(the step could be by uid songid date first, then drop those smaller dates, or songs with samller uid, actaully the above can be just one steop)
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import round
from pyspark.sql.functions import when
import pyspark.sql.functions as func
spark = SparkSession \
    .builder \
    .appName("music player") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark.catalog.clearCache()

schema_all_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True), StructField("date", StringType(), True)])

schema_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True)])

all_play = spark.createDataFrame([], schema = schema_all_play)

  
for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/'):
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/gz/'): 
  df = spark.read.csv(path, sep = "\t", schema = schema_play)
  date = name.split("_")[0].strip()
  df = df.withColumn("date",lit(date))

  all_play = all_play.unionAll(df)
# all_play.cache()
# print(all_play.count())  
all_play = all_play.withColumn('uid', trim(all_play.uid).cast(IntegerType()))
all_play = all_play.withColumn('song_id', trim(all_play.song_id).cast(IntegerType()))
all_play = all_play.withColumn('play_time', round(trim(all_play.play_time).cast(FloatType())) )
all_play = all_play.withColumn('song_length', round(trim(all_play.song_length).cast(FloatType())) )

all_play = all_play.filter( (all_play['play_time'] > 0.8 * all_play['song_length']) & (all_play.uid != 1685126) \
                    &(all_play.uid != 751824) \
                    &(all_play.uid != 37025504) \
                    &(all_play.uid != 1791497) \
                    &(all_play.uid != 1062806) \
                    &(all_play.uid != 497685) \
                    &(all_play.uid != 736305) \
                    &(all_play.uid > 99999) \
                    &(all_play.uid != 1749320) \
                    &(all_play.uid != 1679121) \
                    &(all_play.uid != 46532274) \
                    &(all_play.uid != 28638487) \
                    &(all_play.uid != 533817) \
                    &(all_play.uid != 155948236) \
                    &(all_play.uid != 637650) \
                    &(all_play.uid != 32166203) \
                    &(all_play.uid != 64268006) \
                    &(all_play.uid != 26036032) \
                    &(all_play.uid != 32104145) \
                    &(all_play.uid != 398309) \
                    &(all_play.uid != 154539052)\
                    & (all_play.uid != 16517426)\
                    & (all_play.uid != 27954505)\
                    & (all_play.uid != 168095004)\
                    & (all_play.uid != 1710083)\
                    & (all_play.uid != 1963913)\
                    & (all_play.uid != 924065)\
                    & (all_play.uid != 167615175)\
                    & (all_play.uid != 167848561)\
                    & (all_play.uid != 168565798)\
                    &(all_play.uid != 168510160) & (all_play.song_id > 9999) & (all_play['song_length'] > 80) & (all_play['play_time'] < 500 )  & (all_play['song_length'] < 480)  )
   
                          
all_play.cache()
print(all_play.count())
all_play.createOrReplaceTempView("all_play")

In [28]:
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import round
from pyspark.sql.functions import when
import pyspark.sql.functions as func
spark = SparkSession \
    .builder \
    .appName("music player") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark.catalog.clearCache()

schema_all_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True), StructField("date", StringType(), True)])

schema_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True)])

all_play = spark.createDataFrame([], schema = schema_all_play)

for path, name, _ in dbutils.fs.ls('/FileStore/tables/'):
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/plays'):  
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/'):  
  if name == "snap/" or name == "play/" or name == "plays/":
    break
  df = spark.read.csv(path, sep = "\t", schema = schema_play)
  date = name.split("_")[0].strip()
  df = df.withColumn("date",lit(date))

  all_play = all_play.unionAll(df)
  
all_play = all_play.withColumn('uid', trim(all_play.uid).cast(IntegerType()))
all_play = all_play.withColumn('song_id', trim(all_play.song_id).cast(IntegerType()))
all_play = all_play.withColumn('play_time', round(trim(all_play.play_time).cast(FloatType())) )
all_play = all_play.withColumn('song_length', round(trim(all_play.song_length).cast(FloatType())) )
# all_play = all_play.withColumn('date', trim(all_play.date))
# all_play = all_play.withColumn('song_name', trim(all_play.song_name))
# all_play = all_play.withColumn('singer', trim(all_play.singer))
all_play = all_play.filter( (all_play['play_time'] > 0.8 * all_play['song_length']) & (all_play.uid != 1685126) \
                    &(all_play.uid != 751824) \
                    &(all_play.uid != 37025504) \
                    &(all_play.uid != 1791497) \
                    &(all_play.uid != 1062806) \
                    &(all_play.uid != 497685) \
                    &(all_play.uid != 736305) \
                    &(all_play.uid > 99999) \
                    &(all_play.uid != 1749320) \
                    &(all_play.uid != 1679121) \
                    &(all_play.uid != 46532274) \
                    &(all_play.uid != 28638487) \
                    &(all_play.uid != 533817) \
                    &(all_play.uid != 155948236) \
                    &(all_play.uid != 637650) \
                    &(all_play.uid != 32166203) \
                    &(all_play.uid != 64268006) \
                    &(all_play.uid != 26036032) \
                    &(all_play.uid != 32104145) \
                    &(all_play.uid != 398309) \
                    &(all_play.uid != 154539052)\
                    & (all_play.uid != 167615175)\
                    &(all_play.uid != 168510160) & (all_play.song_id > 9999) & (all_play['song_length'] > 80) & (all_play['play_time'] < 500 )  & (all_play['song_length'] < 480)  )
all_play.cache()
print(all_play.count())
all_play.createOrReplaceTempView("all_play")

In [29]:
cc = all_play.groupby('uid', 'date').agg(func.count('song_id').alias('cnt'), (round((func.sum('play_time')/86400),1)).alias('sum'))
# .orderBy('sum', ascending = False)
dd = cc.groupby('uid').agg(func.countDistinct ('date').alias('cntD'), func.sum('cnt').alias('cntS') )
# ccc = cc.filter(cc['sum'] > 1)
# ccc.cache()
# ccc
ee = cc.filter(cc['sum'] > 1).groupby('uid').agg(func.sum('cnt').alias('cnts'),func.countDistinct ('date').alias('cnt_d'), func.first('sum').alias('top_sum'))
# .orderBy('cnt', ascending = False).show(1000)
f = ee.join(dd, ee.uid == dd.uid).select(dd.uid.alias('uid'), ee.top_sum.alias('top_sum'), dd.cntS.alias('cntS1'), ee.cnts.alias('cnts'), dd.cntD.alias('cntD1'), ee.cnt_d.alias('cnt_d'))
f = f.withColumn('pct1', f.cnts/f.cntS1)
f = f.withColumn('pct2', f.cnt_d/f.cntD1)
# ff = f.select('uid', ee.top_sum, dd.cntS, ee.cnts, ee.cnts/dd.cntS, dd.cntD, ee.cnt_d, ee.cnt_d/dd.cntD)
# .orderBy(ee.top_sum, ascending = False).filter(dd.cntD > 24)
f.select('uid').distinct().show(1000)

In [30]:
dbutils.fs.mkdirs("FileStore/tables/play")
s = StructType([ StructField("uid", StringType(), True)] )
d = spark.read.csv('FileStore/tables/test1.csv', schema = s)

In [31]:
# coding=utf-8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import lit
from pyspark.sql.functions import trim
from pyspark.sql.functions import round
from pyspark.sql.functions import when
import pyspark.sql.functions as func
spark = SparkSession \
    .builder \
    .appName("music player") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark.catalog.clearCache()

schema_all_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True), StructField("date", StringType(), True)])

schema_play = StructType([ StructField("uid", StringType(), True), StructField("device", StringType(), True),StructField("song_id", StringType(), True), StructField("song_type", StringType(), True), StructField("song_name", StringType(), True), StructField("singer", StringType(), True), StructField("play_time", StringType(), True),StructField("song_length", StringType(), True), StructField("paid_flag", StringType(), True)])

all_play = spark.createDataFrame([], schema = schema_all_play)

  
for path, name, _ in dbutils.fs.ls('/FileStore/tables/play/'):
# for path, name, _ in dbutils.fs.ls('/FileStore/tables/gz/'): 
  df = spark.read.csv(path, sep = "\t", schema = schema_play)
  date = name.split("_")[0].strip()
  df = df.withColumn("date",lit(date))

  all_play = all_play.unionAll(df)
# all_play.cache()
# print(all_play.count())  
# all_play = all_play.withColumn('uid', trim(all_play.uid).cast(IntegerType()))
# all_play = all_play.withColumn('song_id', trim(all_play.song_id).cast(IntegerType()))
# all_play = all_play.withColumn('play_time', round(trim(all_play.play_time).cast(FloatType())) )
# all_play = all_play.withColumn('song_length', round(trim(all_play.song_length).cast(FloatType())) )

all_play = all_play.filter( (all_play['play_time'] > 0.8 * all_play['song_length']) & (all_play.uid != 1685126) \
                    &(all_play.uid != 751824) \
                    &(all_play.uid != 37025504) \
                    &(all_play.uid != 1791497) \
                    &(all_play.uid != 1062806) \
                    &(all_play.uid != 497685) \
                    &(all_play.uid != 736305) \
                    &(all_play.uid > 99999) \
                    &(all_play.uid != 1749320) \
                    &(all_play.uid != 1679121) \
                    &(all_play.uid != 46532274) \
                    &(all_play.uid != 28638487) \
                    &(all_play.uid != 533817) \
                    &(all_play.uid != 155948236) \
                    &(all_play.uid != 637650) \
                    &(all_play.uid != 32166203) \
                    &(all_play.uid != 64268006) \
                    &(all_play.uid != 26036032) \
                    &(all_play.uid != 32104145) \
                    &(all_play.uid != 398309) \
                    &(all_play.uid != 154539052)\
                           
                    & (all_play.uid != 16517426)\
                    & (all_play.uid != 27954505)\                   
                    & (all_play.uid != 168095004)\
                    & (all_play.uid != 1710083)\       
                    & (all_play.uid != 1963913)\
                    & (all_play.uid != 924065)\
                    & (all_play.uid != 167615175)\
                    & (all_play.uid != 167848561)\      
                    & (all_play.uid != 168565798)\
                    &(all_play.uid != 168510160) & (all_play.song_id > 9999) & (all_play['song_length'] > 80) & (all_play['play_time'] < 500 )  & (all_play['song_length'] < 480)  )
#                     &(all_play.uid != 398309) \
#                     &(all_play.uid != 154539052)\
#                     
all_play.cache()
print(all_play.count())
all_play.createOrReplaceTempView("all_play")


In [32]:
cc = all_play.groupby('uid', 'date').agg(func.count('song_id').alias('cnt'), (round((func.sum('play_time')/86400),1)).alias('sum'))
dd = cc.groupby('uid').agg(func.countDistinct ('date').alias('cntD'), func.sum('cnt').alias('cntS') )
ee = cc.filter(cc['sum'] > 1).groupby('uid').agg(func.sum('cnt').alias('cnts'),func.countDistinct ('date').alias('cnt_d'), func.first('sum').alias('top_sum'))
f = ee.join(dd, ee.uid == dd.uid).select(dd.uid.alias('uid'), ee.top_sum.alias('top_sum'), dd.cntS.alias('cntS1'), ee.cnts.alias('cnts'), dd.cntD.alias('cntD1'), ee.cnt_d.alias('cnt_d'))
f = f.withColumn('pct1', f.cnts/f.cntS1)
f = f.withColumn('pct2', f.cnt_d/f.cntD1).filter(dd.cntD > 24).orderBy('cntS1', ascending = False)
# .orderBy('cntD1', ascending = False)

In [33]:
f.show(1000)
# 167613273 2373,  167981299 4688 .38 pct, 167848561 11311| 5205| 42| 7| 0.46 this denefintely need frop, 168791869| 1.1| 2507| 367| 35| 1| 0.14, |168965697| 2.1| 6927| 1622| 31| 1| 0.23, 
|168858307| 1.1| 2160| 356| 27| 1| 0.1648148148148148|0.037037037037037035| |168125418| 3.8| 3005| 1364| 33| 1| 0.45,
|168501892| 1.1| 2041| 366| 31| 1| 0.179
|168250921| 1.1| 9560| 2257| 38| 4| 0.2360878661087866| 0.10526315789473684| this one!
|168788495| 1.2| 2152| 403| 31| 1| 0.18
167977274| 1.2| 4032| 455| 38| 1| 0.112
|168238453| 1.5| 1544| 507| 25| 1| 0.32
169022311| 2.9| 1121| 936| 31| 1| 0.834

 27954505| 1.4|22858|20204| 42| 34| 0.8838
   1710083| 1.3|19829|13126| 42| 20| 0.66
    | 16517426| 1.5|23703|21957| 44| 38| 0.926

In [34]:
f.show(1000)
| uid|top_sum|cntS1| cnts|cntD1|cnt_d| pct1| pct2| +---------+-------+-----+-----+-----+-----+--------------------+--------------------+ | 16517426| 1.5|23703|21957| 44| 38| 0.9263384381723833| 0.8636363636363636| | 27954505| 1.4|22858|20204| 42| 34| 0.8838918540554729| 0.8095238095238095| | 1710083| 1.3|19829|13126| 42| 20| 0.6619597559130567| 0.47619047619047616| | 1963913| 1.3|13206| 6605| 42| 12| 0.5001514463122823| 0.2857142857142857| |168565798| 1.2|11375| 413| 42| 1|0.036307692307692305|0.023809523809523808| |167848561| 2.6|11311| 5205| 42| 7| 0.46017151445495535| 0.16666666666666666| | 924065| 1.4| 9994| 4172| 37| 9| 0.4174504702821693| 0.24324324324324326|

In [35]:
f.select('uid').distinct().show(1000)

In [36]:
all_play.createOrReplaceTempView("all_play")

In [37]:
%sql
SELECT date, sum(if(trim(uid)=uid,0,1)) as sum1, sum(if(trim(song_id)=song_id,0,1)) as sum2, sum(if(trim(song_length)=song_id,0,1)) as sum3, sum(if(trim(play_time)=song_id,0,1)) as sum4
from all_play
group by date
order by sum1 desc, sum2 desc, sum3 desc, sum4 desc

date,sum1,sum2,sum3,sum4
20170301,3422930,3422936,3422936,3422936
20170302,2453601,2453607,2453607,2453607
20170339,2194479,2194482,2194482,2194482
20170303,1852995,1852999,1852999,1852999
20170304,1710065,1710065,1710065,1710065
20170305,1608947,1608947,1608947,1608947
20170306,1352351,1352351,1352351,1352351
20170307,1289449,1289449,1289449,1289449
20170309,1173766,1173766,1173766,1173766
20170331,2138,3041,7031208,6903167


In [38]:
x = all_play.groupby('song_id').agg(func.countDistinct('song_name').alias('d')).orderBy('d', ascending = False)

In [39]:
x.filter(x['d'] < 9).show(50)


In [40]:
all_play.filter(all_play['song_id'] == 13761415).select('song_name').distinct().show()

In [41]:
%sql
SELECT DISTINCT SONG_NAME
FROM ALL_PLAY
WHERE SONG_ID = 13761415

SONG_NAME
Audition(The Fools Who Dream)-From ＂La La Land＂ Soundtrack
"Audition(The Fools Who Dream)-From "";La La Land""; Soundtrack"
"Audition(The Fools Who Dream)-From ""La La Land"" Soundtrack"
"Audition(The Fools Who Dream)-From \""La La Land\"" Soundtrack"
"Audition(The Fools Who Dream)-From ""La La Land"" Soundtrack"
"Audition(The Fools Who Dream)-From "";La La Land""; Soundtrack"
Audition(The Fools Who Dream)-From ＂La La Land＂ Soundtrack
Audition(The Fools Who Dream)


In [42]:
all_play.filter(all_play.song_name == "刚好遇见你" ).agg(func.countDistinct('uid')).show()

In [43]:
all_play.agg(func.countDistinct('uid')).show()

In [44]:
all_play.filter(all_play.play_time > 480).groupby('song_length').count().orderBy('song_length', ascending = False).show(400)

In [45]:
all_play.filter(all_play.play_time > 480).groupby('date').count().orderBy('date', ascending = False).show(60)

In [46]:
all_play.filter(all_play.play_time > 480).groupby('song_length').count().orderBy('count', ascending = False).show(400)

In [47]:
all_play.filter( (all_play['song_length'] < 479)&(all_play['song_length'] > 459)  & (all_play['play_time'] > 480) ).show()

In [48]:
all_play.filter( (all_play['song_length'] == 268) & (all_play['play_time'] > 480) ).show()

In [49]:
all_play.filter(all_play.uid == 154418068).groupby('date').count().show(500)

In [50]:
all_play.filter(all_play.uid == 154418068).groupby('play_time').count().show(500)

In [51]:
all_play.filter( (all_play['play_time'] > 480) ).show(1000)

In [52]:
all_play.groupby(round(all_play['play_time']/all_play['song_length'], 1)).count().orderBy('count', ascending = False).show(500)

In [53]:
la = all_play.groupby('song_name').agg(func.countDistinct('uid').alias('cnt')).orderBy('cnt', ascending = False)

In [54]:
la.show(1000)

In [55]:
la.createOrReplaceTempView('la')

In [56]:
%sql
SELECT *
FROM LA

song_name,cnt
刚好遇见你,79629
凉凉-(电视剧《三生三世十里桃花》片尾曲),72670
逆流成河,58411
演员,52074
没有你陪伴真的好孤单,46445
成都,41205
三生三世-(电视剧《三生三世十里桃花》主题曲),40944
走着走着就散了,34897
告白气球,33503
你还要我怎样,33178
