In [None]:
# import libraries
import os
import datetime

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from tensorflow import keras
from tensorflow.keras.callbacks import TensorBoard
from keras.callbacks import EarlyStopping
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.regularizers import l2

# Helper Functions

In [None]:
def get_dataframe(language, base_dir=None):
  """Load every labeled-feature CSV of one language into a single dataframe.

  Args:
    language: Name of the language sub-folder to read (e.g. "EN").
    base_dir: Folder that contains one sub-folder per language. Defaults to
      the project's hard-coded Windows data folder when omitted.

  Returns:
    A tuple ``(df, sample_ids)``: ``df`` holds all rows whose 'label' is one
    of 0, 1, 2, 3, with a fresh 0..n-1 index and without the 'sample_id'
    column; ``sample_ids`` is the list of removed 'sample_id' values, in row
    order.

  Raises:
    ValueError: If the folder contains no CSV file.
  """
  if base_dir is None:
    # Raw string: the backslashes are path separators, not escape sequences
    base_dir = r'C:\Research\labeled_features'
  directory_path = os.path.join(base_dir, language)

  dataframes = []

  # os.listdir returns entries in arbitrary order; sort so the row order
  # (and therefore sample_ids) is the same on every run
  for file in sorted(os.listdir(directory_path)):
    if not file.endswith('.csv'):
      continue

    df = pd.read_csv(os.path.join(directory_path, file))

    # Keep only the rows whose label is one of the expected classes
    dataframes.append(df[df['label'].isin([0, 1, 2, 3])])

  if not dataframes:
    raise ValueError('No CSV files found in {}'.format(directory_path))

  # Concatenate everything and renumber the rows 0..n-1
  df_all = pd.concat(dataframes).reset_index(drop=True)

  # sample_ids[i] is the sample_id of row i
  sample_ids = df_all['sample_id'].tolist()

  return df_all.drop(columns=['sample_id']), sample_ids

In [None]:
def get_dataframes_knn(language, base_dir=None):
  """Load a language's CSV files and split them file-wise into train/test X and y.

  The CSV files are sorted by name; the first 80% (rounded down) form the
  training set and the remaining files form the test set. Only CSV files are
  counted, so stray non-CSV files in the folder no longer shift the cut-off.

  Args:
    language: Name of the language sub-folder to read (e.g. "EN").
    base_dir: Folder that contains one sub-folder per language. Defaults to
      the project's hard-coded Windows data folder when omitted.

  Returns:
    ``(X, y, X_test, y_test)`` where the X frames hold every column except
    'sample_id' and 'label', and the y series hold the 'label' column.

  Raises:
    ValueError: If there are too few CSV files to fill both splits.
  """
  if base_dir is None:
    # Raw string: the backslashes are path separators, not escape sequences
    base_dir = r'C:\Research\labeled_features'
  directory_path = os.path.join(base_dir, language)

  # Sorted so that the split is reproducible (os.listdir order is arbitrary)
  csv_files = sorted(f for f in os.listdir(directory_path) if f.endswith('.csv'))

  # Index of the first file that goes to the test set
  first_test_file = int(np.floor(len(csv_files) * 0.8))

  train_set = []
  test = []

  for i, file in enumerate(csv_files):
    df = pd.read_csv(os.path.join(directory_path, file))

    # Keep only the rows whose label is one of the expected classes
    df = df[df['label'].isin([0, 1, 2, 3])]

    if i < first_test_file:
      train_set.append(df)
    else:
      test.append(df)

  if not train_set or not test:
    raise ValueError(
        'Need at least 2 CSV files in {} to build a train/test split, found {}'.format(
            directory_path, len(csv_files)))

  df_test = pd.concat(test).drop(columns=['sample_id'])
  df_train = pd.concat(train_set).drop(columns=['sample_id'])

  # Select the features by name so the split does not rely on 'label' being
  # the last column
  X = df_train.drop(columns=['label'])
  y = df_train['label']

  X_test = df_test.drop(columns=['label'])
  y_test = df_test['label']

  return X, y, X_test, y_test

