In [2]:

import os
import pandas as pd
import numpy as np
from google.colab import drive
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Concatenate, Dropout
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import glob
import zipfile
import pickle
from PIL import Image
import uuid

In [3]:
# Mount Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Define paths
zip_path = '/content/drive/MyDrive/image.zip'  # Update to your zip file path
extract_path = '/content/disaster_dataset'  # Temporary folder in Colab
csv_path = '/content/drive/My Drive/disaster_dataset.csv'

# Step 1: Unzip the dataset
def unzip_dataset(zip_path, extract_path):
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print(f"Dataset unzipped to {extract_path}")
    # Find the folder containing disaster subfolders
    for root, dirs, _ in os.walk(extract_path):
        # Look for a folder with multiple subfolders (likely disaster types)
        subdirs = [d for d in dirs if os.path.isdir(os.path.join(root, d))]
        if len(subdirs) > 1:  # Assume folder with multiple subfolders is the right one
            dataset_path = root
            print(f"Found dataset root: {dataset_path} with subfolders: {subdirs}")
            return dataset_path
        # Special case: Check if 'image' folder contains subfolders
        if 'image' in dirs:
            image_path = os.path.join(root, 'image')
            image_subdirs = [d for d in os.listdir(image_path) if os.path.isdir(os.path.join(image_path, d))]
            if image_subdirs:
                print(f"Found dataset root: {image_path} with subfolders: {image_subdirs}")
                return image_path
    print(f"Warning: Could not find folder with disaster subfolders. Using root: {extract_path}")
    return extract_path

# Unzip the dataset
dataset_path = unzip_dataset(zip_path, extract_path)

Dataset unzipped to /content/disaster_dataset
Found dataset root: /content/disaster_dataset/image with subfolders: ['wildefire', 'sinkhole', 'volcano', 'Hailstorm', 'Earthquake', 'Drought', 'landslideDisaster', 'Flood', 'locustwarn']


In [5]:
# Step 2: Generate CSV file from dataset
def generate_csv(dataset_path, csv_path):
    image_paths = []
    labels = []

    # Get list of disaster folders
    disaster_folders = [f for f in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, f))]
    if not disaster_folders:
        raise ValueError(f"No disaster folders found in {dataset_path}. Check zip structure.")
    print(f"Disaster folders found: {disaster_folders}")

    for disaster in disaster_folders:
        # Get all files in the disaster folder recursively
        disaster_path = os.path.join(dataset_path, disaster)
        all_files = glob.glob(os.path.join(disaster_path, '**', '*'), recursive=True)

        # Verify each file as an image
        valid_images = []
        for file_path in all_files:
            if os.path.isdir(file_path):
                continue  # Skip directories
            try:
                with Image.open(file_path) as img:
                    img.verify()  # Check if file is a valid image
                # Reopen to check format
                with Image.open(file_path) as img:
                    if img.format in ['JPEG', 'PNG', 'BMP', 'GIF', 'TIFF', 'WEBP']:
                        valid_images.append(file_path)
                    else:
                        print(f"Skipping unsupported image format: {file_path} (format: {img.format})")
            except Exception as e:
                print(f"Skipping invalid file: {file_path} ({e})")

        image_paths.extend(valid_images)
        labels.extend([disaster] * len(valid_images))
        print(f"Found {len(valid_images)} valid images in {disaster}")

    # Create DataFrame
    df = pd.DataFrame({
        'image_path': image_paths,
        'label': labels
    })

    if df.empty:
        raise ValueError(f"No valid images found in {dataset_path}. Check folder structure or image files.")

    # Save to CSV
    df.to_csv(csv_path, index=False)
    print(f"CSV file saved at {csv_path} with {len(df)} images across {len(df['label'].unique())} disaster types")
    return df

# Generate CSV if it doesn't exist
if not os.path.exists(csv_path):
    df = generate_csv(dataset_path, csv_path)
else:
    df = pd.read_csv(csv_path)


