In [20]:
import nltk
from nltk.corpus import brown
import numpy as np
from nltk.tag import map_tag
from collections import defaultdict, Counter

In [21]:
# Fetch the corpus and the tagset mapping used by the cells below.
for resource in ('brown', 'universal_tagset'):
    nltk.download(resource)

# Extra symbol that is appended to the tag inventory after the corpus tags.
start_tag = '^'

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [22]:
# Brown corpus sentences requested with the 'universal' tagset; later cells
# iterate each sentence as (word, tag) pairs.
data = brown.tagged_sents(tagset='universal')

In [23]:
# Distinct tags that occur anywhere in the corpus.
tags = {tag for sent in data for _, tag in sent}
# Alphabetical tag list with the start symbol placed last.
sorted_tags = [*sorted(tags), start_tag]
print(tags)
sorted_tags

{'VERB', '.', 'PRON', 'CONJ', 'PRT', 'NOUN', 'ADV', 'X', 'DET', 'ADJ', 'ADP', 'NUM'}


['.',
 'ADJ',
 'ADP',
 'ADV',
 'CONJ',
 'DET',
 'NOUN',
 'NUM',
 'PRON',
 'PRT',
 'VERB',
 'X',
 '^']

In [24]:
# Number of output classes: every corpus tag plus the appended start symbol.
tag_size = len(sorted_tags)

In [25]:
# Position in sorted_tags <-> tag string, in both directions.
idx_to_tag = dict(enumerate(sorted_tags))
tag_to_idx = {tag: idx for idx, tag in idx_to_tag.items()}

In [26]:
# Distinct surface forms in the corpus (case-sensitive, as stored).
words = {word for sent in data for word, _ in sent}
vocab_size = len(words)
print(vocab_size)

56057


In [27]:
# Sort instead of list(): iteration order of a set of strings can change from
# one interpreter run to the next (string hash randomisation), which would
# make every index derived from this list non-reproducible.
words = sorted(words)

In [28]:
# Position in `words` <-> word string, in both directions.
idx_to_word = dict(enumerate(words))
word_to_idx = {word: idx for idx, word in idx_to_word.items()}

In [29]:
# Split every tagged sentence into two parallel lists: its words and its tags.
sent_by_word, sent_by_tag = [], []
for sent in data:
    sent_by_word.append([word for word, _ in sent])
    sent_by_tag.append([tag for _, tag in sent])
print(len(sent_by_word))
print(len(sent_by_tag))

57340
57340


In [30]:
# Sanity check: sentence count of the corpus, to compare with the two list
# lengths printed by the previous cell.
print(len(data))

57340


In [31]:
!pip install sentence-transformers

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [32]:
# pip install sentence-transformers
from sentence_transformers import SentenceTransformer
import numpy as np

# Load the sentence-embedding model once at module level so every Embedding()
# call reuses the same instance (the weights are downloaded on first use).
_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

In [33]:
# Per-token feature dimension fed to the network; 384 is the embedding size
# stated in Embedding()'s docstring.
# NOTE(review): this rebinds `vocab_size`, which an earlier cell set to the
# number of distinct corpus words — the two meanings are unrelated.
vocab_size = 384

In [None]:
##### Imports #####
from tqdm import tqdm
import numpy as np

# Model inputs are the per-sentence word lists; targets are the parallel tag lists.
train_X, train_y = sent_by_word, sent_by_tag

##### Helper Functions #####
def Encode(text, vocab_size):
    """Return the embedding of ``text`` as a column vector.

    Args:
        text: Token to embed; forwarded unchanged to ``Embedding``.
        vocab_size: Accepted for call-site compatibility; this
            implementation does not read it.

    Returns:
        The embedding reshaped to ``(-1, 1)``, i.e. one column.
    """
    embedding = Embedding(text)
    return np.reshape(embedding, (-1, 1))

def Embedding(text: str) -> np.ndarray:
    """Embed ``text`` with the module-level sentence-transformer model.

    The model is asked for normalised embeddings; for the model loaded in
    this notebook the result is a dense 384-dimensional vector.
    """
    vector = _model.encode(text, normalize_embeddings=True)
    return vector

