In [1]:
import json
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.models import resnet50
from PIL import Image
from gensim.models import KeyedVectors
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import make_pipeline
import numpy as np
from sklearn.metrics import mean_squared_error, accuracy_score
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
from gensim.models import KeyedVectors

# Load pretrained ResNet
resnet = resnet50(pretrained=False)
resnet.load_state_dict(torch.load('/data1/dxw_data/llm/resnet/resnet50-19c8e357.pth'))
resnet = nn.Sequential(*list(resnet.children())[:-1])  # Remove the classification layer
resnet.eval()

word2vec_path = '/data1/dxw_data/llm/word2vec/GoogleNews-vectors-negative300.bin.gz'
word2vec_model  = KeyedVectors.load_word2vec_format(word2vec_path, binary=True)




In [2]:
# Preprocess transforms for images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Ensure the cover directory exists
cover_path = '/data1/dxw_data/llm/redbook/1000/cover'
os.makedirs(cover_path, exist_ok=True)

# Function to apply mask, save the merged image, and extract image features
def extract_imagemask_features(image_path, mask_path, cover_path):
    image = Image.open(image_path).convert('RGB')
    mask = Image.open(mask_path).convert('L')

    # Apply mask to the image
    image_np = np.array(image)
    mask_np = np.array(mask)
    masked_image_np = np.multiply(image_np, mask_np[:, :, None] / 255.0)
    masked_image = Image.fromarray(masked_image_np.astype(np.uint8))

    # Save the masked image
    masked_image_save_path = os.path.join(cover_path, os.path.basename(image_path))
    masked_image.save(masked_image_save_path)

    # Apply transformations
    masked_image = transform(masked_image).unsqueeze(0)

    # Extract features using ResNet
    with torch.no_grad():
        features = resnet(masked_image).squeeze().numpy()
    return features

# Function to extract image features
def extract_image_features(image_path):
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)
    with torch.no_grad():
        features = resnet(image).squeeze().numpy()
    return features

# Function to extract text features
def extract_text_features(caption):
    words = caption.split()
    word_vectors = []
    for word in words:
        if word in word2vec_model:
            vector = word2vec_model[word]
            word_vectors.append(vector)
    if not word_vectors:
        return np.zeros(word2vec_model.vector_size)
    return np.mean(word_vectors, axis=0)

