In [26]:
import os
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [27]:
# Single local Spark session with Hive support; reused by every cell below.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local")
    .getOrCreate()
)

In [28]:
# Root directory of the input parquet datasets; change here to point the
# notebook at another data location.
DATA_DIR = "/data"

# Raw listening events (one row per user/track interaction; aggregated below).
data = sparkSession.read.parquet(os.path.join(DATA_DIR, "sample264"))
# Track metadata. NOTE(review): not referenced by the cells in this notebook
# section -- confirm it is needed before keeping the extra read.
meta = sparkSession.read.parquet(os.path.join(DATA_DIR, "meta"))

In [29]:
def normalization(data, key1, key2, field, threshold):
    """Keep each group's top rows by `field` and normalize `field` within the group.

    For every value of `key1`, rows are ordered by `field` descending (ties broken
    by `key2` ascending, so the cut-off is deterministic), the first `threshold`
    rows are kept, and a column ``"norm_" + field`` is added holding each kept
    row's `field` divided by the group's total over the kept rows.

    :param data: input DataFrame containing the `key1`, `key2` and `field` columns.
    :param key1: column to partition by (the group key).
    :param key2: column used as a secondary sort key to break ties in `field`.
    :param field: numeric column to rank by and normalize.
    :param threshold: maximum number of rows kept per `key1` value.
    :return: cached DataFrame with the kept rows of `data` plus the columns
        ``"sum_" + field`` (per-group total) and ``"norm_" + field``.
    """
    # Secondary ordering on key2 makes row_number() deterministic when several
    # rows share the same `field` value; otherwise which rows survive the
    # threshold cut would be arbitrary.
    window = Window.partitionBy(key1).orderBy(col(field).desc(), col(key2).asc())

    top = data.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= threshold) \
        .drop("row_number")

    # groupBy already keeps key1 in its output, so only the aggregate is listed
    # (re-listing col(key1) inside agg() is redundant).
    # `sum` here is pyspark.sql.functions.sum (star-imported), not the builtin.
    totals = top.groupBy(key1).agg(sum(col(field)).alias("sum_" + field))

    normalizedData = top.join(totals, key1, "inner") \
        .withColumn("norm_" + field, col(field) / col("sum_" + field)) \
        .cache()

    return normalizedData

In [30]:
# Number of most-played tracks kept per user before normalizing.
TOP_TRACKS_PER_USER = 1000
# Number of (id, id2) rows collected to the driver for display.
RESULT_SIZE = 40

# Play count of every (userId, trackId) pair.
tracks = data.groupBy(col("userId"), col("trackId")).count()

# Each user's top tracks with their share of that user's plays, renamed to (id, id2).
tracksNorm = normalization(tracks, "userId", "trackId", "count", TOP_TRACKS_PER_USER) \
    .select(col("userId").alias("id"), col("trackId").alias("id2"), col("norm_count"))

# Highest normalized share first; ties broken by id, then id2. (id, id2) pairs are
# unique after the groupBy above, so the ordering is total.
# No global rank() window is needed: the first RESULT_SIZE rows of this ordering
# already have rank <= RESULT_SIZE, and an unpartitioned window would move every
# row onto a single partition.
trackList = tracksNorm \
    .orderBy(col("norm_count").desc(), col("id").asc(), col("id2").asc()) \
    .select(col("id"), col("id2")) \
    .take(RESULT_SIZE)

In [31]:
# Print each collected (id, id2) row as "id id2". print(...) with a single
# argument behaves the same on Python 2 and is also valid Python 3, unlike the
# Python 2-only print statement.
for row in trackList:
    print("%s %s" % tuple(row))

66 965774
116 867268
128 852564
131 880170
195 946408
215 860111
235 897176
300 857973
321 915545
328 943482
333 818202
346 864911
356 961308
428 943572
431 902497
445 831381
488 841340
542 815388
617 946395
649 901672
658 937522
662 881433
698 935934
708 952432
746 879259
747 879259
776 946408
784 806468
806 866581
811 948017
837 799685
901 871513
923 879322
934 940714
957 945183
989 878364
999 967768
1006 962774
1049 849484
1057 920458
