# Spotify MPD Collaborative Filtering
- Idé: Rekommendera ett namn för den givna spellistan genom att ge den samma namn som den närmaste spellistan.
- Skapa en funktion som konverterar spellistorna så att de bara innehåller artisten och låtnamnet.

In [1]:
import json
import os
import numpy as np
import math
from typing import List, Tuple, Union
from collections import defaultdict
import random
import time
import pickle


class Collaborative_Recommender:
    """
    Playlist recommender using user-based and item-based collaborative filtering.

    Playlists are read from json files that hold a top-level "playlists" list; every playlist
    has a "name" and a "tracks" list of dicts with "artist_name" and "track_name".
    Throughout the class a track is identified by the string '<artist> - <song>'.

    Typical call order: construct, make_user_based_vectors(), extract_playlist_names(),
    make_item_based_vectors() (item-based only), then one of the *_recommendation() methods.
    """

    Vector = List[int]

    def __init__(self, path: str, rewrite_data: bool = False, playlist_limit: int = 0) -> None:
        """
        path: file path to the spotify jsons
        rewrite_data: True = Always create new vectors based off the jsons, False = Try to use pickled
            vectors, if not found generate new vectors and pickle them.
        playlist_limit: Limit the number of playlists to use (0 or less = no limit)
        """
        self.playlists = []
        self.path = path
        self.rewrite_data = rewrite_data
        self.load_data(path, playlist_limit)
        # Shuffle playlists as to reduce possible bias
        random.shuffle(self.playlists)

    @staticmethod
    def _track_name(song: dict) -> str:
        """Formats one track dict as '<artist> - <song>'"""
        return f"{song['artist_name']} - {song['track_name']}"

    def _track_names_of(self, playlist: Union[dict, List[str], tuple]) -> List[str]:
        """
        Normalises a playlist into a list of '<artist> - <song>' names.

        playlist: a playlist dict (with a "tracks" list), a list of track names, or an
            (index, playlist) tuple as produced by enumerate() for the multiprocessing map.
        Raises TypeError for any other type.
        """
        if isinstance(playlist, tuple):
            _, playlist = playlist
        if isinstance(playlist, dict):
            return [self._track_name(song) for song in playlist["tracks"]]
        if isinstance(playlist, list):
            return list(playlist)
        raise TypeError(
            f"playlist must be a dict or a list of track names, got {type(playlist).__name__}"
        )

    def _songs_to_exclude(self, supplied_songs: Union[List[str], None]) -> set:
        """
        Collects the songs that must not be suggested back to the user: the songs that were
        supplied, plus the songs of the user's own stored playlist when self.user_id is a valid
        index. A negative user_id (the default -1) means "not in the database" and adds nothing.
        """
        excluded = set(supplied_songs or ())
        if 0 <= self.user_id < len(self.named_playlists):
            excluded.update(self.named_playlists[self.user_id])
        return excluded

    def load_data(self, data_path: str, limit: int = 0):
        """
        Appends the "playlists" of every file in data_path to self.playlists.

        data_path: directory holding the json files
        limit: if greater than 0, keep only the first `limit` playlists
        A missing directory/file is reported with a print and otherwise ignored.
        """
        try:
            filenames = os.listdir(data_path)
            print("Loading the found jsons:", filenames)
            for filename in filenames:
                with open(os.path.join(data_path, filename), "rt", encoding="utf-8") as f:
                    self.playlists.extend(json.load(f)["playlists"])
            print(f"Loaded {len(filenames)} jsons")

            if limit > 0:
                self.playlists = self.playlists[:limit]
        except FileNotFoundError as e:
            print("File not found:", e)

    def extract_unique_songs(self):
        """Puts all unique songs from the playlists into a sorted array (self.unique_songs)"""
        self.unique_songs = sorted(
            {
                self._track_name(song)
                for playlist in self.playlists
                for song in playlist["tracks"]
            }
        )
        print(f"Found {len(self.unique_songs)} unique songs")

    def extract_playlist_names(self):
        """Puts all playlists' names into an array with the same order as the playlists"""
        self.playlist_names = [playlist["name"] for playlist in self.playlists]

    def sample_tracks(self, pos: int, num: int):
        """Prints the first `num` tracks of the playlist at index `pos`"""
        for song in self.playlists[pos]["tracks"][:num]:
            print(self._track_name(song))

    def to_track_names(self, playlists) -> List[List[str]]:
        """
        Compiles every track in every playlist an '<artist> - <song>' name, into an array of the
        same shape as the playlists
        """
        return [[self._track_name(song) for song in playlist["tracks"]] for playlist in playlists]

    def make_playlist_vector(self, playlist: Union[dict, List[str]]) -> List[int]:
        """
        Given a playlist, produce a vector whose ith element is 1
        if unique_songs[i] is in the playlist, 0 otherwise

        playlist: a playlist dict, a list of '<artist> - <song>' names, or an
            (index, playlist) tuple when called through the multiprocessing map.
        Raises TypeError for any other type.
        """
        # Build the lookup set once; testing against a list for every unique song is quadratic
        songs_in_playlist = set(self._track_names_of(playlist))
        return [1 if song in songs_in_playlist else 0 for song in self.unique_songs]

    def _load_pickled_vectors(self) -> bool:
        """
        Tries to restore playlist_vectors and playlists from the pickle files in the working
        directory. Returns True on success, False if either file is missing.
        """
        # NOTE: pickle.load can execute arbitrary code; only use pickle files this class wrote.
        # NOTE(review): the cached playlists replace whatever load_data read, so a cache made
        # from another path or playlist_limit is used as is.
        try:
            with open("playlist_vectors.pickle", "rb") as vector_file:
                with open("playlists.pickle", "rb") as playlist_file:
                    print("Pickled data found.")
                    vectors = pickle.load(vector_file)
                    playlists = pickle.load(playlist_file)
        except FileNotFoundError:
            return False

        self.playlist_vectors = vectors
        self.playlists = playlists
        self.extract_unique_songs()
        return True

    def _generate_and_pickle_vectors(self, processes: Union[int, None] = None):
        """Builds the playlist vectors with a process pool and pickles them with the playlists"""
        import multiprocessing as mp

        self.extract_unique_songs()
        # The context manager shuts the worker processes down once the map has returned
        with mp.Pool(processes) as pool:
            self.playlist_vectors = pool.map(self.make_playlist_vector, enumerate(self.playlists))

        print("Pickling...")
        with open("playlist_vectors.pickle", "wb") as vector_file:
            pickle.dump(self.playlist_vectors, vector_file)
        with open("playlists.pickle", "wb") as playlist_file:
            pickle.dump(self.playlists, playlist_file)

    def make_user_based_vectors(self, processes: Union[int, None] = None):
        """
        Generates sparse feature vectors of the given playlists
        Assigns 1 to n'th position if the playlist includes song n, else 0
        Uses multiprocessing to speed up the generation
        Pickles the vectors so they dont have to be recalculated on a rerun

        processes: number of worker processes for the generation; None lets
            multiprocessing pick its default. No pool is started when pickled data is loaded.
        Also fills self.unique_songs and self.named_playlists.
        """
        t0 = time.time()

        if self.rewrite_data:
            print("Rewriting data: Generating vectors and dumping them")
            self._generate_and_pickle_vectors(processes)
        elif not self._load_pickled_vectors():
            print("Pickled data not found, generating vectors...")
            self._generate_and_pickle_vectors(processes)

        self.named_playlists = self.to_track_names(self.playlists)

        t1 = time.time()
        print(f"Time taken: {round(t1-t0, 2)} s")
        print("playlist_vectors length:", len(self.playlist_vectors))

    def make_item_based_vectors(self):
        """
        Flips the playlist-songs matrix into songs-playlists
        So that each row is a song, and the sparse vector tells which playlist this song is in
        """
        if not self.playlist_vectors:
            # zip() of nothing yields no rows; keep one (empty) row per song instead
            self.song_playlist_matrix: List[List[int]] = [[] for _ in self.unique_songs]
            return
        self.song_playlist_matrix = [list(column) for column in zip(*self.playlist_vectors)]

    def cosine_similarity(self, v1: Vector, v2: Vector) -> float:
        """
        Calculates the cosine between two feature vectors
        Returns 0.0 when either vector is all zeros (the cosine is undefined there)
        """
        squared_norms = np.dot(v1, v1) * np.dot(v2, v2)
        if squared_norms == 0:
            return 0.0
        return float(np.dot(v1, v2) / math.sqrt(squared_norms))

    def compute_similarities(self, pv) -> List[float]:
        """Runs cosine_similarity on each playlist_vector"""
        return [self.cosine_similarity(pv, pv_i) for pv_i in self.playlist_vectors]

    def sort_similar_playlists(
        self, similarities: List[float], user_id: int
    ) -> List[Tuple[int, float]]:
        """
        Create a sorted list of the similar playlists.
        Each element is a tuple of the playlist's id and its cosine similarity
        Playlists with a similarity of 0 or less, and the user's own playlist, are left out.
        """
        # Puts the similarity into a tuple with its playlist id
        id_similarity = [
            (playlist_id, similarity)
            for playlist_id, similarity in enumerate(similarities)
            # user_id is actually the user's playlist's id
            if playlist_id != user_id and similarity > 0
        ]
        return sorted(id_similarity, key=lambda pair: pair[-1], reverse=True)

    def suggest_name(self, similarities: List[Tuple[int, float]], length: int = 4) -> str:
        """
        Slaps together the top playlists' names first word and calls it a name
        Uses at most `length` playlists; fewer are used when fewer similar playlists exist.
        """
        first_words = [
            self.playlist_names[entry[0]].split(" ")[0] for entry in similarities[:length]
        ]
        return " ".join(first_words).strip()

    def user_based_suggestions(
        self,
        similarities,
        max_suggestions: int = 10,
        supplied_songs: Union[List[str], None] = None,
    ) -> Tuple[List[Tuple[str, float]], str]:
        """
        Sorts the given similarities(playlist similarity scores),
        gives each song a similarity score, and returns
        the most similar songs & a suggested name for the playlist.

        similarities: one similarity per playlist, in playlist order
        max_suggestions: maximum number of songs to return
        supplied_songs: track names the user already has; never suggested back.
            The songs of playlist self.user_id are excluded too when that index is valid.
        Reads self.user_id, so it must be set before calling.
        """
        # Sort playlists into (playlist_id, similarity)
        sorted_similarities = self.sort_similar_playlists(similarities, self.user_id)

        suggested_name = self.suggest_name(sorted_similarities)

        # Sum up the song similarities
        song_scores = defaultdict(float)
        for other_user_id, similarity in sorted_similarities:
            for song in self.named_playlists[other_user_id]:
                song_scores[song] += similarity

        # Convert them to a sorted list
        ranked = sorted(song_scores.items(), key=lambda pair: pair[-1], reverse=True)

        # Exclude the user's own songs
        excluded = self._songs_to_exclude(supplied_songs)
        top_suggestions = [
            (suggestion, weight)  # weight = summed up score from the playlists
            for suggestion, weight in ranked
            if suggestion not in excluded
        ][:max_suggestions]
        return top_suggestions, suggested_name

    def user_based_recommendation(
        self, playlist: Union[dict, List[str]], limit: int = 10, user_id: int = -1
    ) -> Tuple[List[Tuple[str, float]], str]:
        """Return a recommendation of tracks based off the given playlist(can be
        mpd-formatted or list of strings)

        user_id: index of this playlist in the loaded playlists, or -1 if it is not one of them
        """
        self.user_id = user_id
        similarities = self.compute_similarities(self.make_playlist_vector(playlist))
        suggestions, name = self.user_based_suggestions(
            similarities=similarities,
            max_suggestions=limit,
            supplied_songs=self._track_names_of(playlist),
        )
        return suggestions, name

    def compute_song_similarities(self, song_id: int) -> List[float]:
        """Runs cosine_similarity on each song-playlists feature vector against the given song"""
        return [
            self.cosine_similarity(self.song_playlist_matrix[song_id], pl_vector_j)
            for pl_vector_j in self.song_playlist_matrix
        ]

    def most_similar_songs_to(self, song_id: int) -> List[Tuple[str, float]]:
        """
        Create a sorted list of the similar songs.
        Each element in the returned list is a tuple of the songs's name and its cosine similarity
        """
        similarities = self.compute_song_similarities(song_id)
        song_similarity_pairs = [
            (self.unique_songs[other_song_id], similarity)
            for other_song_id, similarity in enumerate(similarities)
            if song_id != other_song_id and similarity > 0
        ]
        return sorted(song_similarity_pairs, key=lambda pair: pair[-1], reverse=True)

    def item_based_suggestions(
        self,
        playlist_vector: List[int],
        max_suggestions: int = 10,
        supplied_songs: Union[List[str], None] = None,
    ) -> List[Tuple[str, float]]:
        """
        Goes through each song in the playlist vector and computes the most similar songs to that
        song, then gives each song a similarity score, and returns the most similar songs.
        The more frequently a computed similar song appears while going through each
        song in the playlist, the higher its ranking will be.

        supplied_songs: track names the user already has; never suggested back.
            The songs of playlist self.user_id are excluded too when that index is valid.
        Reads self.user_id, so it must be set before calling.
        """
        song_scores = defaultdict(float)

        for song_id, in_playlist in enumerate(playlist_vector):
            if in_playlist == 1:  # If song is in this playlist
                # Add up the similarity score on each similar song
                for song, similarity in self.most_similar_songs_to(song_id):
                    song_scores[song] += similarity

        ranked = sorted(song_scores.items(), key=lambda pair: pair[-1], reverse=True)

        excluded = self._songs_to_exclude(supplied_songs)  # Don't include existing songs
        return [
            (suggestion, weight) for suggestion, weight in ranked if suggestion not in excluded
        ][:max_suggestions]

    def item_based_recommendation(
        self, playlist: Union[dict, List[str]], limit: int = 10, user_id: int = -1
    ) -> List[Tuple[str, float]]:
        """Return a recommendation of tracks based off the given playlist(can be
        mpd-formatted or list of strings)

        user_id: index of this playlist in the loaded playlists, or -1 if it is not one of them
        """
        t0 = time.time()
        self.user_id = user_id
        playlist_vector = self.make_playlist_vector(playlist)
        suggestions = self.item_based_suggestions(
            playlist_vector=playlist_vector,
            max_suggestions=limit,
            supplied_songs=self._track_names_of(playlist),
        )
        t1 = time.time()
        print(f"Time taken: {round(t1-t0, 2)} s")
        return suggestions


