## ETL

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display


In [2]:
# Load the labeled data
good_examples = pd.read_csv('./InDaS Labeled/good_period_2024_01_03.csv')
good_examples

bad_examples = pd.read_csv('./InDaS Labeled/bad_period_2024_01_06.csv')
bad_examples

all_good_segments = []
all_bad_segments = []

In [3]:
# Apply the indicator function to create a new column
good_examples['indicator'] = good_examples.apply(lambda x: 1 if x['rel_time'] == 0 else 0, axis=1)

start_indices_good = good_examples.index[good_examples['indicator'] == 1].tolist()

# Add the end of the DataFrame as the last index
start_indices_good.append(len(good_examples))


# Loop through each segment and plot, limiting the number of rotations
for i in range(len(start_indices_good)-1):
    good_segment = good_examples.iloc[start_indices_good[i]:start_indices_good[i + 1]]
    all_good_segments.append(good_segment)

print("Total Good Cutting Data:", len(all_good_segments))
print("Shape of Segment 1:", all_good_segments[0].shape)
print("Shape of Torque Data:", all_good_segments[0]['Torque'].shape)


Total Good Cutting Data: 4552
Shape of Segment 1: (1668, 11)
Shape of Torque Data: (1668,)


In [4]:
# Apply the indicator function to create a new column
bad_examples['indicator'] = bad_examples.apply(lambda x: 1 if x['rel_time'] == 0 else 0, axis=1)

start_indices_bad = bad_examples.index[bad_examples['indicator'] == 1].tolist()

# Add the end of the DataFrame as the last index
start_indices_bad.append(len(bad_examples))

# Loop through each segment and plot, limiting the number of rotations
for i in range(len(start_indices_bad)-1):
    segment = bad_examples.iloc[start_indices_bad[i]:start_indices_bad[i + 1]]
    all_bad_segments.append(segment)

print("Total Bad Cutting Data:", len(all_bad_segments))
print("Shape of Segment 1:", all_bad_segments[0].shape)
print("Shape of Torque Data:", all_bad_segments[0]['Torque'].shape)


Total Bad Cutting Data: 1937
Shape of Segment 1: (1668, 11)
Shape of Torque Data: (1668,)


## Convert Time-Series into Spectogram Features

### STFT Spectogram Features

In [5]:
from scipy.signal import spectrogram
import random

good_stft_features = []
bad_stft_features = []

def compute_stft_spectrogram(data, fs=500.0, nperseg=512, noverlap=128, nfft=512, cmap='magma'):
    f, t, Sxx = spectrogram(data, fs=fs, nperseg=nperseg, noverlap=noverlap, nfft=nfft)
    return Sxx

    fig = plt.figure()
    plt.pcolormesh(t, f, 10 * np.log10(Sxx), shading='gouraud', cmap=cmap)
    plt.ylabel('Frequency [Hz]')
    plt.xlabel('Time [sec]')
    plt.colorbar(label='Intensity [dB]')
    plt.title('STFT Spectrogram for Good Cutting')
    plt.show()
    fig.savefig(f'/home/admin-anedunga/Desktop/InDas_Template/figures and graphs/good_cutting_stft{random.randint(1, 9)}.png', dpi=fig.dpi)
    return Sxx

# Plot the spectrogram of the first segment
# Sample number
MAX_SAMPLES_GOOD = 4540
# Sample number
MAX_SAMPLES_BAD = 1936

#MAX_SAMPLES_GOOD = 3
#MAX_SAMPLES_BAD = 3

for i in range(MAX_SAMPLES_GOOD):
    good_cutting_stft = np.asarray(all_good_segments[i]['Torque'])
    good_stft = compute_stft_spectrogram(good_cutting_stft)
    good_stft_features.append(good_stft)

print(good_stft.shape)
   
for i in range(MAX_SAMPLES_BAD):
    bad_cutting_stft = np.asarray(all_bad_segments[i]['Torque'])
    bad_stft = compute_stft_spectrogram(bad_cutting_stft)
    bad_stft_features.append(bad_stft)




(257, 4)