In [None]:
def get_dataframes_divided(language, base_dir=None):
  """Load a language's CSV files and split them file-wise into train/test frames.

  The CSV files are sorted by name; the first 80% (rounded down) form the
  training frame and the remaining files form the test frame. Only CSV files
  are counted, and the cut-off uses a strict ``<`` so the training share
  never exceeds 80% (same rule as ``get_dataframes_knn``).

  Args:
    language: Name of the language sub-folder to read (e.g. "EN").
    base_dir: Folder that contains one sub-folder per language. Defaults to
      the project's hard-coded Windows data folder when omitted.

  Returns:
    ``(df_train, df_test)``: both keep the features and the 'label' column,
    with 'sample_id' dropped and only labels 0-3 retained.

  Raises:
    ValueError: If there are too few CSV files to fill both splits.
  """
  if base_dir is None:
    # Raw string: the backslashes are path separators, not escape sequences
    base_dir = r'C:\Research\labeled_features'
  directory_path = os.path.join(base_dir, language)

  # Sorted so that the split is reproducible (os.listdir order is arbitrary)
  csv_files = sorted(f for f in os.listdir(directory_path) if f.endswith('.csv'))

  # Index of the first file that goes to the test set
  first_test_file = int(np.floor(len(csv_files) * 0.8))

  train_set = []
  test = []

  for i, file in enumerate(csv_files):
    df = pd.read_csv(os.path.join(directory_path, file))

    # Keep only the rows whose label is one of the expected classes
    df = df[df['label'].isin([0, 1, 2, 3])]

    if i < first_test_file:
      train_set.append(df)
    else:
      test.append(df)

  if not train_set or not test:
    raise ValueError(
        'Need at least 2 CSV files in {} to build a train/test split, found {}'.format(
            directory_path, len(csv_files)))

  df_test = pd.concat(test).drop(columns=['sample_id'])
  df_train = pd.concat(train_set).drop(columns=['sample_id'])

  return df_train, df_test


In [None]:
def get_predictions(model, X_test):
    """Predict with ``model`` and derive rounded, non-negative predictions.

    Args:
        model: Any object exposing ``predict(X_test)``.
        X_test: Input passed straight to ``model.predict``.

    Returns:
        ``(unrounded_predictions, predictions)``: the raw model output and a
        rounded copy in which every negative value has been replaced by 0.
    """
    raw_output = model.predict(X_test)

    # np.round returns a new array, so the raw output stays untouched below
    rounded = np.round(raw_output)

    below_zero = rounded < 0
    rounded[below_zero] = 0

    return raw_output, rounded

In [None]:
def plot_conf_matrix(conf_matrix, language_model, language_test):
    """Display a confusion matrix as an annotated plotly heatmap.

    Args:
        conf_matrix: Matrix-like object; it is wrapped in a DataFrame for plotting.
        language_model: Language label of the model, shown in the title.
        language_test: Language label of the test data, shown in the title.
    """
    final_title = 'Confusion Matrix Predicted with {} model, on {} data'.format(language_model, language_test)

    matrix_frame = pd.DataFrame(conf_matrix)

    # text_auto=True writes each cell's count onto the heatmap
    fig = px.imshow(matrix_frame, title='Confusion Matrix', text_auto=True)
    fig.update_layout(
        title=final_title,
        xaxis_title='Predicted',
        yaxis_title='Expected',
    )
    fig.show()

In [None]:
def plot_correlations(df_all, language_model, language_test):
    """Scatter-plot the correlation of every column with the 'label' column.

    Args:
        df_all: DataFrame holding the feature columns and a 'label' column.
        language_model: Language label of the model, shown in the title.
        language_test: Language label of the test data, shown in the title.
    """
    # Drop the label's correlation with itself by name rather than by
    # position, so the plot is right even if 'label' is not the last column
    correlations = df_all.corr()['label'].drop(labels='label')

    fig = px.scatter(x=correlations.index, y=correlations, title='Correlations')
    fig.update_layout(title='Correlations between features and label, predicted with {} model, on {} data'.format(language_model, language_test), xaxis_title='Feature', yaxis_title='Correlation')
    fig.show()

