Установим нужные нам версии библиотек:



In [0]:
# keras-contrib is installed straight from GitHub; it provides the CRF layer
# imported further below.
!pip install git+https://www.github.com/keras-team/keras-contrib.git
# Pin the exact Keras / TensorFlow / keras_applications versions this
# notebook is written against.
!pip install keras==2.2.2
!pip install tensorflow==1.15.0
!pip install keras_applications==1.0.7

Подключимся к гугл-диску, чтобы получить доступ к датасету:

In [0]:
# Colab-only helper: mount Google Drive under /content/gdrive so that the
# dataset and weight paths used below resolve.
from google.colab import drive
drive.mount('/content/gdrive')

Распакуем архив с данными:

In [0]:
import os
import zipfile

# Google Drive folder that holds train.zip and, once unpacked, the train/
# directory.
time_dir = '/content/gdrive/My Drive/Time_extraction/'

# Unpack train.zip only when the folder has no 'train' entry yet.
if 'train' not in os.listdir(time_dir):
    with zipfile.ZipFile(os.path.join(time_dir, 'train.zip'), 'r') as zip_ref:
        zip_ref.extractall(time_dir)
else:
    print('Files are already extracted')

# Report how many entries the extracted train/ directory contains.
print(len(os.listdir(os.path.join(time_dir, 'train'))))

Импортируем нужные нам библиотеки:

In [0]:
import csv

import numpy as np
from tqdm import tqdm
import tensorflow as tf
import keras
from keras import backend as K
from keras.preprocessing.sequence import pad_sequences
from keras.models import Model, Input
from keras.layers.merge import add, concatenate
from keras.layers import LSTM, Embedding, Dense, TimeDistributed, Dropout, Bidirectional, Lambda
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras_contrib.layers import CRF

In [0]:
# Show the TensorFlow and Keras versions that were actually imported, to
# confirm the pinned installs took effect.
print(tf.__version__)
print(keras.__version__)

Определим класс для загрузки наших данных:

In [0]:
class DatasetLoader:
    """Load token/tag sequences from a directory of CSV files.

    Every ``*.csv`` entry of the directory is read as one sequence. Each
    file needs a header row that names at least the ``token`` and ``tag``
    columns.
    """

    def __init__(self, path_to_data_dir):
        """Remember the data directory and snapshot its entries.

        Args:
            path_to_data_dir: Directory that holds the CSV files.
        """
        self._path_to_data_dir = path_to_data_dir
        # Sorted so the order of the loaded sequences does not depend on the
        # arbitrary order returned by os.listdir.
        self._data_files = sorted(os.listdir(self._path_to_data_dir))

    def load_dataset(self):
        """Read every CSV file and return its rows as (token, tag) pairs.

        Returns:
            A list with one entry per CSV file, in sorted file-name order.
            Each entry is the list of ``(token, tag)`` tuples of that file,
            in row order.

        Raises:
            KeyError: If a file's header has no ``token`` or ``tag`` column.
        """
        sentences = []
        for data_file in tqdm(self._data_files, desc='Loading data'):
            if not data_file.endswith('.csv'):
                continue
            file_path = os.path.join(self._path_to_data_dir, data_file)
            # newline='' is what the csv module requires so that newlines
            # embedded in quoted fields are parsed correctly; the explicit
            # encoding keeps decoding independent of the platform locale.
            with open(file_path, 'r', encoding='utf-8', newline='') as data_f:
                reader = csv.DictReader(data_f)
                sentences.append(
                    [(row['token'], row['tag']) for row in reader]
                )
        return sentences

Определим класс для векторизации данных:

