<a href="https://colab.research.google.com/github/Ramzeus-svg/adhd1/blob/main/ADHD_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
!pip install -U tensorflow-addons



In [17]:
from collections import Iterable
import numpy as np
import os, scipy.io

from typing import List

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_probability as tfp


# channels 3, 4, 5, 6, 13, and 14
def load_mat(in_path: str, standardize: bool = False, channel_indices=None, window_size: int = 3) -> np.ndarray:
    if channel_indices is None:
        channel_indices = [3, 4, 5, 6, 13, 14]

    loaded_mat = scipy.io.loadmat(in_path)

    path = in_path.rsplit('/', 1)[1]  # /user/home/.../v10p.mat -> v10p.mat
    path = path.split('.')[0]  # v10p.mat -> v10p

    loaded_mat = loaded_mat[path]

    loaded_mat = list(loaded_mat)
    # loaded_mat = np.array(loaded_mat)[:-(len(loaded_mat)%window_size)]
    # loaded_mat = np.array(loaded_mat)
    loaded_mat = loaded_mat[:6]
    # loaded_mat = loaded_mat[:12]
    # loaded_mat = loaded_mat[:9285]

    # print(f"load_mat(): {path}.shape = {loaded_mat.shape}, carve = {(len(loaded_mat)%window_size)}")

    # Grab the relevant indices
    return_list = []
    triplets = []
    # return_list = np.array_split(loaded_mat, window_size)

    for index, row in enumerate(loaded_mat[:-window_size]):

        curr = np.array([row[indice] for indice in channel_indices])
        triplets.append(curr)
        if len(triplets) == 3:
            # print(f"batch == {batch}")
            triplets=np.array(triplets)
            # print(f"triplets == {triplets}")
            print(f"triplets.shape == {triplets.shape}")

            return_list.append(triplets)
            # print(f"return_list == {return_list}")
            # print(f"return_list.shape == {return_list.shape}")
            triplets = []

            # exit()

    if standardize:
        return_list = standardize_data(return_list)

    return_list = np.array(return_list)
    return return_list


def standardize_data(input_iterable: Iterable) -> np.ndarray:
    return_list = input_iterable - np.mean(input_iterable)
    return_list /= np.std(return_list)
    return np.array(return_list)


# code from my github
class ActivationLayerFactory():
    def __init__(self, activation=tf.keras.activations.relu):
        self.__activation_layer_counter = 0
        self.activation = activation

    def produce(self, activation=None):
        if activation is None: activation = tf.keras.activations.relu
        temp = tf.keras.layers.Activation(activation=activation, name='activation_' + str(self.__activation_layer_counter))
        self.__activation_layer_counter += 1
        return temp


class DropoutLayerFactory():
    def __init__(self, dropout_rate=0.5):
        self.__dropout_layer_counter = 0
        self.rate = dropout_rate

    def produce(self, rate=None):
        if rate == None: rate = self.rate
        temp = tf.keras.layers.Dropout(rate=rate, name='dropout' + str(self.__dropout_layer_counter))
        self.__dropout_layer_counter += 1
        return temp


class BatchNormalizationLayerFactory():
    def __init__(self):
        self.__batchnormalization_layer_count = 0

    def produce(self):
        temp = tf.keras.layers.BatchNormalization(name='BN' + str(self.__batchnormalization_layer_count))
        self.__batchnormalization_layer_count += 1
        return temp

In [18]:
import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_probability as tfp
import numpy as np




