Important imports and deviee setup

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import LongformerTokenizer, LongformerModel
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import pandas as pd
import numpy as np
from tqdm import tqdm
import copy
import math

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(64)

Load the Saved Models - Choose one of the three

Run this cell to load Imelda

In [None]:
save_path = 'Training_results/Imelda_5foldcv_Longformer_Multihead/'
saved_model_paths = [os.path.join(save_path, f"best_longformer_model_fold_{i+1}.pth") for i in range(5)]
models = []

Run this cell to load Beryl

In [None]:
save_path = 'Training_results/Beryl_5foldcv_Longformer_Multihead/'
saved_model_paths = [os.path.join(save_path, f"best_longformer_model_fold_{i+1}.pth") for i in range(5)]
models = []

Run this cell to load Harvey

In [None]:
save_path = 'Training_results/Harvey_5foldcv_Longformer_Multihead/'
saved_model_paths = [os.path.join(save_path, f"best_longformer_model_fold_{i+1}.pth") for i in range(5)]
models = []

Declare Tokenizer and other important functions

In [None]:
tokenizer = LongformerTokenizer.from_pretrained('allenai/longformer-base-4096')
model_longformer = LongformerModel.from_pretrained('allenai/longformer-base-4096')

# Get the target list
target_list = [
    'homeOwnersInsurance', 'floodInsurance', 'destroyed', 'floodDamage', 'roofDamage', 
    'tsaEligible', 'tsaCheckedIn', 'rentalAssistanceEligible', 'repairAssistanceEligible', 
    'replacementAssistanceEligible', 'personalPropertyEligible'
]

def haversine(lat1, lon1, lat2, lon2):
    R = 3959  
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

def get_box_area(lat1, lon1, lat2, lon2):
    side1 = haversine(lat1, lon1, lat1, lon2)
    side2 = haversine(lat1, lon1, lat2, lon1)
    return side1 * side2

# Get the target list if not provided
def get_target_list(target_list=[]):
    if not target_list:
        target_list = [
            'homeOwnersInsurance', 'floodInsurance', 'destroyed', 'floodDamage', 'roofDamage', 
            'tsaEligible', 'tsaCheckedIn', 'rentalAssistanceEligible', 'repairAssistanceEligible', 
            'replacementAssistanceEligible', 'personalPropertyEligible'
        ]
    return target_list


Define DataSet class

In [None]:

# Define dataset class to handle tokenization and data loading
class CustomDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        # Tokenizing the text
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(label, dtype=torch.float32)
        }

class LongformerDeepMultiHeadClassifier(nn.Module):
    def __init__(self, num_targets=11, hidden_dim=256):
        super(LongformerDeepMultiHeadClassifier, self).__init__()
        self.longformer = model_longformer
        self.drop = nn.Dropout(0.3)
        
        self.heads = nn.ModuleList([
            nn.Sequential(
                nn.Linear(self.longformer.config.hidden_size, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, 1)
            ) for _ in range(num_targets)
        ])
    
    def forward(self, input_ids, attention_mask):
        outputs = self.longformer(input_ids=input_ids, attention_mask=attention_mask)
        # Longformer doesn't use pooler_output, so use mean pooling over the last hidden state
        sequence_output = outputs.last_hidden_state
        pooled_output = sequence_output.mean(dim=1)
        x = self.drop(pooled_output)
        return torch.cat([torch.sigmoid(head(x)) for head in self.heads], dim=1)


Load Trained Models and perform Evaluation

In [None]:
# Load all trained models
for path in saved_model_paths:
    model = LongformerDeepMultiHeadClassifier(num_targets=len(target_list)).to(device)
    model.load_state_dict(torch.load(path))
    model.eval()
    models.append(model)

size_threshold = 80