In [None]:
def compute_correlation(unrounded_predictions, y_test):
    """Pair expected labels with raw predictions and correlate the two.

    Args:
        unrounded_predictions: Raw model outputs, one per test row.
        y_test: Expected labels for the same rows.

    Returns:
        ``(expected_predicted_df, corr)``: a two-column frame
        ('expected', 'predicted') and the correlation between those columns
        as computed by ``DataFrame.corr``.
    """
    paired = pd.DataFrame()
    paired['expected'] = y_test
    paired['predicted'] = unrounded_predictions

    # Off-diagonal entry of the 2x2 correlation matrix
    corr = paired.corr().loc['expected', 'predicted']
    return paired, corr

In [None]:
def plot_expected_vs_predicted(expected_predicted_df, language_model, language_test):
  """Draw one violin of predicted values per expected label.

  Args:
    expected_predicted_df: Frame with 'expected' and 'predicted' columns.
    language_model: Language label of the model, shown in the title.
    language_test: Language label of the test data, shown in the title.
  """
  expected = expected_predicted_df['expected']
  predicted = expected_predicted_df['predicted']

  fig = go.Figure()

  # One trace per distinct expected label, in order of first appearance
  for label in expected.unique():
    rows_with_label = expected == label
    violin = go.Violin(
        x=expected[rows_with_label],
        y=predicted[rows_with_label],
        name=label,
        box_visible=True,
        meanline_visible=False,
    )
    fig.add_trace(violin)

  fig.update_layout(
      title='Expected vs Predicted, predicted with {} model, on {} data'.format(language_model, language_test),
      xaxis_title='Expected',
      yaxis_title='Predicted',
  )
  fig.show()

In [None]:

def print_class_accuray(conf_matrix):
    """Print the per-class and grouped share of correct predictions.

    Rows of ``conf_matrix`` are the expected classes, columns the predicted
    ones. For each class 0-3 the diagonal count is divided by the row total.
    The grouped figures ("0 or 1", "2 or 3") divide the sum of the two
    diagonal counts by the total of the two rows, so a 0 predicted as 1
    still counts as wrong.

    Args:
        conf_matrix: 2-D array-like (ndarray or nested lists), at least 4x4.

    Raises:
        IndexError: If the matrix is smaller than 4x4 (for example because a
            class was absent when the matrix was built).
    """
    conf_matrix = np.asarray(conf_matrix)
    if conf_matrix.ndim != 2 or conf_matrix.shape[0] < 4 or conf_matrix.shape[1] < 4:
        raise IndexError(
            "conf_matrix must be at least 4x4 (classes 0-3), got shape {}; "
            "build it with labels=[0, 1, 2, 3]".format(conf_matrix.shape))

    def _percent_correct(class_ids):
        # Diagonal hits of the given rows over everything in those rows.
        # A row group without samples yields NaN instead of a divide warning.
        correct = float(sum(conf_matrix[c, c] for c in class_ids))
        total = float(conf_matrix[list(class_ids), :].sum())
        return correct / total * 100 if total > 0 else float('nan')

    for class_id in range(4):
        print("Percentage of correct predictions for class {}: {:.2f}%".format(class_id, _percent_correct([class_id])))

    print("Percentage of correct predictions for class 0 or 1: {:.2f}%".format(_percent_correct([0, 1])))
    print("Percentage of correct predictions for class 2 or 3: {:.2f}%".format(_percent_correct([2, 3])))



In [None]:
def duplicate_rows(df, label, times):
    """Oversample one class by appending extra copies of its rows.

    Args:
        df: DataFrame with a 'label' column.
        label: Label value whose rows are duplicated.
        times: Number of extra copies to append (0 appends nothing).

    Returns:
        A new DataFrame: the original rows followed by ``times`` copies of
        the rows whose 'label' equals ``label``. Original index values are
        kept, so the result contains repeated index labels.

    Raises:
        ValueError: If ``times`` is negative.
    """
    if times < 0:
        raise ValueError("times must be >= 0, got {}".format(times))
    if times == 0:
        # Nothing to append; pd.concat would fail on an empty list of frames
        return df.copy()

    # Rows belonging to the class that is being oversampled
    df_label = df[df['label'] == label]

    return pd.concat([df] + [df_label] * times)

