**Torchvision + BERT**

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)


In [None]:
import pandas as pd
import os

# Read the cleaned review dataset from Drive, then print its dimensions and
# the first stored image path as a quick sanity check.
source_csv = '/content/drive/MyDrive/cleaned_dataset.csv'
df = pd.read_csv(source_csv)
print(df.shape)
first_image_path = df['Review Image'][0]
print(first_image_path)

# Function to update the image path
def update_image_path(old_path, new_dir='/content/drive/MyDrive/hotel images'):
    """Re-root an image path onto a new directory, keeping only the file name.

    Args:
        old_path: Original image path. Both backslash (Windows) and forward
            slash separators are understood.
        new_dir: Directory the image now lives in. Defaults to the Drive
            folder used by this notebook; a trailing slash is tolerated.

    Returns:
        ``new_dir`` joined with the file name taken from ``old_path``.
        Values that are not strings (for example NaN from an empty CSV cell)
        are returned unchanged instead of raising ``AttributeError``.
    """
    if not isinstance(old_path, str):
        return old_path

    # Normalise backslashes first: on POSIX, os.path.basename only splits on '/'.
    filename = os.path.basename(old_path.replace('\\', '/'))
    return f"{new_dir.rstrip('/')}/{filename}"

# Rewrite every entry of the 'Review Image' column with update_image_path
image_column = df['Review Image']
df['Review Image'] = image_column.apply(update_image_path)

# Write the modified table back to Drive (without the row index)
updated_csv = '/content/drive/MyDrive/updated_cleaneddataset.csv'
df.to_csv(updated_csv, index=False)

print("Image paths updated successfully.")
first_updated_path = df['Review Image'][0]
print(first_updated_path)


In [None]:
import pandas as pd
import os

# Load the dataset saved as updated_cleaneddataset.csv and print its shape
# and first image path.
updated_csv_path = '/content/drive/MyDrive/updated_cleaneddataset.csv'
df = pd.read_csv(updated_csv_path)
print(df.shape)
first_image_path = df['Review Image'][0]
print(first_image_path)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, BertModel
from torchvision import transforms
from PIL import Image
from torch.utils.data import Dataset, DataLoader

