In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder
from sklearn.ensemble import BaggingClassifier, BaggingRegressor, AdaBoostClassifier, AdaBoostRegressor
from sklearn.metrics import mean_absolute_error, confusion_matrix, f1_score, roc_auc_score, roc_curve, auc
from sklearn.utils import class_weight

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv1D, Conv2D, Dense, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError, SparseCategoricalCrossentropy
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.callbacks import EarlyStopping
from keras.wrappers.scikit_learn import KerasClassifier, KerasRegressor

from keras_tuner import HyperModel
from keras_tuner.tuners import RandomSearch

In [None]:
def read_data(file_path):
    """Load a whitespace-delimited text file into a DataFrame of string tokens.

    Every line is stripped and split on runs of whitespace. Lines may contain
    different numbers of tokens; the frame is as wide as the longest line and
    shorter rows are padded with missing values. Columns are named ``col1``,
    ``col2``, ... in positional order. No type conversion is performed.

    Parameters
    ----------
    file_path : str or path-like
        Location of the text file to read.

    Returns
    -------
    pandas.DataFrame
        One row per line of the file. An empty file yields an empty DataFrame
        instead of raising.
    """
    # Iterate over the handle directly rather than materialising readlines() first.
    with open(file_path, 'r') as f:
        data = [line.strip().split() for line in f]

    # default=0 keeps max() from raising ValueError when the file has no lines.
    max_cols = max((len(row) for row in data), default=0)
    cols = [f"col{i}" for i in range(1, max_cols + 1)]
    df = pd.DataFrame(data, columns=cols)
    return df

# Load the raw K3 Hodge listing as a DataFrame of string tokens
df = read_data('C:\\Users\\aaron\\Desktop\\CY_Manifolds\\Hodge.K3\\Hodge.K3')

# Swap the positional col1..col17 labels for descriptive names in a single rename
df = df.rename(columns={
    'col1': 'w1',
    'col2': 'w2',
    'col3': 'w3',
    'col4': 'w4',
    'col5': 'w5',
    'col6': 'd',
    'col7': 'TS',
    'col8': 'h11',          # Hodge number
    'col9': 'h12',          # Hodge number
    'col10': 'hashP',
    'col11': 'hashV',
    'col12': 'hashPbar',
    'col13': 'hashVbar',
    'col14': 'PI',          # K3 Projection
    'col15': 'facet0',      # K3 Facets
    'col16': 'facet1',      # K3 Facets
    'col17': 'facet2',      # K3 Facet
})

# Trim the marker characters attached to the raw tokens:
# 'd' drops its last two characters, the remaining columns drop their first two.
df['d'] = df['d'].str[:-2]
for prefixed in ('h11', 'hashP', 'hashPbar', 'PI', 'facet0'):
    df[prefixed] = df[prefixed].str[2:]

# Strict integer columns: astype(int) raises if a token is missing or non-numeric
for name in (
    'w1', 'w2', 'w3', 'w4', 'w5',   # weights 1-5
    'd',                            # degree - sum of the weights
    'h11', 'h12',                   # hodge numbers h11 and h12
    'hashP', 'hashV',               # points / vertices of MNP
    'hashPbar', 'hashVbar',         # points / vertices of dual MNP
    'facet0',                       # first k3 facet
):
    df[name] = df[name].astype(int)

# Lenient numeric columns: anything that cannot be parsed becomes NaN
for name in ('PI', 'facet1', 'facet2'):   # projections, second and third k3 facets
    df[name] = pd.to_numeric(df[name], errors='coerce')
# Numbers of K3 projections (#Pi) and K3 facets

# Rows whose PI token could not be parsed are NaN after the errors='coerce'
# conversion and therefore have no target label. Hold them out as the set to
# predict later, and keep only labelled rows for training and evaluation.
# (`predict` must exist: it is used below to build X_predict.)
predict = df[df['PI'].isna()]
df = df[df['PI'].notna()]
#df = df[df['facet1'].notna()]
#df = df[df['facet2'].notna()]

# Filter rows based on the condition
#df = df[df['h12'] <= 200]

# convert facet1/2 to boolean
#df['facet1'] = df['facet1'].apply(lambda x: 0 if pd.isna(x) else 1)
#df['facet2'] = df['facet2'].apply(lambda x: 0 if pd.isna(x) else 1)

# Features are the first five columns (the weights); the target is PI
X = df.iloc[:, :5]
y = np.array(df['PI'].values)

# Choose a scaler: MinMaxScaler or StandardScaler
#scaler = MinMaxScaler()
scaler = StandardScaler() # Best scaler from initial run

# NOTE(review): the scaler is fitted on every labelled row before the split, so
# test-set statistics influence the scaling; consider fitting on X_train only.
scaler.fit(X)

# Transform the feature columns using the fitted scaler
scaled_X = pd.DataFrame(scaler.transform(X), columns=X.columns)