In [0]:
class Vectorizer:
    """Turn (token, tag) sequences into fixed-size arrays.

    Tokens become per-character one-hot matrices and tags become integer
    ids. Every sequence is padded, or cut, to a fixed number of tokens.
    """

    def __init__(self, max_sentence_len=1000, max_wordform_len=30):
        """Set up the alphabet, the tag mapping and the size limits.

        Args:
            max_sentence_len: Number of tokens every sequence is padded or
                truncated to.
            max_wordform_len: Number of leading characters of a token that
                are encoded; the rest of the token is ignored.
        """
        self._max_sentence_len = max_sentence_len
        self._max_wordform_len = max_wordform_len
        self._all_chars = u'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM,.?!:;"«»-—1234567890'
        self._len_all_chars = len(self._all_chars)
        # Character -> column index, built once so encoding a character is a
        # single dict lookup. setdefault keeps the first position of a
        # character, which is what str.index would return.
        self._char2id = {}
        for ind, char in enumerate(self._all_chars):
            self._char2id.setdefault(char, ind)

        self._tag2id = {'O': 0, 'B-TIME': 1, 'I-TIME': 2, 'B-DATE': 3, 'I-DATE': 4, 'B-DURATION': 5, 'I-DURATION': 6,
                        'B-SET': 7, 'I-SET': 8}

    def get_len_all_chars(self):
        """Return the size of the character alphabet."""
        return self._len_all_chars

    def vectorize_chars(self, sentences):
        """Encode the tokens of ``sentences``; see vectorize_chars_dataset."""
        X_chars = self.vectorize_chars_dataset(sentences)
        return X_chars

    def vectorize_targets(self, sentences):
        """Encode the tags of ``sentences`` as padded integer ids.

        Args:
            sentences: Sequences of ``(token, tag)`` pairs.

        Returns:
            An array of shape ``(len(sentences), max_sentence_len, 1)``;
            positions past the end of a sequence hold the id of ``'O'``.

        Raises:
            KeyError: If a tag is not present in the tag mapping.
        """
        y = [[self._tag2id[w[1]] for w in s] for s in sentences]
        # truncating="post" keeps the first max_sentence_len items, so
        # position i of the output always belongs to token i of the input.
        # The pad_sequences default ("pre") would drop the beginning of an
        # over-long sequence instead.
        y = pad_sequences(maxlen=self._max_sentence_len, sequences=y, padding="post", truncating="post",
                          value=self._tag2id["O"])
        y = y.reshape(y.shape[0], y.shape[1], 1)
        return y

    def vectorize_chars_dataset(self, sentences):
        """Encode every token of every sequence as a one-hot matrix.

        Args:
            sentences: Sequences of ``(token, tag)`` pairs; only the token
                is used.

        Returns:
            The padded result of ``pad_sequences``: one
            ``(max_wordform_len, alphabet size)`` matrix per token
            position, with all-zero matrices past the end of a sequence.
        """
        X_chars = [[self._vectorize_chars_wordform(w[0]) for w in s] for s in sentences]
        # Same truncation side as in vectorize_targets, so inputs and
        # targets stay aligned token by token.
        X_chars = pad_sequences(maxlen=self._max_sentence_len, sequences=X_chars, padding="post", truncating="post",
                                value=np.zeros((self._max_wordform_len, self._len_all_chars), dtype=np.int32))
        return X_chars

    def _vectorize_chars_wordform(self, wordform):
        """One-hot encode the leading characters of a single token.

        Args:
            wordform: The token text.

        Returns:
            A float array of shape ``(max_wordform_len, alphabet size)``.
            Row i has a single 1.0 in the column of the i-th character;
            rows for characters outside the alphabet, and rows past the end
            of the token, are all zeros.
        """
        vector = np.zeros((self._max_wordform_len, self._len_all_chars))
        for i, char in enumerate(wordform[:self._max_wordform_len]):
            ind = self._char2id.get(char)
            if ind is not None:
                vector[i, ind] = 1.0
        return vector

Определим класс для обучения модели:

In [0]:
class Trainer:
    """Build and train the character-level BiLSTM + CRF tagger.

    Constructing a Trainer loads the whole dataset from ``time_dir/train``
    and builds (but does not train) the Keras model.
    """

    def __init__(self):
        """Load the data, split it and define the model."""
        data_dir = os.path.join(time_dir, 'train')
        self._data_loader = DatasetLoader(data_dir)
        self._vectorizer = Vectorizer()

        self._sentences = self._data_loader.load_dataset()
        # The last 50 sequences are held out for validation. Training uses
        # at most the first 200 of the *remaining* ones, so the two sets
        # cannot overlap even when fewer than 250 sequences were loaded.
        self._val_sentences = self._sentences[-50:]
        self._train_samples = self._sentences[:-50][:200]

        self._batch_size = 5
        # These sizes must agree with what Vectorizer produces: tokens per
        # sequence, characters per token and number of distinct tags.
        self._max_len = 1000
        self._max_wordform_len = 30
        self._n_tags = 9

        self._model = self._define_model()

    def _define_model(self):
        """Create and compile the tagging model.

        A bidirectional LSTM is applied to the character matrix of every
        token (via TimeDistributed), and a CRF layer turns the resulting
        token vectors into tag predictions.

        Returns:
            The compiled Keras model.
        """
        input_chars = Input(shape=(self._max_len, self._max_wordform_len,
                                   self._vectorizer.get_len_all_chars()))
        chars = TimeDistributed(Bidirectional(LSTM(units=100,
                                                   recurrent_dropout=0.5)))(input_chars)

        # sparse_target=True: the targets are integer tag ids, not one-hot.
        crf = CRF(self._n_tags, sparse_target=True)
        out = crf(chars)

        model = Model([input_chars], out)
        model.summary()

        model.compile(optimizer="adam", loss=crf.loss_function, metrics=[crf.accuracy])
        return model

    def _generate_batches(self, samples):
        """Yield vectorized batches of ``samples`` forever.

        Args:
            samples: Sequences of ``(token, tag)`` pairs.

        Yields:
            ``([X_chars], y)`` for consecutive slices of ``batch_size``
            samples, starting over after the last slice. The final slice of
            a pass may be shorter than ``batch_size``.

        Raises:
            ValueError: If ``samples`` is empty.
        """
        if not samples:
            raise ValueError('Cannot generate batches from an empty sample list')
        i = 0
        while True:
            batch = samples[i:i + self._batch_size]
            X_chars = self._vectorizer.vectorize_chars(batch)
            y = self._vectorizer.vectorize_targets(batch)
            yield [np.asarray(X_chars)], np.asarray(y)
            i += self._batch_size
            # ">=" rather than "==": when the sample count is not a multiple
            # of the batch size the index steps past the end, and an
            # equality test would never reset it.
            if i >= len(samples):
                i = 0

    def _generate_train_samples(self):
        """Return the endless batch generator over the training samples."""
        return self._generate_batches(self._train_samples)

    def _generate_val_samples(self):
        """Return the endless batch generator over the validation samples."""
        return self._generate_batches(self._val_sentences)

    def _steps_for(self, samples):
        """Return how many batches make one full pass over ``samples``."""
        # Ceiling division without importing math.
        return -(-len(samples) // self._batch_size)

    def train(self):
        """Fit the model, keeping the best weights on disk.

        The weights with the lowest validation loss are written to
        ``weights_chars_crf.h5`` in the weights directory. Training runs for
        at most 50 epochs and stops early after 10 epochs without an
        improvement of the validation loss.

        Returns:
            The history object returned by ``fit_generator``.

        Raises:
            ValueError: If the training or the validation set is empty.
        """
        if not self._train_samples or not self._val_sentences:
            raise ValueError('Both the training and the validation set must be non-empty')

        weights_dir = '/content/gdrive/My Drive/Time_extraction/weights'
        os.makedirs(weights_dir, exist_ok=True)
        weights_file = 'weights_chars_crf.h5'
        modelPath = os.path.join(weights_dir, weights_file)
        saver = ModelCheckpoint(modelPath, monitor='val_loss', verbose=1, save_best_only=True, mode='auto')
        stopper = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='auto')
        # One epoch / one validation run is exactly one pass over the data
        # (40 and 10 steps for the intended 200 / 50 split).
        history = self._model.fit_generator(self._generate_train_samples(), epochs=50,
                                            validation_data=self._generate_val_samples(),
                                            steps_per_epoch=self._steps_for(self._train_samples),
                                            validation_steps=self._steps_for(self._val_sentences),
                                            verbose=1, callbacks=[saver, stopper])
        return history

