In [1]:
import json
import os
import re
import numpy as np
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from datetime import datetime
from functools import reduce
from __future__ import print_function
from dotenv import load_dotenv

### GAMEPLAN  
Write functions to get metrics at artist and song level to aggregate familiarity indicies by song and artist  
Combine these values through some ratio to get a total song familiarity index  
Write code to parse through streaming data by user, aggregating song and artist objects inside of function

In [184]:
load_dotenv()
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
CLIENT_ID = os.getenv("CLIENT_ID")

# Class Definition

In [270]:
class Artist:
    def __init__(self, name, ms_played):
        self.name = name
        self.last_played = None
        self.artist_song_play_count = 0
        self.ms_played = ms_played
        self.total_listening = self.ms_played / 60000
        self.familiarity_index = self.total_listening / 3.5 + self.artist_song_play_count
        
    def update_stats(self, ms_played, date):
        self.ms_played += ms_played
        self.artist_song_play_count += 1
        self.mins_played = self.ms_played / 60000
        self.last_played = date
        self.familiarity_index = self.total_listening / 3.5 + self.artist_song_play_count
        
    def __str__(self):
        return f"{self.name} :| Familiarity Index|: {self.familiarity_index:.2f}"

    def __repr__(self):
        return str(self)


In [271]:
class Song:
    def __init__(self, name, artist, album, ms_played, date, mrd=None):
        self.name = name
        self.artist = artist
        self.album = album
        self.ms_played = ms_played
        self.mins_played = self.ms_played / 60000
        self.play_count = 1
        self.most_recent_play = date
        self.mrd = mrd
        
        self.popularity_index = 0
        
        self.familiarity_index = self.mins_played / 3.5 + self.play_count
        
    def update_stats(self, ms_played, date):
        self.ms_played += ms_played
        self.play_count += 1
        self.mins_played = self.ms_played / 60000
        self.most_recent_play = date
        self.get_familiarity_index()
        
    def get_familiarity_index(self):
        # find better val that 3.5
        if self.mrd is None:
            days_to_last_listen_mult = 1
        else:
            days_to_last_listen = self.mrd - self.most_recent_play
            days_to_last_listen_mult = self.scale_value(days_to_last_listen)
        self.familiarity_index = (self.mins_played / 3.5 + self.play_count) * days_to_last_listen_mult
    
    def scale_value(self, value):
        decimal_days = value.total_seconds() / 86400
        if decimal_days < 1:
            decimal_days = 1
        elif decimal_days > 30:
            decimal_days = 30
        
        scaled_value = 1.5 - (decimal_days - 1) * (1.5 - 0.75) / (30 - 1)
        return scaled_value
    
    # TODO
    def get_song_popularity(self):
        # get total streams, current spotify streaming data
        self.popularity_index = 0

    def __str__(self):
        return f"{self.name} by, {self.artist} :| Familiarity Index|: {self.familiarity_index:.2f}"

    def __repr__(self):
        return str(self)