In [2]:
# Init recommender with path to the mpd json/s and if use pickled vector data
recommender = Collaborative_Recommender("mpd/data_samples", rewrite_data=False)
# Build the playlist vectors (or load them from the pickle files); this also fills
# recommender.unique_songs and recommender.named_playlists
recommender.make_user_based_vectors()
# make_user_based_vectors has already extracted the unique songs; calling it again here
# recomputes the same list and prints the "Found ... unique songs" line a second time
recommender.extract_unique_songs()
# Needed for the suggested playlist name in the user based recommendation
recommender.extract_playlist_names()
print("Sample tracks::")
# Print the first 5 tracks of the playlist at index 69
recommender.sample_tracks(69, 5)
# Song x playlist matrix, needed for the item based recommendation
recommender.make_item_based_vectors()

Loading the found jsons: ['mpd.slice.26000-26999.json']
Loaded 1 jsons
Rewriting data: Generating vectors and dumping them
Found 34827 unique songs
Pickling...
Time taken: 31.8 s
playlist_vectors length: 1000
Found 34827 unique songs
Sample tracks::
5 Seconds of Summer - Amnesia
Christina Perri - A Thousand Years
Miley Cyrus - Drive
Ed Sheeran - Thinking Out Loud
Justin Bieber - Down To Earth


