In [1]:
from pyspark.sql import SparkSession
# Local-master Spark session with Hive support enabled; getOrCreate() hands
# back an already-running session when there is one.
sparkSession = (
    SparkSession.builder
    .enableHiveSupport()
    .master("local")
    .getOrCreate()
)


In [2]:
# data: listening log; this notebook reads its userId, trackId, artistId and
#       timestamp columns.
# meta: lookup table; this notebook reads its Id, Name and Artist columns.
data, meta = (
    sparkSession.read.parquet("/data/sample264"),
    sparkSession.read.parquet("/data/meta"),
)

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, rank, when, sum, abs, count, lit, round

# Multipliers applied to the normalised weight of each edge family when the
# graph edges are assembled further down.
beta_user_artist = beta_user_track = 0.5
beta_track_track = beta_artist_track = 1

# Share given to the personalisation vector on every iteration; the remaining
# (1 - alpha) goes to the score propagated along the edges.
alpha = 0.15

In [3]:
import datetime
# Every ordered pair of *different* tracks played by the same user, plus the
# gap between the two plays: (df2.timestamp - df1.timestamp) / 60.
# NOTE(review): dividing by 60 presumably converts seconds to minutes - confirm
# the unit of `timestamp`.
#
# The chain is wrapped in parentheses rather than continued with backslashes:
# the previous version ended on a dangling "\" and would have absorbed any
# statement placed directly underneath it.
track_pairs = (
    data.alias("df1")
    .join(data.alias("df2"), "userId", "inner")
    .filter("df1.trackId <> df2.trackId")
    .select(
        col("df1.trackId").alias("id1"),
        col("df2.trackId").alias("id2"),
        ((col("df2.timestamp") - col("df1.timestamp")) / 60).alias("timeDif"),
    )
)

In [4]:
# Each co-occurrence of a pair scores 1 when |timeDif| < 7 and 0 otherwise.
# Scores are summed per (id1, id2) and pairs that never scored are discarded.
track_pairs_weights = (
    track_pairs
    .select(
        "id1",
        "id2",
        when(abs(col("timeDif")) < 7, 1).otherwise(0).alias("weight"),
    )
    .groupBy("id1", "id2")
    .agg(sum("weight").alias("weights"))
    .filter(col("weights") > 0)
)

In [5]:
def norm(df, key1, field, n):
    """Keep the top rows of every ``key1`` group and normalise ``field`` in them.

    Rows are numbered inside each ``key1`` partition by descending ``field``
    and only those with a row number strictly below ``n`` survive, so a group
    contributes at most ``n - 1`` rows. Rows with equal ``field`` are numbered
    in whatever order Spark happens to produce them.

    Args:
        df: Spark DataFrame containing the columns ``key1`` and ``field``.
        key1: name of the grouping column.
        field: name of the numeric column to rank by and normalise.
        n: exclusive upper bound on the per-group row number.

    Returns:
        The surviving rows with two extra columns: ``total_<field>`` (sum of
        ``field`` over the surviving rows of the group) and ``norm_<field>``
        (``field`` divided by that total).
    """
    # key1 is constant inside a partition, so the effective ordering is by
    # `field` descending.
    ranking = Window.partitionBy(key1).orderBy(col(key1), col(field).desc())

    ranked = df.select("*", row_number().over(ranking).alias("row_number"))
    kept = ranked.filter(col("row_number") < n).drop(col("row_number"))

    # `sum` is the Spark aggregate imported at the top of the notebook, not
    # the Python builtin.
    total_name = "total_" + field
    totals = kept.groupBy(col(key1)).agg(sum(field).alias(total_name))

    return kept.join(totals, key1, "inner").withColumn(
        "norm_" + field, col(field) / col(total_name)
    )

In [6]:
# n is an exclusive bound on the per-group rank, so 51 keeps at most the 50
# heaviest partner tracks for every id1.
normilized_weights = norm(df=track_pairs_weights, key1="id1", field="weights", n=51)

In [7]:
# track -> track edges as (id1, id2, beta_weight), where beta_weight is the
# normalised pair weight scaled by beta_track_track. Rows are sorted by
# descending normalised weight, then id1, then id2.
track_track = (
    normilized_weights
    .orderBy(col("norm_weights").desc(), col("id1"), col("id2"))
    .select(
        "id1",
        "id2",
        (col("norm_weights") * beta_track_track).alias("beta_weight"),
    )
)
                           

In [8]:
# Number of rows in `data` for every (userId, trackId) combination.
userTrack = (
    data
    .groupBy("userId", "trackId")
    .agg(count("*").alias("weight"))
)

In [9]:
# user -> track edges as (id1, id2, beta_weight). norm() is called with
# n=1001, i.e. at most 1000 tracks per user; the normalised count is scaled by
# beta_user_track. Rows are sorted by descending normalised weight, then
# userId, then trackId.
user_track = (
    norm(userTrack, "userId", "weight", 1001)
    .orderBy(col("norm_weight").desc(), col("userId"), col("trackId"))
    .select(
        col("userId").alias("id1"),
        col("trackId").alias("id2"),
        (col("norm_weight") * beta_user_track).alias("beta_weight"),
    )
)

In [10]:
# Number of rows in `data` for every (userId, artistId) combination.
userArtist = (
    data
    .groupBy("userId", "artistId")
    .agg(count("*").alias("weight"))
)