In [272]:
class User:
    def __init__(self, name, total_top_songs=1000, recency_cutoff=3):
        self.name = name
        self.songs = {}
        self.artists = {}
        self.top_songs = [] # list of song titles with highest familiarity_index
        self.top_artists = []
        self.get_songs_artists(name, recency_cutoff=recency_cutoff)
        self.get_top_songs(n=total_top_songs)
        
    def get_songs_artists(self, user_name, recency_cutoff=3):
        prefix = "Streaming_History_Audio_"
        folder_path = f"data/{user_name}/"
        total_audio_files = self.count_files_with_prefix(folder_path, prefix)
        latest_day = None  # Initialize latest_day to track the most recent date
        for filename in os.listdir(folder_path):
            match = re.search(r"_(\d+)\.json", filename)
            if match:
                file_number = int(match.group(1))
            else:
                print("No file number found.")
            if filename.startswith(prefix) and file_number < total_audio_files:

                filepath = os.path.join(folder_path, filename)
                with open(filepath, "r", encoding="utf-8") as f:
                    data = json.load(f)
                    for song in data:
                        if song["master_metadata_track_name"] is None:
                            continue
                        
                        # See if this listen falls inside the recent listening cutoff period
                        song_date = datetime.strptime(song["ts"], "%Y-%m-%dT%H:%M:%SZ")
                        cutoff_date = datetime.today().replace(year=datetime.today().year - recency_cutoff)
                        if song_date <= cutoff_date:
                            continue
                        # for getting the most recent listen date
                        if latest_day is None or song_date > latest_day:
                            latest_day = song_date 
                        
                        song_obj = Song(song["master_metadata_track_name"], song["master_metadata_album_artist_name"], song["master_metadata_album_album_name"], song["ms_played"], song_date)
                                
                        if song_obj.name in self.songs:
                            self.songs[song_obj.name].update_stats(song_obj.ms_played, song_date)
                        else:
                            self.songs[song_obj.name] = song_obj
                        if song_obj.artist in self.artists:
                            self.artists[song_obj.artist].update_stats(song["ms_played"], song_date)
                        else:
                            self.artists[song_obj.artist] = Artist(song["master_metadata_album_artist_name"], song["ms_played"])
            else:
                print(f"File {filename} does not match the expected format")
        
        # For assigning the most recent listen
        for song in self.songs.values():
            song.mrd = latest_day
            song.get_song_popularity()
            song.get_familiarity_index()
        return
    
    def get_top_songs(self, n):
        self.top_songs = sorted(self.songs.values(), key=lambda x: x.familiarity_index, reverse=True)[:n]
        return self.top_songs
    
    def get_top_artists(self, familiarity_cutoff=75):
        all_artists = sorted(self.artists.values(), key=lambda x: x.familiarity_index, reverse=True)
        self.top_artists = [artist for artist in all_artists if artist.familiarity_index > familiarity_cutoff]
        return self.top_artists
    
    def count_files_with_prefix(self, folder_path, prefix):
        return sum(1 for file in os.listdir(folder_path) if file.startswith(prefix))
        

In [273]:
def get_artist_intersection(users, familiarity_cutoff=95):
    top_user_artists = [user.get_top_artists(familiarity_cutoff) for user in users]
    artists_sets = []
    for user_artists in top_user_artists:
        user_artists_set = set()
        for artist in user_artists:
            str_artist = str(artist)
            artist_info = str_artist.split(" :| ")[0]
            user_artists_set.add(artist_info)
        artists_sets.append(user_artists_set)
    common_artists = list(reduce(set.intersection, artists_sets))
    return common_artists

In [337]:
# given list of users, get their top songs and return list of intersected songs above familiarity cutoff
def get_song_intersection(users, song_familiarity_cutoff, artist_familiarity_cutoff, difficult=False):
    all_user_songs = [user.songs.values() for user in users]
    auto_add_artist_intersection = get_artist_intersection(users, 135)
    top_artist_intersection = get_artist_intersection(users, artist_familiarity_cutoff)
    song_sets = []
    song_to_f_i = {}
    popular_songs = set()
    shared_artist_songs = set()
    shared_artist_auto_add = set()
    for user_songs in all_user_songs:
        valid_songs = set()
        for song in user_songs:
            str_song = str(song)
            song_info = str_song.split(" :| ")[0]
            familiarity_index = float(str_song.split("|: ")[1])
            if song_info not in song_to_f_i.keys():
                song_to_f_i[song_info] = [familiarity_index]
            else:
                song_to_f_i[song_info].append(familiarity_index)
            if familiarity_index > song_familiarity_cutoff:
                valid_songs.add(song_info)
                if song.artist in auto_add_artist_intersection:
                    shared_artist_auto_add.add(song_info)
                elif song.artist in top_artist_intersection:
                    shared_artist_songs.add(song_info)
                if song.popularity_index >= 100: # TODO
                    popular_songs.add(song_info)
        song_sets.append(valid_songs)
    common_songs = reduce(set.intersection, song_sets)
    if difficult:
        for song in shared_artist_auto_add:
            if song in common_songs:
                continue
            if len(song_to_f_i[song]) >= 2 and min(song_to_f_i[song]) > song_familiarity_cutoff / 4:
                common_songs.add(song)
        for song in shared_artist_songs:
            if song in common_songs:
                continue
            if len(song_to_f_i[song]) == len(users) and min(song_to_f_i[song]) > song_familiarity_cutoff / 2:
                common_songs.add(song)
        # common_songs = common_songs.union(shared_artist_songs) # adds all shared artist songs
        
    common_songs = list(common_songs)
    # combine the list of popular songs that appeared in any list and shared artist songs to the common_songs list
    return common_songs

