In [1]:
import torch
from PIL import Image
import open_clip
from open_clip import tokenizer
import subprocess
import os
import numpy as np

from transformers import BertTokenizer, BertForQuestionAnswering

  from .autonotebook import tqdm as notebook_tqdm


In [2]:

result = subprocess.run('bash -c "source /etc/network_turbo && env | grep proxy"', shell=True, capture_output=True, text=True)
output = result.stdout
for line in output.splitlines():
    if '=' in line:
        var, value = line.split('=', 1)
        os.environ[var] = value

In [3]:
# 判断并选择设备
def select_device():
       if torch.cuda.is_available():
              device = torch.device("cuda")  # 优先使用CUDA（NVIDIA GPU）
              print("Using CUDA (GPU)")
       elif torch.backends.mps.is_available():
              device = torch.device("mps")  # 如果CUDA不可用但MPS可用，使用MPS（Apple Silicon）
              print("Using MPS (Apple Silicon)")
       else:
              device = torch.device("cpu")  # 如果都不可用，使用CPU
              print("Using CPU")
       return device

device = select_device()

Using CUDA (GPU)


# openclip

In [23]:
clip_model, _, clip_preprocess = open_clip.create_model_and_transforms('ViT-B-32', pretrained='laion2b_s34b_b79k', device='cuda')
vit_tokenizer = open_clip.get_tokenizer('ViT-B-32')

In [24]:
clip_preprocess

Compose(
    Resize(size=224, interpolation=bicubic, max_size=None, antialias=True)
    CenterCrop(size=(224, 224))
    <function _convert_to_rgb at 0x7f4a597fa830>
    ToTensor()
    Normalize(mean=(0.48145466, 0.4578275, 0.40821073), std=(0.26862954, 0.26130258, 0.27577711))
)

In [25]:
clip_model.eval()
context_length = clip_model.context_length
vocab_size = clip_model.vocab_size

print("Model parameters:", f"{np.sum([int(np.prod(p.shape)) for p in clip_model.parameters()]):,}")
print("Context length:", context_length)
print("Vocab size:", vocab_size)

Model parameters: 151,277,313
Context length: 77
Vocab size: 49408


In [27]:
# vit_tokenizer.encode("Which object can be found in a jazz club")
from open_clip import tokenizer
tokenizer.tokenize("Which object can be found in a jazz club")

tensor([[49406,  1448, 14115,   753,   655,  1546,   530,   320,  4528,  1736,
         49407,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0]])

In [30]:
image = Image.open('data/KG_VQA/fvqa/exp_data/images/images/COCO_val2014_000000000136.jpg').convert("RGB")
image_input = clip_preprocess(image).unsqueeze(0).to(device)  # Unsqueeze 添加一个批次维度
text_tokens = tokenizer.tokenize("Which object can be found in a jazz club").to(device)

with torch.no_grad():
    image_features = clip_model.encode_image(image_input).float()
    text_features = clip_model.encode_text(text_tokens).float()
    
image_features.shape, text_features.shape

(torch.Size([1, 512]), torch.Size([1, 512]))

In [31]:

image = clip_preprocess(Image.open('data/KG_VQA/fvqa/exp_data/images/images/COCO_val2014_000000000136.jpg')).unsqueeze(0).to(device)
text = vit_tokenizer(["a diagram", "a dog", "a cat"]).to(device)

with torch.no_grad(), torch.cuda.amp.autocast():
    image_features = clip_model.encode_image(image)
    text_features = clip_model.encode_text(text)
    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)

    text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

print("Label probs:", text_probs)  # prints: [[1., 0., 0.]]

Label probs: tensor([[0.3620, 0.4722, 0.1658]], device='cuda:0')


# BertForQuestionAnswering

In [17]:
# 加载预训练的 BERT 模型和分词器
from transformers import BertTokenizerFast
tokenizer = BertTokenizerFast.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad')
model = BertForQuestionAnswering.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad')


Some weights of the model checkpoint at bert-large-uncased-whole-word-masking-finetuned-squad were not used when initializing BertForQuestionAnswering: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForQuestionAnswering from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForQuestionAnswering from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [18]:
# 假设这是问题和上下文
question = "What is the capital of France?"
context = "Paris is the capital and most populous city of France."

# 编码问题和上下文
inputs = tokenizer(question, context, return_tensors='pt')
input_ids = inputs['input_ids'].tolist()[0]

# 获取答案
with torch.no_grad():
    outputs = model(**inputs)
    answer_start_scores = outputs.start_logits
    answer_end_scores = outputs.end_logits

# 找到答案的开始和结束位置
answer_start = torch.argmax(answer_start_scores)
answer_end = torch.argmax(answer_end_scores) + 1

# 转换回文本
answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))

print(f"Question: {question}")
print(f"Answer: {answer}")

Question: What is the capital of France?
Answer: paris


In [None]:
# BertForQuestionAnswering训练数据格式
{
  "context": "文本上下文，包含问题的答案。",
  "question": "问题文本？",
  "answers": {
    "text": ["答案文本"],
    "answer_start": ['答案在上下文中的起始字符位置']
  }
}


In [32]:
import torch
from torch.utils.data import Dataset
import clip
from PIL import Image
from transformers import CLIPProcessor

