In [5]:
import tensorflow as tf
from tensorflow.keras import *
import numpy as np
import pickle

In [7]:
class MLP:
    """Fully connected network built directly on ``tf.Variable`` objects.

    Layer ``i`` computes ``activation_i(x @ W_i + b_i)`` where ``W_i`` has
    shape ``[units[i], units[i + 1]]`` and ``b_i`` has shape ``[units[i + 1]]``.

    Args:
        units: layer sizes, input dimension first and output dimension last.
        activations: one callable per layer, i.e. ``len(units) - 1`` entries.
        **kwargs: optional ``weight_initializer`` and ``bias_initializer``;
            each is called with a shape list and defaults to
            ``RandomNormal(stddev=.1)``.
    """

    def __init__(self, units, activations, **kwargs):
        assert len(units) == len(activations) + 1
        self._units = units
        self._input_dim = units[0]
        self._output_dim = units[-1]
        self._n_hidden_layers = len(units) - 2
        self._activations = activations
        self._weight_initializer = kwargs.get(
            'weight_initializer',
            tf.initializers.RandomNormal(stddev=.1)
        )
        self._bias_initializer = kwargs.get(
            'bias_initializer',
            tf.initializers.RandomNormal(stddev=.1)
        )
        self._build_weights()
        self._compiled = False

    def __call__(self, x):
        """Run the forward pass on a rank-2 input whose last dim is the input dim."""
        assert len(x.shape) == 2 and x.shape[-1] == self._input_dim
        for weight, bias, activation in zip(self._weights,
                                            self._biases,
                                            self._activations):
            x = activation(tf.matmul(x, weight) + bias)
        return x

    def _build_weights(self, **kwargs):
        """Create one weight matrix and one bias vector per layer.

        ``**kwargs`` is accepted for interface compatibility and is not used.
        """
        self._weights = []
        self._biases = []
        units = self._units
        # Pair every layer size with the next one: (fan_in, fan_out).
        for fan_in, fan_out in zip(units[:-1], units[1:]):
            self._weights.append(
                tf.Variable(self._weight_initializer([fan_in, fan_out])))
            self._biases.append(
                tf.Variable(self._bias_initializer([fan_out])))
        self._parameters = (*self._weights, *self._biases)

    def compile(self, loss, optimizer):
        """Attach the loss callable and the optimizer used by ``train_step``."""
        self._loss = loss
        self._optimizer = optimizer
        self._compiled = True

    @tf.function
    def train_step(self, x, y_true):
        """Apply one optimizer update on a batch; returns ``(loss, y_pred)``."""
        with tf.GradientTape() as tape:
            y_pred = self(x)
            loss = self._loss(y_true, y_pred)
        grads = tape.gradient(loss, self.parameters)
        self._optimizer.apply_gradients(zip(grads, self.parameters))
        return loss, y_pred

    def fit(self, dataset, epochs=1, verbose=1):
        """Train on ``dataset`` for ``epochs`` passes.

        Args:
            dataset: iterable of ``(x, y)`` batches exposing ``cardinality()``.
            epochs: number of passes over ``dataset``.
            verbose: show a progress bar every ``verbose``-th epoch; ``0``
                disables all progress output.

        Returns:
            List with the loss of every training step, in order. A
            ``KeyboardInterrupt`` stops training early and the losses
            collected so far are returned.
        """
        assert self._compiled
        n_batches = dataset.cardinality().numpy().item()
        # tf.data reports unknown / infinite cardinality as negative
        # sentinels; Progbar expects ``None`` when the total is not known.
        target = n_batches if n_batches >= 0 else None
        # Named so it does not shadow the star-imported ``losses`` module.
        loss_history = []
        try:
            for epoch in range(1, epochs + 1):
                if verbose:
                    print(f'epoch {epoch}/{epochs}',
                          ' ' * 10,
                          end='\r')
                # Guard the modulo so that verbose=0 means "silent" instead of
                # raising ZeroDivisionError.
                show_progress = bool(verbose) and epoch % verbose == 0
                prog_bar = utils.Progbar(target) if show_progress else None

                for x, y in dataset:
                    loss, _ = self.train_step(x, y)
                    loss_history.append(loss)
                    if prog_bar is not None:
                        prog_bar.add(1, (('loss', loss),))
        except KeyboardInterrupt:
            print('\nINTERUPTED')
        return loss_history

    def save_weights(self, path):
        """Pickle the current weight and bias values to ``path``.

        Values are stored as NumPy arrays rather than ``tf.Variable`` objects
        so the file does not depend on TensorFlow's pickling support.
        """
        arrays = ([weight.numpy() for weight in self._weights],
                  [bias.numpy() for bias in self._biases])
        with open(path, 'wb') as file:
            pickle.dump(arrays, file)
        print('Weights saved')

    def load_weights(self, path):
        """Load values written by ``save_weights`` into the existing variables.

        The values are assigned in place instead of replacing the variable
        objects, so an already-traced ``train_step`` and the optimizer keep
        operating on the variables this model actually uses. Files that hold
        pickled ``tf.Variable`` objects (the previous format) are accepted too.

        Raises:
            ValueError: if the file holds a different number of layers, or a
                stored value does not match the shape of its variable.
        """
        with open(path, 'rb') as file:
            # NOTE(security): unpickling can execute arbitrary code; only load
            # files that come from a trusted source.
            weights, biases = pickle.load(file)
        if (len(weights) != len(self._weights)
                or len(biases) != len(self._biases)):
            raise ValueError(
                f'expected {len(self._weights)} weight and '
                f'{len(self._biases)} bias tensors, '
                f'got {len(weights)} and {len(biases)}')
        for variable, value in zip(self._parameters, (*weights, *biases)):
            variable.assign(value)
        print('Weights loaded')

    @property
    def parameters(self):
        """Tuple of all trainable variables: weights first, then biases."""
        return self._parameters

    def __str__(self):
        # Activations without ``__name__`` (e.g. callable objects) fall back
        # to their type name instead of raising AttributeError.
        activation_names = [getattr(f, '__name__', type(f).__name__)
                            for f in self._activations]
        # Joined explicitly so the source indentation does not leak into the
        # text as trailing spaces.
        return '\n\t'.join((
            'MLP:',
            f'input dim: {self._input_dim}',
            f'output dim: {self._output_dim}',
            f'hidden layers: {self._n_hidden_layers}',
            f'hidden units: {self._units[1:-1]}',
            f'activations: {activation_names}',
        ))

    def __repr__(self):
        return self.__str__()

