<a href="https://colab.research.google.com/github/TheCodingCvrlo/TheCodingCvrlo/blob/main/pca.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Carlo Antonio Patti, 3134395
### Bocconi University, May 2023
A notebook written in fulfillment of the requirements for the Degree of Bachelor of Science in Economics, Management and Computer Science


## System Setup

In [None]:
# Mount Google Drive under the relative directory "drive"; the data paths used
# later in this notebook are all rooted at "drive//MyDrive".
# force_remount=True asks Colab to mount again even if a mount already exists.
from google.colab import drive
drive.mount("drive", force_remount = True)

Mounted at drive


In [None]:
!pip install qrpca --quiet #gpu implementation of pca

In [None]:
#@title Imports
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
import torch
import pickle as pk
from copy import deepcopy as cp

#viz
import matplotlib.pyplot as plt
import seaborn as sns


#extra
from qrpca.decomposition import qrpca
from qrpca.decomposition import svdpca

In [None]:
#@title Device selection
# default pytorch device selection snippet (credits https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html)
# Pick the best available backend, in order of preference: CUDA GPU,
# Apple-silicon MPS, then plain CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

print(f"Using {device} device")

Using cuda device


In [None]:
# Folder holding the filtered datasets, and the two CSV files inside it.
PATH_FILTERED = "drive//MyDrive//thesis//data//filtered"

PATH_DF_30, PATH_DF_100 = (f"{PATH_FILTERED}//df_{size}.csv" for size in (30, 100))

In [None]:
# Eagerly load the 100 dataset into memory (the 30 dataset load is disabled).
# NOTE(review): df_100 is not referenced again in the visible part of this
# notebook (split_data re-reads the CSV from its path) - confirm whether this
# load is still needed.
# df_30 = pd.read_csv(PATH_DF_30)
df_100 = pd.read_csv(PATH_DF_100)

## Data Split

In [None]:
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

def split_data(PATH, test_size=0.15, random_state=None):
  """Load a dataset from CSV and produce a stratified train/test split.

  The CSV is read with its first column as the index. The ``artist`` column
  is one-hot encoded into ``lab_<artist>`` columns, every column whose name
  consists only of digits is treated as a feature, and the split is
  stratified on the ``label`` column.

  Parameters
  ----------
  PATH : str
      Path of the CSV file to load.
  test_size : float, default 0.15
      Forwarded to ``train_test_split``.
  random_state : int or None, default None
      Forwarded to ``train_test_split``. Pass an integer to obtain the same
      shuffle on every run; ``None`` keeps the previous, unseeded behaviour.

  Returns
  -------
  train_feats, train_labs, test_feats, test_labs : numpy.ndarray
      Feature matrices and one-hot label matrices of the two splits.
  df_train, df_test : pandas.DataFrame
      The rows of each split, including the original and one-hot columns.
  """
  df = pd.read_csv(PATH, index_col=0)

  oh_enc = OneHotEncoder(sparse_output=False)
  labels_oh = oh_enc.fit_transform(np.array(df['artist']).reshape(-1, 1))

  # Build the label column names once; the f-string also copes with
  # categories that are not plain strings.
  labs_cols = [f'lab_{category}' for category in oh_enc.categories_[0]]
  labels_df = pd.DataFrame(labels_oh, index=df.index, columns=labs_cols)

  df_full = pd.merge(df, labels_df, left_index=True, right_index=True)

  # Feature columns are the ones whose name is made of digits only.
  feats_pattern = r'^\d+$'
  feats_cols = df.filter(regex=feats_pattern).columns

  targets = df_full['label']

  # Split positional indices rather than the frame itself, so that both
  # splits can be taken from df_full with iloc.
  train_idx, valid_idx = train_test_split(
      np.arange(len(targets)),
      test_size=test_size,
      shuffle=True,
      stratify=targets,
      random_state=random_state
      )

  df_train = df_full.iloc[train_idx]
  df_test = df_full.iloc[valid_idx]

  train_feats = df_train[feats_cols].values
  train_labs = df_train[labs_cols].values

  test_feats = df_test[feats_cols].values
  test_labs = df_test[labs_cols].values

  return train_feats, train_labs, test_feats, test_labs, df_train, df_test

