# Preprocessing

In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pywt
from scipy.signal import butter, lfilter
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split

Menampilkan salah satu data yang digunakan.

In [None]:
# Load one raw EEG recording as an example; the bare `data` expression on the
# last line makes the notebook display the resulting DataFrame.
data = pd.read_csv('Raw_Data_EEG/P02_SHAFATYRA.csv')
data

In [None]:
# Drop the 'AUX' column when the recording contains one.
if 'AUX' in data.columns:
    data = data.drop(columns=['AUX'])
    
# Parse the 'Timestamp' column as datetimes; entries that cannot be parsed
# become NaT (errors='coerce').
# NOTE(review): the column is only converted here, it is NOT set as the index,
# so `data` keeps the index produced by read_csv.
if 'Timestamp' in data.columns:  
    data['Timestamp'] = pd.to_datetime(data['Timestamp'], errors='coerce')

In [None]:
def plot_data(df, channels, title):
    """Plot each requested channel of ``df`` in its own stacked subplot.

    Args:
        df: DataFrame with one column per channel; its index is the x-axis.
        channels: Column names to draw, one subplot row per name.
        title: Text appended to the channel name in every subplot title.
    """
    n_rows = len(channels)
    plt.figure(figsize=(12, n_rows * 3))
    for position, name in enumerate(channels, start=1):
        plt.subplot(n_rows, 1, position)
        plt.plot(df.index, df[name], label=name)
        plt.title(f'{name} - {title}')
        plt.xlabel('Timestamp')
        plt.ylabel('Amplitude')
    # Adjust the layout so neighbouring subplots do not overlap.
    plt.tight_layout()
    plt.show()

# Channels to draw; plot_data creates one subplot per entry.
channels = ['TP9', 'AF7','AF8', 'TP10']
plot_data(data, channels, 'Sinyal EEG')

In [None]:
# Folder names used by the pipeline. raw_dir holds the input recordings and is
# not created here; every other folder is created below if it is missing.
raw_dir = "Raw_Data_EEG"
split_dir = "1. Split"
cleaning_dir = "2. Cleaning"
augmentasi_dir = "3. Augmentasi"
average_dir = "4. Average"
Transform_dir = "5. Transform"
label_dir = "6. Label"
FFT_Plot_dir = "7.1. FFT_Plot"
DWT_Plot_dir = "7.2. DWT_Plot"

directories = [
    split_dir,
    cleaning_dir,
    augmentasi_dir,
    average_dir,
    Transform_dir,
    label_dir,
    FFT_Plot_dir,
    DWT_Plot_dir,
]
for directory in directories:
    os.makedirs(directory, exist_ok=True)

## Split Data

Memisahkan data berdasarkan aktivitas.

In [None]:
def split_data(file_path, file_number, output_dir=None):
    """Split one recording into five consecutive time segments and save them.

    The segment boundaries are 0, 5, 35, 43, 50 and 51 minutes after the first
    timestamp of the file. Segment ``i`` is written to
    ``<output_dir>/<file_number>_K<i>.csv``.

    Segments are half-open (``start <= t < end``) so that a sample lying
    exactly on a boundary is written to one segment only; the last segment
    keeps its end point. Files without a 'Timestamp' column produce no output.

    Args:
        file_path: Path of the raw CSV file.
        file_number: Identifier used as the prefix of the output file names.
        output_dir: Destination folder. Defaults to the module-level
            ``split_dir`` when omitted.
    """
    if output_dir is None:
        output_dir = split_dir

    df = pd.read_csv(file_path)
    if 'AUX' in df.columns:
        df = df.drop(columns=['AUX'])
    if 'Timestamp' not in df.columns:
        return

    # Unparseable timestamps become NaT and therefore match no segment.
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
    timestamps = df['Timestamp']
    first = timestamps.iloc[0]
    boundaries = [first + pd.Timedelta(minutes=m) for m in (0, 5, 35, 43, 50, 51)]
    last_segment = len(boundaries) - 2

    for i, (seg_start, seg_end) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        if i == last_segment:
            in_segment = (timestamps >= seg_start) & (timestamps <= seg_end)
        else:
            in_segment = (timestamps >= seg_start) & (timestamps < seg_end)
        filename = f'{output_dir}/{file_number}_K{i}.csv'
        df[in_segment].to_csv(filename, index=False)

