In [None]:
import os
import pandas as pd
from pandas import json_normalize
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt

In [None]:
def read_data(file_name):
    """Load one header-less, comma-separated sensor log into a DataFrame.

    Every column is first read as text so that malformed sensor readings do
    not abort the load; the six accelerometer columns and ``tension`` are then
    converted with ``errors='coerce'`` (unparseable values become NaN).

    Parameters
    ----------
    file_name : str or path-like
        Path of the log file.

    Returns
    -------
    pandas.DataFrame
        Columns ``data_id`` and ``timestamp`` (kept as text), the seven numeric
        sensor columns, and ``time`` -- ``timestamp`` interpreted as
        milliseconds since the Unix epoch.  ``tension`` is rescaled as
        ``0.650 * (raw - 2166)``.

    Raises
    ------
    ValueError
        If a ``timestamp`` value is not numeric.
    """
    sensor_columns = ['accel0X', 'accel0Y', 'accel0Z', 'accel1X', 'accel1Y', 'accel1Z', 'tension']
    column_names = ['data_id'] + sensor_columns + ['timestamp']

    df = pd.read_csv(file_name, names=column_names, dtype=str)

    # The timestamps were read as strings.  Convert them to numbers before
    # applying ``unit='ms'``: passing strings together with ``unit`` to
    # ``pd.to_datetime`` is deprecated, and newer pandas treats such strings as
    # date strings instead of epoch offsets.
    df['time'] = pd.to_datetime(pd.to_numeric(df['timestamp']), unit='ms')

    df[sensor_columns] = df[sensor_columns].apply(pd.to_numeric, errors='coerce')
    df['tension'] = 0.650 * (df['tension'] - 2166)
    return df
    
def establish_printing_start(file_name):
    """Return the ``timestamp`` of the first record whose ``status`` is 'P'.

    The file is read as JSON Lines (one JSON object per line) and the records
    are passed through ``json_normalize`` before filtering.

    Raises IndexError when no record has status 'P'.
    """
    records = pd.read_json(file_name, lines=True).to_dict('records')
    flat = json_normalize(records)
    printing_rows = flat[flat.status == 'P']
    # Rows keep file order, so element 0 is the earliest 'P' record in the file.
    return printing_rows['timestamp'].values[0]

# Shared LabelEncoder instance: it is fitted on the category names when the
# data is segmented, and reused afterwards (``classes_`` / ``inverse_transform``)
# to turn class indices back into category names.
label_encoder = LabelEncoder()

def process_and_label_data(base_dir, categories=None):
    """Load every category's sensor log, trim it, and label it.

    For each category the function reads ``<base_dir>/<category>/t.txt`` with
    ``read_data`` and ``<base_dir>/<category>/j.json`` with
    ``establish_printing_start``, keeps only the rows whose ``time`` is later
    than the returned start time, and adds a ``label`` column holding the
    category name.

    Parameters
    ----------
    base_dir : str
        Directory containing one sub-directory per category.
    categories : sequence of str, optional
        Category (sub-directory) names to load.  Defaults to the six
        categories of the original dataset.

    Returns
    -------
    pandas.DataFrame
        All categories concatenated in the given order, with a fresh index.
    """
    if categories is None:
        categories = ['arm_failure', 'bowden', 'plastic', 'proper', 'retraction_05', 'unstick']

    labelled_frames = []
    for category in categories:
        print(f"Processing category: {category}")
        category_dir = os.path.join(base_dir, category)

        df = read_data(os.path.join(category_dir, 't.txt'))
        start_time = establish_printing_start(os.path.join(category_dir, 'j.json'))

        # ``assign`` returns a new frame, so the label is never written onto a
        # filtered slice of ``df`` (which triggers SettingWithCopyWarning).
        labelled_frames.append(df[df.time > start_time].assign(label=category))

    return pd.concat(labelled_frames, ignore_index=True)

def zscore_normalize_data(df, columns):
    """Return a copy of ``df`` with the given columns z-score normalised.

    Each listed column is cast to float and replaced by
    ``(value - mean) / std``, using pandas' default ``Series.std``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame.  It is left unmodified.
    columns : iterable of str
        Names of the columns to normalise.

    Returns
    -------
    pandas.DataFrame
        A new frame; columns not listed are copied unchanged.
    """
    # Work on a copy so the caller's frame keeps its original values.
    normalized = df.copy()
    for column in columns:
        values = normalized[column].astype(float)
        centered = values - values.mean()
        std = values.std()
        if pd.isna(std) or std == 0:
            # Constant (or single-row) column: dividing would yield NaN/inf,
            # so keep the centred values (all zeros) instead.
            normalized[column] = centered
        else:
            normalized[column] = centered / std
    return normalized

