In [None]:
import os
import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import seaborn as sns
from PIL import Image

# Select the compute device: CUDA when PyTorch reports it, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

# Directories of the three dataset splits.
train_dir = r'C:/Users/asus/Downloads/Testing Simulasi Klasifikasi Sampah/Dataset sampah sendiri/garbage_classification_sendiri/train'
valid_dir = r'C:/Users/asus/Downloads/Testing Simulasi Klasifikasi Sampah/Dataset sampah sendiri/garbage_classification_sendiri/validation'
test_dir = r'C:/Users/asus/Downloads/Testing Simulasi Klasifikasi Sampah/Dataset sampah sendiri/garbage_classification_sendiri/test'


def _tensor_steps():
    """Return fresh ToTensor + Normalize steps shared by every pipeline.

    The mean/std values match the statistics commonly used with
    ImageNet-pretrained torchvision models.
    """
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]


# Training pipeline: resize, then random rotation / crop / flip augmentation,
# then conversion to a normalised tensor.
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomRotation(30),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    *_tensor_steps(),
])

# Validation and test pipelines are deterministic: resize + normalised tensor.
valid_transforms = transforms.Compose([transforms.Resize((224, 224)), *_tensor_steps()])

test_transforms = transforms.Compose([transforms.Resize((224, 224)), *_tensor_steps()])

# ImageFolder variant that tolerates unreadable and transparent images
class VerifyImageFolder(datasets.ImageFolder):
    """ImageFolder whose ``__getitem__`` yields ``None`` for unreadable files.

    A normal item is the ``(sample, target)`` pair after the optional
    ``transform`` / ``target_transform`` have been applied. When loading or
    transforming raises ``OSError`` the error is printed and ``None`` is
    returned instead, so the DataLoader's collate function must drop those
    ``None`` entries.
    """

    def __getitem__(self, index):
        try:
            path, target = self.samples[index]
            image = self.loader(path)

            # Palette ('P') and 'RGBA' images go through RGBA before RGB.
            if image.mode == 'P' or image.mode == 'RGBA':
                image = image.convert('RGBA').convert('RGB')

            if self.transform is not None:
                image = self.transform(image)
            if self.target_transform is not None:
                target = self.target_transform(target)
        except OSError as exc:  # IOError is an alias of OSError in Python 3
            print(f"Error loading image at index {index}: {exc}")
            return None  # Signals "skip this sample" to the collate function

        return image, target

# Collate function that drops the None placeholders of skipped samples
def custom_collate_fn(batch):
    """Collate ``batch`` after discarding its ``None`` entries.

    Args:
        batch: list of dataset items, where an item may be ``None``.

    Returns:
        The result of PyTorch's ``default_collate`` on the remaining items,
        or ``None`` when no item remains.
    """
    kept = [item for item in batch if item is not None]

    # Nothing usable in this batch: let the caller skip it.
    if not kept:
        return None

    return torch.utils.data.dataloader.default_collate(kept)

# One dataset per split, each built with the corruption-tolerant ImageFolder subclass
train_dataset = VerifyImageFolder(train_dir, transform=train_transforms)
valid_dataset = VerifyImageFolder(valid_dir, transform=valid_transforms)
test_dataset = VerifyImageFolder(test_dir, transform=test_transforms)

