In [1]:
import os
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.enableHiveSupport().master("local[*]").getOrCreate()

In [3]:
from pyspark.sql import functions as f
from pyspark.sql import Window

In [4]:
data = sparkSession.read.parquet("/data/sample264")

In [5]:
data.take(10)

[Row(userId=13065, trackId=944906, artistId=978428, timestamp=1501588527),
 Row(userId=101897, trackId=799685, artistId=989262, timestamp=1501555608),
 Row(userId=215049, trackId=871513, artistId=988199, timestamp=1501604269),
 Row(userId=309769, trackId=857670, artistId=987809, timestamp=1501540265),
 Row(userId=397833, trackId=903510, artistId=994595, timestamp=1501597615),
 Row(userId=501769, trackId=818149, artistId=994975, timestamp=1501577955),
 Row(userId=601353, trackId=958990, artistId=973098, timestamp=1501602467),
 Row(userId=710921, trackId=916226, artistId=972031, timestamp=1501611582),
 Row(userId=6743, trackId=801006, artistId=994339, timestamp=1501584964),
 Row(userId=152407, trackId=913509, artistId=994334, timestamp=1501571055)]

In [6]:
tracks = data.alias('df1').join(data.alias('df2'), 'userId') \
    .withColumn('diff', f.abs(f.col('df1.timestamp') - f.col('df2.timestamp'))) \
    .where((f.col('diff') < 420) & (f.col('df1.trackId') != f.col('df2.trackId'))) \
    .select(f.col('df1.trackId').alias('id1'), f.col('df2.trackId').alias('id2')) \
    .withColumn('array', f.array(f.col('id1'), f.col('id2'))) \
    .select(f.sort_array('array', asc=True).alias('array')) \
    .withColumn('id1', f.col('array')[0]) \
    .withColumn('id2', f.col('array')[1]) \
    .select('id1', 'id2') \
    .groupBy(f.col('id1'), f.col('id2')).count()

In [7]:
tracks.take(10)

[Row(id1=802640, id2=953737, count=32),
 Row(id1=874523, id2=900026, count=2),
 Row(id1=812616, id2=834204, count=2),
 Row(id1=891623, id2=927170, count=30),
 Row(id1=905671, id2=953850, count=4),
 Row(id1=930224, id2=933768, count=94),
 Row(id1=800288, id2=944500, count=2),
 Row(id1=814652, id2=884527, count=2),
 Row(id1=857076, id2=858940, count=2),
 Row(id1=889722, id2=899439, count=6)]

In [8]:
def norm(df, key1, key2, field, n): 
    
    window = Window.partitionBy(key1).orderBy(f.col(field).desc())
        
    topsDF = df.withColumn("row_number", f.row_number().over(window)) \
        .filter(f.col("row_number") <= n) \
        .drop(f.col("row_number")) 
        
    tmpDF = topsDF.groupBy(f.col(key1)).agg(f.col(key1), f.sum(f.col(field)).alias("sum_" + field))
   
    normalizedDF = topsDF.join(tmpDF, key1, "inner") \
        .withColumn("norm_" + field, f.col(field) / f.col("sum_" + field)) \
        .cache()

    return normalizedDF

In [9]:
tracksNorm = norm(tracks, "id1", "id2", "count", 40) \
        .select('id1', 'id2', 'norm_count')

In [10]:
tracksNorm.take(30)

[Row(id1=798477, id2=883244, norm_count=1.0),
 Row(id1=798692, id2=898823, norm_count=1.0),
 Row(id1=800467, id2=855206, norm_count=1.0),
 Row(id1=801701, id2=920990, norm_count=1.0),
 Row(id1=802599, id2=908754, norm_count=0.03571428571428571),
 Row(id1=802599, id2=937714, norm_count=0.03571428571428571),
 Row(id1=802599, id2=811513, norm_count=0.03571428571428571),
 Row(id1=802599, id2=929402, norm_count=0.03571428571428571),
 Row(id1=802599, id2=924227, norm_count=0.03571428571428571),
 Row(id1=802599, id2=901687, norm_count=0.03571428571428571),
 Row(id1=802599, id2=860294, norm_count=0.03571428571428571),
 Row(id1=802599, id2=880642, norm_count=0.03571428571428571),
 Row(id1=802599, id2=920627, norm_count=0.03571428571428571),
 Row(id1=802599, id2=843219, norm_count=0.03571428571428571),
 Row(id1=802599, id2=892457, norm_count=0.03571428571428571),
 Row(id1=802599, id2=823001, norm_count=0.03571428571428571),
 Row(id1=802599, id2=899859, norm_count=0.03571428571428571),
 Row(id1=8

In [11]:
window = Window.orderBy(f.col("norm_count"))
    
TrackList = tracksNorm.withColumn("position", f.rank().over(window))\
    .filter(f.col("position") < 50)\
    .orderBy(f.col("id1"), f.col("id2"))\
    .select('id1', 'id2')\
    .take(40)

In [12]:
for val in TrackList:
    print "%s %s" % val

805688 806854
805688 823746
805688 836340
805688 854090
805688 899991
805688 901935
805688 904206
805688 933768
805688 947087
821288 823531
821288 824065
821288 824301
821288 829307
821288 831005
821288 843100
821288 846529
821288 854531
821288 858618
821288 860342
821288 868259
821288 874329
821288 878313
821288 895464
821288 905479
821288 909000
821288 909165
821288 913122
821288 934081
821288 940362
821288 940574
821288 944451
821288 947260
821288 952128
821288 963117
821288 966527
854531 855221
854531 872475
854531 878313
854531 891839
854531 893868
