In [1]:
# finetuning above code
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import glob
import torch.nn.functional as F
from tqdm import tqdm
import random
from tqdm import tqdm

# 1. Set up paths
# Root folder of the image dataset; the three split folders are resolved directly beneath it.
BASE_DIR = r'D:\LMS\Research\decay\dataset\My\Car_Corrosion_Dataset'
TRAIN_DIR, TEST_DIR, VAL_DIR = (
    os.path.abspath(os.path.join(BASE_DIR, split_folder))
    for split_folder in ('train', 'test', 'val')
)
# Spreadsheet holding one row of tabular parameters per image.
EXCEL_PATH = r'D:\LMS\Research\decay\dataset\My\paraFinals.xlsx'
# Destination for checkpoints, metrics and plots; created up front if missing.
SAVE_DIR = r'D:\LMS\Research\decay\VGG16V1.1\Results'
os.makedirs(name=SAVE_DIR, exist_ok=True)

# 2. Load and preprocess data
df = pd.read_excel(io=EXCEL_PATH)

def update_excel_with_extensions(df, base_dir, image_extensions=('jpg', 'JPG', 'jpeg', 'JPEG')):
    """Attach the on-disk image path to every row and drop rows without an image.

    The directory tree under ``base_dir`` is scanned once and indexed by file
    name, then each value of the ``filename`` column is looked up as
    ``<filename>.<ext>`` for every extension in ``image_extensions`` (in that
    order; the first hit wins).

    Args:
        df: DataFrame with a ``filename`` column holding extension-less names.
            It is not modified.
        base_dir: Root directory that is searched recursively.
        image_extensions: Extensions (without the dot) to try, in priority order.

    Returns:
        A new DataFrame containing only the rows whose image was found, with an
        extra ``filename_with_ext`` column holding the absolute path.
    """
    # Index the tree once instead of running a recursive glob per row and per
    # extension. Names are stored literally, so characters such as '[' or '*'
    # in a filename are not interpreted as wildcards.
    path_by_name = {}
    for root, dirnames, filenames in os.walk(base_dir):
        # Recursive glob ('**') does not descend into hidden directories; keep
        # that behaviour. Sorting makes the pick deterministic when the same
        # file name exists in several folders.
        dirnames[:] = sorted(d for d in dirnames if not d.startswith('.'))
        for name in sorted(filenames):
            # normcase lower-cases on Windows (case-insensitive file system)
            # and is a no-op on POSIX.
            path_by_name.setdefault(
                os.path.normcase(name),
                os.path.abspath(os.path.join(root, name)),
            )

    def find_file_with_extension(filename):
        for ext in image_extensions:
            match = path_by_name.get(os.path.normcase(f"{filename}.{ext}"))
            if match is not None:
                return match
        return None

    # Work on a copy so the caller's frame is left untouched, and return an
    # independent frame rather than a view-like slice.
    resolved = df.copy()
    resolved['filename_with_ext'] = resolved['filename'].apply(find_file_with_extension)
    valid_df = resolved[resolved['filename_with_ext'].notna()].copy()
    print(f"Valid samples found: {len(valid_df)}")
    return valid_df

# Keep only the spreadsheet rows that resolve to an image under BASE_DIR, then preview them.
df = update_excel_with_extensions(df=df, base_dir=BASE_DIR)
print(df.head(n=5))

# 3. Custom Dataset class
class CarCorrosionDataset(Dataset):
    """Dataset yielding ``(image, label, features)`` for each row of a DataFrame.

    Each row must provide ``filename_with_ext`` (path of the image file) and
    the three numeric columns listed in ``FEATURE_COLUMNS``.

    Args:
        df: DataFrame with one row per sample.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    # Tabular columns returned alongside the image, in this order.
    FEATURE_COLUMNS = (
        'distance_to_beachside',
        'drive_hours_in_coastal_area_perday',
        'car_age',
    )

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label, features)``.

        ``label`` is 0 when the image path contains ``'Coastal'`` and 1
        otherwise; ``features`` is a float32 tensor with one value per entry
        of ``FEATURE_COLUMNS``.
        """
        # Fetch the row once; every further lookup reuses it.
        row = self.df.iloc[idx]
        img_path = row['filename_with_ext']

        # The context manager releases the file handle even if decoding fails;
        # convert() returns an independent in-memory image.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)

        # NOTE(review): this is a case-sensitive substring test on the whole
        # path, so a folder named e.g. 'Non-Coastal' would also yield label 0 --
        # confirm the class folder names.
        label = 0 if 'Coastal' in img_path else 1

        features = torch.tensor(
            [float(row[column]) for column in self.FEATURE_COLUMNS],
            dtype=torch.float32,
        )
        return image, label, features

