# model.py (forked from stanford-futuredata/ColBERT)
import string
import logging
import pdb

import torch
import torch.nn as nn
from transformers import BertPreTrainedModel, BertModel, BertTokenizer
from transformers import XLMRobertaTokenizer, XLMRobertaModel

from src.parameters import DEVICE

logging.basicConfig(level=logging.ERROR)

# class ColBERT(BertPreTrainedModel):
class ColBERT(XLMRobertaModel):
    def __init__(self, config, query_maxlen, doc_maxlen, dim=128, similarity_metric='cosine'):
        super(ColBERT, self).__init__(config)

        self.query_maxlen = query_maxlen
        self.doc_maxlen = doc_maxlen
        self.similarity_metric = similarity_metric

        # self.tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-uncased')
        self.tokenizer = XLMRobertaTokenizer.from_pretrained('xlm-roberta-base')

        # Marker tokens: "[unused0]" is prepended to queries, "[unused1]" to documents.
        self.tokenizer.add_tokens(["[unused0]"])
        self.tokenizer.add_tokens(["[unused1]"])

        # Punctuation tokens are zeroed out of document embeddings in doc().
        self.skiplist = {w: True for w in string.punctuation}

        # self.bert = BertModel(config)
        self.bert = XLMRobertaModel(config)
        self.bert.resize_token_embeddings(len(self.tokenizer))
        self.linear = nn.Linear(config.hidden_size, dim, bias=False)

        self.init_weights()

    def forward(self, Q, D):
        return self.score(self.query(Q), self.doc(D))

    def query(self, queries):
        # Encode each query into a bag of L2-normalized token embeddings.
        queries = [["[unused0]"] + self._tokenize(q) for q in queries]

        input_ids, attention_mask = zip(*[self._encode(x, self.query_maxlen) for x in queries])
        input_ids, attention_mask = self._tensorize(input_ids), self._tensorize(attention_mask)

        Q = self.bert(input_ids, attention_mask=attention_mask)[0]
        Q = self.linear(Q)

        return torch.nn.functional.normalize(Q, p=2, dim=2)

    def doc(self, docs, return_mask=False):
        # Truncate to doc_maxlen, leaving room for the "[unused1]" marker and the
        # two special tokens added by the tokenizer.
        docs = [["[unused1]"] + self._tokenize(d)[:self.doc_maxlen - 3] for d in docs]

        lengths = [len(d) + 2 for d in docs]  # +2 for [CLS], [SEP]
        d_max_length = max(lengths)

        input_ids, attention_mask = zip(*[self._encode(x, d_max_length) for x in docs])
        input_ids, attention_mask = self._tensorize(input_ids), self._tensorize(attention_mask)

        D = self.bert(input_ids, attention_mask=attention_mask)[0]
        D = self.linear(D)

        # [CLS] .. d ... [SEP] [PAD] ... [PAD]
        # Keep the special tokens, zero out punctuation and padding positions.
        mask = [[1] + [x not in self.skiplist for x in d] + [1] + [0] * (d_max_length - length)
                for d, length in zip(docs, lengths)]
        D = D * torch.tensor(mask, device=DEVICE, dtype=torch.float32).unsqueeze(2)

        D = torch.nn.functional.normalize(D, p=2, dim=2)

        return (D, mask) if return_mask else D

    def score(self, Q, D):
        # pdb.set_trace()
        # MaxSim: for every query token, take its maximum similarity over all
        # document tokens, then sum those maxima across the query.
        if self.similarity_metric == 'cosine':
            return (Q.float() @ D.permute(0, 2, 1)).max(2).values.sum(1)

        assert self.similarity_metric == 'l2'
        return (-1.0 * ((Q.unsqueeze(2) - D.unsqueeze(1))**2).sum(-1)).max(-1).values.sum(-1)

    def _tokenize(self, text):
        if type(text) == list:
            return text

        return self.tokenizer.tokenize(text)

    def _encode(self, x, max_length):
        input_ids = self.tokenizer.encode(x, add_special_tokens=True, max_length=max_length)
        # print(input_ids)

        padding_length = max_length - len(input_ids)
        attention_mask = [1] * len(input_ids) + [0] * padding_length
        # ColBERT-style query augmentation pads with mask tokens. The original code
        # hard-coded 103 (BERT's [MASK] id), which is not the mask id for XLM-RoBERTa,
        # so look it up from the tokenizer instead.
        input_ids = input_ids + [self.tokenizer.mask_token_id] * padding_length

        return input_ids, attention_mask

    def _tensorize(self, l):
        return torch.tensor(l, dtype=torch.long, device=DEVICE)
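

# Minimal usage sketch (not part of the original file): it only illustrates the
# intended query/doc/score flow of the class above. The maxlen values, the base
# checkpoint name, and the sample strings are illustrative assumptions, not
# settings taken from this repository; real use would load a trained checkpoint
# rather than the randomly initialized weights created here.
if __name__ == "__main__":
    from transformers import XLMRobertaConfig

    config = XLMRobertaConfig.from_pretrained('xlm-roberta-base')
    colbert = ColBERT(config, query_maxlen=32, doc_maxlen=180, dim=128,
                      similarity_metric='cosine').to(DEVICE)
    colbert.eval()

    with torch.no_grad():
        # forward() encodes the queries and documents and returns one MaxSim
        # relevance score per (query, document) pair in the batch.
        scores = colbert(["what is colbert?"],
                         ["ColBERT is a late-interaction retrieval model."])
    print(scores)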