In [6]:
# Step 3: Data preprocessing
disaster_types = df['label'].unique()
num_classes = len(disaster_types)
print(f"Disaster types: {disaster_types}")
print(f"Number of classes: {num_classes}")

from sklearn.preprocessing import LabelEncoder
label_encoder = LabelEncoder()
df['label_encoded'] = label_encoder.fit_transform(df['label'])

from tensorflow.keras.preprocessing.image import load_img, img_to_array

# Function to load and preprocess images
def load_and_preprocess_image(image_path, target_size=(224, 224)):
    try:
        img = load_img(image_path, target_size=target_size)
        img_array = img_to_array(img) / 255.0
        return img_array
    except Exception as e:
        print(f"Error loading image {image_path}: {e}")
        return None

# Generator to load data in batches
def data_generator(df, batch_size, target_size=(224, 224), is_training=True):
    while True:
        df_sample = df.sample(frac=1).reset_index(drop=True) if is_training else df
        for start in range(0, len(df_sample), batch_size):
            end = min(start + batch_size, len(df_sample))
            batch_df = df_sample[start:end]

            images = []
            true_labels = []
            declared_labels = []

            for _, row in batch_df.iterrows():
                img = load_and_preprocess_image(row['image_path'], target_size)
                if img is None:
                    continue

                images.append(img)

                # One-hot encode true label
                true_label = np.zeros(num_classes, dtype=np.float32)
                true_label[row['label_encoded']] = 1
                true_labels.append(true_label)

                # Declared label: sometimes correct, sometimes incorrect (for training)
                if is_training and np.random.rand() > 0.5:
                    possible_labels = [i for i in range(num_classes) if i != row['label_encoded']]
                    declared_label_idx = np.random.choice(possible_labels)
                else:
                    declared_label_idx = row['label_encoded']

                declared_label = np.zeros(num_classes, dtype=np.float32)
                declared_label[declared_label_idx] = 1
                declared_labels.append(declared_label)

            if not images:
                continue

            images = np.array(images, dtype=np.float32)
            true_labels = np.array(true_labels, dtype=np.float32)
            declared_labels = np.array(declared_labels, dtype=np.float32)

            outputs = np.all(true_labels == declared_labels, axis=1).astype(np.float32)

            # Print every 10 batches only
            if start % (10 * batch_size) == 0:
                print(f"Yielding batch of {images.shape[0]} images")

            yield (images, declared_labels), outputs

# Create tf.data.Dataset
import tensorflow as tf

def create_dataset(df, batch_size, target_size=(224, 224), is_training=True):
    dataset = tf.data.Dataset.from_generator(
        lambda: data_generator(df, batch_size, target_size, is_training),
        output_signature=(
            (
                tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
                tf.TensorSpec(shape=(None, num_classes), dtype=tf.float32)
            ),
            tf.TensorSpec(shape=(None,), dtype=tf.float32)
        )
    )

    dataset = dataset.shuffle(buffer_size=100).prefetch(tf.data.AUTOTUNE)
    return dataset


Disaster types: ['wildefire' 'sinkhole' 'volcano' 'Hailstorm' 'Earthquake' 'Drought'
 'landslideDisaster' 'Flood' 'locustwarn']
Number of classes: 9


In [7]:
# Step 4: Model definition
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Concatenate, Dropout

# Number of classes (already defined in Step 3)
# num_classes = len(disaster_types)  <-- already defined, no need to repeat

# Input 1: Image
image_input = Input(shape=(224, 224, 3), name="image_input")

