In [0]:
%pyspark

from pyspark.sql import Window
from pyspark.sql.functions import avg, col, countDistinct, explode, lit, lower, max, regexp_replace, row_number, sqrt, trim, udf
from pyspark.sql.types import ArrayType, DoubleType, LongType, MapType, StringType, StructField, StructType

import numpy as np
import spacy

# Independent Variables

## Playlists

## Tracks

In [4]:
%pyspark

class Playlists:
    """Load the challenge-set JSON and flatten it to one row per playlist.

    After construction ``self.playlists`` holds the raw JSON document and
    ``self.dataframe`` has the columns ``playlist_id``, ``playlist_name`` and
    ``tracks``.
    """

    def __init__(self):
        self.extract()
        self.transform()

    def extract(self):
        """Read ``data/challenge_set.json`` as multi-line JSON via the global ``spark`` session."""
        reader = spark.read.option("multiline", "true")
        self.playlists = reader.json("data/challenge_set.json")

    def transform(self):
        """Explode the ``playlists`` array and normalise every playlist name."""
        # Lower-case, trim, then collapse runs of two or more whitespace characters.
        normalised_name = regexp_replace(
            trim(lower(col("playlist.name"))), r"\s{2,}", " ")

        exploded = self.playlists.withColumn("playlist", explode("playlists"))
        self.dataframe = exploded.select(
            col("playlist.pid").alias("playlist_id"),
            normalised_name.alias("playlist_name"),
            col("playlist.tracks").alias("tracks"))

# Build the flattened playlist table and print its schema.
playlists = Playlists()
playlists.dataframe.printSchema()

# Preview the first 10 playlists.
z.show(playlists.dataframe.limit(10))

In [5]:
%pyspark

class Tracks:
    """Flatten a playlist table into one row per (playlist, track) pair."""

    def __init__(self, playlists):
        self.playlists = playlists
        self.transform()

    def transform(self):
        """Explode ``tracks`` and keep the playlist keys plus artist/track uri and name."""
        track_fields = ["artist_uri", "artist_name", "track_uri", "track_name"]

        exploded = self.playlists.withColumn("track", explode("tracks"))
        self.dataframe = exploded.select(
            col("playlist_id"),
            col("playlist_name"),
            *[col(f"track.{field}").alias(field) for field in track_fields])
        
# Explode the playlist table into track-level rows and print the schema.
tracks = Tracks(playlists.dataframe)
tracks.dataframe.printSchema()

# Preview the first 10 track rows.
z.show(tracks.dataframe.limit(10))

# Data Preprocessing

## Playlists

## Tracks

In [9]:
%pyspark

class PlaylistPreprocessing:
    """Reduce a playlist table to its distinct, non-empty playlist names."""

    def __init__(self, playlists):
        self.playlists = playlists
        self.transform()

    def transform(self):
        """Store the de-duplicated ``playlist_name`` column in ``self.dataframe``."""
        name = col("playlist_name")
        # Drop both missing names and names that are the empty string.
        has_name = (name.isNotNull()) & (name != "")

        self.dataframe = self.playlists.select(name).where(has_name).distinct()

# Keep only distinct, non-empty playlist names.
playlist_preprocessing = PlaylistPreprocessing(playlists.dataframe)
playlist_preprocessing.dataframe.printSchema()

# Show how many distinct names remain, then a 3-row sample.
z.show(playlist_preprocessing.dataframe.count())
z.show(playlist_preprocessing.dataframe.limit(3))

In [10]:
%pyspark

class TrackPreprocessing:
    """Drop track rows without a usable playlist name and remove duplicate rows."""

    def __init__(self, tracks):
        self.tracks = tracks
        self.transform()

    def transform(self):
        """Filter on a non-null, non-empty ``playlist_name`` and de-duplicate."""
        name = col("playlist_name")
        named_rows = self.tracks.where((name.isNotNull()) & (name != ""))

        self.dataframe = named_rows.distinct()

# Keep only distinct track rows whose playlist has a non-empty name.
track_preprocessing = TrackPreprocessing(tracks.dataframe)
track_preprocessing.dataframe.printSchema()

# Show the remaining row count, then a single sample row.
z.show(track_preprocessing.dataframe.count())
z.show(track_preprocessing.dataframe.limit(1))

# Dataset Splitting

## Playlists

## Tracks

In [14]:
%pyspark

class Dataset:
    """Randomly split a DataFrame into train / test / validation parts (60 / 20 / 20).

    Args:
        dataframe: DataFrame whose rows are split.
        fold: Fold number; the split seed is ``fold * 1000`` so each fold gets
            a different but repeatable seed.
        tracks: Optional track-level DataFrame. When given, the validation
            split is inner-joined with it on ``playlist_name``.

    Attributes set: ``train``, ``test``, ``validation``.
    """

    def __init__(self, dataframe, fold=0, tracks=None):
        self.dataframe = dataframe
        self.seed = fold * 1000
        self.tracks = tracks

        self.split()

    def split(self):
        """Perform the random split and, if tracks were supplied, enrich validation."""
        self.train, self.test, self.validation = self.dataframe.randomSplit(
            [0.6, 0.2, 0.2], self.seed)

        # Compare against None explicitly instead of relying on the truthiness
        # of a DataFrame object.
        if self.tracks is not None:
            self.validation = self.validation.join(self.tracks, ["playlist_name"])

# Split the distinct playlist names; because `tracks` is supplied, the
# validation split is joined with the track rows on playlist_name.
playlist_dataset = Dataset(playlist_preprocessing.dataframe, tracks=tracks.dataframe)

# Peek at one row of each split.
z.show(playlist_dataset.train.limit(1))
z.show(playlist_dataset.test.limit(1))
z.show(playlist_dataset.validation.limit(1))

In [15]:
%pyspark

# Split the track-level rows (default fold=0, no validation join).
track_dataset = Dataset(tracks.dataframe)

# Peek at two rows of each split.
z.show(track_dataset.train.limit(2))
z.show(track_dataset.test.limit(2))
z.show(track_dataset.validation.limit(2))

# Baseline Model

## Train

## Test

In [19]:
%pyspark

class Random:
    """Baseline model: a random set of distinct track URIs.

    Args:
        tracks: Track-level DataFrame with a ``track_uri`` column.
        _: Ignored. CrossValidation calls every train builder as
            ``builder(train, tracks)``, so a second positional slot is needed.
        total: Maximum number of tracks to keep (default 500).

    ``self.dataframe`` has the single column ``recommendation_track_uri``.
    """

    def __init__(self, tracks, _=None, total=500):
        self.tracks = tracks
        self.total = total
        self.transform()

    def transform(self):
        """Sample roughly ``total`` distinct tracks and cap the result at ``total``."""
        candidates = (self.tracks
            .select(
                col("track_uri").alias("recommendation_track_uri"))
            .distinct())

        available = candidates.count()

        # Only sample when there are more candidates than requested. With fewer,
        # total / available would exceed 1 (invalid for sampling without
        # replacement), and with none it would divide by zero.
        if available > self.total:
            candidates = candidates.sample(fraction=self.total / available, seed=0)

        self.dataframe = candidates.limit(self.total)

# Draw the random baseline tracks from the training split.
random = Random(track_dataset.train)

# Preview 10 of the drawn tracks.
z.show(random.dataframe.limit(10))

In [20]:
%pyspark

class RandomRecommendations:
    """Attach every random recommendation to every test row.

    Args:
        test: Test split; all of its columns are kept.
        random: DataFrame with a ``recommendation_track_uri`` column.
        _, __: Ignored. CrossValidation passes ``key`` and ``interaction`` in
            these positions to every test builder.
    """

    def __init__(self, test, random, _=None, __=None):
        self.test = test
        self.random = random
        self.transform()

    def transform(self):
        """Build the Cartesian product of the test rows and the random tracks."""
        candidates = self.random.select(col("recommendation_track_uri"))

        # crossJoin makes the Cartesian product explicit. A condition-less
        # join() is an implicit cross join, which Spark 2.x rejects unless
        # spark.sql.crossJoin.enabled is set.
        self.dataframe = self.test.crossJoin(candidates)
    
# Pair the test rows with the random baseline tracks.
random_recommendations = RandomRecommendations(track_dataset.test, random.dataframe)

# Preview 10 recommendation rows.
z.show(random_recommendations.dataframe.limit(10))

## Evaluation

In [22]:
%pyspark

class Evaluation:
    """Score recommendations against a validation split.

    For every ``key`` value in the validation split the score is
    ``tracks_hits / tracks``: the number of distinct validation tracks that
    also appear among the recommended tracks, divided by the number of
    distinct validation tracks. ``self.dataframe`` is a single row holding the
    mean of that score in the column ``R-precision`` (plus ``fold`` if given).

    Args:
        validation: DataFrame with ``key`` and ``track_uri`` columns.
        recommendations: DataFrame with a ``recommendation_track_uri`` column.
        key: Column that identifies one evaluated unit (default ``playlist_id``).
        fold: Optional fold label added as a ``fold`` column.
    """

    def __init__(self, validation, recommendations, key="playlist_id", fold=None):
        self.validation = validation
        self.recommendations = recommendations
        self.key = key
        self.fold = fold

        self.evaluate()

    def evaluate(self):
        """Compute the averaged hit ratio and store it in ``self.dataframe``."""
        relevant = (self.validation
            .groupBy(self.key)
            .agg(
                countDistinct("track_uri").alias("tracks")))

        # NOTE(review): recommendations are matched on track URI only, not per
        # `key`, so a track recommended for any playlist counts as a hit for
        # every playlist. Kept as-is; confirm whether that is intended.
        # distinct() does not change the counts below (they are countDistinct)
        # but keeps the join input small.
        recommended = (self.recommendations
            .select(
                col("recommendation_track_uri"))
            .distinct())

        hits = (self.validation
            .join(recommended, self.validation["track_uri"] == recommended["recommendation_track_uri"])
            .groupBy(self.key)
            .agg(
                countDistinct("track_uri").alias("tracks_hits")))

        evaluation = (relevant
            .join(hits, [self.key], "left")
            # Keys without any hit come out of the left join as null; avg()
            # skips nulls, which would average only over keys that had a hit.
            # Count them as zero hits instead.
            .fillna(0, subset=["tracks_hits"])
            .withColumn("R-precision", col("tracks_hits") / col("tracks"))
            .agg(
                avg("R-precision").alias("R-precision")))

        # Explicit None check so that a fold labelled 0 is still recorded.
        if self.fold is not None:
            evaluation = (evaluation
                .withColumn("fold", lit(self.fold)))

        self.dataframe = evaluation

In [23]:
%pyspark

# Score the random baseline against the track-level validation split.
random_evaluation = Evaluation(track_dataset.validation, random_recommendations.dataframe)

z.show(random_evaluation.dataframe.limit(10))

# Content-Based

## Embeddings

In [26]:
%pyspark

# Load spaCy's "en_core_web_sm" pipeline once; the `embeddings` function below reads this global.
nlp = spacy.load("en_core_web_sm")

def embeddings(text):
    """Return the spaCy document vector of ``text`` as a plain Python list."""
    document = nlp(text)
    return document.vector.tolist()

def cosine_similarity(a, b):
    """Return the cosine similarity of two equal-length numeric vectors.

    Args:
        a, b: Sequences of numbers (or None).

    Returns:
        A Python float, ``0.0`` when either vector has zero norm, or ``None``
        when either input is ``None``.
    """
    if a is None or b is None:
        return None

    denominator = np.linalg.norm(a) * np.linalg.norm(b)

    # A zero-norm vector would make this 0/0 = NaN. Spark SQL orders NaN above
    # every other number, so a NaN score would pass a "> threshold" filter and
    # rank first; report "no similarity" instead.
    if denominator == 0:
        return 0.0

    return float(np.dot(a, b) / denominator)

# Wrap the two Python helpers as Spark UDFs with explicit return types.
to_cosine_similarity = udf(cosine_similarity, DoubleType())
to_embeddings = udf(embeddings, ArrayType(DoubleType()))

In [27]:
%pyspark

class PlaylistEmbeddings:
    """Add a ``playlist_embedding`` column computed from ``playlist_name``."""

    def __init__(self, playlists):
        self.playlists = playlists
        self.transform()

    def transform(self):
        """Apply the ``to_embeddings`` UDF to every playlist name."""
        embedding = to_embeddings(col("playlist_name"))
        self.dataframe = self.playlists.withColumn("playlist_embedding", embedding)

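Prerequisite: install spaCy and the `en_core_web_sm` model into the conda environment at `/opt/conda/miniconda3` before running the embedding cells:

```
sudo su
source /opt/conda/miniconda3/bin/activate

pip install spacy
python -m spacy download en_core_web_sm
```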

In [29]:
%pyspark

# NOTE: this rebinds the global name `embeddings`, which previously referred to
# the embedding function; the `to_embeddings` UDF was already created from that
# function and is not affected by the rebinding.
embeddings = PlaylistEmbeddings(playlist_dataset.train)
embeddings.dataframe.printSchema()

# Show the row count, then a 5-row sample.
z.show(embeddings.dataframe.count())
z.show(embeddings.dataframe.limit(5))

## Similarities

In [31]:
%pyspark

class PlaylistSimilarities:
    """Pairwise cosine similarity between playlist-name embeddings.

    Args:
        playlists_embeddings: DataFrame with ``playlist_name`` and
            ``playlist_embedding`` columns.
        threshold: Pairs are kept only when their similarity is strictly
            greater than this value (default 0.5).

    ``self.dataframe`` has the columns ``playlist_name``,
    ``recommendation_playlist_name`` and ``similarity``.
    """

    def __init__(self, playlists_embeddings, threshold=0.5):
        self.playlists = playlists_embeddings
        self.threshold = threshold
        self.transform()

    def transform(self):
        """Self-join the embeddings, score every pair and keep the similar ones."""
        left = self.playlists.alias("playlists")
        right = self.playlists.alias("recommendations")

        # NOTE(review): the `<` condition emits each unordered pair once, with
        # the alphabetically smaller name in `playlist_name`. A playlist is
        # therefore never paired with names that sort before it; confirm this
        # one-directional lookup is intended.
        pairs = left.join(
            right,
            col("playlists.playlist_name") < col("recommendations.playlist_name"))

        scored = pairs.withColumn(
            "similarity",
            to_cosine_similarity(
                col("playlists.playlist_embedding"),
                col("recommendations.playlist_embedding")))

        self.dataframe = (scored
            .where(col("similarity") > self.threshold)
            .select(
                col("playlists.playlist_name"),
                col("recommendations.playlist_name").alias("recommendation_playlist_name"),
                col("similarity")))

# Score all playlist-name pairs from the embedded training playlists.
playlist_similarities = PlaylistSimilarities(embeddings.dataframe)
playlist_similarities.dataframe.printSchema()

# Show the number of retained pairs, then a 10-row sample.
z.show(playlist_similarities.dataframe.count())
z.show(playlist_similarities.dataframe.limit(10))

In [32]:
%pyspark

class PlaylistSimilaritiesWithTracks:
    """Turn playlist-to-playlist similarities into ranked track recommendations.

    Each similar playlist contributes its tracks; a track's score for a
    playlist name is the average similarity of the playlists that contain it,
    and only the 500 best-ranked tracks per playlist name are kept.
    """

    def __init__(self, playlist_similarities, tracks):
        self.similarities = playlist_similarities
        self.tracks = tracks
        self.transform()

    def transform(self):
        """Join, average per recommended track, rank and truncate."""
        track_fields = ("artist_uri", "artist_name", "track_uri", "track_name")

        # Prefix every track column with "recommendation_" so it can be joined
        # to the similar playlist's name.
        recommended_tracks = self.tracks.select(
            col("playlist_name").alias("recommendation_playlist_name"),
            *[col(field).alias(f"recommendation_{field}") for field in track_fields])

        group_keys = ["playlist_name"] + [f"recommendation_{field}" for field in track_fields]

        averaged = (self.similarities
            .join(recommended_tracks, ["recommendation_playlist_name"])
            .groupBy(*[col(key) for key in group_keys])
            .agg(avg("similarity").alias("similarity")))

        by_similarity = (Window
            .partitionBy(col("playlist_name"))
            .orderBy(col("similarity").desc()))

        ranked = averaged.withColumn("position", row_number().over(by_similarity))

        self.dataframe = ranked.where(col("position") <= 500)
        
# Expand the playlist similarities into ranked track recommendations.
playlist_similarities_with_tracks = PlaylistSimilaritiesWithTracks(
    playlist_similarities.dataframe,
    tracks.dataframe)
playlist_similarities_with_tracks.dataframe.printSchema()

# Show the row count, then a single sample row.
z.show(playlist_similarities_with_tracks.dataframe.count())
z.show(playlist_similarities_with_tracks.dataframe.limit(1))

In [33]:
%pyspark

class PlaylistSimilaritiesWrapper:
    """Run the whole content-based pipeline: embeddings, similarities, tracks.

    Args:
        playlists: DataFrame with a ``playlist_name`` column.
        tracks: Track-level DataFrame used to expand similar playlists into
            recommended tracks.

    ``self.dataframe`` is the output of ``PlaylistSimilaritiesWithTracks``.
    """

    def __init__(self, playlists, tracks):
        self.playlists = playlists
        self.tracks = tracks

        self.transform()

    def transform(self):
        """Chain the three pipeline stages on the supplied inputs."""
        embeddings = PlaylistEmbeddings(self.playlists)
        similarities = PlaylistSimilarities(embeddings.dataframe)
        # Use the tracks handed to this instance. The previous code read the
        # notebook-global `tracks` here, silently ignoring the argument.
        similarities_with_tracks = PlaylistSimilaritiesWithTracks(
            similarities.dataframe, self.tracks)

        self.dataframe = similarities_with_tracks.dataframe
        
# Run the content-based pipeline end to end on the training playlists.
playlist_similarities_wrapper = PlaylistSimilaritiesWrapper(playlist_dataset.train, tracks.dataframe)
playlist_similarities_wrapper.dataframe.printSchema()

# Show the row count, then a 5-row sample.
z.show(playlist_similarities_wrapper.dataframe.count())
z.show(playlist_similarities_wrapper.dataframe.limit(5))

## Evaluation

In [35]:
%pyspark

class Recommendations:
    """Look up recommendations for test rows and keep the best ``total`` per key.

    Args:
        test: Test split; must contain the ``key`` and ``interaction`` columns.
        similarities: DataFrame with the ``interaction`` column plus
            ``recommendation_artist_uri``, ``recommendation_track_uri`` and
            ``similarity``.
        key: Column whose values each receive their own ranking.
        interaction: Column used to join test rows with similarities.
        total: Highest rank position that is kept per key.
    """

    def __init__(self, test, similarities, key="playlist_id", interaction="track_uri", total=500):
        self.test = test
        self.similarities = similarities
        self.key = key
        self.interaction = interaction
        self.total = total

        self.transform()

    def transform(self):
        """Join, rank by descending similarity within each key, and truncate."""
        lookup = self.similarities.select(
            col(self.interaction),
            col("recommendation_artist_uri"),
            col("recommendation_track_uri"),
            col("similarity"))

        ranking = Window.partitionBy(col(self.key)).orderBy(col("similarity").desc())

        ranked = (self.test
            .join(lookup, [self.interaction])
            .withColumn("position", row_number().over(ranking)))

        top = ranked.where(col("position") <= self.total)

        # Columns are emitted in alphabetical order so that results built with
        # different join columns share the same column layout.
        self.dataframe = top.select(sorted(top.columns))

In [36]:
%pyspark

# Content-based recommendations: join and rank by playlist_name.
playlist_recommendations = Recommendations(
    playlist_dataset.test,
    playlist_similarities_wrapper.dataframe,
    key="playlist_name",
    interaction="playlist_name")
    
playlist_recommendations.dataframe.printSchema()
z.show(playlist_recommendations.dataframe.limit(10))

In [37]:
%pyspark

# Score the content-based recommendations, grouping validation rows by playlist_name.
playlist_evaluation = Evaluation(
    playlist_dataset.validation, 
    playlist_recommendations.dataframe, 
    key="playlist_name")

z.show(playlist_evaluation.dataframe.limit(10))

# Collaborative Filtering

## Jaccard Index

### Track Similarities

### Recommendations

In [42]:
%pyspark

class TrackSimilarities:
  """Item-to-item track similarities derived from playlist co-occurrence.

  For every ordered pair of different tracks that share at least one playlist
  the class computes a Jaccard and a cosine score from playlist counts, ranks
  the pairs per source track, and keeps ranks up to 500 for the metric chosen
  via ``similarity`` ("jaccard" or "cosine").

  Args:
    dataframe: Track-level rows with playlist_id, artist_uri, artist_name,
      track_uri and track_name columns.
    _: Unused; CrossValidation calls every train builder with a second
      positional argument.
    similarity: Suffix selecting which metric feeds the ``similarity`` and
      ``position`` columns.
  """
  def __init__(self, dataframe, _=None, similarity="jaccard"):
    self.tracks = dataframe
    self.similarity = similarity
    
    self.transform()

  def transform(self):
    """Build the similarity table and store it in ``self.dataframe``."""
    tracks = self.tracks

    # Self-join: pairs of rows with different track URIs, then restricted to
    # rows that belong to the same playlist. Grouping by the pair and counting
    # distinct playlists gives the number of playlists containing both tracks.
    similarities = (tracks.alias("tracks")
      .join(tracks.alias("recommendations"),
        col("tracks.track_uri") != col("recommendations.track_uri"))
      .where(col("tracks.playlist_id") == col("recommendations.playlist_id"))
      .select(
        col("tracks.artist_uri"),
        col("tracks.artist_name"),
        col("tracks.track_uri"),
        col("tracks.track_name"),
        col("tracks.playlist_id"),
        col("recommendations.artist_uri").alias("recommendation_artist_uri"),
        col("recommendations.artist_name").alias("recommendation_artist_name"),
        col("recommendations.track_uri").alias("recommendation_track_uri"),
        col("recommendations.track_name").alias("recommendation_track_name"))
      .groupBy(
        col("tracks.artist_uri"),
        col("tracks.artist_name"),
        col("tracks.track_uri"),
        col("tracks.track_name"),
        col("recommendation_artist_uri"),
        col("recommendation_artist_name"),
        col("recommendation_track_uri"),
        col("recommendation_track_name"))
      .agg(
        countDistinct("playlist_id").alias("intersection")))

    # Number of distinct playlists each individual track appears in.
    playlists = (tracks
      .groupBy("track_uri")
      .agg(
        countDistinct("playlist_id").alias("playlists")))

    # Ranking windows, one per metric: partition by the source track and order
    # by that metric descending. They reference columns that are only added in
    # the chain below.
    window_jaccard = (Window
      .partitionBy(col("tracks.track_uri"))
      .orderBy(col("similarity_jaccard").desc()))
    
    window_cosine = (Window
      .partitionBy(col("tracks.track_uri"))
      .orderBy(col("similarity_cosine").desc()))
      
    # "a" carries the playlist count of the source track, "b" that of the
    # recommended track.
    similarities = (similarities
      .join(playlists.alias("a"), ["track_uri"])
      .join(playlists.alias("b"), col("recommendation_track_uri") == col("b.track_uri"))
      # union = |A| + |B| - |A and B|
      .withColumn("union",
        col("a.playlists") + col("b.playlists") - col("intersection"))
      # Jaccard = intersection / union
      .withColumn("similarity_jaccard",
        col("intersection") / col("union"))
      # Cosine = intersection / (sqrt(|A|) * sqrt(|B|))
      .withColumn("similarity_cosine",
        col("intersection") / (sqrt(col("a.playlists")) * sqrt(col("b.playlists"))))
      # Expose the selected metric under the generic names similarity / position.
      .withColumn("similarity", col(f"similarity_{self.similarity}"))
      .withColumn("position_jaccard", row_number().over(window_jaccard))
      .withColumn("position_cosine", row_number().over(window_cosine))
      .withColumn("position", col(f"position_{self.similarity}"))
      .select(
        col("tracks.artist_uri"),
        col("tracks.artist_name"),
        col("tracks.track_uri"),
        col("tracks.track_name"),
        col("a.playlists").alias("track_playlists"),
        col("recommendation_artist_uri"),
        col("recommendation_artist_name"),
        col("recommendation_track_uri"),
        col("recommendation_track_name"),
        col("b.playlists").alias("recommendation_track_playlists"),
        col("intersection"),
        col("union"),
        col("similarity_jaccard"),
        col("similarity_cosine"),
        col("similarity"),
        col("position_jaccard"),
        col("position_cosine"),
        col("position"))
      # Keep only ranks 1..500 of the selected metric per source track.
      .where(col("position") <= 500))

    self.dataframe = similarities
    
# Track-to-track similarities from the training split (default metric: "jaccard").
track_similarities = TrackSimilarities(track_dataset.train)
track_similarities.dataframe.printSchema()

# Show the row count, then a 10-row sample.
z.show(track_similarities.dataframe.count())
z.show(track_similarities.dataframe.limit(10))

In [43]:
%pyspark

# Collaborative-filtering recommendations with the default key/interaction
# (playlist_id / track_uri).
recommendations = Recommendations(track_dataset.test, track_similarities.dataframe)
recommendations.dataframe.printSchema()

z.show(recommendations.dataframe.limit(10))

### Evaluation

In [45]:
%pyspark
        
# Score the Jaccard-based recommendations per playlist_id.
evaluation = Evaluation(track_dataset.validation, recommendations.dataframe)

z.show(evaluation.dataframe.limit(10))

## Cosine Similarity

### Track Similarities

### Recommendations

In [49]:
%pyspark

class TrackSimilaritiesCosine(TrackSimilarities):
    """``TrackSimilarities`` whose default metric is "cosine" instead of "jaccard"."""

    def __init__(self, dataframe, _=None, similarity="cosine"):
        # The parent constructor stores both values and runs transform().
        super().__init__(dataframe, _, similarity)

# Track-to-track similarities ranked by the cosine metric.
track_similarities_cosine = TrackSimilaritiesCosine(
    track_dataset.train, 
    similarity="cosine")

track_similarities_cosine.dataframe.printSchema()

# Show the row count, then a 5-row sample.
z.show(track_similarities_cosine.dataframe.count())
z.show(track_similarities_cosine.dataframe.limit(5))

In [50]:
%pyspark

# Recommendations built from the cosine-ranked track similarities.
recommendations_cosine = Recommendations(
    track_dataset.test,
    track_similarities_cosine.dataframe)

recommendations_cosine.dataframe.printSchema()

z.show(recommendations_cosine.dataframe.limit(10))

### Evaluation

In [52]:
%pyspark
        
# Score the cosine-based recommendations per playlist_id.
evaluation_cosine = Evaluation(
    track_dataset.validation, 
    recommendations_cosine.dataframe)

z.show(evaluation_cosine.dataframe.limit(10))

# Hybrid Recommender System

## Data Preprocessing

## Hybrid Recommendation

In [56]:
%pyspark

# Split the preprocessed track rows (non-empty playlist names only) for the hybrid model.
hybrid_dataset = Dataset(track_preprocessing.dataframe)

# Peek at five rows of each split.
z.show(hybrid_dataset.train.limit(5))
z.show(hybrid_dataset.test.limit(5))
z.show(hybrid_dataset.validation.limit(5))

In [57]:
%pyspark

class HybridRecommendations:
    """Combine content-based and collaborative recommendations in one table.

    Args:
        test: Track-level test split.
        playlist_similarities: Similarities keyed by ``playlist_name``.
        track_similarities: Similarities keyed by ``track_uri``.
        weights: Two numbers ``(playlist_weight, track_weight)``; each is
            multiplied by 500 to get the rank cut-off of its source.
    """

    def __init__(self, test, playlist_similarities, track_similarities, weights):
        self.test = test
        self.playlist_similarities = playlist_similarities
        self.track_similarities = track_similarities

        self.playlist_weight, self.track_weight = weights
        self.total = 500

        self.transform()

    def transform(self):
        """Build both recommendation sets and stack them with ``union``."""
        from_playlists = Recommendations(
            self.test,
            self.playlist_similarities,
            key="playlist_name",
            interaction="playlist_name",
            total=(self.playlist_weight * self.total)).dataframe

        from_tracks = Recommendations(
            self.test,
            self.track_similarities,
            key="playlist_id",
            interaction="track_uri",
            total=(self.track_weight * self.total)).dataframe

        # union() matches columns by position; both inputs come from
        # Recommendations, which emits its columns in sorted order.
        self.dataframe = from_playlists.union(from_tracks)

# Hybrid recommendations with equal weights for the playlist-based and
# track-based sources.
hybrid_recommendations = HybridRecommendations(
    hybrid_dataset.test, 
    playlist_similarities_wrapper.dataframe,
    track_similarities.dataframe,
    [0.5, 0.5])

hybrid_recommendations.dataframe.printSchema()
z.show(hybrid_recommendations.dataframe.limit(5))


## Evaluation

In [59]:
%pyspark
        
# Score the hybrid recommendations per playlist_id.
hybrid_evaluation = Evaluation(hybrid_dataset.validation, hybrid_recommendations.dataframe)

z.show(hybrid_evaluation.dataframe.limit(10))

# Cross Validation

## Random Cross Validation

## Hybrid Cross Validation

In [63]:
%pyspark

from functools import reduce
from pyspark.sql import DataFrame

class CrossValidation:
    """Repeat split / train / recommend / evaluate over several seeded folds.

    Args:
        input: DataFrame handed to ``Dataset`` for splitting.
        folds: Number of folds (>= 1); fold numbers run from 1 to ``folds``.
        train_builder: Callable ``(train, tracks)`` returning an object with a
            ``dataframe`` attribute.
        test_builder: Callable ``(test, trained_dataframe, key, interaction)``
            returning an object with a ``dataframe`` attribute.
        key: Grouping column passed to the test builder and to ``Evaluation``.
        interaction: Join column passed to the test builder.
        tracks: Optional track DataFrame passed to ``Dataset`` and the train builder.

    ``self.dataframe`` holds one ``R-precision`` row per fold plus a final row
    whose ``fold`` is ``"avg"``.

    Raises:
        ValueError: if ``folds`` is smaller than 1.
    """

    def __init__(self, input, folds, train_builder, test_builder, key, interaction=None, tracks=None):
        self.input = input
        self.folds = folds
        self.train_builder = train_builder
        self.test_builder = test_builder
        self.key = key
        self.interaction = interaction
        self.tracks = tracks
        self.dataframe = None

        self.evaluate()

    def evaluate_for(self, fold):
        """Run one fold and return its single-row evaluation DataFrame."""
        dataset = Dataset(self.input, fold=fold, tracks=self.tracks)

        train = self.train_builder(dataset.train, self.tracks)
        test = self.test_builder(dataset.test, train.dataframe, self.key, self.interaction)
        evaluation = Evaluation(dataset.validation, test.dataframe, self.key, fold)

        return evaluation.dataframe

    def evaluate(self):
        """Evaluate every fold and append the average over all folds."""
        if self.folds < 1:
            # reduce() on an empty list would fail with an unrelated-looking TypeError.
            raise ValueError("folds must be at least 1")

        # DataFrame.union replaces the deprecated alias unionAll.
        per_fold = reduce(
            DataFrame.union,
            [self.evaluate_for(i + 1) for i in range(self.folds)])

        # The summary row labels its fold "avg"; cast the numeric fold labels
        # to string so both sides of the union have the same column type
        # instead of relying on implicit coercion.
        per_fold = per_fold.withColumn("fold", col("fold").cast("string"))

        summary = (per_fold
            .agg(
                avg(col("R-precision")).alias("R-precision"))
            .withColumn("fold", lit("avg")))

        self.dataframe = per_fold.union(summary)
        
        
        
        
        
        
        
        

In [64]:
%pyspark

from functools import reduce
from pyspark.sql import DataFrame

class HybridCrossValidation:
    """Cross-validate the hybrid recommender over several seeded folds.

    Args:
        input_playlists: Playlist-name DataFrame split for the content-based part.
        input_tracks: Track-level DataFrame split for the collaborative part;
            also passed as ``tracks`` to the playlist ``Dataset`` and builder.
        folds: Number of folds (>= 1); fold numbers run from 1 to ``folds``.
        train_builder_playlists: Callable ``(train, tracks)`` returning an
            object with a ``dataframe`` attribute.
        train_builder_tracks: Callable ``(train)`` returning an object with a
            ``dataframe`` attribute.
        test_builder: Callable ``(test, playlist_similarities,
            track_similarities, weights)`` returning an object with a
            ``dataframe`` attribute.
        weights: Weights handed to ``test_builder``; defaults to ``[0.5, 0.5]``.

    ``self.dataframe`` holds one ``R-precision`` row per fold plus a final row
    whose ``fold`` is ``"avg"``.

    Raises:
        ValueError: if ``folds`` is smaller than 1.
    """

    def __init__(self, input_playlists, input_tracks, folds, train_builder_playlists, train_builder_tracks, test_builder, weights=None):
        self.input_playlists = input_playlists
        self.input_tracks = input_tracks
        self.folds = folds
        self.train_builder_playlists = train_builder_playlists
        self.train_builder_tracks = train_builder_tracks
        self.test_builder = test_builder
        # Build the default per instance so no list is shared between objects.
        self.weights = [0.5, 0.5] if weights is None else weights
        self.dataframe = None

        self.evaluate()

    def evaluate_for(self, fold):
        """Run one fold and return its single-row evaluation DataFrame."""
        playlist_split = Dataset(self.input_playlists, fold=fold, tracks=self.input_tracks)
        track_split = Dataset(self.input_tracks, fold=fold)

        train_for_playlist = self.train_builder_playlists(playlist_split.train, self.input_tracks)
        train_for_track = self.train_builder_tracks(track_split.train)

        test = self.test_builder(
            track_split.test,
            train_for_playlist.dataframe,
            train_for_track.dataframe,
            self.weights)

        evaluation = Evaluation(track_split.validation, test.dataframe, fold=fold)

        return evaluation.dataframe

    def evaluate(self):
        """Evaluate every fold and append the average over all folds."""
        if self.folds < 1:
            # reduce() on an empty list would fail with an unrelated-looking TypeError.
            raise ValueError("folds must be at least 1")

        # DataFrame.union replaces the deprecated alias unionAll.
        per_fold = reduce(
            DataFrame.union,
            [self.evaluate_for(i + 1) for i in range(self.folds)])

        # The summary row labels its fold "avg"; cast the numeric fold labels
        # to string so both sides of the union have the same column type
        # instead of relying on implicit coercion.
        per_fold = per_fold.withColumn("fold", col("fold").cast("string"))

        summary = (per_fold
            .agg(
                avg(col("R-precision")).alias("R-precision"))
            .withColumn("fold", lit("avg")))

        self.dataframe = per_fold.union(summary)

## Baseline Model

## Content-Based Model

In [67]:
%pyspark

# 5-fold cross validation of the random baseline.
cross_validation = CrossValidation(
    tracks.dataframe, 
    5, 
    Random, 
    RandomRecommendations,
    key="playlist_id")
    
    

z.show(cross_validation.dataframe)

In [68]:
%pyspark

# 5-fold cross validation of the content-based (playlist-name) model.
playlist_cross_validation = CrossValidation(
    playlist_preprocessing.dataframe, 
    5, 
    PlaylistSimilaritiesWrapper, 
    Recommendations,
    key="playlist_name",
    interaction="playlist_name",
    tracks=tracks.dataframe)

z.show(playlist_cross_validation.dataframe)

## Collaborative Filtering Model

### Jaccard Index

### Cosine Similarity

In [72]:
%pyspark

# 4-fold cross validation of collaborative filtering with the default (Jaccard) metric.
track_cross_validation = CrossValidation(
    tracks.dataframe, 
    4, 
    TrackSimilarities, 
    Recommendations,
    key="playlist_id",
    interaction="track_uri")

z.show(track_cross_validation.dataframe)

In [73]:
%pyspark

# 4-fold cross validation of collaborative filtering with the cosine metric.
track_cross_validation_cosine = CrossValidation(
    tracks.dataframe, 
    4, 
    TrackSimilaritiesCosine, 
    Recommendations,
    key="playlist_id",
    interaction="track_uri")

z.show(track_cross_validation_cosine.dataframe)

## Hybrid Model

In [75]:
%pyspark

# 5-fold cross validation of the hybrid model on the preprocessed inputs.
hybrid_cross_validation = HybridCrossValidation(
    playlist_preprocessing.dataframe,
    track_preprocessing.dataframe,
    5,
    PlaylistSimilaritiesWrapper, 
    TrackSimilarities, 
    HybridRecommendations)

z.show(hybrid_cross_validation.dataframe)