## Recommend songs to Pelle

Pelle's playlist happens to be in the database already:

In [3]:
# Index of Pelle's playlist in recommender.playlists / recommender.named_playlists
pelle_id = 69
print("Pelles own playlist:")
# Bare last expression of the cell: the notebook echoes the playlist's '<artist> - <song>' names
recommender.named_playlists[pelle_id]

Pelles own playlist:


['5 Seconds of Summer - Amnesia',
 'Christina Perri - A Thousand Years',
 'Miley Cyrus - Drive',
 'Ed Sheeran - Thinking Out Loud',
 'Justin Bieber - Down To Earth',
 'Adele - Hello',
 'Uncle Jed - Latch',
 'Bon Iver - Skinny Love',
 'Corey Gray - If I Lose Myself',
 'Justin Bieber - Life Is Worth Living',
 'X Ambassadors - Unsteady',
 'Madilyn Bailey - Pompeii',
 'Nicki Minaj - Bed Of Lies',
 'Birdy - All You Never Say',
 'Madilyn Bailey - Maps',
 'Daya - Back to Me',
 'Daya - Hide Away',
 'Daya - Back to Me']

### User based recommendation
Give the recommender a playlist in the same format as the Spotify data:

In [4]:
# The raw playlist dict, in the same shape as the loaded json data
pelles_playlist = recommender.playlists[pelle_id]
# Ask for 12 suggestions; passing pelle_id keeps Pelle's own playlist out of the
# similarity ranking. Returns (list of (song, score), suggested playlist name)
suggestions, sug_name = recommender.user_based_recommendation(pelles_playlist, 12, pelle_id)
print("Suggested playlist name:", sug_name)
print("Suggestions:")
# display is not imported in this file; presumably the notebook (IPython) built-in
display(suggestions)

