In [1]:
import random
import pandas as pd
import numpy as np
import csv
import copy
import re

print("Start of data adjust")
data = pd.read_csv('./Dataset/data.csv')
row_swaps = ['acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'explicit',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'popularity',
 'speechiness',
 'tempo',
 'valence',
 'year',
 'artists',
 'name',
 'release_date',
 'id']

# Here's how you reduce sample the data set.
# sample_data will contain every 100th row. This is just as an example
# Should use more than a 100th of the data
sample_data = data.iloc[::, :]
sample_data.head()
sample_data = sample_data[row_swaps]

# Splitting the data into features and song profiles
song_profiles = sample_data[['id', 'name', 'artists', 'release_date', 'year']].copy()
onlyartists = song_profiles.copy().filter('artists').values
features = (sample_data.copy()
    .drop('name', axis=1)
    .drop('id', axis=1)
    .drop('release_date', axis=1)
    .values.tolist())

def RemoveMultiArtistSongs(features):
    expr = re.compile(r"',")
    output = []
    for row in features:
        artist_str = row[-1]
        if expr.match(artist_str) == None:
            output.append(row)
    return output

def GetOnly90sSongs(features):
    outfeatures = []
    for row in features:
        if row[-2] >= 1990.0 and row[-2] < 2000.0:
            outfeatures.append(row)
    return outfeatures

def GetArtistSubset(features, num):
    artists = set()
    out = []
    while len(artists) < num:
        row = random.sample(features, 1)
        artists.add(row[0][-1])
    
    for row in features:
        if (row[-1] in artists):
            out.append(row)
    return out, artists

def CalcNormalizations(features):
    maxes = [0 for _ in range(len(features[0]))]
    mins = [30000 for _ in range (len(features[0]))]
    for row in features:
        for i in range(len(row) - 1):
            if (abs(row[i]) > maxes[i]):
                maxes[i] = abs(row[i])
            if (row[i] < mins[i]):
                mins[i] = row[i]
    
    ofeatures = copy.deepcopy(features)
    length = len(ofeatures[0])
    
    for i in range(len(maxes)):
        maxes[i] -= mins[i]

    for row in ofeatures:
        for i in range(len(row) - 1):
            if (maxes[i] > 0):
                row[i] = (row[i] - mins[i])/maxes[0]

    return ofeatures

def GetNumClasses(ofeatures):
    classes = set()
    for row in ofeatures:
        classes.add(row[-1])
    return len(classes)

