# Main analysis file for Spotify Comparison

## Setup

In [1]:
from dotenv import load_dotenv
load_dotenv()

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
from pprint import pprint
from dataclasses import dataclass, field
from typing import *
from pprint import pprint
import jsons

import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Spotify client used by the Song class further down; SpotifyClientCredentials()
# is built without arguments (presumably reading the variables loaded by
# load_dotenv() above -- confirm against the .env file).
spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

# Read the two answer columns, discard rows with a missing answer, remove the
# row labelled 0 (presumably the CSV's own header line, which is read as data
# because `names=` is given) and renumber the remaining rows from 0.
df = (
    pd.read_csv('../resources/bubbleflexe-rv.csv', usecols=[1, 2], names=['bsides', 'tt'])
    .dropna()
    .drop(labels=0, axis=0)
    .reset_index(drop=True)
)

# Each cell is a ';'-separated string of answers: turn it into a list of strings.
for column in df.columns:
    df[column] = df[column].apply(lambda cell: str(cell).split(';'))

# One combined list per response: the b-side answers followed by the tt answers.
df['unified'] = df['bsides'] + df['tt']

df.to_csv('export/responses_frame.csv')

In [3]:
# Utilities
def dictionaryFilter(d: dict, func) -> dict:
    '''
    Return a new dict holding the items of d for which func(key, value) is truthy.

    The input dict is not modified and the surviving items keep their original order.
    '''
    kept = {}
    for key, value in d.items():
        if func(key, value):
            kept[key] = value
    return kept

In [4]:
# Distinct names mentioned in each answer column (every cell is a list of names).
unique_bsides = set().union(*df['bsides'])
unique_tt = set().union(*df['tt'])

# Every name that appears in either column.
unique_songs = unique_bsides | unique_tt

## Data Introspection

In [5]:
# N=5,700 after cleaning non-responses (NaNs).
# No respondent answered only the b-sides question or only the tt question, so
# dropping every row that contains a NaN loses nothing.
# Sanity check: a leftover NaN would have been stringified to the entry 'nan'.
# Print every response list that still contains one, then how many there were.
nan_responses = [response for response in df['unified'] if 'nan' in response]
for response in nan_responses:
    print(response)
count = len(nan_responses)
print(count)

0


In [6]:
# TODO report stats on responses (e.g. dist of responses of b-sides, tt's, etc)

## Questions
1. For a given song, how often does each other song appear alongside it in the responses? -> Done
2. How should the size of each individual response be taken into account? -> **Not yet added**
3. What about normalizing for song popularity? -> Done, using percentages

In [37]:
class Song:
    '''
    A track identified by its name, optionally enriched with Spotify data.

    Equality and hashing use the name only (see __key), so a Song hashes like its
    name string and can be found in dicts / indexes by a name of identical spelling.
    NOTE: comparison against a str is case-insensitive while the hash is not, so
    hash-based lookups by string only succeed when the case matches exactly.
    '''
    # todo make init based on name and artist make a spotify req to populate the object's name and artist
    def __init__(self, name: str, artist: str = 'Red Velvet', id: str = None, spotify_af: dict = None, tags: list = None):
        '''
        name: track title.
        artist: artist name, used in the Spotify search query.
        id: Spotify track id; when falsy it is looked up through the global `spotify` client.
        spotify_af: audio-features dict; when falsy it is fetched through the global `spotify` client.
        tags: stored as given.
        '''
        self.name = name
        self.artist = artist
        self.id = id or self.__get_track_id()
        self.spotify_af = spotify_af or self.__get_audio_analysis()
        self.tags = tags

    def __get_track_id(self):
        '''
        Search Spotify for "<name> artist:<artist>" and return the id of the first hit.

        Raises IndexError (naming the song) when the search returns no tracks.
        '''
        r = spotify.search(q=f'{self.name} artist:{self.artist}', type='track')
        items = r['tracks']['items']
        if not items:
            # Same exception type an empty result list produced before, now with context.
            raise IndexError(f'no Spotify track found for {self.name!r} by {self.artist!r}')
        return items[0]['id']

    def __get_audio_analysis(self):
        '''Return the audio-features dict for self.id with the identifying metadata fields removed.'''
        ft = spotify.audio_features(self.id)[0]
        # Only the feature values are kept; a metadata field that is absent is simply skipped.
        for key in ('type', 'id', 'uri', 'track_href', 'analysis_url'):
            ft.pop(key, None)
        return ft

    def __repr__(self):
        # TODO see if this is safe
        return f'{self.name} - {self.artist} - {str(self.tags)}'

    def __str__(self):
        return f'{self.name} - {self.artist} - [{str(self.tags)}]'

    def __key(self):
        '''Identity of the song for hashing and Song-to-Song equality: the name alone.'''
        return (self.name) # TODO THIS IS ONLY DUE TO WORKING WITH RV CONTENT.
        # TODO find out a better way to make SongCollection like a dictionary but with regex matching for song name; make easier to extract from responses

    def __hash__(self):
        # note that we are heavily breaking convention here:
        # the hash equals the hash of the bare name string.
        return hash(self.__key())

    def __eq__(self, o):
        '''
        Songs are equal when their names match exactly; a str is equal when it
        matches the name ignoring case. Anything else compares unequal.
        '''
        if isinstance(o, Song):
            return self.__key() == o.__key()
        if isinstance(o, str):
            # Looser than matching "name - artist": only RV songs are handled, so the name suffices.
            return o.lower() == self.name.lower()
        return False

