# Parkinson's Disease detection with spectral centroid and spectral rolloff audio features using Decision Tree coded without libraries.

# 1. Import the dependencies.

In [None]:
import numpy as np
import random
import librosa
from librosa import feature
import os
import pickle
import joblib
from typing import Tuple

# 2. Load the audios with Librosa, and preprocess them using Peak Amplitude Normalization.
- load_audio will return all the audios with their sampling rate.
- to_positive will turn all values into positive numbers (audio is 1D).
- get_max will return the maximum value in each audio array.
- peak_amplitude_normalize will preprocess audios with this formula 10 ** (peak / 20) / maximum_value.

In [None]:
def load_audio(file_path) -> Tuple:
    """Load one audio file through Librosa, requesting a 44100 Hz sampling rate.

    Args:
        file_path: path of the audio file to read.

    Returns:
        The ``(audio, sampling_rate)`` pair produced by ``librosa.load``.
    """
    return librosa.load(file_path, sr=44100)

def to_positive(n_array):
    """Return the element-wise absolute value of a 1-D sequence.

    The input is left untouched: a new array is returned instead of
    overwriting negative entries in place, so callers can keep using the
    original signed signal after taking its magnitude.

    Args:
        n_array: 1-D sequence (list or NumPy array) of numbers.

    Returns:
        A new NumPy array holding ``abs(value)`` for every input value.
    """
    return np.abs(np.asarray(n_array))

def get_max(n_array):
    """Return the largest element of a non-empty sequence.

    The first element is the starting candidate, so an empty sequence fails
    on the initial index access.
    """
    largest = n_array[0]
    for candidate in n_array[1:]:
        if candidate > largest:
            largest = candidate
    return largest

def peak_amplitude_normalize(audio_data, peak=-3.0):
    """Scale a signal so that its largest absolute sample sits at ``peak`` dB.

    The scaling factor is ``10 ** (peak / 20) / max(|audio|)`` and the whole
    signal is multiplied by it. The magnitude is computed on a temporary, so
    the caller's samples are neither modified nor stripped of their sign.

    Args:
        audio_data: 1-D sequence of audio samples.
        peak: target peak level in dB (default -3 dB).

    Returns:
        A new NumPy array with the scaled samples. Empty or all-zero input is
        returned unchanged (as a copy), because no finite scaling factor
        exists for it.
    """
    audio = np.asarray(audio_data)
    if audio.size == 0:
        return audio.copy()

    maximum_value = float(np.max(np.abs(audio)))
    if maximum_value == 0.0:
        # Pure silence: dividing by the peak would produce inf/NaN samples.
        return audio.copy()

    scaling = 10 ** (peak / 20) / maximum_value
    return audio * scaling

# 3. Extract spectral centroid and spectral rolloff audio features.
- The spectral_centroid_rolloff function extracts both the spectral centroid and the spectral rolloff features for each audio file using the Librosa library.

In [None]:
def spectral_centroid_rolloff(directory='/content/drive/MyDrive/HYP TRAIN DATA/') -> Tuple:
    """Build the feature matrix and label vector from a folder of ``.wav`` files.

    For every ``.wav`` file the audio is loaded, peak-normalized, and reduced
    to two numbers: the mean spectral centroid and the mean spectral rolloff.
    The label comes from the file name: a name containing "P" is labelled 1
    and a name containing "C" is labelled 0 (presumably patient / control --
    confirm against the dataset naming scheme).

    Args:
        directory: folder to scan; defaults to the original dataset location.

    Returns:
        ``(features, status)``: an array with one ``[centroid, rolloff]`` row
        per file and an array with the matching 0/1 labels.

    Raises:
        ValueError: if a ``.wav`` name contains both "P" and "C" or neither,
            since its label would be ambiguous or missing and the labels
            would no longer line up with the feature rows.
    """
    features = []
    status = []
    # Sorted so that row order (and any seeded split downstream) does not
    # depend on the file system's listing order.
    for file_name in sorted(os.listdir(directory)):
        if not file_name.endswith('.wav'):
            continue

        is_patient = "P" in file_name
        is_control = "C" in file_name
        if is_patient == is_control:
            raise ValueError(
                "Cannot derive a single label from file name '" + file_name
                + "': expected exactly one of 'P' or 'C' in the name"
            )

        file_path = os.path.join(directory, file_name)
        audio, sampling_rate = load_audio(file_path)
        preprocessed_audio = peak_amplitude_normalize(audio)
        spectral_centroid = librosa.feature.spectral_centroid(y=preprocessed_audio, sr=sampling_rate).mean()
        spectral_roll_off = librosa.feature.spectral_rolloff(y=preprocessed_audio, sr=sampling_rate).mean()

        # Label and feature row are appended together so they stay aligned.
        features.append(np.array([spectral_centroid, spectral_roll_off]))
        status.append(1 if is_patient else 0)

    return np.array(features), np.array(status)

