In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
from torchvision import datasets, transforms
import timm

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn.preprocessing import LabelEncoder

import os
import time
import copy
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Dispositivo utilizzato: {device}')

# Seed the PyTorch and NumPy global RNGs with the same fixed value.
for seed_rng in (torch.manual_seed, np.random.seed):
    seed_rng(42)

# Global look applied to every figure drawn later in the notebook.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

In [None]:
# Dataset root. Set the FLOWERS_DATA_DIR environment variable to point the
# notebook at another copy of the data without editing this cell; when the
# variable is unset the original location below is used unchanged.
DEFAULT_DATA_DIR = r"c:\Users\frogora\OneDrive - BOARD\Desktop\Profession AI projects\Profession-AI-projects\7. Computer Vision\progetto-finale-flowes"
data_dir = Path(os.environ.get("FLOWERS_DATA_DIR", DEFAULT_DATA_DIR))
train_dir = data_dir / "train"
val_dir = data_dir / "valid"
test_dir = data_dir / "test"

# Fail fast, before any heavy work, if one of the split folders is missing
# (or is not a directory at all).
for path in (train_dir, val_dir, test_dir):
    if not path.is_dir():
        raise FileNotFoundError(f"Cartella non trovata: {path}")
    print(f"Cartella trovata: {path}")

# Training hyperparameters shared by the cells below.
BATCH_SIZE = 32        # samples per mini-batch for all three loaders
LEARNING_RATE = 0.001  # initial learning rate handed to the optimizer
EPOCHS = 25            # upper bound on epochs; early stopping may end sooner
IMG_SIZE = 224         # images are resized to IMG_SIZE x IMG_SIZE
NUM_CLASSES = 2        # size of the classifier output layer

In [None]:
def count_images_in_folder(folder_path):
    """Count the ``*.jpg`` images in each class sub-folder of ``folder_path``.

    Hidden files whose name starts with ``._`` are skipped, matching the
    filter used when sampling images and the files removed by the dataset
    clean-up step, so the counts are not inflated by those files.

    Args:
        folder_path: ``pathlib.Path`` of a split directory that contains one
            sub-directory per class.

    Returns:
        dict mapping each sub-directory name to its number of ``*.jpg`` files.
        Entries of ``folder_path`` that are not directories are ignored.
    """
    counts = {}
    for class_folder in folder_path.iterdir():
        if not class_folder.is_dir():
            continue
        counts[class_folder.name] = sum(
            1
            for image_path in class_folder.glob('*.jpg')
            if not image_path.name.startswith('._')
        )
    return counts

# Image counts per class folder, computed once for each split.
train_counts, val_counts, test_counts = (
    count_images_in_folder(split_dir) for split_dir in (train_dir, val_dir, test_dir)
)

# One row per class, one column per split.
data_summary = pd.DataFrame(
    {'Train': train_counts, 'Validation': val_counts, 'Test': test_counts}
)

print("Distribuzione del Dataset:")
print(data_summary)
print(f"\nTotale immagini: {data_summary.sum().sum()}")

fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(15, 6))

# Left panel: grouped bars, one group per class and one bar per split.
data_summary.plot(kind='bar', ax=ax1, color=['skyblue', 'lightgreen', 'salmon'])
ax1.set_title('Distribuzione Immagini per Set', fontsize=14, fontweight='bold')
ax1.set_xlabel('Classi')
ax1.set_ylabel('Numero di Immagini')
ax1.legend(title='Dataset Split')
ax1.tick_params(axis='x', rotation=0)

# Right panel: share of each class over all splits combined.
total_per_class = data_summary.sum(axis=1)
colors = ['#FF9999', '#66B2FF']
wedges, texts, autotexts = ax2.pie(
    total_per_class.values,
    labels=total_per_class.index,
    autopct='%1.1f%%',
    colors=colors,
    startangle=90,
)
ax2.set_title('Distribuzione Totale per Classe', fontsize=14, fontweight='bold')

plt.tight_layout()
plt.show()