Suggested playlist name: car you ❤️❤️ Syd
Suggestions:


[('Ed Sheeran - Photograph', 0.8473668279740618),
 ('James Bay - Let It Go', 0.6311617785519165),
 ('Sam Smith - Lay Me Down', 0.6194777047540253),
 ('John Legend - All of Me', 0.6049336982536837),
 ('Passenger - Let Her Go', 0.5787191194277588),
 ("Jason Mraz - I Won't Give Up", 0.5596744710984812),
 ("James Arthur - Say You Won't Let Go", 0.5584253648693458),
 ("Sam Smith - I'm Not The Only One", 0.5409875690857151),
 ('Sam Smith - Stay With Me', 0.5404209506202018),
 ('Lukas Graham - 7 Years', 0.5383299958530888),
 ("Meghan Trainor - Like I'm Gonna Lose You", 0.5024478955016453),
 ('Justin Bieber - Love Yourself', 0.4968716903397613)]

Alternatively, you can supply the recommender with just a list of song names:

In [5]:
# The same playlist as a plain list of '<artist> - <song>' strings
pelles_custom_playlist = recommender.named_playlists[pelle_id]

print("The same playlist but supplied as a list of track names")
# Displays the whole (suggestions, suggested name) tuple returned by the recommender;
# display is presumably the notebook (IPython) built-in
display(recommender.user_based_recommendation(pelles_custom_playlist, 12, pelle_id))

