In [91]:
from pyspark.sql import SparkSession
# Local Spark session with Hive support, running on 2 worker threads.
# The master URL must be spelled "local[2]" (no whitespace): Spark does not
# recognise "local [2]" when it has to create the context itself.
# getOrCreate() hands back an already running session if one exists.
sparkSession = SparkSession.builder.enableHiveSupport().master("local[2]").getOrCreate()

In [92]:
# Read the two parquet datasets in order: the first path is bound to `data`
# (the table the track-to-track computation is built from), the second to `meta`.
data, meta = (
    sparkSession.read.parquet(location)
    for location in ("/data/sample264", "/data/meta")
)

In [93]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import *

def norm(df, key1, field, n):
    """Keep the top-`n` rows per `key1` by `field` and add a normalised column.

    Args:
        df: Spark DataFrame containing at least the columns `key1` and `field`.
        key1: name of the column whose values define the groups.
        field: name of the column used both for ranking (descending) and
            for normalisation.
        n: maximum number of rows kept for each distinct `key1` value.

    Returns:
        A cached DataFrame with the kept rows plus two extra columns:
        ``"sum_" + field`` - the total of `field` over *every* row of `df`
        (one global constant; it is neither per-key nor restricted to the
        kept rows), and ``"norm_" + field`` - `field` divided by that total.
    """
    # Rank rows inside each key1 group, largest `field` first. row_number()
    # hands out distinct consecutive numbers, so at most n rows per group pass
    # the filter even when several rows share the same `field` value.
    ranking = Window.partitionBy(key1).orderBy(F.col(field).desc())

    topsDF = df.withColumn("row_number", F.row_number().over(ranking)) \
        .filter(F.col("row_number") <= n) \
        .drop("row_number")

    # Single-row frame holding the grand total of `field` across all of df.
    # F.sum is Spark's aggregate, named explicitly so the builtin sum() can
    # never be picked up by accident.
    totalDF = df.agg(F.sum(F.col(field)).alias("sum_" + field))

    # Cross join attaches that one total to every kept row so the division
    # can be expressed column-wise.
    normalizedDF = topsDF.crossJoin(totalDF) \
        .withColumn("norm_" + field, F.col(field) / F.col("sum_" + field)) \
        .cache()

    return normalizedDF

In [94]:
# Tunables for the track-to-track graph.
LISTEN_GAP_LIMIT = 7 * 60   # max |timestamp - timestamp2| for a pair to count
                            # (presumably seconds, i.e. 7 minutes - confirm unit)
NEIGHBOURS_PER_TRACK = 40   # rows kept per trackId by norm()
PAIRS_TO_PRINT = 40         # rows taken from the global ranking

# `data` is read by both sides of the self-join below.
data.cache()

# Same rows with timestamp/trackId renamed, so a joined row carries both
# plays side by side under distinct column names.
data_copy = data \
    .withColumnRenamed("timestamp", "timestamp2") \
    .withColumnRenamed("trackId", "trackId2")

# Pair every two rows of the same user that refer to different tracks, mark
# the pair with 1 when the two timestamps are closer than LISTEN_GAP_LIMIT
# (0 otherwise), then add the marks up per (trackId, trackId2).
# The absolute difference makes the weight independent of play order.
track_to_track = data \
    .join(data_copy, (data.userId == data_copy.userId) & ~(data.trackId == data_copy.trackId2), "inner") \
    .withColumn("time_diff", F.abs(F.col("timestamp") - F.col("timestamp2"))) \
    .withColumn("weight", F.when(F.col("time_diff") < LISTEN_GAP_LIMIT, 1).otherwise(0)) \
    .groupBy(F.col("trackId"), F.col("trackId2")) \
    .agg(F.sum("weight").alias("weight"))

normalized = norm(track_to_track, "trackId", "weight", NEIGHBOURS_PER_TRACK)

# No partitionBy: the ranking runs over all rows at once.
window = Window.orderBy(F.col("norm_weight").desc())

# Take the PAIRS_TO_PRINT highest norm_weight rows, then present them sorted
# by (trackId, trackId2).
top = normalized.withColumn("pos", F.row_number().over(window)) \
    .filter(F.col("pos") <= PAIRS_TO_PRINT) \
    .orderBy(F.col("trackId").asc(), F.col("trackId2").asc()) \
    .collect()

for item in top:
    # Single-argument call form prints identically on Python 2 and Python 3.
    print("%s %s" % (item.trackId, item.trackId2))
    
    




805688 947174
810775 821251
821251 810775
821288 825174
821288 924215
825174 821288
831005 864690
831005 940362
841340 858904
846587 946758
854531 870292
854531 879259
854531 933030
858904 841340
858904 870292
858904 871513
858904 947174
864690 831005
864690 940362
870292 854531
870292 858904
870292 871513
870292 939606
871513 858904
871513 870292
871513 933030
879259 854531
886091 940951
901965 908335
908335 901965
924215 821288
933030 854531
933030 871513
939606 870292
940362 831005
940362 864690
940951 886091
946758 846587
947174 805688
947174 858904
