# Wildfire Detection AI Development Notebook

This notebook provides an interactive environment for developing and testing the wildfire detection AI model.

## Table of Contents
1. [Setup and Imports](#setup)
2. [Dataset Exploration](#dataset)
3. [Data Preprocessing](#preprocessing)
4. [Model Development](#model)
5. [Training](#training)
6. [Evaluation](#evaluation)
7. [TensorFlow Lite Conversion](#conversion)
8. [Performance Analysis](#performance)

## 1. Setup and Imports <a name="setup"></a>

In [None]:
# Install required packages if running in Colab
# !pip install tensorflow matplotlib seaborn scikit-learn pillow opencv-python

import os
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.optimizers import Adam
import cv2
import glob
from PIL import Image
import time
from pathlib import Path

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Configure matplotlib
plt.style.use('default')
sns.set_palette("husl")

print("TensorFlow version:", tf.__version__)
print("GPU Available:", tf.config.list_physical_devices('GPU'))

## 2. Dataset Exploration <a name="dataset"></a>

In [None]:
# Dataset configuration
DATASET_PATH = "dataset"
FIRE_DATASET_PATH = os.path.join(DATASET_PATH, "fire")
NO_FIRE_DATASET_PATH = os.path.join(DATASET_PATH, "no_fire")

# Create directories if they don't exist
os.makedirs(FIRE_DATASET_PATH, exist_ok=True)
os.makedirs(NO_FIRE_DATASET_PATH, exist_ok=True)

def analyze_dataset():
    """Analyze dataset characteristics"""
    
    # Get image paths
    fire_images = glob.glob(os.path.join(FIRE_DATASET_PATH, "*.jpg")) + \
                 glob.glob(os.path.join(FIRE_DATASET_PATH, "*.png"))
    
    no_fire_images = glob.glob(os.path.join(NO_FIRE_DATASET_PATH, "*.jpg")) + \
                    glob.glob(os.path.join(NO_FIRE_DATASET_PATH, "*.png"))
    
    print(f"Fire images: {len(fire_images)}")
    print(f"No-fire images: {len(no_fire_images)}")
    print(f"Total images: {len(fire_images) + len(no_fire_images)}")
    
    # Analyze image sizes
    fire_sizes = []
    no_fire_sizes = []
    
    for img_path in fire_images[:50]:  # Sample first 50
        try:
            img = Image.open(img_path)
            fire_sizes.append(img.size)
        except:
            continue
    
    for img_path in no_fire_images[:50]:  # Sample first 50
        try:
            img = Image.open(img_path)
            no_fire_sizes.append(img.size)
        except:
            continue
    
    # Plot size distributions
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    if fire_sizes:
        fire_widths = [s[0] for s in fire_sizes]
        fire_heights = [s[1] for s in fire_sizes]
        plt.scatter(fire_widths, fire_heights, alpha=0.6, label='Fire')
    if no_fire_sizes:
        no_fire_widths = [s[0] for s in no_fire_sizes]
        no_fire_heights = [s[1] for s in no_fire_sizes]
        plt.scatter(no_fire_widths, no_fire_heights, alpha=0.6, label='No Fire')
    plt.xlabel('Width')
    plt.ylabel('Height')
    plt.title('Image Size Distribution')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    labels = ['Fire', 'No Fire']
    sizes = [len(fire_images), len(no_fire_images)]
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
    plt.title('Class Distribution')
    
    plt.tight_layout()
    plt.show()
    
    return fire_images, no_fire_images

# Analyze dataset
fire_images, no_fire_images = analyze_dataset()

In [None]:
# Display sample images
def show_sample_images(image_paths, title, num_samples=5):
    """Display sample images from a class"""
    
    plt.figure(figsize=(15, 3))
    plt.suptitle(title, fontsize=16)
    
    for i in range(min(num_samples, len(image_paths))):
        plt.subplot(1, num_samples, i+1)
        img = Image.open(image_paths[i])
        plt.imshow(img)
        plt.axis('off')
        plt.title(f'Sample {i+1}\n{img.size}')
    
    plt.tight_layout()
    plt.show()

# Show sample images from each class
if fire_images:
    show_sample_images(fire_images, "Fire Images")

if no_fire_images:
    show_sample_images(no_fire_images, "No-Fire Images")

## 3. Data Preprocessing <a name="preprocessing"></a>

In [None]:
# Configuration
IMG_HEIGHT = 96
IMG_WIDTH = 96
BATCH_SIZE = 32
VALIDATION_SPLIT = 0.2

def create_data_generators():
    """Create training and validation data generators"""
    
    # Training data generator with augmentation
    train_datagen = ImageDataGenerator(
        rescale=1./255,
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        horizontal_flip=True,
        zoom_range=0.2,
        brightness_range=[0.8, 1.2],
        validation_split=VALIDATION_SPLIT
    )
    
    # Validation data generator (no augmentation)
    val_datagen = ImageDataGenerator(
        rescale=1./255,
        validation_split=VALIDATION_SPLIT
    )
    
    # Training generator
    train_generator = train_datagen.flow_from_directory(
        DATASET_PATH,
        target_size=(IMG_HEIGHT, IMG_WIDTH),
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        subset='training',
        shuffle=True,
        seed=42
    )
    
    # Validation generator
    validation_generator = val_datagen.flow_from_directory(
        DATASET_PATH,
        target_size=(IMG_HEIGHT, IMG_WIDTH),
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        subset='validation',
        shuffle=False,
        seed=42
    )
    
    print(f"Class indices: {train_generator.class_indices}")
    
    return train_generator, validation_generator

# Create data generators
train_generator, validation_generator = create_data_generators()

In [None]:
# Visualize data augmentation
def visualize_augmentation(generator, num_images=5):
    """Visualize data augmentation effects"""
    
    # Get a batch of images
    images, labels = next(generator)
    
    plt.figure(figsize=(15, 6))
    plt.suptitle('Data Augmentation Examples', fontsize=16)
    
    for i in range(min(num_images, len(images))):
        plt.subplot(2, num_images//2 + 1, i+1)
        plt.imshow(images[i])
        class_name = 'Fire' if np.argmax(labels[i]) == 0 else 'No Fire'
        plt.title(f'{class_name}')
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

# Visualize augmented images
visualize_augmentation(train_generator)

## 4. Model Development <a name="model"></a>

In [None]:
def create_cnn_model():
    """Create a custom CNN model optimized for TinyML"""
    
    model = keras.Sequential([
        layers.Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
        
        # First convolutional block
        layers.Conv2D(16, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        
        # Second convolutional block
        layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        
        # Third convolutional block
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        
        # Dense layers
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        
        # Output layer
        layers.Dense(2, activation='softmax')  # Fire, No-Fire
    ])
    
    return model

def create_mobilenet_model():
    """Create MobileNetV2-based model"""
    
    # Load MobileNetV2 without top layers
    base_model = MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)
    )
    
    # Freeze base model layers
    base_model.trainable = False
    
    model = keras.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(2, activation='softmax')
    ])
    
    return model

# Create model
MODEL_TYPE = 'cnn'  # or 'mobilenet'

if MODEL_TYPE == 'cnn':
    model = create_cnn_model()
else:
    model = create_mobilenet_model()

# Display model summary
model.summary()

In [None]:
# Compile model
LEARNING_RATE = 0.001

optimizer = Adam(learning_rate=LEARNING_RATE)

model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)

print("Model compiled successfully!")

## 5. Training <a name="training"></a>

In [None]:
# Training callbacks
def get_callbacks():
    """Get training callbacks"""
    
    callbacks = []
    
    # Early stopping
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True,
        verbose=1
    )
    callbacks.append(early_stopping)
    
    # Model checkpoint
    model_checkpoint = ModelCheckpoint(
        'models/best_model.h5',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    )
    callbacks.append(model_checkpoint)
    
    # Learning rate reduction
    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    )
    callbacks.append(reduce_lr)
    
    return callbacks

# Get callbacks
callbacks = get_callbacks()

In [None]:
# Train the model
EPOCHS = 50

print("Starting model training...")

history = model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=validation_generator,
    callbacks=callbacks,
    verbose=1
)

print("Training completed!")

## 6. Evaluation <a name="evaluation"></a>

In [None]:
# Plot training history
def plot_training_history(history):
    """Plot training history"""
    
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
    
    # Accuracy
    ax1.plot(history.history['accuracy'], label='Training')
    ax1.plot(history.history['val_accuracy'], label='Validation')
    ax1.set_title('Model Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Loss
    ax2.plot(history.history['loss'], label='Training')
    ax2.plot(history.history['val_loss'], label='Validation')
    ax2.set_title('Model Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Precision
    if 'precision' in history.history:
        ax3.plot(history.history['precision'], label='Training')
        ax3.plot(history.history['val_precision'], label='Validation')
        ax3.set_title('Model Precision')
        ax3.set_xlabel('Epoch')
        ax3.set_ylabel('Precision')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
    
    # Recall
    if 'recall' in history.history:
        ax4.plot(history.history['recall'], label='Training')
        ax4.plot(history.history['val_recall'], label='Validation')
        ax4.set_title('Model Recall')
        ax4.set_xlabel('Epoch')
        ax4.set_ylabel('Recall')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# Plot training history
plot_training_history(history)

In [None]:
# Evaluate model performance
def evaluate_model(model, validation_generator):
    """Evaluate model performance"""
    
    print("Evaluating model...")
    
    # Get predictions
    predictions = model.predict(validation_generator)
    y_pred = np.argmax(predictions, axis=1)
    y_true = validation_generator.classes
    
    # Classification report
    class_names = list(validation_generator.class_indices.keys())
    report = classification_report(y_true, y_pred, target_names=class_names)
    print("\nClassification Report:")
    print(report)
    
    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    
    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
               xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()
    
    return {
        'predictions': predictions,
        'y_pred': y_pred,
        'y_true': y_true,
        'classification_report': report,
        'confusion_matrix': cm,
        'class_names': class_names
    }

# Evaluate model
evaluation_results = evaluate_model(model, validation_generator)

## 7. TensorFlow Lite Conversion <a name="conversion"></a>

In [None]:
# Convert to TensorFlow Lite
def convert_to_tflite(model, quantization='dynamic'):
    """Convert Keras model to TensorFlow Lite"""
    
    print(f"Converting model to TensorFlow Lite ({quantization} quantization)...")
    
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    
    if quantization == 'dynamic':
        # Dynamic range quantization
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
    elif quantization == 'full_int8':
        # Full integer quantization
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8
        ]
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8
        
        # Representative dataset for quantization
        def representative_dataset_gen():
            # Load a few sample images for quantization calibration
            for img_path in glob.glob(os.path.join(DATASET_PATH, "**/*.jpg"))[:100]:
                img = tf.io.read_file(img_path)
                img = tf.image.decode_jpeg(img, channels=3)
                img = tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH])
                img = tf.cast(img, tf.float32) / 255.0
                img = tf.expand_dims(img, 0)
                yield [img]
        
        converter.representative_dataset = representative_dataset_gen
    elif quantization == 'float16':
        # Float16 quantization
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_types = [tf.float16]
    
    # Convert model
    tflite_model = converter.convert()
    
    # Save model
    os.makedirs('models', exist_ok=True)
    tflite_path = f'models/fire_detection_{quantization}.tflite'
    with open(tflite_path, 'wb') as f:
        f.write(tflite_model)
    
    # Get model size
    model_size = len(tflite_model)
    print(f"TensorFlow Lite model saved: {tflite_path}")
    print(f"Model size: {model_size} bytes ({model_size/1024:.1f} KB)")
    
    return tflite_model, model_size

# Convert model to TFLite
QUANTIZATION_TYPE = 'dynamic'  # 'dynamic', 'full_int8', or 'float16'
tflite_model, model_size = convert_to_tflite(model, QUANTIZATION_TYPE)

In [None]:
# Convert to C array for ESP32
def convert_to_c_array(tflite_model, array_name='model_data'):
    """Convert TFLite model to C array for embedding"""
    
    print("Converting to C array...")
    
    # Convert to hex array
    hex_array = [f'0x{byte:02x}' for byte in tflite_model]
    
    # Create C file content
    c_content = f'''// Auto-generated TensorFlow Lite model data
// Generated on {pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")}
// Model size: {len(tflite_model)} bytes\n
#ifndef MODEL_DATA_H_\n
#define MODEL_DATA_H_\n
\n
#include <cstdint>\n
\n
const unsigned int {array_name}_len = {len(tflite_model)};\n
const unsigned char {array_name}[] = {{\n
    {', '.join(hex_array[:16])}'''
    
    # Add remaining bytes in chunks
    for i in range(16, len(hex_array), 16):
        chunk = ', '.join(hex_array[i:i+16])
        c_content += f''',\n    {chunk}'''
    
    c_content += f'''\n}};\n
\n
#endif  // MODEL_DATA_H_\n
'''
    
    # Save C file
    c_file_path = f'models/model_data_{QUANTIZATION_TYPE}.h'
    with open(c_file_path, 'w') as f:
        f.write(c_content)
    
    print(f"C array saved to: {c_file_path}")
    return c_file_path

# Convert to C array
c_file_path = convert_to_c_array(tflite_model)

## 8. Performance Analysis <a name="performance"></a>

In [None]:
# Test inference speed
def test_inference_speed(model, test_images=100):
    """Test model inference speed"""
    
    print(f"Testing inference speed on {test_images} images...")
    
    # Get sample images
    image_files = []
    for class_dir in [FIRE_DATASET_PATH, NO_FIRE_DATASET_PATH]:
        image_files.extend(glob.glob(os.path.join(class_dir, "*.jpg"))[:test_images//2])
    
    inference_times = []
    
    for img_path in image_files[:test_images]:
        # Load and preprocess image
        img = keras.preprocessing.image.load_img(
            img_path, target_size=(IMG_HEIGHT, IMG_WIDTH)
        )
        img_array = keras.preprocessing.image.img_to_array(img)
        img_array = tf.expand_dims(img_array, 0)
        img_array = img_array / 255.0
        
        # Measure inference time
        start_time = time.time()
        predictions = model.predict(img_array, verbose=0)
        inference_time = (time.time() - start_time) * 1000  # Convert to ms
        
        inference_times.append(inference_time)
    
    avg_time = np.mean(inference_times)
    min_time = np.min(inference_times)
    max_time = np.max(inference_times)
    
    print(f"Average inference time: {avg_time:.2f} ms")
    print(f"Min inference time: {min_time:.2f} ms")
    print(f"Max inference time: {max_time:.2f} ms")
    
    # Plot inference time distribution
    plt.figure(figsize=(10, 6))
    plt.hist(inference_times, bins=20, alpha=0.7, edgecolor='black')
    plt.axvline(avg_time, color='red', linestyle='--', label=f'Average: {avg_time:.1f}ms')
    plt.xlabel('Inference Time (ms)')
    plt.ylabel('Frequency')
    plt.title('Inference Time Distribution')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    return {
        'avg_inference_time': avg_time,
        'min_inference_time': min_time,
        'max_inference_time': max_time,
        'inference_times': inference_times
    }

# Test inference speed
speed_results = test_inference_speed(model)

In [None]:
# Analyze model complexity
def analyze_model_complexity(model):
    """Analyze model complexity and parameters"""
    
    total_params = model.count_params()
    trainable_params = sum([layer.count_params() for layer in model.layers if layer.trainable])
    non_trainable_params = total_params - trainable_params
    
    print("Model Complexity Analysis:")
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    print(f"Non-trainable parameters: {non_trainable_params:,}")
    print(f"Model size (approx): {total_params * 4 / 1024:.1f} KB (32-bit floats)")
    
    # Layer-wise analysis
    print("\nLayer-wise parameter count:")
    for layer in model.layers:
        params = layer.count_params()
        if params > 0:
            print(f"  {layer.name}: {params:,} parameters")
    
    return {
        'total_params': total_params,
        'trainable_params': trainable_params,
        'non_trainable_params': non_trainable_params
    }

# Analyze model complexity
complexity_analysis = analyze_model_complexity(model)

In [None]:
# Save final results
def save_training_results(history, evaluation_results, speed_results, complexity_analysis, model_size):
    """Save training results to file"""
    
    results = {
        'timestamp': pd.Timestamp.now().isoformat(),
        'model_config': {
            'type': MODEL_TYPE,
            'input_shape': [IMG_HEIGHT, IMG_WIDTH, 3],
            'quantization': QUANTIZATION_TYPE,
            'learning_rate': LEARNING_RATE,
            'batch_size': BATCH_SIZE,
            'epochs': len(history.history['accuracy'])
        },
        'final_metrics': {
            'accuracy': history.history['accuracy'][-1],
            'val_accuracy': history.history['val_accuracy'][-1],
            'loss': history.history['loss'][-1],
            'val_loss': history.history['val_loss'][-1]
        },
        'inference_performance': speed_results,
        'model_complexity': complexity_analysis,
        'model_size_kb': model_size / 1024,
        'classification_report': evaluation_results['classification_report']
    }
    
    os.makedirs('models', exist_ok=True)
    results_path = f'models/training_results_{QUANTIZATION_TYPE}.json'
    
    with open(results_path, 'w') as f:
        import json
        json.dump(results, f, indent=2, default=str)
    
    print(f"Training results saved to: {results_path}")
    
    # Also save model in SavedModel format
    model.save(f'models/saved_model_{QUANTIZATION_TYPE}')
    print(f"Model saved in SavedModel format")
    
    return results_path

# Save results
results_path = save_training_results(
    history, evaluation_results, speed_results, 
    complexity_analysis, model_size
)

print("\n" + "="*60)
print("AI Development Complete!")
print("="*60)
print(f"Model: {MODEL_TYPE.upper()} ({QUANTIZATION_TYPE} quantization)")
print(f"Final Accuracy: {history.history['val_accuracy'][-1]:.1f}%")
print(f"Average Inference Time: {speed_results['avg_inference_time']:.1f}ms")
print(f"Model Size: {model_size/1024:.1f}KB")
print(f"Files saved in: models/ directory")
print("="*60)