# Class-balance check: a dandelion/daisy ratio outside [0.7, 1.3] is reported
# as an unbalanced dataset.
total_daisy, total_dandelion = (total_per_class[name] for name in ('daisy', 'dandelion'))
ratio = total_dandelion / total_daisy
print(f"\nBilanciamento del dataset:")
print(f"Rapporto Dandelion/Daisy: {ratio:.2f}")
unbalanced = ratio > 1.3 or ratio < 0.7
print(
    "Dataset leggermente sbilanciato - considereremo weighted loss"
    if unbalanced
    else "Dataset relativamente bilanciato"
)

In [None]:
def show_sample_images(data_dir, num_samples=8):
    """Display a grid of sample images with one row per class.

    Args:
        data_dir: ``pathlib.Path`` of a split directory containing the
            ``daisy`` and ``dandelion`` class sub-folders.
        num_samples: Total number of images requested; each class row shows
            ``num_samples // 2`` images (at least one).

    Images that cannot be read are reported on stdout and replaced by a text
    placeholder, so one corrupt file does not abort the whole figure.
    """
    classes = ['daisy', 'dandelion']
    per_class = max(1, num_samples // 2)

    # squeeze=False keeps `axes` two-dimensional even when there is a single
    # column, so the axes[row, col] indexing below is always valid.
    fig, axes = plt.subplots(len(classes), per_class, figsize=(16, 8), squeeze=False)
    fig.suptitle('Campioni del Dataset GreenTech Solutions', fontsize=16, fontweight='bold')

    # Hide every cell up front: a class with fewer images than columns then
    # leaves blank space instead of empty framed axes.
    for ax in axes.flat:
        ax.axis('off')

    for row, class_name in enumerate(classes):
        class_dir = data_dir / class_name
        # Sorted so the same images are picked on every run and platform;
        # hidden "._*" files are skipped.
        image_files = sorted(
            f for f in class_dir.glob('*.jpg') if not f.name.startswith('._')
        )[:per_class]

        for col, img_path in enumerate(image_files):
            ax = axes[row, col]
            try:
                img = plt.imread(img_path)
                ax.imshow(img)
                ax.set_title(f'{class_name.capitalize()}', fontweight='bold')
                ax.axis('off')
            except Exception as e:
                # Best effort: keep drawing the remaining samples.
                print(f"Errore nella lettura di {img_path}: {e}")
                ax.text(0.5, 0.5, 'Errore\ncaricamento\nimmagine',
                        ha='center', va='center', transform=ax.transAxes)
                ax.set_title(f'{class_name.capitalize()} - Errore', fontweight='bold')
                ax.axis('off')

    plt.tight_layout()
    plt.show()

# Preview the training split with the default number of samples.
show_sample_images(train_dir)

In [None]:
# Steps shared by both pipelines: a fixed-size resize and a per-channel
# normalisation (these mean/std values are the ones commonly used with
# ImageNet-pretrained backbones).
_resize = transforms.Resize((IMG_SIZE, IMG_SIZE))
_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Random augmentations applied to training images only.
_augmentations = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
]

# Training: resize -> augment -> tensor -> normalise.
train_transforms = transforms.Compose(
    [_resize, *_augmentations, transforms.ToTensor(), _normalize]
)

# Validation/test: deterministic, no augmentation.
val_test_transforms = transforms.Compose(
    [_resize, transforms.ToTensor(), _normalize]
)

print(
    "Trasformazioni configurate:",
    "  - Training: Resize, Flip, Rotation, ColorJitter, Affine, Normalize",
    "  - Validation/Test: Resize, Normalize",
    sep="\n",
)