# 4. Data transforms (simplified)
# One shared pipeline for all splits: resize, convert to tensor, then normalise per channel.
data_transforms = transforms.Compose([
    transforms.Resize(size=(128, 128)),  # Reduced image size
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# 5. Create datasets and data loaders
def create_dataset(dataframe, transforms, split_name):
    """Build a ``CarCorrosionDataset`` from the rows that belong to one split.

    A row belongs to ``split_name`` when one of the *directory* components of
    its ``filename_with_ext`` path equals ``split_name`` (case-insensitive).
    Matching whole components instead of substrings keeps a file such as
    ``train/.../latest_1.jpg`` out of the ``test`` split.

    Args:
        dataframe: DataFrame with a ``filename_with_ext`` column of image paths.
        transforms: Transform passed through to ``CarCorrosionDataset``
            (the parameter name shadows the ``torchvision.transforms`` module
            inside this function only).
        split_name: Folder name of the split, e.g. ``'train'``.

    Returns:
        The dataset for the requested split.

    Raises:
        ValueError: If no row belongs to the split.
    """
    wanted = split_name.casefold()

    def in_split(path):
        if not isinstance(path, str):
            return False
        # Accept both separators so Windows-style paths are split everywhere;
        # the last component is the file name and is ignored.
        directories = path.replace('\\', '/').split('/')[:-1]
        return any(part.casefold() == wanted for part in directories)

    mask = dataframe['filename_with_ext'].map(in_split).astype(bool)
    split_df = dataframe[mask]
    print(f"Number of samples in {split_name} split: {len(split_df)}")
    if split_df.empty:
        raise ValueError(f"No valid samples found for {split_name} dataset")
    return CarCorrosionDataset(split_df, transforms)

# One dataset per split, all sharing the same preprocessing pipeline.
train_dataset = create_dataset(dataframe=df, transforms=data_transforms, split_name='train')
val_dataset = create_dataset(dataframe=df, transforms=data_transforms, split_name='val')
test_dataset = create_dataset(dataframe=df, transforms=data_transforms, split_name='test')

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

# 6. Define a lightweight VGG16 custom model
class LightweightModel(nn.Module):
    """Small CNN whose pooled image features are concatenated with tabular features.

    Three conv/ReLU/max-pool stages are followed by adaptive average pooling to
    4x4, giving ``64 * 4 * 4`` image features per sample. These are
    concatenated with the tabular feature vector and passed through a
    two-layer classifier.

    Args:
        num_classes: Number of output logits.
        num_tabular_features: Length of the per-sample tabular feature vector
            passed to ``forward``; defaults to 3.
    """

    def __init__(self, num_classes=2, num_tabular_features=3):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # Adaptive pooling fixes the spatial size at 4x4, so the classifier
        # input width does not depend on the input image resolution.
        self.avgpool = nn.AdaptiveAvgPool2d((4, 4))
        self.classifier = nn.Sequential(
            nn.Linear(64 * 4 * 4 + num_tabular_features, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x, features):
        """Return class logits for image batch ``x`` and tabular batch ``features``.

        ``features`` is concatenated along dim 1 with the flattened image
        features, so it must be 2-D with the same batch size as ``x``.
        """
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = torch.cat((x, features), dim=1)
        x = self.classifier(x)
        return x

# 7. Training setup
# Use the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = LightweightModel()
model.to(device)  # moves the parameters in place
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# 8. Training loop
num_epochs = 20  # Reduced number of epochs
best_val_loss = float('inf')
train_losses, val_losses = [], []


def _run_epoch(loader, desc, training):
    """Run one pass over ``loader`` and return the mean loss per sample.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``. When ``training`` is true the model is put in train mode and
    the optimizer is stepped after every batch; otherwise the model is put in
    eval mode and gradients are disabled.
    """
    model.train(training)
    total_loss = 0.0
    total_samples = 0
    with torch.set_grad_enabled(training):
        for images, labels, features in tqdm(loader, desc=desc):
            images, labels, features = images.to(device), labels.to(device), features.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(images, features)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            # criterion returns the batch mean; weight it by the batch size so
            # a short final batch does not count as much as a full one.
            batch_size = labels.size(0)
            total_loss += loss.item() * batch_size
            total_samples += batch_size
    return total_loss / total_samples


def _history_frame():
    """Return the losses recorded so far as a DataFrame, one row per epoch."""
    return pd.DataFrame({
        'Epoch': range(1, len(train_losses) + 1),
        'Train Loss': train_losses,
        'Validation Loss': val_losses
    })


for epoch in range(num_epochs):
    train_loss = _run_epoch(train_loader, f'Epoch {epoch+1}/{num_epochs} - Training', training=True)
    train_losses.append(train_loss)

    val_loss = _run_epoch(val_loader, f'Epoch {epoch+1}/{num_epochs} - Validation', training=False)
    val_losses.append(val_loss)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

    # Keep the weights of the epoch with the lowest validation loss so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), os.path.join(SAVE_DIR, 'best_model.pth'))

    # Every fifth epoch: checkpoint the weights and dump the history so far.
    if (epoch + 1) % 5 == 0:
        torch.save(model.state_dict(), os.path.join(SAVE_DIR, f'model_epoch_{epoch+1}.pth'))
        _history_frame().to_csv(os.path.join(SAVE_DIR, 'intermediate_training_history.csv'), index=False)

torch.save(model.state_dict(), os.path.join(SAVE_DIR, 'last_model.pth'))

history_df = _history_frame()
history_df.to_csv(os.path.join(SAVE_DIR, 'training_history.csv'), index=False)

print("Training complete. Model and results saved.")

# 9. Evaluate the model
# NOTE(review): this scores the weights currently held by `model` (the state
# after the last epoch); 'best_model.pth' is not reloaded here -- confirm which
# checkpoint the reported metrics should describe.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels, features in test_loader:
        images, features = images.to(device), features.to(device)
        outputs = model(images, features)
        _, preds = torch.max(outputs, 1)

        # Record the model's predictions exactly as produced: the metrics
        # below must describe the model, so nothing is randomly altered here.
        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(labels.tolist())

# Calculate metrics
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

metrics_df = pd.DataFrame({
    'Metric': ['Accuracy', 'Precision', 'Recall', 'F1 Score'],
    'Value': [accuracy, precision, recall, f1]
})
metrics_df.to_csv(os.path.join(SAVE_DIR, 'metrics.csv'), index=False)

# 10. Visualize results
# Pin the label order so the matrix is always 2x2 (rows/columns 0 and 1),
# even if one class is absent from the test labels and predictions.
cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.savefig(os.path.join(SAVE_DIR, 'confusion_matrix.png'))
plt.close()

plt.figure(figsize=(10, 6))
plt.plot(range(1, num_epochs+1), train_losses, label='Train Loss')
plt.plot(range(1, num_epochs+1), val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.savefig(os.path.join(SAVE_DIR, 'loss_plot.png'))
plt.close()

# 11. Save training history
# Writes 'training_history.csv' from the recorded loss lists.
history_df = pd.DataFrame({
    'Epoch': range(1, num_epochs+1),
    'Train Loss': train_losses,
    'Validation Loss': val_losses
})
history_df.to_csv(os.path.join(SAVE_DIR, 'training_history.csv'), index=False)

print("Evaluation complete. Results saved.")

Valid samples found: 1444
  filename   target  distance_to_beachside  \
0       C1  Coastal                    1.5   
1       C2  Coastal                    1.5   
2       C3  Coastal                    5.0   
3       C4  Coastal                    5.0   
4       C5  Coastal                    1.5   

   drive_hours_in_coastal_area_perday  car_age  \
0                                   6        4   
1                                   6        4   
2                                   4        5   
3                                   4        5   
4                                   3        7   

                                   filename_with_ext  
0  D:\LMS\Research\decay\dataset\My\Car_Corrosion...  
1  D:\LMS\Research\decay\dataset\My\Car_Corrosion...  
2  D:\LMS\Research\decay\dataset\My\Car_Corrosion...  
3  D:\LMS\Research\decay\dataset\My\Car_Corrosion...  
4  D:\LMS\Research\decay\dataset\My\Car_Corrosion...  
Number of samples in train split: 1155
Number of samples in val sp