# Set up

In [1]:
import os
# Run the PySpark interactive-shell bootstrap script that ships under $SPARK_HOME.
# As the banner printed below shows, this makes a SparkSession available as `spark`.
# NOTE: `execfile` is a Python 2 builtin; this cell does not run under Python 3.
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql import SparkSession
# Reuse the running session if there is one, otherwise create a Hive-enabled
# session with master "local[*]".
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local[*]")
    .getOrCreate()
)

In [3]:
from pyspark.sql import functions as f
from pyspark.sql import Window

# Load data

In [4]:
# Both inputs are Parquet datasets: the listening log and the track/artist metadata.
data = sparkSession.read.format("parquet").load("/data/sample264")
meta = sparkSession.read.format("parquet").load("/data/meta")

In [5]:
data.show(10)

+------+-------+--------+----------+
|userId|trackId|artistId| timestamp|
+------+-------+--------+----------+
| 13065| 944906|  978428|1501588527|
|101897| 799685|  989262|1501555608|
|215049| 871513|  988199|1501604269|
|309769| 857670|  987809|1501540265|
|397833| 903510|  994595|1501597615|
|501769| 818149|  994975|1501577955|
|601353| 958990|  973098|1501602467|
|710921| 916226|  972031|1501611582|
|  6743| 801006|  994339|1501584964|
|152407| 913509|  994334|1501571055|
+------+-------+--------+----------+
only showing top 10 rows



In [6]:
meta.show(10)

+------+--------------------+--------------------+-------+
|  type|                Name|              Artist|     Id|
+------+--------------------+--------------------+-------+
| track|               Smile| Artist: Josh Groban|1223851|
| track|Chuni Ashkharhe Q...|Artist: Razmik Amyan|1215486|
| track|           Dark City|Artist: Machinae ...|1296462|
| track|       Not Sensitive|        Artist: Moby|1249694|
|artist|Artist: Carlos Pu...|Artist: Carlos Pu...|1352221|
| track|Thiz Gangsta Chit...|Artist: Tha Dogg ...|1217194|
| track|            Ruffneck|    Artist: Skrillex|1245681|
| track|              Incerc|       Artist: Spike|1193283|
|artist|Artist: Wallenber...|Artist: Wallenber...|1333444|
| track|               remix|    Artist: Flo Rida|1246378|
+------+--------------------+--------------------+-------+
only showing top 10 rows



In [7]:
from pyspark.sql import Row
# Row factory carrying the field names of a graph vertex.
# (The extra `user` alias was removed: the next cell rebinds `user` before using it.)
r = Row('type', 'Id', 'Name', 'Artist', 'Score')
r

<Row(type, Id, Name, Artist, Score)>

In [8]:
# Seed vertex: the user the scores are computed for.
# Values are passed positionally in the field order of the `r` template
# (type, Id, Name, Artist, Score), so the Row really carries five named fields
# instead of wrapping a single dict.
user_row = r('user', 776748, 'User', 'User', 1)

# createDataFrame expects a collection of rows, hence the one-element list.
user = spark.createDataFrame([user_row]) \
    .select('type', 'Id', 'Name', 'Artist', 'Score')



In [9]:
user.show(1)

+----+------+----+------+-----+
|type|    Id|Name|Artist|Score|
+----+------+----+------+-----+
|user|776748|User|  User|    1|
+----+------+----+------+-----+



In [10]:
# (type, Id) of every vertex: the user row unioned with all rows of meta, sorted by Id.
user2 = user.select('type', 'Id')
templet = user2.union(meta.select('type', 'Id')).orderBy('Id')

In [11]:
templet.show(10)

+-----+-------+
| type|     Id|
+-----+-------+
| user| 776748|
|track|1145999|
|track|1146003|
|track|1146005|
|track|1146007|
|track|1146009|
|track|1146010|
|track|1146011|
|track|1146012|
|track|1146014|
+-----+-------+
only showing top 10 rows



# Top-40 vertices

In [12]:
# Distinct artist vertices (meta rows of type 'artist') that user 776748 listened to.
findArtists = (
    data.join(meta, data.artistId == meta.Id)
    .filter(f.col('userId') == 776748)
    .filter(f.col('type') == u'artist')
    .select('Artist', 'Name', 'type', 'Id')
    .dropDuplicates()
)

In [13]:
findArtists.show(10)