Referensi implementasi filter bandpass Butterworth dan deteksi outlier dengan Z-score:
- https://medium.com/time-series-ml/dsp-frequency-bandpass-filter-in-python-62c0e7189852
- https://docs.scipy.org/doc/scipy-1.9.0/reference/generated/scipy.signal.butter.html
- https://www.machinelearningplus.com/machine-learning/how-to-detect-outliers-with-z-score/

## Butter Band-pass

In [None]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower edge of the pass band, in the same unit as ``fs``.
        highcut: Upper edge of the pass band, in the same unit as ``fs``.
        fs: Sampling frequency; half of it is used to normalise both edges.
        order: Filter order forwarded to ``scipy.signal.butter``.

    Returns:
        The ``(b, a)`` coefficient pair returned by ``scipy.signal.butter``.
    """
    nyquist = fs / 2.0
    normalised_band = [lowcut / nyquist, highcut / nyquist]
    numerator, denominator = butter(order, normalised_band, btype='band')
    return numerator, denominator

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    """Band-pass filter ``data`` with the design from ``butter_bandpass``.

    Args:
        data: Sequence of samples to filter.
        lowcut: Lower pass-band edge handed to ``butter_bandpass``.
        highcut: Upper pass-band edge handed to ``butter_bandpass``.
        fs: Sampling frequency handed to ``butter_bandpass``.
        order: Filter order handed to ``butter_bandpass``.

    Returns:
        The output of ``scipy.signal.lfilter`` applied to ``data``.
    """
    coefficients = butter_bandpass(lowcut, highcut, fs, order=order)
    return lfilter(*coefficients, data)

## Modified Z-Score

In [None]:
def modified_z_score(data, k=0.33725):
    """Return median/MAD based z-scores for ``data``.

    Each score is ``k * (x - median) / MAD`` where MAD is the median of the
    absolute deviations from the median.

    Args:
        data: NumPy array of samples.
        k: Scaling constant applied to the normalised deviation.

    Returns:
        Array of scores shaped like ``data``; all zeros when the MAD is 0,
        because the ratio is undefined in that case.
    """
    median = np.median(data)
    mad = np.median(np.abs(data - median))
    if mad == 0:
        return np.zeros_like(data)
    z_scores = k * (data - median) / mad
    return z_scores


def remove_artifacts(df, threshold=3.5):
    """Replace outlier samples in every column by linearly interpolated values.

    A sample is an outlier when the absolute value of its modified z-score
    exceeds ``threshold``.

    Args:
        df: DataFrame of numeric columns; its columns are overwritten in place.
        threshold: Maximum absolute modified z-score that is kept.

    Returns:
        The same ``df`` object with the cleaned columns.
    """
    for column in df.columns:
        # Work on a private float copy: NaN cannot be stored in an integer
        # array, and writing into the array returned by ``.values`` would
        # touch the frame's own data (or fail when that array is read-only).
        values = df[column].to_numpy(dtype=float, copy=True)
        z_scores = modified_z_score(values)
        values[np.abs(z_scores) > threshold] = np.nan
        # limit_direction='both' also fills outliers at the very start of the
        # signal; the default only fills forward and would leave them as NaN.
        cleaned = pd.Series(values).interpolate(limit_direction='both')
        df[column] = cleaned.to_numpy()
    return df

## Padding

In [None]:
def padding_data(df, desired_length=1000):
    """Append zero rows until ``df`` has at least ``desired_length`` rows.

    Args:
        df: DataFrame to pad.
        desired_length: Minimum number of rows of the result.

    Returns:
        ``df`` itself when it is already long enough; otherwise a new frame
        with a fresh 0..n-1 index whose extra rows are all zeros.
    """
    missing_rows = desired_length - len(df)
    if missing_rows <= 0:
        return df
    zero_block = np.zeros((missing_rows, len(df.columns)))
    filler = pd.DataFrame(zero_block, columns=df.columns)
    return pd.concat([df, filler], ignore_index=True)

## Augmentasi Data

In [None]:
def augment_eeg_data(eeg_data):
    """Return ``eeg_data`` with additive Gaussian noise (mean 0, std 0.1).

    Args:
        eeg_data: Array-like exposing ``.shape``; one noise value is drawn per
            element from NumPy's global random state.

    Returns:
        The element-wise sum of ``eeg_data`` and the noise.
    """
    noise = np.random.normal(0, 0.1, eeg_data.shape)
    return eeg_data + noise

def write_augmented_eeg_data(file_path, augmented_eeg_data):
    """Save augmented EEG samples as a CSV with the four channel columns.

    Args:
        file_path: Destination CSV path.
        augmented_eeg_data: Data accepted by ``pd.DataFrame`` together with
            the columns 'TP9', 'AF7', 'AF8', 'TP10'.
    """
    channel_names = ['TP9', 'AF7', 'AF8', 'TP10']
    frame = pd.DataFrame(augmented_eeg_data, columns=channel_names)
    frame.to_csv(file_path, index=False)

## Average

In [None]:
def average_eeg_data(data):
    """Average the four EEG channels row by row.

    Args:
        data: DataFrame containing the columns 'TP9', 'AF7', 'AF8', 'TP10'.
            It is left unmodified.

    Returns:
        A DataFrame with the single column 'Average', indexed like ``data``.

    Raises:
        ValueError: If any of the four channel columns is missing.
    """
    required_columns = ['TP9', 'AF7', 'AF8', 'TP10']
    missing = [column for column in required_columns if column not in data.columns]
    if missing:
        # The message no longer refers to a file name: the function only
        # receives the data, so no file name is available in its scope.
        raise ValueError(
            f"EEG data must contain the following columns: {required_columns}"
            f" (missing: {missing})"
        )
    return data[required_columns].mean(axis=1).to_frame(name='Average')

## Main cleaning

In [None]:
# Settings handed to butter_bandpass_filter: pass-band edges and sampling
# frequency (presumably all in Hz - confirm against the recording device).
lowcut = 8
highcut = 40
fs = 256

# Stage 1 - split every raw recording into its time segments.
# os.listdir returns names in arbitrary order and the position in this list
# becomes the number in the output file names, so the listing is sorted to
# keep that numbering reproducible between runs and machines.
files = sorted(f for f in os.listdir(raw_dir) if f.endswith('.csv'))
for i, file_name in enumerate(files):
    file_path = os.path.join(raw_dir, file_name)
    split_data(file_path, i+1)
print("Split selesai")

# Stage 2 - band-pass filter, remove artifacts and pad every segment.
files = sorted(f for f in os.listdir(split_dir) if f.endswith('.csv'))
for file_name in files:
    file_path = os.path.join(split_dir, file_name)
    df = pd.read_csv(file_path)
    # Move the timestamps out of the columns so only signal columns are filtered.
    df.set_index('Timestamp', inplace=True)
    for column in df.columns:
        df[column] = butter_bandpass_filter(df[column].values, lowcut, highcut, fs)  # Butterworth band-pass
    df = remove_artifacts(df)  # remove artifacts
    df = padding_data(df)  # pad short segments
    filename, _ = os.path.splitext(os.path.basename(file_path))
    output_file_path = os.path.join(cleaning_dir, f"{filename}.csv")
    # index=False: the timestamp index is not written to the cleaned file.
    df.to_csv(output_file_path, index=False)
print('Cleaning selesai')

# Stage 3 - write ten noisy copies of every cleaned segment.
files = sorted(f for f in os.listdir(cleaning_dir) if f.endswith('.csv'))
for file_name in files:
    file_path = os.path.join(cleaning_dir, file_name)
    df = pd.read_csv(file_path)
    for i in range(10):
        augmented_eeg_data = augment_eeg_data(df)
        output_file_path = os.path.join(augmentasi_dir, f"{i}_{file_name}")
        write_augmented_eeg_data(output_file_path, augmented_eeg_data)
print('augmentasi selesai')

# Stage 4 - collapse the four channels of every augmented file into one average.
files = sorted(f for f in os.listdir(augmentasi_dir) if f.endswith('.csv'))
for file_name in files:
    file_path = os.path.join(augmentasi_dir, file_name)
    df = pd.read_csv(file_path)
    average_data = average_eeg_data(df)
    output_file_path = os.path.join(average_dir, file_name)
    average_data.to_csv(output_file_path, index=False)
print('average selesai')

## Transformasi Signal

### Fast Fourier Transform (FFT)

In [None]:
def compute_power_spectrum(xn, fs=256):
    """Return the one-sided power spectrum of a real-valued signal.

    The two-sided periodogram ``|FFT(xn)|**2 / (fs * N)`` is folded onto the
    ``N // 2 + 1`` non-negative frequency bins. Every bin that has a mirrored
    negative-frequency partner is doubled so that the total power is kept.

    Args:
        xn: 1-D array of real samples.
        fs: Sampling frequency used for the normalisation.

    Returns:
        Array of length ``N // 2 + 1``.
    """
    N = len(xn)
    scale = 1 / (fs * N)
    # rfft yields exactly the first N // 2 + 1 bins of the full FFT for real input.
    Xk = scale * np.abs(np.fft.rfft(xn)) ** 2
    if N % 2 == 0:
        # Even N: the last bin is the Nyquist bin and has no mirror image.
        Xk[1:-1] *= 2
    else:
        # Odd N: there is no Nyquist bin, so every bin except DC is mirrored.
        Xk[1:] *= 2
    return Xk

def fft(data, fs=256, source_name=None):
    """Build a one-row table with the power spectrum of the 'Average' column.

    Args:
        data: DataFrame containing an 'Average' column.
        fs: Sampling frequency used for the frequency axis and normalisation.
        source_name: Value stored in the 'File Name' column. When omitted the
            module-level ``file_name`` variable is used, which is what the
            existing driver loop relies on.

    Returns:
        DataFrame with one row: 'File Name' first, followed by one column per
        frequency bin (the column labels are the bin frequencies).
    """
    if source_name is None:
        # Backward compatibility with callers that set the global loop
        # variable `file_name` before calling fft(df).
        source_name = file_name
    xn = data['Average'].values
    N = len(xn)
    # Frequencies of the N // 2 + 1 one-sided bins.
    freqs = np.arange(N // 2 + 1) * fs / N
    power_spectrum_data = compute_power_spectrum(xn, fs)
    file_results = pd.DataFrame([power_spectrum_data], columns=freqs)
    file_results.insert(0, 'File Name', source_name)
    return file_results

### Discrete Wavelet Transform (DWT)

https://pywavelets.readthedocs.io/en/latest/ref/dwt-discrete-wavelet-transform.html

In [None]:
def dwt(data, wavelet='db4'):
    """Summarise the wavelet decomposition of a signal as a one-row DataFrame.

    Args:
        data: Object exposing ``.values`` (the driver passes the 'Average'
            column of a DataFrame).
        wavelet: Wavelet name forwarded to ``pywt.wavedec``.

    Returns:
        One-row DataFrame holding, per column position, the mean over the
        approximation row and all detail rows.
    """
    # Work on the underlying array.
    data = data.values
    # No decomposition level is passed; presumably pywt then chooses the depth
    # itself - confirm in the PyWavelets documentation.
    coeffs = pywt.wavedec(data, wavelet)
    approximation = coeffs[0]
    details = coeffs[1:]
    # Every coefficient array is laid out as a single row (hence the transpose).
    approximation_df = pd.DataFrame(approximation, columns=['Approximation']).T
    detail_dfs = [pd.DataFrame(detail, columns=[f'Detail_Level_{i}']).T for i, detail in enumerate(details, start=1)]
    details_combined_df = pd.concat(detail_dfs, ignore_index=True)
    # NOTE(review): the rows have different lengths, so shorter rows presumably
    # end up with NaN in the trailing columns - confirm.
    final_df = pd.concat([approximation_df, details_combined_df], axis=0).reset_index(drop=True)
    # Column-wise mean across all rows, then turned back into a single row.
    file_results = final_df.mean()
    file_results = pd.DataFrame(file_results).T
    return file_results

In [None]:
# FFT: one power-spectrum row per averaged file.
# The listing is sorted so the row order is reproducible, and the rows are
# collected in a list and concatenated once instead of re-copying the growing
# frame on every iteration.
files = sorted(f for f in os.listdir(average_dir) if f.endswith('.csv'))
fft_rows = []
for file_name in files:
    file_path = os.path.join(average_dir, file_name)
    df = pd.read_csv(file_path)
    # fft() tags the row with the current value of `file_name`.
    file_results = fft(df)
    fft_rows.append(file_results)
combined_results = pd.concat(fft_rows, ignore_index=True) if fft_rows else pd.DataFrame()
output_file_path = os.path.join(Transform_dir, "FFT_Transform.csv")
combined_results.to_csv(output_file_path, index=False)

# DWT: one summary row per averaged file.
files = sorted(f for f in os.listdir(average_dir) if f.endswith('.csv'))
dwt_rows = []
for file_name in files:
    file_path = os.path.join(average_dir, file_name)
    df = pd.read_csv(file_path)
    file_results = dwt(df['Average'])
    # Tag the row before concatenating. The previous chained assignment
    # (`combined_results['File Name'].iloc[...] = ...`) may write to a
    # temporary copy and then silently leaves the name empty.
    file_results.insert(0, 'File Name', file_name)
    dwt_rows.append(file_results)
combined_results = pd.concat(dwt_rows, ignore_index=True) if dwt_rows else pd.DataFrame()
output_file_path = os.path.join(Transform_dir, "DWT_Transform.csv")
combined_results.to_csv(output_file_path, index=False)

## Labeling

In [None]:
def calculate_theta_beta_ratio(data, theta_range=(4, 8), beta_range=(13, 30)):
    """Return mean theta-band power divided by mean beta-band power for a row.

    Args:
        data: Series whose index labels are frequencies (convertible to float)
            and whose values are the power at those frequencies. The first
            entry is skipped.
        theta_range: Inclusive ``(low, high)`` bounds of the theta band.
        beta_range: Inclusive ``(low, high)`` bounds of the beta band.

    Returns:
        The ratio ``mean(theta power) / mean(beta power)``.
    """
    # Take the frequencies from the row itself rather than from a global
    # DataFrame. With DataFrame.apply(axis=1) the row index equals the frame's
    # columns, so existing calls see the same values.
    freqs = data.index[1:].astype(float).to_numpy()
    psd_values = data.iloc[1:].values.astype(float)
    in_theta = (freqs >= theta_range[0]) & (freqs <= theta_range[1])
    in_beta = (freqs >= beta_range[0]) & (freqs <= beta_range[1])
    theta_power = np.mean(psd_values[in_theta])
    beta_power = np.mean(psd_values[in_beta])
    theta_beta_ratio = theta_power / beta_power
    return theta_beta_ratio

### FFT

In [None]:
df = pd.read_csv('5. Transform/FFT_Transform.csv')
# Drop the first column ('File Name'). .copy() makes df_fft an independent
# frame, so adding columns below does not write into a slice of `df`.
df_fft = df.iloc[:,1:].copy()
df_fft['Theta/Beta Ratio'] = df_fft.apply(calculate_theta_beta_ratio, axis=1)
# Cluster the files into four groups by their theta/beta ratio.
kmeans = KMeans(n_clusters=4, random_state=42).fit(df_fft['Theta/Beta Ratio'].values.reshape(-1, 1))
centroids = kmeans.cluster_centers_.flatten()
labels = kmeans.labels_
# NOTE(review): the cluster-id -> stress-level mapping below is hard-coded and
# `centroids` is not used for it. Cluster ids depend on the data, so this
# mapping presumably has to be re-checked against `centroids` whenever the
# input changes - confirm the intended ordering.
new_labels = np.zeros_like(labels)
new_labels[labels == 0] = 0
new_labels[labels == 1] = 2
new_labels[labels == 2] = 3
new_labels[labels == 3] = 1
df_fft['Stress Level'] = new_labels
# Save the spectrum with its label; the helper ratio column is not exported.
df_new = df_fft.drop(columns='Theta/Beta Ratio')
df_new['File Name'] = df['File Name']
df_new.to_csv('6. Label/FFT_Labeled.csv', index=False)
print('Data berhasil disimpan.')

In [None]:
# Show how many rows were assigned to each stress level.
df_fft['Stress Level'].value_counts()

### DWT

In [None]:
# Attach the stress levels found on the FFT features to the DWT features.
# The labels are matched by row position (index), not by file name.
df_dwt = pd.read_csv('5. Transform/DWT_Transform.csv')
df_fft = pd.read_csv('6. Label/FFT_Labeled.csv')
df = df_dwt.assign(**{'Stress Level': df_fft['Stress Level']})
df.to_csv('6. Label/DWT_Labeled.csv', index=False)
print('Data berhasil disimpan.')

In [None]:
# Show how many rows were assigned to each stress level.
df['Stress Level'].value_counts()

## Plotting

### FFT Plot

In [None]:
def get_color_for_frequency(frequency):
    """Return the plot colour of the band that contains ``frequency``.

    Bands (lower bound inclusive, upper bound exclusive): delta 0-4,
    theta 4-8, alpha 8-12, beta 12-35, gamma from 35 upwards. The colours are
    looked up in the module-level ``color_map``. Values that are not >= 0
    (negative numbers, NaN) yield 'lightblue'.
    """
    if not frequency >= 0:
        return 'lightblue'
    band_upper_bounds = ((4, 'delta'), (8, 'theta'), (12, 'alpha'), (35, 'beta'))
    for upper_bound, band in band_upper_bounds:
        if frequency < upper_bound:
            return color_map[band]
    return color_map['gamma']

file_path = '6. Label/FFT_Labeled.csv'
df = pd.read_csv(file_path)
os.makedirs(FFT_Plot_dir, exist_ok=True)
# The last two columns are 'Stress Level' and 'File Name'; everything before
# them is one column per frequency. The 1 % / 99 % quantiles give every plot
# the same y-range.
y_min = df.iloc[:, :-2].quantile(0.01).min()
y_max = df.iloc[:, :-2].quantile(0.99).max()
x_min = float(df.columns[0])
x_max = float(df.columns[-3])
color_map = {
    'delta': 'blue',
    'theta': 'green',
    'alpha': 'red',
    'beta': 'orange',
    'gamma': 'purple'
}
# The frequency axis and its per-segment colours are identical for every row,
# so they are computed once instead of once per plotted file.
x_values = df.columns[:-2].astype(float)
colors = [get_color_for_frequency(freq) for freq in x_values]
for index, row in df.iterrows():
    y_values = row.values[:-2]
    plt.plot(x_values, y_values, color='black', linewidth=1.5)
    # Redraw every segment in the colour of the band it starts in.
    for i in range(len(x_values) - 1):
        plt.plot(x_values[i:i+2], y_values[i:i+2], color=colors[i], linewidth=1.5)
    plt.title(f"Plot for {row['File Name']} (Stress Level: {row['Stress Level']})")
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Amplitude')
    plt.ylim(y_min, y_max)
    plt.xlim(x_min, x_max)
    plt.grid(True, linestyle='--', alpha=0.7)
    # One sub-folder per stress level.
    stress_level_folder = os.path.join(FFT_Plot_dir, str(row['Stress Level']))
    os.makedirs(stress_level_folder, exist_ok=True)
    file_name = row['File Name'].replace('.csv', '')
    plot_filename = os.path.join(stress_level_folder, f"{file_name}_plot.jpg")
    plt.savefig(plot_filename, dpi=300)
    plt.close()
print("Selesai")

### DWT Plot

In [None]:
file_path = '6. Label/DWT_Labeled.csv'
df = pd.read_csv(file_path)
os.makedirs(DWT_Plot_dir, exist_ok=True)
# Columns between the first ('File Name') and the last ('Stress Level') hold
# the DWT values. pandas min()/max() skip NaN, whereas ndarray.min()/max()
# return NaN as soon as one cell is empty, which is not a valid axis limit.
coefficient_frame = df.iloc[:, 1:-1]
global_y_min = coefficient_frame.min().min()
global_y_max = coefficient_frame.max().max()
print("Global Y-axis limits:", global_y_min, global_y_max)
# The x positions are the same for every row, so compute them once.
x_values = df.columns[1:-1].astype(float)
for index, row in df.iterrows():
    y_values = row.values[1:-1]
    plt.plot(x_values, y_values, color='blue', linewidth=1.5)
    plt.xlabel("DWT Coefficients")
    plt.ylabel("Value")
    plt.title(f"DWT Data Plot for {row['File Name']} (Stress Level: {row['Stress Level']})")
    plt.ylim(global_y_min, global_y_max)
    # One sub-folder per stress level.
    stress_level_folder = os.path.join(DWT_Plot_dir, str(row['Stress Level']))
    os.makedirs(stress_level_folder, exist_ok=True)
    file_name = row['File Name'].replace('.csv', '')
    plot_filename = os.path.join(stress_level_folder, f"{file_name}_plot.jpg")
    plt.savefig(plot_filename, dpi=300)
    plt.close()
print("Selesai")


## Image to Array

In [None]:
import os
import random
import numpy as np
from PIL import Image
import pandas as pd

def images_to_array(input_folder, target_size=(10, 4), num_samples=2):
    """Load a random sample of images per label folder as grayscale arrays.

    Every sub-directory of ``input_folder`` is treated as one label. Up to
    ``num_samples`` files are drawn at random from each of them, converted to
    8-bit grayscale and resized.

    Args:
        input_folder: Directory containing one sub-directory per label.
        target_size: Size forwarded to ``Image.resize``.
        num_samples: Maximum number of images taken from each label folder.

    Returns:
        ``(data, labels)``: a list of uint8 arrays and the list of the label
        folder names they came from, in matching order.
    """
    data = []
    labels = []

    # Sorted so that labels are visited in a reproducible order.
    for label in sorted(os.listdir(input_folder)):
        label_folder = os.path.join(input_folder, label)
        if not os.path.isdir(label_folder):
            continue

        # Only regular files are candidates; nested folders cannot be opened
        # as images.
        image_files = sorted(
            name for name in os.listdir(label_folder)
            if os.path.isfile(os.path.join(label_folder, name))
        )
        selected_images = random.sample(image_files, min(num_samples, len(image_files)))

        for image_file in selected_images:
            image_path = os.path.join(label_folder, image_file)
            # The context manager closes the file handle once the pixels have
            # been converted, instead of leaving it open.
            with Image.open(image_path) as image:
                resized = image.convert('L').resize(target_size)
            image_array = np.array(resized, dtype=np.uint8)

            data.append(image_array)
            labels.append(label)

    return data, labels

if __name__ == "__main__":
    input_folder = "7.2. DWT_Plot"
    data, labels = images_to_array(input_folder)

    # Print every sampled image matrix as a table, preceded by its position
    # and label and followed by an empty line.
    for idx, (matrix, label) in enumerate(zip(data, labels)):
        print(f"Indeks: {idx}, Label: {label}")
        column_count = matrix.shape[1]
        header = [f"Feature_{i+1}" for i in range(column_count)]
        table = pd.DataFrame(matrix).to_string(index=False, header=header)
        print(table)
        print()
        print()
