# Graph-based Music Recommender. Task 6
For the user with Id 776748, print the top 40 recommended tracks. Build the recommendations with the algorithm described in lesson 3 of week 5. Initialize the coordinates of the vector x_0 that correspond to the user’s vertex and to all the vertices from task 5 with ones, and all other coordinates with zeros. Do 5 iterations:

<pre>img</pre>

Take alpha = 0.15 and the following balancing functions:

* beta(user, user → artist) = 0.5
* beta(user, user → track) = 0.5
* beta(track, track → track) = 1
* beta(artist, artist → track) = 1
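
The formula image is not available here. Judging from the iteration code further down in this notebook, each step appears to compute `x_next(v) = alpha * u(v) + (1 - alpha) * sum(prob(w, v) * x(w))` over the incoming edges `w → v`, where `prob` already includes the matching beta and `u` is 1 at the user's vertex and 0 elsewhere. Below is a minimal pure-Python sketch of one such step. The toy graph, the vertex names and the weights are hypothetical and exist only for illustration.

```python
ALPHA = 0.15  # restart weight from the task statement

# Hypothetical toy graph: (from, to) -> transition weight, beta already applied.
edges = {
    ("user", "artist_a"): 0.5,      # beta(user, user -> artist) * 1.0
    ("user", "track_1"): 0.5,       # beta(user, user -> track) * 1.0
    ("artist_a", "track_1"): 0.25,  # beta(artist, artist -> track) = 1
    ("artist_a", "track_2"): 0.75,
    ("track_1", "track_2"): 1.0,    # beta(track, track -> track) = 1
}

vertices = ["user", "artist_a", "track_1", "track_2"]

# Restart vector u: 1 at the user's vertex, 0 elsewhere.
u = {v: 1.0 if v == "user" else 0.0 for v in vertices}

# x_0: ones for the user and (assumed) the user's own artist and track.
x0 = {"user": 1.0, "artist_a": 1.0, "track_1": 1.0, "track_2": 0.0}


def step(x):
    """One iteration: ALPHA * u + (1 - ALPHA) * (weighted rank from incoming edges)."""
    incoming = {v: 0.0 for v in vertices}
    for (src, dst), prob in edges.items():
        incoming[dst] += prob * x[src]
    return {v: ALPHA * u[v] + (1 - ALPHA) * incoming[v] for v in vertices}


x1 = step(x0)
```

In this toy graph `track_2` ends up with the highest rank after one step, because it collects weight from both `artist_a` and `track_1`.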

You should receive a table with 3 columns: “name”, “artist” and “rank”. Sort the resulting dataframe in descending order by “rank”, select top 40 recommended tracks, select only the columns “name”, “artist” and “rank”, leave 5 digits after the decimal point in “rank” and print the resulting dataframe.
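
The post-processing described above (sort by rank, keep the top N, round to 5 digits) can be sketched in plain Python as follows. The rows and the helper name `top_n` are hypothetical. Sorting happens on the full-precision rank and rounding is applied only afterwards, so rounding cannot change the order.

```python
# Hypothetical (name, artist, rank) rows; the values are made up for illustration.
rows = [
    ("Track B", "Artist: Y", 0.921552328559),
    ("Track A", "Artist: X", 1.35278102029),
    ("Track C", "Artist: Z", 0.0047249),
]


def top_n(rows, n):
    """Sort by rank (descending), keep the first n rows, round rank to 5 digits."""
    ranked = sorted(rows, key=lambda r: r[2], reverse=True)[:n]
    return [(name, artist, round(rank, 5)) for name, artist, rank in ranked]


top = top_n(rows, 2)
for name, artist, rank in top:
    print("{} {} {}".format(name, artist, rank))
```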

<code>
...
Prayer Of The Refugee Artist: Rise Against 1.35278102029
Eagle Artist: Gotthard 1.21412311013
21 Guns Artist: Green Day 1.17301653219
Wait And Bleed Artist: Slipknot 0.921552328559
Beautiful disaster Artist: 311 0.921552328559
...
</code>

## Data description
There are two data sources for this assignment. They are DataFrames in parquet format.

The first dataset captures the user’s playing history.

Location - /data/sample264

Fields: trackId, userId, timestamp, artistId

* trackId - id of the track
* userId - id of the user
* artistId - id of the artist
* timestamp - timestamp of the moment the user starts listening to a track

The second dataset contains the metadata for tracks and artists.

Location - /data/meta

Fields: type, Name, Artist, Id

* Type is either “track” or “artist”.
* Name is the title of the track if type == “track”, and the name of the musician or group if type == “artist”.
* Artist stands for the creator of the track if type == “track”, and for the name of the musician or group if type == “artist”.
* Id - id of the item

In [1]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.enableHiveSupport().master("local").getOrCreate()

In [2]:
data = sparkSession.read.parquet("/data/sample264")
meta = sparkSession.read.parquet("/data/meta")

In [3]:
from pyspark.sql import Window
import pyspark.sql.functions as f

In [4]:
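def normalize(df, key1, key2, field, n):
    # Keep the n rows with the largest `field` for each key1
    # (key2 is not used in the body; it only names the second key column).
    window = Window.partitionBy(key1).orderBy(f.col(field).desc())
    topsDF = df.withColumn("row_number", f.row_number().over(window)) \
        .filter(f.col("row_number") <= n) \
        .drop(f.col("row_number"))
    # Sum of `field` per key1; groupBy already keeps key1 in the result,
    # so it must not be repeated inside agg().
    tmpDF = topsDF.groupBy(f.col(key1)).agg(f.sum(f.col(field)).alias("sum_" + field))
    # Divide by the per-key1 sum so that norm_<field> adds up to 1 for each key1.
    normalizedDF = topsDF.join(tmpDF, key1, "inner") \
        .withColumn("norm_" + field, f.col(field) / f.col("sum_" + field))
    return normalizedDF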
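
To make the effect of `normalize` easier to check by hand, here is a minimal pure-Python analogue on a small dictionary. It is an illustration only, not the Spark implementation. The function name `normalize_top_n` and the sample counts are hypothetical.

```python
from collections import defaultdict


def normalize_top_n(counts, n):
    """counts: {(key1, key2): count}.

    Keep the n largest counts per key1 and return {(key1, key2): share},
    where the shares of each key1 add up to 1.
    """
    by_key = defaultdict(list)
    for (k1, k2), c in counts.items():
        by_key[k1].append((k2, c))

    result = {}
    for k1, items in by_key.items():
        top = sorted(items, key=lambda kc: kc[1], reverse=True)[:n]
        total = sum(c for _, c in top)
        for k2, c in top:
            result[(k1, k2)] = c / total
    return result


# Hypothetical play counts: (userId, trackId) -> number of plays.
counts = {("u1", "t1"): 3, ("u1", "t2"): 1, ("u1", "t3"): 4, ("u2", "t1"): 2}
norm = normalize_top_n(counts, 2)
```

With `n = 2`, user `u1` keeps only `t3` and `t1`, so the dropped pair `("u1", "t2")` does not take part in the sum used for normalization.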

In [5]:
# users, tracks and artists have different sets of ids => edge type is redundant

def track_track():
    A = data.select('userId', f.col('trackId').alias('track1'), f.col('timestamp').alias('ts1'))
    B = data.select('userId',f.col('trackId').alias('track2'), f.col('timestamp').alias('ts2'))
    AB = A.join(B, on=['userId']) \
        .filter((A['track1'] != B['track2']) & (f.abs(A['ts1']-B['ts2']) <= 7*60)) \
        .groupBy('track1', 'track2') \
        .agg(f.count("*").alias('count')) \
        .cache()
    df = normalize(AB, 'track1', 'track2', 'count', 1000) \
        .select(
            f.col('track1').alias('from'), 
            f.col('track2').alias('to'), 
            f.col('norm_count').alias('prob')) \
        .cache()
    return df

def user_track():
    X = data.select('userId', 'trackId').groupBy('userId', 'trackId').agg(f.count("*").alias('count')).cache()
    df = normalize(X, 'userId', 'trackId', 'count', 1000) \
        .select(
            f.col('userId').alias('from'), 
            f.col('trackId').alias('to'), 
            (f.col('norm_count') * 0.5).alias('prob') # beta user->track
        ) \
        .cache()
    return df

def user_artist():
    X = data.select('userId', 'artistId').groupBy('userId', 'artistId').agg(f.count("*").alias('count')).cache()
    df = normalize(X, 'userId', 'artistId', 'count', 1000) \
        .select(
            f.col('userId').alias('from'), 
            f.col('artistId').alias('to'), 
            (f.col('norm_count') * 0.5).alias('prob') # beta user->artist
        ) \
        .cache()
    return df

def artist_track():
    X = data.select('artistId', 'trackId').groupBy('artistId', 'trackId').agg(f.count("*").alias('count')).cache()
    df = normalize(X, 'artistId', 'trackId', 'count', 1000) \
        .select(
            f.col('artistId').alias('from'), 
            f.col('trackId').alias('to'), 
            f.col('norm_count').alias('prob')) \
        .cache()
    return df
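
The self-join in `track_track()` counts ordered pairs of different tracks that the same user started within `7*60` of each other. The sketch below reproduces that counting rule in plain Python on hypothetical plays. It assumes the timestamps are in seconds, which the notebook does not state, and `co_listen_counts` is an illustrative name.

```python
from collections import Counter
from itertools import permutations

WINDOW = 7 * 60  # same threshold as in track_track(); assumed to be seconds

# Hypothetical plays: (userId, trackId, timestamp).
plays = [
    (1, "a", 0),
    (1, "b", 300),   # 300 after "a": within the window
    (1, "c", 1000),  # more than 420 after both "a" and "b": no pair
    (2, "a", 50),
    (2, "b", 100),
]


def co_listen_counts(plays):
    """Count ordered (track1, track2) pairs played by one user within WINDOW."""
    by_user = {}
    for user, track, ts in plays:
        by_user.setdefault(user, []).append((track, ts))

    counts = Counter()
    for events in by_user.values():
        # Ordered pairs of distinct events, like a self-join on userId.
        for (t1, ts1), (t2, ts2) in permutations(events, 2):
            if t1 != t2 and abs(ts1 - ts2) <= WINDOW:
                counts[(t1, t2)] += 1
    return counts


counts = co_listen_counts(plays)
```

Each qualifying pair is counted in both directions, which matches the symmetric filter on `ts1` and `ts2` in the Spark version.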

In [6]:
# edges
E = track_track() \
    .union(user_track()) \
    .union(user_artist()) \
    .union(artist_track()) \
    .cache()

In [7]:
userId = 776748

def users():
    users = data.select('userId').distinct() \
        .select(
            f.col('userId').alias('id'), 
            f.when(f.col('userId') == userId, 1.0).otherwise(0.0).alias('rank'))
    return users

def tracks():
    user_tracks = data.filter(f.col('userId') == userId).select('trackId', f.lit(1).alias('aux')).distinct()
    tracks = data.select('trackId').distinct() \
        .join(user_tracks, on=['trackId'], how='left') \
        .select(
            f.col('trackId').alias('id'), 
            f.when(f.col('aux').isNotNull(), 1.0).otherwise(0.0).alias('rank'))
    return tracks

def artists():
    user_artists = data.filter(f.col('userId') == userId).select('artistId', f.lit(1).alias('aux')).distinct()
    artists = data.select('artistId').distinct() \
        .join(user_artists, on=['artistId'], how='left') \
        .select(
            f.col('artistId').alias('id'), 
            f.when(f.col('aux').isNotNull(), 1.0).otherwise(0.0).alias('rank'))
    return artists

In [8]:
# vertices
X = users() \
    .union(tracks()) \
    .union(artists()) \
    .cache()

In [None]:
# Restart vector u = [0,0...0,1,0...0,0]: 1.0 at the user's vertex, 0.0 elsewhere
U = X.select('id', f.when(f.col('id') == userId, 1.0).otherwise(0.0).alias('u')).cache()

for _ in range(5):
    # For every vertex, sum prob * rank over its incoming edges (sigma)
    S = X.join(E, X['id']==E['from'], how='left') \
        .na.fill(0.0, ['prob']) \
        .select('to', (f.col('prob') * f.col('rank')).alias('val')) \
        .groupBy(f.col('to').alias('id')).agg(f.sum('val').alias('sum'))

    # x_{k+1} = alpha * u + (1 - alpha) * sigma, with alpha = 0.15
    X = U.join(S, on=['id'], how='left') \
        .na.fill(0.0, ['sum']) \
        .select('id', ((0.15 * f.col('u')) + (0.85 * f.col('sum'))).alias('rank')) \
        .cache()

In [None]:
# meta holds both tracks and artists, so the top 40 rows below can include artist rows as well as tracks
recommendations = X.join(meta, on=['id'], how='inner') \
    .select('Name', 'Artist', f.round(f.col('rank'), 5).alias('rank')) \
    .orderBy(f.desc('rank'))

for name, artist, rank in recommendations.take(40):
    print("{} {} {}".format(name, artist, rank))

Kill The DJ Artist: Green Day 1.42809
Come Out and Play Artist: The Offspring 1.37473
I Hate Everything About You Artist: Three Days Grace 1.37362
Prayer Of The Refugee Artist: Rise Against 1.35278
Eagle Artist: Gotthard 1.21412
21 Guns Artist: Green Day 1.17302
Beautiful disaster Artist: 311 0.92155
Wait And Bleed Artist: Slipknot 0.92155
Here To Stay Artist: Korn 0.91653
Hard Rock Hallelujah Artist: Lordi 0.91653
Nothing Going On Artist: Clawfinger 0.80983
Numb Artist: Linkin Park 0.80292
In The End Artist: Linkin Park 0.80292
Kryptonite Artist: 3 Doors Down 0.68799
Sky is Over Artist: Serj Tankian 0.68799
Take It Out On Me Artist: Thousand Foot Krutch 0.47024
Girls and Boys Artist: Blur 0.40245
Cocaine Artist: Nomy 0.20893
Getting Away With Murder Artist: Papa Roach 0.20648
Artist: Green Day Artist: Green Day 0.01181
Artist: Linkin Park Artist: Linkin Park 0.00472
Artist: The Offspring Artist: The Offspring 0.00472
Artist: Clawfinger Artist: Clawfinger 0.00472
She Keeps Me Up Artist