+--------------------+--------------------+------+-------+
|              Artist|                Name|  type|     Id|
+--------------------+--------------------+------+-------+
|        Artist: Korn|        Artist: Korn|artist|1361795|
|Artist: 3 Doors Down|Artist: 3 Doors Down|artist|1363546|
|   Artist: Disturbed|   Artist: Disturbed|artist|1360803|
|  Artist: Clawfinger|  Artist: Clawfinger|artist|1343667|
|Artist: Thousand ...|Artist: Thousand ...|artist|1341337|
|Artist: Serj Tankian|Artist: Serj Tankian|artist|1331090|
|Artist: Three Day...|Artist: Three Day...|artist|1348237|
|  Artist: Papa Roach|  Artist: Papa Roach|artist|1350632|
|    Artist: Iggy Pop|    Artist: Iggy Pop|artist|1347764|
|Artist: Rise Against|Artist: Rise Against|artist|1337033|
+--------------------+--------------------+------+-------+
only showing top 10 rows



In [14]:
# Distinct track vertices (meta rows of type 'track') that user 776748 listened to.
findTracks = (
    data.join(meta, data.trackId == meta.Id)
    .filter(f.col('userId') == 776748)
    .filter(f.col('type') == u'track')
    .select('Artist', 'Name', 'type', 'Id')
    .dropDuplicates()
)

In [15]:
findTracks.show(10)

+--------------------+--------------------+-----+-------+
|              Artist|                Name| type|     Id|
+--------------------+--------------------+-----+-------+
|   Artist: Green Day|             21 Guns|track|1299891|
|Artist: The Offsp...|   Come Out and Play|track|1238423|
|        Artist: Nomy|             Cocaine|track|1294015|
|  Artist: Papa Roach|Getting Away With...|track|1310723|
|Artist: Three Day...|I Hate Everything...|track|1322142|
|  Artist: Nickelback|     She Keeps Me Up|track|1246253|
|        Artist: Korn|        Here To Stay|track|1220928|
|        Artist: Blur|      Girls and Boys|track|1290141|
|  Artist: Clawfinger|    Nothing Going On|track|1197701|
|         Artist: 311|  Beautiful disaster|track|1200833|
+--------------------+--------------------+-----+-------+
only showing top 10 rows



In [16]:
# First 40 vertices of the user's artists and tracks, ordered by Artist, type, Name.
# The rows are collected to the driver under their own name so that `findAll`
# is only ever bound to a DataFrame.
top40_rows = (
    findArtists.union(findTracks)
    .orderBy('Artist', 'type', 'Name')
    .select('type', 'Id')
    .take(40)
)

# Rebuild a DataFrame from the collected rows and give every vertex Score 1.
findAll = spark.createDataFrame(top40_rows) \
    .withColumn('Score', f.lit(1))

In [17]:
findAll.show(10)

+------+-------+-----+
|  type|     Id|Score|
+------+-------+-----+
|artist|1363546|    1|
| track|1273367|    1|
|artist|1336647|    1|
| track|1200833|    1|
|artist|1355952|    1|
| track|1290141|    1|
|artist|1343667|    1|
| track|1197701|    1|
|artist|1360803|    1|
| track|1180278|    1|
+------+-------+-----+
only showing top 10 rows



# Initialize coordinates of vector

In [18]:
# Starting vector: the user row plus the selected vertices, sorted by Id.
user3 = user.select(['type', 'Id', 'Score'])
initial_x = user3.union(findAll).sort('Id')

In [19]:
initial_x.show(10)

+-----+-------+-----+
| type|     Id|Score|
+-----+-------+-----+
| user| 776748|    1|
|track|1160421|    1|
|track|1176891|    1|
|track|1178831|    1|
|track|1179685|    1|
|track|1180278|    1|
|track|1197701|    1|
|track|1200833|    1|
|track|1218821|    1|
|track|1220928|    1|
+-----+-------+-----+
only showing top 10 rows



In [20]:
# Spot-check: look up the vertex with Id 1361795 in the starting vector.
check = initial_x.filter(initial_x['Id'] == 1361795)

In [21]:
check.show(1)

+------+-------+-----+
|  type|     Id|Score|
+------+-------+-----+
|artist|1361795|    1|
+------+-------+-----+



# Define norm function