# 4. Class Node with data members feature, threshold, and node value and Decision Tree class with data members min_samples_split, max_depth, and n_features.
- Every node has a feature to divide with according to the threshold, has a node value, and finally a node has both left and right children (nodes)
- Decision Tree builds a tree recursively, starting at the root, using the class Node.

In [None]:
# A node has feature to divide with, with which threshold, a node has left and right children, and a node has a value
# if it is the leaf node
class Node:
    """One node of the decision tree.

    An internal node carries the feature index and threshold it splits on plus
    its two children; a leaf carries only a node value. Children always start
    as ``None`` and are attached through the setters.
    """

    def __init__(self, feature=None, threshold=None, node_value=None):
        self._left = None
        self._right = None
        self._feature = feature
        self._threshold = threshold
        self._node_value = node_value

    # --- feature used for the split ---
    def get_feature(self):
        return self._feature

    def set_feature(self, feature):
        self._feature = feature

    # --- threshold used for the split ---
    def get_threshold(self):
        return self._threshold

    def set_threshold(self, threshold):
        self._threshold = threshold

    # --- value stored in the node when it is a leaf ---
    def get_node_value(self):
        return self._node_value

    def set_node_value(self, node_value):
        self._node_value = node_value

    # --- left child (set when the node is not a leaf) ---
    def get_left_node(self):
        return self._left

    def set_left_node(self, left):
        self._left = left

    # --- right child (set when the node is not a leaf) ---
    def get_right_node(self):
        return self._right

    def set_right_node(self, right):
        self._right = right


