In [1]:
import os
#execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) a session that runs against a "local" master with Hive support enabled.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local")
    .getOrCreate()
)

In [3]:
# Parquet dataset with the play history; the code below reads its
# userId, trackId, artistId and timestamp columns.
playHistoryPath = '/data/sample264'
playHistoryGraph = (
    sparkSession
    .read
    .parquet(playHistoryPath)
)

root
 |-- userId: integer (nullable = true)
 |-- trackId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- timestamp: long (nullable = true)



In [4]:
# Parquet dataset with metadata; joined on its id column at the end to get
# the Name and Artist shown in the final listing.
metaDataPath = '/data/meta'
metaDataGraph = (
    sparkSession
    .read
    .parquet(metaDataPath)
)

root
 |-- type: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- Id: integer (nullable = true)



In [5]:
# Register the play history as SQL views "history1" and "history2"; the SQL
# queries below reference these names (both in the self-join).
# createOrReplaceTempView is used instead of createTempView so that this cell
# can be re-run in a live session: createTempView raises if the view name is
# already registered.
playHistoryGraph.createOrReplaceTempView("history1")
playHistoryGraph.createOrReplaceTempView("history2")

In [6]:
# Id of the user whose vertex seeds the score vectors below and who is
# excluded from the final listing.
user = 776748

# Mixing factor of the iteration: next = alpha * u_prob + (1 - alpha) * sigma.
alpha = 0.15
# Per-edge-type multipliers applied to the normalised edge weights
# (user->artist, user->track, track->track, artist->track).
beta_user_artist = 0.5
beta_user_track = 0.5
beta_track_track = 1
beta_artist_track = 1

# Task 1

In [7]:
from pyspark.sql.functions import count, col

# Self-join of the play history: pairs of *different* tracks played by the same
# user whose timestamps differ by at most 420 (presumably seconds - TODO confirm
# the timestamp unit). Because of abs(), each pair shows up in both directions.
# The pairs are then counted per (track1, track2) over all users; the selected
# "user" column does not survive the groupBy.
consecutiveTracksForUser = (
    sparkSession.sql(
        "select h1.trackId as track1, h2.trackId as track2, h1.userId as user "
        "from history1 h1, history2 h2 "
        "where h1.userId = h2.userId "
        "and h1.trackId != h2.trackId "
        "and abs(h2.timestamp - h1.timestamp) <= 420 "
    )
    .groupBy("track1", "track2")
    .count()
    .alias("count")
    .orderBy("track1", "track2")
    .cache()
)

In [8]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, sum

# Rank the neighbours of every track1 by co-occurrence count, highest first.
# track2 is added as a secondary sort key: row_number() over a window ordered
# only by "count" numbers tied rows in an arbitrary order, so the rows that
# survive the top-40 cut could differ between runs.
window = Window.partitionBy("track1").orderBy(col("count").desc(), col("track2"))

# Keep at most the 40 strongest neighbours per track1 and drop the helper column.
topsDF = consecutiveTracksForUser.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= 40) \
        .drop(col("row_number")) \
        .orderBy(col("track1"), col("track2")) \
        .cache()

In [9]:

# Total of the retained counts per track1; used as the normalisation denominator.
sumsDF = (
    topsDF
    .groupBy("track1")
    .agg(sum("count").alias("sum_weights"))
    .orderBy("track1")
)

In [10]:
# Attach each track1's total to its rows and turn the counts into shares,
# so the weights of one track1 add up to 1.
normalized_count = (
    topsDF
    .join(sumsDF, on="track1", how="inner")
    .withColumn("weight", col("count") / col("sum_weights"))
)


In [11]:
# track -> track edges in the common (source, target, next_value) layout,
# with the normalised weight scaled by beta_track_track.
track_track = (
    normalized_count
    .withColumn("next_value", col("weight") * beta_track_track)
    .select(
        col("track1").alias("source"),
        col("track2").alias("target"),
        col("next_value"),
    )
    .cache()
)
    

# Task 2

In [12]:
from pyspark.sql.functions import count, col

# Number of plays per (user, track), most played first.
tracksPerUser = (
    sparkSession.sql(
        "select userId as user, trackId as track "
        "from history1 "
    )
    .groupBy("user", "track")
    .count()
    .alias("count")
    .orderBy(col("count").desc(), col("user"), col("track"))
)

In [13]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, sum

# Rank every user's tracks by play count, highest first.
# "track" is added as a secondary sort key: row_number() over a window ordered
# only by "count" numbers tied rows in an arbitrary order, so the rows that
# survive the top-1000 cut could differ between runs.
window = Window.partitionBy("user").orderBy(col("count").desc(), col("track"))

# Keep at most the 1000 most played tracks per user and drop the helper column.
topsTracksPerUser = tracksPerUser.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= 1000) \
        .drop(col("row_number")) \
        .cache()

