In [None]:
import warnings
# Install a global "ignore" filter: no warning of any category is shown after this
# cell runs. NOTE(review): this also hides deprecation warnings raised by the
# libraries imported in later cells.
warnings.filterwarnings("ignore")

In [None]:
import pandas as pd

# Load the dataset from the Kaggle input mount. Later cells read the columns
# 'text', 'Split_Headline_Responses', 'label' and 'path' from this frame.
df=pd.read_csv('/kaggle/input/20th-aug-images/updated_dataset.csv')

In [None]:
# Preview the first rows of the freshly loaded frame.
df.head()

In [None]:
def split_text_by_words(text):
    """Split ``text`` into two halves at the middle whitespace-separated word.

    Returns a ``(first_part, second_part)`` tuple of space-joined strings. When the
    word count is odd, the extra word ends up in the second half.
    """
    tokens = text.split()
    cut = len(tokens) // 2
    head, tail = tokens[:cut], tokens[cut:]
    return ' '.join(head), ' '.join(tail)

# Split every text into its two halves and store them as the 'Part1' and
# 'second_part' columns. The halves are collected as a list of tuples and turned
# into a single DataFrame, instead of building one pd.Series per row inside
# apply(), which is much slower on large frames.
df[['Part1', 'second_part']] = pd.DataFrame(
    df['text'].map(split_text_by_words).tolist(),
    index=df.index,
    columns=['Part1', 'second_part'],
)

In [None]:
# Cast each response to its own string. Calling the built-in str() on the whole
# Series produces one printed summary of the column and assigns that same text to
# every row, which destroys the per-row content.
# NOTE(review): missing values become the literal string 'nan' - confirm that is
# acceptable for the embedding models downstream.
df['Split_Headline_Responses'] = df['Split_Headline_Responses'].astype(str)

In [None]:
!pip install sentence_transformers

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from scipy.spatial.distance import cosine
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from transformers import BertTokenizer, BertModel
from sentence_transformers import SentenceTransformer
import torch
from tqdm import tqdm

# Load the USE model
# (called directly on a list of strings by get_use_embedding)
use_embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

# Load pre-trained BERT model and tokenizer
# Both are read as module-level globals by get_bert_embedding.
model_name = 'bert-base-uncased'
bert_tokenizer = BertTokenizer.from_pretrained(model_name)
bert_model = BertModel.from_pretrained(model_name)

# Load Sentence-BERT model
# (used through sbert_model.encode in get_sbert_embedding)
sbert_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

def get_use_embedding(texts):
    """Embed ``texts`` with the loaded Universal Sentence Encoder; returns a NumPy array."""
    encoded = use_embed(texts)
    return encoded.numpy()

def get_bert_embedding(texts):
    """Return mean-pooled BERT embeddings for ``texts``.

    Args:
        texts: a single string or a list of strings.

    Returns:
        NumPy array with one row per text for a list input (also when the list
        has exactly one element), or a 1-D vector for a single string.
    """
    inputs = bert_tokenizer(texts, return_tensors='pt', truncation=True, padding=True, max_length=512)
    with torch.no_grad():
        outputs = bert_model(**inputs)
    hidden = outputs.last_hidden_state
    # Average over real tokens only. Padding positions are zeroed out via the
    # attention mask so that a text's embedding does not depend on how long the
    # other texts in the same batch are.
    mask = inputs['attention_mask'].unsqueeze(-1).to(hidden.dtype)
    pooled = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)
    # Keep the batch axis for list input. An unconditional squeeze() collapses a
    # one-row batch to a 1-D vector, which breaks row-wise zip/concatenate in the
    # caller whenever the final batch holds a single row.
    if isinstance(texts, str):
        pooled = pooled.squeeze(0)
    return pooled.numpy()

def get_sbert_embedding(texts):
    """Encode ``texts`` with the loaded Sentence-BERT model and return the result."""
    encoded = sbert_model.encode(texts)
    return encoded

def calculate_similarity(emb1, emb2):
    """Cosine similarity of two 1-D vectors: one minus SciPy's cosine distance."""
    distance = cosine(emb1, emb2)
    return 1 - distance

