In [1]:
import json
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.models import resnet50
from PIL import Image
from gensim.models import KeyedVectors
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import make_pipeline
import numpy as np
from sklearn.metrics import mean_squared_error, accuracy_score
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
from gensim.models import KeyedVectors

# Load pretrained ResNet
resnet = resnet50(pretrained=False)
resnet.load_state_dict(torch.load('/data1/dxw_data/llm/resnet/resnet50-19c8e357.pth'))
resnet = nn.Sequential(*list(resnet.children())[:-1])  # Remove the classification layer
resnet.eval()

word2vec_path = '/data1/dxw_data/llm/word2vec/GoogleNews-vectors-negative300.bin.gz'
word2vec_model  = KeyedVectors.load_word2vec_format(word2vec_path, binary=True)




In [3]:
# Preprocess transforms for images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Function to apply mask, save the merged image, and extract image features
def extract_imagemask_features(image_path, mask_path, cover_path):
    image = Image.open(image_path).convert('RGB')
    mask = Image.open(mask_path).convert('L')

    # Apply mask to the image
    image_np = np.array(image)
    mask_np = np.array(mask)
    masked_image_np = np.multiply(image_np, mask_np[:, :, None] / 255.0)
    masked_image = Image.fromarray(masked_image_np.astype(np.uint8))

    # Save the masked image
    masked_image_save_path = os.path.join(cover_path, os.path.basename(image_path))
    masked_image.save(masked_image_save_path)

    # Apply transformations
    masked_image = transform(masked_image).unsqueeze(0)

    # Extract features using ResNet
    with torch.no_grad():
        features = resnet(masked_image).squeeze().numpy()
    return features

# Ensure the cover directory exists
cover_path = '/data1/dxw_data/llm/redbook/1000/cover'
os.makedirs(cover_path, exist_ok=True)

# Function to extract image features
def extract_image_features(image_path):
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)
    with torch.no_grad():
        features = resnet(image).squeeze().numpy()
    return features

# Function to extract text features
def extract_text_features(caption):
    words = caption.split()
    word_vectors = []
    for word in words:
        if word in word2vec_model:
            vector = word2vec_model[word]
            word_vectors.append(vector)
    if not word_vectors:
        return np.zeros(word2vec_model.vector_size)
    return np.mean(word_vectors, axis=0)

# Prepare dataset
image_features = []
text_features = []
mask_features= []
labels = []
times = []

# Load JSON file
with open('/data1/dxw_data/llm/redbook/1000/captions_with_hotness_and_time.json', 'r') as f:
    data = json.load(f)
    
for item in data:
    image_path = os.path.join('/data1/dxw_data/llm/redbook/1000/data2', item['image'])
    mask_path = os.path.join('/data1/dxw_data/llm/redbook/1000/processed2', item['image'])
    if os.path.exists(image_path) and os.path.exists(mask_path):
        mask_feat = extract_imagemask_features(image_path, mask_path, cover_path)
        img_feat = extract_image_features(image_path)
        txt_feat = extract_text_features(item['caption'])
        mask_features.append(mask_feat)
        image_features.append(img_feat)
        text_features.append(txt_feat)
        labels.append(item['hotness'])
        times.append(item['time'])

# Convert to numpy arrays
image_features = np.array(image_features)
mask_features = np.array(mask_features)
text_features = np.array(text_features)
labels = np.array(labels)
times = np.array(times)

In [4]:
# Combine image and text features
combined_features = np.hstack((mask_features, image_features))
combined_embeddings = np.hstack((combined_features, text_features))


# Device configuration
device = torch.device('cuda:7' if torch.cuda.is_available() else 'cpu')


In [5]:
# Function to create train and test splits
def create_time_based_splits(features, labels, times):
    train_indices = []
    test_indices = []
    
    unique_times = np.unique(times)
    for time in unique_times:
        indices = np.where(times == time)[0]
        num_samples = len(indices)
        train_indices.extend(indices[:num_samples - 1])
        test_indices.extend(indices[num_samples - 1:])
        
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Create training and testing datasets
X_train = combined_embeddings[train_indices]
y_train = labels[train_indices]
X_test = combined_embeddings[test_indices]
y_test = labels[test_indices]

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32).reshape(-1, 1, X_train.shape[1])
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32).reshape(-1, 1, X_test.shape[1])
y_test = torch.tensor(y_test, dtype=torch.float32)

class HotnessDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [6]:
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        lstm_out = lstm_out[:, -1, :]  # Take the output of the last time step
        out = self.fc(lstm_out)
        out = self.sigmoid(out)
        return out

input_dim = X_train.shape[2]
hidden_dim = 64
output_dim = 1

model = LSTMModel(input_dim, hidden_dim, output_dim).to(device)


In [8]:
# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')


Epoch [1/20], Loss: 0.2435
Epoch [2/20], Loss: 0.3424
Epoch [3/20], Loss: 0.1724
Epoch [4/20], Loss: 0.2410
Epoch [5/20], Loss: 0.1675
Epoch [6/20], Loss: 0.2122
Epoch [7/20], Loss: 0.1312
Epoch [8/20], Loss: 0.1531
Epoch [9/20], Loss: 0.1597
Epoch [10/20], Loss: 0.2196
Epoch [11/20], Loss: 0.1146
Epoch [12/20], Loss: 0.1200
Epoch [13/20], Loss: 0.1001
Epoch [14/20], Loss: 0.1266
Epoch [15/20], Loss: 0.1061
Epoch [16/20], Loss: 0.0691
Epoch [17/20], Loss: 0.0525
Epoch [18/20], Loss: 0.0994
Epoch [19/20], Loss: 0.0597
Epoch [20/20], Loss: 0.0495
Test Accuracy: 63.20%