In [8]:
# Build a 1 -> 8 -> 8 -> 1 network (ReLU on both hidden layers, linear
# output) and attach a mean-squared-error loss with an Adam optimizer.
mlp = MLP(
    units=[1, 8, 8, 1],
    activations=[tf.nn.relu, tf.nn.relu, activations.linear],
    weight_initializer=initializers.RandomNormal(stddev=0.5),
)
mlp.compile(loss=losses.MeanSquaredError(), optimizer=optimizers.Adam())

In [8]:
class DataReader:
    """Load a sparse text dataset into dense arrays and serve mini-batches.

    Every non-blank line of the file has the form
    ``<label><fff><doc_id><fff><index>:<value> <index>:<value> ...``.
    Each line becomes one row of length ``vocab_size`` holding ``value`` at
    position ``index`` and ``0.0`` elsewhere.

    Args:
        data_path: path of the text file to read.
        batch_size: number of rows per batch (the last batch of an epoch may
            be larger, see ``next_batch``).
        vocab_size: length of every feature vector.
    """

    def __init__(self, data_path, batch_size, vocab_size):
        self._batch_size = batch_size
        with open(data_path) as f:
            # Blank lines carry no record; skipping them avoids a ValueError
            # from int('').
            d_lines = [line for line in f.read().splitlines() if line.strip()]

        # Fill a preallocated array instead of building a list of Python
        # lists; this also keeps the shape (0, vocab_size) for an empty file.
        data = np.zeros((len(d_lines), vocab_size), dtype=float)
        labels = []
        for row, line in enumerate(d_lines):
            features = line.split('<fff>')
            # The doc id is parsed only to validate the record; it is not kept.
            label, _doc_id = int(features[0]), int(features[1])
            for token in features[2].split():
                parts = token.split(':')
                data[row, int(parts[0])] = float(parts[1])
            labels.append(label)

        self._data = data
        self._labels = np.array(labels, dtype=int)

        self._num_epoch = 0
        self._batch_id = 0

    def next_batch(self):
        """Return the next ``(data, labels)`` batch.

        When fewer than ``batch_size`` rows would remain after this batch, the
        remainder is merged into it, the epoch counter is incremented and the
        data is reshuffled for the next epoch.
        """
        # Imported locally: the notebook's star import from tensorflow.keras
        # may bind its own ``random`` at module level.
        import random

        start = self._batch_id * self._batch_size
        end = start + self._batch_size
        self._batch_id += 1

        epoch_finished = end + self._batch_size > len(self._data)
        if epoch_finished:
            end = len(self._data)

        # Slice BEFORE reshuffling so the final batch of an epoch still comes
        # from the current ordering; otherwise rows get duplicated or skipped.
        # The shuffle below builds new arrays, so these slices stay valid.
        batch = self._data[start:end], self._labels[start:end]

        if epoch_finished:
            self._num_epoch += 1
            self._batch_id = 0
            indices = list(range(len(self._data)))
            # A private generator yields the same permutation as seeding the
            # global one with 2022, without clobbering global random state.
            random.Random(2022).shuffle(indices)
            self._data, self._labels = self._data[indices], self._labels[indices]

        return batch

In [3]:
def load_dataset(vocab_size,
                 train_path='data/train_ds.txt',
                 test_path='data/test_ds.txt',
                 batch_size=50):
    """Create the training and test ``DataReader`` objects.

    Args:
        vocab_size: length of each feature vector, forwarded to both readers.
        train_path: file with the training records.
        test_path: file with the test records.
        batch_size: batch size used by both readers.

    Returns:
        Tuple ``(train_data_reader, test_data_reader)``.
    """
    train_data_reader = DataReader(
        data_path=train_path,
        batch_size=batch_size,
        vocab_size=vocab_size
    )
    test_data_reader = DataReader(
        data_path=test_path,
        batch_size=batch_size,
        vocab_size=vocab_size
    )
    return train_data_reader, test_data_reader

In [None]:
# The number of lines in the vocabulary file is used as the length of every
# feature vector (presumably one vocabulary entry per line - confirm against
# the file that produces words_idfs.txt).
with open('data/words_idfs.txt', mode='rb') as file:
    vocab_size = len(file.read().splitlines())

train_reader, test_reader = load_dataset(vocab_size=vocab_size)