In [None]:
def clean_hidden_files(directory):
    """Delete every file whose name starts with ``._`` under ``directory``.

    The tree is walked recursively. Each deletion is reported on stdout; a
    failure to delete one file is reported as well and does not stop the
    walk.

    Args:
        directory: Root of the tree to clean (anything ``os.walk`` accepts).
    """
    for current_dir, _subdirs, filenames in os.walk(directory):
        hidden_names = (name for name in filenames if name.startswith('._'))
        for name in hidden_names:
            target = os.path.join(current_dir, name)
            try:
                os.remove(target)
                print(f"Rimosso: {target}")
            except Exception as exc:
                print(f"Errore rimozione {target}: {exc}")

# Delete hidden "._*" files from every split before building the datasets,
# so that only real images remain in the class folders.
print("Pulizia file nascosti in corso...")
for split_dir in (train_dir, val_dir, test_dir):
    clean_hidden_files(split_dir)
print("Pulizia completata!")

# One ImageFolder per split; only the training split is augmented.
train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset = datasets.ImageFolder(val_dir, transform=val_test_transforms)
test_dataset = datasets.ImageFolder(test_dir, transform=val_test_transforms)

class_names = train_dataset.classes
class_to_idx = train_dataset.class_to_idx

# Each ImageFolder derives its own label mapping from its folder names. If
# the splits disagree, validation/test labels would silently refer to other
# classes than the ones the model was trained on, so stop here instead.
for split_name, split_dataset in (("valid", val_dataset), ("test", test_dataset)):
    if split_dataset.class_to_idx != class_to_idx:
        raise ValueError(
            f"Mapping classi diverso tra train e {split_name}: "
            f"{class_to_idx} vs {split_dataset.class_to_idx}"
        )

# Pinned host memory only helps host-to-GPU copies, so request it only when
# running on CUDA.
pin_memory = device.type == 'cuda'
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=pin_memory)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=pin_memory)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=pin_memory)

print(f"Dataset caricati:")
print(f"- Training: {len(train_dataset)} immagini")
print(f"- Validation: {len(val_dataset)} immagini")
print(f"- Test: {len(test_dataset)} immagini")
print(f"- Classi: {class_names}")
print(f"- Mapping classi: {class_to_idx}")

# Count training samples per class from the labels ImageFolder actually
# loaded, rather than from a separate "*.jpg" glob: the weights then always
# match the data the loss sees, whatever the image extensions are.
class_counts = [0] * len(class_names)
for target in train_dataset.targets:
    class_counts[target] += 1
total_samples = sum(class_counts)

# Inverse-frequency weights: total / (n_classes * class_count), so a
# perfectly balanced dataset gives every class a weight of 1.0.
class_weights = [total_samples / (len(class_names) * count) for count in class_counts]

print(f"\nBilanciamento classi:")
for name, count, weight in zip(class_names, class_counts, class_weights):
    print(f"- {name}: {count} immagini (peso: {weight:.3f})")