In [22]:
def norm(df, key1, key2, field, n):
    """Keep the top ``n`` rows per ``key1`` (by ``field``) and normalise ``field`` per group.

    Parameters
    ----------
    df : DataFrame
        Must contain the columns named by ``key1`` and ``field``.
    key1 : str
        Name of the grouping column.
    key2 : str
        Name of the second key column. It is not used by the computation and
        is kept only so existing calls keep working.
    field : str
        Name of the numeric column used for ranking and normalisation.
    n : int
        Maximum number of rows kept for each ``key1`` value.

    Returns
    -------
    DataFrame
        The retained rows of ``df`` with two extra columns: ``sum_<field>``
        (sum of ``field`` over the retained rows of the same ``key1``) and
        ``norm_<field>`` (``field`` divided by that sum). The result is
        marked for caching.

    Notes
    -----
    Rows with equal ``field`` values are ranked in unspecified order, so when
    ties straddle position ``n`` the retained rows are not deterministic.
    """
    by_key = Window.partitionBy(key1)
    ranked = by_key.orderBy(f.col(field).desc())

    # Rank rows inside each group and keep only the n best ones.
    topsDF = df.withColumn('row_number', f.row_number().over(ranked)) \
        .filter(f.col('row_number') <= n) \
        .drop('row_number')

    # A window sum over the same partitioning attaches the group total to every
    # retained row, replacing the former aggregate + self-join.
    sum_col = 'sum_' + field
    normalizedDF = topsDF \
        .withColumn(sum_col, f.sum(f.col(field)).over(by_key)) \
        .withColumn('norm_' + field, f.col(field) / f.col(sum_col)) \
        .cache()

    return normalizedDF

# User-track edge

In [23]:
# Number of log rows per (user, track) pair.
userTrack = data.groupBy('userId', 'trackId').agg(f.count(f.lit(1)).alias('count'))

In [24]:
# Per user: keep at most 1000 tracks by count and normalise the counts.
userTrackNorm = (
    norm(userTrack, 'userId', 'trackId', 'count', 1000)
    .select('userId', 'trackId', 'norm_count')
)

In [25]:
userTrackNorm.show(10)

+------+-------+------------------+
|userId|trackId|        norm_count|
+------+-------+------------------+
|  3175| 947718|0.1111111111111111|
|  3175| 940951|0.1111111111111111|
|  3175| 845631|0.1111111111111111|
|  3175| 864690|0.1111111111111111|
|  3175| 831005|0.1111111111111111|
|  3175| 930432|0.1111111111111111|
|  3175| 965012|0.1111111111111111|
|  3175| 858940|0.1111111111111111|
|  3175| 829307|0.1111111111111111|
|  5518| 961148|               0.5|
+------+-------+------------------+
only showing top 10 rows



In [26]:
# Outgoing user->track edges of user 776748.
# The filter runs before the projection, while `userId` is still a column of
# the frame, instead of referring to a column the select has already dropped.
userTrackNext = userTrackNorm \
    .where(f.col('userId') == 776748) \
    .select(f.lit('track').alias('type'),
            f.col('trackId').alias('Id'),
            f.col('norm_count').alias('Next1')) \
    .cache()

In [27]:
userTrackNext.show(10)

+-----+-------+--------------------+
| type|     Id|               Next1|
+-----+-------+--------------------+
|track|1226775|  0.1111111111111111|
|track|1197701| 0.07407407407407407|
|track|1238423| 0.07407407407407407|
|track|1299891| 0.07407407407407407|
|track|1225222|0.037037037037037035|
|track|1176891|0.037037037037037035|
|track|1220928|0.037037037037037035|
|track|1310723|0.037037037037037035|
|track|1179685|0.037037037037037035|
|track|1294015|0.037037037037037035|
+-----+-------+--------------------+
only showing top 10 rows



# User-artist edge

In [28]:
# Number of log rows per (user, artist) pair.
userArtist = data.groupBy('userId', 'artistId').agg(f.count(f.lit(1)).alias('count'))

In [29]:
# Per user: keep at most 100 artists by count and normalise the counts.
userArtistNorm = (
    norm(userArtist, 'userId', 'artistId', 'count', 100)
    .select('userId', 'artistId', 'norm_count')
)

In [30]:
userArtistNorm.show(10)

+------+--------+------------------+
|userId|artistId|        norm_count|
+------+--------+------------------+
|  3175|  981306|0.2222222222222222|
|  3175|  995274|0.1111111111111111|
|  3175|  986492|0.1111111111111111|
|  3175|  976051|0.1111111111111111|
|  3175| 1000709|0.1111111111111111|
|  3175|  984798|0.1111111111111111|
|  3175|  969751|0.1111111111111111|
|  3175| 1000564|0.1111111111111111|
|  5518|  978963|               0.5|
|  5518|  984128|0.3333333333333333|
+------+--------+------------------+
only showing top 10 rows



