# EMSN AtmosBird Cloud Classifier - Volledig Automatisch

**One-Click Training:** Upload ZIP → Train → Download Model

Deze notebook gebruikt semi-supervised learning:
1. Pre-trained model voor initiële classificatie
2. Brightness-based dag/nacht detectie
3. Automatische labeling + handmatige correctie optie

---
## Instructies
1. **Runtime → Change runtime type → GPU (A100) + High RAM**
2. **Runtime → Run all** (Ctrl+F9)
3. Upload je ZIP wanneer daarom wordt gevraagd
4. Wacht tot training klaar is (~10-15 min)
5. Download model uit Google Drive

In [None]:
# @title 1. Installatie & Setup (duurt ~1 minuut)
print("Installeren van dependencies...")
!pip install -q torch torchvision timm onnx onnxruntime pillow matplotlib scikit-learn tqdm

import os
import json
import random
import shutil
import zipfile
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Tuple

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import timm

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.cluster import KMeans

# Select the compute device: CUDA when torch reports one, otherwise the CPU.
cuda_available = torch.cuda.is_available()
device = torch.device('cuda') if cuda_available else torch.device('cpu')

# Print a small banner describing the selected device.
banner = '=' * 50
print(f"\n{banner}")
print(f"Device: {device}")
if cuda_available:
    gpu_props = torch.cuda.get_device_properties(0)
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {gpu_props.total_memory / 1e9:.1f} GB")
print(f"{banner}\n")
print("Setup compleet!")

In [None]:
# @title 2. Google Drive Mount & Werkdirectory
from google.colab import drive

print("Google Drive mounten...")
drive.mount('/content/drive')

# WORK_DIR lives on the mounted Drive, so its contents persist between sessions.
# LOCAL_DIR is on the runtime's own disk and is used as a faster working copy.
WORK_DIR = Path('/content/drive/MyDrive/EMSN/cloud_classifier')
LOCAL_DIR = Path('/content/atmosbird_data')
for directory in (WORK_DIR, LOCAL_DIR):
    directory.mkdir(parents=True, exist_ok=True)

print(f"\nWerkdirectory: {WORK_DIR}")
print(f"Lokale cache: {LOCAL_DIR}")
print("\nDrive gemount!")

In [None]:
# @title 3. Upload AtmosBird Beelden (ZIP)
from google.colab import files

# Upper bound on how many Drive images are copied into the local cache.
MAX_CACHED_IMAGES = 500


def _is_jpg_member(member: str) -> bool:
    """Return True when a ZIP member is a real JPG image (case-insensitive).

    Directory entries are skipped, and so are the AppleDouble files that macOS
    archives add (``__MACOSX/._name.jpg``): they carry a .jpg suffix but are
    not images.
    """
    if member.endswith('/'):
        return False
    entry = Path(member)
    return (
        entry.suffix.lower() == '.jpg'
        and not entry.name.startswith('._')
        and '__MACOSX' not in entry.parts
    )


def _unique_flat_name(member: str, taken: set) -> str:
    """Return a flat ``<stem>.jpg`` file name for *member* that is not in *taken*.

    The extension is normalised to lowercase so the ``*.jpg`` globs used in the
    rest of the notebook also find images that were stored as ``.JPG``.  When
    two members in different folders share a name, a numeric suffix is added
    instead of silently overwriting the earlier file.
    """
    stem = Path(member).stem
    candidate = f"{stem}.jpg"
    counter = 1
    while candidate in taken:
        candidate = f"{stem}_{counter}.jpg"
        counter += 1
    taken.add(candidate)
    return candidate


gdrive_images = WORK_DIR / 'images'   # persistent copy on Google Drive
LOCAL_IMAGES = LOCAL_DIR / 'images'   # working copy on the runtime disk

# Check whether Drive already holds images (sorted for a reproducible order).
existing_images = sorted(gdrive_images.glob('*.jpg')) if gdrive_images.exists() else []

if len(existing_images) > 100:
    print(f"Er zijn al {len(existing_images)} beelden in Google Drive.")
    USE_EXISTING = True