# CNN feature extractor
x = Conv2D(32, (3, 3), activation='relu')(image_input)
x = MaxPooling2D(pool_size=(2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D(pool_size=(2, 2))(x)
x = Flatten()(x)

# Input 2: Declared disaster label (one-hot vector)
declared_label_input = Input(shape=(num_classes,), name="declared_label_input")

# Combine image features and declared label
merged = Concatenate()([x, declared_label_input])
merged = Dense(224, activation='relu')(merged)
merged = Dropout(0.3)(merged)

# Output: Binary prediction (1 = match, 0 = mismatch)
output = Dense(1, activation='sigmoid')(merged)

# Model
model = Model(inputs=[image_input, declared_label_input], outputs=output)

# Compile model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print model summary
model.summary()


In [8]:
# Step 5: Split data and train
train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)

# Training parameters
batch_size = 32 if num_classes <= 20 else 16
epochs = 15
steps_per_epoch = len(train_df) // batch_size
validation_steps = len(val_df) // batch_size

# Create datasets
train_dataset = create_dataset(train_df, batch_size, is_training=True)
val_dataset = create_dataset(val_df, batch_size, is_training=False)

# Train the model
history = model.fit(
    train_dataset,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    validation_data=val_dataset,
    validation_steps=validation_steps
)

Epoch 1/15
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images




Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
[1m 10/530[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m32:09[0m 4s/step - accuracy: 0.5195 - loss: 5.0451Yielding batch of 32 images
[1m 20/530[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m31:52[0m 4s/step - accuracy: 0.5134 - loss: 3.7935Yielding batch of 32 images
[1m 30/530[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m30:39[0m 4s/step - accuracy: 0.5191 - loss: 3.1265Yielding batch of 32 images
[1m 40/530[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m29:35[0m 4s/step - accuracy: 0.5246 - loss: 2.7143Yielding batch of 32 images
[1m 50/530[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m29:15[0m 4s/step - accuracy: 0.5300 - loss: 2.4321Yielding batch of 32 images
[1m 60/530[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m28:32[0m 4s/step - accuracy: 0.5371 - loss: 2.2253Yielding batch of 32 images
[1m 70/530[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m28:03[0m 4s/step - accuracy: 0.5437 - loss: 2.

In [9]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Step 6: Evaluate model accuracy with metrics

# Collect predictions and true labels
y_true = []
y_pred = []

for (images, declared_labels), outputs in val_dataset.take(10):  # Adjust take(n) as needed
    predictions = model.predict([images, declared_labels])
    predicted_labels = (predictions > 0.5).astype(int)

    y_true.extend(outputs.numpy().astype(int))
    y_pred.extend(predicted_labels.flatten().astype(int))

# Convert to numpy arrays
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Accuracy & metrics
acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, zero_division=0)
rec = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
cm = confusion_matrix(y_true, y_pred)

# Display results
print(f"✅ Accuracy:  {acc:.4f}")
print(f"✅ Precision: {prec:.4f}")
print(f"✅ Recall:    {rec:.4f}")
print(f"✅ F1 Score:  {f1:.4f}")
print("\n📊 Confusion Matrix:")
print(cm)

Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
Yielding batch of 32 images
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Yielding batch of 32 images
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 951ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 868ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 885ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

In [11]:
# Save model to a .keras file
model.save("disaster_match_modelqwe.keras")
print("✅ Model saved to disaster_match_model.keras")



✅ Model saved to disaster_match_model.keras


In [None]:
from tensorflow.keras.models import load_model

model = load_model("disaster_match_model.keras")
print("✅ Model loaded.")

✅ Model loaded.


  saveable.load_own_variables(weights_store.get(inner_path))


In [None]:
# Replace with your actual image path
image_path = "/content/disaster_dataset/image/Earthquake/1 (1).PNG"
declared_label = "Earthquake"

# Preprocess image
img = load_img(image_path, target_size=(128, 128))
img_array = img_to_array(img) / 255.0
img_array = np.expand_dims(img_array, axis=0)  # shape: (1, 128, 128, 3)

# Encode declared label
declared_idx = label_encoder.transform([declared_label])[0]
declared_one_hot = np.zeros((1, num_classes))
declared_one_hot[0, declared_idx] = 1

# Predict
prediction = model.predict([img_array, declared_one_hot])[0][0]
is_match = prediction > 0.5

# Output
print(f"Declared: {declared_label}")
print(f"✅ Match: {is_match} (Confidence: {prediction:.2f})")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 258ms/step
Declared: Earthquake
✅ Match: True (Confidence: 0.93)