def plot_raw_data(df, features, categories):
    """Plot each feature of each category, raw and linearly interpolated.

    One figure is produced per (category, feature) pair, showing the raw
    series and its linearly interpolated version against ``df['time']``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``label`` column, a ``time`` column and the feature columns.
    features : iterable of str
        Names of the numeric columns to plot.
    categories : iterable of str
        Values of ``label`` to plot.
    """
    feature_columns = list(features)
    for category in categories:
        category_data = df[df['label'] == category]

        # Interpolate only the plotted columns.  The frame may also carry
        # non-numeric columns (read_data keeps data_id/timestamp as text), and
        # DataFrame.interpolate on object-dtype columns is deprecated.
        interpolated = category_data[feature_columns].interpolate(method='linear')

        for feature in feature_columns:
            plt.figure(figsize=(15, 5))

            # Plot raw and interpolated data together
            plt.plot(category_data['time'], category_data[feature], label='Raw Data', alpha=0.6)
            plt.plot(category_data['time'], interpolated[feature], label='Interpolated Data', alpha=0.6)
            plt.title(f'Raw vs Interpolated Data for Category: {category}, Feature: {feature}')
            plt.xlabel('Time')
            plt.ylabel('Value')
            plt.legend()
            plt.show()

def plot_Normalized_data(df, features, categories):
    """Draw one line plot per (category, feature) pair of ``df``.

    Rows are selected by ``df['label'] == category`` and each feature column is
    plotted against the ``time`` column in its own 15x5 figure.
    """
    for category in categories:
        rows = df[df['label'] == category]
        for feature in features:
            _, ax = plt.subplots(figsize=(15, 5))
            ax.plot(rows['time'], rows[feature])
            ax.set_title(f'Normalized Data for Category: {category}, Feature: {feature}')
            ax.set_xlabel('Time')
            ax.set_ylabel('Value')
            plt.show()

In [None]:
# Load and label the data of every category.
# NOTE: this cell is slow -- it reads and parses all the log files.
# The dataset location can be overridden with the PRINTER_DATASET_DIR
# environment variable; the original local path is used when it is not set.
base_directory = os.environ.get(
    'PRINTER_DATASET_DIR',
    r'C:\MyFiles\AI\UNI\ML-DP-AI\Project\dataset\WithBase',
)
all_data = process_and_label_data(base_directory)
# Untouched copy kept for the raw-data plots.
Raw_data = all_data.copy()

categories = ['arm_failure', 'bowden', 'plastic', 'proper', 'retraction_05', 'unstick']

In [None]:
# Handle missing values using linear interpolation.
# Only numeric columns are interpolated: the frame also holds text columns
# (data_id, timestamp, label), and DataFrame.interpolate on object-dtype
# columns is deprecated in recent pandas.
numeric_columns = all_data.select_dtypes(include='number').columns
all_data_int = all_data.copy()
all_data_int[numeric_columns] = all_data[numeric_columns].interpolate(method='linear')

In [None]:
# Z-score normalise the seven sensor channels; from here on ``all_data``
# refers to the normalised frame.
features = [
    'accel0X', 'accel0Y', 'accel0Z',
    'accel1X', 'accel1Y', 'accel1Z',
    'tension',
]
all_data = zscore_normalize_data(all_data_int, features)

In [None]:
# WARNING: this cell is slow and CPU-intensive.
# It only plots the data with and without interpolation; it is not needed
# for training the model and can be skipped.


plot_raw_data(Raw_data, features, categories)

In [None]:
# WARNING: this cell is slow and CPU-intensive.
# It only plots the z-score-normalised data; it is not needed for training
# the model and can be skipped.


# The normalised frame is ``all_data``; the previously used name
# ``all_data_zscore`` is never defined in this notebook (NameError).
plot_Normalized_data(all_data, features, categories)

In [None]:
# Reshape data into fixed-length segments of consecutive rows
time_steps = 100
samples = len(all_data) // time_steps
if samples == 0:
    raise ValueError(
        f'Need at least {time_steps} rows to build one segment, got {len(all_data)}'
    )
usable_rows = samples * time_steps  # rows beyond the last full segment are dropped

X = all_data[features].values[:usable_rows].reshape(samples, time_steps, len(features))