else:
    USE_EXISTING = False
    print("="*50)
    print("UPLOAD JE ATMOSBIRD BEELDEN")
    print("="*50)
    print("\nMaak op de Pi een ZIP met:")
    print("  cd /mnt/usb/atmosbird/ruwe_foto/2025/12")
    print("  zip -r atmosbird.zip 20 21 22 23 24 25 26 27 28 29 30")
    print("\nOf selecteer specifieke beelden:")
    print("  find . -name '*.jpg' | shuf -n 500 | zip atmosbird.zip -@")
    print("\n" + "="*50)
    print("Upload nu je ZIP bestand...")
    print("="*50 + "\n")

    uploaded = files.upload()

    # Only the first uploaded ZIP is processed.
    for filename in uploaded:
        if not filename.lower().endswith('.zip'):
            continue

        print(f"\nUitpakken: {filename}...")

        # Extract into the local directory.
        images_dir = LOCAL_IMAGES
        images_dir.mkdir(exist_ok=True)

        with zipfile.ZipFile(filename, 'r') as zip_ref:
            jpg_files = [m for m in zip_ref.namelist() if _is_jpg_member(m)]
            print(f"Gevonden: {len(jpg_files)} JPG bestanden")

            taken_names = set()
            for jpg in tqdm(jpg_files, desc="Uitpakken"):
                # Flatten the folder structure; only the base name is used,
                # so a member path can never escape images_dir.
                dest = images_dir / _unique_flat_name(jpg, taken_names)
                dest.write_bytes(zip_ref.read(jpg))

        # Also copy to Google Drive so later sessions can skip the upload.
        gdrive_images.mkdir(exist_ok=True)
        print(f"\nKopieren naar Google Drive...")
        extracted = sorted(images_dir.glob('*.jpg'))
        for img in tqdm(extracted, desc="Kopieren"):
            shutil.copy(img, gdrive_images / img.name)

        print(f"\nKlaar! {len(extracted)} beelden geladen.")
        break

if USE_EXISTING:
    # Copy a subset to the local disk for faster access.  A seeded random
    # sample is used instead of the first N files so the subset is not biased
    # towards whatever happens to come first in the directory listing.
    LOCAL_IMAGES.mkdir(exist_ok=True)
    if len(existing_images) > MAX_CACHED_IMAGES:
        cache_selection = random.Random(42).sample(existing_images, MAX_CACHED_IMAGES)
    else:
        cache_selection = existing_images
    print(f"Kopieren van {len(cache_selection)} beelden naar lokale cache...")
    for img in tqdm(cache_selection, desc="Kopieren"):
        shutil.copy(img, LOCAL_IMAGES / img.name)

# Both paths end up reading from the local cache.
IMAGES_DIR = LOCAL_IMAGES

image_count = len(list(IMAGES_DIR.glob('*.jpg')))
print(f"\nBeelden directory: {IMAGES_DIR}")
print(f"Aantal beelden: {image_count}")

# Stop here with a clear message instead of failing later in the split/training cells.
if image_count == 0:
    raise RuntimeError(
        "Geen JPG beelden gevonden. Upload een ZIP met .jpg bestanden en voer deze cel opnieuw uit."
    )

In [None]:
# @title 4. Automatische Labeling
print("Automatisch labelen van beelden...\n")

def analyze_image(img_path: Path) -> Dict:
    """Compute the image statistics used for automatic labeling.

    Args:
        img_path: Path of the image file to analyse.

    Returns:
        Dict of plain floats:
          'brightness': mean of the channel-averaged (grayscale) image,
          'contrast':   standard deviation of that grayscale image,
          'blue_ratio': mean blue / (mean red + mean green + mean blue),
          'texture':    variance of the Laplacian of the grayscale image.
    """
    # Imported locally because scipy is not among the notebook's top-level imports.
    from scipy import ndimage

    # The context manager closes the underlying file as soon as the pixels
    # have been copied into the array.
    with Image.open(img_path) as img:
        img_array = np.array(img.convert('RGB'))

    # Grayscale statistics
    gray = np.mean(img_array, axis=2)
    brightness = np.mean(gray)
    contrast = np.std(gray)

    # Colour analysis (blue versus grey); the epsilon avoids division by zero
    # on a completely black frame.
    r_mean, g_mean, b_mean = (np.mean(img_array[:, :, channel]) for channel in range(3))
    blue_ratio = b_mean / (r_mean + g_mean + b_mean + 1e-6)

    # Texture measure: variance of the Laplacian of the grayscale image.
    texture = np.var(ndimage.laplace(gray))

    return {
        'brightness': float(brightness),
        'contrast': float(contrast),
        'blue_ratio': float(blue_ratio),
        'texture': float(texture),
    }