class VQADataset(Dataset):
    def __init__(self, questions, contexts, answers, images):
        super(VQADataset, self).__init__()
        self.questions = questions
        self.contexts = contexts
        self.answers = answers
        self.images = images
        
        # 初始化模型和处理器
        self.clip_model, self.clip_preprocess = clip.load("ViT-B/32", device='cuda')
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.tokenizer = self.clip_processor.tokenizer
        
    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        question = self.questions[idx]
        context = self.contexts[idx]
        answer = self.answers[idx]
        image_path = self.images[idx]

        # 处理文本
        encoded_dict = self.tokenizer.encode_plus(
            question, context,
            max_length=512,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        # 处理图像
        image = Image.open(image_path)
        image = self.clip_preprocess(image).unsqueeze(0).to('cuda')
        with torch.no_grad():
            image_features = self.clip_model.encode_image(image)

        # 计算答案的 token 位置
        answer_start = context.find(answer)
        start_position = encoded_dict.char_to_token(0, answer_start)
        end_position = encoded_dict.char_to_token(0, answer_start + len(answer) - 1)
        
        # 如果找不到答案位置，则设置为最大长度
        if start_position is None:
            start_position = self.tokenizer.model_max_length
        if end_position is None:
            end_position = self.tokenizer.model_max_length

        return {
            'input_ids': encoded_dict['input_ids'].squeeze(0),
            'attention_mask': encoded_dict['attention_mask'].squeeze(0),
            'start_positions': start_position,
            'end_positions': end_position,
            'image_features': image_features.squeeze(0)
        }

# 使用自定义 Dataset
questions = ["What is in the picture?"]
contexts = ["There is a dog in the picture."]
answers = ["dog"]
images = ["path_to_image.jpg"]

dataset = VQADataset(questions, contexts, answers, images)


In [20]:
# 假设已经读取 JSON 数据
data = {
    "fact_surface": "You are likely to find [[a trumpet]] in [[a jazz club]]",
    "answer": "trumpet",
    "question": "Which object can be found in a jazz club"
}

questions = [data['question']]
contexts = [data['fact_surface'].replace("[[", "").replace("]]", "")]
answers = [data['answer']]

dataset = encode_examples(questions, contexts, answers)

In [22]:
import json

project_root = os.getcwd()
train_data_dir = project_root+'/data/KG_VQA/fvqa/exp_data/train_seen_data'
test_data_dir = project_root+'/data/KG_VQA/fvqa/exp_data/test_unseen_data'
img_dir = project_root+"/data/KG_VQA/fvqa/exp_data/images/images"

questions = []
contexts = []
answers = []
img = []
sub_folders_train = ['train0', 'train1', 'train2', 'train3', 'train4']
sub_folders_test = ['test0', 'test1', 'test2', 'test3', 'test4']

def load_datasets(data_dir, sub_folders, img_dir):
    for folder in sub_folders:
        json_file = os.path.join(data_dir, folder, 'all_qs_dict_release_train_500.json' if 'train' in data_dir else 'all_qs_dict_release_test_500.json')
        with open(os.path.join(train_data_dir, json_file)) as f:
            data = json.load(f)
            questions.append(data['question'])
            contexts.append(data['fact_surface'].replace("[[", "").replace("]]", ""))
            answers.append(data['answer'])
            img.append(os.path.join(img_dir, data['image_path']))
            
    train_dataset = encode_examples(questions, contexts, answers)
    return train_dataset

IsADirectoryError: [Errno 21] Is a directory: '/root/autodl-tmp/vqa/VQA-with-XProNet/data/KG_VQA/fvqa/exp_data/train_seen_data/train0'

# gpt2

In [16]:
image = Image.open('data/KG_VQA/fvqa/exp_data/images/images/COCO_val2014_000000000136.jpg').convert("RGB")
image_input = preprocess(image).unsqueeze(0).to(device)  # Unsqueeze 添加一个批次维度
text_tokens = tokenizer.tokenize("Which object can be found in a jazz club").to(device)

with torch.no_grad():
    image_features = model.encode_image(image_input).float()
    text_features = model.encode_text(text_tokens).float()
    
image_features.shape, text_features.shape

# 特征融合示例，这里简单使用连接
combined_features = torch.cat((image_features, text_features), dim=1).to(device)

In [22]:
from transformers import GPT2Model, GPT2Config, GPT2LMHeadModel, GPT2Tokenizer

# configuration = GPT2Config.from_pretrained('gpt2', n_embd=combined_features.shape[1], n_head=16)  # 确保输入尺寸匹配
# transformer_model = GPT2Model(configuration)
# transformer_model.to(device)

# 加载 GPT-2 模型和分词器
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')
model.to(device)     

GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(50257, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D()
          (c_proj): Conv1D()
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D()
          (c_proj): Conv1D()
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=768, out_features=50257, bias=False)
)

In [34]:
# 创建适配层将特征维度调整为 GPT2 模型的维度
adapter_layer = torch.nn.Linear(combined_features.shape[1], model.config.n_embd).to(device)
adapted_features = adapter_layer(combined_features).to(device)

In [37]:
# 使用适配后的特征进行生成
outputs = model.generate(inputs_embeds=adapted_features, max_length=5)
outputs

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


ValueError: Input length of input_ids is 0, but `max_length` is set to -763. This can lead to unexpected behavior. You should consider increasing `max_length` or, better yet, setting `max_new_tokens`.

In [None]:
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(generated_text)

In [19]:
from transformers import GPT2Tokenizer

tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
outputs = transformer_model.generate(input_ids=None, inputs_embeds=combined_features, max_length=10)
answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

print(answer)


TypeError: The current model class (GPT2Model) is not compatible with `.generate()`, as it doesn't have a language model head. Please use one of the following classes instead: {'GPT2LMHeadModel'}