In [None]:
# Split the 30 dataset into train/test features, one-hot labels and frames.
# NOTE(review): the output recorded for this cell is an EmptyDataError raised
# while reading PATH_DF_30 - verify the file on Drive before relying on the
# *_30 variables further down.
trf30, trl30, tsf30, tsl30, df_train_30, df_test_30 = split_data(PATH_DF_30)

EmptyDataError: ignored

In [None]:
# Split the 100 dataset into train/test features, one-hot labels and frames.
trf100, trl100, tsf100, tsl100, df_train_100, df_test_100 = split_data(PATH_DF_100)

## Dimensionality Reduction

PCA is fitted on the training set only, and the fitted projection is then applied to the test set. This avoids information leakage from the test set and the resulting overestimation of performance.

In [None]:
# Wrap the NumPy feature matrices as torch tensors.
# The *_30 tensors are required by the qrpca cell below, so they must be
# defined here for the notebook to run on a fresh kernel.
train_feats_30 = torch.from_numpy(trf30)
train_feats_100 = torch.from_numpy(trf100)

test_feats_30 = torch.from_numpy(tsf30)
test_feats_100 = torch.from_numpy(tsf100)

In [None]:
%%time
# PCA (qrpca implementation) for the 30 dataset, run on the selected device.
# NOTE(review): 4799 is passed as n_component_ratio - presumably the number of
# components to keep; confirm against the qrpca documentation.
pca30 = qrpca(n_component_ratio = 4799, device=device)


# Fit on the training features only; the test features are projected with the
# already fitted model.
train_feats_30_pca = pca30.fit_transform(train_feats_30)
test_feats_30_pca = pca30.transform(test_feats_30)

In [None]:
%%time
pca100 = PCA(n_components=256)


train_feats_100_pca = pca100.fit_transform(train_feats_100)
test_feats_100_pca = pca100.transform(test_feats_100)

In [None]:
#@title Plot
import seaborn as sns
import plotly.express as px
from plotly import graph_objects as go
from plotly.subplots import make_subplots


# Cumulative explained-variance ratio of the components fitted by pca30.
cum_explained_variance = np.cumsum(pca30.explained_variance_ratio.cpu())

# The x axis is sized from the curve itself rather than from a hard-coded
# component count, so x and y can never disagree in length.
fig = px.line(x=np.arange(len(cum_explained_variance)), y=cum_explained_variance)

fig.update_layout(
    plot_bgcolor='white',
    showlegend=False,
    title_font_family="libertine",
    font = dict(
        size = 16
    ),
    width = 1000,
    height = 1000,
    yaxis_range=[0.2,1.1]
)

fig.update_xaxes(
    mirror=True,
    showline=False,
    linecolor='black',
    gridcolor='lightgrey',
    showticklabels=True
)
fig.update_yaxes(
    nticks=7,
    mirror=True,
    showline=False,
    linecolor='black',
    gridcolor='lightgrey',
    showticklabels=True
)

# Reference lines: the 0.95 explained-variance level (red) and x = 256.
fig.add_hline(y=0.95,
              line={
                  'color':'red'
              })
fig.add_vline(x=256)

fig.add_annotation(x=500,
                   y=0.3,
                   text='x = 256',
                   showarrow=False)

# Empty annotation placed left of the origin.
# NOTE(review): presumably there to add room on the left of the plot - confirm.
fig.add_annotation(x=-100,
                   y=0,
                   text='',
                   showarrow=False)

fig.update_yaxes(title_text="explained variance")
fig.update_xaxes(title_text="n features")

