In [30]:
from sklearn import preprocessing
import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Input, Embedding, Flatten, Dense, LSTM
from nervaluate import Evaluator
from keras.utils import to_categorical

In [2]:
class txtReader:
    def __init__(self, filename):
        self.filename = filename

    def read_split(self):
        with open(self.filename, 'r') as f:
            file_read = f.read()
        efg = []
        lines = file_read.split('\n')
        efg.append(lines)
        text = []
        text_id = []
        for i in lines:
            if i != '':
                word = i.split('\t')
                text.append(word[0])
                text_id.append(word[1])

        return text, text_id

In [3]:
class alphabet:
    def __init__(self, train_file, dev_file, test_file):
        self.train_file = train_file
        self.dev_file = dev_file
        self.test_file = test_file
        self.data = dict()
        self.labels = dict()

    def read_split(self):
        text_files = []
        for text_file in [self.train_file, self.dev_file, self.test_file]:
            txt = txtReader(text_file)
            text, text_id = txt.read_split()
            text_files.append(text)
            text_files.append(text_id)

        return text_files[0], text_files[1], text_files[2], text_files[3], text_files[4], text_files[5]
    
    def _tagger(self, dataset, cnt, dictionary):
        for i in dataset:
            # obtener indice de i en dataset
            pos = dataset.index(i)
            if i not in dictionary:
                dictionary[i] = cnt
                dataset[pos] = cnt
                cnt += 1
            else:
                dataset[pos] = dictionary[i]

        return dataset, cnt, dictionary 

    def labelEncoder(self):
        train, train_id, dev, dev_id, test, test_id = self.read_split()
        cnt = 1
        cnt_id = 1

        train, cnt, self.data = self._tagger(train, cnt, self.data) 
        train_id, cnt_id, self.labels = self._tagger(train_id, cnt_id, self.labels)
        dev, cnt, self.data = self._tagger(dev, cnt, self.data)
        dev_id, cnt_id, self.labels = self._tagger(dev_id, cnt_id, self.labels)
        
        for te in test:
            pos = test.index(te)
            if te not in self.data:
                self.data[te] = -1 # -1 indica que la palabra es desconocida
                test[pos] = -1
            else:
                test[pos] = self.data[te]

        for te_id in test_id:
            pos_id = test_id.index(te_id)
            if te_id not in self.labels:  
                self.labels[te_id] = -1
                test_id[pos_id] = self.labels[te_id]
            else:
                test_id[pos_id] = self.labels[te_id]  

        return train, train_id, dev, dev_id, test, test_id

In [96]:
train_PartTUT, train_id_PartTUT, dev_PartTUT, dev_id_PartTUT, test_PartTUT, test_id_PartTUT = alphabet('materiales_practica/datasets/PartTUT/train.txt', 'materiales_practica/datasets/PartTUT/dev.txt', 'materiales_practica/datasets/PartTUT/test.txt').labelEncoder()

In [13]:
train_MITMovie, train_id_MITMovie, dev_MITMovie, dev_id_MITMovie, test_MITMovie, test_id_MITMovie = alphabet('materiales_practica/datasets/MITMovie/train.txt', 'materiales_practica/datasets/MITMovie/dev.txt', 'materiales_practica/datasets/MITMovie/test.txt').labelEncoder()

In [None]:
train_MITRestaurant, train_id_MITRestaurant, dev_MITRestaurant, dev_id_MITRestaurant, test_MITRestaurant, test_id_MITRestaurant = alphabet('materiales_practica/datasets/MITRestaurant/train.txt', 'materiales_practica/datasets/MITRestaurant/dev.txt', 'materiales_practica/datasets/MITRestaurant/test.txt').labelEncoder()

