In [None]:
import numpy as np
import pandas as pd
import collections
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.cluster import OPTICS, KMeans, DBSCAN
from sklearn.metrics.cluster import adjusted_rand_score
from sklearn.preprocessing import MinMaxScaler, StandardScaler

In [None]:
# Mount Google Drive inside the Colab runtime; the dataset is later read from
# a path under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Create heatmap for genre overlaps

In [None]:
# Build the genre vocabulary and an empty genre-by-genre overlap matrix.
# NOTE: `df` (the track table with a "genre" column) must already be loaded
# when this cell runs.
# Sorting gives every genre a stable index across kernel restarts; iterating a
# bare set() of strings can change order from one run to the next, which would
# reshuffle the heatmap axes. key=str keeps the sort well-defined even if a
# non-string value (e.g. NaN) is present.
genres = sorted(set(df["genre"]), key=str)
# genre name -> row/column position in genresHeat
genreDict = {genre: position for position, genre in enumerate(genres)}
genresHeat = np.zeros((len(genres), len(genres)))

In [None]:
# Count, for each pair of genres, how often a row of sDF repeats the track name
# of the current anchor row, then normalise the counts into genresHeat.
# Presumably sDF is the track table ordered by track_name so that duplicates
# are adjacent — TODO confirm; it is not defined in the visible cells.
# NOTE(review): only the first row of each run is used as the anchor, so a
# track listed under three genres A, B, C yields (A, B) and (A, C) but never
# (B, C) — confirm this is intended.
genresPercent = collections.defaultdict(int)
last, lastG = None, None

for trackName, trackGenre in zip(sDF["track_name"], sDF["genre"]):
    if trackName != last:
        # New run of rows: remember its name and genre as the anchor.
        last, lastG = trackName, trackGenre
        continue
    # Same name as the anchor: record the pair in both directions.
    anchorIdx, currentIdx = genreDict[lastG], genreDict[trackGenre]
    genresPercent[(anchorIdx, currentIdx)] += 1
    genresPercent[(currentIdx, anchorIdx)] += 1

# Each cell is divided by the number of df rows belonging to the row genre,
# so the resulting matrix is not symmetric in general.
for (rowIdx, colIdx), pairCount in genresPercent.items():
    genresHeat[rowIdx][colIdx] = pairCount / len(df[df["genre"] == genres[rowIdx]])

In [None]:
# Mask the upper triangle (diagonal included) so only the lower triangle of
# the genre-overlap matrix is drawn.
mask = np.triu(np.ones_like(genresHeat))

with sns.axes_style("white"):
    f, ax = plt.subplots(figsize=(7, 5))
    sns.heatmap(genresHeat, mask=mask, xticklabels=genres, yticklabels=genres, ax=ax)

In [None]:
# Collect every genre pair whose overlap ratio reaches 0.15, listing each
# unordered pair only once.
groups, seen = [], []
for rowIdx, overlaps in enumerate(genresHeat):
    for colIdx, overlap in enumerate(overlaps):
        pair = (genres[rowIdx], genres[colIdx])
        if (not overlap >= 0.15) or pair in seen:
            continue
        groups.append(list(pair))
        # Register both orientations so the mirrored cell is skipped later.
        seen.extend([pair, pair[::-1]])
print(groups)

In [None]:
# Load the track features and merge the two spellings of the children's-music
# genre (typographic vs. ASCII apostrophe) into the ASCII one.
df = pd.read_csv("/content/drive/MyDrive/5112_final/SpotifyFeatures.csv")
# Vectorised assignment: same result as rewriting matching rows one at a time
# with iterrows()/df.at, without a Python-level loop over the whole frame.
df.loc[df["genre"] == 'Children’s Music', "genre"] = "Children's Music"

In [None]:
# Hand-picked genre families used as coarse class labels. The names must match
# the dataset's genre strings exactly because labelDict is keyed on them; the
# first entry is spelled "Children's Music" to match the value the loading
# cell normalises to.
# NOTE(review): "A capella" may likewise differ in capitalisation from the
# dataset value — confirm against df["genre"].unique().
groups = [
    ["Children's Music", "Alternative", "Rock", "Folk", "Indie"],
    ["R&B", "Dance", "Soul", "Pop", "Rap", "Hip-Hop"],
    ["A capella", "Jazz"],
]