In [None]:
def export_df(feats, labs, origin, PATH, n_components=256):
  """Write reduced features and their labels to ``PATH`` as two CSV files.

  ``features.csv`` holds the first ``n_components`` columns of ``feats``,
  indexed like ``origin``, plus the ``title``, ``artist`` and ``label``
  columns copied from ``origin``. ``labels.csv`` holds ``labs`` with a
  default integer index.

  Parameters
  ----------
  feats : array-like or torch.Tensor, two-dimensional
      Reduced feature matrix, one row per row of ``origin``.
  labs : array-like
      Label matrix written as-is to ``labels.csv``.
  origin : pandas.DataFrame
      Frame the rows come from; supplies the index and the metadata columns.
  PATH : str
      Output directory; created if it does not exist.
  n_components : int, default 256
      Number of leading feature columns to keep.
  """
  os.makedirs(PATH, exist_ok=True)

  # Tensors (possibly living on the GPU) are moved to host memory first:
  # pandas cannot build a frame from them directly.
  if hasattr(feats, "detach"):
    feats = feats.detach().cpu().numpy()

  cols_transfer = ['title', 'artist', 'label']
  df_pca = pd.DataFrame(feats[:, :n_components], index=origin.index)
  df_pca[cols_transfer] = origin[cols_transfer]

  # NOTE(review): labels are written with a positional index, unlike the
  # features, which keep origin's index - rows match by position only.
  df_labs = pd.DataFrame(labs)

  df_pca.to_csv(os.path.join(PATH, "features.csv"))
  df_labs.to_csv(os.path.join(PATH, "labels.csv"))

In [None]:
# Output folders for the reduced datasets: one per (dataset size, split) pair.
PATH_TRAIN_30, PATH_TEST_30, PATH_TRAIN_100, PATH_TEST_100 = (
    f"drive//MyDrive//thesis//data//reduced//{size}//{split}"
    for size in (30, 100)
    for split in ("train", "test")
)

In [None]:
# Write the reduced 100 dataset (train and test) to Drive.
# The corresponding exports of the 30 dataset are currently disabled.
# export_df(train_feats_30_pca, trl30, df_train_30, PATH=PATH_TRAIN_30)
export_df(train_feats_100_pca, trl100, df_train_100, PATH=PATH_TRAIN_100)
# export_df(test_feats_30_pca, tsl30, df_test_30, PATH=PATH_TEST_30)
export_df(test_feats_100_pca, tsl100, df_test_100, PATH=PATH_TEST_100)

Carlo Antonio Patti, Bocconi University, July 2023

In [None]:
# Work on an independent deep copy of the 30-dataset training split.
df_30 = df_train_30.copy(deep=True)

In [None]:
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline


# Standardise the features, then project them with t-SNE.
scaler = StandardScaler()
# t-SNE is stochastic: pin the seed so the embedding (and every figure built
# on it) is the same on each run.
projector = TSNE(random_state=0)

pipe_tsne = Pipeline([
    ('scaler', scaler),
    ('proj', projector)
])

In [None]:
# Feature columns are the ones whose name consists only of digits; extract
# them from df_30 as a NumPy array.
feats_pattern = r'^\d+$'
feats_30 = df_30.filter(regex=feats_pattern).values

In [None]:
# Distinct values of the artist column of df_30.
artists = df_30.artist.unique()

In [None]:
# 2-D t-SNE embedding of the standardised features, one row per row of feats_30.
df_tsne = pd.DataFrame(pipe_tsne.fit_transform(feats_30), columns = ['x', 'y'])
# Assign by position (.values): df_tsne has a fresh 0..n-1 index while df_30
# keeps the index of the original rows, so assigning the Series itself would
# align on index labels and attach the wrong (or missing) artist to each point.
df_tsne['artist'] = df_30.artist.values

KeyboardInterrupt: ignored

In [None]:
# Attach the artist of each embedded point by position: .values drops df_30's
# index so that pandas does not try to align it with df_tsne's own index.
df_tsne['artist'] = df_30.artist.values

In [None]:
df_tsne

Unnamed: 0,x,y,artist
0,-15.408294,-10.506518,10cc
1,-21.470659,55.716858,10cc
2,-2.213358,10.129037,10cc
3,11.366096,-4.005829,10cc
4,4.565465,29.501245,10cc
...,...,...,...
11246,-11.822532,-68.168556,blink-182
11247,-12.424356,-66.162491,blink-182
11248,-2.297735,-28.925196,blink-182
11249,24.954039,-27.120293,blink-182


In [None]:

# importing the module
import json

PATH_QUERIES = "drive//MyDrive//thesis//data//queries"
PATH_JSON = PATH_QUERIES + '//artist-genre.json'


# Opening JSON file (explicit UTF-8 so the result does not depend on the
# platform's default encoding)
with open(PATH_JSON, encoding="utf-8") as json_file:
  data = json.load(json_file)