In [31]:
# Outgoing user->artist edges of user 776748.
# The filter runs before the projection, while `userId` is still a column of
# the frame, instead of referring to a column the select has already dropped.
userArtistNext = userArtistNorm \
    .where(f.col('userId') == 776748) \
    .select(f.lit('artist').alias('type'),
            f.col('artistId').alias('Id'),
            f.col('norm_count').alias('Next2')) \
    .cache()

In [32]:
userArtistNext.show(10)

+------+-------+--------------------+
|  type|     Id|               Next2|
+------+-------+--------------------+
|artist|1358867| 0.18518518518518517|
|artist|1343667| 0.07407407407407407|
|artist|1358472| 0.07407407407407407|
|artist|1330944| 0.07407407407407407|
|artist|1331090|0.037037037037037035|
|artist|1336647|0.037037037037037035|
|artist|1337033|0.037037037037037035|
|artist|1355952|0.037037037037037035|
|artist|1359522|0.037037037037037035|
|artist|1348237|0.037037037037037035|
+------+-------+--------------------+
only showing top 10 rows



# Track-track edge

In [33]:
# Pair up two different tracks played by the same user with timestamps at most
# 420 apart, then count how often each ordered pair (id1, id2) occurs.
trackTrack = (
    data.alias('df1')
    .join(data.alias('df2'), 'userId')
    .where(f.abs(f.col('df1.timestamp') - f.col('df2.timestamp')) <= 420)
    .where(f.col('df1.trackId') != f.col('df2.trackId'))
    .select(f.col('df1.trackId').alias('id1'), f.col('df2.trackId').alias('id2'))
    .groupBy('id1', 'id2')
    .count()
)

In [34]:
# Per source track: keep at most 40 neighbours by count and normalise the counts.
trackTrackNorm = (
    norm(trackTrack, 'id1', 'id2', 'count', 40)
    .select('id1', 'id2', 'norm_count')
    .orderBy('id1', 'id2')
)

In [35]:
trackTrackNorm.show(10)

+------+------+----------+
|   id1|   id2|norm_count|
+------+------+----------+
|798256|923706|       1.0|
|798258|808254|       0.5|
|798258|810685|       0.5|
|798261|911939|       0.6|
|798261|916840|       0.2|
|798261|943188|       0.2|
|798290|880442|       0.5|
|798290|906999|       0.5|
|798302|836228|       0.5|
|798302|893311|       0.5|
+------+------+----------+
only showing top 10 rows



In [36]:
def getTrackTrackNext(trackId, trackTrackNorm):
    """Return the outgoing track->track edges of a single track.

    Parameters
    ----------
    trackId
        Id of the source track; compared against column ``id1``.
    trackTrackNorm : DataFrame
        Edge table with the columns ``id1``, ``id2`` and ``norm_count``.

    Returns
    -------
    DataFrame
        Columns ``type`` (always ``'track'``), ``Id`` (the ``id2`` values)
        and ``Next3`` (the ``norm_count`` values) of the matching rows.
    """
    # Filter on id1 while it is still a column of the frame, then project.
    trackTrackNext = trackTrackNorm \
        .where(f.col('id1') == trackId) \
        .select(f.lit('track').alias('type'),
                f.col('id2').alias('Id'),
                f.col('norm_count').alias('Next3'))

    return trackTrackNext

In [37]:
# Spot-check: outgoing edges of track 798256.
check = getTrackTrackNext(trackId=798256, trackTrackNorm=trackTrackNorm)
check.show(n=10)

+-----+------+-----+
| type|    Id|Next3|
+-----+------+-----+
|track|923706|  1.0|
+-----+------+-----+



# Artist-track edge

In [38]:
# Number of log rows per (artist, track) pair.
artistTrack = data.groupBy('artistId', 'trackId').agg(f.count(f.lit(1)).alias('count'))

In [39]:
# Per artist: keep at most 100 tracks by count and normalise the counts.
artistTrackNorm = (
    norm(artistTrack, 'artistId', 'trackId', 'count', 100)
    .select('artistId', 'trackId', 'norm_count')
)

In [40]:
artistTrackNorm.show(10)