@dataclass
class SongCollection:
    '''
    A group of Songs together with the responses and their co-occurrence counts.

    Attributes:
        songs: the Song objects in the collection.
        responses: the responses dataframe (stored as given; no method here reads it).
        count_adj_mat: square dataframe of counts labelled by song on both axes;
            the diagonal holds each song's own count.
        count_list: set in __post_init__ -- the diagonal of count_adj_mat as a
            Series, sorted ascending.
    '''
    # Forward references: the class can be defined before Song is.
    songs: List['Song']
    responses: pd.DataFrame
    count_adj_mat: pd.DataFrame
    count_list: pd.Series = field(init=False)

    def __post_init__(self):
        self.count_list = self.__get_count_list()

    def get(self, match) -> 'Song':
        '''
        Return the Song of this collection matching the query.

        match: either a Song (returned as-is when it is in self.songs) or a song
            name, compared case-insensitively.
        Raises LookupError when nothing, or more than one Song, matches.
        '''
        # TODO add this to the run attr of the class for methods that have a parameter songmatch. Advanced!
        if isinstance(match, Song):
            if match not in self.songs:
                raise LookupError('Song object provided is not in the SongCollection\'s songs')
            return match
        return self.__get_handler(self.__get_by_name, match)

    def __get_handler(self, method, match):
        '''Run the search `method` on `match` and return its single result, raising LookupError otherwise.'''
        r = method(match)
        if len(r) > 1:
            raise LookupError('too many songs matched')
        if not r:
            raise LookupError('no matching Song found')
        return r[0]

    def __get_by_name(self, name):
        '''Return every Song whose name equals `name`, ignoring case.'''
        name = name.lower()
        return [s for s in self.songs if s.name.lower() == name]

    def __get_by_eq(self, obj):
        '''Return every Song that compares equal to obj.'''
        return [s for s in self.songs if s == obj]

    def __get_count_list(self) -> pd.Series:
        '''Return an ascending Series with, per song, the count on the diagonal of the adjacency matrix.'''
        # Label the diagonal with the matrix's own index (not a notebook global) so
        # every value is paired with the row it was read from.
        return pd.Series(np.diag(self.count_adj_mat), index=self.count_adj_mat.index).sort_values()

    def get_song_count_list(self, songmatch) -> pd.Series:
        '''Return, sorted ascending, the count of every song alongside the given Song (det. by songmatch).'''
        # this is equivalent to a row or column of the adjacency matrix
        song = self.get(songmatch)
        return self.count_adj_mat[song].sort_values()

    def get_song_inbound_percent_list(self, songmatch, dig=2) -> pd.Series:
        '''
        Return the inbound percents for a song, sorted ascending and rounded to `dig` decimals.
        i.e. for each Song s in a given Song t's count list, divide s's value in t's count list by the count of s in the responses
        This gives us a way to see the percent of s's listeners that listen to t.
        '''
        song = self.get(songmatch)
        # Series / Series: each entry is divided by the count carrying the same label.
        return (self.count_adj_mat[song]/self.count_list).round(decimals=dig).sort_values()

    def get_song_outbound_percent_list(self, songmatch, dig=2) -> pd.Series:
        '''
        Return the outbound percents for a song, sorted ascending and rounded to `dig` decimals.
        i.e. for each Song s in a given Song t's count list, divide s's value in t's count list by the count of t in the responses
        This gives us a way to see the percent of t's listeners that listen to s.
        '''
        song = self.get(songmatch)
        # Series / scalar: every entry is divided by the song's own count.
        return (self.count_adj_mat[song]/self.count_list[song]).round(decimals=dig).sort_values()
        
    
    