# Split the data into training and testing sets
#X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42) # no scaling
X_train, X_test, y_train, y_test = train_test_split(scaled_X, y, test_size=0.25, random_state=42) # x scaled only

# Feature columns of the unlabelled rows
X_predict = predict.iloc[:, :5]

# Scale them with the same fitted scaler. StandardScaler.transform rejects an
# input with zero rows, so fall back to an empty frame when nothing is unlabelled.
if len(X_predict) > 0:
    scaled_X_predict = pd.DataFrame(scaler.transform(X_predict), columns=X_predict.columns)
else:
    scaled_X_predict = pd.DataFrame(columns=X_predict.columns, dtype=float)

df

In [None]:
# Show how many distinct h11 values occur in the data
# (the commented line lists the distinct values themselves instead)
#df['h11'].unique()
df['h11'].nunique()

In [None]:
# Summary statistics for every column (include='all' covers non-numeric columns too);
# the result is kept in `stats` and not displayed unless the print is re-enabled
stats = df.describe(include='all')
#print(stats)

In [None]:
# Count how many rows take each PI value. normalize=False yields absolute
# counts, so the y-axis is labelled as a count rather than a relative frequency.
value_counts = df['PI'].value_counts(normalize=False)

# Plot the counts as a bar chart
value_counts.plot(kind='bar')
plt.xlabel('Unique Values')
plt.ylabel('Count')
plt.title('Frequency of Unique Values in Column')
plt.show()

# Deep Learning w/ TensorFlow

### Base Model - Regression

In [None]:
# Regression network: two ReLU hidden layers with dropout, then one linear output unit
model = Sequential([
    Dense(96, activation='relu', input_shape=(5,)),
    Dropout(0.20),
    Dense(128, activation='relu'),
    Dropout(0.20),
    # Dense(64, activation='relu'),
    # Dropout(0.30),
    # Dense(128, activation='relu'),
    # Dropout(0.40),
    Dense(1),
])

# Mean-squared-error loss; mean absolute error is reported alongside it
model.compile(
    optimizer=Adam(learning_rate=0.0071962472189975694),
    loss='mse',
    metrics=['mae'],
)

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
    mode='min',
    restore_best_weights=True,
)

# Fit, holding out a quarter of the training rows for validation
model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=64,
    validation_split=0.25,
    verbose=2,
    callbacks=[early_stopping],
)

# Score on the test split
loss, mae = model.evaluate(X_test, y_test, verbose=1)
print("Mean Absolute Error:", mae)

### Base Model - Regression w/ Scaled Outputs

In [None]:
# Regression network trained on standardised targets:
# four ReLU hidden layers with dropout and a single linear output unit
model = Sequential()
model.add(Dense(192, activation='relu', input_shape=(5,) ))
model.add(Dropout(0.50))
model.add(Dense(224, activation='relu'))
model.add(Dropout(0.10))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.30))
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.40))
model.add(Dense(1))

# Compile the model (MSE loss, MAE reported as a metric)
model.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='min', restore_best_weights=True)

# Standardise the targets; the scaler is fitted on the training labels only
y_scaler = StandardScaler()
y_train_scaled = y_scaler.fit_transform(y_train.reshape(-1, 1)).flatten()
y_test_scaled = y_scaler.transform(y_test.reshape(-1, 1)).flatten()

# Train the model with the EarlyStopping callback and scaled target labels
model.fit(X_train, y_train_scaled, epochs=100, batch_size=64, validation_split=0.25, verbose=2, callbacks=[early_stopping])

# Evaluate the model performance (in scaled-target units)
loss, mae = model.evaluate(X_test, y_test_scaled, verbose=1)
print("Scaled Mean Absolute Error:", mae)

# Convert the scaled MAE back to the original units. An error is a difference
# between two targets, so only the scaler's standard deviation applies:
# inverse_transform would also add the target mean and overstate the error.
mae_original_scale = mae * y_scaler.scale_[0]
print("Mean Absolute Error (Original Scale):", mae_original_scale)


### Base Model - Classification w/o Class Weights

In [None]:
# Classifier: four ReLU hidden layers with dropout and a softmax output.
# NOTE(review): the output width is hard-coded to 352; with sparse integer labels
# it must exceed the largest label value — confirm it matches the current target.
model = Sequential([
    Dense(192, activation='relu', input_shape=(5,)),
    Dropout(0.20),
    Dense(192, activation='relu'),
    Dropout(0.10),
    Dense(224, activation='relu'),
    Dropout(0.30),
    Dense(160, activation='relu'),
    Dropout(0.20),
    Dense(352, activation='softmax'),
])

# Integer labels are consumed directly by the sparse categorical cross-entropy loss
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
    mode='min',
    restore_best_weights=True,
)

# Fit, holding out a quarter of the training rows for validation
model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=64,
    validation_split=0.25,
    verbose=2,
    callbacks=[early_stopping],
)