In [14]:
# Total of the retained play counts per user; used as the normalisation denominator.
sumsTopsTracksPerUser = (
    topsTracksPerUser
    .groupBy("user")
    .agg(sum("count").alias("sum_weights"))
    .orderBy("user")
)

In [15]:
# Share of each retained track within its user's total play count.
normalized_topsTracksPerUser = (
    topsTracksPerUser
    .join(sumsTopsTracksPerUser, on="user", how="inner")
    .withColumn("norm_weight", col("count") / col("sum_weights"))
    .cache()
)

In [16]:
# user -> track edges in the common (source, target, next_value) layout,
# with the normalised weight scaled by beta_user_track.
user_track = (
    normalized_topsTracksPerUser
    .withColumn("next_value", col("norm_weight") * beta_user_track)
    .select(
        col("user").alias("source"),
        col("track").alias("target"),
        col("next_value"),
    )
    .cache()
)

# Task 3

In [17]:
from pyspark.sql.functions import count, col

# Number of plays per (user, artist), most played first.
artistsPerUser = (
    sparkSession.sql(
        "select userId as user, artistId as artist "
        "from history1 "
    )
    .groupBy("user", "artist")
    .count()
    .alias("count")
    .orderBy(col("count").desc(), col("user"), col("artist"))
)

In [18]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, sum

# Rank every user's artists by play count, highest first.
# "artist" is added as a secondary sort key: row_number() over a window ordered
# only by "count" numbers tied rows in an arbitrary order, so the rows that
# survive the top-100 cut could differ between runs.
window = Window.partitionBy("user").orderBy(col("count").desc(), col("artist"))

# Keep at most the 100 most played artists per user and drop the helper column.
topsArtistsPerUser = artistsPerUser.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= 100) \
        .drop(col("row_number")) \
        .cache()

In [19]:
# Total of the retained play counts per user; used as the normalisation denominator.
sumsTopsArtistsPerUser = (
    topsArtistsPerUser
    .groupBy("user")
    .agg(sum("count").alias("sum_weights"))
    .orderBy("user")
)

In [20]:
# Share of each retained artist within its user's total play count.
normalized_topsArtistsPerUser = (
    topsArtistsPerUser
    .join(sumsTopsArtistsPerUser, on="user", how="inner")
    .withColumn("norm_weight", col("count") / col("sum_weights"))
)

In [21]:
# user -> artist edges in the common (source, target, next_value) layout,
# with the normalised weight scaled by beta_user_artist.
user_artist = (
    normalized_topsArtistsPerUser
    .withColumn("next_value", col("norm_weight") * beta_user_artist)
    .select(
        col("user").alias("source"),
        col("artist").alias("target"),
        col("next_value"),
    )
    .cache()
)

# Task 4

In [22]:
from pyspark.sql.functions import count, col

# Number of plays per (artist, track), most played first.
tracksPerArtist = (
    sparkSession.sql(
        "select artistId as artist, trackId as track "
        "from history1 "
    )
    .groupBy("artist", "track")
    .count()
    .alias("count")
    .orderBy(col("count").desc(), col("artist"), col("track"))
)

In [23]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, sum

# Rank every artist's tracks by play count, highest first.
# "track" is added as a secondary sort key: row_number() over a window ordered
# only by "count" numbers tied rows in an arbitrary order, so the rows that
# survive the top-100 cut could differ between runs.
window = Window.partitionBy("artist").orderBy(col("count").desc(), col("track"))

# Keep at most the 100 most played tracks per artist and drop the helper column.
topsTracksPerArtist = tracksPerArtist.withColumn("row_number", row_number().over(window)) \
        .filter(col("row_number") <= 100) \
        .drop(col("row_number")) \
        .cache()

In [24]:
# Total of the retained play counts per artist; used as the normalisation denominator.
sumsTracksPerArtist = (
    topsTracksPerArtist
    .groupBy("artist")
    .agg(sum("count").alias("sum_weights"))
    .orderBy("artist")
)

In [25]:
# Share of each retained track within its artist's total play count.
normalized_topsTracksPerArtist = (
    topsTracksPerArtist
    .join(sumsTracksPerArtist, on="artist", how="inner")
    .withColumn("norm_weight", col("count") / col("sum_weights"))
    .cache()
)

In [26]:
# artist -> track edges in the common (source, target, next_value) layout,
# with the normalised weight scaled by beta_artist_track.
artist_track = (
    normalized_topsTracksPerArtist
    .withColumn("next_value", col("norm_weight") * beta_artist_track)
    .select(
        col("artist").alias("source"),
        col("track").alias("target"),
        col("next_value"),
    )
    .cache()
)

# Task 6

In [27]:
# All four edge sets stacked into one edge list. union() matches columns by
# position, which works because every input is (source, target, next_value).
edges = (
    track_track
    .union(user_track)
    .union(user_artist)
    .union(artist_track)
    .cache()
)

In [28]:
from pyspark.sql.functions import when, lit