# Batch loaders; only the training split is shuffled. custom_collate_fn removes
# the None items produced for unreadable images.
_loader_options = dict(batch_size=32, collate_fn=custom_collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
valid_loader = DataLoader(valid_dataset, shuffle=False, **_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_options)

# Class names of the training set, ordered by class index
class_names = train_dataset.classes

# Print the class index -> class name mapping
print("Class index to name mapping:")
for position, name in enumerate(class_names):
    print(f"{position}: {name}")

# Load pretrained EfficientNetB7 model from torchvision
# NOTE(review): newer torchvision releases prefer the `weights=` argument over
# `pretrained=True` -- confirm against the installed version before changing.
model = models.efficientnet_b7(pretrained=True)

# Freeze every parameter of the pretrained network
for param in model.parameters():
    param.requires_grad = False

# Re-enable gradients for the last 10 parameter tensors. At this point the list
# still ends with the original classifier's parameters, which are discarded when
# the classifier is replaced below.
for param in list(model.parameters())[-10:]:
    param.requires_grad = True

# Number of classes, taken from the dataset so the output layer always matches
# the labels produced by the loaders (previously hard-coded to 9).
num_classes = len(class_names)

# Replace the classifier with a small custom head; its new parameters are
# trainable by default.
num_features = model.classifier[1].in_features
model.classifier = nn.Sequential(
    nn.Linear(num_features, 128),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(128, num_classes)
)

# Move model to device (GPU or CPU)
model = model.to(device)

# Loss function, optimizer (only over parameters that require gradients) and a
# step scheduler that multiplies the learning rate by 0.1 every 20 epochs
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    The pass is a training pass (backward + optimizer step) when ``optimizer``
    is given, otherwise a gradient-free evaluation pass. The caller is
    responsible for calling ``model.train()`` / ``model.eval()`` beforehand.

    Both metrics are averaged over the samples that were actually processed:
    batches that the collate function reduced to ``None`` are skipped and
    samples it dropped are not counted. ``(0.0, 0.0)`` is returned when no
    sample was processed at all.
    """
    training = optimizer is not None
    total_loss = 0.0
    total_correct = 0
    total_seen = 0

    with torch.set_grad_enabled(training):
        for batch in loader:
            # Skip batches in which every image failed to load
            if batch is None:
                continue

            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = inputs.size(0)
            # criterion returns the batch mean, so weight it by the batch size
            total_loss += loss.item() * batch_size
            _, preds = torch.max(outputs, 1)
            total_correct += torch.sum(preds == labels).item()
            total_seen += batch_size

    if total_seen == 0:
        return 0.0, 0.0
    return total_loss / total_seen, total_correct / total_seen


def train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, num_epochs=100,
                checkpoint_path='best_model_sendiri_output_test.pth'):
    """Train ``model``, checkpoint the best validation epoch and plot the curves.

    Uses the module-level ``device`` for moving batches.

    Args:
        model: network to train (already on ``device``).
        train_loader: loader of training batches; a batch may be ``None``.
        valid_loader: loader of validation batches; a batch may be ``None``.
        criterion: loss function called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped once per training batch.
        scheduler: learning-rate scheduler stepped once per epoch.
        num_epochs: number of epochs to run.
        checkpoint_path: file that receives ``model.state_dict()`` whenever the
            validation accuracy improves (and always after the first epoch, so
            the file exists even if the accuracy never rises above 0).

    Returns:
        The same ``model`` object in its final-epoch state; the best weights
        are only available from ``checkpoint_path``.
    """
    best_acc = 0.0
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(num_epochs):
        model.train()
        epoch_loss, epoch_acc = _run_epoch(model, train_loader, criterion, optimizer)
        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_acc)

        model.eval()
        val_loss, val_acc = _run_epoch(model, valid_loader, criterion)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        print(f'Epoch {epoch}/{num_epochs - 1}, Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        # Always write a checkpoint after the first epoch so a later
        # load of checkpoint_path cannot fail on a missing file.
        if epoch == 0 or val_acc > best_acc:
            best_acc = max(best_acc, val_acc)
            torch.save(model.state_dict(), checkpoint_path)

        scheduler.step()

    print(f'Best Val Acc: {best_acc:.4f}')

    # Plot training and validation loss (left) and accuracy (right)
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Train Acc')
    plt.plot(val_accuracies, label='Val Acc')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()

    return model

# Train the model
model = train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, num_epochs=100)

# Reload the weights from the checkpoint written during training
model.load_state_dict(torch.load('best_model_sendiri_output_test.pth', map_location=device))

# Evaluate on the test split, collecting labels, predictions and probabilities
model.eval()
test_loss = 0.0
test_corrects = 0
y_true = []
y_pred = []
y_prob = []
with torch.no_grad():
    for batch in test_loader:
        # Skip batches in which every image failed to load
        if batch is None:
            continue

        inputs, labels = batch
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        test_corrects += torch.sum(preds == labels).item()

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(preds.cpu().numpy())
        y_prob.extend(torch.softmax(outputs, dim=1).cpu().numpy())

# Average over the samples that were actually evaluated: images dropped by the
# collate function are not part of the sums, so len(test_loader.dataset) would
# understate both metrics.
num_tested = len(y_true)
if num_tested == 0:
    raise RuntimeError('No test samples could be loaded; nothing to evaluate.')
test_loss = test_loss / num_tested
test_acc = test_corrects / num_tested
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')

# Plot the confusion matrix with class names as tick labels. Passing `labels`
# keeps one row/column per class even when a class never occurs in the test
# results, so the tick labels stay aligned with the matrix.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

# Evaluate model performance using various metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted')
recall = recall_score(y_true, y_pred, average='weighted')
f1 = f1_score(y_true, y_pred, average='weighted')
roc_auc = roc_auc_score(y_true, y_prob, multi_class='ovr')

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'ROC AUC Score: {roc_auc:.4f}')

In [None]:
import cv2
import torch
import torchvision.transforms as transforms
from PIL import Image
import tkinter as tk
from threading import Thread
import time

# Check if CUDA is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the trained model (a fully pickled model object, not a state_dict).
# map_location remaps the stored tensors onto the selected device, so a model
# saved on a GPU can still be loaded when only the CPU is available.
# NOTE: torch.load unpickles the file and can execute arbitrary code -- only
# load model files from a trusted source.
# NOTE(review): recent PyTorch releases default torch.load to weights_only=True,
# which rejects fully pickled models -- confirm against the installed version.
model_path = r'C:\Users\asus\Downloads\Testing Simulasi Klasifikasi Sampah\model_complete_garbage_classification_9_sendiri_output.pth'
model = torch.load(model_path, map_location=device)
model = model.to(device)
model.eval()

# Class index -> label name. Index 9 is used by the code below as the pseudo
# label for low-confidence predictions.
labels = {
    0: 'anorganik-brown-glass',
    1: 'anorganik-cardboard',
    2: 'anorganik-green-glass',
    3: 'anorganik-metal',
    4: 'anorganik-paper',
    5: 'anorganik-plastic',
    6: 'anorganik-trash',
    7: 'anorganik-white-glass',
    8: 'organik-biological',
    9: 'no-garbage'  # Add a label for no garbage
}

# Preprocessing applied to every camera frame: resize, tensor conversion and
# per-channel normalisation
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Tk window combining a three-lamp traffic light with three trash bins
class TrafficLight(tk.Tk):
    """Main window: a traffic light on the left, three trash bins on the right.

    Each ``*_on`` method first resets everything, then colours one lamp,
    shows a caption and draws the matching bin in its shortened ("open")
    shape. ``last_prediction`` and ``prediction_timer`` are plain attributes
    initialised to ``None`` here and managed by code outside this class.
    """

    def __init__(self):
        super().__init__()
        self.title("Trash Bin Simulation")
        self.geometry("400x450")

        # Left canvas with three gray lamps stacked vertically
        lamp_canvas = tk.Canvas(self, width=200, height=400, bg='black')
        lamp_canvas.pack(side="left")
        self.canvas = lamp_canvas
        self.red_light, self.yellow_light, self.green_light = (
            lamp_canvas.create_oval(50, top, 150, top + 100, fill="gray")
            for top in (50, 150, 250)
        )

        # Right canvas with three bins drawn in their closed (full-height) shape
        bin_canvas = tk.Canvas(self, width=200, height=400, bg='white')
        bin_canvas.pack(side="right")
        self.trash_canvas = bin_canvas
        self.red_trash_bin, self.yellow_trash_bin, self.green_trash_bin = (
            bin_canvas.create_rectangle(50, top, 150, top + 100, fill=colour)
            for top, colour in ((50, "red"), (150, "yellow"), (250, "green"))
        )

        # Caption showing the waste category
        self.label = tk.Label(self, text="", font=("Helvetica", 12))
        self.label.pack(pady=10)

        # State used for the prediction-consistency timer
        self.prediction_timer = None
        self.last_prediction = None

    def _switch_on(self, lamp, colour, caption):
        """Reset the display, then light ``lamp``, show ``caption``, open the bin."""
        self.all_off()
        self.canvas.itemconfig(lamp, fill=colour)
        self.label.config(text=caption)
        self.open_trash_bin(colour)

    def red_on(self):
        """Show the red state."""
        self._switch_on(self.red_light, "red", "Sampah anorganik non-recycle")

    def yellow_on(self):
        """Show the yellow state."""
        self._switch_on(self.yellow_light, "yellow", "Sampah anorganik recycle")

    def green_on(self):
        """Show the green state."""
        self._switch_on(self.green_light, "green", "Sampah organik")

    def all_off(self):
        """Turn every lamp gray, clear the caption and close all bins."""
        for lamp in (self.red_light, self.yellow_light, self.green_light):
            self.canvas.itemconfig(lamp, fill="gray")
        self.label.config(text="")
        self.close_all_bins()

    def open_trash_bin(self, color):
        """Redraw the bin named ``color`` shortened; unknown names do nothing."""
        open_shapes = (
            ('red', self.red_trash_bin, (50, 50, 150, 120)),
            ('yellow', self.yellow_trash_bin, (50, 150, 150, 220)),
            ('green', self.green_trash_bin, (50, 250, 150, 320)),
        )
        for bin_name, bin_item, box in open_shapes:
            if color == bin_name:
                self.trash_canvas.coords(bin_item, *box)
                break

    def close_all_bins(self):
        """Restore every bin to its full-height (closed) rectangle."""
        closed_shapes = (
            (self.red_trash_bin, 50),
            (self.yellow_trash_bin, 150),
            (self.green_trash_bin, 250),
        )
        for bin_item, top in closed_shapes:
            self.trash_canvas.coords(bin_item, 50, top, 150, top + 100)

# Action object for the "No Garbage" state
class NoGarbage:
    """Resets a traffic-light object when no garbage is detected."""

    def __init__(self, traffic_light):
        # Object whose all_off() method is called by execute()
        self.traffic_light = traffic_light

    def execute(self):
        """Call ``all_off()`` on the stored traffic light."""
        target = self.traffic_light
        target.all_off()

# Map a predicted label onto the matching traffic-light state
def update_traffic_light(label, traffic_light):
    """Switch ``traffic_light`` to the state that corresponds to ``label``.

    'anorganik-trash' selects red, 'organik-biological' selects green,
    'no-garbage' runs the NoGarbage action, and every other label selects
    yellow.
    """
    if label == 'anorganik-trash':
        traffic_light.red_on()
        return
    if label == 'organik-biological':
        traffic_light.green_on()
        return
    if label == 'no-garbage':
        NoGarbage(traffic_light).execute()
        return
    # Any remaining label falls through to the yellow state
    traffic_light.yellow_on()

# Debounce predictions: act only after a label has been stable for 2 seconds
def check_prediction_consistency(label, traffic_light):
    """Update the display once ``label`` has been repeated for at least 2 seconds.

    ``traffic_light.last_prediction`` and ``traffic_light.prediction_timer``
    hold the most recent label and the time it was first seen. A new or
    changed label only restarts the timer; a repeated label triggers
    ``update_traffic_light`` once 2 seconds have elapsed (for 'no-garbage'
    that function performs the NoGarbage reset).
    """
    previous = traffic_light.last_prediction

    if previous is None or previous != label:
        # First prediction or a different label: remember it and restart the timer
        traffic_light.last_prediction = label
        traffic_light.prediction_timer = time.time()
    elif time.time() - traffic_light.prediction_timer >= 2:  # 2 second timer
        update_traffic_light(label, traffic_light)

# Function to run the waste classification and update GUI
def run_classification():
    """Classify webcam frames in a loop and drive the traffic-light GUI.

    Reads frames from camera 0, runs the module-level ``model`` on each frame
    and passes the resulting label to ``check_prediction_consistency`` together
    with the module-level ``traffic_light``. Predictions whose confidence is
    below the threshold are reported as 'no-garbage'. The annotated frame is
    shown in an OpenCV window; pressing 'q' in that window ends the loop.

    The function returns (instead of exiting the interpreter) when the camera
    cannot be opened, and it always releases the camera and closes the OpenCV
    windows, even if an error interrupts the loop.
    """
    # Open camera
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)

    if not cap.isOpened():
        print("Error: Could not open camera.")
        cap.release()
        # exit() here would only raise SystemExit in the calling thread;
        # returning states the intent directly.
        return

    # Minimum softmax confidence for accepting a prediction; adjust as needed
    confidence_threshold = 0.6

    try:
        while True:
            # Capture frame-by-frame
            ret, frame = cap.read()

            if not ret:
                print("Error: Failed to capture image.")
                break

            # OpenCV delivers BGR frames; convert to an RGB PIL image
            img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # Preprocess the frame and add the batch dimension
            img = transform(img).unsqueeze(0).to(device)

            # Predict using the model
            with torch.no_grad():
                outputs = model(img)
                probabilities = torch.nn.functional.softmax(outputs, dim=1)
                confidence, predicted = torch.max(probabilities, 1)
                confidence = confidence.item()

            if confidence < confidence_threshold:
                label = 'no-garbage'  # No garbage detected
            else:
                label = labels[predicted.item()]

            # Check prediction consistency with a 2-second delay
            check_prediction_consistency(label, traffic_light)

            # Display the resulting frame with the label and confidence
            label_with_confidence = f"{label}: {confidence * 100:.2f}%"
            cv2.putText(frame, label_with_confidence, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
            cv2.imshow('Waste Classifier', frame)

            # Break the loop on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera and close windows on every exit path
        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    # Create Traffic Light and Trash Bin GUI
    traffic_light = TrafficLight()

    # Run the classification in a separate thread. It is a daemon thread so
    # that closing the GUI window ends the program instead of leaving the
    # camera loop running and calling into a destroyed Tk window.
    Thread(target=run_classification, daemon=True).start()

    # Start the GUI loop (blocks until the window is closed)
    traffic_light.mainloop()