# Define the combined model
class CombinedModel(nn.Module):
    """Late-fusion classifier over precomputed text and image feature vectors.

    Each modality is projected to ``combined_dim`` by its own linear layer;
    the two projections are concatenated and mapped to class logits.

    Args:
        text_dim: Size of each text feature vector.
        image_dim: Size of each flattened image feature vector.
        combined_dim: Size of each modality's projection.
        num_classes: Number of output logits. Defaults to 5, the value that
            used to be hard-coded.
    """

    def __init__(self, text_dim, image_dim, combined_dim, num_classes=5):
        super().__init__()
        self.text_fc = nn.Linear(text_dim, combined_dim)
        self.image_fc = nn.Linear(image_dim, combined_dim)
        # Input width is 2 * combined_dim because forward() concatenates
        # the text and image projections.
        self.fc = nn.Linear(combined_dim * 2, num_classes)

    def forward(self, text_features, image_features):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            text_features: Tensor of shape ``(batch, text_dim)``.
            image_features: Tensor of shape ``(batch, image_dim)``.
        """
        # NOTE(review): there is no non-linearity between the layers, so the
        # whole network is an affine map of its inputs -- confirm intended.
        text_out = self.text_fc(text_features)
        image_out = self.image_fc(image_features)
        combined = torch.cat((text_out, image_out), dim=1)
        return self.fc(combined)

# Define the custom dataset class
class ReviewDataset(Dataset):
    """Dataset yielding ``(text_features, image_features, label)`` per review.

    Text features are a mean over the BERT last hidden states of the real
    (non-padding) tokens; image features are the flattened image tensor.

    Args:
        texts: Sequence of review texts (non-strings are passed through ``str``).
        images: Sequence of image file paths, aligned with ``texts``.
        labels: Sequence of integer class labels, aligned with ``texts``.
        tokenizer: Tokenizer callable producing ``input_ids`` and
            ``attention_mask`` tensors.
        transform: Optional callable mapping a PIL image to a tensor. When
            omitted the image is converted with ``transforms.ToTensor()``
            at its native size, so images must then share one size to batch.
        image_feature_size: Length of the all-zero vector used in place of a
            missing or unreadable image. Defaults to ``3 * 224 * 224``, which
            matches a 224x224 RGB image.
    """

    def __init__(self, texts, images, labels, tokenizer, transform=None,
                 image_feature_size=3 * 224 * 224):
        self.texts = texts
        self.images = images
        self.labels = labels
        self.tokenizer = tokenizer
        self.transform = transform
        # Previously read in __getitem__ without ever being set.
        self.image_feature_size = image_feature_size
        self.bert_model = BertModel.from_pretrained('bert-base-uncased')
        # Feature extraction only: keep dropout disabled.
        self.bert_model.eval()

    def __len__(self):
        return len(self.texts)

    def _encode_text(self, text):
        """Return one pooled BERT feature vector for ``text``."""
        if not isinstance(text, str):
            text = str(text)  # e.g. NaN from an empty CSV cell

        inputs = self.tokenizer(text, return_tensors="pt", padding='max_length', truncation=True, max_length=512)
        input_ids = inputs['input_ids']
        attention_mask = inputs['attention_mask']

        with torch.no_grad():
            hidden = self.bert_model(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state

        # Average only over real tokens; with padding='max_length' a plain
        # mean would be dominated by the padding positions of short reviews.
        mask = attention_mask.unsqueeze(-1).to(hidden.dtype)
        pooled = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1.0)
        return pooled.squeeze(0)

    def _load_image_features(self, image_path, idx):
        """Return the flattened image tensor, or zeros if it cannot be loaded."""
        if not isinstance(image_path, str):
            # Missing path (None / NaN): treat it like a missing file.
            print(f"Warning: Image not found for index {idx}, using default zero image")
            return torch.zeros(self.image_feature_size)

        try:
            image = Image.open(image_path).convert('RGB')
        except FileNotFoundError:
            print(f"Warning: Image not found for index {idx}, using default zero image")
            return torch.zeros(self.image_feature_size)
        except OSError:
            # Covers corrupt or unidentifiable files, which PIL reports as OSError.
            print(f"Warning: Image unreadable for index {idx}, using default zero image")
            return torch.zeros(self.image_feature_size)

        if self.transform is not None:
            image = self.transform(image)
        else:
            # A PIL image has no tensor methods; convert it so flattening works.
            image = transforms.ToTensor()(image)
        return image.reshape(-1)

    def __getitem__(self, idx):
        """Return ``(text_features, image_features, label)`` for sample ``idx``."""
        text_features = self._encode_text(self.texts[idx])
        image_features = self._load_image_features(self.images[idx], idx)
        # Pin the dtype: CrossEntropyLoss expects integer (long) class targets.
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return text_features, image_features, label

# Define the transformations
# Image preprocessing: resize every picture to 224x224, then convert it to a tensor
transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

# Tokenizer for the 'bert-base-uncased' checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Feature sizes passed to CombinedModel
combined_dim = 256
text_dim = 768  # BERT output dimension
image_dim = 3 * 224 * 224  # flattened 224x224 RGB image (= 150528)

# Training function with per-batch progress percentage
def train_model(train_loader, model, criterion, optimizer, scheduler, device, grad_clip_value, num_epochs=10):
    """Train ``model`` for ``num_epochs`` epochs, printing per-batch progress.

    Args:
        train_loader: Sized iterable of ``(text_features, image_features,
            labels)`` batches.
        model: Module called as ``model(text_features, image_features)``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over ``model.parameters()``.
        scheduler: Metric-driven scheduler (e.g. ``ReduceLROnPlateau``);
            stepped once per epoch with that epoch's mean training loss.
        device: Device the batches are moved to.
        grad_clip_value: Maximum gradient norm used for clipping.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        List with the mean training loss of each epoch.

    Raises:
        ValueError: If ``train_loader`` contains no batches.
    """
    total_batches = len(train_loader)
    if total_batches == 0:
        raise ValueError("train_loader contains no batches")

    epoch_losses = []
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for batch_idx, (text_features, image_features, labels) in enumerate(train_loader):
            text_features = text_features.to(device)
            image_features = image_features.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(text_features.float(), image_features.float())
            loss = criterion(outputs, labels)
            loss.backward()

            # Apply gradient clipping to avoid instability
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip_value)
            optimizer.step()

            batch_loss = loss.item()
            running_loss += batch_loss
            progress = (batch_idx + 1) / total_batches * 100
            print(f"Epoch [{epoch+1}/{num_epochs}] Batch [{batch_idx+1}/{total_batches}] - Training Loss: {batch_loss:.4f} | Progress: {progress:.2f}%")

        # Log the average loss for the epoch
        avg_loss = running_loss / total_batches
        print(f"Epoch [{epoch+1}/{num_epochs}] - Average Training Loss: {avg_loss:.4f}")

        # Step the plateau scheduler once per epoch on the epoch mean.
        # Stepping on every noisy batch loss makes `patience` count batches
        # and shrinks the learning rate long before training has plateaued.
        scheduler.step(avg_loss)
        epoch_losses.append(avg_loss)

    return epoch_losses

# Predict function
def predict(model, dataloader, device):
    """Return the predicted class index for every sample in ``dataloader``.

    The model is switched to eval mode and run without gradient tracking.
    Each batch must be a ``(text_features, image_features, label)`` triple;
    the label is ignored. The result is a flat Python list of ints in
    iteration order.
    """
    model.eval()
    predicted_classes = []
    with torch.no_grad():
        for batch in dataloader:
            text_batch, image_batch, _unused_labels = batch
            text_batch = text_batch.to(device)
            image_batch = image_batch.to(device)
            logits = model(text_batch.float(), image_batch.float())
            # The highest-scoring logit along the class axis is the prediction
            batch_classes = torch.argmax(logits, dim=1).cpu().tolist()
            predicted_classes.extend(batch_classes)
    return predicted_classes

# Calculate accuracy function
def calculate_accuracy(predictions, labels):
    """Print and return the percentage of predictions that equal their label.

    Args:
        predictions: Sequence of predicted class indices.
        labels: Sequence (list or array) of true class indices, paired with
            ``predictions`` position by position.

    Returns:
        Accuracy as a percentage in ``[0, 100]``. With no labels the result
        is ``0.0`` rather than a ``ZeroDivisionError``.
    """
    # len() rather than truthiness: labels may be a NumPy array, whose
    # boolean value is ambiguous.
    total = len(labels)
    if total == 0:
        accuracy = 0.0
    else:
        correct = sum(1 for pred, actual in zip(predictions, labels) if pred == actual)
        accuracy = correct / total * 100
    print(f"Accuracy: {accuracy:.2f}%")
    return accuracy

# Train-test split and model evaluation
def train_test_model(csv_file, tokenizer, transform, device):
    """Train a CombinedModel on ``csv_file`` and print its held-out accuracy.

    Reads the 'Review Text', 'Review Image' and 'Rating' columns, makes a
    stratified 80:20 train/test split (seed 42), trains for 10 epochs with
    Adam and a ReduceLROnPlateau scheduler, then evaluates on the test part.

    Args:
        csv_file: Path of the CSV file to load.
        tokenizer: Tokenizer handed to each ReviewDataset.
        transform: Image transform handed to each ReviewDataset.
        device: Device used for the model and the batches.
    """
    # NOTE(review): the file is decoded as ISO-8859-1 here -- confirm this
    # matches the encoding it was written with.
    frame = pd.read_csv(csv_file, encoding='ISO-8859-1')
    review_texts = frame['Review Text'].tolist()
    image_paths = frame['Review Image'].tolist()
    encoded_ratings = LabelEncoder().fit_transform(frame['Rating'].tolist())

    # Stratified 80:20 split with a fixed seed
    split_parts = train_test_split(
        review_texts,
        image_paths,
        encoded_ratings,
        test_size=0.2,
        stratify=encoded_ratings,
        random_state=42,
    )
    texts_train, texts_test, images_train, images_test, y_train, y_test = split_parts

    # Datasets and loaders; only the training loader is shuffled
    training_set = ReviewDataset(texts_train, images_train, y_train, tokenizer, transform)
    held_out_set = ReviewDataset(texts_test, images_test, y_test, tokenizer, transform)
    loader_train = DataLoader(training_set, batch_size=64, shuffle=True)
    loader_test = DataLoader(held_out_set, batch_size=64, shuffle=False)

    # Model, loss, optimizer and plateau-based learning-rate scheduler
    net = CombinedModel(text_dim, image_dim, combined_dim)
    net.to(device)
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=0.0001, weight_decay=0.01)
    plateau_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        adam, mode='min', factor=0.1, patience=5
    )

    # Maximum gradient norm, to handle exploding gradients
    max_grad_norm = 1.0

    train_model(loader_train, net, loss_fn, adam, plateau_scheduler, device, max_grad_norm, num_epochs=10)

    # Evaluate on the held-out split
    test_predictions = predict(net, loader_test, device)
    test_accuracy = calculate_accuracy(test_predictions, y_test)

    print(f"Test Accuracy: {test_accuracy:.2f}%")

# Example usage
# Use the GPU when CUDA is available; otherwise fall back to the CPU so the
# run does not fail when the model and batches are moved to the device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_test_model('/content/drive/MyDrive/updated_cleaneddataset.csv', tokenizer, transform, device)
