<a href="https://colab.research.google.com/github/Iman6243/AI/blob/main/ModelingANN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Packages

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, KFold
from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense, LSTM
from keras.optimizers import Adam
from scipy.signal import butter, filtfilt, find_peaks
from scipy.stats import pearsonr
import os

## class SignalProcessor

In [None]:
class SignalProcessor:
    """Zero-phase Butterworth band-pass filtering for DataFrame columns.

    Attributes are stored exactly as passed to the constructor:
        fs: sampling rate of the signals.
        lowcut: lower cutoff of the pass band (same unit as ``fs``).
        highcut: upper cutoff of the pass band (same unit as ``fs``).
        order: order handed to ``scipy.signal.butter`` (default 4).
    """

    def __init__(self, fs, lowcut, highcut, order=4):
        self.fs, self.lowcut, self.highcut = fs, lowcut, highcut
        self.order = order

    def bandpass_filter(self, dataframe):
        """Return a new DataFrame with every column band-pass filtered.

        The cutoffs are normalised by the Nyquist rate (``fs / 2``) before
        the filter is designed, and ``filtfilt`` is applied to each column
        independently. The result keeps the input's index and column names.
        """
        half_rate = 0.5 * self.fs
        pass_band = [self.lowcut / half_rate, self.highcut / half_rate]
        numerator, denominator = butter(self.order, pass_band, btype='band')

        result = pd.DataFrame(index=dataframe.index)
        for name in dataframe.columns:
            # filtfilt runs the filter forwards and backwards over the column.
            result[name] = filtfilt(numerator, denominator, dataframe[name])
        return result

## class DataVisualizer

In [None]:
class DataVisualizer:
    """Matplotlib helpers for inspecting one signal column of a DataFrame."""

    @staticmethod
    def display_signal(dataframe, column, start_index=None, end_index=None, label=None):
        """Plot ``column`` over a positional row range.

        Args:
            dataframe: DataFrame holding the signal.
            column: Name of the column to plot.
            start_index: First row position (``iloc`` semantics); ``None``
                starts at the first row.
            end_index: Row position to stop before (exclusive); ``None``
                plots through the last row.
            label: Legend label; defaults to ``'Signal'``.
        """
        if label is None:
            label = 'Signal'
        # iloc is positional, so the bounds are passed straight through:
        # a None bound already means "first row" / "through the last row".
        # Substituting index labels here would drop the final row and break
        # on any index that is not 0..n-1.
        subset_df = dataframe.iloc[start_index:end_index]
        plt.figure(figsize=(12, 6))
        plt.plot(subset_df.index, subset_df[column], label=label)
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.title('Signal in the Specified Range')
        plt.legend()
        plt.grid(True)
        plt.show()

    @staticmethod
    def show_max_peak_signal(dataframe, column, start_index=None, end_index=None):
        """Find non-negative peaks of ``column`` in a row range and plot them.

        Peaks are detected with ``scipy.signal.find_peaks(..., height=0)``,
        so only local maxima with a value >= 0 are considered.

        Args:
            dataframe: DataFrame holding the signal.
            column: Name of the column to analyse.
            start_index: First row position (``iloc`` semantics), or ``None``.
            end_index: Row position to stop before, or ``None``.

        Returns:
            ``(max_peak_idx, max_peak_value, min_peak_idx, min_peak_value)``
            where the indices are positions *within the selected range* of
            the highest and lowest detected peaks. All four are ``None``
            when no peak is found (a message is printed in that case).
        """
        selected_rows = dataframe.iloc[start_index:end_index]
        signal = selected_rows[column]
        peaks, properties = find_peaks(signal, height=0)

        # Guard clause: nothing to plot when no peak was detected.
        if len(peaks) == 0:
            print("No peaks found in the specified range")
            return None, None, None, None

        peak_heights = properties['peak_heights']
        max_peak_idx = peaks[np.argmax(peak_heights)]
        max_peak_value = signal.iloc[max_peak_idx]
        min_peak_idx = peaks[np.argmin(peak_heights)]
        min_peak_value = signal.iloc[min_peak_idx]

        plt.figure(figsize=(12, 6))
        plt.plot(selected_rows.index, signal, label='Signal')
        plt.plot(selected_rows.index[peaks], signal.iloc[peaks], "x", label='Peaks')
        plt.plot(selected_rows.index[max_peak_idx], max_peak_value, "ro", label='Largest Peak')
        plt.plot(selected_rows.index[min_peak_idx], min_peak_value, "go", label='Smallest Peak')
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.title('Peaks in the Specified Range')
        plt.legend()
        plt.grid(True)
        plt.show()
        return max_peak_idx, max_peak_value, min_peak_idx, min_peak_value