## Undersample Classes to Balance Them

### STFT Features

In [6]:
SAMPLES_FOR_TRAINING = 1800
balanced_good_stft_features = np.array(good_stft_features[:SAMPLES_FOR_TRAINING])
balanced_bad_stft_features = np.array(bad_stft_features[:SAMPLES_FOR_TRAINING])

validation_good_stft_features = np.array(good_stft_features[SAMPLES_FOR_TRAINING:MAX_SAMPLES_GOOD])
validation_bad_stft_features = np.array(bad_stft_features[SAMPLES_FOR_TRAINING:MAX_SAMPLES_BAD])

validation_good_stft_labels = np.ones(validation_good_stft_features.shape[0])
validation_bad_stft_labels = np.zeros(validation_bad_stft_features.shape[0])

validation_stft_features = np.concatenate((validation_good_stft_features, validation_bad_stft_features), axis=0)
validation_stft_labels = np.concatenate((validation_good_stft_labels, validation_bad_stft_labels), axis=0)
print("STFT Features Val Shape:", validation_stft_features.shape)
print("Balanced_Good", balanced_good_stft_features.shape)
print("Balanced_Bad", balanced_bad_stft_features.shape)

STFT Features Val Shape: (2876, 257, 4)
Balanced_Good (1800, 257, 4)
Balanced_Bad (1800, 257, 4)


## Dataloader

### STFT

In [7]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split

# Create labels for the features
good_labels = [1] * len(balanced_good_stft_features)  # Label 1 for good torque
bad_labels = [0] * len(balanced_bad_stft_features)    # Label 0 for bad torque

# Combine features and labels
features = np.array(balanced_good_stft_features + balanced_bad_stft_features)
labels = np.array(good_labels + bad_labels)

# Custom Dataset class
class MelSpectrogramDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        feature = self.features[idx]
        label = self.labels[idx]
        return torch.tensor(feature, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

# Create dataset and dataloaders
dataset = MelSpectrogramDataset(features, labels)
train_size = int(0.4 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

val_dataset = MelSpectrogramDataset(validation_stft_features, validation_stft_labels)


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


## Training ResNet 18

In [8]:

import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet18

# Modify ResNet-18
class ResNet18MelSpectrogram(nn.Module):
    def __init__(self, num_classes=2):
        super(ResNet18MelSpectrogram, self).__init__()
        self.resnet = resnet18(pretrained=False)
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.resnet.fc = nn.Sequential(
            nn.Dropout(0.5),  # Dropout layer before the fully connected layer
            nn.Linear(self.resnet.fc.in_features, 2)  # 2 classes: good and bad
        )

    def forward(self, x):
        x = self.resnet(x)
        return x

# Instantiate model, loss function, and optimizer
model = ResNet18MelSpectrogram(num_classes=2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.00001, weight_decay=1e-4)

# Training loop
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.unsqueeze(1).to(device), labels.to(device)  # Add channel dimension

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / train_size
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}')


# Evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.unsqueeze(1).to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')



Epoch 1/10, Loss: 0.7094
Epoch 2/10, Loss: 0.2365
Epoch 3/10, Loss: 0.1165
Epoch 4/10, Loss: 0.0743
Epoch 5/10, Loss: 0.0527
Epoch 6/10, Loss: 0.0392
Epoch 7/10, Loss: 0.0328
Epoch 8/10, Loss: 0.0277
Epoch 9/10, Loss: 0.0241
Epoch 10/10, Loss: 0.0212
Test Accuracy: 100.00%


### Validation

In [9]:
# Validation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in val_loader:
        inputs, labels = inputs.unsqueeze(1).to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print("Correct Preditions/Total", correct, "/", total)
print(f'Validation Accuracy: {accuracy:.2f}%')

  return torch.tensor(feature, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


Correct Preditions/Total 2740 / 2876
Validation Accuracy: 95.27%


### Check Validation Scores & Save Model Weights

In [10]:
# if accuracy > 90:
#     torch.save(model.state_dict(),f'model_ResNet18_{accuracy:.2f}.pth')
#     print("Model Saved")