# Score on the test split
loss, accuracy = model.evaluate(X_test, y_test, verbose=1)
print("Accuracy:", accuracy)

In [None]:
# Class-probability predictions for the test split
y_pred_probs = model.predict(X_test)

# The most probable class index becomes the predicted label
y_pred = y_pred_probs.argmax(axis=1)

# Support-weighted F1 over all classes
f1 = f1_score(y_test, y_pred, average='weighted')
print("F1-score:", f1)

# Confusion matrix of true vs. predicted labels
conf_matrix = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:\n", conf_matrix)

# One-vs-rest ROC AUC, weighted by class support
roc_auc_scores = roc_auc_score(y_test, y_pred_probs, multi_class='ovr', average='weighted')
print("ROC AUC scores (weighted):", roc_auc_scores)

# Per-class ROC curves for the first n_classes classes
n_classes = 4

# Indexing the identity matrix by the integer labels gives one-hot rows
y_test_int = y_test.astype(int)
y_test_one_hot = np.eye(n_classes)[y_test_int]

fpr, tpr, roc_auc = {}, {}, {}
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test_one_hot[:, i], y_pred_probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Draw every class curve on one figure, with the chance diagonal for reference
plt.figure()
for i, area in roc_auc.items():
    plt.plot(fpr[i], tpr[i], label=f"Class {i} (area = {area:.2f})")

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

### Base Model - Classification w/ Class Weights

In [None]:
# Classifier: four ReLU hidden layers with dropout and a softmax output.
# NOTE(review): the output width is hard-coded to 316; with sparse integer labels
# it must exceed the largest label value — confirm it matches the current target.
model = Sequential()
model.add(Dense(192, activation='relu', input_shape=(5,)))
model.add(Dropout(0.20))
model.add(Dense(192, activation='relu'))
model.add(Dropout(0.10))
model.add(Dense(224, activation='relu'))
model.add(Dropout(0.30))
model.add(Dense(160, activation='relu'))
model.add(Dropout(0.20))
model.add(Dense(316, activation='softmax'))

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='min', restore_best_weights=True)

# Compute balanced class weights. compute_class_weight returns one weight per
# entry of `classes`, in that order, while Keras looks weights up by the integer
# class label itself. Key the dict by label value, not by position, so each
# class receives its own weight even when the labels are not exactly 0..k-1.
classes = np.unique(y_train)
weights = class_weight.compute_class_weight('balanced', classes=classes, y=y_train)
class_weights = {int(label): weight for label, weight in zip(classes, weights)}

# Train the model with the EarlyStopping callback and class weights
model.fit(X_train, y_train, epochs=100, batch_size=64, validation_split=0.25, verbose=2, callbacks=[early_stopping], class_weight=class_weights)

# Evaluate the model performance
loss, accuracy = model.evaluate(X_test, y_test, verbose=1)
print("Accuracy:", accuracy)

In [None]:
# Probabilities for every class on the test split
y_pred_probs = model.predict(X_test)

# Predicted label = index of the largest probability in each row
y_pred = y_pred_probs.argmax(axis=1)

# Support-weighted F1 over all classes
f1 = f1_score(y_test, y_pred, average='weighted')
print("F1-score:", f1)

# Confusion matrix of true vs. predicted labels
conf_matrix = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:\n", conf_matrix)

# One-vs-rest ROC AUC, weighted by class support
roc_auc_scores = roc_auc_score(y_test, y_pred_probs, multi_class='ovr', average='weighted')
print("ROC AUC scores (weighted):", roc_auc_scores)

# ROC curves are drawn for the first n_classes classes
n_classes = 4

# One-hot encode the true labels by indexing the identity matrix with them
y_test_int = y_test.astype(int)
y_test_one_hot = np.eye(n_classes)[y_test_int]

fpr, tpr, roc_auc = {}, {}, {}
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test_one_hot[:, i], y_pred_probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# One curve per class plus the chance diagonal
plt.figure()
for i, area in roc_auc.items():
    plt.plot(fpr[i], tpr[i], label=f"Class {i} (area = {area:.2f})")

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

### Base Model - Classification w/ Class Weights & One Hot Encoding

In [None]:
import random
import itertools
import numpy as np
from sklearn.utils import class_weight
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping

def augment_data(X, y, num_permutations=10, seed=None):
    """Augment a dataset with random reorderings of each row's feature values.

    For every row of ``X``, ``num_permutations`` distinct orderings of its
    columns are drawn without replacement, and the row's label is repeated once
    per ordering.

    Parameters
    ----------
    X : numpy.ndarray
        2-D array of shape (n_rows, n_features).
    y : sequence
        Labels, indexable by row position.
    num_permutations : int, default 10
        Number of orderings drawn per row; may not exceed ``n_features!``.
    seed : int or None, default None
        If given, a private ``random.Random(seed)`` is used so the result is
        reproducible. If None, the module-level ``random`` generator is used,
        as before.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Augmented features (n_rows * num_permutations rows) and matching labels.

    Raises
    ------
    ValueError
        If ``num_permutations`` is larger than the number of distinct column
        orderings (or is negative).
    """
    rng = random if seed is None else random.Random(seed)

    X_augmented = []
    y_augmented = []

    n_rows = X.shape[0]
    if n_rows == 0:
        return np.array(X_augmented), np.array(y_augmented)

    # The set of column orderings is the same for every row, so enumerate it once
    # instead of rebuilding all n! permutations per row.
    column_orders = list(itertools.permutations(range(X.shape[1])))
    if num_permutations > len(column_orders):
        raise ValueError(
            f"num_permutations={num_permutations} exceeds the {len(column_orders)} "
            f"possible orderings of {X.shape[1]} features"
        )

    for i in range(n_rows):
        row = X[i]
        for order in rng.sample(column_orders, num_permutations):
            X_augmented.append(tuple(row[j] for j in order))
        y_augmented.extend([y[i]] * num_permutations)

    return np.array(X_augmented), np.array(y_augmented)

# Expand the training set with 10 random feature-order permutations per row
X_train_augmented, y_train_augmented = augment_data(X_train.to_numpy(), y_train, num_permutations=10)

# Relabel the classes seen in training as consecutive indices 0..k-1
unique_classes = np.unique(y_train)
mapping_dict = dict(zip(unique_classes, range(len(unique_classes))))

y_train_mapped = np.array([mapping_dict[value] for value in y_train_augmented])
y_test_mapped = np.array([mapping_dict[value] for value in y_test])

# Balanced weights computed on the augmented labels, keyed by position in the sorted class list
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train_augmented),
    y=y_train_augmented,
)
class_weights_dict = {position: weight for position, weight in enumerate(class_weights)}

# One-hot targets for the categorical cross-entropy loss
y_train_encoded = to_categorical(y_train_mapped, num_classes=len(unique_classes))
y_test_encoded = to_categorical(y_test_mapped, num_classes=len(unique_classes))

# Four ReLU hidden layers with dropout; softmax width equals the number of training classes
model = Sequential([
    Dense(192, activation='relu', input_shape=(5,)),
    Dropout(0.20),
    Dense(192, activation='relu'),
    Dropout(0.10),
    Dense(224, activation='relu'),
    Dropout(0.30),
    Dense(160, activation='relu'),
    Dropout(0.20),
    Dense(len(unique_classes), activation='softmax'),
])

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
    mode='min',
    restore_best_weights=True,
)

# Train on the augmented rows; evaluate on the original (un-permuted) test rows
model.fit(
    X_train_augmented,
    y_train_encoded,
    epochs=100,
    batch_size=64,
    validation_split=0.25,
    verbose=2,
    callbacks=[early_stopping],
    class_weight=class_weights_dict,
)

loss, accuracy = model.evaluate(X_test, y_test_encoded, verbose=1)
print("Accuracy:", accuracy)


# Keras Tuner

### Regression

