In [None]:
!pip install -q diffusers transformers accelerate
!pip install -q safetensors
!pip install -q xformers

import os
import torch
import random
from pathlib import Path
from PIL import Image
from tqdm.auto import tqdm


# Report the GPU that will run generation. Both calls raise when no CUDA
# device is visible, so this cell assumes a GPU runtime.
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"   Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")


# Root folder for the synthetic images; the generation loop adds one
# sub-folder per disaster class underneath it.
FAKE_IMAGES_DIR = Path("data/fake_disaster_images")
FAKE_IMAGES_DIR.mkdir(parents=True, exist_ok=True)

from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler


model_id = "runwayml/stable-diffusion-v1-5"


# Load the pipeline in half precision with the safety checker switched off.
pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    safety_checker=None,
    requires_safety_checker=False
)


# Swap the default scheduler for DPM-Solver, reusing the existing config.
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)


pipe = pipe.to("cuda")


pipe.enable_attention_slicing()


try:
    pipe.enable_xformers_memory_efficient_attention()
except Exception as exc:
    # xformers is optional. Catch Exception rather than using a bare except
    # so Ctrl-C / SystemExit still propagate, and show why it was skipped.
    print("⚠️ xformers not available, using standard attention")
    print(f"   Reason: {exc}")

print("✅ Stable Diffusion loaded!")

# ----- DISASTER PROMPTS ----- #

# Maps each disaster class name to its candidate text prompts. The generation
# loop picks one prompt at random per image, and the class name doubles as the
# output sub-folder name and file-name prefix.
DISASTER_PROMPTS = {
    'earthquake': [
        "collapsed buildings after earthquake, rubble and debris, damaged infrastructure, realistic news photo",
        "earthquake damage to city street, cracked roads, fallen structures, photorealistic",
        "destroyed buildings from earthquake, search and rescue scene, emergency workers, news photography",
        "earthquake aftermath, damaged houses, broken walls, realistic documentary photo",
        "seismic damage to urban area, tilted buildings, structural collapse, photojournalism style",
    ],
    'flood': [
        "flooded city streets with submerged cars, muddy water, disaster scene, realistic photo",
        "flood disaster, houses partially underwater, rescue boats, news photography",
        "flash flood damage, debris in water, flooded neighborhood, photorealistic",
        "river flooding town, water covering roads, emergency scene, documentary photo",
        "flood aftermath, waterlogged streets, damaged property, realistic news image",
    ],
    'wildfire': [
        "wildfire burning forest, orange flames and smoke, firefighters, realistic news photo",
        "house on fire from wildfire, burning trees, smoke filled sky, photojournalism",
        "forest fire spreading rapidly, flames engulfing trees, emergency scene, realistic",
        "wildfire aftermath, burned landscape, charred trees, documentary photography",
        "fire tornado in wildfire, extreme fire behavior, dramatic but realistic photo",
    ],
    'hurricane': [
        "hurricane damage to coastal houses, strong winds, flooding, realistic news photo",
        "storm surge flooding streets, hurricane aftermath, damaged buildings, photojournalism",
        "hurricane destruction, fallen trees on houses, debris everywhere, documentary style",
        "cyclone damage to city, broken windows, flooded roads, realistic disaster photo",
        "tropical storm aftermath, destroyed roofs, emergency response, news photography",
    ],
}

# ----- GENERATION SETTINGS -----
# Number of images attempted per class (the loop runs this many iterations
# for every key of DISASTER_PROMPTS).
IMAGES_PER_CLASS = 120
# NOTE: not referenced by the generation loop in this cell, which issues one
# pipeline call per image.
BATCH_SIZE = 1


# Keyword arguments forwarded unchanged to every pipeline call.
GENERATION_CONFIG = {
    'num_inference_steps': 25,
    'guidance_scale': 7.5,
    'height': 512,
    'width': 512,
}

# Summarise the plan before the (slow) generation starts.
print(f"\n📋 Generation Plan:")
print(f"   Classes: {list(DISASTER_PROMPTS.keys())}")
print(f"   Images per class: {IMAGES_PER_CLASS}")
print(f"   Total fake images: {IMAGES_PER_CLASS * len(DISASTER_PROMPTS)}")

# Running count of images that were generated AND saved successfully; it is
# printed here and again in the cleanup summary.
generated_count = 0
# Planned total. NOTE: not read again inside this loop.
total_to_generate = IMAGES_PER_CLASS * len(DISASTER_PROMPTS)

for disaster_type, prompts in DISASTER_PROMPTS.items():
    print(f"\n🔥 Generating {disaster_type} images...")


    # One output folder per class, directly under FAKE_IMAGES_DIR.
    class_dir = FAKE_IMAGES_DIR / disaster_type
    class_dir.mkdir(exist_ok=True)

    for i in tqdm(range(IMAGES_PER_CLASS), desc=disaster_type):

        # Unseeded random pick, so the prompt mix differs between runs.
        prompt = random.choice(prompts)


        # Append generic quality tags to whichever prompt was chosen.
        full_prompt = f"{prompt}, high quality, detailed, 8k resolution"


        negative_prompt = "cartoon, anime, drawing, painting, blurry, low quality, watermark, text, logo"

        try:

            with torch.autocast("cuda"):
                result = pipe(
                    prompt=full_prompt,
                    negative_prompt=negative_prompt,
                    **GENERATION_CONFIG
                )

            image = result.images[0]


            # The file name carries the loop index, so a failed iteration
            # leaves a gap in the numbering rather than shifting later files.
            image_path = class_dir / f"{disaster_type}_{i:03d}.jpg"
            image.save(image_path, "JPEG", quality=95)

            generated_count += 1

        except Exception as e:
            # Best effort: report, release cached GPU memory, move on to the
            # next image. The periodic cache flush below is skipped here.
            print(f"⚠️ Error generating image: {e}")

            torch.cuda.empty_cache()
            continue


        # Release cached GPU memory every tenth successful iteration.
        if i % 10 == 0:
            torch.cuda.empty_cache()

print(f"\n✅ Generated {generated_count} fake disaster images!")



# Per-class tally of the JPEGs that actually reached disk (this can be lower
# than IMAGES_PER_CLASS when some generations failed).
for disaster_type in DISASTER_PROMPTS.keys():
    class_dir = FAKE_IMAGES_DIR / disaster_type
    count = len(list(class_dir.glob("*.jpg")))
    print(f"   {disaster_type}: {count} images")


print("\n📷 Sample Generated Images:")

import matplotlib.pyplot as plt

fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

# Blank every panel first so slots that receive no image (a class with fewer
# than two files) do not show an empty framed axis.
for ax in axes:
    ax.axis('off')

idx = 0
for disaster_type in DISASTER_PROMPTS.keys():
    class_dir = FAKE_IMAGES_DIR / disaster_type
    # Sorted so the same two samples are shown on every run.
    images = sorted(class_dir.glob("*.jpg"))

    # Up to two samples per class.
    for sample_path in images[:2]:
        if idx >= len(axes):
            # More classes than panels: stop instead of raising IndexError.
            break
        img = Image.open(sample_path)
        axes[idx].imshow(img)
        axes[idx].set_title(f"FAKE - {disaster_type}", fontweight='bold', color='red')
        axes[idx].axis('off')
        idx += 1

plt.suptitle("AI-Generated Fake Disaster Images", fontsize=14, fontweight='bold')
plt.tight_layout()
# savefig does not create missing folders; without this the cell stops here
# and never reaches the cleanup code below.
os.makedirs('outputs', exist_ok=True)
plt.savefig('outputs/fake_samples.png', dpi=150, bbox_inches='tight')
plt.show()

# ----- CLEANUP -----

# Drop the notebook's reference to the pipeline and release cached GPU memory
# so the following steps have it available.
del pipe
torch.cuda.empty_cache()

# Final summary for this step.
print("\n" + "="*60)
print("✅ STEP 1 COMPLETE!")
print("="*60)
print(f"\n📁 Fake images saved to: {FAKE_IMAGES_DIR}")
print(f"   Total: {generated_count} images")
print("\n👉 Now run STEP 2: Train Real/Fake Classifier")