tweets2 = pd.read_csv('organized_with_zipcode.csv')  # read in massive tweets dataset
tweets_imelda = tweets2[(tweets2.storm_name == 'imelda')]
bboxes_useful = tweets_imelda.place_bbox.apply(lambda x: [[float(i.strip('(').strip(')')) for i in x.split(', ')][i] for i in [1,0,3,2]])
bbu_areas = bboxes_useful.apply(lambda x: get_box_area(*x))
tweets_imelda = tweets_imelda.loc[((tweets_imelda.geo.apply(lambda x: 'Point' in str(x))) | (bbu_areas < size_threshold)),:]  # since i'm using iloc i think the indices will match
tweet_grouped_imelda = tweets_imelda.groupby('zip_code')

targets_imelda = pd.read_csv('disaster_4466.csv')
target_grouped_imelda = targets_imelda.groupby('damagedZipCode')
    
tweet_dict = {int(name): group['text'] for name, group in tweet_grouped_imelda}
target_dict = {int(name): group[target_list] for name, group in target_grouped_imelda}

intersecting_zips = list(set(target_dict.keys()) & set(tweet_dict.keys()))
paired_data = {
    name: [target_dict[name].sum().apply(lambda x: 1 if x > 0 else 0), tweet_dict[name]] for name in intersecting_zips
}

# Prepare dataset for inference
texts = ['\n'.join(v[1].to_list()) for v in paired_data.values()]
labels_ = [v[0] for v in paired_data.values()]
zip_codes = list(paired_data.keys())

test_dataset = CustomDataset(texts, labels_, tokenizer, max_len=512)
test_loader = DataLoader(test_dataset, batch_size=24, shuffle=False)

# Store predictions
all_preds = []

with torch.no_grad():
    for batch in tqdm(test_loader, desc="Making predictions", unit="batch"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        fold_preds = []
        for model in models:
            outputs = model(input_ids, attention_mask)
            fold_preds.append(outputs.cpu().numpy())

        fold_preds = np.stack(fold_preds, axis=0)  # Shape: (num_models, batch_size, num_targets)
        all_preds.append(fold_preds)

# Convert list to numpy array
all_preds = np.concatenate(all_preds, axis=1)  # Shape: (num_models, total_samples, num_targets)
mean_preds = all_preds.mean(axis=0)  # Mean prediction probabilities (Shape: total_samples, num_targets)
binary_preds = (mean_preds > 0.5).astype(int)  # Convert probabilities to binary predictions

# Convert labels to numpy array
true_labels = np.vstack(labels_)

# Compute evaluation metrics
# metrics_dict = {'Target': target_list, 'Accuracy': [], 'F1_Score': [], 'Precision': [], 'Recall': []}
metrics_dict = {'Target': target_list, 'Accuracy': [], 'F1_Score': [], 'Precision': [], 'Recall': [], 'Support': []}

for i in range(len(target_list)):
    metrics_dict['Accuracy'].append(accuracy_score(true_labels[:, i], binary_preds[:, i]))
    metrics_dict['F1_Score'].append(f1_score(true_labels[:, i], binary_preds[:, i]))
    metrics_dict['Precision'].append(precision_score(true_labels[:, i], binary_preds[:, i]))
    metrics_dict['Recall'].append(recall_score(true_labels[:, i], binary_preds[:, i]))
    metrics_dict['Support'].append(int(true_labels[:, i].sum()))

# Compute and append average metrics
metrics_dict['Target'].append('Average')
metrics_dict['Accuracy'].append(np.mean(metrics_dict['Accuracy']))
metrics_dict['F1_Score'].append(np.mean(metrics_dict['F1_Score']))
metrics_dict['Precision'].append(np.mean(metrics_dict['Precision']))
metrics_dict['Recall'].append(np.mean(metrics_dict['Recall']))
metrics_dict['Support'].append('–')

# Convert to DataFrame and save
metrics_df = pd.DataFrame(metrics_dict)
metrics_df_path = os.path.join(save_path, "longformer_metrics_on_imelda.xlsx")
metrics_df.to_excel(metrics_df_path, index=False)

print(f"Metrics saved to {metrics_df_path}")