In [1]:
import os

# Run PySpark's interactive shell start-up script inside this kernel; per the
# banner printed below, it leaves a SparkSession available as `spark`.
# `execfile` exists only on Python 2, which is what this kernel runs.
pyspark_shell = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
execfile(pyspark_shell)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.


## Data description (DataFrames in parquet format)

Location - `/data/sample264`

Fields: `trackId`, `userId`, `timestamp`, `artistId`

- `trackId` - `id` of the track
- `userId` - `id` of the user
- `artistId` - `id` of the artist
- `timestamp` - `timestamp` of the moment the user starts listening to a track

Location - `/data/meta`

Fields: `type`, `Name`, `Artist`, `Id`

- `type` - either “track” or “artist”
- `Name` - the title of the track if `type` == “track”, or the name of the musician or group if `type` == “artist”
- `Artist` - the creator of the track if `type` == “track”, or the name of the musician or group if `type` == “artist”
- `Id` - `id` of the item

In [2]:
from pyspark.sql import SparkSession

# Spark master URLs have the form "local[K]" with no whitespace; the previous
# value "local [2]" is not a parseable master URL and would fail if this call
# ever had to create the session itself. getOrCreate() returns an already
# running session when one exists, otherwise it builds a new one with these
# settings.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local[2]")
    .getOrCreate()
)

In [3]:
# Parquet locations of the listening log and of the track/artist metadata.
SAMPLE_PATH = "/data/sample264"
META_PATH = "/data/meta"

data = sparkSession.read.parquet(SAMPLE_PATH)
meta = sparkSession.read.parquet(META_PATH)

## Normalization can be done with the following function

In [12]:
from pyspark.sql import Window
# NOTE: `sum` is the Spark aggregate and shadows Python's builtin `sum` for the
# rest of the notebook. `col` is imported here so that this cell does not
# depend on an import made by a later cell.
from pyspark.sql.functions import col, row_number, sum, desc, asc


def norm(df, key1, key2, field, n):
    """Keep the top ``n`` rows per ``key1`` and normalise ``field`` within each group.

    Rows are ranked inside each ``key1`` partition by descending ``field``;
    only the first ``n`` are kept. ``field`` is then divided by the sum of
    ``field`` over the kept rows of the same ``key1``.

    Args:
        df: input DataFrame containing the columns ``key1`` and ``field``.
        key1: name of the column that defines the groups.
        key2: unused by this function; kept so existing callers keep working.
        field: name of the numeric column to rank by and to normalise.
        n: maximum number of rows kept per ``key1`` value.

    Returns:
        A cached DataFrame with the columns of ``df`` plus ``"sum_" + field``
        (the per-group total) and ``"norm_" + field`` (``field`` divided by
        that total).
    """
    window = Window.partitionBy(key1).orderBy(col(field).desc())

    # row_number() assigns consecutive positions, so rows that tie on `field`
    # at the cut-off are kept or dropped in an unspecified order.
    topsDF = (
        df.withColumn("row_number", row_number().over(window))
        .filter(col("row_number") <= n)
        .drop("row_number")
    )

    # groupBy() already carries key1 into the aggregate result, so it is not
    # repeated inside agg(); repeating it yields a duplicate key1 column.
    tmpDF = topsDF.groupBy(col(key1)).agg(sum(col(field)).alias("sum_" + field))

    normalizedDF = (
        topsDF.join(tmpDF, key1, "inner")
        .withColumn("norm_" + field, col(field) / col("sum_" + field))
        .cache()
    )

    return normalizedDF

In [5]:
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

# Number of plays for every (user, track) pair.
userTrack = data.groupBy(col("userId"), col("trackId")).count()

# Per user: keep the 1000 most played tracks, normalise the counts, then halve
# the normalised weight and expose the pair under the generic names id / id2.
userTrackNorm = norm(userTrack, "userId", "trackId", "count", 1000).select(
    col("userId").alias("id"),
    col("trackId").alias("id2"),
    (col("norm_count") * 0.5).alias("norm_count"),
)

# Unpartitioned window: rows are ranked globally by ascending weight.
window = Window.orderBy(col("norm_count"))

rankedUserTrack = userTrackNorm.withColumn("position", rank().over(window))
lowestRanked = rankedUserTrack.filter(col("position") < 50)