class DecisionTree:
    """Binary decision tree classifier that splits on entropy-based information gain.

    Every internal node tests ``sample[feature] <= threshold`` (left branch
    when true, right branch otherwise); every leaf stores the majority label
    of the training samples that reached it.
    """

    def __init__(self, min_samples_split=2, max_depth=10, n_features=None):
        """Store the hyper-parameters; the tree itself is built by ``fit``.

        Args:
            min_samples_split: a node holding fewer samples than this becomes a leaf.
            max_depth: nodes at this depth (the root is depth 0) become leaves.
            n_features: how many features are drawn at random as split
                candidates at each node; ``None`` means every feature of the
                data passed to ``fit``.
        """
        self._min_samples_split = min_samples_split
        self._max_depth = max_depth
        self._n_features = n_features
        self._root = None

    def fit(self, x, y):
        """Build the tree from a 2-D feature matrix ``x`` and label vector ``y``."""
        x = np.asarray(x)
        y = np.asarray(y)
        if self._n_features is None:
            self._n_features = x.shape[1]
        else:
            # Builtin min: np.min(a, b) would interpret b as an axis argument.
            self._n_features = min(x.shape[1], self._n_features)
        self._root = self._construct_tree(x, y)

    def unique_labels(self, y):
        """Return the distinct values of ``y`` as a sorted NumPy array."""
        seen = set()
        distinct = []
        for label in y:
            if label not in seen:
                seen.add(label)
                distinct.append(label)
        distinct = np.array(distinct)
        distinct.sort()
        return distinct

    def random_feature_indexes(self, a, size):
        """Draw ``size`` distinct random indexes from ``range(a)``.

        Raises:
            ValueError: if more indexes are requested than are available.
        """
        if size > a:
            raise ValueError(str(size) + " features can't be chosen out of " + str(a))

        # Rejection sampling; the draw order is kept so a seeded ``random``
        # module produces the same feature order as before.
        chosen = [random.randrange(0, a, 1)]
        for _ in range(1, size, 1):
            candidate = random.randrange(0, a, 1)
            while candidate in chosen:
                candidate = random.randrange(0, a, 1)
            chosen.append(candidate)
        return np.array(chosen)

    def _make_leaf(self, y):
        """Create a leaf node holding the most common label of ``y``."""
        leaf = Node()
        leaf.set_node_value(self._common_label(y))
        return leaf

    def _construct_tree(self, x, y, depth=0):
        """Recursively build the subtree for the samples ``x`` / labels ``y``."""
        number_of_samples = x.shape[0]
        number_of_features = x.shape[1]
        number_of_labels = len(self.unique_labels(y))

        # Stopping criteria: depth limit reached, node is pure, or too few samples.
        if (depth >= self._max_depth
                or number_of_labels == 1
                or number_of_samples < self._min_samples_split):
            return self._make_leaf(y)

        feat_indexes = self.random_feature_indexes(number_of_features, self._n_features)
        best_feature, best_thresh = self._most_suitable_feature(x, y, feat_indexes)
        if best_feature is None:
            return self._make_leaf(y)

        left_indexes, right_indexes = self._split(x[:, best_feature], best_thresh)
        # A split that leaves one side empty separates nothing (e.g. every
        # sample shares the same feature value); recursing on the empty side
        # would fail, so the node becomes a leaf instead.
        if len(left_indexes) == 0 or len(right_indexes) == 0:
            return self._make_leaf(y)

        node = Node()
        node.set_feature(best_feature)
        node.set_threshold(best_thresh)
        node.set_left_node(self._construct_tree(x[left_indexes, :], y[left_indexes], depth + 1))
        node.set_right_node(self._construct_tree(x[right_indexes, :], y[right_indexes], depth + 1))
        return node

    def _most_suitable_feature(self, x, y, feat_indexes):
        """Return the ``(feature_index, threshold)`` pair with the highest information gain.

        Every distinct value of each candidate feature is tried as a threshold.
        """
        best_gain = -np.inf
        split_feature_index, split_threshold = None, None
        for feat_index in feat_indexes:
            x_column = x[:, feat_index]
            for threshold in self.unique_labels(x_column):
                gain = self._information_gain(y, x_column, threshold)
                if gain > best_gain:
                    best_gain = gain
                    split_feature_index = feat_index
                    split_threshold = threshold
        return split_feature_index, split_threshold

    def _information_gain(self, y, x_column, threshold):
        """Return parent entropy minus the size-weighted entropy of the two children.

        A split that leaves either child empty has a gain of 0.
        """
        left_indexes, right_indexes = self._split(x_column, threshold)
        samples_left = len(left_indexes)
        samples_right = len(right_indexes)
        if samples_left == 0 or samples_right == 0:
            return 0

        parent_entropy = self._entropy(y)
        total_samples = len(y)
        child_node_entropy = (
            (samples_left / total_samples) * self._entropy(y[left_indexes])
            + (samples_right / total_samples) * self._entropy(y[right_indexes])
        )
        return parent_entropy - child_node_entropy

    def find_indexes(self, array, threshold):
        """Partition the positions of ``array`` by ``threshold``.

        Returns:
            ``(left, right)`` integer index arrays: positions whose value is
            ``<= threshold`` and positions whose value is ``> threshold``.
        """
        left = []
        right = []
        # enumerate yields each element's own position; looking positions up
        # by value would return the first occurrence for duplicated values.
        for position, value in enumerate(array):
            if value <= threshold:
                left.append(position)
            if value > threshold:
                right.append(position)
        # Explicit integer dtype keeps empty results usable as array indexes.
        return np.array(left, dtype=int), np.array(right, dtype=int)

    def _split(self, x_column, split_thresh):
        """Return the left/right sample indexes for ``x_column`` at ``split_thresh``."""
        return self.find_indexes(x_column, split_thresh)

    def _entropy(self, y):
        """Return the entropy of the labels ``y``, computed with the natural logarithm."""
        counts = {}
        for label in y:
            counts[label] = counts.get(label, 0) + 1

        total = len(y)
        terms = []
        for label in sorted(counts):
            proportion = counts[label] / total
            if proportion > 0:
                terms.append(proportion * np.log(proportion))
        return -np.sum(np.array(terms))

    def _majority_label(self, y):
        """Return the label that occurs most often in ``y``.

        When several labels share the highest count, the one that appears
        first in ``y`` wins.
        """
        majority_lab = y[0]  # also makes an empty ``y`` fail loudly
        counts = {}
        for label in y:
            counts[label] = counts.get(label, 0) + 1

        best_count = 0
        for label, count in counts.items():
            # Strict comparison keeps the earliest label on ties
            # (dicts iterate in insertion order).
            if count > best_count:
                majority_lab = label
                best_count = count
        return majority_lab

    def _common_label(self, y):
        """Return the most common label of ``y`` (delegates to ``_majority_label``)."""
        return self._majority_label(y)

    def predict(self, samples):
        """Return an array with the predicted label of every row in ``samples``."""
        return np.array([self._search_tree(sample, self._root) for sample in samples])

    def _search_tree(self, sample, node):
        """Walk down from ``node`` until a leaf is reached and return its value."""
        while node.get_node_value() is None:
            if sample[node.get_feature()] <= node.get_threshold():
                node = node.get_left_node()
            else:
                node = node.get_right_node()
        return node.get_node_value()

    def single_predict(self, sample):
        """Return the predicted label of one sample."""
        return self._search_tree(sample, self._root)

