In [369]:
import pandas as pd
import numpy as np
import ast
import textwrap
import time

import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import seaborn as sns

import tensorflow_hub as hub
from sklearn.preprocessing import MinMaxScaler, MultiLabelBinarizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
from scipy.spatial.distance import cdist
from sklearn.metrics.pairwise import cosine_similarity
from joblib import Parallel, delayed

In [13]:
feat_df = pd.read_csv('data/feat_df.csv')
feat_df.columns
# song_feats.head()

Index(['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
       'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
       'uri', 'duration_ms', 'time_signature', 'name', 'album_name',
       'album_uri', 'artists'],
      dtype='object')

In [None]:
# Vibes
#  'danceability', 'energy', 'liveness', 'valence'

# Music technicals
#  -> Measure - continous
#      'loudness', 'tempo'
#  -> Measure - categorical
#      'key', 'mode', 'time_signature'
#  -> Instrumentality
#      'speechiness', 'acousticness', 'instrumentalness'


# EDA

In [None]:
fig = make_subplots(rows = 2, cols = 2, subplot_titles = ('danceability', 'energy', 'liveness', 'valence'), vertical_spacing = 0.1)

fig.add_trace(go.Histogram(x = song_feats.danceability, name = 'daceability'), row = 1, col = 1)
fig.add_trace(go.Histogram(x = song_feats.energy, name = 'energy'), row = 1, col = 2)
fig.add_trace(go.Histogram(x = song_feats.liveness, name = 'liveness'), row = 2, col = 1)
fig.add_trace(go.Histogram(x = song_feats.valence, name = 'valence'), row = 2, col = 2)

fig.update_layout(height = 850, width = 1000)

fig.show()

In [None]:
fig = make_subplots(rows = 1, cols = 2, subplot_titles = ('loudness', 'tempo'), vertical_spacing = 0.1)

fig.add_trace(go.Histogram(x = song_feats.loudness, name = 'loudness'), row = 1, col = 1)
fig.add_trace(go.Histogram(x = song_feats.tempo, name = 'tempo'), row = 1, col = 2)

fig.update_layout(height = 500, width = 1200)

fig.show()

In [None]:
fig = make_subplots(rows = 1, cols = 2, subplot_titles = ('key', 'mode'), vertical_spacing = 0.1)

fig.add_trace(go.Histogram(x = song_feats.key, name = 'key'), row = 1, col = 1)
fig.add_trace(go.Histogram(x = song_feats['mode'], name = 'mode'), row = 1, col = 2)

fig.update_layout(height = 500, width = 1200)

fig.show()

In [None]:
fig = make_subplots(rows = 3, cols = 1, subplot_titles = ('speechiness', 'acousticness', 'instrumentalness'), vertical_spacing = 0.1)

fig.add_trace(go.Histogram(x = song_feats.speechiness, name = 'key'), row = 1, col = 1)
fig.add_trace(go.Histogram(x = song_feats.acousticness, name = 'mode'), row = 2, col = 1)
fig.add_trace(go.Histogram(x = song_feats.instrumentalness, name = 'key'), row = 3, col = 1)

fig.update_layout(height = 850, width = 600)

fig.show()

# Feature Engineering

In [None]:
# Vibes
#  'danceability', 'energy', 'liveness', 'valence'

# Music technicals
#  -> Measure - continous
#      'loudness', 'tempo'
#  -> Measure - categorical
#      'key', 'mode', 'time_signature'
#  -> Instrumentality
#      'speechiness', 'acousticness', 'instrumentalness'

## Genre Feature
* Extract the artist_uris present on track and find matching genres from the artist_info data
    * Don't really need to artist name because you just need unique ID to do the search and match
* Possible feature: Primary artist
    * The "primary artist" or main artist in tracks with multiple artists
    * This is to possibly put heavier weights on the primary artist's genres as the featured artist should not affect the general vibe of the song as much

In [15]:
artist_df = pd.read_csv('data/artist_info.csv')
album_tracks = pd.read_csv('data/album_tracks.csv')
album_df = pd.read_csv('data/album_info.csv')

In [17]:
# artist_df['genres'].apply(ast.literal_eval)
artist_df['genres'] = artist_df['genres'].apply(ast.literal_eval)