# Define the dataset class
class HotnessDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# Define the Transformer model class
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, hidden_dim, output_dim, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.input_embedding = nn.Linear(input_dim, hidden_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.input_embedding(x)
        x = x.permute(1, 0, 2)  # Transformer expects input of shape (seq_len, batch_size, hidden_dim)
        transformer_out = self.transformer_encoder(x)
        transformer_out = transformer_out.permute(1, 0, 2)  # Back to (batch_size, seq_len, hidden_dim)
        transformer_out = transformer_out[:, -1, :]  # Take the output of the last time step
        out = self.fc(transformer_out)
        out = self.sigmoid(out)
        return out

# Prepare dataset
image_features = []
text_features = []
mask_features = []
labels = []
times = []

# Load JSON file
with open('/data1/dxw_data/llm/MKT_data_mining/Multimodal/time_sequence/captions_with_hotness_and_time.json', 'r') as f:
    data = json.load(f)

for item in data:
    image_path = os.path.join('/data1/dxw_data/llm/redbook/1000/data2', item['image'])
    mask_path = os.path.join('/data1/dxw_data/llm/redbook/1000/processed2', item['image'])
    if os.path.exists(image_path) and os.path.exists(mask_path):
        mask_feat = extract_imagemask_features(image_path, mask_path, cover_path)
        img_feat = extract_image_features(image_path)
        txt_feat = extract_text_features(item['caption'])
        mask_features.append(mask_feat)
        image_features.append(img_feat)
        text_features.append(txt_feat)
        labels.append(item['hotness'])
        times.append(item['time'])

# Convert to numpy arrays
mask_features = np.array(mask_features)
image_features = np.array(image_features)
text_features = np.array(text_features)
labels = np.array(labels)
times = np.array(times)

In [3]:
# ----------- mask,image和text ----------- #
combined_features = np.hstack((mask_features, image_features))
combined_embeddings = np.hstack((combined_features, text_features))

In [4]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.6757
Epoch [2/20], Loss: 0.6674
Epoch [3/20], Loss: 0.6173
Epoch [4/20], Loss: 0.6343
Epoch [5/20], Loss: 0.6398
Epoch [6/20], Loss: 0.7228
Epoch [7/20], Loss: 0.6549
Epoch [8/20], Loss: 0.6442
Epoch [9/20], Loss: 0.6190
Epoch [10/20], Loss: 0.6565
Epoch [11/20], Loss: 0.7578
Epoch [12/20], Loss: 0.7891
Epoch [13/20], Loss: 0.6662
Epoch [14/20], Loss: 0.6926
Epoch [15/20], Loss: 0.7435
Epoch [16/20], Loss: 0.6827
Epoch [17/20], Loss: 0.6619
Epoch [18/20], Loss: 0.6216
Epoch [19/20], Loss: 0.6948
Epoch [20/20], Loss: 0.6048
Test Accuracy: 57.37%


In [3]:
# ----------- image和text ----------- #
combined_embeddings = np.hstack((image_features, text_features))

In [4]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.7512
Epoch [2/20], Loss: 0.6817
Epoch [3/20], Loss: 0.6514
Epoch [4/20], Loss: 0.6642
Epoch [5/20], Loss: 0.6549
Epoch [6/20], Loss: 0.6785
Epoch [7/20], Loss: 0.7633
Epoch [8/20], Loss: 0.6774
Epoch [9/20], Loss: 0.6885
Epoch [10/20], Loss: 0.6358
Epoch [11/20], Loss: 0.6739
Epoch [12/20], Loss: 0.7445
Epoch [13/20], Loss: 0.6660
Epoch [14/20], Loss: 0.6464
Epoch [15/20], Loss: 0.6967
Epoch [16/20], Loss: 0.6903
Epoch [17/20], Loss: 0.6725
Epoch [18/20], Loss: 0.6745
Epoch [19/20], Loss: 0.6715
Epoch [20/20], Loss: 0.6674
Test Accuracy: 57.37%


In [3]:
# ----------- mask和text ----------- #
combined_embeddings = np.hstack((mask_features, text_features))

In [4]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.5563
Epoch [2/20], Loss: 0.6271
Epoch [3/20], Loss: 0.6627
Epoch [4/20], Loss: 0.6625
Epoch [5/20], Loss: 0.6028
Epoch [6/20], Loss: 0.6110
Epoch [7/20], Loss: 0.5488
Epoch [8/20], Loss: 0.5587
Epoch [9/20], Loss: 0.4130
Epoch [10/20], Loss: 0.4359
Epoch [11/20], Loss: 0.5192
Epoch [12/20], Loss: 0.4280
Epoch [13/20], Loss: 0.4097
Epoch [14/20], Loss: 0.4087
Epoch [15/20], Loss: 0.1147
Epoch [16/20], Loss: 0.1139
Epoch [17/20], Loss: 0.4606
Epoch [18/20], Loss: 0.3291
Epoch [19/20], Loss: 0.3062
Epoch [20/20], Loss: 0.0923
Test Accuracy: 57.37%


In [7]:
# ----------- mask和image ----------- #
combined_embeddings = np.hstack((image_features, mask_features))

In [8]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.4576
Epoch [2/20], Loss: 0.6603
Epoch [3/20], Loss: 0.6817
Epoch [4/20], Loss: 0.7087
Epoch [5/20], Loss: 0.6514
Epoch [6/20], Loss: 0.6426
Epoch [7/20], Loss: 0.6345
Epoch [8/20], Loss: 0.5691
Epoch [9/20], Loss: 0.6018
Epoch [10/20], Loss: 0.4396
Epoch [11/20], Loss: 0.4114
Epoch [12/20], Loss: 0.4476
Epoch [13/20], Loss: 0.4564
Epoch [14/20], Loss: 0.4712
Epoch [15/20], Loss: 0.4537
Epoch [16/20], Loss: 0.5982
Epoch [17/20], Loss: 0.5311
Epoch [18/20], Loss: 0.5961
Epoch [19/20], Loss: 0.4227
Epoch [20/20], Loss: 0.4204
Test Accuracy: 59.47%


In [3]:
# ----------- 纯mask ----------- #
combined_embeddings = mask_features

In [4]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.6051
Epoch [2/20], Loss: 0.7161
Epoch [3/20], Loss: 0.7009
Epoch [4/20], Loss: 0.6164
Epoch [5/20], Loss: 0.8163
Epoch [6/20], Loss: 0.6027
Epoch [7/20], Loss: 0.7159
Epoch [8/20], Loss: 0.6818
Epoch [9/20], Loss: 0.7240
Epoch [10/20], Loss: 0.6713
Epoch [11/20], Loss: 0.7116
Epoch [12/20], Loss: 0.6614
Epoch [13/20], Loss: 0.6669
Epoch [14/20], Loss: 0.6743
Epoch [15/20], Loss: 0.7476
Epoch [16/20], Loss: 0.6702
Epoch [17/20], Loss: 0.7029
Epoch [18/20], Loss: 0.7988
Epoch [19/20], Loss: 0.7248
Epoch [20/20], Loss: 0.6695
Test Accuracy: 57.37%


In [3]:
# ----------- 纯image ----------- #
combined_embeddings = image_features

In [4]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.6885
Epoch [2/20], Loss: 0.6667
Epoch [3/20], Loss: 0.7730
Epoch [4/20], Loss: 0.7527
Epoch [5/20], Loss: 0.6590
Epoch [6/20], Loss: 0.6100
Epoch [7/20], Loss: 0.6585
Epoch [8/20], Loss: 0.6370
Epoch [9/20], Loss: 0.6894
Epoch [10/20], Loss: 0.6554
Epoch [11/20], Loss: 0.7128
Epoch [12/20], Loss: 0.7085
Epoch [13/20], Loss: 0.6904
Epoch [14/20], Loss: 0.6241
Epoch [15/20], Loss: 0.6981
Epoch [16/20], Loss: 0.6780
Epoch [17/20], Loss: 0.6315
Epoch [18/20], Loss: 0.6255
Epoch [19/20], Loss: 0.6781
Epoch [20/20], Loss: 0.7022
Test Accuracy: 57.37%


In [3]:
# ----------- 纯text ----------- #
combined_embeddings = text_features

In [4]:
# Encode time information
def encode_time(times, max_time):
    times = np.array(times)
    sin_time = np.sin(2 * np.pi * times / max_time)
    cos_time = np.cos(2 * np.pi * times / max_time)
    return np.vstack((sin_time, cos_time)).T

encoded_times = encode_time(times, max_time=1000)

def create_time_based_splits(features, labels, times):
    # Split based on time
    train_indices = np.where(times <= 800)[0]
    test_indices = np.where(times > 800)[0]
    
    return train_indices, test_indices

train_indices, test_indices = create_time_based_splits(combined_embeddings, labels, times)

# Add encoded time information as features
time_train = encoded_times[train_indices]
time_test = encoded_times[test_indices]

# Concatenate encoded time information with features
X_train = np.hstack((combined_embeddings[train_indices], time_train))
X_test = np.hstack((combined_embeddings[test_indices], time_test))
y_train = labels[train_indices]
y_test = labels[test_indices]

# Function to create sliding windows
def create_sliding_windows(X, y, window_size):
    features = []
    labels = []
    for i in range(len(X) - window_size):
        window = X[i:i + window_size]
        label = y[i + window_size - 1]
        features.append(window)
        labels.append(label)
    return np.array(features), np.array(labels)

window_size = 10

# Create sliding windows for training and testing
X_train, y_train = create_sliding_windows(X_train, y_train, window_size)
X_test, y_test = create_sliding_windows(X_test, y_test, window_size)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# Create datasets and dataloaders
train_dataset = HotnessDataset(X_train, y_train)
test_dataset = HotnessDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

input_dim = X_train.shape[2]
num_heads = 8
num_layers = 2
hidden_dim = 64
output_dim = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(input_dim, num_heads, num_layers, hidden_dim, output_dim).to(device)

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

Epoch [1/20], Loss: 0.6607
Epoch [2/20], Loss: 0.6676
Epoch [3/20], Loss: 0.7149
Epoch [4/20], Loss: 0.6617
Epoch [5/20], Loss: 0.6973
Epoch [6/20], Loss: 0.5634
Epoch [7/20], Loss: 0.5553
Epoch [8/20], Loss: 0.5736
Epoch [9/20], Loss: 0.5220
Epoch [10/20], Loss: 0.5654
Epoch [11/20], Loss: 0.4082
Epoch [12/20], Loss: 0.4654
Epoch [13/20], Loss: 0.2523
Epoch [14/20], Loss: 0.3043
Epoch [15/20], Loss: 0.4653
Epoch [16/20], Loss: 0.4162
Epoch [17/20], Loss: 0.3319
Epoch [18/20], Loss: 0.4853
Epoch [19/20], Loss: 0.2706
Epoch [20/20], Loss: 0.2764
Test Accuracy: 72.63%