In [None]:
def standarize_data(df, scaler=None):
    """Standardize every column except the last one, which is carried over as-is.

    Args:
        df: DataFrame whose last column is the target; all other columns are
            treated as features.
        scaler: Optional already-fitted scaler exposing ``transform``. When
            given, it is applied without re-fitting, so a test split can be
            scaled with the statistics of the training split. When omitted,
            a new ``StandardScaler`` is fitted on ``df`` itself.

    Returns:
        A new DataFrame with a 0..n-1 index: the scaled feature columns
        followed by the untouched last column of ``df``.
    """
    features = df.iloc[:, :-1]
    target_name = df.columns[-1]

    if scaler is None:
        scaled = StandardScaler().fit_transform(features)
    else:
        scaled = scaler.transform(features)

    df_standardized = pd.DataFrame(scaled, columns=df.columns[:-1].tolist())

    # A feature column sharing the target's name would be overwritten below,
    # so rename it first
    if target_name in df_standardized.columns:
        df_standardized.columns = [str(col) + '_1' if col == target_name else col for col in df_standardized.columns]

    # Attach the target by position: index-aligned assignment would produce
    # NaN/misaligned labels whenever df does not have a 0..n-1 index
    df_standardized[target_name] = df.iloc[:, -1].to_numpy()

    return df_standardized

In [None]:
# Get the data
# NOTE(review): the "## EN" section further down repeats this exact call, so the EN files are read twice on a full run.
train_df, test_df = get_dataframes_divided("EN")

In [None]:
def print_duplicate_columns(df):
    """Print the column labels of ``df`` that repeat an earlier column label."""
    is_repeat = df.columns.duplicated()
    print(df.columns[is_repeat])

# Get data

## EN

In [None]:
# Get the EN data: train/test frames with the features and 'label' ('sample_id' is already dropped by the loader)
train_df, test_df = get_dataframes_divided("EN")

In [None]:
# Show which label values occur in each split: train first, then test
print(np.unique(train_df['label']))
unique_labels = np.unique(test_df['label'])
print(unique_labels)


In [None]:
# Standardize the data.
# The scaler is fitted on the training features only and then applied to both
# splits; fitting a second scaler on the test split would scale it with
# different statistics than the ones the model is trained on.
# Assumes the last column is the label — TODO confirm for every CSV.
train_raw_EN = train_df.reset_index(drop=True)
test_raw_EN = test_df.reset_index(drop=True)

scaler_EN = StandardScaler().fit(train_raw_EN.iloc[:, :-1])

X_train_EN = pd.DataFrame(scaler_EN.transform(train_raw_EN.iloc[:, :-1]), columns=train_raw_EN.columns[:-1])
y_train_EN = train_raw_EN.iloc[:, -1]
X_test_EN = pd.DataFrame(scaler_EN.transform(test_raw_EN.iloc[:, :-1]), columns=test_raw_EN.columns[:-1])
y_test_EN = test_raw_EN.iloc[:, -1]

# Keep train_df / test_df as the standardized frames (features + label),
# because later cells still read them
train_df = pd.concat([X_train_EN, y_train_EN], axis=1)
test_df = pd.concat([X_test_EN, y_test_EN], axis=1)

In [None]:
# Sanity check: shapes of the EN train/test features and labels, one per line
print(X_train_EN.shape, y_train_EN.shape, X_test_EN.shape, y_test_EN.shape, sep="\n")

## ES

In [None]:
# Get the data
train_df, test_df = get_dataframes_divided("ES")

# Standardize the data.
# The scaler is fitted on the training features only and then applied to both
# splits; fitting a second scaler on the test split would scale it with
# different statistics than the ones the model is trained on.
# Assumes the last column is the label — TODO confirm for every CSV.
train_raw_ES = train_df.reset_index(drop=True)
test_raw_ES = test_df.reset_index(drop=True)

scaler_ES = StandardScaler().fit(train_raw_ES.iloc[:, :-1])