In [None]:
# Map every genre to an integer class label: genres listed in the same group
# share that group's index; every remaining genre gets its own fresh label.
c = len(groups)
print(groups)
labelDict = {genre: groupIdx for groupIdx, members in enumerate(groups) for genre in members}
print(labelDict)
# unique() yields genres in order of first appearance, so the labels come out
# the same as scanning the frame row by row, without iterating every row.
for genre in df["genre"].unique():
    if genre not in labelDict:
        labelDict[genre] = c
        c += 1

In [None]:
# Create the sub-DataFrame to work with.
# One-hot encode the categorical columns and drop the two unused features.
df2 = pd.get_dummies(df, columns=['key', 'mode'])
df2 = df2.drop(['duration_ms', 'time_signature'], axis=1)

# Three-genre working set: all Hip-Hop rows, at most the first 10000
# Children's Music rows, and all Opera rows, stacked in that order.
# reset_index() keeps the old index as a leading "index" column; other cells
# select features by position (columns[5:15]), so changing this call would
# shift that window.
genreDF = pd.concat([
    df2[df2["genre"] == "Hip-Hop"],
    df2[df2["genre"] == "Children's Music"][:10000],
    df2[df2["genre"] == "Opera"],
]).reset_index()

# Integer class label per row, looked up from the genre column alone. This
# avoids a row-wise apply over the whole frame (and, for `labels`, over rows
# that were discarded right after); an unknown genre still raises KeyError.
labels = df2["genre"].iloc[:10000].map(lambda genre: labelDict[genre])
gLabels = genreDF["genre"].map(lambda genre: labelDict[genre])

In [None]:
# Feature columns are taken by position. Defining `cols` here lets this cell
# run on a fresh kernel; previously it was only assigned in a cell further
# down, so a top-to-bottom run failed with NameError.
cols = genreDF.columns[5:15]
# Min-max scale each selected feature column.
scaler = MinMaxScaler()
data = scaler.fit_transform(genreDF[cols].to_numpy())

In [None]:
# Containers and settings for the clustering experiments.
# opticsRandScores: one adjusted Rand score per min_samples value; only the
#   disabled loop below appends to it.
# minSamples: min_samples values for the OPTICS/DBSCAN sweep.
# cols: feature columns selected by position (5 through 14).
# heatmap: 10x10 grid meant to hold one score per feature pair; it stays
#   all-zero while the search below is disabled.
opticsRandScores = []
minSamples = [50, 100, 200, 400, 800, 1000, 2000]
cols = genreDF.columns[5:15]
heatmap = np.zeros((10, 10))

# Cache of scores keyed by (x, y) column-index pairs; used only by the
# disabled search below.
seen = {}
# Everything between the triple quotes is a string literal, so none of it
# executes. As the last expression of the cell it is also what the cell
# evaluates to.
# NOTE(review): inside the disabled feature-pair search, KMeans is fitted on
# the full `data` array rather than the two-column slice `d3`, so every pair
# would receive the same score — confirm before re-enabling.
# NOTE(review): in the disabled min_samples sweep, the DBSCAN result is
# immediately overwritten by the OPTICS result, so only OPTICS is scored.
"""
#Find optimal clusterings
for x in range(5, 15):
    for y in range(5, 15):
        if x == y:
            continue
        if (x, y) in seen or (y, x) in seen:
            heatmap[x - 5][y - 5] = seen[(x, y)]
            continue
        #scaler = StandardScaler()
        d3 = data[:, [x - 5, y - 5]]
        #kmeans = DBSCAN(eps=0.1, min_samples=600).fit_predict(d3)
        #kmeans = OPTICS(eps=0.1, min_samples=600).fit_predict(d3)
        kmeans = KMeans(n_clusters = 3, init='k-means++', n_init=26).fit_predict(data)
        s = adjusted_rand_score(gLabels, kmeans)
        heatmap[x - 5][y - 5] = s
        seen[(x, y)] = s
        seen[(y, x)] = s
        print(s)
#Work with OPTICS/DBSCAN
for x in minSamples:
    d3 = data[:, [0, 2]]
    kmeans = DBSCAN(min_samples=x).fit_predict(d3)
    kmeans = OPTICS(min_samples=x).fit_predict(d3)
    s = adjusted_rand_score(gLabels, kmeans)
    opticsRandScores.append(s)
    print(s)
"""