In [None]:

# Recovery cell: re-save the sample grid once 'outputs/' exists, re-print the
# per-class counts, and free the pipeline if it is still loaded.
import os
os.makedirs('outputs', exist_ok=True)


# Save through the Figure object built in the previous cell. plt.savefig()
# writes pyplot's *current* figure, which in a fresh notebook cell is
# typically a new empty one -- that would produce a blank PNG.
fig.savefig('outputs/fake_samples.png', dpi=150, bbox_inches='tight')
plt.show()

from pathlib import Path
FAKE_IMAGES_DIR = Path("data/fake_disaster_images")

print("\n✅ Fake images generated:")
for disaster_type in ['earthquake', 'flood', 'wildfire', 'hurricane']:
    class_dir = FAKE_IMAGES_DIR / disaster_type
    if class_dir.exists():
        count = len(list(class_dir.glob("*.jpg")))
        print(f"   {disaster_type}: {count} images")

# The first cell deletes `pipe` itself when it runs to completion; tolerate
# that so this cell can be executed in either situation.
try:
    del pipe
except NameError:
    pass
torch.cuda.empty_cache()


In [None]:

!pip install -q transformers timm

# Step banner: this cell needs the user's Kaggle API token.
print("="*60)
print("📥 UPLOAD YOUR kaggle.json FILE")
print("="*60)

# Colab-only upload widget; outside Colab this import fails.
from google.colab import files

print("\n👉 Click 'Choose Files' and select your kaggle.json")
uploaded = files.upload()


# Move the uploaded token to ~/.kaggle and make it readable by the owner only.
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


# Download the dataset archive and unpack it (overwriting quietly) under
# data/raw/disaster_images/.
!mkdir -p data/raw/disaster_images
!kaggle datasets download -d alex1994/natural-disaster-image-dataset -p data/raw/
!unzip -q -o data/raw/natural-disaster-image-dataset.zip -d data/raw/disaster_images/

print("\n Dataset downloaded!")


print("\n Verifying images...")

from pathlib import Path

# Training split inside the unpacked archive; one sub-folder per class.
REAL_IMAGES_DIR = Path("data/raw/disaster_images/natural_disaster_dataset/train")

# Count files per class folder. "*.*" matches any name containing a dot, so
# non-image files would be counted too. iterdir() raises if the folder is
# missing (e.g. the download failed).
print("\n📊 Real images found:")
for class_dir in sorted(REAL_IMAGES_DIR.iterdir()):
    if class_dir.is_dir():
        count = len(list(class_dir.glob("*.*")))
        print(f"   {class_dir.name}: {count} images")

# Folders that later cells write plots and checkpoints into.
import os
os.makedirs('outputs', exist_ok=True)
os.makedirs('models', exist_ok=True)


In [None]:

from pathlib import Path

# Sanity check before training: list how many files each class folder holds
# for the real dataset and for the generated (fake) one.
real_path = Path("data/raw/disaster_images/natural_disaster_dataset/train")
fake_path = Path("data/fake_disaster_images")

# (heading, root folder, glob pattern). Real images are counted with "*.*",
# fake ones only as "*.jpg".
for heading, root, pattern in (
    ("📷 REAL images:", real_path, "*.*"),
    ("\n🎨 FAKE images:", fake_path, "*.jpg"),
):
    print(heading)
    if not root.exists():
        print("   ❌ Not found!")
        continue
    for d in sorted(root.iterdir()):
        if d.is_dir():
            print(f"   {d.name}: {len(list(d.glob(pattern)))} images")

In [None]:


import shutil
from pathlib import Path


# The real dataset names this class "cyclone" while the generated images use
# "hurricane"; expose the real folder under the second name as well.
real_path = Path("data/raw/disaster_images/natural_disaster_dataset/train")
cyclone_path = real_path / "cyclone"
hurricane_path = real_path / "hurricane"

# A dangling link (is_symlink() is true but exists() is false) can be left
# behind by an earlier run that linked to a cwd-relative target; remove it so
# it can be recreated correctly.
if hurricane_path.is_symlink() and not hurricane_path.exists():
    hurricane_path.unlink()

if hurricane_path.exists():
    print("✅ Paths already aligned")
elif cyclone_path.exists():
    # A relative symlink target is resolved relative to the directory that
    # contains the link, not the current working directory. Both folders are
    # siblings, so the bare folder name is the correct target.
    hurricane_path.symlink_to(cyclone_path.name, target_is_directory=True)
    print("✅ Created 'hurricane' link to 'cyclone' folder")
else:
    # Previously reported as "aligned" although nothing usable exists.
    print(f"⚠️ Neither 'cyclone' nor 'hurricane' found under {real_path}")

print("\n👉 Now run STEP 2!")

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.optim import AdamW
from torchvision import transforms, models
import timm
from PIL import Image
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
import matplotlib.pyplot as plt
import seaborn as sns
import random
import warnings
warnings.filterwarnings('ignore')


# Run on the GPU when torch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print(f"🖥️ Device: {DEVICE}")


# Input folders: real photos (training split of the downloaded dataset) and
# the generated images from step 1.
REAL_IMAGES_DIR = Path("data") / "raw" / "disaster_images" / "natural_disaster_dataset" / "train"
FAKE_IMAGES_DIR = Path("data") / "fake_disaster_images"

# Output folders for checkpoints and figures.
for _artifact_dir in ('models', 'outputs'):
    os.makedirs(_artifact_dir, exist_ok=True)


print("\n📂 Loading image paths...")

def load_image_paths(directory, label, limit_per_class=None):
    """Collect image records from a folder that has one sub-folder per class.

    Args:
        directory: ``pathlib.Path`` whose immediate sub-folders are classes.
        label: Value stored under ``'label'`` in every record
            (0 = fake, 1 = real).
        limit_per_class: When truthy, keep at most this many images per
            class folder; ``None`` (or 0) keeps all of them.

    Returns:
        A list of dicts with keys ``'image_path'`` (str), ``'label'`` and
        ``'disaster_type'`` (the class folder name).

    Class folders and files are visited in sorted order, so the subset kept
    by ``limit_per_class`` is the same on every run. A file reachable through
    more than one folder (for example a ``hurricane`` symlink that points at
    ``cyclone``) is recorded only once; otherwise identical images would end
    up in both the training and the evaluation split.
    """
    data = []
    seen = set()  # resolved paths already recorded

    for class_dir in sorted(directory.iterdir()):
        if not class_dir.is_dir():
            continue

        # Same extension precedence as before (jpg, jpeg, png), but sorted
        # within each extension because glob order is filesystem-dependent.
        images = []
        for pattern in ("*.jpg", "*.jpeg", "*.png"):
            images.extend(sorted(class_dir.glob(pattern)))

        unique_images = []
        for img_path in images:
            real_location = img_path.resolve()
            if real_location in seen:
                continue
            seen.add(real_location)
            unique_images.append(img_path)

        if limit_per_class:
            unique_images = unique_images[:limit_per_class]

        for img_path in unique_images:
            data.append({
                'image_path': str(img_path),
                'label': label,  # 0 = fake, 1 = real
                'disaster_type': class_dir.name
            })

    return data


# Real photos are capped at 120 per class, the same number the generation
# step attempts per class, to keep the two labels roughly balanced.
real_data = load_image_paths(REAL_IMAGES_DIR, label=1, limit_per_class=120)
print(f"   Real images: {len(real_data)}")


fake_data = load_image_paths(FAKE_IMAGES_DIR, label=0, limit_per_class=None)
print(f"   Fake images: {len(fake_data)}")

# Fail early with a clear message. Continuing with an empty class either
# crashes further down (division by zero, missing 'label' column) or trains a
# model that has only ever seen one label.
if not real_data or not fake_data:
    raise RuntimeError(
        f"Need both real and fake images to train: found {len(real_data)} real "
        f"under {REAL_IMAGES_DIR} and {len(fake_data)} fake under {FAKE_IMAGES_DIR}."
    )