In [19]:
# Get the artist_uris from

def extract_artist_uris(artist_raw):
    try:
        artist_liteval = ast.literal_eval(artist_raw)
        artist_list = [artist['uri'] for artist in artist_liteval]
        return(artist_list)
    except Exception as e:
        print(f'An error occurred: {e}')

start_time = time.time()

feat_df['artists_clean'] = feat_df['artists'].apply(extract_artist_uris)

print("--- %s seconds ---" % (time.time() - start_time))

--- 9.65678858757019 seconds ---


In [409]:
def extract_artist_name(artist_raw):
    artist_liteval = ast.literal_eval(artist_raw.item())
    artist_list = [artist['name'] for artist in artist_liteval]
    return(artist_list)

In [20]:
# getting the genres from artist into features

def get_genres(uri_list):
    try:
        all_genres = []
        for artist in uri_list:
            genre_series = artist_df['genres'].loc[artist_df['artist_uri'] == artist]
            for genre in genre_series:
                all_genres.extend(genre)
        return(all_genres)
    except Exception as e:
        print(f'An error occurred: {e}')

start_time = time.time()

feat_df['genres'] = feat_df['artists_clean'].apply(lambda x: get_genres(x))

print("--- %s seconds ---" % (time.time() - start_time))

--- 95.94767260551453 seconds ---


## Feature Scaling and Vectorization

In [22]:
# Creating copy of dataframe
feats_1 = feat_df.copy()

In [23]:
min_max_scaler = MinMaxScaler()

# Scaling the song attribute measures
scaled_vectors = min_max_scaler.fit_transform(feat_df[['loudness', 'tempo']])
feats_1[['loudness_scaled', 'tempo_scaled']] = scaled_vectors

# Scaling instrumentality features
instr_vectors = min_max_scaler.fit_transform(feat_df[['speechiness', 'acousticness', 'instrumentalness']])
feats_1[['speechiness_scaled', 'acousticness_scaled', 'instrumentalness_scaled']] = instr_vectors

In [24]:
# Vectorize the categorical data
mlb = MultiLabelBinarizer()

# Song attributes
feats_1 = pd.get_dummies(feats_1, columns = ['key', 'mode', 'time_signature'])
feats_1.drop(columns = ['name', 'album_name', 'album_uri', 'artists', 'artists_clean'], inplace = True)

In [25]:
# Vibes vector
feats_1['vibe_vector'] = feats_1[['danceability', 'energy', 'valence', 'liveness']].values.tolist()

In [26]:
# Creating genre matrix
mlb = MultiLabelBinarizer()
genre_vectors = mlb.fit_transform(feats_1['genres'])
genre_df = pd.DataFrame(genre_vectors, columns = mlb.classes_)
genre_feat = pd.concat([feats_1.uri, genre_df], axis = 1)
genre_feat = genre_feat.set_index('uri')