def process_data_in_batches(df, batch_size=32):
    """Build the similarity feature matrix for ``df`` in chunks of ``batch_size`` rows.

    For each chunk, the 'second_part' and 'Split_Headline_Responses' columns are
    embedded with USE, BERT and Sentence-BERT. Every output row is the six
    embeddings followed by the three per-model cosine similarities. The chunks are
    stacked in row order into one NumPy array.
    """
    def _rowwise_similarity(left, right):
        # Cosine similarity between aligned rows, returned as a column vector.
        scores = np.array([calculate_similarity(a, b) for a, b in zip(left, right)])
        return scores.reshape(-1, 1)

    chunks = []
    for start in tqdm(range(0, len(df), batch_size)):
        rows = df.iloc[start:start + batch_size]
        second_parts = rows['second_part'].tolist()
        responses = rows['Split_Headline_Responses'].tolist()

        use_a = get_use_embedding(second_parts)
        use_b = get_use_embedding(responses)
        bert_a = get_bert_embedding(second_parts)
        bert_b = get_bert_embedding(responses)
        sbert_a = get_sbert_embedding(second_parts)
        sbert_b = get_sbert_embedding(responses)

        columns = [
            use_a, use_b, bert_a, bert_b, sbert_a, sbert_b,
            _rowwise_similarity(use_a, use_b),
            _rowwise_similarity(bert_a, bert_b),
            _rowwise_similarity(sbert_a, sbert_b),
        ]
        chunks.append(np.concatenate(columns, axis=1))

    return np.concatenate(chunks, axis=0)

# Process data in batches
print("Processing data in batches...")
# X gets one feature row per row of df, in df's row order; y is the 'label'
# column of the same frame as a NumPy array. Both are reused by the training
# cells further down, after df itself has been deleted.
X = process_data_in_batches(df)
y = df['label'].values

In [None]:
# Bind a second name to the same DataFrame object (no copy is made).
df2=df

In [None]:
# Removes only the name `df`; the frame stays alive through `df2`, so this does
# not release any memory by itself.
del df

In [None]:
import gc
# Force a garbage-collection pass. Later cells call gc.collect() and rely on
# this import (or the repeated one further down).
gc.collect()

In [None]:
!pip install salesforce-lavis

In [None]:
# Rebind `df` to the frame kept alive under `df2`.
df=df2

In [None]:
# Drop the temporary alias; the frame is still referenced by `df`.
del df2

In [None]:
# Another garbage-collection pass (requires `gc` to be imported by an earlier cell).
gc.collect()

In [None]:
# Rewrite the stored image paths from the Google Drive prefix to the Kaggle input
# mount. regex=False makes this a literal substring replacement regardless of the
# default that the installed pandas version uses for `regex`.
df['path'] = df['path'].str.replace(
    '/content/drive/MyDrive/20th_aug_capstone',
    '/kaggle/input/20th-aug-images',
    regex=False,
)

In [None]:
import torch
# Use the GPU when CUDA is available, otherwise fall back to the CPU. `device` is
# passed to the BLIP loader and used to place the image tensors in the extraction loop.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device set to:", device)

In [None]:
import gc
# Release cached CUDA memory held by PyTorch's allocator and run a GC pass before
# the BLIP model is loaded.
torch.cuda.empty_cache()
gc.collect()

In [None]:
from lavis.models import load_model_and_preprocess
# Load the BLIP feature extractor together with its image and text preprocessors.
# NOTE(review): the name `model` is rebound to the classifier network in a later
# cell, so the BLIP model is only reachable under this name until then.
model, vis_processors, txt_processors = load_model_and_preprocess(name="blip_feature_extractor", model_type="base", is_eval=True, device=device)

In [None]:
from PIL import Image
import pandas as pd

multimodal_embeddings = []
image_embeddings = []
text_embeddings = []