# Xavier Normalized Initialization
def initWeights(input_size, output_size):
    """Draw an ``(output_size, input_size)`` weight matrix.

    Entries are sampled uniformly from [-1, 1) and then scaled by
    ``sqrt(6 / (input_size + output_size))``.
    """
    shape = (output_size, input_size)
    scale = np.sqrt(6 / (input_size + output_size))
    return np.random.uniform(-1, 1, shape) * scale

##### Activation Functions #####
def sigmoid(input, derivative = False):
    """Logistic sigmoid, or its derivative expressed via the sigmoid output.

    Args:
        input: Scalar or array. With ``derivative=True`` this must already be
            a sigmoid *output* ``s``; the function then returns ``s * (1 - s)``.
        derivative: Select the derivative form instead of the activation.

    Returns:
        Same shape as ``input`` (a NumPy scalar for scalar input).
    """
    if derivative:
        return input * (1 - input)

    x = np.asarray(input)
    # exp(-|x|) lies in (0, 1], so it can never overflow, unlike exp(-x) for
    # large negative x. Both branches below equal 1 / (1 + exp(-x)).
    decay = np.exp(-np.abs(x))
    result = np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    # np.where yields a 0-d array for scalar input; unwrap it to a scalar.
    return result[()] if np.ndim(input) == 0 else result

def tanh(input, derivative = False):
    """Hyperbolic tangent, or its derivative expressed via the tanh output.

    With ``derivative=True`` the argument is treated as an already-computed
    tanh value ``t`` and ``1 - t ** 2`` is returned.
    """
    if not derivative:
        return np.tanh(input)

    return 1 - input ** 2

def softmax(input):
    """Softmax over all elements of ``input``.

    The maximum is subtracted before exponentiating. This leaves the result
    mathematically unchanged but keeps ``exp`` from overflowing to ``inf``
    (and the quotient from becoming NaN) when scores are large.

    Returns:
        A new array with the same shape as ``input`` whose entries sum to 1.
    """
    scores = np.asarray(input)
    if scores.size == 0:
        # Nothing to normalise; mirror the empty float result of exp().
        return np.exp(scores)

    exps = np.exp(scores - np.max(scores))
    return exps / np.sum(exps)

