#################################################
### THIS FILE WAS AUTOGENERATED! DO NOT EDIT! ###
#################################################
# file to edit: dev_nb/007b_imdb_classifier.ipynb
from nb_007a import *
Weights = Dict[str,Tensor]
def convert_weights(wgts:Weights, stoi_wgts:Dict[str,int], itos_new:Collection[str]) -> Weights:
    "Converts the model weights to go with a new vocabulary."
    dec_bias, enc_wgts = wgts['1.decoder.bias'], wgts['0.encoder.weight']
    bias_m, wgts_m = dec_bias.mean(0), enc_wgts.mean(0)
    # new_zeros already returns zero-filled tensors; no extra zero_() is needed.
    new_w = enc_wgts.new_zeros((len(itos_new),enc_wgts.size(1)))
    new_b = dec_bias.new_zeros((len(itos_new),))
    for i,w in enumerate(itos_new):
        r = stoi_wgts.get(w, -1)
        # Reuse the pretrained row when the token exists in the old vocab,
        # otherwise fall back to the mean embedding/bias.
        new_w[i] = enc_wgts[r] if r>=0 else wgts_m
        new_b[i] = dec_bias[r] if r>=0 else bias_m
    wgts['0.encoder.weight'] = new_w
    wgts['0.encoder_dp.emb.weight'] = new_w.clone()
    wgts['1.decoder.weight'] = new_w.clone()
    wgts['1.decoder.bias'] = new_b
    return wgts
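# A minimal sketch (not in the original notebook) showing how `convert_weights`
# remaps pretrained embeddings onto a new vocabulary; the vocabs and tensors
# below are made up for illustration.
if __name__ == '__main__':
    import torch
    old_itos = ['xxunk', 'the', 'cat']
    old_stoi = {w:i for i,w in enumerate(old_itos)}
    wgts = {'0.encoder.weight': torch.randn(3, 4), '1.decoder.bias': torch.randn(3)}
    new_itos = ['xxunk', 'cat', 'dog']   # 'dog' is unseen: it gets the mean row/bias
    new_wgts = convert_weights(wgts, old_stoi, new_itos)
    assert new_wgts['0.encoder.weight'].shape == (3, 4)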
def lm_split(model:Model) -> List[Model]:
    "Splits an RNN model into groups for differential learning rates."
    groups = [nn.Sequential(rnn, dp) for rnn, dp in zip(model[0].rnns, model[0].hidden_dps)]
    groups.append(nn.Sequential(model[0].encoder, model[0].encoder_dp, model[1]))
    return groups
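# Sketch of how the split is used for discriminative fine-tuning: each
# (LSTM, dropout) pair becomes its own group and the tied embedding/decoder
# form the last one, so a `Learner` can assign one learning rate per group,
# e.g. `learn.split(lm_split)` followed by a per-group lr list (illustrative
# call, not from this file).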
from torch.utils.data import Sampler, BatchSampler
NPArrayList = Collection[np.ndarray]
KeyFunc = Callable[[int], int]
class SortSampler(Sampler):
    "Go through the text data by order of length."
    def __init__(self, data_source:NPArrayList, key:KeyFunc): self.data_source,self.key = data_source,key
    def __len__(self) -> int: return len(self.data_source)
    def __iter__(self):
        return iter(sorted(range(len(self.data_source)), key=self.key, reverse=True))
class SortishSampler(Sampler):
    "Go through the text data by order of length with a bit of randomness."
    def __init__(self, data_source:NPArrayList, key:KeyFunc, bs:int):
        self.data_source,self.key,self.bs = data_source,key,bs
    def __len__(self) -> int: return len(self.data_source)
    def __iter__(self):
        idxs = np.random.permutation(len(self.data_source))
        sz = self.bs*50
        ck_idx = [idxs[i:i+sz] for i in range(0, len(idxs), sz)]
        sort_idx = np.concatenate([sorted(s, key=self.key, reverse=True) for s in ck_idx])
        sz = self.bs
        ck_idx = [sort_idx[i:i+sz] for i in range(0, len(sort_idx), sz)]
        max_ck = np.argmax([self.key(ck[0]) for ck in ck_idx])  # find the chunk with the largest key,
        ck_idx[0],ck_idx[max_ck] = ck_idx[max_ck],ck_idx[0]     # then make sure it goes first.
        # Shuffle the remaining batches as a list: the last chunk can be shorter
        # than bs, and np.random.permutation chokes on ragged arrays.
        rest = ck_idx[1:]
        np.random.shuffle(rest)
        sort_idx = np.concatenate([ck_idx[0]] + rest)
        return iter(sort_idx)
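# A minimal sketch (not in the original notebook): sampling toy variable-length
# sequences so that each batch holds items of roughly similar length.
if __name__ == '__main__':
    import numpy as np
    data = [np.zeros(n) for n in (3, 10, 4, 8, 2, 7, 5, 9)]
    sampler = SortishSampler(data, key=lambda i: len(data[i]), bs=2)
    print(list(iter(sampler)))   # roughly length-sorted, longest batch first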
BatchSamples = Collection[Tuple[Collection[int], int]]
def pad_collate(samples:BatchSamples, pad_idx:int=1, pad_first:bool=True) -> Tuple[LongTensor, LongTensor]:
    "Function that collects `samples` and adds padding."
    max_len = max([len(s[0]) for s in samples])
    res = torch.zeros(max_len, len(samples)).long() + pad_idx
    for i,s in enumerate(samples):
        # pad on the left by default (pad_first), otherwise on the right
        if pad_first: res[-len(s[0]):,i] = LongTensor(s[0])
        else:         res[:len(s[0]),i] = LongTensor(s[0])
    return res, LongTensor([s[1] for s in samples]).squeeze()
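# A minimal sketch (not in the original notebook): collating two sequences of
# different lengths into one left-padded (seq_len, batch) tensor.
if __name__ == '__main__':
    x, y = pad_collate([([2,3,4], 0), ([5,6], 1)], pad_idx=1)
    print(x)   # shape (3, 2); the shorter sequence is left-padded with 1
    print(y)   # tensor([0, 1])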
def classifier_data(datasets:Collection[TextDataset], path:PathOrStr, **kwargs) -> DataBunch:
    "Function that transforms the `datasets` into a `DataBunch` for classification."
    bs = kwargs.pop('bs', 64)
    pad_idx = kwargs.pop('pad_idx', 1)
    collate_fn = partial(pad_collate, pad_idx=pad_idx)
    train_sampler = SortishSampler(datasets[0].ids, key=lambda x: len(datasets[0].ids[x]), bs=bs//2)
    train_dl = DeviceDataLoader.create(datasets[0], bs//2, sampler=train_sampler, collate_fn=collate_fn)
    dataloaders = [train_dl]
    for ds in datasets[1:]:
        # bind `ds` as a default argument so each sampler keys into its own
        # dataset instead of whichever one the loop variable last pointed at
        sampler = SortSampler(ds.ids, key=lambda x, ds=ds: len(ds.ids[x]))
        dataloaders.append(DeviceDataLoader.create(ds, bs, sampler=sampler, collate_fn=collate_fn))
    return DataBunch(*dataloaders, path=path)
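# Illustrative call (requires `TextDataset`s from the earlier notebooks, so
# this is a sketch rather than runnable code):
#   data = classifier_data([trn_ds, val_ds], path, bs=64, pad_idx=1)
# The training set gets a `SortishSampler` at half the batch size; the other
# sets are fully sorted by length so per-batch padding stays minimal.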
class MultiBatchRNNCore(RNNCore):
    "Creates an `RNNCore` module that can process a full sentence."
    def __init__(self, bptt:int, max_seq:int, *args, **kwargs):
        self.max_seq,self.bptt = max_seq,bptt
        super().__init__(*args, **kwargs)
    def concat(self, arrs:Collection[Tensor]) -> List[Tensor]:
        "Concatenates, for each layer, the outputs of the bptt chunks along the sequence dimension."
        return [torch.cat([l[si] for l in arrs]) for si in range(len(arrs[0]))]
    def forward(self, input:LongTensor) -> Tuple[List[Tensor],List[Tensor]]:
        sl,bs = input.size()
        self.reset()
        raw_outputs, outputs = [],[]
        for i in range(0, sl, self.bptt):
            r, o = super().forward(input[i: min(i+self.bptt, sl)])
            # only keep the activations covering the last max_seq tokens
            if i>(sl-self.max_seq):
                raw_outputs.append(r)
                outputs.append(o)
        return self.concat(raw_outputs), self.concat(outputs)
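# Sketch of the idea: a document longer than `bptt` is fed through the encoder
# in bptt-sized chunks (hidden state carried across chunks), and only the
# activations for the last `max_seq` tokens are kept for the classifier head:
#   enc = MultiBatchRNNCore(70, 1400, vocab_sz, emb_sz, n_hid, n_layers, pad_token=1)
# (`RNNCore` itself comes from nb_007a; the sizes here are illustrative.)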
class PoolingLinearClassifier(nn.Module):
    "Creates a linear classifier with pooling."
    def __init__(self, layers:Collection[int], drops:Collection[float]):
        super().__init__()
        mod_layers = []
        activs = [nn.ReLU(inplace=True)] * (len(layers) - 2) + [None]
        for n_in,n_out,p,actn in zip(layers[:-1],layers[1:], drops, activs):
            mod_layers += bn_drop_lin(n_in, n_out, p=p, actn=actn)
        self.layers = nn.Sequential(*mod_layers)
    def pool(self, x:Tensor, bs:int, is_max:bool):
        "Pools the tensor along the seq_len dimension."
        f = F.adaptive_max_pool1d if is_max else F.adaptive_avg_pool1d
        return f(x.permute(1,2,0), (1,)).view(bs,-1)
    def forward(self, input:Tuple[Tensor,Tensor]) -> Tuple[Tensor,Tensor,Tensor]:
        raw_outputs, outputs = input
        output = outputs[-1]
        sl,bs,_ = output.size()
        avgpool = self.pool(output, bs, False)
        mxpool = self.pool(output, bs, True)
        # concat pooling: last hidden state + max pool + mean pool over time
        x = torch.cat([output[-1], mxpool, avgpool], 1)
        x = self.layers(x)
        return x, raw_outputs, outputs
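# A minimal sketch (not in the original notebook) of concat pooling on a fake
# encoder output of shape (seq_len, batch, n_hid); this is why the classifier
# head's input width is 3 * emb_sz.
if __name__ == '__main__':
    import torch
    import torch.nn.functional as F
    out = torch.randn(5, 2, 4)                                    # (sl, bs, nh)
    avg = F.adaptive_avg_pool1d(out.permute(1,2,0), 1).view(2,-1)
    mx  = F.adaptive_max_pool1d(out.permute(1,2,0), 1).view(2,-1)
    x = torch.cat([out[-1], mx, avg], 1)
    assert x.shape == (2, 12)                                     # 3 * nh features per example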
def rnn_classifier_split(model:Model) -> List[Model]:
    "Splits an RNN model into groups."
    groups = [nn.Sequential(model[0].encoder, model[0].encoder_dp)]
    groups += [nn.Sequential(rnn, dp) for rnn, dp in zip(model[0].rnns, model[0].hidden_dps)]
    groups.append(model[1])
    return groups
def get_rnn_classifier(bptt:int, max_seq:int, n_class:int, vocab_sz:int, emb_sz:int, n_hid:int, n_layers:int,
                       pad_token:int, layers:Collection[int], drops:Collection[float], bidir:bool=False, qrnn:bool=False,
                       hidden_p:float=0.2, input_p:float=0.6, embed_p:float=0.1, weight_p:float=0.5) -> Model:
    "Creates an RNN classifier model."
    rnn_enc = MultiBatchRNNCore(bptt, max_seq, vocab_sz, emb_sz, n_hid, n_layers, pad_token=pad_token, bidir=bidir,
                                qrnn=qrnn, hidden_p=hidden_p, input_p=input_p, embed_p=embed_p, weight_p=weight_p)
    return SequentialRNN(rnn_enc, PoolingLinearClassifier(layers, drops))
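# Illustrative sizes (matching the defaults `RNNLearner.classifier` passes in
# below; `vocab_sz` is a placeholder):
#   model = get_rnn_classifier(70, 70*20, 2, vocab_sz, 400, 1150, 3, pad_token=1,
#                              layers=[400*3, 50, 2], drops=[0.4, 0.1])
# `layers` starts at 3*emb_sz because of the concat pooling in
# `PoolingLinearClassifier` (last hidden state + max pool + mean pool).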
SplitFunc = Callable[[Model], List[Model]]
OptSplitFunc = Optional[SplitFunc]
OptStrTuple = Optional[Tuple[str,str]]
class RNNLearner(Learner):
    "Basic `Learner` class for RNNs."
    def __init__(self, data:DataBunch, model:Model, bptt:int=70, split_func:OptSplitFunc=None, clip:float=None,
                 adjust:bool=False, alpha:float=2., beta:float=1., **kwargs):
        super().__init__(data, model)
        self.callbacks.append(RNNTrainer(self, bptt, alpha=alpha, beta=beta, adjust=adjust))
        if clip: self.callback_fns.append(partial(GradientClipping, clip=clip))
        if split_func: self.split(split_func)
        self.metrics = [accuracy]
    def save_encoder(self, name:str):
        "Saves the encoder to the model directory."
        torch.save(self.model[0].state_dict(), self.path/self.model_dir/f'{name}.pth')
    def load_encoder(self, name:str):
        "Loads the encoder from the model directory."
        self.model[0].load_state_dict(torch.load(self.path/self.model_dir/f'{name}.pth'))
    def load_pretrained(self, wgts_fname:str, itos_fname:str):
        "Loads a pretrained model and adapts it to the data vocabulary."
        old_itos = pickle.load(open(self.path/self.model_dir/f'{itos_fname}.pkl', 'rb'))
        old_stoi = {v:k for k,v in enumerate(old_itos)}
        wgts = torch.load(self.path/self.model_dir/f'{wgts_fname}.pth', map_location=lambda storage, loc: storage)
        wgts = convert_weights(wgts, old_stoi, self.data.train_ds.vocab.itos)
        self.model.load_state_dict(wgts)
    @classmethod
    def language_model(cls, data:DataBunch, bptt:int=70, emb_sz:int=400, nh:int=1150, nl:int=3, pad_token:int=1,
                       drop_mult:float=1., tie_weights:bool=True, bias:bool=True, qrnn:bool=False,
                       pretrained_fnames:OptStrTuple=None, **kwargs) -> 'RNNLearner':
        "Creates a `Learner` with a language model."
        dps = np.array([0.25, 0.1, 0.2, 0.02, 0.15]) * drop_mult
        vocab_size = len(data.train_ds.vocab.itos)
        model = get_language_model(vocab_size, emb_sz, nh, nl, pad_token, input_p=dps[0], output_p=dps[1],
                                   weight_p=dps[2], embed_p=dps[3], hidden_p=dps[4], tie_weights=tie_weights,
                                   bias=bias, qrnn=qrnn)
        learn = cls(data, model, bptt, split_func=lm_split, **kwargs)
        if pretrained_fnames is not None: learn.load_pretrained(*pretrained_fnames)
        return learn
    @classmethod
    def classifier(cls, data:DataBunch, bptt:int=70, max_len:int=70*20, emb_sz:int=400, nh:int=1150, nl:int=3,
                   layers:Collection[int]=None, drops:Collection[float]=None, pad_token:int=1,
                   drop_mult:float=1., qrnn:bool=False, **kwargs) -> 'RNNLearner':
        "Creates an RNN classifier."
        dps = np.array([0.4,0.5,0.05,0.3,0.4]) * drop_mult
        if layers is None: layers = [50]
        if drops is None: drops = [0.1]
        vocab_size = len(data.train_ds.vocab.itos)
        n_class = len(data.train_ds.classes)
        layers = [emb_sz*3] + layers + [n_class]
        drops = [dps[4]] + drops
        model = get_rnn_classifier(bptt, max_len, n_class, vocab_size, emb_sz, nh, nl, pad_token,
                                   layers, drops, input_p=dps[0], weight_p=dps[1], embed_p=dps[2],
                                   hidden_p=dps[3], qrnn=qrnn)
        learn = cls(data, model, bptt, split_func=rnn_classifier_split, **kwargs)
        return learn
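# End-to-end sketch of the workflow this file supports (the data objects and
# file names are placeholders, not defined here):
#   learn = RNNLearner.language_model(lm_data, pretrained_fnames=('wt103_wgts', 'wt103_itos'))
#   learn.fit(...)                      # fine-tune the language model, then:
#   learn.save_encoder('ft_enc')
#   learn = RNNLearner.classifier(clas_data, drop_mult=0.5)
#   learn.load_encoder('ft_enc')
#   learn.fit(...)                      # train the classifier head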