# Run the BLIP feature extractor once per row of df and collect three vectors per
# row: multimodal, image-only and text-only.
for index, row in df.iterrows():
    image_path = row['path']
    # Open the file in a context manager so its handle is closed right after
    # convert() has produced an independent RGB copy. Without this, every
    # iteration leaves an open file behind until garbage collection runs.
    with Image.open(image_path) as image_file:
        image = image_file.convert("RGB")
    
    text = row['text']
    text_input = txt_processors["eval"](text)
    
    # unsqueeze(0) adds the batch axis; the sample holds exactly one image/text pair.
    image_processed = vis_processors["eval"](image).unsqueeze(0).to(device)
    sample = {"image": image_processed, "text_input": [text_input]}
    
    # [0,0,:] selects the single sample in the batch and the first position along
    # the second axis of each embedding tensor.
    multimodal_emb = model.extract_features(sample).multimodal_embeds[0,0,:] 
    image_emb = model.extract_features(sample, mode="image").image_embeds[0,0,:] 
    text_emb = model.extract_features(sample, mode="text").text_embeds[0,0,:]
    
    multimodal_embeddings.append(multimodal_emb.cpu().numpy())
    image_embeddings.append(image_emb.cpu().numpy())
    text_embeddings.append(text_emb.cpu().numpy())

# Keep the per-row vectors on the frame as well as in stacked arrays; the stacked
# arrays are what the tensor-conversion cell below consumes.
df['Multimodal Embeddings'] = multimodal_embeddings
df['Image Embeddings'] = image_embeddings
df['Text Embeddings'] = text_embeddings
import numpy as np
multimodal_embeddings =np.array(multimodal_embeddings)
text_embeddings = np.array(text_embeddings)
image_embeddings = np.array(image_embeddings)

In [None]:
# The frame is no longer needed: features (X), labels (y) and the BLIP embedding
# arrays were all extracted into separate variables by earlier cells.
del df

In [None]:
# Garbage-collection pass after dropping the last reference to the frame.
gc.collect()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score, classification_report
import numpy as np

In [None]:
# Convert similarity metrics features and labels to PyTorch tensors
# X and y come from the earlier cell that calls process_data_in_batches(df) and
# reads df['label']. Labels are stored as float32 because the loss used later
# (nn.BCELoss) is applied to float targets.
X_similarity = torch.tensor(X, dtype=torch.float32)
y_similarity = torch.tensor(y, dtype=torch.float32)

In [None]:
# Convert multimodal inputs to tensors (assuming these are numpy arrays)
# Each array was built with np.array() over one 1-D vector per row in the BLIP
# extraction cell, so every tensor has one row per sample.
multimodal_inputs = torch.tensor(multimodal_embeddings, dtype=torch.float32)
image_inputs = torch.tensor(image_embeddings, dtype=torch.float32)
text_inputs = torch.tensor(text_embeddings, dtype=torch.float32)

In [None]:
# Combine all inputs for the BLIP embeddings
# Concatenation is along dim=1, so each row becomes
# [multimodal | image | text] for the same sample.
combined_blip_inputs = torch.cat((multimodal_inputs, image_inputs, text_inputs), dim=1)

In [None]:
# Combine similarity metrics features and BLIP embeddings into one dataset.
# Both tensors have one row per sample in the original frame order, so a
# column-wise concatenation keeps the rows aligned.
combined_inputs = torch.cat((combined_blip_inputs, X_similarity), dim=1)
# The targets are the `label` column captured as `y` right after the similarity
# features were computed. No variable called `labels` exists in this notebook, so
# referencing it raises NameError on a fresh run.
labels_combined = torch.tensor(y, dtype=torch.float32)

In [None]:
# Train-test-validation split
# Step 1 keeps 70% for training and holds out 30%. Step 2 sends 0.6667 of that
# hold-out to the test set and the rest to validation, i.e. roughly
# 70% train / 10% validation / 20% test. Both steps stratify on the labels and
# use a fixed random_state so the split is repeatable.
X_train, X_temp, y_train, y_temp = train_test_split(combined_inputs, labels_combined, test_size=0.3, random_state=2, stratify=labels_combined)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.6667, random_state=2, stratify=y_temp)

