In [1]:
import multiprocessing as mp
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from sklearn.preprocessing import LabelEncoder

In [2]:
# Vectorized
class KNNClassifier:
    """Brute-force k-nearest-neighbours classifier with vectorized distances.

    All query-to-training distances are computed in one NumPy expression and
    the predicted class is the majority label among the ``k`` closest
    training points. Labels must be non-negative integers because the vote
    is counted with ``np.bincount``.
    """

    def __init__(self, k, distance_metric='euclidean'):
        """Store the hyper-parameters.

        Args:
            k: Number of neighbours that take part in the vote (>= 1).
            distance_metric: One of 'euclidean', 'manhattan' or 'cosine'.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        # k == 0 would otherwise surface later as an opaque "argmax of an
        # empty sequence" error inside predict().
        if k < 1:
            raise ValueError('k must be a positive integer')
        self.k = k
        self.distance_metric = distance_metric

    def fit(self, X, y):
        """Memorise the training set.

        Args:
            X: Training samples, one row per sample.
            y: Non-negative integer class label for each row of ``X``.
        """
        # Coerce to arrays so plain lists work: predict() relies on fancy
        # indexing (self.y[indices]), which Python lists do not support.
        self.X = np.asarray(X)
        self.y = np.asarray(y)

    def predict(self, X):
        """Predict a class label for every row of ``X``.

        Args:
            X: Query samples, one row per sample, same feature count as the
                training data.

        Returns:
            1-D float array holding the winning label for each query row.
            When several labels tie, ``np.argmax`` picks the smallest one.
        """
        X = np.asarray(X)
        distances = self._distance(X, self.X)
        # Column indices of the k smallest distances in every row.
        indices = np.argsort(distances, axis=1)[:, :self.k]
        labels = self.y[indices]
        predictions = np.zeros(X.shape[0])
        for i, neighbour_labels in enumerate(labels):
            predictions[i] = np.argmax(np.bincount(neighbour_labels))
        return predictions

    def _distance(self, x1, x2):
        """Return the pairwise distance matrix between rows of x1 and x2.

        Args:
            x1: Query array, one row per sample.
            x2: Reference array, one row per sample.

        Returns:
            Array whose entry [i, j] is the distance from x1[i] to x2[j].

        Raises:
            ValueError: If ``self.distance_metric`` is not recognised.
        """
        # NOTE: for 2-D inputs the euclidean/manhattan branches broadcast to
        # an intermediate of shape (len(x1), len(x2), n_features), so memory
        # grows with the product of both sample counts.
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum((x1[:, np.newaxis] - x2) ** 2, axis=2))
        elif self.distance_metric == 'manhattan':
            return np.sum(np.abs(x1[:, np.newaxis] - x2), axis=2)
        elif self.distance_metric == 'cosine':
            # A zero-norm row makes the denominator 0 and yields NaN.
            x1_norm = np.linalg.norm(x1, axis=1)
            x2_norm = np.linalg.norm(x2, axis=1)
            return 1 - np.dot(x1, x2.T) / np.outer(x1_norm, x2_norm)
        else:
            raise ValueError('Unknown distance metric')
    

In [97]:
# Vectorized + Parallelized
class KNNClassifier_P:
    """k-nearest-neighbours classifier that predicts query rows in parallel.

    Distances from one query row to every training row are computed with a
    vectorized NumPy expression; the individual query rows are dispatched to
    worker processes through ``joblib.Parallel``. Labels must be non-negative
    integers because the vote is counted with ``np.bincount``.
    """

    def __init__(self, k, distance_metric='euclidean', n_jobs=8):
        """Store the hyper-parameters.

        Args:
            k: Number of neighbours that take part in the vote (>= 1).
            distance_metric: One of 'euclidean', 'manhattan' or 'cosine'.
            n_jobs: Value forwarded to ``joblib.Parallel(n_jobs=...)``.
                Defaults to 8, the value that used to be hard-coded.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        # k == 0 would otherwise surface later as an opaque "argmax of an
        # empty sequence" error inside a worker.
        if k < 1:
            raise ValueError('k must be a positive integer')
        self.k = k
        self.distance_metric = distance_metric
        self.n_jobs = n_jobs

    def fit(self, X, y):
        """Memorise the training set.

        Args:
            X: Training samples, one row per sample.
            y: Non-negative integer class label for each row of ``X``.
        """
        # Coerce to arrays so plain lists work: _predict_one() relies on
        # fancy indexing (self.y[indices]).
        self.X = np.asarray(X)
        self.y = np.asarray(y)

    def predict(self, x):
        """Predict a class label for every row of ``x``.

        Args:
            x: Query samples, one row per sample.

        Returns:
            Array built from the per-row results of ``_predict_one``.
        """
        x = np.asarray(x)
        # One delayed task per query row; joblib distributes them.
        y_pred = Parallel(n_jobs=self.n_jobs)(
            delayed(self._predict_one)(row) for row in x
        )
        return np.array(y_pred)

    def _distance(self, x1, x2):
        """Return the distance from one sample to every row of ``x2``.

        Args:
            x1: A single sample as a 1-D array of features.
            x2: Reference array, one row per sample.

        Returns:
            1-D array with one distance per row of ``x2``.

        Raises:
            ValueError: If ``self.distance_metric`` is not recognised.
        """
        # Promote the sample to an explicit (1, n_features) row so that the
        # broadcast against x2 is intentional rather than implicit.
        row = x1[np.newaxis, :]
        if self.distance_metric == 'euclidean':
            return np.sqrt(np.sum((row - x2) ** 2, axis=1))
        elif self.distance_metric == 'manhattan':
            return np.sum(np.abs(row - x2), axis=1)
        elif self.distance_metric == 'cosine':
            dot_product = np.dot(row, x2.T)
            norm_x1 = np.linalg.norm(row, axis=1)
            norm_x2 = np.linalg.norm(x2, axis=1)
            # A zero-norm vector makes the denominator 0 and yields NaN.
            cosine_sim = dot_product / (norm_x1 * norm_x2)
            # Flatten the (1, n) result so every metric returns shape (n,).
            return (1 - cosine_sim).reshape(-1)
        else:
            raise ValueError('Unknown distance metric')

    def _predict_one(self, x):
        """Return the majority label among the k nearest training rows.

        Args:
            x: A single query sample as a 1-D array of features.

        Returns:
            The most frequent label among the k closest training samples;
            ties go to the smallest label because ``np.argmax`` returns the
            first maximum.
        """
        distances = self._distance(x, self.X)
        indices = np.argsort(distances)[:self.k]
        labels = self.y[indices]
        return np.argmax(np.bincount(labels))