In [47]:
# One Song per distinct name; Song() is given only a name here, so it looks up the
# track id and audio features itself.
songs = list(map(Song, unique_songs))
# Persist the populated Song objects as JSON text.
with open('export/songs', 'w') as out_file:
    out_file.write(jsons.dumps(songs))

In [14]:
# Restore the Song objects from the JSON text written by the export cell.
with open('export/songs', 'r') as in_file:
    serialized_songs = in_file.read()
songs = jsons.loads(serialized_songs, List[Song])

In [110]:
# Square count matrix with one row and one column per Song, every cell starting at 0.
sdf = pd.DataFrame(0, index=songs, columns=songs)


def handleResponse(resp: List):
    '''
    Add one response to the global song dataframe `sdf`.

    Entries are popped off the end of resp one at a time. For each popped entry its
    diagonal cell is incremented, and for every entry still left in the list both
    the (popped, remaining) and (remaining, popped) cells are incremented.
    The list is emptied in the process, so pass a copy to keep the original.
    '''
    while resp:
        current: str = resp.pop()
        sdf.at[current, current] += 1
        for other in resp:
            sdf.at[current, other] += 1
            sdf.at[other, current] += 1


# handleResponse consumes its argument, so give it a copy of each stored response.
for response in df['unified']:
    handleResponse(response.copy())

sdf.to_csv('export/song_adj_mat.csv')

In [15]:
# Reload the saved count matrix and put the Song objects back on both axes.
# NOTE(review): this relies on `songs` being in the same order as when the CSV
# was written -- confirm before reusing an old export.
sdf = (
    pd.read_csv('export/song_adj_mat.csv', index_col=0, header=0)
    .set_axis(songs, axis='index')
    .set_axis(songs, axis='columns')
)

In [30]:
# Select the 'Milkshake' column of the count matrix by plain name; the axis labels
# are Song objects, which hash like their name string and compare equal to it.
sdf['Milkshake']

Milkshake - Red Velvet - [None]                   1606
Cool World - Red Velvet - [None]                   472
'Cause It's You - Red Velvet - [None]              442
Lady's Room - Red Velvet - [None]                  373
Eyes Locked Hands Locked - Red Velvet - [None]     911
                                                  ... 
Look - Red Velvet - [None]                         979
Little Little - Red Velvet - [None]                724
Somethin Kinda Crazy - Red Velvet - [None]         532
Moonlight Melody - Red Velvet - [None]             674
RBB - Red Velvet - [None]                         1056
Name: Milkshake - Red Velvet - [None], Length: 92, dtype: int64

In [31]:
# Bundle the songs, the parsed responses and the count matrix into one collection.
col = SongCollection(songs=songs, responses=df, count_adj_mat=sdf)

In [36]:
# For each song: its count alongside 'Psycho' divided by Psycho's own count.
col.get_song_outbound_percent_list(songmatch='Psycho')

First Time - Red Velvet - [None]           0.11
Rose Scent Breeze - Red Velvet - [None]    0.12
Stupid Cupid - Red Velvet - [None]         0.12
My Dear - Red Velvet - [None]              0.12
Lady's Room - Red Velvet - [None]          0.13
                                           ... 
In & Out - Red Velvet - [None]             0.65
Kingdom Come - Red Velvet - [None]         0.65
Peek-A-Boo - Red Velvet - [None]           0.66
Bad Boy - Red Velvet - [None]              0.79
Psycho - Red Velvet - [None]               1.00
Name: Psycho - Red Velvet - [None], Length: 92, dtype: float64

In [33]:
# de/serialization doesn't work on col

# with open('export/songcol.json', 'w') as f:
#     f.write(jsons.dumps(col))

# with open('export/songcol.json', 'r') as f:
#     col = jsons.loads(f.read(), SongCollection)

## Graphing, Network Analysis

### Questions on dataset
* Which songs have high mutual connectivity?

In [None]:
class CollectionStatistics:
    def __init__(self, col: SongCollection):
        self.col = col
    
    def get_mutual_pairs(self, cutoff=0.5, attr=None, attr_filter=None):
        '''Return all edges that satisfy the '''