In [None]:
# Hard-coded scores, one per entry of minSamples — presumably recorded from an
# earlier OPTICS sweep (TODO confirm); nothing in the active cells computes them.
recordedScores = [0.21503370910119335, 0.03757034165012847, 0.007834085584663865, 0, 0, 0, 0]
plt.plot(minSamples, recordedScores)
plt.xlabel("Min Samples")
plt.ylabel("Rand Score")
plt.title("OPTICS")
# plt.plot() with no arguments draws nothing and only echoes an empty list as
# the cell output; plt.show() is the call that renders the figure.
plt.show()

In [None]:
# Lower-triangle heatmap of the per-feature-pair score grid; the upper
# triangle and the diagonal are masked out.
featureNames = genreDF.columns[5:15]
mask = np.triu(np.ones_like(heatmap))

with sns.axes_style("white"):
    f, ax = plt.subplots(figsize=(7, 5))
    sns.heatmap(heatmap, mask=mask, xticklabels=featureNames, yticklabels=featureNames, ax=ax)

In [None]:
# Scatter the three working genres against two features to see how well they
# separate before clustering.
x = "popularity"
y = "valence"
print(labels)
# scatterColors and helper are not read in this cell; they are kept because
# other cells may rely on them.
scatterColors = ['black', 'green', 'red', 'purple',
                     'orange', 'yellow', 'aqua', 'forestgreen', 'slategrey', 'magenta', 'pink']
helper = {"Hip-Hop": 0, "Children's music": 1, "Opera": 2}

f = genreDF[genreDF["genre"] == "Hip-Hop"]
p = genreDF[genreDF["genre"] == "Children's Music"]
j = genreDF[genreDF["genre"] == "Opera"]
# One scatter layer per genre: (rows, colour, legend label).
for subset, colour, legendName in (
    (f, "red", "Hip-Hop"),
    (p, "blue", "Children's music"),
    (j, "green", "Opera"),
):
    plt.scatter(subset[x].to_numpy(), subset[y].to_numpy(), c=colour, label=legendName, alpha=0.3)

plt.legend()
plt.xlabel(x)
plt.ylabel(y)
plt.title("Genre Distributions")
# plt.plot() with no arguments draws nothing and only echoes an empty list;
# plt.show() renders the figure.
plt.show()

In [None]:
def plotRes(data, clusterRes, clusterNum, noiseLabel=-1, title="Kmeans",
            xlabel="popularity", ylabel="valence"):
    """Scatter-plot 2-D points coloured by their cluster label.

    Parameters
    ----------
    data : array-like, shape (n_points, >= 2)
        Points to draw; column 0 is used as x and column 1 as y.
    clusterRes : sequence of int
        Cluster label of each row of ``data``.
    clusterNum : int
        Exclusive upper bound of the labels to draw: every label from
        ``min(clusterRes)`` up to ``clusterNum - 1`` gets one scatter layer.
    noiseLabel : int, default -1
        Label drawn in blue as noise. scikit-learn's DBSCAN and OPTICS mark
        noise with -1; the previous hard-coded check against 0 coloured the
        first real cluster as noise instead.
    title, xlabel, ylabel : str
        Figure title and axis labels; the defaults are the strings that used
        to be hard-coded.

    Returns
    -------
    None
        Draws on the current matplotlib axes.
    """
    labels = np.asarray(clusterRes)
    points = np.asarray(data)
    scatterColors = ['black', 'green', 'brown', 'red', 'purple',
                     'orange', 'yellow', 'aqua', 'forestgreen', 'slategrey', 'magenta', 'pink']
    # With no labels there is nothing to iterate (min() of an empty sequence
    # would raise), so start at the upper bound and skip the loop.
    firstLabel = int(labels.min()) if labels.size else clusterNum
    for i in range(firstLabel, clusterNum):
        if i == noiseLabel:
            # Plot all noise points as blue
            color = 'blue'
        else:
            # Python's modulo is non-negative here, so any label maps to a colour.
            color = scatterColors[i % len(scatterColors)]
        # A boolean mask picks this label's rows in one step instead of a
        # Python loop over every point for every label.
        members = points[labels == i]
        plt.scatter(members[:, 0], members[:, 1], c=color, alpha=0.3, marker='.')
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