In [11]:
# user -> artist edges as (id1, id2, beta_weight). norm() is called with
# n=101, i.e. at most 100 artists per user; the normalised count is scaled by
# beta_user_artist. Rows are sorted by descending normalised weight, then
# userId, then artistId.
user_artist = (
    norm(userArtist, "userId", "weight", 101)
    .orderBy(col("norm_weight").desc(), col("userId"), col("artistId"))
    .select(
        col("userId").alias("id1"),
        col("artistId").alias("id2"),
        (col("norm_weight") * beta_user_artist).alias("beta_weight"),
    )
)
                                                   

In [12]:
# Number of rows in `data` for every (artistId, trackId) combination.
artistTrack = (
    data
    .groupBy("artistId", "trackId")
    .agg(count("*").alias("weight"))
)

In [13]:
# artist -> track edges as (id1, id2, beta_weight). norm() is called with
# n=101, i.e. at most 100 tracks per artist; the normalised count is scaled by
# beta_artist_track. Rows are sorted by descending normalised weight, then
# artistId, then trackId.
artist_track = (
    norm(artistTrack, "artistId", "weight", 101)
    .orderBy(col("norm_weight").desc(), col("artistId"), col("trackId"))
    .select(
        col("artistId").alias("id1"),
        col("trackId").alias("id2"),
        (col("norm_weight") * beta_artist_track).alias("beta_weight"),
    )
)
                                                   

In [14]:
# The user the recommendations are computed for.
userId = 776748

# Plays of this user joined to the `meta` row whose Id equals the played track.
# NOTE(review): the join matches on Id alone; if `meta` keeps tracks and
# artists in one Id space that can collide, a type filter is needed - confirm.
userTracks = (
    data.filter(col("userId") == userId)
    .join(meta, col("trackId") == col("Id"), "inner")
)

# Same plays, joined to the `meta` row whose Id equals the played artist.
userArtists = (
    data.filter(col("userId") == userId)
    .join(meta, col("artistId") == col("Id"), "inner")
)

# Distinct ids of everything the user touched, each flagged with p = 1.
# The former orderBy("id") in front of distinct() is gone: distinct() does not
# preserve row order, so the sort cost a shuffle without affecting the result.
# The statements above are parenthesised because they used to end on a
# dangling "\" that would have absorbed the next non-blank line.
x_0_track_artist = (
    userTracks.union(userArtists)
    .select("id")
    .distinct()
    .withColumn("p", lit(1))
)
          

In [15]:
# All four edge families stacked into a single edge list. union() matches
# columns by position; every operand was built as (id1, id2, beta_weight).
edges = (
    track_track
    .union(user_track)
    .union(user_artist)
    .union(artist_track)
)

In [None]:

# Distinct vertex ids of each kind, all exposed under the column name "id".
# The earlier version sorted each frame before distinct(), by a column that
# the preceding select had already renamed. distinct() does not keep row
# order, so those sorts only added work and have been dropped.
user_id = data.select(col("userId").alias("id")).distinct()
track_id = data.select(col("trackId").alias("id")).distinct()
artist_id = data.select(col("artistId").alias("id")).distinct()

# Start vector over every vertex: p = 1 for ids present in x_0_track_artist,
# 0 for all others.
# NOTE(review): union() keeps duplicates, so an id shared between users,
# tracks and artists would appear more than once here - confirm the three id
# spaces are disjoint.
x_0 = (
    user_id.union(track_id).union(artist_id)
    .join(x_0_track_artist, "id", "left")
    .select("id", when(col("p").isNull(), 0).otherwise(1).alias("p"))
)

# Personalisation vector: 1 on the target user's own vertex, 0 elsewhere.
u = x_0.select("id").withColumn("u_p", when(col("id") == userId, 1).otherwise(0))


In [None]:
# Five update rounds of p <- alpha * u + (1 - alpha) * (score pushed along edges).
for i in range(5):
    # For every target vertex id2, add up p(id1) * beta_weight over its
    # incoming edges. Vertices without outgoing edges get beta_weight 0 from
    # the left join + fill, so they contribute nothing.
    sigma = (
        x_0.join(edges, col("id") == col("id1"), "left")
        .na.fill(0, "beta_weight")
        .groupBy("id2")
        .agg(sum(col("p") * col("beta_weight")).alias("sigma"))
    )
    # Vertices that received nothing this round get sigma = 0 before mixing
    # with the personalisation vector.
    x_0 = (
        u.join(sigma, col("id") == col("id2"), how="left")
        .na.fill(0, ["sigma"])
        .select(
            col("id"),
            (alpha * col("u_p") + (1 - alpha) * col("sigma")).alias("p"),
        )
        .cache()
    )


In [None]:
# Leave out the user's own vertex, attach Name/Artist from `meta`, round the
# score to 5 decimals and collect the 40 rows with the highest rounded score.
result = (
    x_0
    .filter(col("id") != userId)
    .join(meta, "id", "inner")
    .select("Name", "Artist", round(col("p"), 5).alias("p"))
    .orderBy(col("p").desc())
    .take(40)
)

In [None]:
# One line per collected row: Name, Artist and rounded score, space-separated.
for name, artist, score in result:
    print('%s %s %s' % (name, artist, score))