# First 40 (id, id2) pairs of the lowest-ranked edges, ordered by id then id2.
userTrackList = (
    lowestRanked
    .orderBy(col("id"), col("id2"))
    .select(col("id"), col("id2"))
    .take(40)
)

In [6]:
# Each element is a two-field row (id, id2); print one pair per line.
for val in userTrackList:
    print("%s %s" % (val[0], val[1]))

415763 853951
436158 889948
586043 800288
586043 800317
586043 801522
586043 804741
586043 805880
586043 806233
586043 806439
586043 807873
586043 808328
586043 810571
586043 811212
586043 811848
586043 815635
586043 818116
586043 819591
586043 821062
586043 822375
586043 825775
586043 825997
586043 826725
586043 831955
586043 833018
586043 834780
586043 834887
586043 835312
586043 837744
586043 838944
586043 842614
586043 844606
586043 851992
586043 852304
586043 852734
586043 852863
586043 855577
586043 856643
586043 858618
586043 858992
586043 860220


## Graph based Music Recommender. Task 1

Build the edges of the type “track-track”. To do this, you need to compute the collaborative similarity between all tracks: if a user has listened to tracks A and B within a limited time interval (equal to 7 minutes), add 1 to the weight of the edge from vertex A to vertex B. For each track, choose the top 40 tracks most similar to it and normalize the weights of its edges (divide the weight of each edge by the sum of the weights of all of that track's edges).

Sort the resulting DataFrame in descending order by the column `norm_count`, take the top 40 rows, select only the columns “id1” and “id2”, sort them in ascending order, first by “id1”, then by “id2”, and print the columns “id1”, “id2” of the resulting DataFrame. Example of the output format:

```
54719	767867
54719	767866
50787	327676
```

---

_For all tasks use the same ipython notebook, each task should be the continuation of the previous._

In [7]:
# Co-listening window of 7 minutes, written in seconds.
# Assumes `timestamp` is expressed in seconds - TODO confirm against the data.
TIME_DELTA = 7 * 60

In [48]:
# Self-join the listening log to find pairs of different tracks that the same
# user started within TIME_DELTA of each other.
data_1 = data.alias('d1')
data_2 = data.alias('d2')

# NOTE(review): the pair is directed and the bounds are strict - d1 must start
# strictly before d2 and less than TIME_DELTA earlier. Confirm against the task
# statement whether a symmetric / inclusive window is intended.
cond = (
    (col('d1.userId') == col('d2.userId'))
    & (col('d1.timestamp') < col('d2.timestamp'))
    & (col('d1.timestamp') + TIME_DELTA > col('d2.timestamp'))
    & (col('d1.trackId') != col('d2.trackId'))
)

# An inner join is used directly: the earlier left_outer join followed by
# dropna() only produced unmatched rows to throw them away again, because every
# matched row already has non-null track ids on both sides (the inequality in
# `cond` cannot be true for a null id).
djoin = (
    data_1.join(data_2, cond, 'inner')
    .select(col('d1.trackId').alias('id1'), col('d2.trackId').alias('id2'))
    .groupBy(col('id1'), col('id2'))
    .count()
)

# Keep the 40 heaviest outgoing edges per track and normalise their weights.
track_track = norm(djoin, 'id1', 'id2', 'count', 40)

# NOTE(review): rows that tie on norm_count are cut by limit(40) in an
# unspecified order, so the selected 40 pairs may vary between runs.
result = (
    track_track
    .orderBy(desc('norm_count'))
    .limit(40)
    .select(col('id1'), col('id2'))
    .orderBy(asc('id1'), asc('id2'))
)

for row in result.collect():
    print('{r.id1} {r.id2}'.format(r=row))


808110 894437
809289 847119
814446 870227
819569 800325
827209 942995
829292 871752
830062 849304
832475 925631
832553 836728
836522 907798
841759 898484
844651 897648
844819 834559
852427 825116
856311 875086
857303 943835
875876 916850
878289 814956
879172 898823
879366 814475
882856 841509
889636 799651
890604 904285
890920 838812
895618 944759
903281 810518
904487 810488
907516 845402
923176 831580
926952 818440
932765 860022
933119 883990
935205 829417
940165 823397
941115 949312
945602 955854
946119 894774
947056 843884
956167 810599
962224 919321