In [120]:
class FFTagger():
    def __init__(self, train, train_id, dev, dev_id, test, test_id, n, loss, optimizer, metrics):
        self.model = Sequential()
        self.train = train
        self.train_id = train_id
        self.dev = dev
        self.dev_id = dev_id
        self.test = test
        self.test_id = test_id
        self.n = n
        self.m = len(self.train)
        self.vocab_size = 0
        self.loss=loss
        self.optimizer=optimizer
        self.metrics=metrics
        # self.weighted_metrics=weighted_metrics
        self.train_windows = []
        self.dev_windows = []
        self.test_windows = []

    def build_model(self): 
        self.vocab_size = len(set(self.train))
        padding = []
        for i in range(self.n):
            padding.append(0)
        self.train = padding + self.train + padding
        self.dev = padding + self.dev + padding

        # almacenar ventanas de tamaño n*2+1 en una lista de listas para cada conjunto

        for i in range(self.n, len(self.train) - self.n):
            data = self.train[i-self.n:i+self.n+1]
            self.train_windows.append(data)
        
        for i in range(self.n, len(self.dev) - self.n):
            data = self.dev[i-self.n:i+self.n+1]
            self.dev_windows.append(data) 

        num_classes = len(set(self.train_id))
        print(num_classes)
        print(len(self.train_id))
        print(set(self.train_id))

        one_hot_train_id = to_categorical(self.train_id)[:, 0:17] #, num_classes=num_classes)
        one_hot_dev_id = to_categorical(self.dev_id)[:, 0:17] #, num_classes=num_classes)
       
        batch_size = 10
        train_tensor = tf.data.Dataset.from_tensor_slices((self.train_windows, one_hot_train_id))
        train_tensor = train_tensor.batch(batch_size)
        dev_tensor = tf.data.Dataset.from_tensor_slices((self.dev_windows, one_hot_dev_id))
        dev_tensor = dev_tensor.batch(batch_size)

        # input_size = len(self.train_windows[0])

        self.model.add(Input(shape=(self.n*2+1,), dtype=tf.int32))
        self.model.add(Embedding(input_dim = self.m, output_dim=20, mask_zero=True, input_length=self.n*2+1))
        self.model.add(Flatten())
        self.model.add(Dense(64, activation='relu'))
        self.model.add(Dense(num_classes, activation='softmax'))

        self.model.compile(loss=self.loss, optimizer=self.optimizer, metrics=self.metrics)
        self.model.fit(train_tensor, epochs=1, validation_data=dev_tensor, verbose=1)

    # def train_model(self):

    #     self.model.compile(loss=self.loss, optimizer=self.optimizer, metrics=self.metrics, weighted_metrics=self.weighted_metrics)
    #     self.model.fit(self.train_windows, self.train_id, epochs=10, validation_data=(self.dev_windows, self.dev_id), verbose=0)

    def evaluate(self, task):
        padding = []
        for i in range(self.n):
            padding.append(0)
        self.test = padding + self.test + padding

        self.test_windows = []
        for i in range(self.n, len(self.test) - self.n):
            data = self.test[i-self.n:i+self.n+1]
            self.test_windows.append(data)

        batch_size = 10
        one_hot_test_id = to_categorical(self.test_id, num_classes=len(set(self.test_id)))
        test_tensor = tf.data.Dataset.from_tensor_slices((self.test_windows, one_hot_test_id))
        test_tensor = test_tensor.batch(batch_size)
        
        if task == "PoS":
            return self.model.evaluate(test_tensor, verbose=1)
        elif task == "NER":
            evaluator = Evaluator(self.test, self.test_id, tags=['ent_type', 'partial', 'exact', 'strict']).evaluate()
            
            return self.model.evaluate(test_tensor, verbose=1), evaluator['ent_type']['f1'], evaluator['partial']['f1'], evaluator['exact']['f1'], evaluator['strict']['f1']
        else:
            return "Task not found"

self.m es en realidad vocab_size??????

In [121]:
modelPartTUT = FFTagger(train_PartTUT, train_id_PartTUT, dev_PartTUT, dev_id_PartTUT, test_PartTUT, test_id_PartTUT, 2, 'categorical_crossentropy', 'adam', ['accuracy'])
modelPartTUT.build_model()
# modelPartTUT.train_model()

17
43503
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}


In [122]:
# modelPartTUT.evaluate("PoS")