# Finds reconstruction loss... should help us find out if something is similar to FOCUSING or NOT FOCUSING...
class AutoEncoder_2D(tf.keras.Model):

    def __init__(self, encoder, decoder, loss_function=tf.keras.losses.MSE, *args, **kwargs):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

        self.loss_function = loss_function

    def compile(self,
                optimizer=None,
                loss=None,
                metrics=None,
                loss_weights=None,
                weighted_metrics=None,
                run_eagerly=None,
                steps_per_execution=None,
                **kwargs):
        super().compile(optimizer=optimizer)
        self.in_optimizer = optimizer

    @tf.function(experimental_compile=True)
    def call(self, data, training=True, mask=None):
        encoded = self.encoder(data)
        decoded = self.decoder(encoded)
        return decoded

    @tf.function(experimental_compile=True)
    def encode(self, input_data):
        return self.encoder(input_data)

    @tf.function(experimental_compile=True)
    def decode(self, input_data):
        return self.decoder(input_data)

    @tf.function(experimental_compile=True)
    def get_loss(self, data):
        encoded = self.encoder(data)
        decoded = self.decoder(encoded)
        return tf.reduce_mean(self.loss_function(data, decoded))

    @tf.function(experimental_compile=True)
    def train_step(self, input_data):
        with tf.GradientTape() as tape:
            encoded_data = self.encoder(input_data)
            decoded_data = self.decoder(encoded_data)
            reconstruction_loss = self.loss_function(input_data, decoded_data)

        gradients = tape.gradient(reconstruction_loss, self.trainable_variables)
        self.in_optimizer.apply_gradients(zip(gradients, self.trainable_variables))


latent_shape = 1
activations = tfa.activations.lisht
output_activation = activations

activation_layer_factory = ActivationLayerFactory(activation=activations)
dropout_layer_factory = DropoutLayerFactory(dropout_rate=0.5)
bn_layer_factory = BatchNormalizationLayerFactory()

encoder_output_activation = tf.keras.activations.tanh
decoder_output_activation = tfa.activations.lisht
regularizer = tf.keras.regularizers.l1_l2(0.01)


def make_encoder(in_shape):
    encoder_input = tf.keras.Input(shape=in_shape)

    x = tf.keras.layers.Convolution1D(filters=128, kernel_size=2, strides=2, kernel_regularizer=regularizer)(encoder_input)
    x = activation_layer_factory.produce()(x)
    x = bn_layer_factory.produce()(x)

    x = tf.keras.layers.Convolution1D(filters=64, kernel_size=2, strides=2, kernel_regularizer=regularizer)(encoder_input)
    x = activation_layer_factory.produce()(x)
    x = bn_layer_factory.produce()(x)

    x = tf.keras.layers.Convolution1D(filters=32, kernel_size=1, strides=1, kernel_regularizer=regularizer)(x)
    x = activation_layer_factory.produce()(x)
    x = bn_layer_factory.produce()(x)

    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(latent_shape*3)(x)
    x = activation_layer_factory.produce()(x)
    x = bn_layer_factory.produce()(x)
    x = tf.keras.layers.Dense(latent_shape*2)(x)
    x = tf.keras.layers.Dense(latent_shape)(x)
    encoder_output = encoder_output_activation(x)

    encoder: tf.keras.models.Model = tf.keras.models.Model(encoder_input, encoder_output, name='encoder')
    return encoder


def make_decoder(in_shape):
    decoder_input = tf.keras.Input(shape=latent_shape)
    x = tf.keras.layers.Reshape((1, latent_shape))(decoder_input)

    x = tf.keras.layers.Conv1D(filters=32, kernel_size=1, strides=1, kernel_regularizer=regularizer)(x)
    x = activation_layer_factory.produce()(x)
    x = bn_layer_factory.produce()(x)

    x = tf.keras.layers.Conv1D(filters=64, kernel_size=1, strides=1, kernel_regularizer=regularizer)(x)
    x = activation_layer_factory.produce()(x)
    x = bn_layer_factory.produce()(x)

    x = tf.keras.layers.Conv1D(filters=128, kernel_size=1, strides=1, kernel_regularizer=regularizer)(x)
    x = tf.keras.layers.Conv1DTranspose(filters=in_shape[1], kernel_size=in_shape[0], strides=in_shape[0], kernel_regularizer=regularizer)(x)
    decoder_output = decoder_output_activation(x)

    decoder: tf.keras.Model = tf.keras.Model(inputs=decoder_input, outputs=decoder_output, name="decoder")
    return decoder


In [19]:
import numpy as np
import multiprocessing as mp
import os