In [None]:
# Create DataLoaders
# Each dataset pairs a feature tensor with its label tensor.
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)
# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Define the combined neural network for binary classification
class CombinedBinaryNet(nn.Module):
    """Three-layer MLP that maps a concatenated feature row to one probability.

    Args:
        blip_input_size: number of BLIP embedding features per sample.
        similarity_input_size: number of similarity features per sample.

    The two sizes are summed to get the width of the first linear layer; the
    network itself receives a single, already concatenated input tensor.
    """

    def __init__(self, blip_input_size, similarity_input_size):
        super(CombinedBinaryNet, self).__init__()
        combined_input_size = blip_input_size + similarity_input_size
        self.fc1 = nn.Linear(combined_input_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 1)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return sigmoid probabilities of shape ``(batch,)`` for input ``(batch, features)``."""
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = torch.sigmoid(self.fc3(x))
        # Drop only the trailing size-1 feature axis. A bare squeeze() would also
        # remove the batch axis when the batch holds a single sample, producing a
        # 0-d tensor that no longer matches the (1,)-shaped target.
        return x.squeeze(-1)

In [None]:
# Initialize the model
# Feature widths are read from the tensors built above, so the first layer always
# matches the concatenated input. NOTE: this rebinds `model`, which previously
# referred to the BLIP feature extractor.
blip_input_size = combined_blip_inputs.shape[1]
similarity_input_size = X_similarity.shape[1]
model = CombinedBinaryNet(blip_input_size, similarity_input_size)

In [None]:
# Define loss function and optimizer
# BCELoss takes probabilities, which matches the sigmoid applied at the end of
# CombinedBinaryNet.forward.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Early stopping parameters
# patience: number of consecutive epochs without a new best validation loss that
# ends training. best_val_loss / counter are updated by the training loop.
patience = 10
best_val_loss = float('inf')
counter = 0

In [None]:
# Train the model
# num_epochs is an upper bound; early stopping in the loop can end training sooner.
num_epochs = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the classifier's parameters to the selected device before training.
model.to(device)

In [None]:
# One iteration per epoch: a training pass, a validation pass, then the
# early-stopping bookkeeping based on the mean validation loss.
for epoch in range(num_epochs):
    # Training pass: train mode enables dropout; weights are updated per mini-batch.
    model.train()
    train_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    # Validation
    # eval mode disables dropout; no_grad skips gradient tracking.
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item()
    
    # Turn the summed per-batch losses into per-batch means.
    train_loss /= len(train_loader)
    val_loss /= len(val_loader)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
    
    # Early stopping
    # A new best validation loss resets the counter and overwrites the checkpoint;
    # otherwise the counter grows and training stops once it reaches `patience`.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        torch.save(model.state_dict(), 'best_combined_model.pth')
    else:
        counter += 1
        if counter >= patience:
            print(f'Early stopping triggered after epoch {epoch+1}')
            break

In [None]:
# Load best model
# Restore the weights saved at the epoch with the lowest validation loss.
# map_location remaps the stored tensors onto the current device, so a checkpoint
# written during a GPU run can still be loaded when the kernel only has a CPU.
model.load_state_dict(torch.load('best_combined_model.pth', map_location=device))

In [None]:
# Evaluate the model
# eval mode disables dropout; the two lists are filled by the test loop in the
# next cell.
model.eval()
all_preds = []
all_targets = []

In [None]:
# Run the test set through the model without gradient tracking and collect the
# thresholded predictions next to the true labels.
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        # Probabilities above 0.5 are counted as the positive class (1.0).
        predicted = (outputs > 0.5).float()
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())

In [None]:
# Convert to numpy arrays
# (the lists hold one scalar per test sample)
all_preds = np.array(all_preds)
all_targets = np.array(all_targets)

In [None]:
# Compute and print accuracy
# Fraction of test samples whose thresholded prediction equals the label.
accuracy = accuracy_score(all_targets, all_preds)
print(f'Test Accuracy: {accuracy:.4f}')

In [None]:
# Compute and print F1 score
# Called with scikit-learn's default averaging arguments.
f1 = f1_score(all_targets, all_preds)
print(f'Test F1 Score: {f1:.4f}')

In [None]:
# Print classification report
# Text report produced by scikit-learn from the same test labels and predictions.
print("Classification Report:")
print(classification_report(all_targets, all_preds))