In [11]:
import logging
import os
from pathlib import Path

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image, ImageDraw, ImageFont
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

# Logging setup: timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Filesystem locations used throughout the notebook.
DATA_DIR = 'data'
MODEL_PATH = 'saved_model.pth'

# File suffixes (lower-case, with the leading dot) accepted as images.
VALID_IMAGE_EXTENSIONS = {
    '.jpg', '.jpeg', '.png',
    '.ppm', '.bmp', '.pgm',
    '.tif', '.tiff', '.webp',
}

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [12]:
# Custom Exceptions
class DatasetConfigError(Exception):
    """Raised when a dataset directory required by the notebook is missing."""

class ImageProcessingError(Exception):
    """Error type reserved for failures while processing a single image."""

# Validate image file
def validate_image_file(file_path):
    """Tell whether ``file_path`` points at a usable image file.

    Args:
        file_path: A path string or ``Path`` to check.

    Returns:
        bool: True only when the path is an existing regular file, its
        suffix (compared case-insensitively) is in ``VALID_IMAGE_EXTENSIONS``
        and its name does not start with a dot.
    """
    candidate = Path(file_path)
    if not candidate.is_file():
        return False
    if candidate.suffix.lower() not in VALID_IMAGE_EXTENSIONS:
        return False
    # Dot-files (e.g. "._photo.jpg" resource forks) are skipped.
    return not candidate.name.startswith('.')

# Get image files from directory
def get_image_files(directory):
    """Recursively collect the valid image files below ``directory``.

    Args:
        directory: Path (string or ``Path``) of the directory to scan.

    Returns:
        list[str]: Paths of every file accepted by ``validate_image_file``,
        sorted so the result is the same on every run and every filesystem.
        The list is empty (and a warning is logged) when nothing matches.

    Raises:
        DatasetConfigError: If ``directory`` is not an existing directory.
    """
    directory_path = Path(directory)
    if not directory_path.is_dir():
        raise DatasetConfigError(f"Directory not found: {directory}")

    # rglob() yields entries in whatever order the filesystem returns them;
    # sorting makes downstream output (e.g. which images are shown first)
    # reproducible.
    image_files = sorted(
        str(path) for path in directory_path.rglob('*') if validate_image_file(path)
    )

    if not image_files:
        logger.warning("No valid images found in %s", directory)

    print(f"Found {len(image_files)} valid image files in {directory}.")
    return image_files

# Create data transformations
def create_data_transformations(input_size=(224, 224)):
    """Build the preprocessing pipelines for training and validation images.

    Both pipelines resize to ``input_size``, convert to a tensor and
    normalise with mean ``[0.485, 0.456, 0.406]`` and std
    ``[0.229, 0.224, 0.225]`` (the standard ImageNet channel statistics).
    The training pipeline additionally applies a random horizontal flip
    right after the resize.

    Args:
        input_size: Target size passed to ``transforms.Resize``.

    Returns:
        tuple: ``(train_transform, val_transform)`` as ``transforms.Compose``
        objects.
    """
    def _tensor_and_normalize():
        # Fresh transform instances per pipeline, so the two Compose objects
        # do not share any step.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]

    train_steps = [transforms.Resize(input_size), transforms.RandomHorizontalFlip()]
    train_transform = transforms.Compose(train_steps + _tensor_and_normalize())

    val_steps = [transforms.Resize(input_size)]
    val_transform = transforms.Compose(val_steps + _tensor_and_normalize())

    print("Data transformations created.")
    return train_transform, val_transform