##### Long Short-Term Memory Network Class #####
class LSTM:
    """Single-layer LSTM sequence tagger written with NumPy.

    At every time step the previous hidden state is stacked on top of the
    current input column vector, the four gates are evaluated on that
    stacked vector, and the new hidden state is projected to ``output_size``
    unnormalised scores. Weights are updated once per sequence by plain
    gradient descent; no gradient clipping is applied.
    """

    def __init__(self, input_size, hidden_size, output_size, num_epochs, learning_rate):
        """Allocate the parameters.

        Args:
            input_size: Length of the stacked ``[hidden; input]`` vector the
                gates see (hidden size plus per-step feature size).
            hidden_size: Length of the hidden and cell state vectors.
            output_size: Number of scores produced per time step.
            num_epochs: Passes over the data made by ``train``.
            learning_rate: Step size of the gradient-descent update.
        """
        # Hyperparameters
        self.learning_rate = learning_rate
        self.hidden_size = hidden_size
        self.num_epochs = num_epochs

        # Forget Gate
        self.wf = initWeights(input_size, hidden_size)
        self.bf = np.zeros((hidden_size, 1))

        # Input Gate
        self.wi = initWeights(input_size, hidden_size)
        self.bi = np.zeros((hidden_size, 1))

        # Candidate Gate
        self.wc = initWeights(input_size, hidden_size)
        self.bc = np.zeros((hidden_size, 1))

        # Output Gate
        self.wo = initWeights(input_size, hidden_size)
        self.bo = np.zeros((hidden_size, 1))

        # Final (hidden -> scores) projection
        self.wy = initWeights(hidden_size, output_size)
        self.by = np.zeros((output_size, 1))

    # Reset Network Memory
    def reset(self):
        """Clear all per-sequence caches and zero the initial states.

        Index ``-1`` of the hidden/cell state dicts holds the zero state
        that precedes the first time step.
        """
        self.concat_inputs = {}

        self.hidden_states = {-1: np.zeros((self.hidden_size, 1))}
        self.cell_states = {-1: np.zeros((self.hidden_size, 1))}

        self.activation_outputs = {}
        self.candidate_gates = {}
        self.output_gates = {}
        self.forget_gates = {}
        self.input_gates = {}
        self.outputs = {}

    # Forward Propagation
    def forward(self, inputs):
        """Run one sequence through the network.

        Args:
            inputs: Sequence of column vectors, one per time step.

        Returns:
            List with one ``(output_size, 1)`` score vector per time step.
            Gate activations and states are cached on ``self`` for
            ``backward``.
        """
        self.reset()

        outputs = []
        for q in range(len(inputs)):
            stacked = np.concatenate((self.hidden_states[q - 1], inputs[q]))
            self.concat_inputs[q] = stacked

            self.forget_gates[q] = sigmoid(np.dot(self.wf, stacked) + self.bf)
            self.input_gates[q] = sigmoid(np.dot(self.wi, stacked) + self.bi)
            self.candidate_gates[q] = tanh(np.dot(self.wc, stacked) + self.bc)
            self.output_gates[q] = sigmoid(np.dot(self.wo, stacked) + self.bo)

            self.cell_states[q] = (self.forget_gates[q] * self.cell_states[q - 1]
                                   + self.input_gates[q] * self.candidate_gates[q])
            self.hidden_states[q] = self.output_gates[q] * tanh(self.cell_states[q])

            outputs.append(np.dot(self.wy, self.hidden_states[q]) + self.by)

        return outputs

    # Backward Propagation
    def backward(self, errors, inputs):
        """Back-propagate through time and apply one gradient-descent step.

        Must follow a ``forward`` call on the same sequence, because it reads
        the gate activations and states cached there.

        Args:
            errors: Per-step gradient of the loss w.r.t. the score vector
                (``train`` passes softmax output minus the one-hot label).
            inputs: Per-step stacked ``[hidden; input]`` vectors, indexable by
                time step (``train`` passes ``self.concat_inputs``).
        """
        d_wf, d_bf = np.zeros_like(self.wf), np.zeros_like(self.bf)
        d_wi, d_bi = np.zeros_like(self.wi), np.zeros_like(self.bi)
        d_wc, d_bc = np.zeros_like(self.wc), np.zeros_like(self.bc)
        d_wo, d_bo = np.zeros_like(self.wo), np.zeros_like(self.bo)
        d_wy, d_by = np.zeros_like(self.wy), np.zeros_like(self.by)

        # Shaped from hidden_size rather than hidden_states[0], which does not
        # exist after forwarding an empty sequence.
        dh_next = np.zeros((self.hidden_size, 1))
        dc_next = np.zeros((self.hidden_size, 1))
        for q in reversed(range(len(inputs))):
            error = errors[q]
            stacked_t = inputs[q].T

            # Final projection weights and biases
            d_wy += np.dot(error, self.hidden_states[q].T)
            d_by += error

            # Hidden state error: from this step's scores and from step q + 1
            d_hs = np.dot(self.wy.T, error) + dh_next
            tanh_cs = tanh(self.cell_states[q])

            # Output gate (gradient w.r.t. its pre-activation)
            d_o = tanh_cs * d_hs * sigmoid(self.output_gates[q], derivative = True)
            d_wo += np.dot(d_o, stacked_t)
            d_bo += d_o

            # Cell state error
            d_cs = tanh(tanh_cs, derivative = True) * self.output_gates[q] * d_hs + dc_next

            # Forget gate (pre-activation gradient)
            d_f = d_cs * self.cell_states[q - 1] * sigmoid(self.forget_gates[q], derivative = True)
            d_wf += np.dot(d_f, stacked_t)
            d_bf += d_f

            # Input gate (pre-activation gradient)
            d_i = d_cs * self.candidate_gates[q] * sigmoid(self.input_gates[q], derivative = True)
            d_wi += np.dot(d_i, stacked_t)
            d_bi += d_i

            # Candidate gate (pre-activation gradient)
            d_c = d_cs * self.input_gates[q] * tanh(self.candidate_gates[q], derivative = True)
            d_wc += np.dot(d_c, stacked_t)
            d_bc += d_c

            # Stacked-input error: sum of the error flowing back through each
            # gate. Parenthesised so the multi-line sum is one expression.
            d_z = (np.dot(self.wf.T, d_f)
                   + np.dot(self.wi.T, d_i)
                   + np.dot(self.wc.T, d_c)
                   + np.dot(self.wo.T, d_o))

            # Errors handed to the previous time step. The hidden state is the
            # first hidden_size rows of the stacked vector.
            dh_next = d_z[:self.hidden_size, :]
            dc_next = self.forget_gates[q] * d_cs

        self.wf -= d_wf * self.learning_rate
        self.bf -= d_bf * self.learning_rate

        self.wi -= d_wi * self.learning_rate
        self.bi -= d_bi * self.learning_rate

        self.wc -= d_wc * self.learning_rate
        self.bc -= d_bc * self.learning_rate

        self.wo -= d_wo * self.learning_rate
        self.bo -= d_bo * self.learning_rate

        self.wy -= d_wy * self.learning_rate
        self.by -= d_by * self.learning_rate

    # Train
    def train(self, original_inputs, labels):
        """Fit the network, updating the weights after every sequence.

        Args:
            original_inputs: Iterable of token sequences; each token is
                turned into a column vector with ``Encode``.
            labels: Parallel iterable of tag sequences; tags are mapped to
                class indices through ``tag_to_idx``.
        """
        for _ in tqdm(range(self.num_epochs)):
            for inputs1, labels1 in zip(original_inputs, labels):
                inputs = [Encode(word, vocab_size) for word in inputs1]

                predictions = self.forward(inputs)

                errors = []
                for q, scores in enumerate(predictions):
                    # Softmax output minus the one-hot target
                    delta = softmax(scores)
                    delta[tag_to_idx[labels1[q]]] -= 1
                    errors.append(delta)

                self.backward(errors, self.concat_inputs)

    # Test
    def test(self, original_inputs, labels):
        """Print token-level accuracy over the given sequences.

        Prints ``Accuracy: 0.0%`` when there are no labelled tokens instead
        of dividing by zero.
        """
        correct = 0
        total = 0
        for inputs, labels1 in zip(original_inputs, labels):
            scores = self.forward([Encode(word, vocab_size) for word in inputs])

            total += len(labels1)
            for q in range(len(labels1)):
                # Softmax is monotonic, so the arg-max of the raw scores is
                # the most probable class; skipping exp() avoids overflow.
                pred_idx = int(np.argmax(scores[q]))
                if idx_to_tag[pred_idx] == labels1[q]:
                    correct += 1

        accuracy = round(correct * 100 / total, 2) if total else 0.0
        print(f'Accuracy: {accuracy}%')
        
# Initialize Network
# Weight initialisation draws from NumPy's global RNG; seed it so repeated
# runs of this cell start from the same weights.
SEED = 0
np.random.seed(SEED)

hidden_size = 128
NUM_EPOCHS = 10
LEARNING_RATE = 0.05

# Sentences [0, TRAIN_END) are used for training, [TRAIN_END, TEST_END) for evaluation.
TRAIN_END = 500
TEST_END = 600

lstm = LSTM(input_size = vocab_size + hidden_size, hidden_size = hidden_size, output_size = tag_size, num_epochs = NUM_EPOCHS, learning_rate = LEARNING_RATE)

##### Training #####
lstm.train(train_X[:TRAIN_END], train_y[:TRAIN_END])

##### Testing #####
lstm.test(train_X[TRAIN_END:TEST_END], train_y[TRAIN_END:TEST_END])