In [None]:
class FlowerClassifier(nn.Module):
    """Transfer-learning flower classifier: a timm backbone plus an MLP head.

    The backbone's own classification layer is replaced by ``nn.Identity`` so
    the backbone output is fed to a new fully connected head that ends in
    ``num_classes`` logits.

    Args:
        model_name: Name passed to ``timm.create_model``.
        num_classes: Number of output logits.
        pretrained: Forwarded to ``timm.create_model``.
    """

    def __init__(self, model_name='efficientnet_b3', num_classes=2, pretrained=True):
        super().__init__()

        self.backbone = timm.create_model(model_name, pretrained=pretrained)

        # The first of these attributes present on the backbone is treated as
        # its final layer: read its input width, then neutralise it. If none
        # is present, fall back to the backbone's `num_features`.
        for head_attr in ('classifier', 'head', 'fc'):
            if hasattr(self.backbone, head_attr):
                in_features = getattr(self.backbone, head_attr).in_features
                setattr(self.backbone, head_attr, nn.Identity())
                break
        else:
            in_features = self.backbone.num_features

        # Head layout: Dropout -> [Linear, ReLU, BatchNorm, Dropout] x 2 -> Linear.
        layers = [nn.Dropout(0.3)]
        width = in_features
        for hidden_width, drop_prob in ((512, 0.4), (256, 0.2)):
            layers += [
                nn.Linear(width, hidden_width),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_width),
                nn.Dropout(drop_prob),
            ]
            width = hidden_width
        layers.append(nn.Linear(width, num_classes))
        self.classifier = nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for the input batch ``x``."""
        return self.classifier(self.backbone(x))

# Build the network and move it to the selected device in one step.
model = FlowerClassifier(model_name='efficientnet_b3', num_classes=NUM_CLASSES).to(device)

# Walk the parameters once, keeping each tensor's size and trainable flag.
param_sizes = [(p.numel(), p.requires_grad) for p in model.parameters()]
total_params = sum(size for size, _ in param_sizes)
trainable_params = sum(size for size, is_trainable in param_sizes if is_trainable)

print("Modello creato: EfficientNet-B3 con classificatore personalizzato")
print(f"- Parametri totali: {total_params:,}")
print(f"- Parametri addestrabili: {trainable_params:,}")
print(f"- Dispositivo: {device}")

In [None]:
# Class-weighted cross-entropy: the weights are created as float32 directly
# on the training device.
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32, device=device)
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)

# AdamW over every model parameter (backbone and head alike).
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)

# Multiply the learning rate by 0.7 when the monitored value (the validation
# loss, minimised) has stopped improving for more than 3 epochs; never go
# below 1e-7.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.7,
    patience=3,
    min_lr=1e-7,
)

class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    A loss counts as an improvement only if it is lower than the best loss
    seen so far by more than ``min_delta``. After ``patience`` consecutive
    calls without an improvement, the instance returns ``True``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        min_delta: Minimum decrease of the loss that counts as improvement.
    """

    def __init__(self, patience=7, min_delta=0.001):
        self.patience, self.min_delta = patience, min_delta
        self.counter = 0                 # consecutive calls without improvement
        self.best_loss = float('inf')    # lowest loss accepted so far

    def __call__(self, val_loss):
        """Record ``val_loss`` and return ``True`` when training should stop."""
        improved = val_loss < self.best_loss - self.min_delta
        if improved:
            self.best_loss, self.counter = val_loss, 0
        else:
            self.counter += 1
        return self.counter >= self.patience

# Stop once the validation loss has failed to improve for 7 epochs in a row.
early_stopping = EarlyStopping(patience=7)

# Summary of the training configuration, one item per line.
print(
    "Configurazione training:",
    f"  - Loss: CrossEntropyLoss con pesi {class_weights}",
    f"  - Optimizer: AdamW (lr={LEARNING_RATE}, weight_decay=0.01)",
    "  - Scheduler: ReduceLROnPlateau",
    "  - Early Stopping: patience=7",
    sep="\n",
)

In [None]:
def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train; it is switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss taking ``(outputs, labels)``; its value is weighted
            by the batch size when averaged over the epoch.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen, as Python floats.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_inputs, batch_labels in train_loader:
        batch_inputs = batch_inputs.to(device, non_blocking=True)
        batch_labels = batch_labels.to(device, non_blocking=True)
        batch_size = batch_inputs.size(0)

        # Standard step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(batch_inputs)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        predicted = logits.max(dim=1).indices
        loss_sum += batch_loss.item() * batch_size
        n_correct += (predicted == batch_labels).sum().item()
        n_seen += batch_size

    return loss_sum / n_seen, n_correct / n_seen