# 5. Standard scaler and splitting of the data.
- standard_scaler uses means and std deviations to standardize the features.
- split splits the data into training and testing sets.

In [None]:
def standard_scaler(features):
    """Standardize every feature column to zero mean and unit variance.

    Args:
        features: 2-D array-like, one row per sample and one column per feature.

    Returns:
        A new NumPy array ``(features - mean) / std`` computed per column.
        A column with zero standard deviation (a constant feature) is only
        centered, so it becomes all zeros instead of NaN/inf.
    """
    features = np.asarray(features)
    means = features.mean(axis=0)
    std_deviations = features.std(axis=0)
    # Dividing by 1 leaves the centered (all-zero) constant column as is.
    safe_std = np.where(std_deviations == 0, 1.0, std_deviations)
    return (features - means) / safe_std

def _random_indexes(array, size, random_state):  # For selecting the indexes for test features
    if size > array:
        raise ValueError(str(size) + " features can't be chosen out of " + str(array))
    random_indexes = []
    random.seed(random_state)
    random_index = random.randrange(0, array, 1)
    random_indexes.append(random_index)

    for _ in range(1, size, 1):
        random_index = random.randrange(0, array, 1)
        while random_index in random_indexes:
            random_index = random.randrange(0, array, 1)
        random_indexes.append(random_index)
    random_indexes = np.array(random_indexes)
    return random_indexes

def split(features, targets, test_size, random_state=50):
    """Split the data into training and testing parts.

    The number of test samples is ``int(test_size * len(targets)) + 1``; the
    test rows are chosen by ``_random_indexes`` seeded with ``random_state``
    and every remaining row goes to the training part, in original order.

    Returns:
        ``(x_training, x_testing, y_training, y_testing)`` as NumPy arrays.
    """
    total = len(targets)
    holdout_count = int(test_size * total) + 1
    holdout = _random_indexes(total, holdout_count, random_state)

    rows = list(features)
    labels = list(targets)

    # Test part: rows at the randomly drawn positions, in draw order.
    x_testing = [rows[position] for position in holdout]
    y_testing = [labels[position] for position in holdout]

    # Training part: every position that was not drawn.
    kept = [position for position in range(len(rows)) if position not in holdout]
    x_training = [rows[position] for position in kept]
    y_training = [labels[position] for position in kept]

    return np.array(x_training), np.array(x_testing), np.array(y_training), np.array(y_testing)

# 6. Metrics
- _confusion_matrix computes true positives, false positives, true negatives, and false negatives.
- accuracy_score, precision_score, recall_score, f1_score and confusion_matrix compute the accuracy, precision, recall, and F1 scores and the confusion matrix.

In [None]:
def _confusion_matrix(y_testing, y_prediction):
    # Computing confusion matrix
    length_of_labels = len(y_testing)
    true_positive, false_positive, true_negative, false_negative = 0, 0, 0, 0

    for i in range(length_of_labels):
        if y_testing[i] == 1:
            if y_testing[i] == y_prediction[i]:
                true_positive += 1
            else:
                false_positive += 1

        if y_testing[i] == 0:
            if y_testing[i] == y_prediction[i]:
                true_negative += 1
            else:
                false_negative += 1
    return true_positive, false_positive, true_negative, false_negative