In [100]:
# Smoke-test the parallel classifier on a synthetic two-class problem:
# points in the unit square, labelled by which side of x0 + x1 = 1 they fall.
np.random.seed(0)
X = np.random.rand(10000, 2)
print(X.shape)
y = (X[:, 0] + X[:, 1] > 1).astype(int)
print(y.shape)
knn = KNNClassifier_P(3, distance_metric='euclidean')
knn.fit(X, y)
# perf_counter() is a monotonic clock meant for measuring intervals;
# time.time() can jump if the system clock is adjusted.
start_time = time.perf_counter()
y_pred = knn.predict(X)
end_time = time.perf_counter()
# NOTE: predictions are made on the same points the model was fitted on, so
# this is a sanity check rather than a generalisation estimate.
print("Accuracy:", np.mean(y == y_pred))

print("Time:", end_time - start_time)

# h = .01
# x_min, x_max = X[:, 0].min() - 0.1, X[:, 0].max() + 0.1
# y_min, y_max = X[:, 1].min() - 0.1, X[:, 1].max() + 0.1
# xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
#                      np.arange(y_min, y_max, h))
# Z = knn.predict(np.c_[xx.ravel(), yy.ravel()])
# Z = Z.reshape(xx.shape)
# plt.contourf(xx, yy, Z, alpha=0.8)
# plt.scatter(X[:, 0], X[:, 1], c=y)
# plt.show()


(10000, 2)
(10000,)
Accuracy: 0.9974
Time: 1.0891847610473633


In [None]:
# Load the raw Spotify table and build an integer encoding of the genres.
filepath = '../../data/external/spotify.csv'

df = pd.read_csv(filepath)

X = df[['acousticness', 'danceability', 'valence']].values

# Rename the first column to 'id' (returns a new frame instead of mutating
# in place, so re-running the cell is harmless).
df = df.rename(columns={df.columns[0]: 'id'})

y = df['track_genre'].values
print(y)

# Map every genre string to an integer code.
le = LabelEncoder()
y_encoded = le.fit_transform(y)
print(y_encoded)

# Record which genre corresponds to which code and persist the mapping.
encoded_classes_dict = dict(zip(le.classes_, le.transform(le.classes_)))
print(encoded_classes_dict)
pd.DataFrame.from_dict(encoded_classes_dict, orient='index').to_csv('encoded_classes.csv', header=False)




['acoustic' 'acoustic' 'acoustic' ... 'world-music' 'world-music'
 'world-music']