In [None]:
class RegressionHyperModel(HyperModel):
    """Keras Tuner search space for a dense regression network.

    Tunable: the width and dropout rate of each hidden block, the number of
    additional hidden blocks (1 to 3), and the Adam learning rate.
    """

    def __init__(self, input_shape):
        # Shape of a single input sample; handed to the first Dense layer
        self.input_shape = input_shape

    def build(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``."""
        net = Sequential()

        # First hidden block (it carries the input shape)
        first_width = hp.Int('units_1', min_value=32, max_value=256, step=32)
        net.add(Dense(units=first_width, activation='relu', input_shape=self.input_shape))
        first_rate = hp.Float('dropout_1', min_value=0.1, max_value=0.5, step=0.1)
        net.add(Dropout(first_rate))

        # Additional hidden blocks; their hyperparameter names are numbered from 2
        extra_blocks = hp.Int('num_layers', min_value=1, max_value=3)
        for block_no in range(2, extra_blocks + 2):
            width = hp.Int(f'units_{block_no}', min_value=32, max_value=256, step=32)
            net.add(Dense(units=width, activation='relu'))
            rate = hp.Float(f'dropout_{block_no}', min_value=0.1, max_value=0.5, step=0.1)
            net.add(Dropout(rate))

        # Single linear output for regression
        net.add(Dense(1))

        step_size = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')
        net.compile(optimizer=Adam(learning_rate=step_size), loss='mse', metrics=['mae'])
        return net

# Search space for models that take 5 input features
input_shape = (5,)
hypermodel = RegressionHyperModel(input_shape)

# Random search: 50 trials, one training run per trial, ranked by validation loss
tuner = RandomSearch(
    hypermodel,
    objective='val_loss',
    max_trials=50,
    executions_per_trial=1,
    directory='random_search',
    project_name='regression_final_h12',
)

tuner.search_space_summary()

# Stop a trial once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
    mode='min',
    restore_best_weights=True,
)

# Run the search; every trial validates on a quarter of the training rows
tuner.search(
    X_train,
    y_train,
    epochs=100,
    batch_size=64,
    validation_split=0.25,
    verbose=2,
    callbacks=[early_stopping],
)

# Take the top-ranked model from the search
top_models = tuner.get_best_models(num_models=1)
best_model = top_models[0]

# Score it on the test split
loss, mae = best_model.evaluate(X_test, y_test, verbose=1)
print("Mean Absolute Error:", mae)

In [None]:
# Take the top-ranked hyperparameter set from the regression search
# and display its values as the cell output
best_hyperparameters = tuner.get_best_hyperparameters()[0]
best_hyperparameters.values

### Classification w/o Class Weights

In [None]:
from tensorflow.keras.utils import to_categorical

class ClassificationHyperModel(HyperModel):
    """Keras Tuner search space for a dense softmax classifier on one-hot targets.

    Tunable: the width and dropout rate of each hidden block, the number of
    additional hidden blocks (1 to 3), and the Adam learning rate.
    """

    def __init__(self, input_shape, num_classes):
        # Shape of a single input sample and the width of the softmax output
        self.input_shape = input_shape
        self.num_classes = num_classes

    def build(self, hp):
        """Build and compile one candidate model from the hyperparameters in ``hp``."""
        net = Sequential()

        # First hidden block (it carries the input shape)
        first_width = hp.Int('units_1', min_value=32, max_value=256, step=32)
        net.add(Dense(units=first_width, activation='relu', input_shape=self.input_shape))
        first_rate = hp.Float('dropout_1', min_value=0.1, max_value=0.5, step=0.1)
        net.add(Dropout(first_rate))

        # Additional hidden blocks; their hyperparameter names are numbered from 2
        extra_blocks = hp.Int('num_layers', min_value=1, max_value=3)
        for block_no in range(2, extra_blocks + 2):
            width = hp.Int(f'units_{block_no}', min_value=32, max_value=256, step=32)
            net.add(Dense(units=width, activation='relu'))
            rate = hp.Float(f'dropout_{block_no}', min_value=0.1, max_value=0.5, step=0.1)
            net.add(Dropout(rate))

        # One softmax unit per class
        net.add(Dense(self.num_classes, activation='softmax'))

        step_size = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')
        net.compile(
            optimizer=Adam(learning_rate=step_size),
            loss='categorical_crossentropy',
            metrics=['accuracy'],
        )
        return net

# Search configuration: 5 input features, 2 output classes
input_shape = (5,)
num_classes = 2

# One-hot targets to match the categorical cross-entropy loss
y_train_encoded = to_categorical(y_train, num_classes=num_classes)
y_test_encoded = to_categorical(y_test, num_classes=num_classes)

hypermodel = ClassificationHyperModel(input_shape, num_classes)

# Random search: 50 trials, one training run per trial, ranked by validation loss
tuner = RandomSearch(
    hypermodel,
    objective='val_loss',
    max_trials=50,
    executions_per_trial=1,
    directory='random_search',
    project_name='classification_final_binary_facet1',
)

tuner.search_space_summary()

# Stop a trial once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
    mode='min',
    restore_best_weights=True,
)

# Run the search; every trial validates on a quarter of the training rows
tuner.search(
    X_train,
    y_train_encoded,
    epochs=100,
    batch_size=64,
    validation_split=0.25,
    verbose=2,
    callbacks=[early_stopping],
)

# Take the top-ranked model from the search
top_models = tuner.get_best_models(num_models=1)
best_model = top_models[0]

# Score it on the test split
loss, accuracy = best_model.evaluate(X_test, y_test_encoded, verbose=1)
print("Accuracy:", accuracy)

In [None]:
# Take the top-ranked hyperparameter set from the unweighted classification search
# and display its values as the cell output
best_hyperparameters = tuner.get_best_hyperparameters()[0]
best_hyperparameters.values

### Classification w/ Class Weights

In [None]:
from tensorflow.keras.utils import to_categorical

class ClassificationHyperModel(HyperModel):
    """Tunable dense softmax classifier for use with one-hot encoded targets.

    The search covers each hidden block's width and dropout rate, how many
    extra hidden blocks are stacked (1 to 3), and the Adam learning rate.
    """

    def __init__(self, input_shape, num_classes):
        # Per-sample input shape and number of softmax outputs
        self.input_shape = input_shape
        self.num_classes = num_classes

    def build(self, hp):
        """Return a compiled candidate model for the hyperparameters sampled in ``hp``."""
        candidate = Sequential()

        # Input block
        units_in = hp.Int('units_1', min_value=32, max_value=256, step=32)
        candidate.add(Dense(units=units_in, activation='relu', input_shape=self.input_shape))
        drop_in = hp.Float('dropout_1', min_value=0.1, max_value=0.5, step=0.1)
        candidate.add(Dropout(drop_in))

        # Stacked hidden blocks, named units_2/dropout_2 onwards
        depth = hp.Int('num_layers', min_value=1, max_value=3)
        for suffix in range(2, depth + 2):
            units_hidden = hp.Int(f'units_{suffix}', min_value=32, max_value=256, step=32)
            candidate.add(Dense(units=units_hidden, activation='relu'))
            drop_hidden = hp.Float(f'dropout_{suffix}', min_value=0.1, max_value=0.5, step=0.1)
            candidate.add(Dropout(drop_hidden))

        # Softmax over the classes
        candidate.add(Dense(self.num_classes, activation='softmax'))

        rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')
        candidate.compile(
            optimizer=Adam(learning_rate=rate),
            loss='categorical_crossentropy',
            metrics=['accuracy'],
        )
        return candidate

# Search configuration: 5 input features, 2 output classes
input_shape = (5,)
num_classes = 2

# One-hot targets to match the categorical cross-entropy loss
y_train_encoded = to_categorical(y_train, num_classes=num_classes)
y_test_encoded = to_categorical(y_test, num_classes=num_classes)

hypermodel = ClassificationHyperModel(input_shape, num_classes)

# Random search: 50 trials, one training run per trial, ranked by validation loss
tuner = RandomSearch(
    hypermodel,
    objective='val_loss',
    max_trials=50,
    executions_per_trial=1,
    directory='random_search',
    project_name='classification_final_binary_facet2',
)

tuner.search_space_summary()

# Balanced class weights from the training labels, keyed by position in the sorted class list
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train,
)
class_weights = {position: weight for position, weight in enumerate(class_weights)}

# Stop a trial once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    verbose=1,
    mode='min',
    restore_best_weights=True,
)

# Run the weighted search; every trial validates on a quarter of the training rows
tuner.search(
    X_train,
    y_train_encoded,
    epochs=100,
    batch_size=64,
    validation_split=0.25,
    verbose=2,
    callbacks=[early_stopping],
    class_weight=class_weights,
)

# Take the top-ranked model from the search
top_models = tuner.get_best_models(num_models=1)
best_model = top_models[0]

# Score it on the test split
loss, accuracy = best_model.evaluate(X_test, y_test_encoded, verbose=1)
print("Accuracy:", accuracy)

In [None]:
# Take the top-ranked hyperparameter set from the class-weighted search
# and display its values as the cell output
best_hyperparameters = tuner.get_best_hyperparameters()[0]
best_hyperparameters.values

### Classification - Class Weights/One Hot Encoding/Label ReMapping

In [None]:
from sklearn.utils import class_weight
from tensorflow.keras.utils import to_categorical

class ClassificationHyperModel(HyperModel):
    """Tunable dense softmax classifier that also reports sample-weighted accuracy.

    The search covers each hidden block's width and dropout rate, how many
    extra hidden blocks are stacked (1 to 3), and the Adam learning rate.
    """

    def __init__(self, input_shape, num_classes):
        # Per-sample input shape and number of softmax outputs
        self.input_shape = input_shape
        self.num_classes = num_classes

    def build(self, hp):
        """Return a compiled candidate model for the hyperparameters sampled in ``hp``."""
        candidate = Sequential()

        # Input block
        units_in = hp.Int('units_1', min_value=32, max_value=256, step=32)
        candidate.add(Dense(units=units_in, activation='relu', input_shape=self.input_shape))
        drop_in = hp.Float('dropout_1', min_value=0.1, max_value=0.5, step=0.1)
        candidate.add(Dropout(drop_in))

        # Stacked hidden blocks, named units_2/dropout_2 onwards
        depth = hp.Int('num_layers', min_value=1, max_value=3)
        for suffix in range(2, depth + 2):
            units_hidden = hp.Int(f'units_{suffix}', min_value=32, max_value=256, step=32)
            candidate.add(Dense(units=units_hidden, activation='relu'))
            drop_hidden = hp.Float(f'dropout_{suffix}', min_value=0.1, max_value=0.5, step=0.1)
            candidate.add(Dropout(drop_hidden))

        # Softmax over the classes
        candidate.add(Dense(self.num_classes, activation='softmax'))

        rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2, sampling='LOG')

        # Accuracy is tracked both unweighted and weighted by the fit-time sample weights
        candidate.compile(
            optimizer=Adam(learning_rate=rate),
            loss='categorical_crossentropy',
            metrics=['accuracy'],
            weighted_metrics=['accuracy'],
        )
        #candidate.compile(optimizer=Adam(learning_rate=rate), loss='categorical_crossentropy', metrics=['accuracy'])
        return candidate

input_shape = (5,)
# NOTE: num_classes is not used below; the model width comes from len(unique_classes)
num_classes = 210

# Create a mapping dictionary for class labels, covering labels from both splits
combined_y = np.concatenate((y_train, y_test))
unique_classes = np.unique(combined_y)
mapping_dict = {label: idx for idx, label in enumerate(unique_classes)}

# Map the original class labels to consecutive indices 0..k-1
y_train_mapped = np.array([mapping_dict[label] for label in y_train])
y_test_mapped = np.array([mapping_dict[label] for label in y_test])


# Compute class weights for the new labels
#class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
#class_weights_dict = {mapping_dict[label]: weight for label, weight in zip(np.unique(y_train), class_weights)}
# Compute balanced per-sample weights for the training set
sample_weights = class_weight.compute_sample_weight('balanced', y_train_mapped)


hypermodel = ClassificationHyperModel(input_shape, len(unique_classes))

# Random search: 50 trials, one training run per trial, ranked by validation loss
tuner = RandomSearch(
    hypermodel,
    objective='val_loss',
    max_trials=50,
    executions_per_trial=1,
    directory='random_search',
    project_name='classification_final_h12'
)

tuner.search_space_summary()

# Stop a trial once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='min', restore_best_weights=True)

# One-hot encode the mapped labels
y_train_encoded = to_categorical(y_train_mapped, num_classes=len(unique_classes))
y_test_encoded = to_categorical(y_test_mapped, num_classes=len(unique_classes))

# Train the model with the EarlyStopping callback and sample weights
#tuner.search(X_train, y_train_encoded, epochs=100, batch_size=64, validation_split=0.25, verbose=2, callbacks=[early_stopping], class_weight=class_weights_dict)
tuner.search(X_train, y_train_encoded, epochs=100, batch_size=64, validation_split=0.25, verbose=2, callbacks=[early_stopping], sample_weight=sample_weights)

# Get the best model
best_model = tuner.get_best_models(num_models=1)[0]

# Evaluate the best model on the test set. The hypermodel in this cell compiles
# with both metrics and weighted_metrics, so evaluate() can return more than
# two values (loss, accuracy, weighted accuracy); collect any extras instead
# of failing to unpack.
loss, accuracy, *extra_metrics = best_model.evaluate(X_test, y_test_encoded, verbose=1)
print("Accuracy:", accuracy)

In [None]:
# Take the top-ranked hyperparameter set from the remapped-label search
# and display its values as the cell output
best_hyperparameters = tuner.get_best_hyperparameters()[0]
best_hyperparameters.values

# Ensemble Methods

### Bagging - Regression

In [None]:
def create_regression_model(callbacks=None):
    """Build and compile the dense regression network used as the bagging base model.

    ``callbacks`` is accepted but never used inside this function.
    Returns the compiled Sequential model (MSE loss, MAE metric, Adam optimizer).
    """
    net = Sequential([
        Dense(64, activation='relu', input_shape=(5,)),
        Dropout(0.10),
        Dense(224, activation='relu'),
        Dropout(0.50),
        Dense(256, activation='relu'),
        Dropout(0.10),
        # Dense(256, activation='relu'),
        # Dropout(0.40),
        Dense(1),
    ])

    net.compile(
        optimizer=Adam(learning_rate=0.0001966340202965072),
        loss='mse',
        metrics=['mae'],
    )

    return net

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='min', restore_best_weights=True)

# Fit-time options have to be given to the scikit-learn wrapper, which forwards
# them to Model.fit. Passing the callback to the model builder has no effect,
# because the builder never uses it. validation_split is required as well:
# without held-out data there is no val_loss for EarlyStopping to monitor.
keras_regressor = KerasRegressor(
    build_fn=create_regression_model,
    epochs=100,
    batch_size=64,
    verbose=1,
    validation_split=0.25,
    callbacks=[early_stopping],
)

n_estimators = 5  # Number of base models in the ensemble
# Each base model is trained on a random 80% sample of the training rows
bagging_regressor = BaggingRegressor(estimator=keras_regressor, n_estimators=n_estimators, max_samples=0.8, random_state=42)

bagging_regressor.fit(X_train, y_train)

# Ensemble prediction on the test split, scored by mean absolute error
y_pred = bagging_regressor.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
print("Mean Absolute Error:", mae)

### Bagging - Classification

In [None]:
from keras.callbacks import EarlyStopping

def create_model(callbacks=None):
    """Build and compile the 4-class dense classifier used as the bagging base model.

    ``callbacks`` is accepted but never used inside this function.
    Returns the compiled Sequential model (sparse categorical cross-entropy
    loss, accuracy metric, Adam optimizer).
    """
    net = Sequential([
        Dense(256, activation='relu', input_shape=(5,)),
        Dropout(0.50),
        Dense(192, activation='relu'),
        Dropout(0.10),
        Dense(64, activation='relu'),
        Dropout(0.20),
        Dense(256, activation='relu'),
        Dropout(0.30),
        Dense(4, activation='softmax'),
    ])

    net.compile(
        optimizer=Adam(learning_rate=0.0007371287947368476),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    return net

# Stop once val_loss has gone 5 epochs without improving and restore the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='min', restore_best_weights=True)

# Fit-time options have to be given to the scikit-learn wrapper, which forwards
# them to Model.fit. Passing the callback to the model builder has no effect,
# because the builder never uses it. validation_split is required as well:
# without held-out data there is no val_loss for EarlyStopping to monitor.
keras_model = KerasClassifier(
    build_fn=create_model,
    epochs=100,
    batch_size=64,
    verbose=1,
    validation_split=0.25,
    callbacks=[early_stopping],
)

n_estimators = 5  # Number of base models in the ensemble
# Each base model is trained on a random 80% sample of the training rows
bagging_model = BaggingClassifier(estimator=keras_model, n_estimators=n_estimators, max_samples=0.8, random_state=42)

# Encode the labels as consecutive integers 0..k-1
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

# Fit and score on the encoded labels, so that the ensemble's predictions are
# encoder indices and label_encoder.inverse_transform() maps them back correctly
bagging_model.fit(X_train, y_train_encoded)

accuracy = bagging_model.score(X_test, y_test_encoded)
print("Accuracy:", accuracy)

In [None]:
# Ensemble class probabilities for the test split
y_pred_probs = bagging_model.predict_proba(X_test)

# Predicted label = index of the largest probability in each row
y_pred = y_pred_probs.argmax(axis=1)

# Support-weighted F1 over all classes
f1 = f1_score(y_test, y_pred, average='weighted')
print("F1-score:", f1)

import seaborn as sns

# Confusion matrix of true vs. predicted labels
conf_matrix = confusion_matrix(y_test, y_pred)
print("Confusion Matrix:\n", conf_matrix)

# Render the confusion matrix as an annotated heatmap with enlarged fonts
plt.figure(figsize=(10, 8))
sns.set(font_scale=1.4)
ax = sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap=plt.cm.Blues,
    cbar=False,
    annot_kws={"size": 14},
)
plt.xlabel('Predicted Label', fontsize=16)
plt.ylabel('True Label', fontsize=16)
plt.title("Confusion Matrix", fontsize=18)

# Re-apply the tick labels at a fixed font size
ax.set_xticklabels(ax.get_xticklabels(), fontsize=14)
ax.set_yticklabels(ax.get_yticklabels(), fontsize=14)

plt.show()

# Calculate the ROC AUC score for each class
#roc_auc_scores = roc_auc_score(y_test, y_pred_probs, multi_class='ovr', average='weighted')
#print("ROC AUC scores (weighted):", roc_auc_scores)

# ROC curves are drawn for the first n_classes classes
n_classes = 4

# One-hot encode the true labels by indexing the identity matrix with them
y_test_int = y_test.astype(int)
y_test_one_hot = np.eye(n_classes)[y_test_int]

# Support-weighted ROC AUC computed from the one-hot targets
roc_auc_scores = roc_auc_score(y_test_one_hot, y_pred_probs, average='weighted')
print("ROC AUC scores (weighted):", roc_auc_scores)

fpr, tpr, roc_auc = {}, {}, {}
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test_one_hot[:, i], y_pred_probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# One curve per class plus the chance diagonal
plt.figure()
for i, area in roc_auc.items():
    plt.plot(fpr[i], tpr[i], label=f"Class {i} (area = {area:.2f})")

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

In [None]:
# Save the weights of the network currently bound to `model` to an HDF5 file.
# NOTE(review): the bagging ensemble lives in `bagging_model`; `model` is whichever
# Sequential network an earlier cell assigned last — confirm that is the one intended.
model.save_weights("k3_fibrations_classification_proj_1.h5")

In [None]:
# Class predictions for the unlabelled rows, cast to integers
predictions = bagging_model.predict(scaled_X_predict).astype(int)

# Per-class probabilities from the ensemble; the largest one in each row is the confidence
probabilities = bagging_model.predict_proba(scaled_X_predict)
confidence = probabilities.max(axis=1)

# Map the integer predictions back through the label encoder
predicted_labels = label_encoder.inverse_transform(predictions)

# Unscaled features alongside the predicted label and its confidence
predicted_df = X_predict.assign(Predicted_PI=predicted_labels, Confidence=confidence)

print(predicted_df)


In [None]:
# Tally how many rows were assigned to each predicted PI value
class_counts = predicted_df['Predicted_PI'].value_counts()
print(class_counts)

In [None]:
# Write the prediction table to predictions.csv in the working directory, omitting the index column
predicted_df.to_csv('predictions.csv', index=False)