+--------+-------+-------------------+
|artistId|trackId|         norm_count|
+--------+-------+-------------------+
|  968694| 827354|               0.25|
|  968694| 820606|               0.25|
|  968694| 897139|               0.25|
|  968694| 925696|               0.25|
|  969344| 933592|                1.0|
|  969479| 959227|0.44166666666666665|
|  969479| 819606|                0.2|
|  969479| 929291|0.10833333333333334|
|  969479| 798826|              0.075|
|  969479| 890444|               0.05|
+--------+-------+-------------------+
only showing top 10 rows



In [41]:
def getArtistTrackNext(artistId, artistTrackNorm):
    """Return the outgoing artist->track edges of a single artist.

    Parameters
    ----------
    artistId
        Id of the source artist; compared against column ``artistId``.
    artistTrackNorm : DataFrame
        Edge table with the columns ``artistId``, ``trackId`` and ``norm_count``.

    Returns
    -------
    DataFrame
        Columns ``type`` (always ``'track'``), ``Id`` (the ``trackId`` values)
        and ``Next4`` (the ``norm_count`` values) of the matching rows.
    """
    # Filter on artistId while it is still a column of the frame, then project.
    artistTrackNext = artistTrackNorm \
        .where(f.col('artistId') == artistId) \
        .select(f.lit('track').alias('type'),
                f.col('trackId').alias('Id'),
                f.col('norm_count').alias('Next4'))

    return artistTrackNext

In [42]:
# Spot-check: outgoing edges of artist 968694.
check = getArtistTrackNext(artistId=968694, artistTrackNorm=artistTrackNorm)
check.show(n=10)

+-----+------+-----+
| type|    Id|Next4|
+-----+------+-----+
|track|827354| 0.25|
|track|820606| 0.25|
|track|897139| 0.25|
|track|925696| 0.25|
+-----+------+-----+



# Set parameters

In [43]:
# Vector that the damping step of the iteration joins back in with weight alpha:
# the single user row (type, Id, Score).
u = user3

In [44]:
u.show(10)

+----+------+-----+
|type|    Id|Score|
+----+------+-----+
|user|776748|    1|
+----+------+-----+



In [45]:
# Damping weight: in the iteration the new score is
# alpha * (Score from u) + (1 - alpha) * (propagated value).
alpha = 0.15

In [46]:
# Per-edge-type weights applied when scores are propagated:
#   beta[0] user -> track   (Next1)
#   beta[1] user -> artist  (Next2)
#   beta[2] track -> track  (Next3)
#   beta[3] artist -> track (Next4)
beta = [0.5, 0.5, 1, 1]

# Iterate

In [47]:
# Per-vertex caches of edge DataFrames, filled on first use by the iteration loop:
# track Id -> result of getTrackTrackNext, artist Id -> result of getArtistTrackNext.
trackTrackNexts = {}
artistTrackNexts = {}

In [48]:
# Weighted contribution of the user vertex to each of its tracks and artists.
# The outer join keeps vertices present on only one side; their missing
# numeric value is filled with 0 before the two weighted terms are added.
fromUser = (
    userTrackNext.join(userArtistNext, ['type', 'Id'], 'Outer')
    .fillna(0)
    .select(
        'type',
        'Id',
        (beta[0] * f.col('Next1') + beta[1] * f.col('Next2')).alias('Next'),
    )
    .cache()
)

In [49]:
fromUser.show(10)

+------+-------+--------------------+
|  type|     Id|                Next|
+------+-------+--------------------+
| track|1294161|0.018518518518518517|
|artist|1347764|0.018518518518518517|
| track|1160421|0.018518518518518517|
|artist|1331090|0.018518518518518517|
|artist|1337033|0.018518518518518517|
| track|1179685|0.018518518518518517|
| track|1178831|0.018518518518518517|
| track|1220928|0.018518518518518517|
| track|1225222|0.018518518518518517|
| track|1246253|0.018518518518518517|
+------+-------+--------------------+
only showing top 10 rows



In [50]:
# Current score vector; the iteration loop replaces it after every pass.
x = initial_x

In [51]:
# Number of propagation passes; each pass recomputes the whole score vector from x.
n_iterations = 1