[  0   0   0 ... 113 113 113]
{'acoustic': 0, 'afrobeat': 1, 'alt-rock': 2, 'alternative': 3, 'ambient': 4, 'anime': 5, 'black-metal': 6, 'bluegrass': 7, 'blues': 8, 'brazil': 9, 'breakbeat': 10, 'british': 11, 'cantopop': 12, 'chicago-house': 13, 'children': 14, 'chill': 15, 'classical': 16, 'club': 17, 'comedy': 18, 'country': 19, 'dance': 20, 'dancehall': 21, 'death-metal': 22, 'deep-house': 23, 'detroit-techno': 24, 'disco': 25, 'disney': 26, 'drum-and-bass': 27, 'dub': 28, 'dubstep': 29, 'edm': 30, 'electro': 31, 'electronic': 32, 'emo': 33, 'folk': 34, 'forro': 35, 'french': 36, 'funk': 37, 'garage': 38, 'german': 39, 'gospel': 40, 'goth': 41, 'grindcore': 42, 'groove': 43, 'grunge': 44, 'guitar': 45, 'happy': 46, 'hard-rock': 47, 'hardcore': 48, 'hardstyle': 49, 'heavy-metal': 50, 'hip-hop': 51, 'honky-tonk': 52, 'house': 53, 'idm': 54, 'indian': 55, 'indie': 56, 'indie-pop': 57, 'industrial': 58, 

In [None]:
# Test the classifier on the encoded Spotify dataset.
# NOTE(review): no cell visible in this notebook writes
# './spotify_encoded.csv'; confirm where it is produced. It is presumably the
# Spotify table with 'track_genre' already integer-encoded, which
# KNNClassifier_P needs because it votes with np.bincount.
pathfile = './spotify_encoded.csv'
df = pd.read_csv(pathfile)
X = df[['acousticness', 'danceability', 'valence']].values
y = df['track_genre'].values

knn = KNNClassifier_P(3, distance_metric='euclidean')
knn.fit(X, y)
# NOTE: every row is compared against every other row of the full table and
# the model is scored on the data it was fitted on; the recorded run of this
# cell was interrupted before finishing.
# perf_counter() is a monotonic clock meant for measuring intervals.
start_time = time.perf_counter()
y_pred = knn.predict(X)
end_time = time.perf_counter()
print("Accuracy:", np.mean(y == y_pred))
print("Time:", end_time - start_time)


KeyboardInterrupt: 

In [None]:
# Evaluate the classifier on a held-out validation split of the encoded
# Spotify dataset.
# NOTE(review): no cell visible in this notebook writes
# './spotify_encoded.csv'; confirm where it is produced.
pathfile = './spotify_encoded.csv'
df = pd.read_csv(pathfile)
X = df[['acousticness', 'danceability', 'valence', 'energy']].values
y = df['track_genre'].values

# Shuffle rows reproducibly before splitting.
np.random.seed(0)
indices = np.random.permutation(X.shape[0])
X = X[indices]
y = y[indices]

# 80 / 10 / 10 split into train / test / validation.
n = X.shape[0]
train_end = int(0.8 * n)
test_end = int(0.9 * n)
X_train, y_train = X[:train_end], y[:train_end]
X_test, y_test = X[train_end:test_end], y[train_end:test_end]
X_val, y_val = X[test_end:], y[test_end:]

knn = KNNClassifier_P(3, distance_metric='euclidean')
knn.fit(X_train, y_train)
# perf_counter() is a monotonic clock meant for measuring intervals.
start_time = time.perf_counter()
# Predict on the validation rows so the predictions line up with y_val;
# predicting on X_train and scoring against y_val compared arrays of
# different lengths.
y_pred = knn.predict(X_val)
end_time = time.perf_counter()
print("Accuracy:", np.mean(y_val == y_pred))
print("Time:", end_time - start_time)


KeyboardInterrupt: 

In [None]:
# Evaluate on the pre-split spotify-2 dataset (train / test / validate files).
pathfile_train = '../../data/external/spotify-2/train.csv'
pathfile_test = '../../data/external/spotify-2/test.csv'
pathfile_val = '../../data/external/spotify-2/validate.csv'

df_train = pd.read_csv(pathfile_train)
df_test = pd.read_csv(pathfile_test)
df_val = pd.read_csv(pathfile_val)

# Encode genre strings with the LabelEncoder `le` fitted in an earlier cell,
# so all three splits share one genre -> integer mapping.
df_train['track_genre'] = le.transform(df_train['track_genre'])
df_test['track_genre'] = le.transform(df_test['track_genre'])
df_val['track_genre'] = le.transform(df_val['track_genre'])

# Single source of truth for the feature columns used by every split.
feature_cols = ['acousticness', 'danceability', 'valence', 'energy', 'liveness']

X_train = df_train[feature_cols].values
y_train = df_train['track_genre'].values

X_test = df_test[feature_cols].values
y_test = df_test['track_genre'].values

X_val = df_val[feature_cols].values
y_val = df_val['track_genre'].values

knn = KNNClassifier_P(8)
knn.fit(X_train, y_train)
# perf_counter() is a monotonic clock meant for measuring intervals.
start_time = time.perf_counter()
y_pred = knn.predict(X_test)
end_time = time.perf_counter()
print("Accuracy:", np.mean(y_test == y_pred))
print("Time:", end_time - start_time)


Accuracy: 0.1163157894736842
Time: 15.230531215667725