The same playlist but supplied as a list of track names


([('Ed Sheeran - Photograph', 0.8473668279740618),
  ('James Bay - Let It Go', 0.6311617785519165),
  ('Sam Smith - Lay Me Down', 0.6194777047540253),
  ('John Legend - All of Me', 0.6049336982536837),
  ('Passenger - Let Her Go', 0.5787191194277588),
  ("Jason Mraz - I Won't Give Up", 0.5596744710984812),
  ("James Arthur - Say You Won't Let Go", 0.5584253648693458),
  ("Sam Smith - I'm Not The Only One", 0.5409875690857151),
  ('Sam Smith - Stay With Me', 0.5404209506202018),
  ('Lukas Graham - 7 Years', 0.5383299958530888),
  ("Meghan Trainor - Like I'm Gonna Lose You", 0.5024478955016453),
  ('Justin Bieber - Love Yourself', 0.4968716903397613)],
 'car you ❤️❤️ Syd')

### Item based recommender


In [6]:
# Index of Pelle's playlist in the loaded playlists
pelle_id = 69
pelles_playlist = recommender.playlists[pelle_id]
# Item based variant: returns only the list of (song, score), no suggested name.
# Relies on make_item_based_vectors() having been run in an earlier cell
suggestions = recommender.item_based_recommendation(pelles_playlist, 12, pelle_id)
print("Suggestions:")
# display is presumably the notebook (IPython) built-in
display(suggestions)