In [2]:
import os

import numpy as np
from numpy import unwrap, diff, abs, angle
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import scipy
from scipy.signal import hilbert
from scipy.signal import butter, filtfilt, hilbert, sosfiltfilt
from scipy.signal import spectrogram
from scipy.interpolate import interp1d
from scipy.interpolate import CubicSpline
from scipy.stats import skew, kurtosis

import sklearn
from sklearn.svm import SVC
from sklearn.utils import shuffle
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from imblearn.over_sampling import RandomOverSampler

import tensorflow as tf
from tensorflow.keras.models import Sequential
from keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.layers import Dense,  BatchNormalization, Dropout
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

import mne
from mne.preprocessing import ICA
import pywt

Preprocessing Functions

In [3]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter in transfer-function form.

    Args:
        lowcut: Lower band edge, in the same frequency unit as ``fs``.
        highcut: Upper band edge, in the same frequency unit as ``fs``.
        fs: Sampling frequency.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        The ``(b, a)`` numerator/denominator coefficient arrays.
    """
    nyquist = fs * 0.5
    # scipy expects band edges normalised by the Nyquist frequency
    band_edges = [lowcut / nyquist, highcut / nyquist]
    return butter(order, band_edges, btype='band')

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    """Zero-phase Butterworth band-pass filter applied along the last axis.

    The filter is designed as second-order sections and run with
    ``sosfiltfilt``. The (b, a) polynomial form is numerically fragile for
    band-pass designs whose lower edge is tiny relative to the Nyquist
    frequency (e.g. 0.4 Hz at fs=250 Hz with order 5); SOS is the
    representation SciPy recommends for such filters.

    Args:
        data: Array-like signal to filter.
        lowcut: Lower band edge, in the same frequency unit as ``fs``.
        highcut: Upper band edge, in the same frequency unit as ``fs``.
        fs: Sampling frequency.
        order: Butterworth order passed to ``scipy.signal.butter``.

    Returns:
        The filtered signal, same shape as ``data``.
    """
    nyq = 0.5 * fs
    sos = butter(order, [lowcut / nyq, highcut / nyq], btype='band', output='sos')
    # Forward-backward filtering cancels the phase delay of the IIR filter
    return sosfiltfilt(sos, data)

def denoise_data(df, col_names, n_clusters):
    """Smooth selected columns by k-nearest-neighbour regression on the row index.

    Each column is regressed on the DataFrame index with uniform weights, so
    every sample is replaced by the mean of the ``k`` samples whose index
    values are closest to it.

    Args:
        df: Input DataFrame; it is not modified.
        col_names: Columns to smooth.
        n_clusters: One neighbour count per entry of ``col_names`` (despite
            the name, this is passed as ``n_neighbors``, not a cluster count).

    Returns:
        A copy of ``df`` with the selected columns replaced by their smoothed
        values. Entries that cannot be parsed as numbers are excluded from the
        fit and are filled with the prediction for their position.
    """
    df_denoised = df.copy()
    # The row index is the single regression feature, shape (n_samples, 1).
    # It is the same for every column, so build it once.
    positions = df_denoised.index.values[:, np.newaxis]
    for col_name, k in zip(col_names, n_clusters):
        # Read the column directly: filtering the frame by dtype would drop
        # numeric columns that are not exactly float64/int64.
        values = pd.to_numeric(df_denoised[col_name], errors='coerce')
        # 'coerce' turns unparseable entries into NaN, which the regressor rejects
        valid = values.notna().to_numpy()
        clf = KNeighborsRegressor(n_neighbors=k, weights='uniform')
        clf.fit(positions[valid], values.to_numpy()[valid])
        df_denoised[col_name] = clf.predict(positions)
    return df_denoised

def z_score(df, col_names):
    """Return a copy of ``df`` with the given columns standardised.

    Each listed column has its mean subtracted and is divided by its
    (pandas default) standard deviation. Other columns and the input frame
    are left untouched.
    """
    standardized = df.copy()
    for name in col_names:
        column = df[name]
        standardized[name] = column.sub(column.mean()).div(column.std())
    return standardized

def custom_detrend(df, col_names):
    """Return a copy of ``df`` with a least-squares line removed from each listed column.

    The line is fitted against the sample position (0, 1, 2, ...), not
    against the DataFrame index.
    """
    result = df.copy()
    for name in col_names:
        series = result[name]
        sample_idx = np.arange(len(series))
        line_coeffs = np.polyfit(sample_idx, series, 1)  # degree-1 fit: slope and intercept
        result[name] = series - np.polyval(line_coeffs, sample_idx)
    return result

def preprocess(df, col_names, n_clusters):
    """Run the preprocessing pipeline on ``df`` and return a new DataFrame.

    Only the KNN denoising stage is active. ``denoise_data`` already works on
    its own copy of ``df``, so no extra copy is made here.

    Args:
        df: Input DataFrame; it is not modified.
        col_names: Columns to process.
        n_clusters: Per-column neighbour counts forwarded to ``denoise_data``.
    """
    df_new = denoise_data(df, col_names, n_clusters)
    # Optional stages, currently disabled:
    # df_new = z_score(df_new, col_names)
    # df_new = custom_detrend(df_new, col_names)
    return df_new

def df_to_raw(df, sfreq=250):
    """Wrap a samples-by-channels DataFrame in an ``mne.io.RawArray``.

    Every column becomes an 'eeg' channel named after the column. Values are
    multiplied by 1e-6, i.e. the input is treated as microvolts and converted
    to volts for MNE.
    """
    channel_names = list(df.columns)
    channel_types = ['eeg'] * df.shape[1]
    info = mne.create_info(ch_names=channel_names, sfreq=sfreq, ch_types=channel_types)
    data_volts = df.T.values * 1e-6  # transpose to (channels, samples) and rescale
    return mne.io.RawArray(data_volts, info)

def reject_artifacts(df, channel):
    """Suppress spikes in one channel of ``df`` in place and return ``df``.

    A sample is a spike when it deviates from the channel median by more than
    3x the median absolute deviation. Spikes are replaced by a cubic-spline
    interpolation through the remaining samples, scaled by 0.1; any value of
    the resulting signal that still exceeds the threshold is scaled by a
    further 0.01.

    Note: the input DataFrame itself is modified.
    """
    threshold_factor = 3
    signal = df[channel]
    center = signal.median()
    deviation = np.abs(signal - center)
    cutoff = threshold_factor * np.median(deviation)
    is_spike = (deviation > cutoff).to_numpy()

    # Spline fitted on the non-spike samples only, then evaluated everywhere
    sample_idx = np.arange(len(signal))
    spline = CubicSpline(sample_idx[~is_spike], signal.to_numpy()[~is_spike])
    repaired = spline(sample_idx)
    repaired[is_spike] *= 0.1  # shrink the interpolated replacements

    # Second pass: damp anything that is still beyond the threshold
    still_outlying = np.abs(repaired - center) > cutoff
    repaired[still_outlying] *= 0.01

    df[channel] = repaired
    return df


Define the dataset

In [4]:
import os
import numpy as np
import pandas as pd
import mne
from mne.preprocessing import ICA
from mne.viz import plot_topomap
from scipy.signal import welch
import numpy as np
import matplotlib.pyplot as plt

In [5]:
# Standard 10-20 electrode layout from MNE; the processing cell attaches it
# to each RawArray with set_montage before fitting ICA.
montage = mne.channels.make_standard_montage('standard_1020')

In [None]:
# folder_name = 'i'
# selected_columns = ['Fz', 'FC1', 'FC2', 'C3', 'Cz', 'C4', 'CPz', 'Pz']
# duration = 40 
# raw=[]
# event=[]
# PP=[]
# BP=[]
# cleaned_data_list = []
# if os.path.exists(folder_name) and os.path.isdir(folder_name):
#     for file_name in os.listdir(folder_name):
#         if file_name.endswith('.csv'):
#             file_path = os.path.join(folder_name, file_name)
#             s_temp = pd.read_csv(file_path, header=None)
#             inst = s_temp.iloc[:, 17]
#             df_temp = s_temp.iloc[:, :8]
#             # print(df_temp.shape)
#             # df_temp.plot(figsize=(10, 8))
#             # plt.show()
#             raw.append(df_temp)
#             event.append(inst)
            
#             # 1. Band Pass
#             raw_bp = np.copy(df_temp)
#             for column in range(8):
#                 raw_bp[:, column] = butter_bandpass_filter(raw_bp[:, column], lowcut=.4, highcut=40, fs=250) 
#             plt.plot(raw_bp)
#             plt.show()
#             # print(raw_bp.shape)
#             # raw_bp = mne.filter.filter_data(raw_bp.T, sfreq=250, l_freq=.4, h_freq=40, method='iir').T

#             # # 2. Artifact rejection
#             # BP_artifact_RJ = np.copy(raw_bp)
#             # for channel in range (8):
#             #     BP_artifact_RJ= reject_artifacts(pd.DataFrame(BP_artifact_RJ), channel)
#             #     print('BP_artifact_RJ' ,type(BP_artifact_RJ))
#             # # plt.plot(BP_artifact_RJ)
#             # # plt.show()
            
#             # # 3. Smoothing
#             # BP_artifact_RJ_SM=BP_artifact_RJ.copy()
#             # window_size = 10 
#             # for channel in range (8):
#             #     # channel_data = BP_artifact_RJ_SM[channel, :]
#             #     BP_artifact_RJ_SM= BP_artifact_RJ_SM.rolling(window=window_size, center=True).mean().fillna(method='bfill').fillna(method='ffill')
#             # # plt.plot(BP_artifact_RJ_SM)
#             # # plt.show()
#             # BP.append(BP_artifact_RJ_SM)
            
#             # 4. Denoising and other preprocessing
#             raw_bp_df = pd.DataFrame(raw_bp, columns=selected_columns)
#             # BP_artifact_RJ_SM.columns = selected_columns
#             # eeg_df_denoised = preprocess(pd.DataFrame(BP_artifact_RJ_SM), col_names=selected_columns, n_clusters=[50]*len(selected_columns))
#             eeg_df_denoised = preprocess(raw_bp_df, col_names=selected_columns, n_clusters=[10]*len(selected_columns))
#             eeg_df_denoised.plot(subplots=True, figsize=(15, 10), title='Denoised EEG Data')
#             plt.show()
#             PP.append(eeg_df_denoised)

#             # 2. Create MNE Raw object
#             info = mne.create_info(ch_names=selected_columns, ch_types=['eeg']*8, sfreq=250)
#             raw_mne = mne.io.RawArray(raw_bp.T, info)
#             raw_mne.set_montage(montage)

#             # 3. Apply ICA
#             ica = ICA(n_components=6, method='infomax', fit_params=dict(extended=True), random_state=None, max_iter=800)
#             ica.fit(raw_mne, picks='eeg')
           
#             # After getting ICA sources:
#             sources = ica.get_sources(raw_mne)
#             source_data = sources.get_data()
#             # Define the sampling frequency and parameters for the Welch method
            
#             fs = 250  # Your data's sampling frequency
#             nperseg = fs  # 1-second window
#             noverlap = nperseg // 2  # 50% overlap

#             # Create a figure to encapsulate all plots for this trial/block
#             n_components = source_data.shape[0]
#             fig, axes = plt.subplots(n_components, 3, figsize=(8, n_components*1.5))
#             fig.suptitle(f'Trial_{file_name}', fontsize=10)

#             for i in range(n_components):
#                 # Topomap
#                 mne.viz.plot_topomap(ica.get_components()[:, i], ica.info, axes=axes[i, 0], cmap='jet', show=False, sphere=0.08)    
#                 # Time Course
#                 axes[i, 1].plot(raw_mne.times, source_data[i, :])
#                 axes[i, 1].set_title(f'Component {i} Time Course')
#                 axes[i, 1].set_xlabel('Time (s)')
#                 axes[i, 1].set_ylabel('Amplitude')
#                 # PSD
#                 frequencies, psd = welch(source_data[i, :], fs=fs, nperseg=nperseg)
#                 mask = (frequencies >= 0) & (frequencies <= 40)
#                 psd_log = 10 * np.log10(psd[mask])
#                 axes[i, 2].plot(frequencies[mask], psd_log)
#                 axes[i, 2].set_title(f'Component {i} PSD')
#                 axes[i, 2].set_xlabel('Frequency (Hz)')
#                 axes[i, 2].set_ylabel('PSD (dB/Hz)')
#             # Adjust layout for better visualization
#             plt.tight_layout()
#             plt.subplots_adjust(top=0.95)
#             plt.show()
            
#             # Prompt user for components to exclude
#             exclude_components_input = input("Enter components to remove as comma-separated values (e.g., 0,2,5). If none, just press Enter: ")

#             if exclude_components_input.strip():  # Check if the input is not empty
#                 exclude_components = [int(comp.strip()) for comp in exclude_components_input.split(",")]
                
#                 # Mark components for exclusion
#                 ica.exclude = exclude_components
                
#                 # Apply ICA cleaning
#                 raw_mne_clean = raw_mne.copy()
#                 ica.apply(raw_mne_clean)
#             else:
#                 print("No components excluded. Proceeding with the original data.")
#                 raw_mne_clean = raw_mne.copy()


#             # Extract the data and times from the cleaned raw object
#             clean_data = raw_mne_clean.get_data().T
#             times = raw_mne_clean.times

#             # Plotting the cleaned data for this trial using matplotlib
#             fig, axs = plt.subplots(len(selected_columns), 1, figsize=(15, 10), sharex=True)
#             for i, channel in enumerate(selected_columns):
#                 axs[i].plot(times, clean_data[:, i], label=channel)
#                 axs[i].set_title(channel)
#                 axs[i].legend(loc="upper right")
#             plt.xlabel('Time (s)')
#             plt.tight_layout()
#             plt.suptitle(f'Cleaned EEG Data for Trial {file_name}', y=1.02)
#             plt.show()


#             # Append the cleaned data to a list (assuming you have a list to append to)
#             cleaned_data_list.append(raw_mne_clean)  # where cleaned_data_list is a list you've initialized before your loop


In [None]:
            # # 4. Denoising and other preprocessing
            # raw_bp_df = pd.DataFrame(raw_bp, columns=selected_columns)
            # # BP_artifact_RJ_SM.columns = selected_columns
            # # eeg_df_denoised = preprocess(pd.DataFrame(BP_artifact_RJ_SM), col_names=selected_columns, n_clusters=[50]*len(selected_columns))
            # eeg_df_denoised = preprocess(raw_bp_df, col_names=selected_columns, n_clusters=[10]*len(selected_columns))
            # eeg_df_denoised.plot(subplots=True, figsize=(15, 10), title='Denoised EEG Data')
            # plt.show()
            # PP.append(eeg_df_denoised)

In [None]:
folder_name = 'i'
selected_columns = ['Fz', 'FC1', 'FC2', 'C3', 'Cz', 'C4', 'CPz', 'Pz']
# Define a list of colors, can be extended or modified
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k', 'orange', 'purple', 'pink', 'brown', 'gray']
duration = 40
fs = 250  # Sampling frequency (Hz): used for filtering, the MNE info and the Welch PSD
nperseg = fs  # Welch window: 1 second
noverlap = nperseg // 2  # Welch overlap: 50%
n_channels = len(selected_columns)
raw = []    # unfiltered channel data, one DataFrame per CSV file
event = []  # event column, one Series per CSV file
PP = []     # fully preprocessed (ICA-cleaned + denoised) data, one DataFrame per CSV file
BP = []
cleaned_data_list = []


def _plot_channels(times, data, title):
    """Plot each column of ``data`` (samples x channels) on its own row against ``times``."""
    fig, axs = plt.subplots(len(selected_columns), 1, figsize=(15, 10), sharex=True)
    for i, channel in enumerate(selected_columns):
        axs[i].plot(times, data[:, i], label=channel, color=colors[i])
        axs[i].set_title(channel)
        axs[i].legend(loc="upper right")
    plt.xlabel('Time (s)')
    plt.tight_layout()
    plt.suptitle(title, y=1.02)
    plt.show()
    plt.close(fig)  # several figures are created per file; release them explicitly


if os.path.exists(folder_name) and os.path.isdir(folder_name):
    # sorted(): os.listdir returns files in arbitrary order, which would make
    # the block order (and everything stacked from it) vary between runs
    for file_name in sorted(os.listdir(folder_name)):
        if not file_name.endswith('.csv'):
            continue
        file_path = os.path.join(folder_name, file_name)
        s_temp = pd.read_csv(file_path, header=None)
        inst = s_temp.iloc[:, 17]  # column 17 is kept as this file's event stream
        df_temp = s_temp.iloc[:, :n_channels]
        raw.append(df_temp)
        event.append(inst)

        # 1. Band Pass
        # Force a float copy: with integer CSV columns the filtered values
        # would otherwise be truncated when written back into the array.
        raw_bp = df_temp.to_numpy(dtype=float, copy=True)
        for column in range(n_channels):
            raw_bp[:, column] = butter_bandpass_filter(raw_bp[:, column], lowcut=.4, highcut=40, fs=fs)

        # Time axis derived from the sample count so the first plot does not
        # depend on a `times` variable left over from an earlier run.
        times = np.arange(raw_bp.shape[0]) / fs
        _plot_channels(times, raw_bp, f'raw_bp for Trial {file_name}')

        # 2. Create MNE Raw object
        # NOTE(review): values are passed to MNE unscaled, whereas df_to_raw
        # multiplies by 1e-6 (microvolts -> volts) — confirm the intended units.
        info = mne.create_info(ch_names=selected_columns, ch_types=['eeg'] * n_channels, sfreq=fs)
        raw_mne = mne.io.RawArray(raw_bp.T, info)
        raw_mne.set_montage(montage)

        # 3. Apply ICA
        ica = ICA(n_components=6, method='infomax', fit_params=dict(extended=True), random_state=None, max_iter=800)
        ica.fit(raw_mne, picks='eeg')

        # After getting ICA sources:
        sources = ica.get_sources(raw_mne)
        source_data = sources.get_data()

        # One row per component: topomap, time course, Welch PSD
        n_components = source_data.shape[0]
        fig, axes = plt.subplots(n_components, 3, figsize=(8, n_components*1.5))
        fig.suptitle(f'Trial_{file_name}', fontsize=10)

        for i in range(n_components):
            # Topomap
            mne.viz.plot_topomap(ica.get_components()[:, i], ica.info, axes=axes[i, 0], cmap='jet', show=False, sphere=0.08)
            # Time Course
            axes[i, 1].plot(raw_mne.times, source_data[i, :])
            axes[i, 1].set_title(f'Component {i} Time Course')
            axes[i, 1].set_xlabel('Time (s)')
            axes[i, 1].set_ylabel('Amplitude')
            # PSD
            frequencies, psd = welch(source_data[i, :], fs=fs, nperseg=nperseg, noverlap=noverlap)
            mask = (frequencies >= 0) & (frequencies <= 40)
            psd_log = 10 * np.log10(psd[mask])
            axes[i, 2].plot(frequencies[mask], psd_log)
            axes[i, 2].set_title(f'Component {i} PSD')
            axes[i, 2].set_xlabel('Frequency (Hz)')
            axes[i, 2].set_ylabel('PSD (dB/Hz)')
        plt.tight_layout()
        plt.subplots_adjust(top=0.95)
        plt.show()
        plt.close(fig)

        # Prompt user for components to exclude
        exclude_components_input = input("Enter components to remove as comma-separated values (e.g., 0,2,5). If none, just press Enter: ")

        if exclude_components_input.strip():  # Check if the input is not empty
            # Skip empty fields so a trailing comma ("0,2,") is accepted
            exclude_components = [int(comp.strip()) for comp in exclude_components_input.split(",") if comp.strip()]

            # Mark components for exclusion
            ica.exclude = exclude_components

            # Apply ICA cleaning
            raw_mne_clean = raw_mne.copy()
            ica.apply(raw_mne_clean)
        else:
            print("No components excluded. Proceeding with the original data.")
            raw_mne_clean = raw_mne.copy()

        # Extract the data and times from the cleaned raw object
        clean_data = raw_mne_clean.get_data().T
        times = raw_mne_clean.times
        _plot_channels(times, clean_data, f'Cleaned EEG Data for Trial {file_name}')

        # Denoise the cleaned data
        clean_data_df = pd.DataFrame(clean_data, columns=selected_columns)
        n_clusters = [10]*len(selected_columns)  # adjust the values based on how much smoothing you want
        denoised_data_df = denoise_data(clean_data_df, col_names=selected_columns, n_clusters=n_clusters)
        _plot_channels(times, denoised_data_df[selected_columns].to_numpy(), f'denoised EEG Data for Trial {file_name}')

        # The labelling cell stacks PP into one array; without this append it
        # would receive an empty list.
        PP.append(denoised_data_df)

        # # Append the cleaned data to a list (assuming you have a list to append to)
        # cleaned_data_list.append(raw_mne_clean)  # where cleaned_data_list is a list you've initialized before your loop
else:
    print(f"Data folder '{folder_name}' not found; no blocks were loaded.")


In [None]:
fs = 250
PP_NP = np.array(PP)  # expected shape: (number of blocks, samples per block, channels)
if PP_NP.ndim != 3:
    raise ValueError(
        f"PP must hold equally sized (samples x channels) blocks; got array of shape {PP_NP.shape}"
    )
# Read the sizes from the stacked data instead of a hard-coded channel count
# and the `df_temp` variable leaked from the loading loop.
B_N, n_samples, n_channels = PP_NP.shape
event = np.array(event).reshape(B_N * n_samples, 1)
denoised = PP_NP.reshape(B_N * n_samples, n_channels)  # all blocks stacked sample by sample
pp_sig_event = np.concatenate((denoised, event), axis=1)  # last column holds the event code

# Assuming the participant's responses were correct: an event code containing
# 'M' or 'F' marks a face sample (label 0); everything else is a scene (label 1).
event_codes = pp_sig_event[:, n_channels]
is_face = np.array(
    [isinstance(code, str) and ('M' in code or 'F' in code) for code in event_codes],
    dtype=bool,
)
labels = np.where(is_face, 0, 1)
face = pp_sig_event[is_face]    # label = 0
scene = pp_sig_event[~is_face]  # label = 1

print('event', event.shape,  'denoised',  denoised.shape, 'pp_sig_event', pp_sig_event.shape, 'face', face.shape, 'scene', scene.shape, 'labels', labels.shape)
# denoised holds all the denoised data with shape (B_N * n_samples, n_channels)
# event holds all the events with shape (B_N * n_samples, 1)

In [None]:
# One row per window of `fs` samples; each window takes the label of its first sample.
label = labels.reshape(labels.shape[0] // fs, fs)
Y = np.squeeze(label[:, 0])

# Band name -> (low edge, high edge) passed to the band-pass filter
frequency_bands = dict(
    delta=(0.5, 4),
    theta=(4, 8),
    alpha=(8, 14),
    beta=(14, 30),
    gamma=(30, 40),
)

def apply_bandpass_filter(signal, lowcut, highcut, fs, order=5):
    """Zero-phase Butterworth band-pass filter applied along the last axis.

    The filter is built as second-order sections and applied with
    ``sosfiltfilt``. The (b, a) polynomial form is numerically unreliable for
    narrow low-frequency bands such as 0.5-4 Hz at fs=250 Hz with order 5;
    SOS is the representation SciPy recommends for such designs.

    Args:
        signal: Array-like signal to filter.
        lowcut: Lower band edge, in the same frequency unit as ``fs``.
        highcut: Upper band edge, in the same frequency unit as ``fs``.
        fs: Sampling frequency.
        order: Butterworth order passed to ``scipy.signal.butter``.

    Returns:
        The filtered signal, same shape as ``signal``.
    """
    nyquist = 0.5 * fs
    sos = butter(order, [lowcut / nyquist, highcut / nyquist], btype='band', output='sos')
    return sosfiltfilt(sos, signal)
# Cut the stacked signal into windows of `fs` samples: (n_windows, fs, n_channels).
# Sizes come from `fs` and the data itself rather than literal 250 / 8, so this
# stays consistent with the label windows built from `fs` above.
denoised_reshaped = denoised.reshape(denoised.shape[0] // fs, fs, denoised.shape[1])

# def extract_statistical_features_from_amplitude(coefficients):
#     """Extract statistical features from the amplitude of wavelet coefficients."""
#     amplitude = np.abs(coefficients.flatten())  # Get the amplitude and flatten
#     # Extract features from amplitude
#     mean_amp = np.mean(amplitude)
#     variance_amp = np.var(amplitude)
#     skewness_amp = skew(amplitude)
#     kurtosis_amp = kurtosis(amplitude)
#     return [mean_amp, variance_amp, skewness_amp, kurtosis_amp]

# features_combined = []
# for segment in denoised_reshaped:
#     segment_features = []
#     for channel_data in segment.T:  # Going through each channel
#         # For each frequency band, apply the bandpass filter and then extract wavelet features
#         for band, (low, high) in frequency_bands.items():
#             filtered_signal = apply_bandpass_filter(channel_data, low, high, fs)
#             coefficients, frequencies = pywt.cwt(filtered_signal, scales=np.arange(1, 50), wavelet='cmor')
#             # Extract statistical features from the amplitude
#             amp_features = extract_statistical_features_from_amplitude(coefficients)
#             segment_features.extend(amp_features)            
#     features_combined.append(segment_features)
# features_combined_np = np.array(features_combined)
# print(features_combined_np.shape)
# wavelet_np=features_combined_np.reshape(features_combined_np.shape[0], 8, int(features_combined_np.shape[1]/8))

In [None]:
# Wavelet features per frequency band.
# For every window, channel and band: band-pass the signal, decompose it with a
# level-4 'db4' discrete wavelet transform, and describe the amplitude of EACH
# coefficient array by its mean, variance, skewness and kurtosis.
features_combined = []

for segment in denoised_reshaped:
    segment_features = []
    for channel_data in segment.T:  # Going through each channel
        for band, (low, high) in frequency_bands.items():
            filtered_signal = apply_bandpass_filter(channel_data, low, high, fs)
            coeffs = pywt.wavedec(filtered_signal, 'db4', level=4)
            for coeff in coeffs:
                # Statistics of this sub-band only. Pooling all sub-bands
                # would append the same four numbers once per sub-band.
                amplitude = np.abs(coeff)
                segment_features.extend([np.mean(amplitude), np.var(amplitude), skew(amplitude), kurtosis(amplitude)])

    features_combined.append(segment_features)
features_combined_np = np.array(features_combined)



In [None]:
# features_combined_np is 2-D: one flat feature vector per window
print(features_combined_np.shape)

# Shuffle features and labels together from their unshuffled sources, so
# re-running this cell cannot misalign a fresh `af` with an already-shuffled `Y`.
af, Y = shuffle(features_combined_np, np.squeeze(label[:, 0]), random_state=42)
print(af.shape, Y.shape)

# Split BEFORE balancing. Oversampling first would put copies of the same
# minority-class window in both the training and the test set and inflate the
# test score.
X_train_raw, X_test, y_train_raw, y_test = train_test_split(af, Y, test_size=0.1, random_state=42)

# Balance the training split only; the test split keeps its natural class ratio
oversampler = RandomOverSampler(sampling_strategy='auto', random_state=42)
X_resampled, y_resampled = oversampler.fit_resample(X_train_raw, y_train_raw)
X_resampled = X_resampled.astype(np.float32)
y_resampled = y_resampled.astype(np.int32)

# Same numeric treatment for both splits: float32 rounding, stored as float64
X_train = X_resampled.astype(np.float64)
X_test = X_test.astype(np.float32).astype(np.float64)
print(X_train.shape)

# Integer-encoded labels (0 = face, 1 = scene). One-hot encoding followed by
# argmax returns the same integers, so that round trip is not needed.
y_train = y_resampled.astype(np.int64)
y_test = y_test.astype(np.int64)
print(y_train.shape, y_test.shape)

print('X_train:', X_train.shape, 'y_train:', y_train.shape, 'X_test:', X_test.shape, 'y_test:',
      y_test.shape)

In [None]:
# Flatten every window to a single vector: (n_windows, samples_per_window * n_channels)
mlp_data = denoised_reshaped.reshape(denoised_reshaped.shape[0], denoised_reshaped.shape[1]*denoised_reshaped.shape[2])
print(mlp_data.shape)

# Seeded shuffle so the split below is reproducible
af_mlp, Y_mlp = shuffle(mlp_data, np.squeeze(label[:, 0]), random_state=42)
print(af_mlp.shape, Y_mlp.shape)

# Split BEFORE balancing. Oversampling first would put copies of the same
# minority-class window in both the training and the test set and inflate the
# test score.
X_train_raw_mlp, X_test_mlp, y_train_raw_mlp, y_test_mlp = train_test_split(af_mlp, Y_mlp, test_size=0.1, random_state=42)

# Balance the training split only; the test split keeps its natural class ratio
oversampler = RandomOverSampler(sampling_strategy='auto', random_state=42)
X_resampled_mlp, y_resampled_mlp = oversampler.fit_resample(X_train_raw_mlp, y_train_raw_mlp)
X_resampled_mlp = X_resampled_mlp.astype(np.float32)
y_resampled_mlp = y_resampled_mlp.astype(np.int32)

X_train_mlp, y_train_mlp = X_resampled_mlp, y_resampled_mlp
X_test_mlp = X_test_mlp.astype(np.float32)
y_test_mlp = y_test_mlp.astype(np.int32)


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout

# Given data (You'd already have this loaded)
# X_train, y_train, X_test, y_test

# Ensure your target (y_train and y_test) is properly shaped.
# For binary classification, it should be of shape (n_samples, 1)
# For multi-class single-label classification, it should be one-hot encoded.

# Define the MLP model
# Input width taken from the data instead of a hard-coded 2000, so the model
# keeps working if the window length or the channel count changes.
n_features = X_train_mlp.shape[1]

model = Sequential()

# Input layer (with relu activation and input shape matching the feature count)
model.add(Dense(256, activation='relu', input_shape=(n_features,)))

# Hidden layers
model.add(Dropout(0.5))
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(64, activation='relu'))

# Output layer: a single sigmoid unit for the binary face/scene decision
model.add(Dense(1, activation='sigmoid'))

# Binary classification -> binary cross-entropy
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model.
# NOTE(review): the test split is passed as validation data; here it only
# drives the per-epoch metrics printed by Keras.
model.fit(X_train_mlp, y_train_mlp, epochs=100, batch_size=32, validation_data=(X_test_mlp, y_test_mlp))


In [None]:
# Score the fitted network on the training split and on the held-out test
# split; Keras returns [loss, accuracy] for the metrics compiled above.
train_loss, train_accuracy = model.evaluate(x=X_train_mlp, y=y_train_mlp, verbose=0)
test_loss, test_accuracy = model.evaluate(x=X_test_mlp, y=y_test_mlp, verbose=0)

print(f"Training Accuracy: {train_accuracy * 100:.2f}%")
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


In [None]:
import optuna
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Tune on a validation split carved out of the training data. Scoring trials on
# the test split would select hyperparameters that fit the test set, and any
# test accuracy reported afterwards would be optimistic.
X_tune_train, X_tune_val, y_tune_train, y_tune_val = train_test_split(
    X_train_mlp, y_train_mlp, test_size=0.2, random_state=42
)


def objective(trial):
    """Optuna objective: train a 3-layer ReLU MLP and return its validation accuracy.

    Searched parameters: units of the three Dense layers, the two dropout
    rates, and the Adam learning rate (log scale).
    """
    # Drop graph/state left by the previous trial so memory does not grow
    tf.keras.backend.clear_session()

    n_features = X_tune_train.shape[1]
    model = Sequential()
    # Input layer
    n_units_i = trial.suggest_int('n_units_i', 128, 512)
    model.add(Dense(n_units_i, activation='relu', input_shape=(n_features,)))
    # Hidden layers
    dropout_rate_1 = trial.suggest_float('dropout_rate_1', 0.1, 0.6)
    model.add(Dropout(dropout_rate_1))
    n_units_h1 = trial.suggest_int('n_units_h1', 32, 256)
    model.add(Dense(n_units_h1, activation='relu'))
    dropout_rate_2 = trial.suggest_float('dropout_rate_2', 0.1, 0.6)
    model.add(Dropout(dropout_rate_2))
    n_units_h2 = trial.suggest_int('n_units_h2', 16, 128)
    model.add(Dense(n_units_h2, activation='relu'))
    # Output layer
    model.add(Dense(1, activation='sigmoid'))
    # Compile the model
    lr = trial.suggest_float('lr', 1e-3, 1e-1, log=True)
    optimizer = Adam(learning_rate=lr)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    # Train the model
    model.fit(X_tune_train, y_tune_train, epochs=100, batch_size=64, validation_data=(X_tune_val, y_tune_val), verbose=0)
    # Evaluate on the validation split; score is [loss, accuracy]
    score = model.evaluate(X_tune_val, y_tune_val, verbose=0)
    return score[1]  # Return accuracy


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)
print('Number of finished trials: ', len(study.trials))
print('Best trial:')
trial = study.best_trial
print('  Value: ', trial.value)
print('  Params: ')
for key, value in trial.params.items():
    print(f'    {key}: {value}')


In [None]:
import optuna 
# sklearn
from keras.models import Sequential
from keras.layers import Dense, Dropout, LeakyReLU
from keras.optimizers import Adam

ACTIVATION_CHOICES = ['relu', 'leaky_relu', 'elu', 'swish']

# Tune on a validation split carved out of the training data. Scoring trials on
# the test split would select hyperparameters that fit the test set, and any
# test accuracy reported afterwards would be optimistic.
X_tune_train, X_tune_val, y_tune_train, y_tune_val = train_test_split(
    X_train_mlp, y_train_mlp, test_size=0.2, random_state=42
)


def _add_dense(model, units, activation, **dense_kwargs):
    """Append a Dense layer; 'leaky_relu' is added as a separate LeakyReLU layer."""
    if activation == 'leaky_relu':
        model.add(Dense(units, **dense_kwargs))
        model.add(LeakyReLU())
    else:
        model.add(Dense(units, activation=activation, **dense_kwargs))


def objective(trial):
    """Optuna objective: train a 3-layer MLP and return its validation accuracy.

    Searched parameters: units and activation of each Dense layer, the two
    dropout rates, and the Adam learning rate (log scale).
    """
    # Drop graph/state left by the previous trial so memory does not grow
    tf.keras.backend.clear_session()

    n_features = X_tune_train.shape[1]
    model = Sequential()

    # Input layer
    n_units_i = trial.suggest_int('n_units_i', 128, 512)
    activation_choice_i = trial.suggest_categorical('activation_i', ACTIVATION_CHOICES)
    _add_dense(model, n_units_i, activation_choice_i, input_shape=(n_features,))

    # Hidden layer 1
    dropout_rate_1 = trial.suggest_float('dropout_rate_1', 0.1, 0.6)
    model.add(Dropout(dropout_rate_1))
    n_units_h1 = trial.suggest_int('n_units_h1', 32, 256)
    activation_choice_h1 = trial.suggest_categorical('activation_h1', ACTIVATION_CHOICES)
    _add_dense(model, n_units_h1, activation_choice_h1)

    # Hidden layer 2
    dropout_rate_2 = trial.suggest_float('dropout_rate_2', 0.1, 0.6)
    model.add(Dropout(dropout_rate_2))
    n_units_h2 = trial.suggest_int('n_units_h2', 16, 128)
    activation_choice_h2 = trial.suggest_categorical('activation_h2', ACTIVATION_CHOICES)
    _add_dense(model, n_units_h2, activation_choice_h2)

    # Output layer
    model.add(Dense(1, activation='sigmoid'))

    # Compile the model
    lr = trial.suggest_float('lr', 1e-3, 1e-1, log=True)
    optimizer = Adam(learning_rate=lr)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

    # Train the model
    model.fit(X_tune_train, y_tune_train, epochs=100, batch_size=64, validation_data=(X_tune_val, y_tune_val), verbose=0)

    # Evaluate on the validation split; score is [loss, accuracy]
    score = model.evaluate(X_tune_val, y_tune_val, verbose=0)
    return score[1]  # Return accuracy


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)

print('Number of finished trials: ', len(study.trials))
print('Best trial:')
trial = study.best_trial
print('  Value: ', trial.value)
print('  Params: ')
for key, value in trial.params.items():
    print(f'    {key}: {value}')


In [None]:
# 1. Extract the Best Parameters
best_params = study.best_params

# 2. Build the Model using the Best Parameters
# Input width taken from the data instead of a hard-coded 2000.
n_features = X_train_mlp.shape[1]
n_units_i = best_params['n_units_i']
n_units_h1 = best_params['n_units_h1']
n_units_h2 = best_params['n_units_h2']

# (units, activation, dropout applied before the layer) for each Dense layer
layer_specs = [
    (n_units_i, best_params['activation_i'], None),
    (n_units_h1, best_params['activation_h1'], best_params['dropout_rate_1']),
    (n_units_h2, best_params['activation_h2'], best_params['dropout_rate_2']),
]

model = Sequential()
for layer_idx, (units, activation, dropout_before) in enumerate(layer_specs):
    if dropout_before is not None:
        model.add(Dropout(dropout_before))
    # Only the first layer declares the input shape
    dense_kwargs = {'input_shape': (n_features,)} if layer_idx == 0 else {}
    if activation == 'leaky_relu':
        # 'leaky_relu' is added as a separate LeakyReLU layer after a linear Dense
        model.add(Dense(units, **dense_kwargs))
        model.add(LeakyReLU())
    else:
        model.add(Dense(units, activation=activation, **dense_kwargs))

# Output Layer
model.add(Dense(1, activation='sigmoid'))

# Compile the Model
optimizer = Adam(learning_rate=best_params['lr'])
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

# 3. Train the Model on the Training Data
model.fit(X_train_mlp, y_train_mlp, epochs=100, batch_size=64, verbose=1)

# Evaluate the model on the training data
train_loss, train_acc = model.evaluate(X_train_mlp, y_train_mlp, verbose=1)
print("\nAccuracy on Training Data: {:.4f}".format(train_acc))


# 4. Test the Model on the Test Data
loss, accuracy = model.evaluate(X_test_mlp, y_test_mlp, verbose=0)

print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")


In [None]:
# Get the best hyperparameters from the Optuna study
best_params = study.best_params

# Input width taken from the data instead of a hard-coded 2000.
n_features = X_train_mlp.shape[1]

# (units, activation, dropout applied before the layer) for each Dense layer.
# If the study searched over activations, use the tuned ones; forcing 'relu'
# would train a different network from the best trial. A study without
# activation parameters falls back to 'relu'.
layer_specs = [
    (best_params['n_units_i'], best_params.get('activation_i', 'relu'), None),
    (best_params['n_units_h1'], best_params.get('activation_h1', 'relu'), best_params['dropout_rate_1']),
    (best_params['n_units_h2'], best_params.get('activation_h2', 'relu'), best_params['dropout_rate_2']),
]

# Build the model using the best hyperparameters
model = Sequential()
for layer_idx, (units, activation, dropout_before) in enumerate(layer_specs):
    if dropout_before is not None:
        model.add(Dropout(dropout_before))
    # Only the first layer declares the input shape
    dense_kwargs = {'input_shape': (n_features,)} if layer_idx == 0 else {}
    if activation == 'leaky_relu':
        # 'leaky_relu' is added as a separate LeakyReLU layer after a linear Dense
        model.add(Dense(units, **dense_kwargs))
        model.add(LeakyReLU())
    else:
        model.add(Dense(units, activation=activation, **dense_kwargs))
model.add(Dense(1, activation='sigmoid'))

optimizer = Adam(learning_rate=best_params['lr'])
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

# Train the model using the training data
model.fit(X_train_mlp, y_train_mlp, epochs=100, batch_size=64, verbose=1)

# Evaluate the model on the training data
train_loss, train_acc = model.evaluate(X_train_mlp, y_train_mlp, verbose=1)
print("\nAccuracy on Training Data: {:.4f}".format(train_acc))

# Evaluate the model on the test data
test_loss, test_acc = model.evaluate(X_test_mlp, y_test_mlp, verbose=1)
print("\nAccuracy on Test Data: {:.4f}".format(test_acc))