# Load datasets
def load_image_datasets(data_dir, train_transform, val_transform):
    """Create the training and validation datasets found under ``data_dir``.

    The function expects ``<data_dir>/train`` and ``<data_dir>/val`` to be
    directories laid out for ``torchvision.datasets.ImageFolder``.

    Args:
        data_dir: Root directory containing the ``train`` and ``val`` folders.
        train_transform: Transform applied to training images.
        val_transform: Transform applied to validation images.

    Returns:
        tuple: ``(train_dataset, val_dataset)``.

    Raises:
        DatasetConfigError: If either split path is missing or is not a
            directory.
    """
    train_dir = os.path.join(data_dir, 'train')
    val_dir = os.path.join(data_dir, 'val')

    for directory, name in [(train_dir, 'training'), (val_dir, 'validation')]:
        # isdir rather than exists: a regular file named "train"/"val" must be
        # rejected here with our own error instead of failing inside torchvision.
        if not os.path.isdir(directory):
            raise DatasetConfigError(f"{name.capitalize()} directory not found: {directory}")

    class FilteredImageFolder(datasets.ImageFolder):
        """ImageFolder that keeps only samples whose class is a sub-directory of ``root``."""

        def __init__(self, root, transform=None):
            # A set gives O(1) membership tests in the filter below.
            valid_classes = {d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d))}
            super().__init__(root, transform=transform)
            self.samples = [s for s in self.samples if self.classes[s[1]] in valid_classes]
            # ImageFolder exposes the same data through `imgs` and `targets`;
            # keep them consistent with the filtered sample list.
            self.imgs = self.samples
            self.targets = [s[1] for s in self.samples]

    train_dataset = FilteredImageFolder(root=train_dir, transform=train_transform)
    val_dataset = FilteredImageFolder(root=val_dir, transform=val_transform)

    print(f"Loaded training dataset with {len(train_dataset)} images across {len(train_dataset.classes)} classes.")
    print(f"Loaded validation dataset with {len(val_dataset)} images across {len(val_dataset.classes)} classes.")
    return train_dataset, val_dataset

In [13]:
# Create data loaders
def create_data_loaders(train_dataset, val_dataset, batch_size=32):
    """Wrap the two datasets in ``DataLoader`` objects.

    Args:
        train_dataset: Dataset for training; must expose a ``classes`` attribute.
        val_dataset: Dataset for validation.
        batch_size: Number of samples per batch for both loaders.

    Returns:
        tuple: ``(train_loader, val_loader, class_names)`` where the training
        loader shuffles, the validation loader does not, and ``class_names``
        is ``train_dataset.classes``.
    """
    # (dataset, shuffle?) pairs, built in train -> val order.
    plan = ((train_dataset, True), (val_dataset, False))
    loaders = tuple(
        DataLoader(dataset, batch_size=batch_size, shuffle=should_shuffle)
        for dataset, should_shuffle in plan
    )

    print(f"Data loaders created with batch size {batch_size}.")
    return (*loaders, train_dataset.classes)

# Train model
def train_model(model, train_loader, criterion, optimizer, num_epochs, device):
    """Run a plain supervised training loop and report the loss per epoch.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Callable computing the loss from ``(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        list[float]: One value per epoch, the mean of the per-batch losses
        (batches are weighted equally, regardless of their size).

    Raises:
        ValueError: If an epoch yields no batches, since no loss can be
            computed for it.
    """
    model.train()
    print("Starting training...")

    train_losses = []
    for epoch in range(num_epochs):
        running_loss = 0.0
        # Count batches while iterating instead of calling len(): this also
        # works for loaders without __len__ and lets us detect an empty epoch.
        num_batches = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader yielded no batches; cannot compute an epoch loss")

        epoch_loss = running_loss / num_batches
        train_losses.append(epoch_loss)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

    return train_losses

# Save model
def save_model(model, model_path):
    """Write the model's ``state_dict`` to ``model_path`` and report it.

    Only the parameters/buffers are stored, not the module object itself,
    so loading requires rebuilding the same architecture first.

    Args:
        model: Module whose weights are saved.
        model_path: Destination file path.
    """
    weights = model.state_dict()
    torch.save(weights, model_path)
    print(f"Model saved to {model_path}.")