In [315]:
def write_to_playlist(song_list, playlist_name):
    sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=CLIENT_ID,
                                                client_secret=CLIENT_SECRET,
                                                redirect_uri='http://localhost:8888/callback',
                                                scope="playlist-modify-public playlist-modify-private"))

    user_id = sp.current_user()['id']

    playlist_name = 'Guess The Song'
    playlist_description = 'A list of songs we all have in common so we can guess the song together.'
    playlist_public = True  # Set to False if you want the playlist to be private

    playlist = sp.user_playlist_create(user_id, playlist_name, public=playlist_public, description=playlist_description)

    playlist_id = playlist['id']

    # Function to get the Spotify URI for a song based on title and artist
    def get_track_uri(title, artist):
        query = f"track:{title} artist:{artist}"
        result = sp.search(query, limit=1, type="track")
        
        # If the search result contains tracks, return the URI of the first result
        if result['tracks']['items']:
            return result['tracks']['items'][0]['uri']
        else:
            return None  # Return None if no track is found

    # List to store track URIs
    track_uris = []

    # Get the URIs for the songs in the list
    for song in song_list:
        title, artist = song.split(" by, ")
        track_uri = get_track_uri(title, artist)
        if track_uri:
            track_uris.append(track_uri)
        else:
            print(f"Could not find track: {title} by, {artist}")

    # Function to add tracks to the playlist in batches of 100
    def add_tracks_in_batches(playlist_id, track_uris):
        # Split the track URIs into chunks of 100 or fewer
        for i in range(0, len(track_uris), 100):
            batch = track_uris[i:i+100]
            sp.playlist_add_items(playlist_id, batch)
            print(f"Successfully added {len(batch)} songs to the playlist.")

    # Add the tracks in batches
    if track_uris:
        add_tracks_in_batches(playlist_id, track_uris)
    else:
        print("No songs were added to the playlist.")

# Experimentation

In [310]:
danny = User("Danny", recency_cutoff=1)
print(len(danny.songs))
andy = User("Andy", recency_cutoff=1)
print(len(andy.songs))
ant = User("Ant", recency_cutoff=1)
print(len(ant.songs))

No file number found.
File Streaming_History_Video_2016-2025.json does not match the expected format
No file number found.
File ReadMeFirst_ExtendedStreamingHistory.pdf does not match the expected format
12118
No file number found.
File ReadMeFirst_ExtendedStreamingHistory.pdf does not match the expected format
No file number found.
File Streaming_History_Video_2017-2024.json does not match the expected format
5646
No file number found.
File Streaming_History_Video_2019-2024.json does not match the expected format
No file number found.
File ReadMeFirst_ExtendedStreamingHistory.pdf does not match the expected format
7198


In [296]:
users = [danny, andy, ant]

In [None]:
print(*users[1].top_songs,sep='\n')

In [None]:
print(*users[2].get_top_artists(150),sep='\n')

In [338]:
song_intersection = get_song_intersection([danny, andy, ant], 10, 95, True)

In [339]:
print(len(song_intersection))
print(*song_intersection,sep='\n')