def validate_model(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating it.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        val_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy, macro_f1)`` over all samples seen.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    predictions, targets = [], []

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch_inputs, batch_labels in val_loader:
            batch_inputs = batch_inputs.to(device, non_blocking=True)
            batch_labels = batch_labels.to(device, non_blocking=True)
            batch_size = batch_inputs.size(0)

            logits = model(batch_inputs)
            batch_loss = criterion(logits, batch_labels)
            predicted = logits.max(dim=1).indices

            loss_sum += batch_loss.item() * batch_size
            n_correct += (predicted == batch_labels).sum().item()
            n_seen += batch_size

            # Keep every prediction/label pair for the F1 computation below.
            predictions.extend(predicted.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    mean_loss = loss_sum / n_seen
    accuracy = n_correct / n_seen
    macro_f1 = f1_score(targets, predictions, average='macro')

    return mean_loss, accuracy, macro_f1

print("Funzioni di training e validation definite")

In [None]:
# Per-epoch history, consumed by the plotting cell further down.
train_losses = []
train_accs = []
val_losses = []
val_accs = []
val_f1s = []

# Best validation macro-F1 seen so far, the (1-based) epoch it was reached
# at, and a snapshot of the weights from that epoch.
best_f1 = 0.0
best_epoch = 0
best_model_wts = copy.deepcopy(model.state_dict())

print("Inizio training del modello GreenTech Solutions...\n")
start_time = time.time()

for epoch in range(EPOCHS):
    epoch_start = time.time()

    train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc, val_f1 = validate_model(model, val_loader, criterion, device)

    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)
    val_f1s.append(val_f1)

    # The scheduler monitors the validation loss (mode='min').
    scheduler.step(val_loss)

    # Model selection uses macro-F1, while the scheduler and early stopping
    # both watch the validation loss.
    if val_f1 > best_f1:
        best_f1 = val_f1
        best_epoch = epoch + 1
        best_model_wts = copy.deepcopy(model.state_dict())
        print(f"Nuovo miglior modello salvato! F1-Score: {val_f1:.4f}")

    epoch_time = time.time() - epoch_start

    print(f"Epoch {epoch+1}/{EPOCHS} ({epoch_time:.1f}s):")
    print(f"  Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}")
    print(f"  Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}")
    print(f"  LR: {optimizer.param_groups[0]['lr']:.2e}")
    print("-" * 60)

    if early_stopping(val_loss):
        print(f"Early stopping attivato alla epoch {epoch+1}")
        break

# Restore the weights of the best epoch before saving or evaluating.
model.load_state_dict(best_model_wts)

total_time = time.time() - start_time
print(f"Training completato in {total_time//60:.0f}m {total_time%60:.0f}s")
print(f"Miglior F1-Score: {best_f1:.4f}")

# Number of epochs actually run; unlike the loop variable, this is defined
# even when the loop body never executed.
epochs_run = len(train_losses)

# Checkpoint layout: 'model_state_dict' holds the best-F1 weights (from
# 'best_epoch'), whereas 'optimizer_state_dict' is the optimizer state after
# the last epoch run ('epoch').
torch.save({
    'epoch': epochs_run,
    'best_epoch': best_epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'best_f1': best_f1,
    'class_names': class_names,
    'class_to_idx': class_to_idx
}, 'greentech_flower_classifier.pth')

print("Modello salvato come 'greentech_flower_classifier.pth'")

In [None]:
def _replay_plateau_lr(val_loss_history, initial_lr, factor, patience, min_lr,
                       threshold=1e-4, eps=1e-8):
    """Reconstruct the learning rate in effect during each training epoch.

    The training loop did not record the learning rate, so it is rebuilt by
    replaying the plateau rule on the recorded validation losses: a loss is
    an improvement when it is below ``best * (1 - threshold)``; once more
    than ``patience`` consecutive epochs bring no improvement, the rate is
    multiplied by ``factor`` (never dropping below ``min_lr``, and only when
    the change exceeds ``eps``) and the counter restarts.

    NOTE(review): this mirrors ReduceLROnPlateau with mode='min', relative
    threshold and no cooldown, and assumes a fresh scheduler that was stepped
    exactly once per epoch with the validation loss.

    Args:
        val_loss_history: Validation loss of each epoch, in order.
        initial_lr: Learning rate the optimizer started with.
        factor: Multiplicative decay applied on a plateau.
        patience: Non-improving epochs tolerated before decaying.
        min_lr: Lower bound for the learning rate.
        threshold: Relative improvement required to reset the counter.
        eps: Minimal decrease for a decay to be applied.

    Returns:
        List with one learning rate per epoch (the value used while that
        epoch was trained, i.e. before that epoch's scheduler step).
    """
    lr = initial_lr
    best = float('inf')
    bad_epochs = 0
    history = []
    for loss in val_loss_history:
        history.append(lr)
        if loss < best * (1.0 - threshold):
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:
            reduced = max(lr * factor, min_lr)
            if lr - reduced > eps:
                lr = reduced
            bad_epochs = 0
    return history


fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('GreenTech Solutions - Metriche di Training', fontsize=16, fontweight='bold')

# 1-based epoch numbers for the x axis of every panel.
epochs_range = range(1, len(train_losses) + 1)

# Top-left: training vs validation loss.
ax1.plot(epochs_range, train_losses, 'b-', label='Training Loss', linewidth=2)
ax1.plot(epochs_range, val_losses, 'r-', label='Validation Loss', linewidth=2)
ax1.set_title('Loss', fontweight='bold')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Top-right: training vs validation accuracy.
ax2.plot(epochs_range, train_accs, 'b-', label='Training Accuracy', linewidth=2)
ax2.plot(epochs_range, val_accs, 'r-', label='Validation Accuracy', linewidth=2)
ax2.set_title('Accuracy', fontweight='bold')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()
ax2.grid(True, alpha=0.3)

# Bottom-left: validation macro-F1.
ax3.plot(epochs_range, val_f1s, 'g-', label='Validation F1-Score', linewidth=2)
ax3.set_title('F1-Score (Macro)', fontweight='bold')
ax3.set_xlabel('Epoch')
ax3.set_ylabel('F1-Score')
ax3.legend()
ax3.grid(True, alpha=0.3)

# Bottom-right: learning rate per epoch, derived from the recorded validation
# losses instead of a fixed decay pattern. The factor/patience/min_lr values
# must stay in sync with the ReduceLROnPlateau configuration used for training.
lr_history = _replay_plateau_lr(val_losses, LEARNING_RATE, factor=0.7, patience=3, min_lr=1e-7)
ax4.plot(epochs_range, lr_history, 'purple', linewidth=2)
ax4.set_title('Learning Rate', fontweight='bold')
ax4.set_xlabel('Epoch')
ax4.set_ylabel('Learning Rate')
ax4.set_yscale('log')
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("Statistiche finali del training:")
print(f"- Miglior Validation F1-Score: {max(val_f1s):.4f}")
print(f"- Miglior Validation Accuracy: {max(val_accs):.4f}")
print(f"- Loss finale: {val_losses[-1]:.4f}")
print(f"- Epochs completate: {len(train_losses)}")