# os.system("python -m pip install -U sklearn tensorflow-addons tensorflow-probability")
import string, scipy.io



os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import tensorflow_addons as tfa
import torch



convert_model = False
if convert_model:
    encoder_converter = tf.lite.TFLiteConverter.from_saved_model("/content/drive/MyDrive/saved_models/encoder_2D")
    decoder_converter = tf.lite.TFLiteConverter.from_saved_model("/content/drive/MyDrive/saved_models/decoder_2D")
    encoder_model = encoder_converter.convert()
    decoder_model = decoder_converter.convert()
    with open('/content/drive/MyDrive/saved_models/encoder.tflite', 'wb') as f:
        f.write(encoder_model)
    with open('/content/drive/MyDrive/saved_models/decoder.tflite', 'wb') as f:
        f.write(decoder_model)
    print('\nFinished saving models!')
    exit()

adhd_path = "/content/drive/MyDrive/data/adhd/"
control_part1_path = "/content/drive/MyDrive/data/Control_part1/"

# Get the file paths for adhd_part1_paths and control_part1_paths
adhd_paths = [adhd_path+ path for path in os.listdir(adhd_path)]
control_part1_paths = [control_part1_path + path for path in os.listdir(control_part1_path)]

# Opens the files into numpy arrays
standardize = False
channel_indices = [3, 4, 5, 6, 13, 14]
window_size = 3  # gives us a 2D array
batch_size = 20

# Quick and dirty copy-pasted functions
def load_adhd(index):
    return load_mat(adhd_paths[index], standardize=standardize, channel_indices=channel_indices, window_size=window_size)


def load_control_part1(index):
    return load_mat(control_part1_paths[index], standardize=standardize, channel_indices=channel_indices, window_size=window_size)




# Multiprocessing for loading and processing the data faster
adhd_data = np.squeeze(np.array(mp.Pool(mp.cpu_count()).map(load_adhd, range(len(adhd_paths))), dtype=np.float32))
control_part1_data = np.squeeze(np.array(mp.Pool(mp.cpu_count()).map(load_control_part1, range(len(control_part1_paths))), dtype=np.float32))

flat_data = adhd_data.flatten()
mean, std_dev = np.median(flat_data), np.std(flat_data)
for index, row in enumerate(adhd_data):
    adhd_data[index] = (row - mean) / std_dev

flat_data = control_part1_data.flatten()
mean, std_dev = np.median(flat_data), np.std(flat_data)
for index, row in enumerate(control_part1_data):
    control_part1_data[index] = (row - mean) / std_dev

# Print out the input_data for debugging
print('\n\n\n')
for batch, line in enumerate(adhd_data):
    print(f"adhd_data #{batch}: {line}, shape: {line.shape}")
    break
for batch, line in enumerate(control_part1_data):
    print(f"control_part1_data #{batch}: {line}, shape: {line.shape}")
    break

print('\n\n')
print(f"adhd_data.shape: {adhd_data.shape}")
print(f"control_part1_data.shape: {control_part1_data.shape}")

#########################
## Defining the model
#########################
in_shape = (window_size, len(channel_indices))

encoder = make_encoder(in_shape=in_shape)
decoder = make_decoder(in_shape=in_shape)

encoder.summary()
decoder.summary()

lr_schedule = tfa.optimizers.ExponentialCyclicalLearningRate(1e-4, 1e-3, step_size=2000, gamma=0.96)
in_optimizer = tfa.optimizers.LAMB(learning_rate=lr_schedule)

AE = AutoEncoder_2D(encoder=encoder, decoder=decoder)
AE.compile(in_optimizer)

adhd_data = tf.convert_to_tensor(adhd_data, dtype=tf.float32)


@tf.function(experimental_compile=True)
def train_loop(data, batch_size=3):
    for batch in range(0, len(data), batch_size):
        start = batch * batch_size
        each = data[start:start + batch_size]
        AE.train_step(each)