# Each value is one comma-separated string of genre tags: split it and strip
# the surrounding whitespace of every tag.
data = {artist: [tag.strip() for tag in tags.split(",")] for artist, tags in data.items()}

In [None]:
# Standard genre vocabulary: one genre per line of the text file.
PATH_GENRES = 'drive//MyDrive//thesis//data//flags//genres.txt'

with open(PATH_GENRES, 'r') as file:
  genres = file.read().splitlines()

print(genres)
# Converted to an array after printing; std_genre below uses it as the
# default value of its `genres` parameter.
genres=np.array(genres)

['rock', 'pop', 'rock-and-roll', 'metal', 'hip hop', 'disco', 'soul', 'k-pop', 'rap', 'r&b', 'country', 'edm', 'house', 'dance', 'funk', 'jazz', 'blues']


In [None]:
###################### PSEUDO CODE #################################
# loop over genres
#   loop over artist_genres
#     if genre is in artist_genres (fuzzy match):
#       genres[genre_idx] += 1
####################################################################
import re

def std_genre(artist_data, genres=genres):
  """Map an artist's free-text genre tags onto one standard genre.

  For every entry of ``genres`` the function counts how many tags of
  ``artist_data`` contain it as a whole word (case-sensitive), and returns
  the entry with the highest count.

  Parameters
  ----------
  artist_data : list of str
      Genre tags of one artist.
  genres : sequence of str
      Standard genres to choose from; defaults to the module-level array as
      it was when this function was defined.

  Returns
  -------
  str or None
      The best-matching standard genre; on ties the one listed first in
      ``genres`` wins. ``None`` when no tag matches any genre (previously the
      first genre was returned in that case, silently mislabelling the artist).
  """
  # Whole-word patterns, compiled once per genre.
  # NOTE(review): '-' is a word boundary, so 'pop' also matches inside
  # 'k-pop' and 'rock' inside 'rock-and-roll'; combined with the tie rule the
  # shorter genre wins such ties - confirm this is intended.
  patterns = [re.compile(r'\b' + re.escape(g) + r'\b') for g in genres]
  mask = np.array(
      [sum(1 for ag in artist_data if pattern.search(ag)) for pattern in patterns],
      dtype=int
  )
  if mask.size == 0 or mask.max() == 0:
    return None
  return genres[np.argmax(mask)]

In [None]:
# Reduce every artist's tag list to a single standard genre ...
data_mono = {artist: std_genre(tags) for artist, tags in data.items()}
artist_col = list(data_mono)
artist_vals = list(data_mono.values())
# ... and tabulate the result as an (artist, genre) frame.
df_genres = pd.DataFrame({'artist': artist_col, 'genre': artist_vals})
df_genres.head()

Unnamed: 0,artist,genre
0,Chicago,rock
1,ABC,rock
2,James Taylor,rock
3,Frankie Valli & The Four Seasons,rock
4,Alan Jackson,country


In [None]:
# Attach a genre to every embedded point. The inner join keeps only the points
# whose artist string has an exact match in df_genres.
df_merged = df_tsne.merge(df_genres, how='inner', on='artist')

In [None]:
import plotly.express as px

# Keep four genres, then collapse every artist to the mean position of its
# points. numeric_only=True restricts the mean to the numeric columns
# explicitly instead of relying on the deprecated default.
genres_shown = ['rap', 'hip hop', 'jazz', 'rock']
df_merged_small = (
    df_merged[df_merged.genre.isin(genres_shown)]
    .groupby('artist')
    .mean(numeric_only=True)
)
# The mean dropped the genre column: join it back for colouring.
df_merged_small = pd.merge(df_merged_small, df_genres, how='inner', left_on='artist', right_on='artist')
fig = px.scatter(df_merged_small, x='x', y='y', color='genre', hover_data=['artist']) #hover_data = []
fig.update_traces(textposition='top left')
fig.for_each_trace(lambda t: t.update(textfont_color=t.marker.color))
fig.show()


The default value of numeric_only in DataFrameGroupBy.mean is deprecated. In a future version, numeric_only will default to False. Either specify numeric_only or select only columns which should be valid for the function.