all_data = real_data + fake_data
# Use a dedicated seeded generator: the split below fixes random_state=42,
# but that is only reproducible if the row order it starts from is too.
random.Random(42).shuffle(all_data)
df = pd.DataFrame(all_data)

n_total = len(df)
n_real = int((df['label'] == 1).sum())
n_fake = int((df['label'] == 0).sum())

print(f"\n📊 Total dataset: {n_total} images")
print(f"   Real: {n_real} ({n_real/n_total*100:.1f}%)")
print(f"   Fake: {n_fake} ({n_fake/n_total*100:.1f}%)")


# Side length, in pixels, of the square tensors fed to the model.
IMAGE_SIZE = 224

# Per-channel normalisation constants shared by both pipelines (presumably the
# statistics the pretrained backbone expects -- confirm against its config).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training images are resized 32 px larger than the target so the random
# crop has room to move.
_TRAIN_RESIZE = IMAGE_SIZE + 32

# Training pipeline: oversize, random crop, flip, mild colour jitter.
train_transform = transforms.Compose(
    [
        transforms.Resize((_TRAIN_RESIZE, _TRAIN_RESIZE)),
        transforms.RandomCrop(IMAGE_SIZE),
        transforms.RandomHorizontalFlip(0.5),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

# Evaluation pipeline: plain resize, no randomness.
val_transform = transforms.Compose(
    [
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)


class RealFakeDataset(Dataset):
    """Dataset of (image, label) pairs backed by a DataFrame of file paths.

    The frame must provide an ``'image_path'`` and a ``'label'`` column.

    Args:
        dataframe: Source rows; the index is reset so positional access works.
        transform: Optional callable applied to the RGB PIL image.
        image_size: Side length of the all-zero placeholder returned when an
            image cannot be loaded. ``None`` (default) uses the module-level
            ``IMAGE_SIZE``.
    """

    def __init__(self, dataframe, transform=None, image_size=None):
        self.df = dataframe.reset_index(drop=True)
        self.transform = transform
        self.image_size = image_size
        # Paths already reported as unreadable, so each one is printed once
        # rather than on every epoch.
        self._reported = set()

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``; label is a long tensor."""
        row = self.df.iloc[idx]

        try:
            # Context manager closes the file handle once pixels are decoded.
            with Image.open(row['image_path']) as handle:
                image = handle.convert('RGB')
            if self.transform:
                image = self.transform(image)
        except Exception as e:
            # Keep training alive on a bad file, but say so: a placeholder
            # that nobody knows about quietly corrupts the dataset.
            if row['image_path'] not in self._reported:
                self._reported.add(row['image_path'])
                print(f"⚠️ Could not load {row['image_path']}: {e} (using blank image)")
            side = self.image_size if self.image_size is not None else IMAGE_SIZE
            image = torch.zeros(3, side, side)

        label = torch.tensor(int(row['label']), dtype=torch.long)

        return image, label


print("\n📊 Splitting dataset...")

from sklearn.model_selection import train_test_split

# First carve off 30 % of the rows, then halve that remainder into validation
# and test. Both cuts are stratified on the real/fake label.
train_df, temp_df = train_test_split(
    df,
    test_size=0.3,
    random_state=42,
    stratify=df['label'],
)
val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    random_state=42,
    stratify=temp_df['label'],
)

print(f"   Train: {len(train_df)}")
print(f"   Val:   {len(val_df)}")
print(f"   Test:  {len(test_df)}")


# Only the training set gets the augmenting transform.
train_dataset = RealFakeDataset(train_df, train_transform)
val_dataset = RealFakeDataset(val_df, val_transform)
test_dataset = RealFakeDataset(test_df, val_transform)


BATCH_SIZE = 32

# Options shared by all three loaders; only the training loader shuffles.
_loader_options = dict(batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)

train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_options)

print(f"\n✅ DataLoaders created (batch_size={BATCH_SIZE})")


print("\n🤖 Creating Real/Fake Classifier...")

class RealFakeClassifier(nn.Module):
    """
    CNN classifier for detecting AI-generated images.
    Uses EfficientNet-B0 backbone with custom head.

    Args:
        num_classes: Number of output logits (2 for fake / real).
    """
    def __init__(self, num_classes=2):
        super().__init__()

        # num_classes=0 asks timm for the backbone without its own classifier,
        # so forward() yields pooled feature vectors.
        self.backbone = timm.create_model('efficientnet_b0', pretrained=True, num_classes=0)

        # MLP head; the 1280 input width must match the backbone's feature size.
        self.classifier = nn.Sequential(
            nn.Linear(1280, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return raw logits for a batch of images."""
        features = self.backbone(x)
        output = self.classifier(features)
        return output

    def predict_proba(self, x):
        """Get probability scores.

        Runs in evaluation mode (dropout off, normalisation layers using
        their running statistics) with gradients disabled, so repeated calls
        on the same input agree. Every sub-module's previous train/eval flag
        is restored afterwards.
        """
        # Remember each module's own flag: a caller may have put only part of
        # the network (e.g. the backbone) in eval mode.
        previous_modes = [(module, module.training) for module in self.modules()]
        self.eval()
        try:
            with torch.no_grad():
                logits = self.forward(x)
                probs = F.softmax(logits, dim=1)
        finally:
            for module, was_training in previous_modes:
                module.training = was_training
        return probs

# Build the two-class model and place it on the selected device.
model = RealFakeClassifier(num_classes=2).to(DEVICE)


# Report model size: all parameters, and the subset that receives gradients.
total_params = 0
trainable_params = 0
for p in model.parameters():
    total_params += p.numel()
    if p.requires_grad:
        trainable_params += p.numel()
print(f"   Total parameters: {total_params:,}")
print(f"   Trainable: {trainable_params:,}")

# ----- TRAINING SETUP -----
NUM_EPOCHS = 10
LEARNING_RATE = 1e-4

criterion = nn.CrossEntropyLoss()
optimizer = AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=0.01,
)
# Cosine learning-rate schedule spanning the whole run (stepped once per epoch
# by the training loop).
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=NUM_EPOCHS,
)

# Gradient scaler used together with autocast inside train_epoch.
scaler = torch.cuda.amp.GradScaler()

print(f"\n⚙️ Training config:")
print(f"   Epochs: {NUM_EPOCHS}")
print(f"   Learning Rate: {LEARNING_RATE}")


def train_epoch(model, loader, optimizer, criterion, scaler):
    """Run one mixed-precision training pass over ``loader``.

    Returns:
        ``(mean_loss, accuracy)`` where the loss is averaged over batches and
        the accuracy is computed over every sample seen.
    """
    model.train()
    running_loss = 0
    predicted, expected = [], []

    for batch_images, batch_labels in tqdm(loader, desc="Training"):
        batch_images = batch_images.to(DEVICE)
        batch_labels = batch_labels.to(DEVICE)

        optimizer.zero_grad()

        # Forward pass and loss under autocast; backward and the optimizer
        # step go through the gradient scaler.
        with torch.cuda.amp.autocast():
            logits = model(batch_images)
            batch_loss = criterion(logits, batch_labels)

        scaler.scale(batch_loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += batch_loss.item()
        predicted.extend(torch.argmax(logits, dim=1).cpu().numpy())
        expected.extend(batch_labels.cpu().numpy())

    epoch_accuracy = accuracy_score(expected, predicted)
    return running_loss / len(loader), epoch_accuracy

def validate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without updating any weights.

    Returns:
        ``(mean_loss, accuracy, weighted_f1, predictions, labels)``; the last
        two are flat lists covering every sample, in loader order.
    """
    model.eval()
    running_loss = 0
    predicted, expected = [], []

    with torch.no_grad():
        for batch_images, batch_labels in tqdm(loader, desc="Validating"):
            batch_images = batch_images.to(DEVICE)
            batch_labels = batch_labels.to(DEVICE)

            logits = model(batch_images)
            running_loss += criterion(logits, batch_labels).item()

            predicted.extend(torch.argmax(logits, dim=1).cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    accuracy = accuracy_score(expected, predicted)
    weighted_f1 = f1_score(expected, predicted, average='weighted')
    return running_loss / len(loader), accuracy, weighted_f1, predicted, expected




# Per-epoch metrics, plotted after training.
history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': [], 'val_f1': []}
# Best validation accuracy so far; a checkpoint is written whenever it improves.
best_val_acc = 0

for epoch in range(NUM_EPOCHS):
    print(f"\n📅 Epoch {epoch+1}/{NUM_EPOCHS}")

    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, scaler)
    val_loss, val_acc, val_f1, _, _ = validate(model, val_loader, criterion)

    # The learning-rate schedule advances once per epoch.
    scheduler.step()

    print(f"   Train Loss: {train_loss:.4f} | Acc: {train_acc*100:.2f}%")
    print(f"   Val Loss:   {val_loss:.4f} | Acc: {val_acc*100:.2f}% | F1: {val_f1*100:.2f}%")

    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['train_acc'].append(train_acc)
    history['val_acc'].append(val_acc)
    history['val_f1'].append(val_f1)


    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # Store the metrics as built-in floats. The sklearn scorers can return
        # NumPy scalars, which torch.load refuses to unpickle when it runs
        # with weights_only=True; tensors and plain numbers always load.
        torch.save({
            'model_state_dict': model.state_dict(),
            'val_acc': float(val_acc),
            'val_f1': float(val_f1),
            'epoch': epoch
        }, 'models/real_fake_classifier.pt')
        print(f"   ✅ Saved best model! (Acc: {val_acc*100:.2f}%)")

print(f"\n🏆 Best Val Accuracy: {best_val_acc*100:.2f}%")


print("\n" + "="*60)
print("🧪 EVALUATING ON TEST SET")
print("="*60)


# Reload the best checkpoint. map_location places the tensors on the device
# in use now, so a checkpoint written on a GPU also loads on a CPU runtime.
checkpoint = torch.load('models/real_fake_classifier.pt', map_location=DEVICE)
model.load_state_dict(checkpoint['model_state_dict'])

test_loss, test_acc, test_f1, test_preds, test_labels = validate(model, test_loader, criterion)

print(f"\n📊 Test Results:")
print(f"   Accuracy: {test_acc*100:.2f}%")
print(f"   F1 Score: {test_f1*100:.2f}%")


# Label ids in display order (0 = FAKE, 1 = REAL). Passing them explicitly
# keeps the report and the matrix two-class even if one label happens to be
# missing from the test predictions and targets.
class_ids = [0, 1]
class_names = ['FAKE', 'REAL']

print("\n📋 Classification Report:")
print(classification_report(test_labels, test_preds, labels=class_ids,
                            target_names=class_names, digits=4))


cm = confusion_matrix(test_labels, test_preds, labels=class_ids)

# Confusion matrix heat-map: rows are true labels, columns are predictions.
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='RdYlGn',
            xticklabels=class_names,
            yticklabels=class_names)
plt.title('Confusion Matrix - Real vs Fake', fontsize=14, fontweight='bold')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.savefig('outputs/confusion_matrix_real_fake.png', dpi=150)
plt.show()


# Training curves: loss on the left, accuracy (in percent) on the right.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

axes[0].plot(history['train_loss'], label='Train')
axes[0].plot(history['val_loss'], label='Val')
axes[0].set_title('Loss', fontweight='bold')
axes[0].set_xlabel('Epoch')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

axes[1].plot([x*100 for x in history['train_acc']], label='Train')
axes[1].plot([x*100 for x in history['val_acc']], label='Val')
axes[1].set_title('Accuracy (%)', fontweight='bold')
axes[1].set_xlabel('Epoch')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('outputs/training_history_real_fake.png', dpi=150)
plt.show()


import json

# Small JSON summary stored next to the checkpoint. Metrics are cast to plain
# floats so they serialise cleanly.
model_info = dict(
    [
        ('model', 'EfficientNet-B0 + Custom Head'),
        ('task', 'Real vs Fake Image Classification'),
        ('test_accuracy', float(test_acc)),
        ('test_f1', float(test_f1)),
        ('best_val_acc', float(best_val_acc)),
        ('classes', ['FAKE', 'REAL']),
    ]
)

with open('models/real_fake_model_info.json', 'w') as f:
    json.dump(model_info, f, indent=2)


# List every artefact this step wrote.
print(f"\n📁 Files saved:")
for _saved_file in (
    "models/real_fake_classifier.pt",
    "models/real_fake_model_info.json",
    "outputs/confusion_matrix_real_fake.png",
    "outputs/training_history_real_fake.png",
):
    print(f"   - {_saved_file}")

In [None]:


import re
import pandas as pd
import folium
from geopy.geocoders import Nominatim
import time


def clean_tweet(text):
    """Clean tweet text.

    Repairs common mojibake quote sequences, then removes URLs, @mentions
    and the ``RT`` marker, keeps hashtag words without the ``#``, and
    collapses runs of whitespace.

    Args:
        text: Raw tweet; ``None``, NaN and empty values yield ``""``.

    Returns:
        The cleaned string.
    """
    if not text or pd.isna(text):
        return ""

    text = str(text)

    # UTF-8 curly quotes mis-decoded as cp1252. Written as escapes because the
    # closing-quote form ends in U+009D, an invisible control character that
    # is easily lost in a literal. Longer sequences come first so the bare
    # two-character prefix only catches what is left over.
    mojibake_fixes = (
        ('\u00e2\u20ac\u2122', "'"),   # right single quote / apostrophe
        ('\u00e2\u20ac\u0153', '"'),   # left double quote
        ('\u00e2\u20ac\u009d', '"'),   # right double quote (with U+009D)
        ('\u00e2\u20ac', '"'),         # right double quote (control char lost)
    )
    for broken, fixed in mojibake_fixes:
        text = text.replace(broken, fixed)

    text = re.sub(r'http\S+|www\S+|https\S+', '', text)
    text = re.sub(r'@\w+', '', text)
    text = re.sub(r'\bRT\b', '', text)
    text = re.sub(r'#(\w+)', r'\1', text)
    text = ' '.join(text.split())
    return text.strip()

print("✅ Text cleaning ready!")


class DisasterClassifier:
    """Classifies disaster type from text using weighted keyword hits."""

    def __init__(self):
        # Per class: 'primary' keywords score 3 each, 'secondary' score 1.
        self.keywords = {
            'earthquake': {
                'primary': ['earthquake', 'quake', 'seismic', 'tremor', 'magnitude', 'richter'],
                'secondary': ['shaking', 'aftershock', 'fault', 'epicenter', 'collapsed']
            },
            'flood': {
                'primary': ['flood', 'flooding', 'flash flood', 'submerged', 'inundation'],
                'secondary': ['water level', 'rising water', 'overflow', 'dam', 'waterlogged']
            },
            'wildfire': {
                'primary': ['wildfire', 'fire', 'blaze', 'burning', 'flames', 'inferno'],
                'secondary': ['smoke', 'evacuation', 'firefighter', 'forest fire', 'brush fire']
            },
            'hurricane': {
                'primary': ['hurricane', 'cyclone', 'typhoon', 'tropical storm', 'storm surge'],
                'secondary': ['wind', 'landfall', 'category', 'eye of storm', 'coastal']
            }
        }

    @staticmethod
    def _mentions(keyword, text_lower):
        """Return True if ``keyword`` occurs in ``text_lower`` as a whole word.

        A plain inflection (s, es, ed, ing) is allowed after the keyword, so
        'tremor' matches "tremors" and 'flood' matches "flooded". Plain
        substring tests produced false hits such as 'dam' in "damage",
        'wind' in "window" and 'fire' inside "firefighter".
        """
        pattern = rf"\b{re.escape(keyword)}(?:s|es|ed|ing)?\b"
        return re.search(pattern, text_lower) is not None

    def classify(self, text):
        """Score ``text`` against every class and return the best match.

        Returns:
            Dict with ``'disaster_type'`` (a class name, or ``'unknown'`` when
            nothing matched), ``'confidence'`` and the raw ``'scores'``.
            Ties go to the class listed first in ``self.keywords``.
        """
        text_lower = text.lower()
        scores = {}

        for disaster, kws in self.keywords.items():
            score = sum(3 for kw in kws['primary'] if self._mentions(kw, text_lower))
            score += sum(1 for kw in kws['secondary'] if self._mentions(kw, text_lower))
            scores[disaster] = score

        max_disaster = max(scores, key=scores.get)
        max_score = scores[max_disaster]

        if max_score == 0:
            return {'disaster_type': 'unknown', 'confidence': 0.25, 'scores': scores}

        # Share of the winning class among all hits, then a bonus for strong
        # evidence, capped below 1.0.
        total = sum(scores.values())
        confidence = max_score / total if total > 0 else 0.25

        if max_score >= 6:
            confidence = min(0.95, confidence + 0.2)
        elif max_score >= 3:
            confidence = min(0.85, confidence + 0.1)

        return {'disaster_type': max_disaster, 'confidence': round(confidence, 2), 'scores': scores}

disaster_classifier = DisasterClassifier()
print("✅ Disaster classifier ready!")


def estimate_severity(text):
    """Estimate severity from text.

    Returns:
        ``(level, confidence)``:
        - ``'HIGH'`` when at least two high-severity keywords occur;
          confidence grows with the hit count, capped at 0.9.
        - ``'LOW'`` (0.7) when a low-severity keyword occurs and no
          high-severity keyword does.
        - ``'MEDIUM'`` (0.65) in every other case.
    """
    text_lower = text.lower()

    high_kws = ['catastrophic', 'devastating', 'massive', 'destroyed', 'deaths',
                'casualties', 'collapsed', 'critical', 'emergency', 'urgent',
                'evacuation', 'fatalities', 'killed', 'trapped', 'thousands']

    low_kws = ['minor', 'small', 'slight', 'contained', 'under control', 'safe']

    def count_hits(keywords):
        # Anchor at the start of a word: 'safe' must not fire inside
        # "unsafe", while forms such as "critically" still count.
        return sum(
            1 for kw in keywords
            if re.search(rf"\b{re.escape(kw)}", text_lower)
        )

    high_count = count_hits(high_kws)
    low_count = count_hits(low_kws)

    if high_count >= 2:
        return 'HIGH', min(0.9, 0.6 + high_count * 0.1)
    elif low_count >= 1 and high_count == 0:
        return 'LOW', 0.7
    else:
        return 'MEDIUM', 0.65

print("✅ Severity estimator ready!")

print("\n📍 Setting up geolocation...")

# Geocoding client; extract_location falls back to it (a network lookup) when
# no entry of KNOWN_LOCATIONS matches.
geolocator = Nominatim(user_agent="disasterscope_ai_v2")

# Offline lookup table consulted before any geocoding request. Keys are
# lowercase and are searched for inside the lowercased tweet; each value holds
# the coordinates and the display name returned to the caller.
KNOWN_LOCATIONS = {
    'california': {'lat': 36.7783, 'lng': -119.4179, 'name': 'California, USA'},
    'los angeles': {'lat': 34.0522, 'lng': -118.2437, 'name': 'Los Angeles, USA'},
    'san francisco': {'lat': 37.7749, 'lng': -122.4194, 'name': 'San Francisco, USA'},
    'new york': {'lat': 40.7128, 'lng': -74.0060, 'name': 'New York, USA'},
    'texas': {'lat': 31.9686, 'lng': -99.9018, 'name': 'Texas, USA'},
    'houston': {'lat': 29.7604, 'lng': -95.3698, 'name': 'Houston, USA'},
    'florida': {'lat': 27.6648, 'lng': -81.5158, 'name': 'Florida, USA'},
    'miami': {'lat': 25.7617, 'lng': -80.1918, 'name': 'Miami, USA'},
    'japan': {'lat': 36.2048, 'lng': 138.2529, 'name': 'Japan'},
    'tokyo': {'lat': 35.6762, 'lng': 139.6503, 'name': 'Tokyo, Japan'},
    'india': {'lat': 20.5937, 'lng': 78.9629, 'name': 'India'},
    'delhi': {'lat': 28.6139, 'lng': 77.2090, 'name': 'Delhi, India'},
    'mumbai': {'lat': 19.0760, 'lng': 72.8777, 'name': 'Mumbai, India'},
    'philippines': {'lat': 12.8797, 'lng': 121.7740, 'name': 'Philippines'},
    'australia': {'lat': -25.2744, 'lng': 133.7751, 'name': 'Australia'},
    'indonesia': {'lat': -0.7893, 'lng': 113.9213, 'name': 'Indonesia'},
    'china': {'lat': 35.8617, 'lng': 104.1954, 'name': 'China'},
    'turkey': {'lat': 38.9637, 'lng': 35.2433, 'name': 'Turkey'},
    'nepal': {'lat': 28.3949, 'lng': 84.1240, 'name': 'Nepal'},
    'haiti': {'lat': 18.9712, 'lng': -72.2852, 'name': 'Haiti'},
    'chile': {'lat': -35.6751, 'lng': -71.5430, 'name': 'Chile'},
}

def extract_location(text):
    """Extract location from text with geocoding.

    Lookup order:
    1. ``KNOWN_LOCATIONS`` (offline). Keys must match as whole words, and
       when several match, the one mentioned first in the text wins -- so
       "Tokyo, Japan" resolves to the city, not the country.
    2. A capitalised phrase captured by the regex patterns, resolved through
       the module-level ``geolocator`` (network call, 5 s timeout).

    Returns:
        Dict with ``'found'``, ``'name'``, ``'lat'``, ``'lng'`` and
        ``'confidence'`` (0.9 for table hits, 0.75 for geocoded hits, 0 with
        ``None`` fields when nothing was found).
    """
    text_lower = text.lower()

    earliest = None  # (position in text, table entry)
    for loc_key, loc_data in KNOWN_LOCATIONS.items():
        # Word boundaries stop 'india' from matching inside "indiana".
        hit = re.search(rf"\b{re.escape(loc_key)}\b", text_lower)
        if hit and (earliest is None or hit.start() < earliest[0]):
            earliest = (hit.start(), loc_data)

    if earliest is not None:
        loc_data = earliest[1]
        return {
            'found': True,
            'name': loc_data['name'],
            'lat': loc_data['lat'],
            'lng': loc_data['lng'],
            'confidence': 0.9
        }

    # Case-sensitive on purpose: candidates must start with a capital letter,
    # so these run on the original text rather than the lowercased copy.
    patterns = [
        r'(?:in|at|near|hits|strikes|devastates)\s+([A-Z][a-zA-Z\s]+?)(?:[,.\!\?]|$)',
        r'([A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)?)\s+(?:earthquake|flood|fire|hurricane)',
    ]

    for pattern in patterns:
        matches = re.findall(pattern, text)
        if not matches:
            continue

        location_name = matches[0].strip()
        try:
            location = geolocator.geocode(location_name, timeout=5)
        except Exception:
            # Geocoding is best effort (timeouts, rate limits, no network):
            # try the next pattern. Unlike a bare except, this lets
            # KeyboardInterrupt and SystemExit through.
            continue

        if location:
            return {
                'found': True,
                'name': location.address.split(',')[0],
                'lat': location.latitude,
                'lng': location.longitude,
                'confidence': 0.75
            }

    return {'found': False, 'name': None, 'lat': None, 'lng': None, 'confidence': 0}




def create_disaster_map(location, disaster_type, severity, save_path='outputs/disaster_map.html'):
    """
    Create interactive map with:
    - RED zone at epicenter (critical)
    - ORANGE zone around it (warning)
    - YELLOW zone outer (caution)

    Args:
        location: Dict as returned by ``extract_location`` (reads ``'found'``,
            ``'lat'``, ``'lng'`` and ``'name'``).
        disaster_type: Class name shown (upper-cased) in popup, tooltip, banner.
        severity: ``'HIGH'``, ``'MEDIUM'`` or ``'LOW'``; selects the zone
            radii (in metres). Any other value uses the MEDIUM radii.
        save_path: Destination HTML file; its folder is created if missing.

    Returns:
        The ``folium.Map``, or ``None`` when ``location['found']`` is false.
    """
    import os
    from html import escape

    if not location['found']:
        print("⚠️ No location found - cannot create map")
        return None

    lat, lng = location['lat'], location['lng']

    # The name may come from a geocoder response and the type from a model;
    # escape both before placing them in raw HTML.
    safe_name = escape(str(location['name']))
    safe_type = escape(str(disaster_type).upper())
    safe_severity = escape(str(severity))
    # One colour rule for popup and banner (the banner used to show LOW in
    # orange while the popup showed it in green).
    severity_color = 'red' if severity == 'HIGH' else 'orange' if severity == 'MEDIUM' else 'green'

    m = folium.Map(location=[lat, lng], zoom_start=10)

    zone_sizes = {
        'HIGH': {'red': 5000, 'orange': 15000, 'yellow': 30000},
        'MEDIUM': {'red': 3000, 'orange': 10000, 'yellow': 20000},
        'LOW': {'red': 1000, 'orange': 5000, 'yellow': 10000},
    }
    sizes = zone_sizes.get(severity, zone_sizes['MEDIUM'])

    # Drawn from the outermost ring inwards so the smaller, more severe
    # zones end up on top: (size key, outline, fill, opacity, popup text).
    zones = (
        ('yellow', '#FFD700', 'yellow', 0.2, '⚠️ Caution Zone'),
        ('orange', 'orange', 'orange', 0.3, '🟠 Warning Zone'),
        ('red', 'red', 'red', 0.4, '🔴 Critical Zone - Immediate Danger'),
    )
    for size_key, outline, fill, opacity, popup_text in zones:
        folium.Circle(
            location=[lat, lng],
            radius=sizes[size_key],
            color=outline,
            fill=True,
            fill_color=fill,
            fill_opacity=opacity,
            popup=popup_text
        ).add_to(m)

    popup_html = f"""
    <div style="width: 200px; font-family: Arial;">
        <h4 style="color: red; margin: 0;">🚨 {safe_type}</h4>
        <hr style="margin: 5px 0;">
        <p><b>📍 Location:</b> {safe_name}</p>
        <p><b>⚠️ Severity:</b> <span style="color: {severity_color};">{safe_severity}</span></p>
        <p><b>📐 Coordinates:</b><br>{lat:.4f}, {lng:.4f}</p>
    </div>
    """

    folium.Marker(
        location=[lat, lng],
        popup=folium.Popup(popup_html, max_width=250),
        icon=folium.Icon(color='red', icon='exclamation-triangle', prefix='fa'),
        tooltip=f"🚨 {safe_type} - Click for details"
    ).add_to(m)

    # Fixed banner across the top of the page.
    title_html = f'''
    <div style="position: fixed; top: 10px; left: 50px;
                background: white; padding: 10px 15px;
                border: 3px solid red; border-radius: 10px;
                z-index: 9999; font-family: Arial;">
        <h3 style="margin: 0; color: red;">🚨 DisasterScope AI Alert</h3>
        <p style="margin: 5px 0 0 0;"><b>{safe_type}</b> detected in <b>{safe_name}</b></p>
        <p style="margin: 0; color: {severity_color};">Severity: {safe_severity}</p>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(title_html))

    legend_html = '''
    <div style="position: fixed; bottom: 30px; right: 30px;
                background: white; padding: 10px;
                border: 2px solid gray; border-radius: 8px;
                z-index: 9999; font-family: Arial; font-size: 12px;">
        <b>Legend</b><br>
        🔴 Critical Zone<br>
        🟠 Warning Zone<br>
        🟡 Caution Zone
    </div>
    '''
    m.get_root().html.add_child(folium.Element(legend_html))

    # Create the folder the caller asked for, not a hard-coded 'outputs'.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    m.save(save_path)
    print(f"✅ Map saved to: {save_path}")

    return m

print("✅ Map generator ready!")

def generate_alert(disaster_type, severity, location_name):
    """Generate alert message.

    Returns a dict with ``'emoji'``, ``'level'``, ``'message'`` and
    ``'action'`` for the given severity. Severities other than HIGH, MEDIUM
    or LOW receive the MEDIUM (warning) alert.
    """
    shouted = disaster_type.upper()

    # severity -> (emoji, level, message, action)
    catalogue = {
        'HIGH': (
            '🚨',
            'CRITICAL',
            f"CRITICAL: {shouted} in {location_name}! Immediate action required!",
            'Evacuate immediately. Follow emergency services instructions.',
        ),
        'MEDIUM': (
            '⚠️',
            'WARNING',
            f"WARNING: {shouted} reported in {location_name}. Stay alert!",
            'Prepare emergency supplies. Monitor official channels.',
        ),
        'LOW': (
            'ℹ️',
            'ADVISORY',
            f"ADVISORY: Minor {disaster_type} activity in {location_name}.",
            'Stay informed. No immediate action needed.',
        ),
    }

    if severity in catalogue:
        chosen = catalogue[severity]
    else:
        chosen = catalogue['MEDIUM']

    return dict(zip(('emoji', 'level', 'message', 'action'), chosen))

print("✅ Alert generator ready!")


def analyze_text(tweet_text):
    """Complete text analysis pipeline.

    Cleans the tweet, then derives disaster type, location, severity and an
    alert message from it.

    Args:
        tweet_text: Raw tweet. ``None``/NaN/other non-string values are
            tolerated and analysed as their cleaned text (empty for missing
            values).

    Returns:
        Dict with ``'original_text'``, ``'cleaned_text'``,
        ``'disaster_type'``, ``'disaster_confidence'``, ``'location'``,
        ``'severity'``, ``'severity_confidence'`` and ``'alert'``.
    """
    cleaned = clean_tweet(tweet_text)

    # Disaster type
    disaster_result = disaster_classifier.classify(cleaned)

    # Location: searched in the raw text when it is a string. clean_tweet
    # accepts None/NaN, but extract_location calls .lower() on its argument,
    # so fall back to the cleaned string for non-string input.
    location_source = tweet_text if isinstance(tweet_text, str) else cleaned
    location = extract_location(location_source)

    # Severity
    severity, severity_conf = estimate_severity(cleaned)

    # Alert
    loc_name = location['name'] if location['found'] else 'Unknown Location'
    alert = generate_alert(disaster_result['disaster_type'], severity, loc_name)

    return {
        'original_text': tweet_text,
        'cleaned_text': cleaned,
        'disaster_type': disaster_result['disaster_type'],
        'disaster_confidence': disaster_result['confidence'],
        'location': location,
        'severity': severity,
        'severity_confidence': severity_conf,
        'alert': alert
    }

print("\n Complete text analysis ready!")



# Smoke test: five hand-written tweets, one per disaster type plus a
# low-severity case.
test_tweets = [
    "BREAKING: Massive earthquake hits Tokyo, Japan! Buildings collapsed, thousands trapped!",
    "Devastating wildfire spreading near Los Angeles. 50,000 evacuated!",
    "Flash flood warning for Houston, Texas. Roads completely submerged!",
    "Hurricane makes landfall in Florida with Category 4 winds!",
    "Minor tremors felt in California. No damage reported.",
]

# Run the full pipeline on each tweet and print a short summary (tweet and
# alert message are truncated to 50 characters).
for i, tweet in enumerate(test_tweets, 1):
    result = analyze_text(tweet)

    print(f"\n{'='*50}")
    print(f"📝 Tweet {i}: {tweet[:50]}...")
    print(f"   🌪️ Disaster: {result['disaster_type'].upper()} ({result['disaster_confidence']*100:.0f}%)")
    print(f"   📍 Location: {result['location']['name'] if result['location']['found'] else 'Not found'}")
    print(f"   ⚠️ Severity: {result['severity']}")
    print(f"   {result['alert']['emoji']} {result['alert']['level']}: {result['alert']['message'][:50]}...")




# Build and display the map for the first tweet only; nothing is shown when
# no location was found for it.
result = analyze_text(test_tweets[0])
if result['location']['found']:
    disaster_map = create_disaster_map(
        result['location'],
        result['disaster_type'],
        result['severity']
    )



    from IPython.display import display, HTML
    display(disaster_map)





In [None]:
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification, pipeline
import torch
import os

LABELS = ["earthquake","flood","wildfire","hurricane","unknown"]

def _load_distilbert_model(model_path="models/distilbert_disaster.pt", device='cpu'):
    if os.path.exists(model_path):
        tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")
        model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=len(LABELS))
        state = torch.load(model_path, map_location=device)
        if isinstance(state, dict) and "state_dict" in state:
            model.load_state_dict(state["state_dict"])
        else:
            model.load_state_dict(state)
        model.to(device)
        model.eval()
        return tokenizer, model, device
    return None, None, None

# Try the fine-tuned checkpoint first (default path and device).
tokenizer, distilbert_model, distil_device = _load_distilbert_model()

# The zero-shot pipeline is only built when no checkpoint was found;
# otherwise zero_shot stays None.
zero_shot = (
    pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",
        device=0 if torch.cuda.is_available() else -1,
    )
    if distilbert_model is None
    else None
)

def classify_with_distilbert(text):
    """Classify a text into a disaster type.

    Uses the fine-tuned DistilBERT model when it was loaded, otherwise the
    zero-shot pipeline.

    Args:
        text: The text to classify.

    Returns:
        dict with "disaster_type" (top label), "confidence" (its score,
        rounded to two decimals) and "scores" (label -> score). On the
        DistilBERT path the scores cover every entry of LABELS; on the
        zero-shot path they cover the four candidate labels.
    """
    if distilbert_model is None:
        # Zero-shot fallback: no "unknown" candidate is offered here.
        candidate_labels = ["earthquake","flood","wildfire","hurricane"]
        res = zero_shot(text, candidate_labels)

        top_label = res["labels"][0] if res["labels"] else "unknown"
        top_score = float(res["scores"][0]) if res["scores"] else 0.0

        scores = dict(zip(res.get("labels",[]), (float(s) for s in res.get("scores",[]))))
        # Make sure every candidate has an entry, even if the pipeline omitted it.
        for candidate in candidate_labels:
            scores.setdefault(candidate, 0.0)

        return {"disaster_type": top_label, "confidence": round(top_score,2), "scores": scores}

    encoded = tokenizer(text, truncation=True, padding=True, return_tensors="pt").to(distil_device)
    with torch.no_grad():
        logits = distilbert_model(**encoded).logits

    # Single input text, so only the first row of probabilities is used.
    probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
    best = int(probs.argmax())
    per_label = {LABELS[pos]: float(probs[pos]) for pos in range(len(LABELS))}

    return {"disaster_type": LABELS[best], "confidence": round(float(probs[best]),2), "scores": per_label}


In [None]:
import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from pathlib import Path


class DamageSeverityEstimator:
    """
    Estimates damage severity from disaster images using:
    1. Color analysis (share of fire-, flood- and smoke-colored pixels)
    2. Texture analysis (grayscale variance, Laplacian variance)
    3. Edge density (share of Canny edge pixels)

    The signals are converted into an integer score (0-8) with fixed
    thresholds, and the score is bucketed into LOW / MEDIUM / HIGH.
    """

    # (threshold, points) pairs, checked from the highest threshold down; the
    # first threshold the value strictly exceeds decides the points.
    _COLOR_STEPS = ((0.3, 3), (0.15, 2), (0.05, 1))
    _EDGE_STEPS = ((0.2, 3), (0.1, 2), (0.05, 1))
    _TEXTURE_STEPS = ((0.15, 2), (0.08, 1))

    def __init__(self):
        self.severity_levels = ['LOW', 'MEDIUM', 'HIGH']

    def analyze_colors(self, image):
        """Measure how much of the image falls into disaster-related color ranges.

        Args:
            image: BGR image array.

        Returns:
            dict with 'fire_ratio', 'flood_ratio', 'smoke_ratio' (each the
            fraction of pixels inside the respective mask) and
            'total_disaster_color' (their sum). The masks overlap, so the
            sum can exceed 1.0.
        """
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        pixel_count = image.shape[0] * image.shape[1]

        # Fire: saturated, bright pixels at both ends of the hue scale
        # (hue 0-20 and 160-180).
        fire_mask1 = cv2.inRange(hsv, np.array([0, 100, 100]), np.array([20, 255, 255]))
        fire_mask2 = cv2.inRange(hsv, np.array([160, 100, 100]), np.array([180, 255, 255]))
        fire_ratio = (np.sum(fire_mask1 > 0) + np.sum(fire_mask2 > 0)) / pixel_count

        # Flood: hue 10-30 with moderate saturation and value capped at 200.
        flood_mask = cv2.inRange(hsv, np.array([10, 50, 50]), np.array([30, 255, 200]))
        flood_ratio = np.sum(flood_mask > 0) / pixel_count

        # Smoke: mid-range gray levels (100-180), regardless of hue.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        smoke_mask = cv2.inRange(gray, 100, 180)
        smoke_ratio = np.sum(smoke_mask > 0) / pixel_count

        return {
            'fire_ratio': fire_ratio,
            'flood_ratio': flood_ratio,
            'smoke_ratio': smoke_ratio,
            'total_disaster_color': fire_ratio + flood_ratio + smoke_ratio
        }

    def analyze_destruction(self, image):
        """Measure edge and texture statistics of the image.

        Args:
            image: BGR image array.

        Returns:
            dict with 'edge_density' (fraction of Canny edge pixels),
            'texture_variance' (grayscale variance / 255) and
            'detail_level' (Laplacian variance / 10000, capped at 1.0).
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        edges = cv2.Canny(gray, 50, 150)
        edge_density = np.sum(edges > 0) / edges.size

        # NOTE(review): variance of 0-255 gray levels divided by 255 is not
        # confined to [0, 1], so the 0.08 / 0.15 scoring thresholds are
        # exceeded by most images. Dividing by 255**2 may have been intended;
        # left unchanged because it would shift every severity score.
        texture_var = np.var(gray) / 255.0

        laplacian = cv2.Laplacian(gray, cv2.CV_64F)
        laplacian_var = np.var(laplacian) / 10000.0

        return {
            'edge_density': edge_density,
            'texture_variance': texture_var,
            'detail_level': min(1.0, laplacian_var)
        }

    def estimate_severity(self, image_path):
        """Estimate damage severity from image.

        Args:
            image_path: Either a file path (``str`` or ``pathlib.Path``) or
                an in-memory image (PIL image / array) in RGB or grayscale.

        Returns:
            On success a dict with 'severity', 'confidence', 'score',
            'color_analysis', 'destruction_analysis' and 'damage_map'.
            On any failure a fallback dict with 'severity' = 'MEDIUM',
            'confidence' = 0.5 and an 'error' message; no exception is raised.
        """
        try:
            image = self._load_bgr_image(image_path)
            if image is None:
                return {'severity': 'MEDIUM', 'confidence': 0.5, 'error': 'Could not load'}

            # Fixed working size for every analysis step.
            image = cv2.resize(image, (512, 512))

            color = self.analyze_colors(image)
            destruction = self.analyze_destruction(image)

            score = self._damage_score(color, destruction)
            severity, confidence = self._grade(score)

            damage_map = self._generate_damage_map(image)

            return {
                'severity': severity,
                'confidence': round(confidence, 2),
                'score': score,
                'color_analysis': color,
                'destruction_analysis': destruction,
                'damage_map': damage_map
            }

        except Exception as e:
            # Deliberate best-effort: report the problem instead of raising.
            return {'severity': 'MEDIUM', 'confidence': 0.5, 'error': str(e)}

    def _load_bgr_image(self, source):
        """Return *source* as a BGR array, or None when a path cannot be read."""
        if isinstance(source, (str, Path)):
            # Path objects are file paths too, not pixel data.
            return cv2.imread(str(source))

        pixels = np.array(source)
        if pixels.ndim == 2:
            # Single-channel input cannot go through the RGB conversion.
            return cv2.cvtColor(pixels, cv2.COLOR_GRAY2BGR)
        return cv2.cvtColor(pixels, cv2.COLOR_RGB2BGR)

    @staticmethod
    def _points(value, steps):
        """Return the points of the first threshold in *steps* that *value* exceeds."""
        for threshold, points in steps:
            if value > threshold:
                return points
        return 0

    def _damage_score(self, color, destruction):
        """Combine color, edge and texture signals into one integer score (0-8)."""
        return (
            self._points(color['total_disaster_color'], self._COLOR_STEPS)
            + self._points(destruction['edge_density'], self._EDGE_STEPS)
            + self._points(destruction['texture_variance'], self._TEXTURE_STEPS)
        )

    @staticmethod
    def _grade(score):
        """Map a score to a (severity, confidence) pair."""
        if score >= 6:
            return 'HIGH', min(0.9, 0.6 + score * 0.03)
        if score >= 3:
            return 'MEDIUM', 0.65 + score * 0.02
        return 'LOW', 0.6

    def _generate_damage_map(self, image):
        """Generate damage intensity heatmap.

        Canny edges are dilated into regions, scaled to 0-1 and blurred,
        giving a float map with the same height and width as *image*.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        edges = cv2.Canny(gray, 50, 150)

        # Grow thin edge lines into broader regions.
        kernel = np.ones((15, 15), np.uint8)
        damage_regions = cv2.dilate(edges, kernel, iterations=2)

        # Scale to 0-1 and smooth the region borders.
        damage_map = damage_regions.astype(float) / 255.0
        damage_map = cv2.GaussianBlur(damage_map, (31, 31), 0)

        return damage_map


# Module-level estimator instance; the test code further down calls it as
# `damage_estimator`.
damage_estimator = DamageSeverityEstimator()
print("✅ Damage severity estimator ready!")

def create_damage_heatmap(image_path, severity_result, save_path=None):
    """
    Create visual damage heatmap overlay.
    Red = high damage, Yellow = medium, Green = low

    Draws three panels: the original image, the damage intensity map, and
    the map overlaid on the image with the severity in the title.

    Args:
        image_path: File path (``str`` or ``pathlib.Path``) or an in-memory
            image (PIL image / array) that is already in RGB order.
        severity_result: Result dict of the severity estimator. Uses the
            'damage_map', 'severity' and 'confidence' entries; a missing
            damage map is replaced by an all-zero map.
        save_path: Optional target for ``plt.savefig``. When it is a path,
            its parent directory is created if necessary.

    Returns:
        The matplotlib figure.

    Raises:
        ValueError: If ``image_path`` is a path that cannot be read as an image.
    """
    if isinstance(image_path, (str, Path)):
        original = cv2.imread(str(image_path))
        if original is None:
            # imread signals failure with None; fail here with a clear message
            # instead of an obscure error inside cvtColor.
            raise ValueError(f"Could not load image: {image_path}")
        original = cv2.cvtColor(original, cv2.COLOR_BGR2RGB)
    else:
        original = np.array(image_path)

    original = cv2.resize(original, (512, 512))

    damage_map = severity_result.get('damage_map')
    if damage_map is None:
        damage_map = np.zeros((512, 512))

    colors = ['green', 'yellow', 'orange', 'red']
    cmap = mcolors.LinearSegmentedColormap.from_list('damage', colors)

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # Panel 1: the input image.
    axes[0].imshow(original)
    axes[0].set_title('Original Image', fontweight='bold', fontsize=12)
    axes[0].axis('off')

    # Panel 2: the damage map on its own, with a colorbar.
    heatmap = axes[1].imshow(damage_map, cmap=cmap, vmin=0, vmax=1)
    axes[1].set_title('Damage Intensity Map', fontweight='bold', fontsize=12)
    axes[1].axis('off')
    plt.colorbar(heatmap, ax=axes[1], label='Damage Intensity', shrink=0.8)

    # Panel 3: semi-transparent damage map over the image.
    axes[2].imshow(original)
    axes[2].imshow(damage_map, cmap=cmap, alpha=0.5, vmin=0, vmax=1)
    severity = severity_result.get('severity', 'UNKNOWN')
    conf = severity_result.get('confidence', 0)
    color = {'HIGH': 'red', 'MEDIUM': 'orange'}.get(severity, 'green')
    axes[2].set_title(f'Severity: {severity} ({conf*100:.0f}%)',
                      fontweight='bold', fontsize=12, color=color)
    axes[2].axis('off')

    plt.tight_layout()

    if save_path:
        if isinstance(save_path, (str, Path)):
            # savefig does not create missing directories.
            Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"✅ Saved to: {save_path}")

    plt.show()
    return fig

print("✅ Damage heatmap visualization ready!")


print("\n" + "="*60)
print("🧪 TESTING DAMAGE ESTIMATOR")
print("="*60)


# Training split of the image dataset, with one sub-directory per class.
test_dir = Path("data/raw/disaster_images/natural_disaster_dataset/train")

# Built once instead of on every loop iteration.
sev_emoji = {'HIGH': '🔴', 'MEDIUM': '🟡', 'LOW': '🟢'}

if test_dir.exists():
    print("\n📊 Testing on sample images from each class:\n")

    # One quick estimate per class, using the first .jpg found.
    for disaster in ['earthquake', 'flood', 'wildfire', 'cyclone']:
        class_dir = test_dir / disaster
        if class_dir.exists():
            images = list(class_dir.glob("*.jpg"))[:1]
            if images:
                result = damage_estimator.estimate_severity(str(images[0]))
                print(f"{disaster.upper():12} → {sev_emoji.get(result['severity'], '⚪')} {result['severity']} ({result['confidence']*100:.0f}%) | Score: {result.get('score', 'N/A')}")


# Detailed report and heatmap for one wildfire image.
sample_dir = test_dir / "wildfire"
if sample_dir.exists():
    sample_images = list(sample_dir.glob("*.jpg"))
    if sample_images:
        sample_path = str(sample_images[0])

        print(f"\n📷 Analyzing: {sample_path}")
        result = damage_estimator.estimate_severity(sample_path)

        if 'error' in result:
            # The estimator's failure fallback only carries severity,
            # confidence and error; the analysis entries below are missing.
            print(f"\n⚠️ Analysis failed: {result['error']}")
        else:
            print(f"\n📊 Analysis Results:")
            print(f"   Severity: {result['severity']} ({result['confidence']*100:.0f}%)")
            print(f"   Score: {result.get('score', 'N/A')}/8")
            print(f"\n   Color Analysis:")
            print(f"      Fire ratio: {result['color_analysis']['fire_ratio']*100:.1f}%")
            print(f"      Flood ratio: {result['color_analysis']['flood_ratio']*100:.1f}%")
            print(f"      Smoke ratio: {result['color_analysis']['smoke_ratio']*100:.1f}%")
            print(f"\n   Destruction Analysis:")
            print(f"      Edge density: {result['destruction_analysis']['edge_density']*100:.1f}%")
            print(f"      Texture variance: {result['destruction_analysis']['texture_variance']*100:.1f}%")

            print("\n🗺️ Generating damage heatmap...")
            create_damage_heatmap(sample_path, result, 'outputs/damage_heatmap_example.png')



In [None]:


import pickle
import os

os.makedirs('checkpoints', exist_ok=True)

# Path of the saved classifier plus the configuration values to persist.
checkpoint = {
    'real_fake_model_path': 'models/real_fake_classifier.pt',
    'config': {
        'DEVICE': str(DEVICE),
        'IMAGE_SIZE': 224,
        'KNOWN_LOCATIONS': KNOWN_LOCATIONS,
    },
}

with open('checkpoints/checkpoint_info.pkl', 'wb') as checkpoint_file:
    pickle.dump(checkpoint, checkpoint_file)

# Same lines as separate print calls would emit, joined into one call.
print("\n".join([
    "✅ Checkpoint saved!",
    "\n📁 Files to keep safe:",
    "   - models/real_fake_classifier.pt",
    "   - checkpoints/checkpoint_info.pkl",
    "   - data/fake_disaster_images/ (your generated images)",
]))