In [None]:
def evaluate_on_test(model, test_loader, device, class_names):
    """Run ``model`` over ``test_loader`` and collect predictions and scores.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to.
        class_names: Accepted for interface symmetry; not used here.

    Returns:
        Tuple ``(labels, predictions, probabilities, accuracy, f1_macro,
        f1_weighted)`` where the first three are per-sample lists
        (``probabilities`` holds one softmax vector per sample).
    """
    model.eval()
    targets, predictions, probabilities = [], [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            class_probs = torch.softmax(logits, dim=1)
            predicted = logits.max(dim=1).indices

            predictions.extend(predicted.cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())
            probabilities.extend(class_probs.cpu().numpy())

    # Fraction of samples whose predicted class equals the label.
    accuracy = np.mean(np.array(predictions) == np.array(targets))
    macro_f1 = f1_score(targets, predictions, average='macro')
    weighted_f1 = f1_score(targets, predictions, average='weighted')

    return targets, predictions, probabilities, accuracy, macro_f1, weighted_f1

print("Valutazione finale sul test set...")
# Labels, predictions and probabilities are kept at module level because the
# analysis cells below read them.
(test_labels, test_preds, test_probs,
 test_acc, test_f1_macro, test_f1_weighted) = evaluate_on_test(model, test_loader, device, class_names)

# Headline metrics, one per line.
print(
    "Risultati finali sul Test Set:",
    f"- Accuracy: {test_acc:.4f} ({test_acc*100:.2f}%)",
    f"- F1-Score Macro: {test_f1_macro:.4f}",
    f"- F1-Score Weighted: {test_f1_weighted:.4f}",
    sep="\n",
)

# Per-class precision/recall/F1 table.
print("Classification Report:")
print(classification_report(test_labels, test_preds, target_names=class_names, digits=4))

In [None]:
# Raw counts and the row-normalised version (each true-class row sums to 1).
cm = confusion_matrix(test_labels, test_preds)
cm_normalized = confusion_matrix(test_labels, test_preds, normalize='true')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Both heatmaps share everything except data, number format, colour-bar
# label and title.
heatmap_specs = (
    (ax1, cm, 'd', 'Count', 'Confusion Matrix - Conteggi Assoluti'),
    (ax2, cm_normalized, '.3f', 'Rate', 'Confusion Matrix - Normalizzata'),
)
for panel, matrix, number_format, cbar_label, panel_title in heatmap_specs:
    sns.heatmap(matrix, annot=True, fmt=number_format, cmap='Blues', xticklabels=class_names,
                yticklabels=class_names, ax=panel, cbar_kws={'label': cbar_label})
    panel.set_title(panel_title, fontweight='bold')
    panel.set_xlabel('Predicted Label')
    panel.set_ylabel('True Label')

plt.tight_layout()
plt.show()


def _rate(numerator, denominator):
    """Return numerator / denominator, or 0 when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else 0


# One-vs-rest metrics for each class, read off the count matrix
# (rows = true labels, columns = predicted labels).
print("Analisi per classe:")
cm_total = cm.sum()
for i, class_name in enumerate(class_names):
    tp = cm[i, i]
    fp = cm[:, i].sum() - tp   # predicted as this class but belonging to another
    fn = cm[i, :].sum() - tp   # belonging to this class but predicted as another
    tn = cm_total - tp - fp - fn

    precision = _rate(tp, tp + fp)
    recall = _rate(tp, tp + fn)
    specificity = _rate(tn, tn + fp)

    print(f"\n{class_name.upper()}:")
    print(f"  - Precision: {precision:.4f}")
    print(f"  - Recall (Sensitivity): {recall:.4f}")
    print(f"  - Specificity: {specificity:.4f}")
    print(f"  - Campioni corretti: {tp}/{tp+fn}")

In [None]:
def visualize_predictions(model, test_loader, device, class_names, num_images=12):
    """Show predictions for images taken from the first batch of ``test_loader``.

    Each image is titled with its true label, predicted label and the
    probability of the predicted class, and framed in green (correct) or
    red (wrong).

    Args:
        model: Network to query; it is switched to evaluation mode.
        test_loader: Iterable of ``(images, labels)`` batches; only the
            first batch is used.
        device: Device the batch is moved to.
        class_names: Sequence mapping a class index to its display name.
        num_images: Maximum number of images to show (capped by the batch
            size).
    """
    model.eval()

    images, labels = next(iter(test_loader))
    images = images.to(device)
    labels = labels.to(device)

    with torch.no_grad():
        outputs = model(images)
        probs = torch.softmax(outputs, dim=1)
        _, preds = torch.max(outputs, 1)

    # Values needed to undo the input normalisation for display.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # Size the grid from the number of images actually shown instead of a
    # fixed 3x4 layout, so num_images > 12 no longer indexes past the grid.
    n_shown = min(num_images, len(images))
    n_cols = 4
    n_rows = max(1, (n_shown + n_cols - 1) // n_cols)

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 4 * n_rows), squeeze=False)
    fig.suptitle('Predizioni del Modello GreenTech Solutions', fontsize=16, fontweight='bold')

    # Start with every cell hidden so unused cells stay blank.
    for ax in axes.flat:
        ax.axis('off')

    for i in range(n_shown):
        # CHW tensor -> HWC array, then de-normalise and clamp to [0, 1].
        img = images[i].cpu().numpy().transpose((1, 2, 0))
        img = std * img + mean
        img = np.clip(img, 0, 1)

        true_label = class_names[labels[i]]
        pred_label = class_names[preds[i]]
        confidence = probs[i][preds[i]].item()

        color = 'green' if labels[i] == preds[i] else 'red'

        ax = axes[i // n_cols, i % n_cols]
        ax.imshow(img)
        ax.set_title(f'True: {true_label}\nPred: {pred_label}\nConf: {confidence:.3f}',
                     color=color, fontweight='bold')

        # Keep the frame but drop ticks and grid: with the axis switched off
        # the spines are not drawn, and the coloured border would be lost.
        ax.axis('on')
        ax.set_xticks([])
        ax.set_yticks([])
        ax.grid(False)
        for spine in ax.spines.values():
            spine.set_visible(True)
            spine.set_edgecolor(color)
            spine.set_linewidth(3)

    plt.tight_layout()
    plt.show()

# Show predictions for the first test batch with the default image count.
visualize_predictions(model, test_loader, device, class_names)

In [None]:
def analyze_errors(model, test_loader, device, class_names):
    """Collect the model's misclassifications on ``test_loader`` and summarise them.

    When at least one sample is misclassified, a report is printed with the
    error rate per true class, the mean confidence of the wrong predictions
    and the number of errors made with confidence above 0.7. The per-class
    totals are counted from the loader being analysed, so the function does
    not depend on labels computed in another cell.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.
        class_names: Sequence mapping a class index to its display name.

    Returns:
        List with one dict per misclassified sample, holding the keys
        ``true_class``, ``pred_class``, ``confidence`` (probability of the
        predicted class), ``true_confidence`` (probability of the true
        class), ``batch_idx`` and ``image_idx`` (position inside the batch).
    """
    model.eval()
    errors = []
    # Number of samples seen per class index; denominators of the error rates.
    class_totals = [0] * len(class_names)

    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(test_loader):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            probs = torch.softmax(outputs, dim=1)
            _, preds = torch.max(outputs, 1)

            label_ids = labels.tolist()
            pred_ids = preds.tolist()
            for image_idx, (true_id, pred_id) in enumerate(zip(label_ids, pred_ids)):
                class_totals[true_id] += 1
                if pred_id == true_id:
                    continue
                errors.append({
                    'true_class': class_names[true_id],
                    'pred_class': class_names[pred_id],
                    'confidence': probs[image_idx][pred_id].item(),
                    'true_confidence': probs[image_idx][true_id].item(),
                    'batch_idx': batch_idx,
                    'image_idx': image_idx
                })

    if errors:
        errors_df = pd.DataFrame(errors)

        print(f"Analisi errori ({len(errors)} errori totali):")
        print(f"\nDistribuzione errori per classe:")
        error_by_true = errors_df.groupby('true_class').size()
        for class_id, class_name in enumerate(class_names):
            count = error_by_true.get(class_name, 0)
            total = class_totals[class_id]
            error_rate = count / total if total > 0 else 0
            print(f"  - {class_name}: {count}/{total} errori ({error_rate:.2%})")

        print(f"\nConfidenza media negli errori:")
        avg_conf = errors_df['confidence'].mean()
        avg_true_conf = errors_df['true_confidence'].mean()
        print(f"  - Confidenza predizione errata: {avg_conf:.3f}")
        print(f"  - Confidenza classe vera: {avg_true_conf:.3f}")

        # Confident mistakes are the ones most worth inspecting by hand.
        high_conf_errors = errors_df[errors_df['confidence'] > 0.7]
        print(f"Errori con alta confidenza (>70%): {len(high_conf_errors)}")
        if len(high_conf_errors) > 0:
            print("Questi potrebbero essere casi genuinamente difficili o errori di labeling.")

    return errors

# Run the error analysis on the test split and keep the per-sample error records.
errors = analyze_errors(model, test_loader, device, class_names)