## class ModelTrainer

In [None]:
class ModelTrainer:
    """Train and evaluate regression models (decision tree, dense ANN, LSTM).

    Every ``train_*`` method fits a model on the training split, prints its
    mean squared error on the test split and returns the fitted model.
    """

    def __init__(self):
        # Scalers are created for callers; the training methods of this
        # class do not apply them.
        self.scaler_X = MinMaxScaler()
        self.scaler_y = MinMaxScaler()

    @staticmethod
    def _flat_mse(y_true, y_pred):
        """Return the MSE between single-output targets and predictions.

        Keras returns predictions shaped ``(n, 1)`` while targets are
        usually ``(n,)`` (or a pandas Series). Subtracting those directly
        either broadcasts to an ``(n, n)`` matrix or aligns on index labels,
        both of which give a wrong error, so both sides are flattened first.

        Raises:
            ValueError: If the two inputs hold a different number of values.
        """
        y_true = np.asarray(y_true, dtype=float).ravel()
        y_pred = np.asarray(y_pred, dtype=float).ravel()
        if y_true.shape != y_pred.shape:
            raise ValueError(
                f"y_true has {y_true.size} values but y_pred has {y_pred.size}"
            )
        return float(np.mean((y_true - y_pred) ** 2))

    def train_decision_tree(self, X_train, y_train, X_test, y_test):
        """Fit a ``DecisionTreeRegressor`` and print its test MSE and R^2.

        Returns:
            The fitted ``DecisionTreeRegressor``.
        """
        model = DecisionTreeRegressor()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        print(f'Decision Tree - Mean Squared Error: {mse}, R^2 Score: {r2}')
        return model

    def train_ann(self, X_train, y_train, X_test, y_test):
        """Fit a 64-32-1 dense network (Adam, MSE loss) for 100 epochs.

        Prints the test-set mean squared error.

        Returns:
            The fitted Keras ``Sequential`` model.
        """
        model = Sequential()
        model.add(Dense(64, input_dim=X_train.shape[1], activation='relu'))
        model.add(Dense(32, activation='relu'))
        model.add(Dense(1, activation='linear'))
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        model.fit(X_train, y_train, epochs=100, batch_size=32, verbose=1)
        y_pred = model.predict(X_test)
        # Flatten before subtracting: y_pred is (n, 1), y_test is (n,).
        mse = self._flat_mse(y_test, y_pred)
        print(f'ANN - Mean Squared Error: {mse}')
        return model

    def train_lstm(self, X_train, y_train, X_test, y_test, time_step=10):
        """Fit a two-layer LSTM on sliding windows of ``time_step`` rows.

        Both splits are windowed with :meth:`create_dataset`; the model is
        trained for 100 epochs and the test-set MSE is printed.

        Returns:
            The fitted Keras ``Sequential`` model.

        Raises:
            ValueError: If either split has ``time_step`` rows or fewer, so
                that no window can be built from it.
        """
        X_train_lstm, y_train_lstm = self.create_dataset(X_train, y_train, time_step)
        X_test_lstm, y_test_lstm = self.create_dataset(X_test, y_test, time_step)
        if len(X_train_lstm) == 0 or len(X_test_lstm) == 0:
            raise ValueError(
                f"need more than time_step={time_step} rows in both the "
                "training and the test split to build LSTM windows"
            )
        # LSTM layers expect (samples, time_step, features); 1-D feature
        # input yields (samples, time_step), so add the feature axis.
        if X_train_lstm.ndim == 2:
            X_train_lstm = X_train_lstm[..., np.newaxis]
        if X_test_lstm.ndim == 2:
            X_test_lstm = X_test_lstm[..., np.newaxis]
        model = Sequential()
        model.add(LSTM(50, return_sequences=True, input_shape=(X_train_lstm.shape[1], X_train_lstm.shape[2])))
        model.add(LSTM(50, return_sequences=False))
        model.add(Dense(1))
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        model.fit(X_train_lstm, y_train_lstm, epochs=100, batch_size=32, verbose=0)
        y_pred = model.predict(X_test_lstm)
        # Flatten before subtracting: y_pred is (m, 1), targets are (m,).
        mse = self._flat_mse(y_test_lstm, y_pred)
        print(f'LSTM - Mean Squared Error: {mse}')
        return model

    @staticmethod
    def create_dataset(X, y, time_step=1):
        """Build sliding windows of ``X`` paired with the next value of ``y``.

        Window ``i`` holds rows ``i .. i + time_step - 1`` of ``X`` and is
        paired with ``y`` at position ``i + time_step``.

        Args:
            X: Array-like, DataFrame or Series of features.
            y: Array-like or Series of targets, at least as long as ``X``.
            time_step: Number of consecutive rows per window.

        Returns:
            ``(Xs, ys)`` as NumPy arrays. Both are empty when ``X`` has
            ``time_step`` rows or fewer.
        """
        # Convert to arrays so indexing is positional. On a pandas Series,
        # ``y[i]`` is a label lookup, which picks the wrong row (or raises
        # KeyError) once the index is shuffled or not 0..n-1.
        features = np.asarray(X)
        targets = np.asarray(y)
        n_windows = len(features) - time_step
        Xs = [features[i:i + time_step] for i in range(n_windows)]
        ys = [targets[i + time_step] for i in range(n_windows)]
        return np.array(Xs), np.array(ys)