for i in range(n_iterations):

    print('Iteration ' + str(i+1) + ' ...')

    # for user to tracks and artists
    print('Calculate user to tracks and artists...')
    user_score = x.where(f.col('type') == 'user').collect()[0].Score
    new_x = fromUser.select('type', 'Id', (user_score * f.col('Next')).alias('Next'))

    # for tracks to tracks
    print('Calculate tracks to tracks...')
    # Collect Id and Score together: a single Spark job instead of one extra
    # job per track just to read its score back. The filter runs before the
    # projection so it only refers to columns that are still present.
    trackIds = x.where(f.col('type') == 'track').select('Id', 'Score').collect()

    for row in trackIds:

        trackId = row.Id
        track_score = row.Score

        # Build the edge frame of this track once and reuse it in later passes.
        if trackId not in trackTrackNexts:
            trackTrackNexts[trackId] = getTrackTrackNext(trackId, trackTrackNorm)
        trackTrackNext = trackTrackNexts[trackId]

        # Outer join keeps vertices seen on only one side; fillna(0) turns their
        # missing numeric value into 0 so the sum below is always defined.
        new_x = new_x.join(trackTrackNext, ['type', 'Id'], 'Outer') \
            .fillna(0) \
            .select('type', 'Id', \
                    (f.col('Next') + track_score * beta[2] * f.col('Next3')).alias('Next'))

    # for artists to tracks
    print('Calculate artists to tracks...')
    # Same single-job collection for artists; only artists with a positive score are used.
    artistIds = x.where((f.col('type') == 'artist') & (f.col('Score') > 0)) \
        .select('Id', 'Score') \
        .collect()

    for row in artistIds:

        artistId = row.Id
        artist_score = row.Score

        if artistId not in artistTrackNexts:
            artistTrackNexts[artistId] = getArtistTrackNext(artistId, artistTrackNorm)
        artistTrackNext = artistTrackNexts[artistId]

        new_x = new_x.join(artistTrackNext, ['type', 'Id'], 'Outer') \
            .fillna(0) \
            .select('type', 'Id', \
                    (f.col('Next') + artist_score * beta[3] * f.col('Next4')).alias('Next'))

    # damp: mix the propagated value with the Score carried by u
    new_x = new_x.join(u, ['type', 'Id'], 'Outer') \
        .fillna(0) \
        .select('type', 'Id', \
                (alpha * f.col('Score') + (1 - alpha) * f.col('Next')).alias('Score')).cache()

    x = new_x
    x.show(5)

Iteration 1 ...
Calculate user to tracks and artists...
Calculate tracks to tracks...
Calculate artists to tracks...
+------+-------+-------------------+
|  type|     Id|              Score|
+------+-------+-------------------+
| track|1322142| 1.7157407407407408|
|artist|1360803|0.01574074074074074|
| track|1225222| 2.5657407407407407|
|artist|1348237|0.01574074074074074|
|artist|1331090|0.01574074074074074|
+------+-------+-------------------+
only showing top 5 rows



In [52]:
# Top 40 scored vertices with their names. The right join keeps every row of x.
# The row with Id 776748 (the user) is removed before the projection, while
# `Id` is still a column of the frame.
result = meta.join(x, ['type', 'Id'], 'Right') \
    .where(f.col('Id') != 776748) \
    .select('Name', 'Artist', 'Score') \
    .orderBy(f.col('Score').desc()) \
    .take(40)

In [53]:
# One line per result row: Name, Artist, Score.
# The call form prints the same text under Python 2 and also parses under Python 3.
for val in result:
    print("%s %s %s" % val)

Come Out and Play Artist: The Offspring 2.66243386243
Prayer Of The Refugee Artist: Rise Against 2.56574074074
Eagle Artist: Gotthard 2.56574074074
21 Guns Artist: Green Day 1.9196957672
Here To Stay Artist: Korn 1.71574074074
I Hate Everything About You Artist: Three Days Grace 1.71574074074
Hard Rock Hallelujah Artist: Lordi 1.71574074074
Wait And Bleed Artist: Slipknot 1.57407407407
Beautiful disaster Artist: 311 1.57407407407
Kill The DJ Artist: Green Day 1.55900793651
Nothing Going On Artist: Clawfinger 1.38743386243
Numb Artist: Linkin Park 1.14907407407
Girls and Boys Artist: Blur 1.14907407407
In The End Artist: Linkin Park 1.14907407407
Take It Out On Me Artist: Thousand Foot Krutch 1.14907407407
Cocaine Artist: Nomy 1.00740740741
Getting Away With Murder Artist: Papa Roach 0.987169312169
The Vengeful One Artist: Disturbed 0.865740740741
Sunday Artist: Iggy Pop 0.865740740741
She Keeps Me Up Artist: Nickelback 0.865740740741
Sky is Over Artist: Serj Tankian 0.865740740741
Kryp