In [None]:
# Cluster on the first and last selected feature columns (the plotting helper
# labels these axes popularity and valence). Note that `kmeans` holds the
# predicted label array, not the estimator.
# random_state pins the k-means++ initialisation so the labels and the score
# below are reproducible across runs.
kmeans = KMeans(n_clusters = 3, init='k-means++', n_init=26, random_state=0).fit_predict(data[:, [0, -1]])
#kmeans = OPTICS(eps=0.3, min_samples=500).fit_predict(data[:, [0, 2]])
# Agreement between the cluster assignments and the genre-derived labels.
s = adjusted_rand_score(gLabels, kmeans)
print(s)

In [None]:
# Plot the two feature columns that were clustered, covering every label up to
# the largest one found.
clusteredFeatures = data[:, [0, -1]]
plotRes(clusteredFeatures, kmeans, max(kmeans) + 1)

In [None]:
#Our own implementation of dbscan
class DBScan():
    """Small DBSCAN (density-based clustering) over an in-memory point set.

    A point is a *core* point when at least ``minpts`` OTHER points lie within
    ``eps`` of it (the point itself is not counted). Clusters grow outward
    from core points; non-core points reachable from a core point are *border*
    points; everything else is *noise*.

    Parameters
    ----------
    data : array-like, shape (n_points, n_features)
        Points to cluster; must support ``len()`` and integer indexing.
    eps : float
        Neighbourhood radius (Euclidean distance, inclusive).
    minpts : int
        Minimum number of neighbours, excluding the point itself, for a point
        to count as core.
    """

    def __init__(self, data, eps, minpts):
        self.epsilon = eps
        self.minpts = minpts
        self.data = data

    def pointsWithin(self, index, point):
        """Return the indices of all points within ``epsilon`` of ``point``.

        Parameters
        ----------
        index : int
            Position of ``point`` in ``self.data``; that position is left out
            of the result. A value outside the valid range excludes nothing.
        point : array-like
            Coordinates to measure distances from.

        Returns
        -------
        list of int
            Neighbour indices in ascending order.
        """
        points = np.asarray(self.data, dtype=float)
        count = len(points)
        if count == 0:
            return []
        # One vectorised distance computation instead of a Python loop.
        offsets = points.reshape(count, -1) - np.asarray(point, dtype=float).reshape(1, -1)
        close = np.linalg.norm(offsets, axis=1) <= self.epsilon
        if isinstance(index, (int, np.integer)) and 0 <= index < count:
            close[int(index)] = False
        return np.flatnonzero(close).tolist()

    def dbscan(self):
        """Cluster ``self.data`` and return the clusters found.

        Returns
        -------
        dict of int -> list of int
            Maps each cluster id (0, 1, ... in discovery order) to the sorted
            indices of its member points. Noise points are not listed.

        Side effects
        ------------
        Sets ``self.labels`` (cluster id per point, -1 for noise) and
        ``self.pointLabels`` ("core", "border" or "noise" per point).
        """
        total = len(self.data)
        pointLabels = ["noise"] * total
        labels = [-1] * total
        visited = [False] * total
        pointClusters = {}

        for i in range(total):
            if visited[i]:
                continue
            visited[i] = True
            neighbours = self.pointsWithin(i, self.data[i])
            if len(neighbours) < self.minpts:
                # Not dense enough to seed a cluster. It stays noise unless a
                # core point found later claims it as a border point.
                continue

            # i is a core point: start a new cluster and grow it.
            clusterId = len(pointClusters)
            pointLabels[i] = "core"
            labels[i] = clusterId
            members = [i]
            frontier = collections.deque(neighbours)
            while frontier:
                j = frontier.popleft()
                if labels[j] == -1:
                    # Unclaimed so far (new, or earlier judged noise): joins
                    # this cluster, as a border point until proven core.
                    labels[j] = clusterId
                    members.append(j)
                    pointLabels[j] = "border"
                if visited[j]:
                    continue
                visited[j] = True
                reach = self.pointsWithin(j, self.data[j])
                if len(reach) >= self.minpts:
                    # j is itself core, so its neighbourhood is reachable too.
                    pointLabels[j] = "core"
                    frontier.extend(reach)
            pointClusters[clusterId] = sorted(members)

        self.pointLabels = pointLabels
        self.labels = labels
        return pointClusters