In [2]:
#! /usr/bin/env python
from __future__ import division
import math
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.window import Window

In [4]:
# Create the SparkContext for this notebook, or reuse the one already running.
# Calling the SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once", so a plain constructor makes this
# cell fail on re-execution. getOrCreate returns the active context unchanged
# (the conf below is then not applied) or builds a new one from this conf.
sc = SparkContext.getOrCreate(
    conf=SparkConf().setAppName("MyApp").setMaster("local[2]")
)

In [5]:
# Co-listening window in seconds: 7 minutes plus 1 second (= 421).
# It is used with a strict `<` comparison, so for integer-second timestamps
# (assumed — confirm against the data) two listens up to exactly 7 minutes
# apart count as co-listened.
TIME_DELTA = 7 * 60 + 1

In [6]:
# SQL entry point bound to the SparkContext above, then the listening log
# loaded from Parquet. Later cells read its userId, trackId and timestamp columns.
sql = SQLContext(sparkContext=sc)
data = sql.read.format("parquet").load("/data/sample264")

In [8]:
# Self-join the listening log on userId: each output row pairs two listens by
# the same user of two different tracks. Because the log is joined with itself,
# both orderings (a, b) and (b, a) of every pair are produced.
pairs_per_user = (
    data.alias('l')
    .join(data.alias('r'), on='userId')
    .where(f.col('l.trackId') != f.col('r.trackId'))
    .select(
        f.col('userId').alias('id'),
        f.col('l.trackId').alias('track1'),
        f.col('r.trackId').alias('track2'),
        f.col('l.timestamp').alias('timestamp1'),
        f.col('r.timestamp').alias('timestamp2'),
    )
)

In [9]:
# Flag each pair with weight 1 when the two listens are less than TIME_DELTA
# apart, else 0. A null timestamp makes the condition null, and `otherwise`
# then yields 0.
pairs_per_user = pairs_per_user.withColumn(
    'weight',
    f.when(
        f.abs(f.col('timestamp1') - f.col('timestamp2')) < TIME_DELTA,
        1,
    ).otherwise(0),
)

In [10]:
# Weight of the directed edge track1 -> track2: how many times a user listened
# to both tracks within TIME_DELTA of each other (sum of the 0/1 flags).
# Pairs that never co-occurred inside the window sum to 0. They are not real
# edges, and keeping them lets a track's kept-edge total be 0, which makes the
# per-track normalisation divide by zero (null in default Spark SQL, an error
# under ANSI mode). They are therefore dropped here.
aggregated_pairs = (
    pairs_per_user
    .groupBy('track1', 'track2')
    .agg(f.sum('weight').alias('weight'))
    .where(f.col('weight') > 0)
)

In [17]:
# Number of heaviest outgoing edges kept per track.
TOP_N_SIMILAR = 40

# row_number (not rank) so tied weights cannot push a track past TOP_N_SIMILAR
# edges; track2 ascending breaks ties deterministically, matching the final
# output ordering.
window = Window.partitionBy('track1').orderBy(f.col('weight').desc(),
                                               f.col('track2').asc())

# For each track keep its TOP_N_SIMILAR most similar tracks by weight.
# Row numbers start at 1, so `<=` keeps exactly TOP_N_SIMILAR rows.
top_40_tracks = (
    aggregated_pairs
    .withColumn('rank_num', f.row_number().over(window))
    .where(f.col('rank_num') <= TOP_N_SIMILAR)
    .drop('rank_num')
)

# Normalise edge weights: divide each kept edge's weight by the sum of the
# weights of all kept edges of the same track1, so (when that sum is positive)
# each track's outgoing norm_weight values sum to 1.
top_sum = (
    top_40_tracks
    .groupBy('track1')
    .agg(f.sum(f.col('weight')).alias('node_sum'))
)

normalized_tracks = (
    top_40_tracks
    .join(top_sum, on='track1')
    .withColumn('norm_weight', f.col('weight') / f.col('node_sum'))
    .drop('node_sum', 'weight')
)

In [None]:
# Sort the normalised edges by norm_weight descending, then by track1 ascending,
# then by track2 ascending (track1/track2 are the "id1"/"id2" of the task
# statement). Take the first 40 rows, keep only track1 and track2, and print
# each row as "track1<TAB>track2".
result = (
    normalized_tracks
    .orderBy(f.col('norm_weight').desc(),
             f.col('track1').asc(),
             f.col('track2').asc())
    .select('track1', 'track2')
    .take(40)
)

for row in result:
    # Pass a single pre-formatted string: the output matches
    # print(a, b, sep='\t') on Python 3 and still parses on Python 2, where
    # the `sep=` keyword needs `from __future__ import print_function`.
    print('{}\t{}'.format(row[0], row[1]))