In [9]:
from sklearn.svm import OneClassSVM
from tqdm import tqdm
import pandas as pd
import numpy as np
import torch
from torchvision import transforms, models
import torchvision.transforms as transforms
from torchvision.models import resnet50
from torchvision.datasets import ImageFolder
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torchvision.models.resnet import ResNet50_Weights
import os
from PIL import Image
import h5py
from sklearn.metrics.pairwise import cosine_similarity

In [11]:
train_left_path = 'data/train/left'
train_right_path = 'data/train/right'
test_left_path = 'data/test/left'
test_right_path = 'data/test/right'
test_candidates = 'data/test_candidates.csv'

In [16]:
import os
import pandas as pd
import torch
import torchvision.transforms as transforms
import torchvision.models as models
from PIL import Image
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. 定义图像预处理流程
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 2. 加载预训练的DenseNet121模型并移除全连接层
densenet_model = models.densenet121(pretrained=True).features
densenet_model = densenet_model.to(device).eval()

# 3. 定义从图像中提取特征的函数
# 3. 定义从图像中提取特征的函数
def extract_features_densenet(image_path, model):
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    image = image.unsqueeze(0).to(device)
    with torch.no_grad():
        features = model(image)
    return features.cpu().numpy().squeeze()




def save_features_to_hdf5(image_folder, model, output_hdf5_path):
    image_list = [os.path.splitext(f)[0] for f in os.listdir(image_folder) if os.path.isfile(os.path.join(image_folder, f))]
    with h5py.File(output_hdf5_path, 'w') as f:
        for img_name in tqdm(image_list, desc="Extracting Features"):
            img_path = os.path.join(image_folder, f"{img_name}.jpg")
            if os.path.exists(img_path):
                features = extract_features_densenet(img_path, model)
                f.create_dataset(img_name, data=features)
            else:
                print(f"Warning: Image not found at path: {img_path}")




# 保存特征到 HDF5
save_features_to_hdf5(train_left_path, densenet_model, 'data/train_left_features_densenet.hdf5')
save_features_to_hdf5(train_right_path, densenet_model, 'data/train_right_features_densenet.hdf5')
save_features_to_hdf5(test_left_path, densenet_model, 'data/test_left_features_densenet.hdf5')
save_features_to_hdf5(test_right_path, densenet_model, 'data/test_right_features_densenet.hdf5')




Extracting Features: 100%|██████████| 2000/2000 [02:22<00:00, 14.01it/s]
Extracting Features: 100%|██████████| 2000/2000 [02:23<00:00, 13.99it/s]
Extracting Features: 100%|██████████| 2000/2000 [02:29<00:00, 13.40it/s]
Extracting Features: 100%|██████████| 2000/2000 [02:31<00:00, 13.19it/s]


In [21]:
import h5py
import pandas as pd

def load_features_from_hdf5(hdf5_path):
    with h5py.File(hdf5_path, 'r') as f:
        keys = list(f.keys())
        data = {}
        for key in keys:
            # 将数据平坦化并转换为一个一维数组
            flattened_data = f[key][()].flatten()
            data[key] = flattened_data
        # 从平坦数据创建DataFrame
        df = pd.DataFrame.from_dict(data, orient='index')
    return df

# 示例：从 HDF5 加载特征到 pandas DataFrame
train_left_features_df = load_features_from_hdf5('data/train_left_features_densenet.hdf5')
train_right_features_df = load_features_from_hdf5('data/train_right_features_densenet.hdf5')

# 现在 train_left_features_df 和 train_right_features_df 是包含特征的 pandas DataFrames，索引是图片的名字
test_left_features_df = load_features_from_hdf5('data/test_left_features_densenet.hdf5')
test_right_features_df = load_features_from_hdf5('data/test_right_features_densenet.hdf5')

train_pairs = pd.read_csv('data/train.csv', header=None, names=['left', 'right'])




In [46]:
import torch
import torch.nn as nn