In [0]:
# Build the trainer (its constructor loads the dataset and defines the
# model), then run training.
trainer = Trainer()
trainer.train()

Теперь давайте попробуем найти временные выражения, используя обученную модель. Для этого напишем класс Predictor:

In [0]:
from nltk.tokenize import WordPunctTokenizer


class Predictor:
    """Tag the tokens of a raw text with the trained model."""

    # Every input is fed to the model as this many identical copies and only
    # the first prediction is used.
    # NOTE(review): the reason for the replication is not visible here; it
    # is kept as is - confirm whether a single copy would be enough.
    _N_INPUT_COPIES = 20

    def __init__(self, weights_path='/content/gdrive/My Drive/Time_extraction/weights/weights_chars_crf.h5'):
        """Rebuild the model and load trained weights into it.

        Args:
            weights_path: File with the weights to load.
        """
        self._vectorizer = Vectorizer()
        # NOTE(review): a Trainer is created only to obtain the model
        # architecture; as a side effect it loads the whole dataset again.
        self._trainer = Trainer()
        self._tokenizer = WordPunctTokenizer()
        self._model = self._trainer._model
        self._model.load_weights(weights_path)
        # Reuse the vectorizer's mapping so that tag ids cannot drift apart
        # from the ones the model was trained with.
        self._tag2id = dict(self._vectorizer._tag2id)
        self._id2tag = self._get_id2tag()

    def _get_id2tag(self):
        """Return the inverse of the tag -> id mapping."""
        return {tag_id: tag for tag, tag_id in self._tag2id.items()}

    def _vectorize_tokens(self, tokens):
        """Turn a list of tokens into a model input batch.

        Args:
            tokens: Token strings; must not be empty.

        Returns:
            The vectorized batch: ``_N_INPUT_COPIES`` identical encodings of
            the token sequence.
        """
        # The vectorizer expects (token, tag) pairs; the tag is a dummy.
        tokens_to_process = [(token, 'O') for token in tokens]
        input_tokens = [tokens_to_process] * self._N_INPUT_COPIES
        return self._vectorizer.vectorize_chars(sentences=input_tokens)

    def _process_input_sentence(self, sentence):
        """Tokenize ``sentence`` and vectorize its tokens.

        Not used by predict any more; retained so existing callers of this
        helper keep working.

        Returns:
            A ``(X_chars, tokens)`` pair: the model input batch and the
            list of tokens it was built from.
        """
        tokens = self._tokenizer.tokenize(sentence)
        return self._vectorize_tokens(tokens), tokens

    def predict(self, text):
        """Tag every token of ``text``.

        Texts with more tokens than the model window are tagged window by
        window; a window is processed independently of its neighbours.

        Args:
            text: Raw input text.

        Returns:
            A list of ``(token, tag)`` pairs in token order; an empty list
            when the text contains no tokens.
        """
        tokens = self._tokenizer.tokenize(text)
        max_len = self._trainer._max_len
        result = []
        # With no tokens the loop body never runs, which avoids vectorizing
        # an empty sequence. Slicing into windows keeps every token inside
        # the fixed-length model input, so row i of a prediction always
        # belongs to token i of its window.
        for start in range(0, len(tokens), max_len):
            window = tokens[start:start + max_len]
            predicts = self._model.predict([self._vectorize_tokens(window)])[0]
            for token, scores in zip(window, predicts):
                result.append((token, self._id2tag[int(np.argmax(scores))]))
        return result


In [0]:
# Create the predictor; its constructor builds a Trainer and loads the
# saved weights.
predictor = Predictor()

In [0]:
# Tag a sample sentence and print every (token, tag) pair.
text = 'he was born in 1994'
result = predictor.predict(text)
for r in result:
    print(r)