# V2: Cosine Similarity (sklearn implementation)
* sklearn vs scipy vs creating your own with a function
    * sklearn's is a vectorized function
    * scipy seems to run it through a couple for loops (https://stackoverflow.com/questions/61490351/scipy-cosine-similarity-vs-sklearn-cosine-similarity)
    * creating your own function then .apply() or something may be the only way if you need to do it in cloud computing

## Sklearn

### Genre: Dimensionality Reduction

In [33]:
start_time = time.time()

pca = PCA(n_components=100)  # Reduce to 100 dimensions, for example
genre_feat_PCA = pd.DataFrame(pca.fit_transform(genre_feat), index = genre_feat.index)
    
print("--- %s seconds ---" % (time.time() - start_time))
# 10.69199252128601 seconds for a (178119 rows × 1214 columns) matrix/dataframe

--- 10.000387191772461 seconds ---


In [None]:
# 100,000 -> N/A, needs 75 GB
# 75,000 -> N/A, needs 41.9 GB
# 50,000 -> 45.036359548568726 seconds -> 0.000900727190971 seconds per
# 30,000 -> 16.268847942352295 seconds -> 0.000542294931412 seconds per
# 25,000 -> 4.4220030307769775 seconds -> 0.000176880121231 seconds per

In [45]:
# To be used on PCA reduced data
# Using the genre feature to reduce the overall amount of songs used in cosine similarity
#  When trying to find similarity, makes sense to reccomend like-genre songs like Rock to Rock or K-pop to K-pop

def batch_cosine_genre(df, uri):
    uri_slice = df.loc[df.index == uri]
    df_other = df.loc[~(df.index == uri)]
    
    top_res = []
    
    batch_size = 14999
    num_batches = len(df_other) // batch_size + (1 if len(df_other) % batch_size != 0 else 0)
    
    for i in range(num_batches):
        start_time = time.time()
        start = i * batch_size
        end = start + batch_size
        batch = df_other[start:end]
        batch = pd.concat([batch, uri_slice])
        
        batch_cs = cosine_similarity(batch)
        batch_cs_df = pd.DataFrame(batch_cs, index = batch.index)
        index_pos = batch_cs_df.index.get_loc(uri_slice.index[0])
        batch_sim_scores = batch_cs_df.iloc[:, index_pos]
        batch_top_1000 = batch_sim_scores.nlargest(1000).index
        top_res.append(batch_top_1000)
        print(f"Batch {i} time: --- %s seconds ---" % (time.time() - start_time))
        # time.sleep(0.1)
    
    # Length of top_res is not actually (num_batches * 1000) because there are duplicated values of the uri_slice in each batch
    #  so (num_batches * 1000) - (num_batches)
    top_res_list = [item for sublist in top_res for item in sublist]
    # top_res_select = df_other[df_other.index.isin(top_res_list)]
    return(top_res_list)
          

In [435]:
# Goes back to the feature dataframe and performs cosine
def get_rec(df, tracks_df, uri, uri_list, top_n):
    df_select = df[df.uri.isin(uri_list)]
    df_select = df_select.set_index('uri')
    df_select = df_select.drop(columns = ['genres', 'vibe_vector']) # ad hoc

    if len(uri_list) < 15000:
        cs_res = cosine_similarity(df_select)
        cs_res = pd.DataFrame(cs_res, index = df_select.index)
        recs_raw =  cs_res[cs_res.index.get_loc(uri)].nlargest(top_n).index.to_list()
        
        target_slice = tracks_df[tracks_df.uri == uri]
        recs_df = tracks_df[tracks_df.uri.isin(recs_raw)]
        
        print(textwrap.dedent(f"""----- Original Song -----
        Song: {target_slice.name.item()}
        Album: {target_slice.album_name.item()} 
        Artist(s): {extract_artist_name(target_slice.artists)}
        ---------------------------------------------\n"""))
        
        for i in range(top_n):
            print(textwrap.dedent(f"""----- Recommendation {i + 1} -----
            Song: {recs_df.iloc[i]['name']}
            Album: {recs_df.iloc[i]['album_name']}
            Artist(s): {extract_artist_name(recs_df.iloc[i].artists)}
            ---------------------------------------------\n"""))
    return(recs_df)


In [437]:
t2 = get_rec(feats_1, album_tracks, test_uri, test_rec_set, 10)
t2

        
# slice = album_tracks[album_tracks.uri == test_uri]
# test = extract_artist_name(slice.artists)
# test

----- Original Song -----
        Song: Dos Celulares
        Album: Pisteando Con La Regida (Vol. 3) 
        Artist(s): ['Fuerza Regida']
        ---------------------------------------------



AttributeError: 'str' object has no attribute 'item'

In [221]:
# test_recs = get_rec(feats_1, test_uri, test_rec_set, 10)
# test_recs

t1 = feats_1[feats_1.uri.isin(test_rec_set)]
t1 = t1.set_index('uri')
t1 = t1.drop(columns = ['genres', 'vibe_vector'])
t1_cs = cosine_similarity(t1)
t1_cs_df = pd.DataFrame(t1_cs, index = t1.index)
t_recs = t1_cs_df[t1_cs_df.index.get_loc(test_uri)].nlargest(50).index.to_list()
t_recs += test_uri
fin_recs = album_tracks[album_tracks.uri.isin(t_recs)]

In [73]:
func_test_set = genre_feat_PCA[:100000]

test_uri = 'spotify:track:75ZkrYWgQtQjRdpsubetpG'

start_time = time.time()

test_rec_set = batch_cosine_genre(func_test_set, test_uri)

test_recs = get_rec(feats_1, test_uri, test_rec_set, 10)

print("--- %s seconds ---" % (time.time() - start_time))

Batch 0 time: --- 1.1830744743347168 seconds ---
Batch 1 time: --- 1.3191990852355957 seconds ---
Batch 2 time: --- 1.337620735168457 seconds ---
Batch 3 time: --- 1.3658716678619385 seconds ---
Batch 4 time: --- 1.302027702331543 seconds ---
Batch 5 time: --- 1.3494820594787598 seconds ---
Batch 6 time: --- 0.633575439453125 seconds ---


ValueError: could not convert string to float: 'spotify:track:54FL6e96e4hOh5JT47kC4A'

# (Deprecated) V1: AWS, Spark, and DIMSUM

In [None]:
# AWS Credentials and Settings
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_ACCESS_KEY'

os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
encoded_secret_key = secret_key.replace("/", "%2F").replace("+", "%2B")

aws_region = 'us-east-1'

s3 = boto3.client(
    service_name='s3',
    region_name=aws_region,
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key
)

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, Row, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, explode, col, collect_list, regexp_replace, split, expr, length, concat_ws, count, size, first, broadcast, monotonically_increasing_id
from pyspark.sql.types import ArrayType, StringType, IntegerType, FloatType, DoubleType
from pyspark import sql
import pyspark.pandas as ps

# Set up Spark
spark = SparkSession.builder \
    .appName("PicklesPlus") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint","s3." + aws_region + ".amazonaws.com") \
    .config("spark.executor.memory", "15g") \
    .config("spark.executor.cores", "2") \
    .config("spark.default.parallelism", "4") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

In [None]:
# Importing ML libraries
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.ml.stat import Correlation, ChiSquareTest, Summarizer

from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF, IDF, VectorAssembler, StandardScaler, OneHotEncoder, Normalizer, StopWordsRemover, CountVectorizer

from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

from pyspark.ml.clustering import KMeans

from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
main = spark.read.parquet('s3a://kagglespotify6k/raw/main_dataset.parquet/')

all_tracks = spark.read.parquet('s3a://kagglespotify6k/trusted/all_tracks/')


# main contains some duplicate columns
columns_to_drop = ['key', 'loudness', 'mode', 'tempo', 'time_signature']
main_merge = main.drop(*columns_to_drop)

# Join the 2 dataframes
merged_df = all_tracks.join(main_merge, all_tracks['track_uri'] == main_merge['track_uri'])
merged_df = merged_df.drop(main_merge['track_uri'])
merged_df = merged_df.withColumn('playlist_uris', expr("substring(split(playlist_uris, ':')[2], 1, length(split(playlist_uris, ':')[2]) - 2)"))
merged_df = merged_df.withColumn('playlist_uris_list', split('playlist_uris', ',\s'))

merged_df.write.mode('overwrite').parquet('s3a://kagglespotify6k/trusted/merged_main_tracks_all/')

# 28.79 seconds

In [None]:
feat_df2 = spark.read.parquet('s3a://kagglespotify6k/trusted/feature_df_all270k/')

# Aggregating the categorical features to avoid having to one hot encode with a for loop
feat_df2 = feat_df2.withColumn('categorical', F.array(col('key'),col('mode'), col('time_signature')))
feat_test = feat_df2.limit(900)
## First, vectorize the textual categorical features (genre and playlists) into genre_vec

# artists_genres & playlist_uris_list already tokenized -> [genre_a, genre_b, genre_c], skip straight to hashing
hashingTF_genres = HashingTF(numFeatures = 50, inputCol = 'artists_genres', outputCol = 'genre_hash')
idf_genres = IDF(inputCol = 'genre_hash', outputCol = 'genre_idf')

hashingTF_playlists = HashingTF(numFeatures = 50, inputCol = 'playlist_uris_list', outputCol = 'playlist_hash')
idf_playlists = IDF(inputCol = 'playlist_hash', outputCol = 'playlist_idf')

## Vectorizing the music technique categorical features (key, mode, time_signature)
# Referenced: https://stackoverflow.com/questions/35804755/apply-onehotencoder-for-several-categorical-columns-in-sparkmlib
#   StringIndexer only takes in 1 column at a time
hashingTF_categ = HashingTF(numFeatures = 50, inputCol = 'categorical', outputCol = 'categ_hash')
idf_categ = IDF(inputCol = 'categ_hash', outputCol = 'categ_idf')

## Vectorizing the confidence measure features (they are on a scale of 0 - 1.0)
conf_VecAssembler = VectorAssembler(inputCols = ['acousticness', 'instrumentalness', 'speechiness', 'valence', 'danceability', 'energy', 'liveness'], outputCol = 'conf_features')
# conf_scaler = StandardScaler(inputCol = 'conf_features', outputCol = 'scaled_conf_features')

## Final feature vector
final_VecAssembler = VectorAssembler(inputCols=['genre_idf', 'playlist_idf', 'categ_idf', 'conf_features'], outputCol='features')

## Pipeline
pipeline = Pipeline(stages = [hashingTF_genres, idf_genres, hashingTF_playlists, idf_playlists , hashingTF_categ, idf_categ, conf_VecAssembler, final_VecAssembler])

model = pipeline.fit(feat_test)
model_df = model.transform(feat_test)

# model_df.write().overwrite().save('s3a://kagglespotify6k/models/mile_6_model/mile_6_model_cos_sim/')

##########################################

# Run 1(30k): 26.94 seconds
# Run 2(100k): 23.79 seconds

In [None]:
# Referenced: https://stackoverflow.com/questions/34121258/cosine-similarity-via-dimsum-in-spark
# https://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/mllib/linalg/distributed/RowMatrix.html
# https://stackoverflow.com/questions/57530010/spark-scala-cosine-similarity-matrix

# model_df = spark.read.parquet('s3a://kagglespotify6k/models/mile_6_model/mile_6_model_cos_sim/')
fin_feat = model_df.select('name','track_uri','artists_names', 'artists_genres', 'playlist_uris_list','features')
fin_feat.cache()

# Unique numeric index because number easier to manipulate than string identifier
simfeat_test = fin_feat.withColumn("index", monotonically_increasing_id())

# Convert the vectors
def row_to_vec(row):
    row_vec = Vectors.sparse(row['features'].size, row['features'].indices, row['features'].values)
    return IndexedRow(row['index'], row_vec)

# RDD for spark
indexed_rows = simfeat_test.rdd.map(row_to_vec)

# Create an IndexedRowMatrix
matrix = IndexedRowMatrix(indexed_rows)

# Compute similarity with DIMSUM
matrix_cos = matrix.toRowMatrix().columnSimilarities()

# Convert to DataFrame
sim_df1 = matrix_cos.entries.map(lambda e: (e.i, e.j, e.value)).toDF(['index1', 'index2', 'similarity'])

sim_df2 = sim_df1.join(simfeat_test.select('index', 'track_uri', 'name', 'artists_names'), sim_df1.index2 == simfeat_test.index, 'left')\
    .withColumnRenamed('track_uri', 'rec_track_uri').withColumnRenamed('name', 'rec_song').withColumnRenamed('artists_names', 'rec_artists').drop('index')\
    .join(simfeat_test.select('index', 'track_uri', 'name', 'artists_names'),sim_df1.index1 == simfeat_test.index, 'left')\
    .withColumnRenamed('track_uri', 'orig_track_uri').withColumnRenamed('name', 'orig_song').withColumnRenamed('aritsts_names', 'orig_artists').drop('index')

In [None]:
def song_rec_m2(track_uri, num_rec,sim_df):
    fin_rec = sim_df.filter((sim_df['orig_track_uri'] == track_uri) & (sim_df['rec_track_uri'] != track_uri)).orderBy(col("similarity").desc()).limit(num_rec)
    songs = fin_rec.collect()
    print(f'Original Song: {songs[0]["orig_song"]} by {songs[0]["artists_names"]}')
    for num, row in enumerate(songs, start = 1):
        print(f"{num}) {row['rec_song']} by {row['artists_names']}, Score: {row['similarity']}")


song_rec_m2('008wXvCVu8W8vCbq5VQDlC', 10, sim_df2)