print("Removing songs with multiple artists...")
noMultipleArtists = RemoveMultiArtistSongs(features)
# print("Getting all songs from the 90's...")
# _90ssongs = GetOnly90sSongs(noMultipleArtists)
print("Retrieving artist subset...")
# ArtistSubsetSongs, Artists = GetArtistSubset(_90ssongs, 15)
ArtistSubsetSongs, Artists = GetArtistSubset(noMultipleArtists, 15)
Artists = list(Artists)
# Artists = onlyartists
print("Normalizing the data...")
normalized_data_with_classes = CalcNormalizations(ArtistSubsetSongs)
# num_classes = GetNumClasses(features)
print("Sorting into training and testing sets...")
train_features = random.sample(normalized_data_with_classes, len(normalized_data_with_classes)*4//5)
test_features = []
for row in normalized_data_with_classes:
    if row not in train_features:
        test_features.append(row)

print("Form data for tensorflow")
tf_train_features = []
tf_train_labels = []
tf_test_features = []
tf_test_labels = []

for row in train_features:
    tf_train_features.append(row[:-1])
    tf_train_labels.append(Artists.index(row[-1]))

for row in test_features:
    tf_test_features.append(row[:-1])
    tf_test_labels.append(Artists.index(row[-1]))
tf_dataset = (tf_train_features, tf_train_labels, tf_test_features, tf_test_labels)
print("Complete!")

Start of data adjust
Removing songs with multiple artists...
Retrieving artist subset...
Normalizing the data...
Sorting into training and testing sets...
Form data for tensorflow
Complete!


In [2]:
%%html
<style>
div.output_area pre {
    white-space: pre;
}
</style>

In [3]:
import math
import random
import copy

debug = False
info_dump = False

class Neuron:
    Weights = []
    Output = 1.0
    Delta = 0.5
    
    def __init__(self, layerWidth):
        self.Weights = [random.random() for _ in range(layerWidth + 1)]
    
    def Activate(self, inputs):
        activation_val = self.Weights[-1]
        if (debug):
            print("\tsanity check:\n\t\t", inputs, len(inputs), "\n\t\t", self.Weights, len(self.Weights))
        for i in range(len(inputs)):
            activation_val += inputs[i] * self.Weights[i]
        self.Output = math.tanh(activation_val)
        if (debug):
            print("\toutput sanity check:", self.Output)
        return self.Output
    
    def UpdateWeights(self, regressive_outputs, learn_rate):
        if (debug):
            print("UpdateWeights sanity check\n", regressive_outputs)
            print("\tUpdateWeights Lengths:", len(self.Weights), len(regressive_outputs))
        for i in range(len(regressive_outputs)):
            self.Weights[i] += learn_rate * self.Delta * regressive_outputs[i]
        self.Weights[-1] += learn_rate * self.Delta
    
    def UpdateDelta(self, error):
        self.Delta = error * (1 - np.tanh(self.Output)**2)

class DenseNeuralNetwork:
    def __init__(self, features, hidden_layers, output_length, learning_rate):
        self.RecurrentInputs = 0
        self.OutputLength = 0
        self.NeuronLayers = []
        self.LearningRate = 0.0
        print("Building neural network with", features, "features, and", hidden_layers, "hidden layers.")
        self.LearningRate = learning_rate
        self.OutputLength = output_length
        
        y_intercept = features
        slope = (output_length - y_intercept)/hidden_layers
        num_weights = y_intercept
        
        for i in range(hidden_layers + 1):
            layer_len = math.ceil((i*slope + y_intercept)) if (i < hidden_layers) else output_length
            print("\tBuilding layer", i, "with", layer_len, "neurons.")
            new_layer = [Neuron(num_weights) for _ in range(layer_len)]
            self.NeuronLayers.append(new_layer)
            num_weights = layer_len
        if (debug):
            print("init sanity check:")
            for i in range(len(self.NeuronLayers)):
                print("\tLayer ", i,
                      "Width:", len(self.NeuronLayers[i]),
                      "Weights:", len(self.NeuronLayers[i][0].Weights))
        print("Completed!")
    
    def ForwardPropagation(self, current_input):
        # setup previous layers "output" as the total input
        layer_output = copy.deepcopy(current_input)
        i = 0
        for layer in self.NeuronLayers:
            if (debug):
                print("FP through layer: ", i)
                i += 1
            next_layer_input = []
            for neuron in layer:
                next_layer_input.append(neuron.Activate(layer_output))
            layer_output = next_layer_input
        return layer_output
    
    def BackwardPropagation(self, expected_value):
        prev_layer = []
        for layer in reversed(self.NeuronLayers):
            layer_error = []
            if (debug):
                print("\t\tLengths: ", len(expected_value), ",", len(layer))
            if (layer is self.NeuronLayers[-1]):
                for i in range(len(layer)):
                    layer_error.append(expected_value[i] - layer[i].Output)
            else:
                for i in range(len(layer)):
                    error = 0.0
                    for neuron in prev_layer:
                        error += neuron.Weights[i] * neuron.Delta
                    layer_error.append(error)
            for i in range(len(layer)):
                layer[i].UpdateDelta(layer_error[i])
            prev_layer = layer

    def UpdateAllWeights(self, current_input):
        current_input = copy.deepcopy(current_input)
        layer_input = current_input
        if (debug):
            print("updateallweights sanity:", layer_input)
            i = 0
        for layer in self.NeuronLayers:
            if (debug):
                print("\tUpdate layer: ", i)
                i += 1
            next_layer_input = []
            for neuron in layer:
                neuron.UpdateWeights(layer_input, self.LearningRate)
                next_layer_input.append(neuron.Output)
            layer_input = next_layer_input

    def Train(self, training_data, training_labels, epochs):
        print("DNN starting training with", epochs, "epochs on",
            len(training_data),"pieces of data.")
        prev_inputs = []
        expected_similarity = 0.85
        blank = [-1 for _ in range(self.OutputLength)]

        for epoch in range(epochs):
            print("DNN Epoch:", epoch + 1)
            total_epoch_error = 0.0

            for i in range(len(training_data)):
                row = training_data[i]
                new_error = 0
                expected_value = copy.deepcopy(blank)
                expected_value[training_labels[i]] = 1
                
                if (debug):
                    print("Class", value[-1])
                output = self.ForwardPropagation(row)
                
                for i in range(len(expected_value)):
                    new_error = (expected_value[i] - output[i]) ** 2
                if (info_dump):
                    print("\tOutput: ", output, "\tExpected:", expected_value, "\tError: ", new_error)
                total_epoch_error += new_error
                self.BackwardPropagation(expected_value)
                self.UpdateAllWeights(expected_value)
                
            print("DNN Total Epoch Error:", total_epoch_error, "\n---------------------------------")

    def Classify(self, data_row):
        output = self.ForwardPropagation(data_row)
        max_val = -2
        outclass = 0
        for i in range(len(output)):
            if (output[i] > max_val):
                max_val = output[i]
                outclass = i
        return outclass

In [8]:
import os
import logging
import numpy as np
import pandas as pd
import copy
from sklearn.metrics import confusion_matrix
from sklearn import tree
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
import tensorflow as tf
from tensorflow import keras
from threading import Thread

tf.get_logger().setLevel('ERROR') # hopefully speeds up tensorflow by removing separate thread prints.

class Algorithm:
    TrainingData = []
    TrainingClasses = []
    Classes = []
    Model = None
    Accuracy = 0.0

    def __init__(self, training_data, training_labels, classes):
        pass

    def Train(self):
        pass

    def Test(self, testing_data, testing_classes) -> float:
        return 0.0
    
    def Classify(self, datum) -> int:
        return -1

class DecisionTreeWrapper(Algorithm):
    TrainingData = []
    TrainingClasses = []
    Classes = []
    Model = None
    Accuracy = 0.0
    Loss = 0.0

    def __init__(self, training_data, training_labels, classes):
        self.TrainingData = training_data
        self.TrainingClasses = training_labels
        self.Classes = classes
        self.Model = DecisionTreeClassifier()

    def Train(self):
        self.Model.fit(self.TrainingData, self.TrainingClasses)
    
    def Test(self, testing_data, testing_classes):
        Predictions = self.Model.predict(testing_data)
        self.Accuracy = accuracy_score(Predictions, testing_classes)
        return self.Accuracy
    
    def Classify(self, datum) -> int:
        return self.Model.predict([datum])[0]

class TensorNetworkWrapper(Algorithm):
    TrainingData = []
    TrainingClasses = []
    Classes = []
    Model = None
    Accuracy = 0.0
    Loss = 0.0

    def __init__(self, training_data, training_labels, classes):
        self.TrainingData = training_data
        self.TrainingClasses = training_labels
        self.Classes = classes
        self.Model = keras.Sequential([
            keras.layers.Flatten(input_shape=(15, 1)),
            keras.layers.Dense(units=15, activation=tf.nn.tanh),
            keras.layers.Dense(units=15, activation=tf.nn.tanh),
            keras.layers.Dense(units=15, activation=tf.nn.tanh),
            keras.layers.Dense(units=15, activation=tf.nn.tanh),
            keras.layers.Dense(units=15, activation=tf.nn.softmax)
        ])
        self.Model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

    def Train(self):
        self.Model.fit(self.TrainingData, self.TrainingClasses, epochs=200)
    
    def Test(self, testing_data, testing_classes):
        self.Loss, self.Accuracy = self.Model.evaluate(testing_data, testing_classes)
    
    def Classify(self, datum) -> int:
        class_predictions = self.Model.predict_classes([datum])
        max = -2
        index = -1
        for i in range(len(class_predictions)):
            if class_predictions[i] > max:
                max = class_predictions[i]
                index = i
        return index

class DenseNetworkWrapper(Algorithm):
    TrainingData = []
    TrainingClasses = []
    Classes = []
    Model = None
    Accuracy = 0.0
    Loss = 0.0

    def __init__(self, training_data, training_labels, classes):
        self.TrainingData = training_data
        self.TrainingClasses = training_labels
        self.Classes = classes
        # features, hidden_layers, output_length, learning_rate
        self.Model = DenseNeuralNetwork(len(training_data[0]) - 1, 5, len(classes), 0.25)

    def Train(self):
        self.Model.Train(self.TrainingData, self.TrainingClasses, 200)
    
    def Test(self, testing_data, testing_classes):
        total_pos = 0
        for j in range(len(tf_test_features)):
            row = testing_data[j]
            expected = testing_data[j]

            actual = self.Model.Classify(row[:-1])
            total_pos += 1 if (actual == expected) else 0

        self.Accuracy = total_pos / len(test_features)
        return self.Accuracy
    
    def Classify(self, datum) -> int:
        return self.Model.Classify(datum)

class EnsembleClassifier:
    Algorithms = []
    Weights = []
    Classes = []
    AlgoTrainData = []
    AlgoTrainClasses = []
    AlgoTestData = []
    AlgoTestClasses = []
    Accuracy = 0.0
    Loss = 0.0

    def __init__(self, training_dataset, classes):
        print("Building Ensemble...")
        self.Algorithms = []
        self.Weights = []
        self.AlgoTrainData = []
        self.AlgoTrainClasses = []
        self.AlgoTestData = []
        self.AlgoTestClasses = []
        self.Classes = classes
        self.Accuracy = 0.0
        self.Loss = 0.0
        
        train_set = random.sample(training_dataset, len(training_dataset) * 2 // 3)
        test_set = []

        for row in training_dataset:
            if row not in train_set:
                test_set.append(row)

        for row in train_set:
            self.AlgoTrainData.append(row[:-1])
            self.AlgoTrainClasses.append(self.Classes.index(row[-1]))

        for row in test_set:
            self.AlgoTestData.append(row[:-1])
            self.AlgoTestClasses.append(self.Classes.index(row[-1]))

        self.Algorithms = [
            DecisionTreeWrapper(self.AlgoTrainData, self.AlgoTrainClasses, self.Classes),
            DenseNetworkWrapper(self.AlgoTrainData, self.AlgoTrainClasses, self.Classes),
            TensorNetworkWrapper(self.AlgoTrainData, self.AlgoTrainClasses, self.Classes)
        ]

    def TrainAll(self):
        print("Training all algorithms in Ensemble...")
        threads = []
        for algo in self.Algorithms:
            # algo.Train()
            newthread = Thread(
                target=algo.Train,
                args=()
            )
            newthread.start()
            threads.append(newthread)
        
        for thread in threads:
            thread.join()

    def WeighAll(self):
        print("Setting algorithm weights...")
        threads = []
        for algo in self.Algorithms:
            # algo.Test(self.AlgoTestData, self.AlgoTestClasses)
            # self.Weights.append(algo.Accuracy)
            newthread = Thread(
                target=algo.Test,
                args=(
                    self.AlgoTestData,
                    self.AlgoTestClasses
                )
            )
            newthread.start()
            threads.append(newthread)
        
        for thread in threads:
            thread.join()

        for algo in self.Algorithms:
            self.Weights.append(algo.Accuracy)
        print("Voting Weights:", self.Weights)

    def Classify(self, datum):
        votes = [0 for _ in range(len(self.Classes))]

        for i in range(len(self.Algorithms)):
            algo = self.Algorithms[i]
            votes[algo.Classify(datum)] += self.Weights[i]
        max = 0
        index = -1
        for i in range(len(votes)):
            if (votes[i] > max):
                max = votes[i]
                index = i
        return index

    def Evaluate(self, TestingData, TestingClasses):
        total_pos = 0
        print("Start Testing on", len(TestingData), "test values") 
        i = 0
        for j in range(len(TestingData)):
            print(".", end="")
            i += 1
            if i == 100:
                print("")
                i = 0
            row = TestingData[j]
            expected = TestingClasses[j]

            actual = self.Classify(row)
            total_pos += 1 if (actual == expected) else 0

        print("\nAccuracy: ", 100 * total_pos / len(test_features))

In [9]:
ensemble = EnsembleClassifier(train_features, Artists)

Building Ensemble...
Building neural network with 14 features, and 5 hidden layers.
	Building layer 0 with 14 neurons.
	Building layer 1 with 15 neurons.
	Building layer 2 with 15 neurons.
	Building layer 3 with 15 neurons.
	Building layer 4 with 15 neurons.
	Building layer 5 with 15 neurons.
Completed!


In [6]:
ensemble.TrainAll()

DNN Total Epoch Error: 3.99953345298327 
---------------------------------
DNN Epoch: 142
Epoch 98/200
---------------------------------
DNN Epoch: 143
Epoch 99/200
---------------------------------
DNN Epoch: 144
Epoch 100/200
 3/17 [====>.........................] - ETA: 3s - loss: 2.1059 - accuracy: 0.1736DNN Total Epoch Error: 3.9996167218960212 
---------------------------------
DNN Epoch: 145
DNN Total Epoch Error: 3.9996401103151875 
---------------------------------
DNN Epoch: 146
Epoch 101/200
---------------------------------
DNN Epoch: 147
Epoch 102/200
---------------------------------
DNN Epoch: 148
Epoch 103/200
 2/17 [==>...........................] - ETA: 3s - loss: 1.9439 - accuracy: 0.1562 3.9996987638098793 
---------------------------------
DNN Epoch: 149
---------------------------------
DNN Epoch: 150
Epoch 104/200
---------------------------------
DNN Epoch: 151
Epoch 105/200
---------------------------------
DNN Epoch: 152
DNN Total Epoch Error: 3.99975100435317

In [11]:
ensemble.WeighAll()
ensemble.Evaluate(tf_test_features, tf_test_labels)

Start Testing on 413 test values
.

ValueError: in user code:

    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\training.py:1569 predict_function  *
        return step_function(self, iterator)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\training.py:1559 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:1285 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:2833 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:3608 _call_for_each_replica
        return fn(*args, **kwargs)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\training.py:1552 run_step  **
        outputs = model.predict_step(data)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\training.py:1525 predict_step
        return self(x, training=False)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\base_layer.py:1030 __call__
        outputs = call_fn(inputs, *args, **kwargs)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\sequential.py:380 call
        return super(Sequential, self).call(inputs, training=training, mask=mask)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\functional.py:420 call
        return self._run_internal_graph(
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\functional.py:556 _run_internal_graph
        outputs = node.layer(*args, **kwargs)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\base_layer.py:1013 __call__
        input_spec.assert_input_compatibility(self.input_spec, inputs, self.name)
    C:\Users\taiya\AppData\Local\Programs\Python\Python39\lib\site-packages\tensorflow\python\keras\engine\input_spec.py:251 assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer dense is incompatible with the layer: expected axis -1 of input shape to have value 15 but received input with shape (None, 1)