X_train_ES = pd.DataFrame(scaler_ES.transform(train_raw_ES.iloc[:, :-1]), columns=train_raw_ES.columns[:-1])
y_train_ES = train_raw_ES.iloc[:, -1]
X_test_ES = pd.DataFrame(scaler_ES.transform(test_raw_ES.iloc[:, :-1]), columns=test_raw_ES.columns[:-1])
y_test_ES = test_raw_ES.iloc[:, -1]

# Keep train_df / test_df as the standardized frames (features + label),
# because later cells still read them
train_df = pd.concat([X_train_ES, y_train_ES], axis=1)
test_df = pd.concat([X_test_ES, y_test_ES], axis=1)

In [None]:
# Sanity check: shapes of the ES train/test features and labels, one per line
print(X_train_ES.shape, y_train_ES.shape, X_test_ES.shape, y_test_ES.shape, sep="\n")

In [None]:
# Pairwise correlations between all columns of the training frame.
# NOTE(review): train_df was last assigned in the ES cell above, so this is the ES training data — confirm that is intended.
correlations = train_df.corr()
correlations

# Feedforward Neural Network

## EN

To read the training logs, run
`tensorboard --logdir "C:/Users/carlo/Research/reduction-detection/logs/fit/"` and then open http://localhost:6006 in a browser.


In [None]:
# Language the model is trained on / evaluated on ('EN' or 'ES')
language_model, language_test = 'EN', 'EN'

