# Anomaly detection 

In [35]:
from IPython.display import clear_output
%pip install pandas numpy torch opencv-python torchvision tqdm scikit_learn requests BeautifulSoup4 selenium
clear_output()

In [69]:
import pandas as pd 
import numpy as np 


import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision
import cv2

from tqdm import tqdm
from sklearn.model_selection import train_test_split

In [68]:
cax = pd.read_csv('caxton_dataset/caxton_dataset_final.csv')
cax.head(3)

Unnamed: 0,img_path,timestamp,flow_rate,feed_rate,z_offset,target_hotend,hotend,bed,nozzle_tip_x,nozzle_tip_y,img_num,print_id,flow_rate_class,feed_rate_class,z_offset_class,hotend_class,img_mean,img_std
0,caxton_dataset/print0/image-6.jpg,2020-10-08T13:12:50-34,100,100,0.0,205.0,204.13,65.74,531,554,5,0,1,1,1,1,18.68723,13.809311
1,caxton_dataset/print0/image-7.jpg,2020-10-08T13:12:50-80,100,100,0.0,205.0,204.13,65.74,531,554,6,0,1,1,1,1,27.321104,22.875292
2,caxton_dataset/print0/image-8.jpg,2020-10-08T13:12:51-27,100,100,0.0,205.0,204.24,65.84,531,554,7,0,1,1,1,1,23.138174,17.933411


In [63]:
class ImageDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, index):
        img_path = self.data.iloc[index]['img_path']
        img = cv2.imread(img_path)
        # img = torch.from_numpy(img).float().permute(2, 0, 1) / 255.0  # Normalize and convert to tensor

        # Get labels
        ground_class = self.data.iloc[index]['ground_class']

        return img, ground_class

    def __len__(self):
        return len(self.data)

In [133]:
batch_size = 64

# CNN hyperparameters
input_channels = 3  # RGB images
num_classes = 1 # Binary classification (normal vs anomaly)
learning_rate = 0.001
num_epochs = 50
dropout_rate = 0.5

# Define image dimensions
img_height = 224
img_width = 224

# Create a transform to resize images
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
])

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [73]:
import os
import shutil
import re

# Define the source directory and the new directory structure
src_dir = 'binary_data'
data_dir = 'data'
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')

# Create the new directory structure
for dir in [train_dir, test_dir]:
    os.makedirs(os.path.join(dir, 'no_defected'), exist_ok=True)
    os.makedirs(os.path.join(dir, 'defected'), exist_ok=True)

# Function to move files
def move_files(files, src, dst):
    for f in files:
        shutil.copy(os.path.join(src, f), os.path.join(dst, f))

# Function to get defect type from filename
def get_defect_type(filename):
    match = re.match(r'([a-z_]+)_\d+', filename)
    return match.group(1) if match else None

# Split and move files for each category
for category in ['no_defected', 'defected']:
    src_category_dir = os.path.join(src_dir, category)
    files = os.listdir(src_category_dir)
    
    if category == 'defected':
        # Group files by defect type
        defect_types = {}
        for file in files:
            defect_type = get_defect_type(file)
            if defect_type:
                if defect_type not in defect_types:
                    defect_types[defect_type] = []
                defect_types[defect_type].append(file)
        
        # Split each defect type separately
        train_files, test_files = [], []
        for defect_type, type_files in defect_types.items():
            type_train, type_test = train_test_split(type_files, test_size=0.2, random_state=42)
            train_files.extend(type_train)
            test_files.extend(type_test)
    else:
        train_files, test_files = train_test_split(files, test_size=0.2, random_state=42)
    
    move_files(train_files, src_category_dir, os.path.join(train_dir, category))
    move_files(test_files, src_category_dir, os.path.join(test_dir, category))


In [134]:
# Set random seed for reproducibility
torch.manual_seed(42)

train_dataset = torchvision.datasets.ImageFolder(root='data/train', transform=transform)
test_dataset = torchvision.datasets.ImageFolder(root='data/test', transform=transform)

In [135]:
train_df, test_df = train_test_split(cax, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.25, random_state=42) 

# Create datasets
train_dataset = ImageDataset(train_df)
val_dataset = ImageDataset(val_df)
test_dataset = ImageDataset(test_df)

val_loader = DataLoader(val_dataset, batch_size=batch_size)

NameError: name 'cax' is not defined

In [136]:
# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [158]:
class model(nn.Module):
    def __init__(self, input_channels, num_classes) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, 16, 3)
        self.conv2 = nn.Conv2d(16, 32, 3)
        # self.pool = nn.MaxPool2d(2, 2)

        self.feature_size = 32 * (224 // 4) * (224 // 4)
        
        self.fc1 = nn.Linear(self.feature_size, 1024)
        self.fc2 = nn.Linear(1024, num_classes)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = torch.flatten(x)  # Flatten the feature maps
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x.squeeze(-1)

In [159]:
for inputs, labels in train_loader:
    print("Input shape:", inputs.shape)
    print("Label shape:", labels.shape)
    break

Input shape: torch.Size([64, 3, 224, 224])
Label shape: torch.Size([64])


In [160]:
model = model(input_channels, num_classes).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
loss_fn = nn.BCEWithLogitsLoss()

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0
    
    for batch, (inputs, labels) in tqdm(enumerate(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}"):
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Forward Pass
        outputs = model(inputs)
        loss = loss_fn(outputs, labels.float())
        
        # Backward Pass and Optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        predicted = (outputs > 0).float()
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)
    
    train_loss /= len(train_loader)
    train_accuracy = 100 * train_correct / train_total
    
    # Validation
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc="Validation"):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_fn(outputs, labels.float())
            val_loss += loss.item()
            predicted = (outputs > 0).float()
            val_correct += (predicted == labels).sum().item()
            val_total += labels.size(0)
    
    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total
    
    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")
    print()

Epoch 1/50: 0it [00:00, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x99123200 and 100352x1024)

In [None]:
with torch.no_grad():
    output = model()