In [14]:
# Predict and display images with bounding boxes
def _load_label_font(font_size):
    """Return a font for the prediction labels, preferring Arial at ``font_size``."""
    try:
        return ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        # truetype() raises OSError when the font file cannot be located,
        # which is common outside Windows; do not abort the whole run for it.
        logger.warning("Font 'arial.ttf' not found; falling back to Pillow's default font")
    try:
        # Newer Pillow releases accept a size for the built-in font.
        return ImageFont.load_default(size=font_size)
    except TypeError:
        # Older Pillow: load_default() takes no arguments (fixed-size bitmap font).
        return ImageFont.load_default()


def predict_and_display_images(model, image_paths, val_transform, class_names, device):
    """Classify each image, draw the result on it and show the images in a grid.

    For every path the image is loaded as RGB, passed through
    ``val_transform`` and the model, and the top-1 class with its softmax
    probability is drawn in a red label at the top-left corner. The red
    rectangle drawn around the image is a fixed frame inset by 10 px from
    the borders; it is illustrative only and not a detected object box.

    Images that fail to load or predict are logged and skipped.

    Args:
        model: Classification model; it is switched to eval mode.
        image_paths: Iterable of image file paths.
        val_transform: Transform turning a PIL image into a model input tensor.
        class_names: Sequence mapping class index to class name.
        device: Device on which inference runs.

    Returns:
        None. The annotated images are passed to ``display_image_grid``.
    """
    predictions = []
    font_size = 24  # Label font size
    font = _load_label_font(font_size)
    label_origin = (15, 15)

    # Inference mode is a property of the model, not of each image.
    model.eval()

    for image_path in image_paths:
        try:
            # Context manager closes the underlying file once pixels are copied.
            with Image.open(image_path) as source:
                img = source.convert('RGB')
            img_tensor = val_transform(img).unsqueeze(0).to(device)

            with torch.no_grad():
                outputs = model(img_tensor)
                probabilities = torch.nn.functional.softmax(outputs, dim=1)
                top_probs, top_classes = torch.topk(probabilities, k=1)

            class_name = class_names[top_classes[0][0].item()]
            probability = top_probs[0][0].item() * 100

            # Draw bounding box and label
            draw = ImageDraw.Draw(img)
            width, height = img.size
            box_coords = [10, 10, width - 10, height - 10]  # Example frame, not a real detection
            draw.rectangle(box_coords, outline="red", width=3)

            # Red background behind the label text
            text = f"{class_name}: {probability:.2f}%"
            if hasattr(draw, "textbbox"):
                # ImageDraw.textsize() was removed in Pillow 10; textbbox()
                # returns the pixel bounds of the text drawn at label_origin.
                label_box = list(draw.textbbox(label_origin, text, font=font))
            else:
                # Older Pillow without textbbox(): keep the original sizing.
                text_width, text_height = draw.textsize(text, font=font)
                label_box = [
                    label_origin[0],
                    label_origin[1],
                    label_origin[0] + text_width,
                    label_origin[1] + text_height,
                ]
            draw.rectangle(label_box, fill="red")  # Red background
            draw.text(label_origin, text, fill="white", font=font)  # White text

            predictions.append(img)
            print(f"Predicted result for {image_path}: {class_name} - {probability:.2f}%")
        except Exception as e:
            # Best effort: one unreadable image must not stop the batch.
            logger.error(f"Error predicting image {image_path}: {str(e)}")
            continue

    # Display images
    display_image_grid(predictions)

