In [29]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, sum

def norm(df, key1, key2, field, n):
    """Keep the top-``n`` rows per ``key1`` group and normalize ``field`` within it.

    For every distinct value of ``key1`` the rows are ranked by ``field``
    (descending, ties broken by ``key2`` ascending), only the first ``n`` rows
    are kept, and a ``"norm_" + field`` column is added that holds ``field``
    divided by the sum of ``field`` over the kept rows of the same group.

    Args:
        df: Spark DataFrame containing the columns ``key1``, ``key2`` and ``field``.
        key1: name of the column that defines the groups.
        key2: name of the column used as a deterministic tie-breaker when ranking.
        field: name of the numeric column to rank by and normalize.
        n: maximum number of rows kept per ``key1`` value.

    Returns:
        A cached DataFrame with the kept rows of ``df`` plus the columns
        ``"sum_" + field`` (group total over the kept rows) and
        ``"norm_" + field`` (``field`` / group total).
    """
    # Imported locally so the function does not depend on ``col`` having been
    # imported by a different notebook cell, and so Spark's ``sum`` is used
    # explicitly rather than whatever ``sum`` happens to be bound globally.
    from pyspark.sql import Window
    from pyspark.sql.functions import col, row_number, sum as spark_sum

    # key2 breaks ties in field so the top-n cut is the same on every run.
    window = Window.partitionBy(key1).orderBy(col(field).desc(), col(key2))

    topsDF = (
        df.withColumn("row_number", row_number().over(window))
        .filter(col("row_number") <= n)
        .drop("row_number")
    )

    # Group total over the kept rows only. groupBy already emits key1, so it is
    # not repeated inside agg().
    tmpDF = topsDF.groupBy(key1).agg(spark_sum(col(field)).alias("sum_" + field))

    normalizedDF = (
        topsDF.join(tmpDF, key1, "inner")
        .withColumn("norm_" + field, col(field) / col("sum_" + field))
        .cache()
    )

    return normalizedDF

In [30]:
from pyspark.sql import SparkSession
# Shared Spark entry point for the notebook: Hive-enabled, running on YARN.
spark_session = (
    SparkSession.builder
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [31]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, sum, col, abs, count

In [32]:
# Load both Parquet inputs; `data` is the events table used below, `meta` is
# loaded alongside it.
data, meta = [
    spark_session.read.parquet(parquet_path)
    for parquet_path in ("/data/sample264", "/data/meta")
]

#### Normalization is done by the `norm` function defined above; the cells below build the track co-occurrence counts it is applied to

In [33]:
# Two renamed views of the same events table so it can be self-joined on
# userId without column-name clashes: suffix 1 for the left side, 2 for the right.
data1, data2 = (
    data.select(
        col('userId').alias('userId'),
        col('trackId').alias(track_name),
        col('timestamp').alias(time_name),
    )
    for track_name, time_name in (
        ('trackId1', 'timestamp1'),
        ('trackId2', 'timestamp2'),
    )
)

In [34]:
# Maximum distance between two listening events of the same user for the pair
# of tracks to be counted as co-listened.
# NOTE(review): presumably seconds (i.e. 7 minutes) — confirm the unit of the
# `timestamp` column.
MAX_LISTEN_GAP = 420

# For every ordered pair of distinct tracks, count the pairs of listening events
# by the same user that lie within MAX_LISTEN_GAP of each other.
newdf = (
    data1.join(data2, 'userId')
    .filter(col('trackId1') != col('trackId2'))
    .filter(abs(col('timestamp1') - col('timestamp2')) <= MAX_LISTEN_GAP)
    .groupBy(col('trackId1'), col('trackId2'))
    .count()  # the resulting column is already named 'count'
    # Cache only the (small) aggregate: norm() derives two branches from it
    # (ranked rows and their per-group sums). The intermediate join/filter
    # results are not referenced anywhere else, so caching them would only pin
    # the full per-user self-join in memory.
    .cache()
)

In [35]:
# Keep the 40 highest-count partners of every trackId1 and normalize their
# counts within that group.
normalized = norm(newdf, key1="trackId1", key2="trackId2", field="count", n=40)

In [36]:
# Strongest normalized pairs first; track ids make the ordering total before
# taking the first 40 rows.
result = (
    normalized
    .sort(col('norm_count').desc(), col('trackId1'), col('trackId2'))
    .limit(40)
)

In [37]:
# Only the pair of track ids is needed for the printed answer.
finalresult = result.select(['trackId1', 'trackId2'])

In [38]:
# One tab-separated "trackId1<TAB>trackId2" line per selected pair.
for first_track, second_track in finalresult.collect():
    print('{}\t{}'.format(first_track, second_track))

798256	923706
798319	837992
798322	876562
798331	827364
798335	840741
798374	816874
798375	810685
798379	812055
798380	840113
798396	817687
798398	926302
798405	867217
798443	905923
798457	918918
798460	891840
798461	940379
798470	840814
798474	963162
798477	883244
798485	955521
798505	905671
798545	949238
798550	936295
798626	845438
798691	818279
798692	898823
798702	811440
798704	937570
798725	933147
798738	894170
798745	799665
798782	956938
798801	950802
798820	890393
798833	916319
798865	962662
798931	893574
798946	946408
799012	809997
799024	935246