## class DataHandler

In [None]:
class DataHandler:
    """Helpers for loading CSV files and copying row ranges of a DataFrame."""

    @staticmethod
    def load_csv_files(directory):
        """Load every ``*.csv`` file found directly inside ``directory``.

        Files are read in sorted filename order so that positions in the
        returned lists are reproducible (``os.listdir`` order is arbitrary).
        The shape of each loaded frame is printed.

        Args:
            directory: Path of the folder to scan.

        Returns:
            ``(df_list, filenames)``: the loaded DataFrames and their bare
            filenames, in matching order.
        """
        filenames = sorted(
            name for name in os.listdir(directory) if name.endswith('.csv')
        )
        # Join with the directory: os.listdir returns bare names, which
        # would only resolve when `directory` is the working directory.
        df_list = [pd.read_csv(os.path.join(directory, name)) for name in filenames]
        for i, (name, df) in enumerate(zip(filenames, df_list), start=1):
            print(f"Shape of dataframe {i}- {name} : {df.shape}")
        return df_list, filenames

    @staticmethod
    def copy_dataset(dataframe, start_index=None, end_index=None):
        """Return an independent copy of a positional row range.

        Args:
            dataframe: Source DataFrame (left unmodified).
            start_index: First row position (``iloc`` semantics); ``None``
                starts at the first row.
            end_index: Row position to stop before (exclusive); ``None``
                copies through the last row.

        Returns:
            A new DataFrame with the selected rows and a fresh 0..n-1 index.
        """
        # Pass None bounds straight to iloc. Substituting index *labels*
        # into a positional slice would drop the last row and fail on an
        # empty frame or any index that is not 0..n-1.
        selected_rows = dataframe.iloc[start_index:end_index]
        copydf = selected_rows.copy()
        copydf.reset_index(drop=True, inplace=True)
        return copydf

## Main

In [None]:
# Example usage: load the CSV files in the working directory, plot one
# signal, band-pass filter the first dataset, then train three regressors.
if __name__ == "__main__":
    # Load data
    # Every *.csv in the current directory is loaded; the three frames are
    # then picked by list position, so at least three CSV files must exist.
    # NOTE(review): positions follow the order load_csv_files returns —
    # confirm that index 0/1/2 really are the motion/accelerometer/gyroscope
    # files on the target machine.
    data_handler = DataHandler()
    df_list, filenames = data_handler.load_csv_files('.')
    Motion = df_list[0]
    SigAcc = df_list[1]  # not used further in the visible code
    SigGyr = df_list[2]  # not used further in the visible code

    # Visualize data
    # Plots column 'COPaZ1' for row positions 0-2000, using the source
    # filename as the legend label.
    visualizer = DataVisualizer()
    visualizer.display_signal(Motion, 'COPaZ1', 0, 2000, filenames[0])

    # Process signals
    # Band-pass filter applied to every column of Motion.
    # Presumably fs/lowcut/highcut are in Hz (100 Hz sampling, 0.1-10 Hz
    # pass band) — TODO confirm against the recording setup.
    signal_processor = SignalProcessor(fs=100, lowcut=0.1, highcut=10)
    filtered_motion = signal_processor.bandpass_filter(Motion)

    # Train models
    # Features are all columns but the last; the last column is the target.
    # 20% of the rows are held out for testing (random_state=42).
    # NOTE(review): train_test_split shuffles rows by default, so the LSTM
    # windows are built from non-consecutive samples — confirm whether
    # shuffle=False (a chronological split) is intended for train_lstm.
    model_trainer = ModelTrainer()
    X_train, X_test, y_train, y_test = train_test_split(filtered_motion.iloc[:, :-1], filtered_motion.iloc[:, -1], test_size=0.2, random_state=42)
    model_dt = model_trainer.train_decision_tree(X_train, y_train, X_test, y_test)
    model_ann = model_trainer.train_ann(X_train, y_train, X_test, y_test)
    model_lstm = model_trainer.train_lstm(X_train, y_train, X_test, y_test)