class SiameseNetwork(nn.Module):
    def __init__(self, input_dim):
        super(SiameseNetwork, self).__init__()
        self.fc = nn.Linear(input_dim, 512)
        
    def forward_one(self, x):
        x = self.fc(x)
        return x

    def forward(self, input1, input2):
        output1 = self.forward_one(input1)
        output2 = self.forward_one(input2)
        return output1, output2


In [65]:
import torch.optim as optim
import torch.nn.functional as F

# 对比损失
class ContrastiveLoss(nn.Module):
    def __init__(self, margin):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        self.eps = 1e-9

    def forward(self, output1, output2, target, size_average=True):
        distances = (output2 - output1).pow(2).sum(1)  # squared distances
        losses = 0.5 * (target.float() * distances +
                        (1 + -1 * target).float() * F.relu(self.margin - (distances + self.eps).sqrt()).pow(2))
        return losses.mean() if size_average else losses.sum()


In [66]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SiameseNetwork(input_dim=train_left_features_df.shape[1]).to(device)
criterion = ContrastiveLoss(margin=1.0).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)




In [67]:

def extract_embeddings(df_features, model, device):
    embeddings = []
    image_to_index = {}
    for index, (image_name, features) in enumerate(df_features.iterrows()):
        tensor_features = torch.tensor(features.values, dtype=torch.float).unsqueeze(0).to(device)
        with torch.no_grad():
            embedding = model.forward_one(tensor_features)
        embeddings.append(embedding.cpu().numpy().flatten())
        image_to_index[image_name] = index
    return np.array(embeddings), image_to_index


In [69]:
def generate_embeddings(model, features_df):
    model.eval()  # Set the model to evaluation mode
    embeddings = {}
    
    with torch.no_grad():
        for img_id, features in features_df.iterrows():
            features_tensor = torch.tensor(features.values).float().to(device)
            embedding = model.forward_one(features_tensor.unsqueeze(0))
            embeddings[img_id] = embedding.cpu().numpy().flatten()
            
    return embeddings


In [70]:
train_embeddings_left, _ = extract_embeddings(train_left_features_df, model, device)
train_embeddings_right, image_to_index = extract_embeddings(train_right_features_df, model, device)
test_left_embeddings, _ = extract_embeddings(test_left_features_df, model, device)


In [76]:
# 检查不一致的图像名
print(set(test_left_features_df.index) - set(image_to_index.keys()))
print(set(image_to_index.keys()) - set(test_left_features_df.index))


True
False


In [75]:
from scipy.spatial.distance import cosine
def generate_embeddings(model, features_df):
    model.eval()  # Set the model to evaluation mode
    embeddings = {}
    
    with torch.no_grad():
        for img_id, features in features_df.iterrows():
            features_tensor = torch.tensor(features.values).float().to(device)
            embedding = model.forward_one(features_tensor.unsqueeze(0))
            embeddings[img_id] = embedding.cpu().numpy().flatten()
            
    return embeddings

# 4. 生成测试集嵌入
test_left_embeddings = generate_embeddings(model, test_left_features_df)

# 5. 比较嵌入，生成测试集相似度得分
def compare_test_embeddings(test_embeddings, train_embeddings, test_candidate_df, image_to_index):
    scores = []
    
    for idx, (left_img, *candidates) in test_candidate_df.iterrows():
       
        emb_left = test_embeddings[image_to_index[left_img]]

        scores_row = [left_img]
        for right_img in candidates:
            emb_index = image_to_index[right_img]
            emb_right = train_embeddings[emb_index]
            score = 1 - cosine(emb_left, emb_right)
            scores_row.append(score)
        scores.append(scores_row)

        print(test_left_features_df.head())
        print('abm' in test_left_features_df.index)

    return pd.DataFrame(scores, columns=test_candidate_df.columns)


test_candidate = pd.read_csv('data/test_candidates.csv')
# 计算相似度得分
similarity_scores_df = compare_test_embeddings(test_left_embeddings, train_embeddings_right, test_candidate, image_to_index)

# 保存到CSV
similarity_scores_df.to_csv('similarity_scores.csv', index=False)


KeyError: 'abm'