def auto_label(features: Dict) -> str:
    """Map image statistics to 'helder', 'gedeeltelijk' or 'bewolkt'.

    Args:
        features: Dict with the keys 'brightness', 'contrast', 'blue_ratio'
            and 'texture'.

    Returns:
        The class name chosen by the threshold rules below.
    """
    mean_level = features['brightness']
    spread = features['contrast']
    blue_share = features['blue_ratio']
    detail = features['texture']

    # Night frames (brightness below 30): high texture is treated as clear
    # sky, low texture as overcast.
    if mean_level < 30:
        return 'helder' if detail > 50 else 'bewolkt'

    # Daytime: low contrast together with a low blue share counts as overcast.
    flat_and_grey = spread < 30 and blue_share < 0.35
    if flat_and_grey:
        return 'bewolkt'

    # A high blue share with high contrast counts as clear; everything else
    # is partly cloudy.
    blue_and_contrasty = blue_share > 0.38 and spread > 50
    return 'helder' if blue_and_contrasty else 'gedeeltelijk'

# Analyse every JPG in the images directory.
all_images = [image_path for image_path in IMAGES_DIR.glob('*.jpg')]
print(f"Analyseren van {len(all_images)} beelden...\n")

labels = {}          # file name -> class name
features_list = []   # one feature dict per analysed image

for img_path in tqdm(all_images, desc="Analyseren"):
    try:
        features = analyze_image(img_path)
        features.update(path=str(img_path), name=img_path.name)
        features_list.append(features)

        # Assign the heuristic label.
        labels[img_path.name] = auto_label(features)
    except Exception as e:
        # Best effort: report the failing image and continue with the rest.
        print(f"Fout bij {img_path.name}: {e}")

# Summary of the label distribution.
separator = '=' * 50
print(f"\n{separator}")
print("AUTOMATISCHE LABELING RESULTAAT")
print(f"{separator}")

label_counts = {}
for assigned in labels.values():
    label_counts.setdefault(assigned, 0)
    label_counts[assigned] += 1

total_labeled = len(labels)
for label in sorted(label_counts):
    count = label_counts[label]
    pct = count / total_labeled * 100
    print(f"  {label}: {count} ({pct:.1f}%)")

print(f"\nTotaal: {total_labeled} beelden gelabeld")

In [None]:
# @title 5. Bekijk Voorbeelden per Klasse
def show_examples(labels: Dict, images_dir: Path, n_examples: int = 6):
    """Show a grid with random example images for each class.

    One row per class and ``n_examples`` columns. The figure is also saved
    as ``label_examples.png`` in WORK_DIR.

    Args:
        labels: Mapping of image file name -> class name.
        images_dir: Directory that contains the image files.
        n_examples: Maximum number of examples drawn per class.
    """
    classes = ['helder', 'gedeeltelijk', 'bewolkt']

    # squeeze=False keeps `axes` two-dimensional even when n_examples == 1.
    fig, axes = plt.subplots(len(classes), n_examples, figsize=(20, 12), squeeze=False)

    for row, cls in enumerate(classes):
        # Pick random images of this class (fewer when the class is small).
        cls_images = [name for name, assigned in labels.items() if assigned == cls]
        samples = random.sample(cls_images, min(n_examples, len(cls_images)))

        for col in range(n_examples):
            ax = axes[row, col]
            # Hide ticks and frame instead of calling axis('off'): axis('off')
            # would also hide the row label set below.  This also blanks cells
            # for which no sample is available.
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)

            if col < len(samples):
                with Image.open(images_dir / samples[col]) as img:
                    img.thumbnail((400, 300))
                    ax.imshow(img)

        # Label the row with its class name on the first column.
        axes[row, 0].set_ylabel(cls.upper(), fontsize=14, fontweight='bold')

    plt.suptitle('Automatisch Gelabelde Voorbeelden', fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.savefig(WORK_DIR / 'label_examples.png', dpi=150)
    plt.show()

show_examples(labels, IMAGES_DIR)
print("\nControleer of de labels correct lijken.")
print("Als niet, kun je de thresholds in cel 4 aanpassen en opnieuw uitvoeren.")

In [None]:
# @title 6. Dataset & Model Setup
print("Dataset en model voorbereiden...\n")

# Training configuration
IMAGE_SIZE = 224
BATCH_SIZE = 32
EPOCHS = 15
LEARNING_RATE = 0.001

# Class names in index order; the name -> index mapping follows that order.
CLASS_NAMES = ['helder', 'gedeeltelijk', 'bewolkt']
CLASS_MAPPING = {class_name: index for index, class_name in enumerate(CLASS_NAMES)}
NUM_CLASSES = len(CLASS_NAMES)

# Final steps shared by both pipelines: tensor conversion plus per-channel
# normalisation (these mean/std values are the ones commonly used with
# ImageNet-pretrained backbones).
_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
]

