"NLP data loading pipeline. Supports csv, folders, and preprocessed data."
from ..torch_core import *
from .transform import *
from ..basic_data import *
from ..data_block import *
from ..callback import Callback
__all__ = ['LanguageModelPreLoader', 'SortSampler', 'SortishSampler', 'TextList', 'pad_collate', 'TextDataBunch',
           'TextLMDataBunch', 'TextClasDataBunch', 'Text', 'open_text', 'TokenizeProcessor', 'NumericalizeProcessor',
           'OpenFileProcessor', 'LMLabel']
TextMtd = IntEnum('TextMtd', 'DF TOK IDS')
text_extensions = {'.txt'}
class LanguageModelPreLoader(Callback):
"Transforms the texts in `dataset` in a stream for language modelling."
def __init__(self, dataset:LabelList, lengths:Collection[int]=None, bs:int=64, bptt:int=70, backwards:bool=False,
shuffle:bool=False, drop_last:bool=False):
        self.dataset,,self.bptt,self.backwards = dataset,bs,bptt,backwards
self.shuffle,self.drop_last = shuffle,drop_last
self.lengths = np.array(ifnone(lengths, [len(o) for o in dataset.x.items]))
        self.n = self.lengths.sum() //
def __len__(self):
if self.item is not None: return 1
        if (self.n-1)%self.bptt == 0 or self.drop_last: return ((self.n-1) // self.bptt) *
        return ((self.n-1) // self.bptt + 1) *
def __getattr__(self,k:str)->Any: return getattr(self.dataset, k)
def on_epoch_begin(self, **kwargs):
self.idxs = np.random.permutation(len(self.dataset)) if self.shuffle else arange_of(self.dataset)
self.text_idx = np.concatenate([[0],self.lengths[self.idxs].cumsum()])
    #The training dl gets on_epoch_begin called; the validation dl only gets on_epoch_end, so redirect it.
def on_epoch_end(self, **kwargs): self.on_epoch_begin()
def __getitem__(self, k:int):
if self.item is not None: return self.dataset[0]
if not hasattr(self, 'idxs'): self.on_epoch_begin()
#The dataloader will send (batch_index) * bs + sample_index, converting to where to read in the stream
        read_idx = (k // * self.bptt
        i = read_idx + (k % * self.n
seq_len = min(self.bptt, self.n-read_idx-1)
#Getting the indexes of the texts that start and finish the portion i---i+seq_len in the stream
start,end = np.argmax(self.text_idx >= i)-1,np.argmin(self.text_idx <= i+seq_len+1)
start = max(0,start)
#Grabbing the texts we need in the dataset
if self.backwards: concat = np.concatenate([self.dataset.x.items[j][::-1] for j in self.idxs[start:end]])
else: concat = np.concatenate([self.dataset.x.items[j] for j in self.idxs[start:end]])
#Returning the right portion.
start_idx = i-self.text_idx[start]
return concat[start_idx:start_idx+seq_len], concat[start_idx+1:start_idx+seq_len+1]
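# Illustrative sketch (not part of the original source): what the preloader yields.
# All texts are concatenated into one long stream, split across `bs` contiguous rows,
# and each __getitem__ returns a `bptt`-long window whose target is shifted by one:
#   stream = [2, 5, 9, 4, 7, 3, 8, 6, ...]           # hypothetical ids, texts joined end to end
#   x, y = stream[i:i+bptt], stream[i+1:i+bptt+1]    # y predicts the next token of x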
class SortSampler(Sampler):
"Go through the text data by order of length."
def __init__(self, data_source:NPArrayList, key:KeyFunc): self.data_source,self.key = data_source,key
def __len__(self) -> int: return len(self.data_source)
def __iter__(self):
return iter(sorted(range_of(self.data_source), key=self.key, reverse=True))
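# Usage sketch (illustrative; `ds` stands for any labelled text dataset): sorting by
# descending length puts the largest batch first, so an out-of-memory error surfaces
# on the first batch instead of mid-epoch:
#   sampler = SortSampler(ds.x, key=lambda i: len(ds.x.items[i]))
#   dl = DataLoader(ds, batch_size=bs, sampler=sampler)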
class SortishSampler(Sampler):
"Go through the text data by order of length with a bit of randomness."
def __init__(self, data_source:NPArrayList, key:KeyFunc, bs:int):
        self.data_source,self.key, = data_source,key,bs
def __len__(self) -> int: return len(self.data_source)
def __iter__(self):
idxs = np.random.permutation(len(self.data_source))
        sz =*50
ck_idx = [idxs[i:i+sz] for i in range(0, len(idxs), sz)]
sort_idx = np.concatenate([sorted(s, key=self.key, reverse=True) for s in ck_idx])
        sz =
ck_idx = [sort_idx[i:i+sz] for i in range(0, len(sort_idx), sz)]
max_ck = np.argmax([self.key(ck[0]) for ck in ck_idx]) # find the chunk with the largest key,
ck_idx[0],ck_idx[max_ck] = ck_idx[max_ck],ck_idx[0] # then make sure it goes first.
        sort_idx = np.concatenate(np.random.permutation(ck_idx[1:])) if len(ck_idx) > 1 else np.array([],
sort_idx = np.concatenate((ck_idx[0], sort_idx))
return iter(sort_idx)
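# Walk-through (illustrative): with bs=64, indices are shuffled, sorted by length
# inside megabatches of 64*50=3200, then re-cut into chunks of 64 that are themselves
# shuffled. Each batch therefore holds texts of similar length (little padding) while
# epoch order still varies, and the chunk containing the longest text is forced first
# so any out-of-memory error shows up right away.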
def pad_collate(samples:BatchSamples, pad_idx:int=1, pad_first:bool=True) -> Tuple[LongTensor, LongTensor]:
"Function that collect samples and adds padding."
samples = to_data(samples)
max_len = max([len(s[0]) for s in samples])
res = torch.zeros(len(samples), max_len).long() + pad_idx
for i,s in enumerate(samples):
if pad_first: res[i,-len(s[0]):] = LongTensor(s[0])
        else: res[i,:len(s[0])] = LongTensor(s[0])
return res, tensor(np.array([s[1] for s in samples]))
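# Illustrative example (hypothetical token ids, default pad_idx=1): samples of lengths
# 3 and 5 are left-padded (pad_first=True) to the longest length in the batch:
#   xs = [([5,2,9], 0), ([4,8,6,3,7], 1)]       # (ids, label) pairs after to_data
#   pad_collate(xs) -> (tensor([[1, 1, 5, 2, 9],
#                               [4, 8, 6, 3, 7]]), tensor([0, 1]))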
def _get_processor(tokenizer:Tokenizer=None, vocab:Vocab=None, chunksize:int=10000, max_vocab:int=60000,
min_freq:int=2, mark_fields:bool=False):
return [TokenizeProcessor(tokenizer=tokenizer, chunksize=chunksize, mark_fields=mark_fields),
NumericalizeProcessor(vocab=vocab, max_vocab=max_vocab, min_freq=min_freq)]
class TextDataBunch(DataBunch):
"General class to get a `DataBunch` for NLP. Subclassed by `TextLMDataBunch` and `TextClasDataBunch`."
def save(self, cache_name:PathOrStr='tmp'):
"Save the `DataBunch` in `self.path/cache_name` folder."
os.makedirs(self.path/cache_name, exist_ok=True)
cache_path = self.path/cache_name
        pickle.dump(self.train_ds.vocab.itos, open(cache_path/'itos.pkl', 'wb'))'train_ids.npy', self.train_ds.x.items)'train_lbl.npy', self.train_ds.y.items)'valid_ids.npy', self.valid_ds.x.items)'valid_lbl.npy', self.valid_ds.y.items)
if self.test_dl is not None:'test_ids.npy', self.test_ds.x.items)
if hasattr(self.train_ds, 'classes'): save_texts(cache_path/'classes.txt', self.train_ds.classes)
    @classmethod
    def from_ids(cls, path:PathOrStr, vocab:Vocab, train_ids:Collection[Collection[int]], valid_ids:Collection[Collection[int]],
test_ids:Collection[Collection[int]]=None, train_lbls:Collection[Union[int,float]]=None,
valid_lbls:Collection[Union[int,float]]=None, classes:Collection[Any]=None,
processor:PreProcessor=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from ids, labels and a `vocab`."
src = ItemLists(path, TextList(train_ids, vocab, path=path, processor=[]),
TextList(valid_ids, vocab, path=path, processor=[]))
src = src.label_for_lm() if cls==TextLMDataBunch else src.label_from_lists(train_lbls, valid_lbls, classes=classes, processor=[])
if not is1d(train_lbls): src.train.y.one_hot,src.valid.y.one_hot = True,True
if test_ids is not None: src.add_test(TextList(test_ids, vocab, path=path), label=train_lbls[0])
src.valid.x.processor = ifnone(processor, [TokenizeProcessor(), NumericalizeProcessor(vocab=vocab)])
return src.databunch(**kwargs)
    @classmethod
    def load(cls, path:PathOrStr, cache_name:PathOrStr='tmp', processor:PreProcessor=None, **kwargs):
"Load a `TextDataBunch` from `path/cache_name`. `kwargs` are passed to the dataloader creation."
cache_path = Path(path)/cache_name
vocab = Vocab(pickle.load(open(cache_path/'itos.pkl','rb')))
train_ids,train_lbls = np.load(cache_path/f'train_ids.npy'), np.load(cache_path/f'train_lbl.npy')
valid_ids,valid_lbls = np.load(cache_path/f'valid_ids.npy'), np.load(cache_path/f'valid_lbl.npy')
test_ids = np.load(cache_path/f'test_ids.npy') if os.path.isfile(cache_path/f'test_ids.npy') else None
classes = loadtxt_str(cache_path/'classes.txt') if os.path.isfile(cache_path/'classes.txt') else None
return cls.from_ids(path, vocab, train_ids, valid_ids, test_ids, train_lbls, valid_lbls, classes, processor, **kwargs)
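    # Usage sketch (illustrative): pay the tokenization/numericalization cost once,
    # then reload the cached ids and vocab in later sessions:
    #   data = TextClasDataBunch.from_csv(path, 'texts.csv')
    #'tmp')                             # writes itos.pkl, *_ids.npy, *_lbl.npy
    #   data = TextClasDataBunch.load(path, 'tmp')  # restores via from_ids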
    @classmethod  #TODO: test
def from_tokens(cls, path:PathOrStr, trn_tok:Collection[Collection[str]], trn_lbls:Collection[Union[int,float]],
val_tok:Collection[Collection[str]], val_lbls:Collection[Union[int,float]], vocab:Vocab=None,
tst_tok:Collection[Collection[str]]=None, classes:Collection[Any]=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from tokens and labels."
p_kwargs, kwargs = split_kwargs_by_func(kwargs, _get_processor)
processor = _get_processor(tokenizer=None, vocab=vocab, **p_kwargs)[1]
src = ItemLists(path, TextList(trn_tok, path=path, processor=processor),
TextList(val_tok, path=path, processor=processor))
src = src.label_for_lm() if cls==TextLMDataBunch else src.label_from_lists(trn_lbls, val_lbls, classes=classes)
if tst_tok is not None: src.add_test(TextList(tst_tok, path=path))
return src.databunch(**kwargs)
    @classmethod
    def from_df(cls, path:PathOrStr, train_df:DataFrame, valid_df:DataFrame, test_df:Optional[DataFrame]=None,
tokenizer:Tokenizer=None, vocab:Vocab=None, classes:Collection[str]=None, text_cols:IntsOrStrs=1,
label_cols:IntsOrStrs=0, label_delim:str=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from DataFrames."
p_kwargs, kwargs = split_kwargs_by_func(kwargs, _get_processor)
processor = _get_processor(tokenizer=tokenizer, vocab=vocab, **p_kwargs)
if classes is None and is_listy(label_cols) and len(label_cols) > 1: classes = label_cols
src = ItemLists(path, TextList.from_df(train_df, path, cols=text_cols, processor=processor),
TextList.from_df(valid_df, path, cols=text_cols, processor=processor))
src = src.label_for_lm() if cls==TextLMDataBunch else src.label_from_df(cols=label_cols, classes=classes, sep=label_delim)
if test_df is not None: src.add_test(TextList.from_df(test_df, path, cols=text_cols))
return src.databunch(**kwargs)
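    # Usage sketch (illustrative, assumes DataFrames with 'text' and 'label' columns):
    #   data = TextClasDataBunch.from_df(path, train_df, valid_df,
    #                                    text_cols='text', label_cols='label')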
    @classmethod
    def from_csv(cls, path:PathOrStr, csv_name, valid_pct:float=0.2, test:Optional[str]=None,
tokenizer:Tokenizer=None, vocab:Vocab=None, classes:Collection[str]=None, header = 'infer', text_cols:IntsOrStrs=1,
label_cols:IntsOrStrs=0, label_delim:str=None, **kwargs) -> DataBunch:
"Create a `TextDataBunch` from texts in csv files."
df = pd.read_csv(Path(path)/csv_name, header=header)
df = df.iloc[np.random.permutation(len(df))]
cut = int(valid_pct * len(df)) + 1
train_df, valid_df = df[cut:], df[:cut]
test_df = None if test is None else pd.read_csv(Path(path)/test, header=header)
return cls.from_df(path, train_df, valid_df, test_df, tokenizer, vocab, classes, text_cols,
label_cols, label_delim, **kwargs)
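    # Note (illustrative): the csv is shuffled and the first `int(valid_pct*len(df))+1`
    # rows become the validation set, e.g. 1000 rows with valid_pct=0.2 gives a
    # 799/201 train/valid split:
    #   data = TextLMDataBunch.from_csv(path, 'texts.csv', text_cols='text', label_cols='label')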
    @classmethod
    def from_folder(cls, path:PathOrStr, train:str='train', valid:str='valid', test:Optional[str]=None,
classes:Collection[Any]=None, tokenizer:Tokenizer=None, vocab:Vocab=None, **kwargs):
"Create a `TextDataBunch` from text files in folders."
path = Path(path).absolute()
p_kwargs, kwargs = split_kwargs_by_func(kwargs, _get_processor)
processor = [OpenFileProcessor()] + _get_processor(tokenizer=tokenizer, vocab=vocab, **p_kwargs)
src = (TextList.from_folder(path, processor=processor)
.split_by_folder(train=train, valid=valid))
src = src.label_for_lm() if cls==TextLMDataBunch else src.label_from_folder(classes=classes)
if test is not None: src.add_test_folder(path/test)
return src.databunch(**kwargs)
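# Usage sketch (illustrative, assumes an imagenet-style layout of .txt files,
# path/train/<class>/*.txt and path/valid/<class>/*.txt):
#   data = TextClasDataBunch.from_folder(path)   # labels from folder names
#   data = TextLMDataBunch.from_folder(path)     # same files, labelled for LM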
class TextLMDataBunch(TextDataBunch):
"Create a `TextDataBunch` suitable for training a language model."
    @classmethod
    def create(cls, train_ds, valid_ds, test_ds=None, path:PathOrStr='.', no_check:bool=False, bs=64, num_workers:int=0,
device:torch.device=None, collate_fn:Callable=data_collate, dl_tfms:Optional[Collection[Callable]]=None,
**kwargs) -> DataBunch:
"Create a `TextDataBunch` in `path` from the `datasets` for language modelling."
datasets = cls._init_ds(train_ds, valid_ds, test_ds)
datasets = [LanguageModelPreLoader(ds, shuffle=(i==0), drop_last=(i==0), bs=bs, **kwargs) for i,ds in enumerate(datasets)]
val_bs = bs
dls = [DataLoader(d, b, shuffle=False) for d,b in zip(datasets, (bs,val_bs,val_bs,val_bs)) if d is not None]
return cls(*dls, path=path, device=device, dl_tfms=dl_tfms, collate_fn=collate_fn, no_check=no_check)
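# Design note (as implied by the code above): the DataLoaders are built with
# shuffle=False because LanguageModelPreLoader already permutes the order in which
# texts enter the stream at each epoch (on_epoch_begin); only the training preloader
# (i==0) shuffles and drops the last incomplete batch.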
class TextClasDataBunch(TextDataBunch):
"Create a `TextDataBunch` suitable for training an RNN classifier."
    @classmethod
    def create(cls, train_ds, valid_ds, test_ds=None, path:PathOrStr='.', bs=64, pad_idx=1, pad_first=True,
               no_check:bool=False, **kwargs) -> DataBunch:
        "Function that transforms the `datasets` into a `DataBunch` for classification."
datasets = cls._init_ds(train_ds, valid_ds, test_ds)
collate_fn = partial(pad_collate, pad_idx=pad_idx, pad_first=pad_first)
train_sampler = SortishSampler(datasets[0].x, key=lambda t: len(datasets[0][t][0].data), bs=bs//2)
train_dl = DataLoader(datasets[0], batch_size=bs//2, sampler=train_sampler, drop_last=True, **kwargs)
dataloaders = [train_dl]
for ds in datasets[1:]:
lengths = [len(t) for t in ds.x.items]
sampler = SortSampler(ds.x, key=lengths.__getitem__)
dataloaders.append(DataLoader(ds, batch_size=bs, sampler=sampler, **kwargs))
return cls(*dataloaders, path=path, collate_fn=collate_fn, no_check=no_check)
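# Design note (hedged reading of the code above): training uses SortishSampler at
# half the batch size (bs//2), giving roughly length-sorted but still shuffled
# batches, presumably with headroom for the backward pass; validation and test use
# the deterministic SortSampler at the full bs.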
def open_text(fn:PathOrStr, enc='utf-8'):
"Read the text in `fn`."
with open(fn,'r', encoding = enc) as f: return ''.join(f.readlines())
class Text(ItemBase):
"Basic item for <code>text</code> data in numericalized `ids`."
def __init__(self, ids, text):,self.text = np.array(ids, dtype=np.int64),text
def __str__(self): return str(self.text)
class TokenizeProcessor(PreProcessor):
"`PreProcessor` that tokenizes the texts in `ds`."
def __init__(self, ds:ItemList=None, tokenizer:Tokenizer=None, chunksize:int=10000, mark_fields:bool=False):
self.tokenizer,self.chunksize,self.mark_fields = ifnone(tokenizer, Tokenizer()),chunksize,mark_fields
def process_one(self, item): return self.tokenizer._process_all_1([item])[0]
def process(self, ds):
ds.items = _join_texts(ds.items, self.mark_fields)
tokens = []
for i in progress_bar(range(0,len(ds),self.chunksize), leave=False):
tokens += self.tokenizer.process_all(ds.items[i:i+self.chunksize])
ds.items = tokens
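# Usage sketch (illustrative): a processor can also be applied to a single string;
# with the default Tokenizer rules, capitalization is marked with the xxmaj token,
# so the result is roughly:
#   tp = TokenizeProcessor()
#   tp.process_one("Hello world!")   # -> ['xxmaj', 'hello', 'world', '!']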
class NumericalizeProcessor(PreProcessor):
"`PreProcessor` that numericalizes the tokens in `ds`."
def __init__(self, ds:ItemList=None, vocab:Vocab=None, max_vocab:int=60000, min_freq:int=2):
vocab = ifnone(vocab, ds.vocab if ds is not None else None)
self.vocab,self.max_vocab,self.min_freq = vocab,max_vocab,min_freq
def process_one(self,item): return np.array(self.vocab.numericalize(item), dtype=np.int64)
    def process(self, ds):
        if self.vocab is None: self.vocab = Vocab.create(ds.items, self.max_vocab, self.min_freq)
        ds.vocab = self.vocab
        super().process(ds)
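# Usage sketch (illustrative; `train_ll`/`valid_ll` are hypothetical LabelLists): the
# first dataset processed builds the vocab, and reusing the same processor then
# numericalizes the others with that frozen vocab:
#   num = NumericalizeProcessor(max_vocab=60000, min_freq=2)
#   num.process(train_ll.x); num.process(valid_ll.x)   # valid reuses the train vocab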
class OpenFileProcessor(PreProcessor):
"`PreProcessor` that opens the filenames and read the texts."
def process_one(self,item):
return open_text(item) if isinstance(item, Path) else item
class TextList(ItemList):
"Basic `ItemList` for text data."
_bunch = TextClasDataBunch
_processor = [TokenizeProcessor, NumericalizeProcessor]
_is_lm = False
def __init__(self, items:Iterator, vocab:Vocab=None, pad_idx:int=1, **kwargs):
super().__init__(items, **kwargs)
self.vocab,self.pad_idx = vocab,pad_idx
self.copy_new += ['vocab', 'pad_idx']
def get(self, i):
o = super().get(i)
return Text(o, self.vocab.textify(o))
def label_for_lm(self, **kwargs):
"A special labelling method for language models."
self.__class__ = LMTextList
return self.label_const(0, label_cls=LMLabel)
def reconstruct(self, t:Tensor):
idx = (t != self.pad_idx).nonzero().min()
return Text(t[idx:], self.vocab.textify(t[idx:]))
    @classmethod
    def from_folder(cls, path:PathOrStr='.', extensions:Collection[str]=text_extensions, vocab:Vocab=None,
processor:PreProcessor=None, **kwargs)->'TextList':
"Get the list of files in `path` that have a text suffix. `recurse` determines if we search subfolders."
processor = ifnone(processor, [OpenFileProcessor(), TokenizeProcessor(), NumericalizeProcessor(vocab=vocab)])
return super().from_folder(path=path, extensions=extensions, processor=processor, **kwargs)
def show_xys(self, xs, ys, max_len:int=70)->None:
"Show the `xs` (inputs) and `ys` (targets). `max_len` is the maximum number of tokens displayed."
from IPython.display import display, HTML
items = [['idx','text']] if self._is_lm else [['text','target']]
for i, (x,y) in enumerate(zip(xs,ys)):
txt_x = ' '.join(x.text.split(' ')[:max_len]) if max_len is not None else x.text
items.append([str(i), str(txt_x)] if self._is_lm else [str(txt_x), str(y)])
display(HTML(text2html_table(items, ([5,95] if self._is_lm else [90,10]))))
def show_xyzs(self, xs, ys, zs, max_len:int=70):
"Show `xs` (inputs), `ys` (targets) and `zs` (predictions). `max_len` is the maximum number of tokens displayed."
from IPython.display import display, HTML
items = [['text','target','prediction']]
for i, (x,y,z) in enumerate(zip(xs,ys,zs)):
txt_x = ' '.join(x.text.split(' ')[:max_len]) if max_len is not None else x.text
items.append([str(txt_x), str(y), str(z)])
display(HTML(text2html_table(items, [85,7.5,7.5])))
class LMLabel(CategoryList):
def predict(self, res): return res
def reconstruct(self,t:Tensor): return 0
class LMTextList(TextList):
"Special `TextList` for a language model."
_bunch = TextLMDataBunch
_is_lm = True
_label_cls = EmptyLabel
def _join_texts(texts:Collection[str], mark_fields:bool=False):
if not isinstance(texts, np.ndarray): texts = np.array(texts)
if is1d(texts): texts = texts[:,None]
df = pd.DataFrame({i:texts[:,i] for i in range(texts.shape[1])})
text_col = f'{BOS} {FLD} {1} ' + df[0].astype(str) if mark_fields else f'{BOS} ' + df[0].astype(str)
for i in range(1,len(df.columns)):
text_col += (f' {FLD} {i+1} ' if mark_fields else ' ') + df[i].astype(str)
return text_col.values
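# Illustrative example (BOS='xxbos' and FLD='xxfld' per fastai's defaults): with
# mark_fields=True, each column becomes a numbered field in one joined string:
#   _join_texts(np.array([['title A', 'body A']]), mark_fields=True)
#   -> array(['xxbos xxfld 1 title A xxfld 2 body A'], dtype=object)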