def accuracy_score(y_testing, y_preds):
    """Return the share of counted samples that were predicted correctly.

    Computed as ``(tp + tn) / (tp + fp + tn + fn)`` from the counts returned
    by ``_confusion_matrix``.
    """
    counts = _confusion_matrix(y_testing, y_preds)
    correct = counts[0] + counts[2]  # true positives + true negatives
    return correct / sum(counts)

def precision_score(y_testing, y_preds):
    """Return ``tp / (tp + fp)`` from the counts given by ``_confusion_matrix``.

    Returns 0.0 when ``tp + fp`` is zero instead of raising
    ``ZeroDivisionError``, which is the same convention ``recall_score`` and
    ``f1_score`` use for an empty denominator.
    """
    tp, fp, _, _ = _confusion_matrix(y_testing, y_preds)
    tp_fp = tp + fp
    if tp_fp == 0:
        return 0.0
    return tp / tp_fp

def recall_score(y_testing, y_preds):
    """Return ``tp / (tp + fn)`` from the counts given by ``_confusion_matrix``.

    Yields 0.0 when ``tp + fn`` is zero.
    """
    tp, _, _, fn = _confusion_matrix(y_testing, y_preds)
    denominator = tp + fn
    return tp / denominator if denominator != 0 else 0.0

def f1_score(y_testing, y_preds):
    """Return the harmonic mean of precision and recall.

    Computed as ``2 * precision * recall / (precision + recall)``; yields 0.0
    when precision and recall sum to zero.
    """
    precision = precision_score(y_testing, y_preds)
    recall = recall_score(y_testing, y_preds)
    total = precision + recall
    return 0.0 if total == 0 else (2 * precision * recall) / total

def confusion_matrix(y_testing, y_preds):
    """Return the confusion matrix as the nested list ``[[tp, fp], [fn, tn]]``.

    The four counts come from ``_confusion_matrix``.
    """
    tp, fp, tn, fn = _confusion_matrix(y_testing, y_preds)
    return [[tp, fp], [fn, tn]]

# 7. Main function.

In [None]:
if __name__ == "__main__":
    # Feature extraction followed by standardization
    feats, y_labels = spectral_centroid_rolloff()
    features = standard_scaler(feats)

    # Hold out 20% (plus one sample) of the data for validation
    X_train, X_valid, y_train, y_valid = split(features, y_labels, test_size=0.20)

    # Train a depth-1 tree and predict the validation labels
    tree = DecisionTree(min_samples_split=1, max_depth=1)
    tree.fit(X_train, y_train)
    predictions = tree.predict(X_valid)

    # Metrics on the validation data
    accuracy = accuracy_score(y_valid, predictions)
    precision = precision_score(y_valid, predictions)
    recall = recall_score(y_valid, predictions)
    f1 = f1_score(y_valid, predictions)
    confusion_mat = confusion_matrix(y_valid, predictions)

    report = (
        ('Accuracy score on the validation data: ', accuracy),
        ('Precision score on the validation data: ', precision),
        ('Recall score on the validation data: ', recall),
        ('F1 score on the validation data: ', f1),
        ('The confusion matrix on the validation: ', confusion_mat),
    )
    for caption, value in report:
        print(caption, value)

    # Single validation sample, used by the persistence examples below
    test = X_valid[5]

    # # Saving the model using pickle
    # tree_model = "tree_model.pkl"
    # # with open(tree_model, 'wb') as file:
    # #     pickle.dump(tree, file)

    # with open(tree_model, 'rb') as file:
    #     model = pickle.load(file)
    # pred = model.single_predict(test)
    # print(pred)
    # # Saving the model using Joblib
    # file_name = 'decision_tree_model.sav'
    # # joblib.dump(tree, file_name)
    # loaded_tree = joblib.load(file_name)
    # y_pred = loaded_tree.single_predict(test)
    # print(y_pred)

Accuracy score on the validation data:  0.75
Precision score on the validation data:  0.5
Recall score on the validation data:  1.0
F1 score on the validation data:  0.6666666666666666
The confusion matrix on the validation:  [[3, 3], [0, 6]]
