# Graph-based Music Recommender

This assignment consists of 6 subtasks: 4 are mandatory and 2 are honor tasks.

This notebook contains solutions for all tasks (as required by the course).

## Initialize the context and read data

### Data description

There are two data sources for this task, both stored as DataFrames in Parquet format:
* user's playing history (`/data/sample264`)
* metadata for tracks and artists (`/data/meta`)

#### User's playing history

* `trackId` - id of the track
* `userId` - id of the user
* `artistId` - id of the artist
* `timestamp` - timestamp of the moment the user starts listening to a track

#### Meta data for track or artist

* `type` - record type, either "track" or "artist"
* `Name` - title of the track if `type == "track"`, or the name of the musician or group if `type == "artist"`
* `Artist` - the creator of the track if `type == "track"`, or the name of the musician or group if `type == "artist"`
* `id` - id of the item

In [1]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.enableHiveSupport().master("yarn").getOrCreate()

In [2]:
data = spark_session.read.parquet("/data/sample264").cache()
meta = spark_session.read.parquet("/data/meta")

## Define util functions

In [61]:
from pyspark.sql import Window
from pyspark.sql.functions import col, count, row_number, sum as sql_sum


def top_n(df, for_col, by_col, n):
    """
    Selects the top-N rows for each value in the {for_col} column,
    ranked by the values in the {by_col} column in descending order.
    
    :param df: dataframe to filter
    :param for_col: name of the column to partition by
    :param by_col: name of the column to rank by
    :param n: how many top rows to keep for each value in {for_col}
    :return: filtered dataframe
    """
    window = Window.partitionBy(for_col).orderBy(col(by_col).desc())
    top_n_df = df.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= n) \
        .drop(col("row_number"))
    
    return top_n_df


def normalize_weight(df, aggr_col, weight_col="weight"):
    """
    Normalizes values in specified column with formula: 
      `norm_weight = weight / total_weight`
     where:
       * weight - value in {weight_col} column
       * total_weight - sum of values in {weight_col}
                        grouped by values in {aggr_col}
    
    Adds column `norm_{weight_col}` to the dataframe with 
    normalized weights.
    
    :param df: dataframe to modify
    :param aggr_col: column name to aggregate weights by
    :param weight_col: column name with weights to normalize
    :return: a new dataframe with column for normalized weights
    """
    sums_df = df.groupBy(aggr_col).agg(sql_sum(col(weight_col)).alias("total_weight"))
    normalized_df = df.join(sums_df, aggr_col) \
        .withColumn("norm_" + weight_col, col(weight_col) / col("total_weight")) \
        .drop(col("total_weight"))
    
    return normalized_df
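
The two helpers can be easier to follow on plain Python data. The sketch below is not the Spark implementation: it is a standard-library analogue that assumes edges are `(src, dst, weight)` tuples, and the names `top_n_edges` and `normalize_edges` are hypothetical.

```python
from collections import defaultdict


def top_n_edges(edges, n):
    """Keep at most n heaviest edges for each source vertex."""
    by_src = defaultdict(list)
    for src, dst, weight in edges:
        by_src[src].append((src, dst, weight))
    kept = []
    for src_edges in by_src.values():
        # Heaviest edges first, then cut to the first n.
        src_edges.sort(key=lambda edge: -edge[2])
        kept.extend(src_edges[:n])
    return kept


def normalize_edges(edges):
    """Divide each weight by the total weight of its source vertex."""
    totals = defaultdict(float)
    for src, _, weight in edges:
        totals[src] += weight
    return [(src, dst, weight / totals[src]) for src, dst, weight in edges]


# Toy edges (made-up values).
edges = [("a", "x", 3), ("a", "y", 1), ("a", "z", 2), ("b", "x", 5)]
top2 = top_n_edges(edges, 2)
normalized = normalize_edges(top2)
print(normalized)
```

As in the notebook, normalization happens after the top-N cut, so the kept weights of each source vertex sum to 1.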

## Task 1. Build the edges of the type `track-track`

Build the edges of the type `track-track`. To do this, compute the collaborative similarity between all the tracks: if a user started listening to track `B` within 7 minutes after starting track `A`, add `1` to the weight of the edge from vertex `A` to vertex `B` (the initial weight is 0).

**Example:**

```
userId artistId trackId timestamp
7        12        1          1534574189
7        13        4          1534574289 
5        12        1          1534574389 
5        13        4          1534594189 
6        12        1          1534574489 
6        13        4          1534574689 
```

Track `1` is similar to track `4` with weight 2 (before normalization): users `7` and `6` each listened to these two tracks within the 7-minute window:

```
userId 7: 1534574289  - 1534574189 = 100 seconds = 1 min 40 seconds < 7 minutes
userId 6: 1534574689 - 1534574489 = 200 seconds = 3 min 20 seconds < 7 minutes
```

Note that the track `4` is similar to the track `1` with the same weight 2.
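
The example can be checked with a short standard-library sketch. It is a brute-force pairing over the six rows above, not the Spark self-join, and it assumes the window is inclusive (`<=`), as in the solution cell.

```python
from collections import Counter
from itertools import permutations

WINDOW_SECONDS = 7 * 60

# (userId, trackId, timestamp) rows from the example above.
plays = [
    (7, 1, 1534574189),
    (7, 4, 1534574289),
    (5, 1, 1534574389),
    (5, 4, 1534594189),
    (6, 1, 1534574489),
    (6, 4, 1534574689),
]

weights = Counter()
# Ordered pairs, so both A -> B and B -> A edges are counted.
for (user1, track1, ts1), (user2, track2, ts2) in permutations(plays, 2):
    same_user = user1 == user2
    different_tracks = track1 != track2
    in_window = abs(ts1 - ts2) <= WINDOW_SECONDS
    if same_user and different_tracks and in_window:
        weights[(track1, track2)] += 1

print(dict(weights))
```

User `5` contributes nothing because the two plays are 19800 seconds apart, so both edges end up with weight 2.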

**Tip:** consider joining the data to itself on `userId` and removing pairs with the same track. For each track, choose the top 50 tracks most similar to it, ordered by weight, and normalize the weights of its edges (divide the weight of each edge by the sum of the weights of all its edges). Use rank() to choose top 40 tracks as is done in the demo.

Sort the resulting DataFrame in descending order by `norm_weight`, then in ascending order by `id1` and `id2`. Take the top 40 rows and print only the columns `id1` and `id2`.

**Output example:**

```
54719		767867
54719		767866
50787		32767
```
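
The required ordering (descending by `norm_weight`, then ascending by `id1` and `id2`) can be sketched with a single sort key. The edge values below are made up for illustration and are not taken from the dataset.

```python
# (id1, id2, norm_weight) toy edges.
edges = [
    (2, 9, 0.5),
    (1, 8, 0.5),
    (1, 7, 0.5),
    (3, 5, 1.0),
]

# Negating the weight sorts it in descending order while the ids stay ascending.
ordered = sorted(edges, key=lambda edge: (-edge[2], edge[0], edge[1]))

for id1, id2, _ in ordered[:40]:
    print(id1, id2)
```

Ties on `norm_weight` are broken by the ids, which makes the printed top 40 deterministic.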

In [62]:
from pyspark.sql import Window
from pyspark.sql.functions import abs as sql_abs, col


# Time interval (in seconds) within which two played tracks
# are considered similar. Equal to 7 minutes.
TRACKS_SIMILARITY_TIME_WINDOW = 420

# Make track-track edges including filtering by time window
track_to_track_df = data.alias("d1") \
    .join(data.alias("d2"),
          (col("d1.userId") == col("d2.userId")) & \
          (col("d1.trackId") != col("d2.trackId")) & \
          (sql_abs(col("d1.timestamp") - col("d2.timestamp")) <= TRACKS_SIMILARITY_TIME_WINDOW)) \
    .groupBy(col("d1.trackId").alias("track1"), col("d2.trackId").alias("track2")) \
    .count() \
    .select(col("track1"), col("track2"), col("count").alias("weight")) \
    .cache()

# Get top 40 of track2 for each track1.
tops_track_to_track_df = top_n(track_to_track_df, "track1", "weight", 40) \
    .orderBy(col("weight").desc(), col("track1"), col("track2"))

# Normalize track-track edges weight.
track_to_track_normalized_df = normalize_weight(tops_track_to_track_df, "track1") \
    .orderBy(col("norm_weight").desc(), col("track1"), col("track2")).cache()

In [5]:
# task1_result = track_to_track_normalized_df.select(col("track1"), col("track2")).take(40)
# for val in task1_result:
#     print("%s %s" % val)

## Task 2. Build the edges of the type `user-track`

Build the edges of the type `user-track`. Use the number of times the user listened to the track as the weight of the edge from the user’s vertex to the track’s vertex.

**Tip:** group the dataframe by columns `userId` and `trackId` and use function `count` of DF API.

For each user, take the top 1000 tracks and normalize their weights.
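
A minimal standard-library sketch of the counting and normalization steps, using made-up `(userId, trackId)` rows. The top-1000 cut is omitted here because no toy user has that many tracks.

```python
from collections import Counter

# (userId, trackId) toy listening events.
plays = [(7, 1), (7, 1), (7, 4), (5, 1)]

# Edge weight: how many times the user listened to the track.
counts = Counter(plays)

# Total weight per user, used as the normalization denominator.
totals = Counter()
for (user, _track), count in counts.items():
    totals[user] += count

norm_weights = {edge: count / totals[edge[0]] for edge, count in counts.items()}
print(norm_weights)
```

User `7` has three plays in total, so the edges get weights 2/3 and 1/3. User `5` has a single edge with weight 1.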

Sort the resulting DataFrame in descending order by `norm_weight`, then in ascending order by `id1` and `id2`. Take the top 40 rows and print only the columns `id1` and `id2`.

The part of the result on the sample dataset:

```
...
195 946408
215 860111
235 897176
300 857973
321 915545
...
```

In [6]:
from pyspark.sql.functions import col


# Make user-track edges. The weight is how many times user listens to a track
user_to_track_df = data.groupBy(col("userId"), col("trackId")) \
    .count() \
    .select(col("userId"), col("trackId"), col("count").alias("weight")) \
    .cache()

# Select top 1000 tracks for each user
tops_user_to_track_df = top_n(user_to_track_df, "userId", "weight", 1000) \
    .orderBy(col("weight").desc(), col("userId"), col("trackId"))

# Normalize user-track edges weight
user_to_track_normalized_df = normalize_weight(tops_user_to_track_df, "userId", "weight") \
    .orderBy(col("norm_weight").desc(), col("userId"), col("trackId")).cache()

In [7]:
# task2_result = user_to_track_normalized_df.select(col("userId"), col("trackId")).take(40)
# for val in task2_result:
#     print("%s %s" % val)

## Task 3. Build the edges of the type `user-artist`

Build the edges of the type `user-artist`. Use the number of times the user has listened to the artist’s tracks as the weight of the edge from the user’s vertex to the artist’s vertex.

**Tip:** group the dataframe by the columns `userId` and `artistId` and use the function `count` of DF API. For each user, take the top 100 artists and normalize the weights.

Sort the resulting DataFrame in descending order by `norm_weight`, then in ascending order by `id1` and `id2`. Take the top 40 rows and print only the columns `id1` and `id2`.

The part of the result on the sample dataset:

```
...
131 983068
195 997265
215 991696
235 990642
288 1000564
...
```

In [8]:
from pyspark.sql.functions import col


# Make user-artist edges. The weight is how many times user listens to an artist
user_to_artist_df = data.groupBy(col("userId"), col("artistId")) \
    .count() \
    .select(col("userId"), col("artistId"), col("count").alias("weight")) \
    .cache()

# Select top 100 artists for each user
tops_user_to_artist_df = top_n(user_to_artist_df, "userId", "weight", 100) \
    .orderBy(col("weight").desc(), col("userId"), col("artistId"))

# Normalize user-artist edges weight
user_to_artist_normalized_df = normalize_weight(tops_user_to_artist_df, "userId", "weight") \
    .orderBy(col("norm_weight").desc(), col("userId"), col("artistId")).cache()

In [9]:
# task3_result = user_to_artist_normalized_df.select(col("userId"), col("artistId")).take(40)
# for val in task3_result:
#     print("%s %s" % val)

## Task 4. Build the edges of the type `artist-track`

Build the edges of the type `artist-track`. Use the number of times the track **HAS BEEN** listened to by all users as the weight of the edge from the artist’s vertex to the track’s vertex.

**Tip:** group the dataframe by the columns `artistId` and `trackId` and use the function `count` of DF API. For each artist, take the top 100 tracks and normalize the weights.

Sort the resulting DataFrame in descending order by `norm_weight`, then in ascending order by `id1` and `id2`. Take the top 40 rows and print only the columns `id1` and `id2`.

The part of the result on the sample dataset:

```
...
968017 859321
968022 852786
968034 807671
968038 964150
968042 835935
...
```

In [10]:
from pyspark.sql.functions import col


# Make artist-track edges. The weight is how many times
# the artist's track has been listened to across all users
artist_to_track_df = data.groupBy(col("artistId"), col("trackId")) \
    .count() \
    .select(col("artistId"), col("trackId"), col("count").alias("weight")) \
    .cache()

# Select top 100 tracks for each artist
tops_artist_to_track_df = top_n(artist_to_track_df, "artistId", "weight", 100) \
    .orderBy(col("weight").desc(), col("artistId"), col("trackId"))

# Normalize artist-track edges weight
artist_to_track_normalized_df = normalize_weight(tops_artist_to_track_df, "artistId", "weight") \
    .orderBy(col("norm_weight").desc(), col("artistId"), col("trackId")).cache()

In [11]:
# task4_result = artist_to_track_normalized_df.select(col("artistId"), col("trackId")).take(40)
# for val in task4_result:
#     print("%s %s" % val)

## \[Honor] Task 5. Find tracks and artists for a user

For the user with `userId=776748`, find all the tracks and artists connected to that user. Use the original dataframe, not a normalized one. Sort the found items in ascending order, first by artist and then by name, keep only the columns `Artist` and `Name`, and print the top 40.

Each output line can take one of the following forms:

```
Artist: <artist-name> <track-name>
Artist: <artist-name> Artist: <artist-name>
```

These two forms help distinguish `user-track` suggestions (as shown in 1) from `user-artist` suggestions (as shown in 2).

The part of the result on the sample dataset:

```
...
Artist: Blur Artist: Blur
Artist: Blur Girls and Boys
Artist: Clawfinger Artist: Clawfinger
Artist: Clawfinger Nothing Going On
Artist: Disturbed Artist: Disturbed
...
```
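
The two output forms follow from sorting the metadata rows by `Artist` and then `Name`. The sketch below uses plain dictionaries in place of the `meta` DataFrame. The ids are made up, and the assumption that artist-type rows carry the `Artist: ...` string in both `Name` and `Artist` is inferred from the sample output above.

```python
# Toy metadata rows mirroring the `meta` columns (ids are hypothetical).
meta = [
    {"type": "track", "Name": "Nothing Going On", "Artist": "Artist: Clawfinger", "id": 1},
    {"type": "artist", "Name": "Artist: Blur", "Artist": "Artist: Blur", "id": 2},
    {"type": "track", "Name": "Girls and Boys", "Artist": "Artist: Blur", "id": 3},
    {"type": "artist", "Name": "Artist: Clawfinger", "Artist": "Artist: Clawfinger", "id": 4},
    {"type": "track", "Name": "Unrelated Track", "Artist": "Artist: Someone", "id": 5},
]

# Ids of the tracks and artists found in the user's history.
user_item_ids = {1, 2, 3, 4}

rows = [row for row in meta if row["id"] in user_item_ids]
rows.sort(key=lambda row: (row["Artist"], row["Name"]))

lines = [f'{row["Artist"]} {row["Name"]}' for row in rows]
for line in lines:
    print(line)
```

Within one artist, the artist row sorts before the track rows here because `"Artist: ..."` compares lower than the toy track titles.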

In [13]:
from pyspark.sql.functions import col


TARGET_USER_ID = "776748"

user_music_history_df = data.filter(col("userId") == TARGET_USER_ID).cache()
user_tracks_df = user_music_history_df.select(col("trackId").alias("id")).distinct()
user_artist_df = user_music_history_df.select(col("artistId").alias("id")).distinct()

user_artists_tracks_df = user_tracks_df.union(user_artist_df) \
    .join(meta, "id") \
    .orderBy(col("Artist"), col("Name")).cache()

In [14]:
# task5_result = user_artists_tracks_df.select(col("Artist"), col("Name")).take(40)
# for artist, name in task5_result:
#     print(artist, name, sep="\t")

## \[Honor] Task 6. Build music recommendations

For the user with `userId=776748`, print the top 40 recommended tracks. Build the recommendations with the algorithm described in lesson 3 of the fifth week. Initialize the coordinates of the vector `x_0` that correspond to the user’s vertex and to all the vertices from Task 5 with ones, and all other coordinates with zeros.

Do 5 iterations.

Take `alpha = 0.15` and the following balancing functions:
  * beta(user, user → artist) =  0.5
  * beta(user, user → track) =  0.5
  * beta(track, track → track) = 1
  * beta(artist, artist → track) = 1
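
One iteration, as the solution cells below compute it, sets each vertex to `alpha * u[v] + (1 - alpha) * sigma[v]`. Here `sigma[v]` sums `x[src] * edge_probability` over the incoming edges of `v`, and `u` is 1 only for the user's vertex. The standard-library sketch below runs that update on a made-up four-vertex graph. Whether it matches the lesson's formulation in every detail is an assumption.

```python
ALPHA = 0.15
ITERATIONS = 5

# Toy edges: normalized weight already multiplied by the beta factor.
edges = {
    ("user", "artist"): 0.5,      # beta(user -> artist) = 0.5
    ("user", "track_a"): 0.5,     # beta(user -> track) = 0.5
    ("artist", "track_a"): 1.0,   # beta(artist -> track) = 1
    ("track_a", "track_b"): 1.0,  # beta(track -> track) = 1
}
vertices = ["user", "artist", "track_a", "track_b"]

# Teleport vector: 1 for the user's vertex only.
u = {v: 1.0 if v == "user" else 0.0 for v in vertices}
# x_0: ones for the user and the items already connected to the user.
x = {"user": 1.0, "artist": 1.0, "track_a": 1.0, "track_b": 0.0}

for _ in range(ITERATIONS):
    sigma = {v: 0.0 for v in vertices}
    for (src, dst), prob in edges.items():
        sigma[dst] += x[src] * prob
    x = {v: ALPHA * u[v] + (1 - ALPHA) * sigma[v] for v in vertices}

for vertex, rank in sorted(x.items(), key=lambda item: -item[1]):
    print(vertex, round(rank, 5))
```

`track_b` is never connected to the user directly, yet it receives a non-zero rank through `track_a`, which is how new tracks surface as recommendations.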

You should receive a table with 3 columns: `name`, `artist` and `rank`. Sort the resulting dataframe in descending order by `rank`, select top 40 recommended tracks, select only the columns `name`, `artist` and `rank`, leave 5 digits after the decimal point in `rank` and print the resulting dataframe.

The part of the result on the sample dataset:

```
...
Prayer Of The Refugee Artist: Rise Against 1.35278102029
Eagle Artist: Gotthard 1.21412311013
21 Guns Artist: Green Day 1.17301653219
Wait And Bleed Artist: Slipknot 0.921552328559
Beautiful disaster Artist: 311 0.921552328559
...
```

In [15]:
TARGET_USER_ID = "776748"

# Beta-functions
BETA_USER_ARTIST = 0.5
BETA_USER_TRACK = 0.5
BETA_TRACK_TRACK = 1.0
BETA_ARTIST_TRACK = 1.0

# Constants
ALPHA = 0.15

ITERATIONS = 5

In [39]:
from pyspark.sql.functions import col, when


# Build vertices dataframe with columns:
#   * id - id of an item
#   * vprob - 1.0 if the item belongs to the target user, 0.0 otherwise
user_weighted_music_history_df = data \
    .withColumn("vprob", when(col("userId") == TARGET_USER_ID, 1.0).otherwise(0.0)).cache()

user_init_vertices = user_weighted_music_history_df \
    .select(col("userId").alias("id"), col("vprob")) \
    .distinct()

vertices_df = user_init_vertices \
    .union(user_weighted_music_history_df.select(col("trackId").alias("id"), col("vprob"))) \
    .union(user_weighted_music_history_df.select(col("artistId").alias("id"), col("vprob"))) \
    .distinct() \
    .cache()

# Build the teleport vector u: 1.0 for the target user's vertex, 0.0 elsewhere
u_df = vertices_df.withColumn("user_prob", when(col("id") == TARGET_USER_ID, 1.0).otherwise(0.0)) \
    .select(col("id"), col("user_prob").alias("uprob")) \
    .cache()

In [63]:
from pyspark.sql.functions import col


# Build edges dataframe with columns:
#   * src - source vertex id
#   * dst - target vertex id
#   * eprob - probability to go from src to dst with the beta-function applied
balanced_track_to_track_df = track_to_track_normalized_df \
    .withColumn("eprob", col("norm_weight") * BETA_TRACK_TRACK) \
    .select(col("track1").alias("src"), col("track2").alias("dst"), col("eprob")).cache()

balanced_user_to_track_df = user_to_track_normalized_df \
    .withColumn("eprob", col("norm_weight") * BETA_USER_TRACK) \
    .select(col("userId").alias("src"), col("trackId").alias("dst"), col("eprob")).cache()

balanced_user_to_artist_df = user_to_artist_normalized_df \
    .withColumn("eprob", col("norm_weight") * BETA_USER_ARTIST) \
    .select(col("userId").alias("src"), col("artistId").alias("dst"), col("eprob")).cache()

balanced_artist_to_track_df = artist_to_track_normalized_df \
    .withColumn("eprob", col("norm_weight") * BETA_ARTIST_TRACK) \
    .select(col("artistId").alias("src"), col("trackId").alias("dst"), col("eprob")).cache()

balanced_edges_df = balanced_track_to_track_df \
    .union(balanced_user_to_track_df) \
    .union(balanced_user_to_artist_df) \
    .union(balanced_artist_to_track_df) \
    .cache()

In [65]:
from pyspark.sql.functions import col, sum as sql_sum


# Calculate recommendations

for _ in range(ITERATIONS):
    # Calculate probability to be in vertex v
    next_v_probs = vertices_df \
        .join(balanced_edges_df, col("id") == col("src"), "left") \
        .fillna({"eprob": 0.0}) \
        .withColumn("new_prob", col("vprob") * col("eprob")) \
        .groupBy("dst").agg(sql_sum(col("new_prob")).alias("sigma"))

    # Update vertices probabilities
    vertices_df = u_df \
        .join(next_v_probs, col("id") == col("dst"), "left") \
        .fillna({"sigma": 0.0}) \
        .withColumn("next_value", ALPHA * col("uprob") + (1 - ALPHA) * col("sigma")) \
        .select(col("id"), col("next_value").alias("vprob")) \
        .cache()

In [66]:
from pyspark.sql.functions import col, round as sql_round

# Retrieve result
recommendations_df = vertices_df.filter(col("id") != TARGET_USER_ID) \
    .join(meta, "id") \
    .orderBy(col("vprob").desc()) \
    .select(col("Name"), col("Artist"), sql_round(col("vprob"), 5).alias("Probability"))

In [67]:
task6_result = recommendations_df.take(40)
for name, artist, prob in task6_result:
    print(name, artist, prob, sep=" ")

Kill The DJ Artist: Green Day 1.42809
Come Out and Play Artist: The Offspring 1.37473
I Hate Everything About You Artist: Three Days Grace 1.37362
Prayer Of The Refugee Artist: Rise Against 1.35278
Eagle Artist: Gotthard 1.21412
21 Guns Artist: Green Day 1.17302
Beautiful disaster Artist: 311 0.92155
Wait And Bleed Artist: Slipknot 0.92155
Here To Stay Artist: Korn 0.91653
Hard Rock Hallelujah Artist: Lordi 0.91653
Nothing Going On Artist: Clawfinger 0.80983
Numb Artist: Linkin Park 0.80292
In The End Artist: Linkin Park 0.80292
Kryptonite Artist: 3 Doors Down 0.68799
Sky is Over Artist: Serj Tankian 0.68799
Take It Out On Me Artist: Thousand Foot Krutch 0.47024
Girls and Boys Artist: Blur 0.40245
Cocaine Artist: Nomy 0.20893
Getting Away With Murder Artist: Papa Roach 0.20648
Artist: Green Day Artist: Green Day 0.01181
Artist: Linkin Park Artist: Linkin Park 0.00472
Artist: The Offspring Artist: The Offspring 0.00472
Artist: Clawfinger Artist: Clawfinger 0.00472
She Keeps Me Up Artist

In [None]:
spark_session.stop()