print('Training...')
epochs = 10000  # 00
for epoch in range(epochs):
    train_loop(adhd_data, batch_size=batch_size)
    if epoch % 1000 == 0:
        adhd_data = adhd_data[0:batch_size]
        adhd_loss_value = AE.get_loss(adhd_data)
        control_data = control_part1_data[0:batch_size]
        control_loss_value = AE.get_loss(control_data)
        print(
            "\n",
            f"Epoch #{epoch}:",
            f"\n target: \n {adhd_data[0]}",
            f"\n prediction on adhd: \n {AE.call(adhd_data[0:1])}",
            f"\n prediction on control: \n {AE.call(control_data[0:1])}",
            f" \t reconstruction loss on ADHD == {adhd_loss_value:0.2f}",
            f" \t reconstruction loss on control == {control_loss_value:0.2f}",
            sep='', end='', flush=True,
        )

        if adhd_loss_value < 0.4 and control_loss_value > adhd_loss_value * 2.5:
            print('\n\n')
            break

print('\nFinished training!')

# Saves models
print('\nSaving models...')
AE.encoder.save("/content/drive/MyDrive/saved_models/encoder_2D")
AE.decoder.save("/content/drive/MyDrive/saved_models/decoder_2D")
encoder_converter = tf.lite.TFLiteConverter.from_saved_model("/content/drive/MyDrive/saved_models/encoder_2D")
decoder_converter = tf.lite.TFLiteConverter.from_saved_model("/content/drive/MyDrive/saved_models/decoder_2D")
encoder_model = encoder_converter.convert()
decoder_model = decoder_converter.convert()
with open('/content/drive/MyDrive/saved_models/encoder.tflite', 'wb') as f:
    f.write(encoder_model)
with open('/content/drive/MyDrive/saved_models/decoder.tflite', 'wb') as f:
    f.write(decoder_model)

# Save losses per triplet line
adhd_f = open("/content/drive/MyDrive/data/adhd_reconstructed.txt", "w")
control_f = open("/content/drive/MyDrive/data/control_reconstructed.txt", "w")
test_f = open("/content/drive/MyDrive/data/original_lines.txt", "w")
contest_f = open("/content/drive/MyDrive/data/original_lines_control.txt", "w")
ratio_f = open("/content/drive/MyDrive/data/ratios.txt", "w")

adhd_write_lines = []
control_write_lines = []

for index, row in enumerate(adhd_data):

    control_data = control_part1_data[index:index + 3]
    control_loss_value = np.array(AE.get_loss(control_data))
    control_write_lines.append(str(control_loss_value))
    control_write_lines.append("\n")

    adhd_data = adhd_data[index:index + 3]
    adhd_loss_value = np.array(AE.get_loss(adhd_data))
    adhd_write_lines.append(str(adhd_loss_value))
    adhd_write_lines.append("\n")

    ratio_f.write(f"{control_loss_value / adhd_loss_value:1.2f}\n")

    test_data = list(np.squeeze(np.array(adhd_data)))
    curr_line = []
    for elem in test_data:
        for more in elem:
            if type(more) != np.float32 and len(more) > 1:
                test_f.write(str(more) + "\n")

    cont_data = list(np.squeeze(np.array(control_data)))
    curr_line = []
    for elem in cont_data:
        for more in elem:
            if type(more) != np.float32 and len(more) > 1:
                contest_f.write(str(more) + "\n")

adhd_f.writelines(adhd_write_lines)
control_f.writelines(control_write_lines)


triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)
triplets.shape == (3, 6)




In [44]:
import numpy as np # to work with arrays
import matplotlib.pyplot as plt # to plot dataset
from sklearn.model_selection import train_test_split

X = adhd_path[0:-1]
y = control_part1_path[0 : -1] 

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 1/2,random_state=1)
print(X_test)

ValueError: ignored

In [45]:
plt.scatter(X_train, y_train, color = 'red') #plotting the training values (experiance) , with training values salary as red plots 
plt.plot(X_train, Model.predict(X_train), color = 'blue') # plot the preidctaing line to compare the predictaing values with real 
plt.title('ADHD (Training set)') 
plt.show()

NameError: ignored