# Plays of the selected user only; used to find the tracks and artists this user listened to.
user_data = playHistoryGraph.filter(col("userId") == user)


def _mark_listened(id_column):
    """Return (id, p) for every distinct value of `id_column` in the play history.

    p is 1.0 when the value also occurs in `user_data`, otherwise 0.0.
    """
    every_id = playHistoryGraph.select(col(id_column).alias("id")).distinct()
    # "temp" is only a non-null marker: after the left join it is set exactly
    # for the ids that the selected user has played.
    listened = user_data.select(col(id_column).alias("id"), lit(1).alias("temp")).distinct()
    flagged = every_id.join(listened, on="id", how="left")
    return (
        flagged
        .withColumn("p", when(col("temp").isNotNull(), 1.0).otherwise(0.0))
        .select(col("id"), col("p"))
    )


# Every user id, with p = 1.0 for the selected user only.
users = (
    playHistoryGraph
    .select(col("userId").alias("id"))
    .distinct()
    .withColumn("p", when(col("id") == user, 1.0).otherwise(0.0))
)

tracks = _mark_listened("trackId")

artists = _mark_listened("artistId")

# Initial score vector over all vertices.
# NOTE(review): user, artist and track ids share one "id" column here, which
# assumes they never collide across the three kinds - confirm against the data.
x = (
    users
    .union(artists)
    .union(tracks)
    .cache()
)

In [29]:
# Vector over the same ids as x that is 1.0 only at the selected user's id;
# it is mixed back into the scores on every iteration.
u = (
    x
    .withColumn("u_prob", when(col("id") == user, 1.0).otherwise(0.0))
    .select("id", "u_prob")
    .cache()
)



In [None]:
from pyspark.sql.functions import expr

# Five update steps: x <- alpha * u + (1 - alpha) * (scores pushed along the edges).
# NOTE: the loop reassigns x, so re-running this cell continues from the
# current x instead of restarting from the initial vector.
for _ in range(5):
    # Multiply every vertex's current score by the weight of each outgoing edge.
    # Vertices without outgoing edges get next_value 0.0 and a null target.
    pushed = (
        x.join(edges, on=expr("id = source"), how="left")
        .na.fill(0.0, ["next_value"])
        .withColumn("acc", col("p") * col("next_value"))
    )
    # Incoming total per target vertex.
    sigma = pushed.groupBy("target").agg(sum("acc").alias("sigma"))

    # Vertices that received nothing get sigma 0.0 before mixing with u.
    mixed = alpha*col("u_prob") + (1-alpha) * col("sigma")
    x = (
        u.join(sigma, on=expr("id = target"), how="left")
        .na.fill(0.0, ["sigma"])
        .withColumn("next_value", mixed)
        .select(col("id"), col("next_value").alias("p"))
        .cache()
    )

In [None]:
from pyspark.sql.functions import round

# Final scores without the selected user's own vertex, joined to the metadata
# and sorted by score (highest first); p is rounded to 5 decimals for display.
# NOTE(review): the join is on the id alone, so any metadata row with a
# matching id is kept regardless of its "type" - confirm this is intended.
results = (
    x.where("id != " + str(user))
    .join(metaDataGraph, on="id")
    .orderBy(col("p").desc())
    .select(
        col("Name"),
        col("Artist"),
        round(col("p"), 5).alias("p"),
    )
    .cache()
)

In [None]:
# Print the first 40 rows of the ranking as "<Name> <Artist> <p>".
top_rows = results.limit(40).collect()
for row in top_rows:
    print("{} {} {}".format(*row))

Kill The DJ Artist: Green Day 1.42809
Come Out and Play Artist: The Offspring 1.37473
I Hate Everything About You Artist: Three Days Grace 1.37362
Prayer Of The Refugee Artist: Rise Against 1.35278
Eagle Artist: Gotthard 1.21412
21 Guns Artist: Green Day 1.17302
Beautiful disaster Artist: 311 0.92155
Wait And Bleed Artist: Slipknot 0.92155
Here To Stay Artist: Korn 0.91653
Hard Rock Hallelujah Artist: Lordi 0.91653
Nothing Going On Artist: Clawfinger 0.80983
In The End Artist: Linkin Park 0.80292
Numb Artist: Linkin Park 0.80292
Sky is Over Artist: Serj Tankian 0.68799
Kryptonite Artist: 3 Doors Down 0.68799
Take It Out On Me Artist: Thousand Foot Krutch 0.47024
Girls and Boys Artist: Blur 0.40245
Cocaine Artist: Nomy 0.20893
Getting Away With Murder Artist: Papa Roach 0.20648
Artist: Green Day Artist: Green Day 0.01181
Artist: Clawfinger Artist: Clawfinger 0.00472
Artist: The Offspring Artist: The Offspring 0.00472
Artist: Linkin Park Artist: Linkin Park 0.00472
The Vengeful One Artis