# Encode categorical labels
y = label_encoder.fit_transform(all_data['label'].values)

# One label per segment: the most frequent row label inside the segment.
# Reshaping to (samples, time_steps) -- without a trailing axis -- makes the
# result 1-D with shape (samples,) instead of (samples, 1).
# NOTE(review): the categories are concatenated into one frame, so a segment
# can straddle two categories; the majority vote decides its label.
y = y[:usable_rows].reshape(samples, time_steps)
y = np.apply_along_axis(lambda window: np.bincount(window.astype(int)).argmax(), axis=1, arr=y)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Define the 1D-CNN model
def create_1d_cnn_model(input_shape, num_classes):
    """Build and compile the 1D-CNN classifier.

    Architecture: three Conv1D(kernel 3, ReLU) + MaxPooling1D(2) stages with
    64, 128 and 256 filters, then Flatten, Dense(100, ReLU), Dropout(0.5) and a
    softmax layer with ``num_classes`` units.  The model is compiled with the
    Adam optimizer, sparse categorical cross-entropy loss and an accuracy metric.
    """
    stack = [
        Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=input_shape),
        MaxPooling1D(pool_size=2),
    ]
    for n_filters in (128, 256):
        stack.append(Conv1D(filters=n_filters, kernel_size=3, activation='relu'))
        stack.append(MaxPooling1D(pool_size=2))
    stack.extend([
        Flatten(),
        Dense(100, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ])

    model = Sequential()
    for layer in stack:
        model.add(layer)

    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Seed TensorFlow's global random generator so that weight initialisation and
# dropout are repeatable between runs.
tf.random.set_seed(42)

# Create the model
input_shape = (X_train.shape[1], X_train.shape[2])
# Size the output layer from the encoder's classes rather than from the labels
# that happen to survive segmentation: if a class never wins a segment's
# majority vote, np.unique(y) would be too small for the encoded label values.
num_classes = len(label_encoder.classes_)
model = create_1d_cnn_model(input_shape, num_classes)
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Train the model (20% of the training data is held out for validation)
history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_split=0.2, callbacks=[early_stopping])

# Evaluate the model
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {test_accuracy}')
print(f'Test Loss: {test_loss}')

# Save the model
model.save('1d_cnn_model.h5')

In [None]:
# Plot training & validation accuracy (left panel) and loss (right panel)
fig, axes = plt.subplots(1, 2, figsize=(12, 6))

panels = [
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model Loss', 'Loss'),
]
for ax, (train_key, val_key, title, y_label) in zip(axes, panels):
    ax.plot(history.history[train_key])
    ax.plot(history.history[val_key])
    ax.set_title(title)
    ax.set_ylabel(y_label)
    ax.set_xlabel('Epoch')
    ax.legend(['Train', 'Validation'], loc='upper left')

plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from tensorflow.keras.models import load_model

# Reload the model that was saved after training
model = load_model('1d_cnn_model.h5')

# Predict on the test data; the predicted class is the index of the largest output
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

# Overall accuracy
accuracy = accuracy_score(y_test, y_pred_classes)

print(f'Test Accuracy: {accuracy:.4f}')


# Per-class precision / recall / F1
class_names = label_encoder.classes_
report = classification_report(y_test, y_pred_classes, target_names=class_names)
print('Classification Report:')
print(report)

# Confusion matrix as an annotated heatmap
conf_matrix = confusion_matrix(y_test, y_pred_classes)
fig, ax = plt.subplots(figsize=(7, 5))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

In [None]:
# Test with individual samples from the test set
print(len(X_test))

# Never request more samples than the test set holds: np.random.choice with
# replace=False raises when asked for more items than are available.
n_samples = min(5, len(X_test))
sample_indices = np.random.choice(len(X_test), n_samples, replace=False)
X_samples = X_test[sample_indices]
y_true_samples = y_test[sample_indices]
y_pred_samples = model.predict(X_samples)
y_pred_classes_samples = np.argmax(y_pred_samples, axis=1)

# Decode all labels in one call.  ``np.ravel`` flattens the true labels so the
# encoder always receives a 1-D array, whether they are stored as (n,) or (n, 1).
true_names = label_encoder.inverse_transform(np.ravel(y_true_samples))
predicted_names = label_encoder.inverse_transform(y_pred_classes_samples)

for idx, true_name, predicted_name in zip(sample_indices, true_names, predicted_names):
    print(f'Sample {idx}:')
    print(f'  True Label: {true_name}')
    print(f'  Predicted Label: {predicted_name}')