# Training pipeline: resize with a 32 px margin, random crop back to
# IMAGE_SIZE, then flip / rotation / colour-jitter augmentation.
train_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE + 32, IMAGE_SIZE + 32)),
    transforms.RandomCrop(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    *_to_normalized_tensor,
])

# Validation pipeline: plain resize, no augmentation.
val_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    *_to_normalized_tensor,
])

class CloudDataset(Dataset):
    """Dataset of (image, class index) pairs built from a label mapping.

    Args:
        image_dir: Directory that contains the image files.
        labels: Mapping of image file name -> class name.  Entries whose
            class name is not a key of CLASS_MAPPING are dropped.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, image_dir, labels, transform=None):
        self.image_dir = Path(image_dir)
        self.transform = transform
        # List of (image path, class index) tuples.
        self.samples = [(self.image_dir / k, CLASS_MAPPING[v])
                        for k, v in labels.items() if v in CLASS_MAPPING]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        # convert() returns a fully loaded copy, so the file can be closed
        # right away instead of lingering until garbage collection.
        with Image.open(img_path) as raw:
            image = raw.convert('RGB')
        # Explicit None check: a callable that happens to be falsy (e.g. one
        # that defines __len__) must still be applied.
        if self.transform is not None:
            image = self.transform(image)
        return image, label

# Train/val split (80/20, fixed seed).  The split is stratified on the class
# name so that both subsets keep the class proportions of the heuristic
# labels; an unstratified split can leave a rare class out of the validation
# set entirely.
items = list(labels.items())
try:
    train_items, val_items = train_test_split(
        items,
        test_size=0.2,
        random_state=42,
        stratify=[class_name for _, class_name in items],
    )
except ValueError:
    # Stratification is not possible (e.g. a class with a single image);
    # fall back to the plain random split.
    train_items, val_items = train_test_split(items, test_size=0.2, random_state=42)
train_labels = dict(train_items)
val_labels = dict(val_items)

train_dataset = CloudDataset(IMAGES_DIR, train_labels, train_transform)
val_dataset = CloudDataset(IMAGES_DIR, val_labels, val_transform)

# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"Training set: {len(train_dataset)} beelden")
print(f"Validation set: {len(val_dataset)} beelden")

# Model
class CloudClassifier(nn.Module):
    """EfficientNet-B0 backbone from timm followed by a small MLP head.

    Args:
        num_classes: Number of output logits produced by the head.
    """

    def __init__(self, num_classes=3):
        super().__init__()
        # Created with num_classes=0; the head below consumes its
        # `num_features`-wide output (presumably pooled features -- see the
        # timm documentation).
        self.backbone = timm.create_model('efficientnet_b0', pretrained=True, num_classes=0)

        feature_dim = self.backbone.num_features
        head_layers = [
            nn.Dropout(0.3),
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        features = self.backbone(x)
        logits = self.classifier(features)
        return logits

# Pass NUM_CLASSES explicitly so the head size follows the configuration
# instead of relying on the constructor default.
model = CloudClassifier(num_classes=NUM_CLASSES).to(device)
print(f"\nModel geladen: EfficientNet-B0")
print(f"Parameters: {sum(p.numel() for p in model.parameters()):,}")

In [None]:
# @title 7. Training (duurt ~10-15 minuten)
print("="*50)
print("START TRAINING")
print("="*50 + "\n")

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
# Cosine learning-rate schedule spanning the whole run, stepped once per epoch.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
best_val_acc = 0.0
best_model_state = None

for epoch in range(EPOCHS):
    # ---- Training ----
    model.train()
    train_loss, train_correct, train_total = 0.0, 0, 0

    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for images, targets in pbar:
        images, targets = images.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # loss.item() is the batch mean; weight it by the batch size so the
        # epoch value is a per-sample average.
        train_loss += loss.item() * images.size(0)
        train_correct += (outputs.argmax(1) == targets).sum().item()
        train_total += targets.size(0)

        pbar.set_postfix({'loss': f'{loss.item():.4f}', 'acc': f'{train_correct/train_total:.2%}'})

    train_loss /= train_total
    train_acc = train_correct / train_total

    # ---- Validation ----
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0

    with torch.no_grad():
        for images, targets in val_loader:
            images, targets = images.to(device), targets.to(device)
            outputs = model(images)
            loss = criterion(outputs, targets)

            val_loss += loss.item() * images.size(0)
            val_correct += (outputs.argmax(1) == targets).sum().item()
            val_total += targets.size(0)

    val_loss /= val_total
    val_acc = val_correct / val_total
    scheduler.step()

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    # Keep a snapshot of the best weights.  The tensors are cloned (on the
    # CPU) because state_dict() returns references to the live parameters: a
    # shallow dict copy would keep changing as training continues and end up
    # equal to the last epoch.  The `is None` test guarantees a snapshot
    # exists even if the validation accuracy never rises above 0.
    if best_model_state is None or val_acc > best_val_acc:
        best_val_acc = val_acc
        best_model_state = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
        print(f"  ★ Nieuw beste model! Val acc: {val_acc:.2%}")

    print(f"Epoch {epoch+1}: Loss={train_loss:.4f}, Acc={train_acc:.2%}, Val_Loss={val_loss:.4f}, Val_Acc={val_acc:.2%}")

print(f"\n{'='*50}")
print(f"TRAINING COMPLEET!")
print(f"Beste validatie accuracy: {best_val_acc:.2%}")
print(f"{'='*50}")

In [None]:
# @title 8. Resultaten & Evaluatie
# Plot the training history: loss on the left, accuracy on the right.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

ax1.plot(history['train_loss'], label='Train')
ax1.plot(history['val_loss'], label='Validation')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training Loss')
ax1.legend()
ax1.grid(True)

ax2.plot(history['train_acc'], label='Train')
ax2.plot(history['val_acc'], label='Validation')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.set_title('Training Accuracy')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.savefig(WORK_DIR / 'training_history.png', dpi=150)
plt.show()

# Evaluate the best checkpoint on the validation set.
model.load_state_dict(best_model_state)
model.eval()

all_preds, all_targets = [], []
with torch.no_grad():
    for images, targets in val_loader:
        outputs = model(images.to(device))
        all_preds.extend(outputs.argmax(1).cpu().numpy())
        all_targets.extend(targets.numpy())

# Pass the full list of class indices explicitly.  Without it, a class that
# is absent from both targets and predictions makes classification_report
# fail (target_names length mismatch) and shrinks the confusion matrix, which
# breaks the fixed-size annotation loop below.
class_ids = list(range(len(CLASS_NAMES)))

print("\nClassification Report:")
print(classification_report(
    all_targets,
    all_preds,
    labels=class_ids,
    target_names=CLASS_NAMES,
    zero_division=0,
))

cm = confusion_matrix(all_targets, all_preds, labels=class_ids)
fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(cm, cmap='Blues')
ax.set_xticks(class_ids)
ax.set_yticks(class_ids)
ax.set_xticklabels(CLASS_NAMES)
ax.set_yticklabels(CLASS_NAMES)
ax.set_xlabel('Voorspeld')
ax.set_ylabel('Werkelijk')
ax.set_title('Confusion Matrix')
for i in class_ids:
    for j in class_ids:
        # White text on dark cells, black text on light cells.
        ax.text(j, i, str(cm[i, j]), ha='center', va='center',
                color='white' if cm[i, j] > cm.max()/2 else 'black')
plt.colorbar(im)
plt.tight_layout()
plt.savefig(WORK_DIR / 'confusion_matrix.png', dpi=150)
plt.show()

In [None]:
# @title 9. Export naar ONNX
print("Exporteren naar ONNX formaat...\n")

# Restore the best weights and move the model to the CPU for export.
# Note: .cpu() moves the notebook-wide `model` itself, not a copy.
model.load_state_dict(best_model_state)
model.eval()
model_cpu = model.cpu()

# Output paths
ONNX_PATH = WORK_DIR / 'cloud_classifier.onnx'
PT_PATH = WORK_DIR / 'cloud_classifier.pt'

# PyTorch checkpoint.  The weights are taken from the CPU model so the file
# holds CPU tensors and can be loaded on a machine without CUDA;
# best_model_state may still reference GPU tensors.
torch.save({
    'model_state_dict': model_cpu.state_dict(),
    'class_names': CLASS_NAMES,
    'image_size': IMAGE_SIZE,
    'best_val_acc': best_val_acc,
    'training_date': datetime.now().isoformat(),
    'labels': labels
}, PT_PATH)
print(f"PyTorch checkpoint: {PT_PATH}")

# ONNX export with a dynamic batch dimension; paths are passed as strings.
dummy_input = torch.randn(1, 3, IMAGE_SIZE, IMAGE_SIZE)
torch.onnx.export(
    model_cpu,
    dummy_input,
    str(ONNX_PATH),
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch'}, 'output': {0: 'batch'}}
)

print(f"ONNX model: {ONNX_PATH}")
print(f"Model grootte: {ONNX_PATH.stat().st_size / 1024 / 1024:.1f} MB")

# Validate the exported graph.
import onnx
import onnxruntime as ort

onnx_model = onnx.load(str(ONNX_PATH))
onnx.checker.check_model(onnx_model)
print("\nONNX model validatie: OK!")

# Test inference, using the configured input size rather than a literal 224.
session = ort.InferenceSession(str(ONNX_PATH))
test_input = np.random.randn(1, 3, IMAGE_SIZE, IMAGE_SIZE).astype(np.float32)
output = session.run(None, {'input': test_input})[0]
print(f"ONNX inference test: OK! Output shape: {output.shape}")

# Report how closely the exported model matches PyTorch on the same input.
with torch.no_grad():
    torch_output = model_cpu(torch.from_numpy(test_input)).numpy()
max_diff = float(np.abs(output - torch_output).max())
print(f"Max verschil PyTorch vs ONNX: {max_diff:.2e}")

In [None]:
# @title 10. Sla Labels op
# Persist the file name -> class name mapping next to the model on Drive.
labels_path = WORK_DIR / 'labels.json'
labels_path.write_text(json.dumps(labels, indent=2))
print(f"Labels opgeslagen: {labels_path}")
print(f"Totaal: {len(labels)} gelabelde beelden")

In [None]:
# @title 11. Klaar! Download Instructies
# Final summary: where the artefacts are and how to deploy the ONNX model.
print("="*60)
print("         TRAINING SUCCESVOL AFGEROND!")
print("="*60)
print(f"\nBeste validatie accuracy: {best_val_acc:.1%}")
print(f"\nBestanden staan in Google Drive:")
print(f"  {WORK_DIR}/")
print(f"    ├── cloud_classifier.onnx  ({ONNX_PATH.stat().st_size/1024/1024:.1f} MB)")
print(f"    ├── cloud_classifier.pt")
print(f"    ├── labels.json")
print(f"    ├── training_history.png")
print(f"    └── confusion_matrix.png")
print("\n" + "="*60)
print("DEPLOYMENT OP PI BERGING")
print("="*60)
# The backslash in step 3 is escaped: a bare backslash before the newline
# would be treated by Python as a line continuation and vanish from the
# printed shell command.
print("""
1. Download cloud_classifier.onnx van Google Drive

2. Kopieer naar Pi:
   scp cloud_classifier.onnx ronny@192.168.1.87:/home/ronny/emsn2/scripts/atmosbird/

3. Test op Pi:
   cd /home/ronny/emsn2/scripts/atmosbird
   python cloud_classifier_inference.py cloud_classifier.onnx \\
       /mnt/usb/atmosbird/ruwe_foto/2025/12/30/sky_20251230_120000.jpg

4. Het model is nu klaar voor gebruik in atmosbird_capture.py!
""")
print("="*60)