163
Nice to meet you (feat. Central Cee) by, PinkPantheress
Get Em High by, Kanye West
For James by, Pale Jay
2 Hold U by, Clairo
One Dance by, Drake
Ay No Puedo by, The Marías
Ice Melt by, Crumb
CYANIDE by, Daniel Caesar
Bags by, Clairo
Nights by, Frank Ocean
drive ME crazy! by, Lil Yachty
Almost Fantasy by, Fog Lake
Thank You by, Clairo
Master of None by, Beach House
Thinkin Bout You by, Frank Ocean
Witches by, Alice Phoebe Lou
Rusty (feat. Domo Genesis & Earl Sweatshirt) by, Tyler, The Creator
If You Don't Want My Love by, Jalen Ngonda
CHERIMOYA - BONUS by, Jordan Ward
Better in the Dark by, Jordana
Sierra Leone by, Frank Ocean
Love Songs by, Clairo
Droppin' Seeds (feat. Lil' Wayne) by, Tyler, The Creator
Second Nature by, Clairo
Sticky (feat. GloRilla, Sexyy Red & Lil Wayne) by, Tyler, The Creator
Home Team by, Indigo De Souza
Flaming Hot Cheetos by, Clairo
Devil In A New Dress by, Kanye West
Cariño by, The Marías
John Redcorn by, SiR
Blur by, The Marías
Charmed by, Σtella
No One N

In [299]:
artist_intersection = get_artist_intersection([danny, andy, ant], 135)

In [300]:
print(len(artist_intersection))
print(*artist_intersection,sep='\n')

7
Kanye West
PinkPantheress
Tyler, The Creator
Clairo
Frank Ocean
The Marías
Drake


In [None]:
write_to_playlist(song_intersection, "Guess The Song")

In [235]:
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
CLIENT_ID = os.getenv("CLIENT_ID")
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=CLIENT_ID,
                                               client_secret=CLIENT_SECRET,
                                               redirect_uri='http://localhost:8888/callback',
                                               scope="playlist-modify-public playlist-modify-private"))

user_id = sp.current_user()['id']

playlist_name = 'Guess The Song'
playlist_description = 'A list of songs we all have in common so we can guess the song together.'
playlist_public = True  # Set to False if you want the playlist to be private

playlist = sp.user_playlist_create(user_id, playlist_name, public=playlist_public, description=playlist_description)

playlist_id = playlist['id']

# Function to get the Spotify URI for a song based on title and artist
def get_track_uri(title, artist):
    query = f"track:{title} artist:{artist}"
    result = sp.search(query, limit=1, type="track")
    
    # If the search result contains tracks, return the URI of the first result
    if result['tracks']['items']:
        return result['tracks']['items'][0]['uri']
    else:
        return None  # Return None if no track is found

# List to store track URIs
track_uris = []

# Get the URIs for the songs in the list
for song in song_intersection:
    title, artist = song.split(" by ")
    track_uri = get_track_uri(title, artist)
    if track_uri:
        track_uris.append(track_uri)
    else:
        print(f"Could not find track: {title} by {artist}")

# Function to add tracks to the playlist in batches of 100
def add_tracks_in_batches(playlist_id, track_uris):
    # Split the track URIs into chunks of 100 or fewer
    for i in range(0, len(track_uris), 100):
        batch = track_uris[i:i+100]
        sp.playlist_add_items(playlist_id, batch)
        print(f"Successfully added {len(batch)} songs to the playlist.")

# Add the tracks in batches
if track_uris:
    add_tracks_in_batches(playlist_id, track_uris)
else:
    print("No songs were added to the playlist.")

Could not find track: Do You Think I'm Pretty by Kingfisher
Successfully added 100 songs to the playlist.
Successfully added 100 songs to the playlist.
Successfully added 83 songs to the playlist.


## Future Planning

### Improvements to Song Algorithm

1. Add songs that appear in one person's list to the combined list if the song has a certain popularity index value
3. Add a feature that is based on album?  
4. Play the game with friends and see how it is received and take suggestions

### Integration into Personal Website

1. Create page and navigation to page to play the game  
2. Integrate Spotify API so I can play the songs from the site on shuffle from a play button  
3. Make an option to pick a playlist from a spotify login or to upload n files and then run the algorithm to create the playlist
4. Integrate buzzer api into the site so you can buzz in from your own phone
5. Pause the song when the buzzer is clicked by a user and then on the site allow an input a text option for the title of the song
6. Check to see if the inputted song is correct and keep user scores  
7. If correct add point to user who input score correctly, if wrong, give buzzed user a 5 second no buzzing allowed and continue playing the song

#### Nice to Have  
1. Some level of leeway in whether or not you inputted the song title correctly  
2. Database to persist scores  
3. Ask for more once this is working