def display_image_grid(images, grid_size=(3, 3)):
    """Display a grid of images.

    Args:
        images: Sequence of images accepted by ``imshow``.
        grid_size: ``(cols, rows)`` of the grid. At most ``cols * rows``
            images are shown; any extra images are dropped with a warning.

    Returns:
        None. When ``images`` is empty a message is printed and no figure
        is created.
    """
    if len(images) == 0:
        # Avoid rendering a blank figure when there is nothing to show.
        print("No images to display.")
        return

    cols, rows = grid_size
    capacity = cols * rows
    if len(images) > capacity:
        print("Warning: Number of images exceeds grid size. Truncating...")

    fig = plt.figure(figsize=(15, 10))
    for index, img in enumerate(images[:capacity]):
        # Subplot positions are 1-based.
        ax = fig.add_subplot(rows, cols, index + 1)
        ax.imshow(img)
        ax.axis('off')

    fig.tight_layout()
    plt.show()

In [15]:
# Load and train model
def load_and_train_model():
    """Return a ResNet-18 classifier, loading saved weights or training new ones.

    The datasets under ``DATA_DIR`` are loaded first to learn the class
    names. If ``MODEL_PATH`` exists, a ResNet-18 with a matching output
    layer is built and the saved ``state_dict`` is loaded into it.
    Otherwise a pretrained ResNet-18 is fine-tuned for 5 epochs with Adam
    (lr 0.001) and cross-entropy loss, and its weights are saved to
    ``MODEL_PATH``.

    Returns:
        tuple: ``(model, class_names)`` with the model on ``DEVICE``.
    """
    train_transform, val_transform = create_data_transformations()
    train_dataset, val_dataset = load_image_datasets(DATA_DIR, train_transform, val_transform)
    # The validation loader is created but not used: no validation pass runs here.
    train_loader, val_loader, class_names = create_data_loaders(train_dataset, val_dataset)

    # NOTE(review): `pretrained=` is deprecated in newer torchvision releases in
    # favour of `weights=`; kept as-is because the installed version is unknown.

    # Check if the model has been trained before
    if os.path.exists(MODEL_PATH):
        print(f"Loading model from {MODEL_PATH}...")
        model = models.resnet18(pretrained=False)
        model.fc = nn.Linear(model.fc.in_features, len(class_names))
        # map_location lets a checkpoint saved on a GPU machine load on a
        # CPU-only one (and vice versa) instead of failing to deserialize.
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
        model = model.to(DEVICE)
        print("Model loaded successfully.")
    else:
        print("Training new model...")
        model = models.resnet18(pretrained=True)
        # Replace the classification head so it outputs one logit per class.
        model.fc = nn.Linear(model.fc.in_features, len(class_names))
        model = model.to(DEVICE)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        num_epochs = 5

        train_model(model, train_loader, criterion, optimizer, num_epochs, DEVICE)
        save_model(model, MODEL_PATH)

    return model, class_names

In [None]:
# Step 1: Create the data transformations
# (val_transform is reused in step 5 to preprocess the test images.)
train_transform, val_transform = create_data_transformations()

# Step 2: Load the datasets
# NOTE(review): load_and_train_model() in step 4 loads the datasets and builds
# the loaders again internally, so steps 2-3 repeat that work (the captured
# output below shows each message twice).
train_dataset, val_dataset = load_image_datasets(DATA_DIR, train_transform, val_transform)

# Step 3: Create the data loaders
train_loader, val_loader, class_names = create_data_loaders(train_dataset, val_dataset)

# Step 4: Load the saved model, or train a new one if none exists
# (class_names is reassigned here from the function's own dataset load.)
model, class_names = load_and_train_model()

# Step 5: Predict on the test images and display them
test_images_dir = os.path.join(DATA_DIR, 'test')
test_images = get_image_files(test_images_dir)
predict_and_display_images(model, test_images, val_transform, class_names, DEVICE)

Data transformations created.
Loaded training dataset with 32246 images across 9 classes.
Loaded validation dataset with 32246 images across 9 classes.
Data loaders created with batch size 32.
Data transformations created.
Loaded training dataset with 32246 images across 9 classes.
Loaded validation dataset with 32246 images across 9 classes.
Data loaders created with batch size 32.
Training new model...
Starting training...