# Run identifier: <language>_<timestamp>
model_ID = "{}_{}".format(language_model, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

# TensorBoard callback writing to a log directory named after this run
logdir = "C:/Users/carlo/Research/reduction-detection/logs/fit/{model_ID}".format(model_ID=model_ID)
tensorboard_callback = TensorBoard(log_dir=logdir)

# Early stopping on the validation loss with a patience of 30 epochs
early_stopping = EarlyStopping(patience=30, monitor='val_loss')


In [None]:
# Show the run identifier (language prefix + timestamp) that names this run's log directory
print(model_ID)

In [None]:
# Feed-forward network trained as a regressor: it outputs one continuous value
# per sample (MSE loss), which is rounded to a class label after prediction.
model_ffnn_EN = Sequential()

# Input layer: take the input width from the data instead of hard-coding 80
model_ffnn_EN.add(Dense(80, input_dim=X_train_EN.shape[1], activation='relu', kernel_regularizer=l2(0.01)))

# Hidden layers, each followed by dropout
model_ffnn_EN.add(Dense(80, activation='relu', kernel_regularizer=l2(0.01)))
model_ffnn_EN.add(Dropout(0.2))

model_ffnn_EN.add(Dense(122, activation='relu', kernel_regularizer=l2(0.01)))
model_ffnn_EN.add(Dropout(0.2))

# Output layer: a single non-negative value (relu)
model_ffnn_EN.add(Dense(1, activation='relu'))

# Compile the model
model_ffnn_EN.compile(optimizer='adam', loss='mean_squared_error', metrics=['mse'])

# Train the model
# NOTE(review): the test split is also used as validation data (and therefore
# for early stopping), so test metrics are optimistic — consider a separate
# validation split.
batch_size = 32
epochs = 1000
model_ffnn_EN.fit(X_train_EN, y_train_EN, batch_size=batch_size, epochs=epochs, validation_data=(X_test_EN, y_test_EN), callbacks=[early_stopping, tensorboard_callback])

In [None]:
# Get unique values of the EN test labels.
# Read them from y_test_EN: `test_df` was re-assigned by the ES data cell and
# no longer holds the EN split at this point.
unique_labels = np.unique(y_test_EN)

unique_labels

In [None]:
# Get the raw (continuous) predictions and their rounded, non-negative version
unrounded_predictions, predictions = get_predictions(model_ffnn_EN, X_test_EN)

# Clamp to the valid label range 0-3 and flatten to a 1-D integer class vector
predictions = np.clip(predictions, 0, 3).ravel().astype(int)

# Calculate the confusion matrix. The label set is pinned so the matrix is
# always 4x4 with row/column i meaning class i, even if a class is missing
# from the test labels or from the predictions.
conf_matrix = confusion_matrix(y_test_EN.astype(int), predictions, labels=[0, 1, 2, 3])

print_class_accuray(conf_matrix)

# Get the accuracy
accuracy = accuracy_score(y_test_EN, predictions)

# Print the accuracy
print("Accuracy: {:.2f}%".format(accuracy*100))

# Evaluate the model on the test data.
# The model is compiled with metrics=['mse'], so the second value is the MSE
# metric, not an accuracy (name kept for compatibility).
test_loss, test_acc = model_ffnn_EN.evaluate(X_test_EN, y_test_EN, verbose=0)

plot_conf_matrix(conf_matrix, language_model, language_test)

expected_predicted_df, corr = compute_correlation(unrounded_predictions, y_test_EN)
print("Correlation between expected and predicted: {}".format(corr))

plot_expected_vs_predicted(expected_predicted_df, language_model, language_test)

In [None]:
# Save the EN model; the target name embeds the accuracy, the test loss, a fresh timestamp and the run's model_ID
model_ffnn_EN.save('models/EN/ffnn_acc_{}_val_loss_{}_{}_{}'.format(accuracy, test_loss, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"), model_ID))

### Load model

In [None]:
# Load a previously saved EN model; this rebinds model_ffnn_EN, replacing the model trained earlier in the session.
# NOTE(review): this fixed name does not follow the 'ffnn_acc_..._val_loss_...' pattern used by the save cell above — confirm the artifact exists.
model_ffnn_EN = keras.models.load_model('models/EN/ffnn_20230131-113400')

In [None]:
# Get the raw (continuous) predictions and their rounded, non-negative version
unrounded_predictions, predictions = get_predictions(model_ffnn_EN, X_test_EN)

# Clamp to the valid label range 0-3 and flatten to a 1-D integer class vector
predictions = np.clip(predictions, 0, 3).ravel().astype(int)

# Calculate the confusion matrix. The label set is pinned so the matrix is
# always 4x4 with row/column i meaning class i, even if a class is missing
# from the test labels or from the predictions.
conf_matrix = confusion_matrix(y_test_EN.astype(int), predictions, labels=[0, 1, 2, 3])

print_class_accuray(conf_matrix)

# Get the accuracy
accuracy = accuracy_score(y_test_EN, predictions)

# Print the accuracy
print("Accuracy: {:.2f}%".format(accuracy*100))

plot_conf_matrix(conf_matrix, language_model, language_test)

expected_predicted_df, corr = compute_correlation(unrounded_predictions, y_test_EN)
print("Correlation between expected and predicted: {}".format(corr))

plot_expected_vs_predicted(expected_predicted_df, language_model, language_test)

## ES

In [None]:
# Language the model is trained on / evaluated on ('EN' or 'ES')
language_model, language_test = 'ES', 'ES'

# Run identifier: <language>_<timestamp>
model_ID = "{}_{}".format(language_model, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

# TensorBoard callback writing to a log directory named after this run
logdir = "C:/Users/carlo/Research/reduction-detection/logs/fit/{model_ID}".format(model_ID=model_ID)
tensorboard_callback = TensorBoard(log_dir=logdir)

# Early stopping on the training loss with a patience of 20 epochs
# NOTE(review): the EN run monitors 'val_loss' with patience 30 — confirm the difference is intended.
early_stopping = EarlyStopping(patience=20, monitor='loss')


In [None]:
# Show the run identifier (language prefix + timestamp) that names this run's log directory
print(model_ID)

In [None]:
# Feed-forward network trained as a regressor: it outputs one continuous value
# per sample (MSE loss), which is rounded to a class label after prediction.
model_ffnn_ES = Sequential()

# Input layer: take the input width from the data instead of hard-coding 80
model_ffnn_ES.add(Dense(61, input_dim=X_train_ES.shape[1], activation='relu', kernel_regularizer=l2(0.01)))

# First hidden layer, followed by dropout
model_ffnn_ES.add(Dense(61, activation='relu', kernel_regularizer=l2(0.01)))
model_ffnn_ES.add(Dropout(0.2))

# Five identical hidden blocks: Dense(122) followed by dropout
for hidden_block in range(5):
    model_ffnn_ES.add(Dense(122, activation='relu', kernel_regularizer=l2(0.01)))
    model_ffnn_ES.add(Dropout(0.2))

# Output layer: a single non-negative value (relu)
model_ffnn_ES.add(Dense(1, activation='relu'))

# Compile the model
model_ffnn_ES.compile(optimizer='adam', loss='mean_squared_error', metrics=['mse'])

# Train the model
# NOTE(review): the test split is also used as validation data, so the
# reported validation metrics are measured on the test set — consider a
# separate validation split.
batch_size = 32
epochs = 1000
model_ffnn_ES.fit(X_train_ES, y_train_ES, batch_size=batch_size, epochs=epochs, validation_data=(X_test_ES, y_test_ES), callbacks=[early_stopping, tensorboard_callback])

In [None]:
# Get the raw (continuous) predictions and their rounded, non-negative version
unrounded_predictions, predictions = get_predictions(model_ffnn_ES, X_test_ES)

# Clamp to the valid label range 0-3 and flatten to a 1-D integer class vector
predictions = np.clip(predictions, 0, 3).ravel().astype(int)

# Calculate the confusion matrix. The label set is pinned so the matrix is
# always 4x4 with row/column i meaning class i, even if a class is missing
# from the test labels or from the predictions.
conf_matrix = confusion_matrix(y_test_ES.astype(int), predictions, labels=[0, 1, 2, 3])

print_class_accuray(conf_matrix)

# Get the accuracy
accuracy = accuracy_score(y_test_ES, predictions)

# Evaluate the model on the test data.
# The model is compiled with metrics=['mse'], so the second value is the MSE
# metric, not an accuracy (name kept for compatibility).
test_loss, test_acc = model_ffnn_ES.evaluate(X_test_ES, y_test_ES, verbose=0)

# Print the accuracy
print("Accuracy: {:.2f}%".format(accuracy*100))

plot_conf_matrix(conf_matrix, language_model, language_test)

expected_predicted_df, corr = compute_correlation(unrounded_predictions, y_test_ES)
print("Correlation between expected and predicted: {}".format(corr))

plot_expected_vs_predicted(expected_predicted_df, language_model, language_test)

In [None]:
# Save the ES model; the target name embeds the accuracy, the test loss, a fresh timestamp and the run's model_ID
model_ffnn_ES.save('models/ES/ffnn_acc_{}_val_loss_{}_{}_{}'.format(accuracy, test_loss, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"), model_ID))

### Load model

In [None]:
# Load a previously saved ES model; this rebinds model_ffnn_ES, replacing the model trained earlier in the session.
# NOTE(review): this fixed name does not follow the 'ffnn_acc_..._val_loss_...' pattern used by the save cell above — confirm the artifact exists.
model_ffnn_ES = keras.models.load_model('models/ES/ffnn_20230131-113400')

In [None]:
# Get the raw (continuous) predictions and their rounded, non-negative version
unrounded_predictions, predictions = get_predictions(model_ffnn_ES, X_test_ES)

# Clamp to the valid label range 0-3 and flatten to a 1-D integer class vector
predictions = np.clip(predictions, 0, 3).ravel().astype(int)

# Calculate the confusion matrix. The label set is pinned so the matrix is
# always 4x4 with row/column i meaning class i, even if a class is missing
# from the test labels or from the predictions.
conf_matrix = confusion_matrix(y_test_ES.astype(int), predictions, labels=[0, 1, 2, 3])

print_class_accuray(conf_matrix)

# Get the accuracy. The ES predictions must be scored against the ES labels
# (this cell previously compared them with y_test_EN).
accuracy = accuracy_score(y_test_ES, predictions)

# Print the accuracy
print("Accuracy: {:.2f}%".format(accuracy*100))

plot_conf_matrix(conf_matrix, language_model, language_test)

# Correlate the ES predictions with the ES labels (previously y_test_EN)
expected_predicted_df, corr = compute_correlation(unrounded_predictions, y_test_ES)
print("Correlation between expected and predicted: {}".format(corr))

plot_expected_vs_predicted(expected_predicted_df, language_model, language_test)