In [None]:
import json
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from sklearn.metrics import confusion_matrix

class RecurrentPerceptron:
    """Single-unit recurrent perceptron for per-step binary sequence labelling.

    The scalar hidden state doubles as the output at every step:

        h[t+1] = sigmoid(W_in @ x[t] + W_rec * h[t] + b_hidden),   h[0] = 0

    Attributes:
        input_size: Length of each input column vector.
        learning_rate: Step size applied in ``backward``.
        bptt_window: How many steps back ``backward`` looks from each step.
        clip_value: Accumulated gradients are clipped to [-clip_value, clip_value].
        W_in: Input weights, shape (1, input_size).
        W_rec: Recurrent weight, shape (1, 1).
        b_hidden: Bias, shape (1, 1).
    """

    def __init__(self, input_size, learning_rate=0.01, bptt_window=50, clip_value=1):
        """Randomly initialise the weights (NumPy global RNG) and zero the bias.

        Args:
            input_size: Length of each input column vector.
            learning_rate: Gradient-descent step size.
            bptt_window: Number of earlier steps considered per step in
                ``backward`` (default 50).
            clip_value: Symmetric clipping bound for the gradients (default 1).
        """
        self.input_size = input_size
        self.learning_rate = learning_rate
        self.bptt_window = bptt_window
        self.clip_value = clip_value

        # Initialize weights
        self.W_in = np.random.randn(1, input_size)
        self.W_rec = np.random.randn(1, 1)

        # Initialize bias
        self.b_hidden = np.zeros((1, 1))

    def sigmoid(self, x):
        """Logistic function; the input is clipped so np.exp cannot overflow."""
        # Beyond +/-500 the result is already 0 or 1 to double precision.
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

    def forward(self, x):
        """Run the recurrence over one sequence.

        Args:
            x: Sequence of input column vectors; ``len(x)`` is the number of
                steps and ``x[t]`` must be compatible with ``W_in @ x[t]``.

        Returns:
            1-D array with one sigmoid output per step. The hidden states and
            outputs are also stored on ``self.hidden_states`` / ``self.outputs``.
        """
        T = len(x)

        # hidden_states[0] is the all-zero initial state.
        self.hidden_states = np.zeros((T + 1, 1, 1))
        self.outputs = np.zeros((T, 1, 1))

        for t in range(T):
            pre_activation = (
                np.dot(self.W_in, x[t])
                + np.dot(self.W_rec, self.hidden_states[t])
                + self.b_hidden
            )
            self.hidden_states[t + 1] = self.sigmoid(pre_activation)
            # Output is same as hidden state
            self.outputs[t] = self.hidden_states[t + 1]

        return self.outputs.flatten()

    def backward(self, x, y, outputs):
        """Accumulate truncated gradients for one sequence and update the weights.

        For every step ``t`` the error ``outputs[t] - y[t]`` is propagated to
        steps ``t, t-1, ..., t - bptt_window``; earlier steps are weighted only
        by powers of ``W_rec``. The bias gradient uses the step error alone.
        Gradients are averaged over the sequence length, clipped, and applied
        in place.

        Args:
            x: The input sequence given to ``forward``.
            y: Target labels, one per step.
            outputs: The 1-D array returned by ``forward(x)``.
        """
        T = len(x)
        dW_in = np.zeros_like(self.W_in)
        dW_rec = np.zeros_like(self.W_rec)
        db_hidden = np.zeros_like(self.b_hidden)

        delta_out = outputs - y

        for t in range(T - 1, -1, -1):
            delta_hidden = delta_out[t]
            sum_win = np.zeros_like(self.W_in)
            sum_wrec = np.zeros_like(self.W_rec)

            # Truncate: look back at most bptt_window steps from t.
            earliest = max(t - self.bptt_window, 0)
            for i in range(t, earliest - 1, -1):
                sum_win += x[i].T * (self.W_rec ** (t - i))
                if i < t:
                    # outputs[i] is the hidden state that feeds step i + 1.
                    sum_wrec += outputs[i] * (self.W_rec ** (t - i - 1))

            dW_in += (delta_hidden * sum_win) / T
            dW_rec += (delta_hidden * sum_wrec) / T
            db_hidden += delta_hidden / T

        # Clip gradients to prevent explosion
        dW_in = np.clip(dW_in, -self.clip_value, self.clip_value)
        dW_rec = np.clip(dW_rec, -self.clip_value, self.clip_value)
        db_hidden = np.clip(db_hidden, -self.clip_value, self.clip_value)

        # Update weights and biases
        self.W_in -= self.learning_rate * dW_in
        self.W_rec -= self.learning_rate * dW_rec
        self.b_hidden -= self.learning_rate * db_hidden

    def train(self, x_train, y_train, epochs, print_losses = True):
        """Train with one weight update per sequence.

        Args:
            x_train: Iterable of input sequences.
            y_train: Iterable of label sequences, aligned with ``x_train``.
            epochs: Number of passes over the data.
            print_losses: When truthy, print the mean per-sequence loss after
                each epoch.
        """
        for epoch in range(epochs):
            total_loss = 0
            n_sequences = 0
            for x, y in zip(x_train, y_train):
                outputs = self.forward(x)
                total_loss += self.cross_entropy_loss(outputs, y)
                self.backward(x, y, outputs)
                n_sequences += 1
            if not print_losses:
                continue
            # Average over the sequences actually seen instead of a fixed
            # dataset size, so the figure is right for any training set.
            mean_loss = total_loss / max(n_sequences, 1)
            print(f"Epoch {epoch+1}/{epochs}, Loss: {mean_loss}")

    def cross_entropy_loss(self, outputs, y):
        """Return the binary cross-entropy summed over all steps of a sequence."""
        eps = 1e-10  # Small constant to prevent log(0)
        loss = -np.sum(y * np.log(outputs + eps) + (1 - y) * np.log(1 - outputs + eps))
        return loss

    def get_weights(self):
        """Return ``(W_in, W_rec, b_hidden)`` (the live arrays, not copies)."""
        return self.W_in, self.W_rec, self.b_hidden

## Load Training Data

In [None]:
# Load training data from JSONL file.
# Every non-blank line is parsed as a JSON object from which "tokens",
# "pos_tags" and "chunk_tags" are read.
x_train = []
y_train = []

# Explicit encoding so the file decodes the same way on every platform.
with open('train.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        if not line.strip():
            continue  # tolerate blank lines instead of failing in json.loads
        data = json.loads(line)
        tokens = data["tokens"]
        pos_tags = data["pos_tags"]
        chunk_tags = data["chunk_tags"]

        sentence_x = []
        sentence_y = []

        for i in range(len(tokens)):
            current_pos = pos_tags[i]
            # The first token has no predecessor: use 0 as the start-of-sentence tag.
            previous_pos = pos_tags[i - 1] if i > 0 else 0

            # One-hot encode current and previous POS tags.
            # Current tag t sets slot t - 1 of 4; assumes tags are in 1..4 -- TODO confirm
            # (a tag of 0 would wrap around to the last slot).
            current_pos_one_hot = np.zeros((4, 1))
            current_pos_one_hot[current_pos - 1] = 1
            # Previous tag indexes 5 slots directly; slot 0 is the start-of-sentence tag.
            previous_pos_one_hot = np.zeros((5, 1))
            previous_pos_one_hot[previous_pos] = 1

            # Input column vector: previous-tag one-hot stacked on top of current-tag one-hot.
            input_data = np.vstack((previous_pos_one_hot, current_pos_one_hot))
            sentence_x.append(input_data)

            # Append corresponding chunk tag as output
            sentence_y.append(chunk_tags[i])

        sentence_x = np.array(sentence_x)
        sentence_y = np.array(sentence_y)
        x_train.append(sentence_x)
        y_train.append(sentence_y)

## Load Testing Data

In [None]:
# Load test data from JSONL file.
# Every non-blank line is parsed as a JSON object from which "tokens",
# "pos_tags" and "chunk_tags" are read.
x_test = []
y_test = []

# Explicit encoding so the file decodes the same way on every platform.
with open('test.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        if not line.strip():
            continue  # tolerate blank lines instead of failing in json.loads
        data = json.loads(line)
        tokens = data["tokens"]
        pos_tags = data["pos_tags"]
        chunk_tags = data["chunk_tags"]

        sentence_x = []
        sentence_y = []

        for i in range(len(tokens)):
            current_pos = pos_tags[i]
            # The first token has no predecessor: use 0 as the start-of-sentence tag.
            previous_pos = pos_tags[i - 1] if i > 0 else 0

            # One-hot encode current and previous POS tags.
            # Current tag t sets slot t - 1 of 4; assumes tags are in 1..4 -- TODO confirm
            # (a tag of 0 would wrap around to the last slot).
            current_pos_one_hot = np.zeros((4, 1))
            current_pos_one_hot[current_pos - 1] = 1
            # Previous tag indexes 5 slots directly; slot 0 is the start-of-sentence tag.
            previous_pos_one_hot = np.zeros((5, 1))
            previous_pos_one_hot[previous_pos] = 1

            # Input column vector: previous-tag one-hot stacked on top of current-tag one-hot.
            input_data = np.vstack((previous_pos_one_hot, current_pos_one_hot))
            sentence_x.append(input_data)

            # Append corresponding chunk tag as output
            sentence_y.append(chunk_tags[i])

        sentence_x = np.array(sentence_x)
        sentence_y = np.array(sentence_y)
        x_test.append(sentence_x)
        y_test.append(sentence_y)

## Evaluate model

In [None]:
# Seed NumPy's global RNG so the random weight initialisation is repeatable,
# then fit a 9-input recurrent perceptron on the training sentences.
np.random.seed(10)
rnn = RecurrentPerceptron(9, learning_rate=0.01)
rnn.train(x_train, y_train, epochs=5, print_losses=True)

Epoch 1/5, Loss: 6.166726348172704
Epoch 2/5, Loss: 5.890635589818979
Epoch 3/5, Loss: 5.84594660296437
Epoch 4/5, Loss: 5.821079545573299
Epoch 5/5, Loss: 5.806947265177464


In [None]:
# Evaluate the model on test data
def evaluate(model, x_test, y_test):
    """Return the per-step accuracy of ``model`` over a set of sequences.

    Args:
        model: Object whose ``forward(x)`` returns one score per step.
        x_test: Iterable of input sequences.
        y_test: Iterable of label sequences aligned with ``x_test``.

    Returns:
        Fraction of steps whose thresholded score (``>= 0.5`` -> 1) equals the
        label, or 0.0 when there are no steps to score.
    """
    correct = 0
    total = 0

    for x, y in zip(x_test, y_test):
        outputs = model.forward(x)
        predictions = (outputs >= 0.5).astype(int)  # Convert outputs to binary predictions
        correct += np.sum(predictions == y)
        total += len(y)

    # Nothing to score: report 0.0 instead of dividing by zero.
    if total == 0:
        return 0.0
    return correct / total

# Per-token accuracy of the trained model on the test sentences
test_accuracy = evaluate(model=rnn, x_test=x_test, y_test=y_test)
print("Test Accuracy:", test_accuracy)

Test Accuracy: 0.8371056315279423


In [None]:
# Make predictions on test data
predictions = []
for x in x_test:
    outputs = rnn.forward(x)
    predictions.append(outputs)

# Convert predictions to binary (0 or 1) with the same ">= 0.5" rule that
# evaluate() applies, so both cells agree on scores of exactly 0.5.
predictions_binary = [(np.array(p) >= 0.5).astype(int) for p in predictions]

# Flatten y_test for sklearn metrics
y_test_flat = np.concatenate(y_test)
predictions_flat = np.concatenate(predictions_binary)

# Calculate precision, recall, f1 score
precision, recall, f1_score, _ = precision_recall_fscore_support(y_test_flat, predictions_flat, average='binary')

# Calculate accuracy
accuracy = accuracy_score(y_test_flat, predictions_flat)

# Compute confusion matrix; fixing labels=[0, 1] guarantees a 2x2 matrix even
# if one class never occurs, so the 4-way unpacking below cannot fail.
conf_matrix = confusion_matrix(y_test_flat, predictions_flat, labels=[0, 1])

# Extract true positives, false positives, true negatives, false negatives
tn, fp, fn, tp = conf_matrix.ravel()

# Per-class accuracy, guarded against a class with no samples
accuracy_class_1 = tp / (tp + fn) if tp + fn > 0 else 0
accuracy_class_0 = tn / (tn + fp) if tn + fp > 0 else 0

print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)
print("Total Test Accuracy:", accuracy)
print("Accuracy Class-1:", accuracy_class_1)
print("Accuracy Class-0:", accuracy_class_0)
print("True positives:", tp, ", False positives:", fp, ", True negatives:", tn, ", False negatives:", fn)

Precision: 0.8120050593928728
Recall: 0.9759732963183291
F1 Score: 0.886470747155765
Total Test Accuracy: 0.8371056315279423
Accuracy Class-1: 0.9759732963183291
Accuracy Class-0: 0.5773629226679854
True positives: 29531 , False positives: 6837 , True negatives: 9340 , False negatives: 727


## Weights satisfying conditions

In [None]:
# Unpack the model's input weights, recurrent weight and bias.
Win, Wrec, b = rnn.get_weights()

In [None]:
# Display the weights as the cell output.
Win, Wrec, b

(array([[ 5.09518762, -0.73548008, -1.37135318, -1.30932991,  1.09020914,
         -1.13124588,  2.13834087, -0.34739816,  0.84361143]]),
 array([[-1.00297167]]),
 array([[1.04960552]]))

In [None]:
def check_conditions(W, V, thresh=0.5):
    """Check 16 threshold inequalities on the weights and print the outcome.

    Args:
        W: Indexable weight vector laid out as
            [w^, w_nn_prev, w_dt_prev, w_jj_prev, w_ot_prev, w_nn, w_dt, w_jj, w_ot].
        V: Recurrent weight added to the left-hand side of some inequalities.
        thresh: Value every left-hand side is compared against.

    Prints "All conditions satisfied" when every inequality holds, otherwise
    one "Condition <index> is false" line per violated inequality (0-based, in
    the order listed below). Returns None.
    """
    # Name the entries once; the current-tag weights are taken from the end of W.
    w_start, nn_prev, dt_prev, jj_prev, ot_prev = W[0], W[1], W[2], W[3], W[4]
    nn_cur, dt_cur, jj_cur, ot_cur = W[-4], W[-3], W[-2], W[-1]

    # (left-hand side, True if it must be >= thresh / False if it must be <= thresh)
    checks = [
        (w_start + dt_cur, True),
        (w_start + jj_cur, True),
        (w_start + nn_cur, True),
        (w_start + ot_cur, True),
        (V + dt_prev + jj_cur, False),
        (V + dt_prev + nn_cur, False),
        (jj_prev + jj_cur, False),
        (jj_prev + nn_cur, False),
        (V + jj_prev + jj_cur, False),
        (V + jj_prev + nn_cur, False),
        (nn_prev + ot_cur, True),
        (V + nn_prev + ot_cur, True),
        (V + ot_prev + dt_cur, True),
        (V + ot_prev + jj_cur, True),
        (V + ot_prev + nn_cur, True),
        (V + ot_prev + ot_cur, True),
    ]

    violated = []
    for index, (lhs, at_least) in enumerate(checks):
        holds = lhs >= thresh if at_least else lhs <= thresh
        if not holds:
            violated.append(index)

    if not violated:
        print("All conditions satisfied")
        return

    for index in violated:
        print("Condition", index, "is false")

In [None]:
# The model outputs sigmoid(z + b) and predicts 1 when that is >= 0.5, i.e. when
# z >= -b, so the conditions are checked against a threshold of -b.
check_conditions(Win.flatten(), Wrec[0,0], -b)

All conditions satisfied


## 5 Fold Cross Validation

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix

# 5-fold cross-validation over the training sentences: every fold trains a
# fresh model on 4/5 of the sentences and reports metrics on the held-out 1/5.
kf = KFold(n_splits=5, shuffle=True, random_state=0)

for fold_number, (train_index, val_index) in enumerate(kf.split(x_train), start=1):
    print(f'Split {fold_number}')
    x_train_fold = [x_train[j] for j in train_index]
    y_train_fold = [y_train[j] for j in train_index]
    x_val_fold = [x_train[j] for j in val_index]
    y_val_fold = [y_train[j] for j in val_index]

    # Create and train the recurrent perceptron.
    # NOTE(review): the folds use learning_rate=0.1 whereas the model scored on
    # the test set is trained with 0.01 -- confirm the difference is intended.
    rnn = RecurrentPerceptron(input_size=9, learning_rate=0.1)
    rnn.train(x_train_fold, y_train_fold, epochs=5, print_losses = False)

    # Make predictions on validation data
    predictions = []
    for x in x_val_fold:
        outputs = rnn.forward(x)
        predictions.append(outputs)

    # Convert predictions to binary (0 or 1) with the same ">= 0.5" rule that
    # evaluate() applies.
    predictions_binary = [(np.array(p) >= 0.5).astype(int) for p in predictions]

    # Flatten y_val_fold for sklearn metrics
    y_val_flat = np.concatenate(y_val_fold)
    predictions_flat = np.concatenate(predictions_binary)

    # Calculate precision, recall, f1 score
    precision, recall, f1_score, _ = precision_recall_fscore_support(y_val_flat, predictions_flat, average='binary')

    # Calculate accuracy
    accuracy = accuracy_score(y_val_flat, predictions_flat)

    # Compute confusion matrix; labels=[0, 1] keeps it 2x2 even if a fold's
    # labels or predictions contain a single class.
    conf_matrix = confusion_matrix(y_val_flat, predictions_flat, labels=[0, 1])

    # Extract true positives, false positives, true negatives, false negatives
    tn, fp, fn, tp = conf_matrix.ravel()

    # Calculate accuracy of class 0 and class 1
    accuracy_class_0 = tn / (tn + fp) if tn + fp > 0 else 0
    accuracy_class_1 = tp / (tp + fn) if tp + fn > 0 else 0

    print('Precision:', precision)
    print('Recall:', recall)
    print('F1 Score:', f1_score)
    print('Total Accuracy:', accuracy)
    print('Accuracy Class-0:', accuracy_class_0)
    print('Accuracy Class-1:', accuracy_class_1)
    print('True Positives:', tp, ', False Positives:', fp, ', True Negatives:', tn, ', False Negatives:', fn)
    print()

Split 1
Precision: 0.5326494150193073
Recall: 0.47097793405697397
F1 Score: 0.4999188619029588
Total Accuracy: 0.37634916351861847
Accuracy Class-0: 0.1911221945137157
Accuracy Class-1: 0.47097793405697397
True Positives: 9242 , False Positives: 8109 , True Negatives: 1916 , False Negatives: 10381

Split 2
Precision: 0.747615136774275
Recall: 0.5966522133709792
F1 Score: 0.6636570114684688
Total Accuracy: 0.5980497285704711
Accuracy Class-0: 0.6008193445243805
Accuracy Class-1: 0.5966522133709792
True Positives: 11834 , False Positives: 3995 , True Negatives: 6013 , False Negatives: 8000

Split 3
Precision: 0.7745789995047053
Recall: 0.6412937618535035
F1 Score: 0.7016628810184796
Total Accuracy: 0.6341220166448862
Accuracy Class-0: 0.6195004702685756
Accuracy Class-1: 0.6412937618535035
True Positives: 12511 , False Positives: 3641 , True Negatives: 5928 , False Negatives: 6998

Split 4
Precision: 0.8062553000448945
Recall: 0.8158186957399556
F1 Score: 0.8110088060412957
Total Accurac

## GUI

In [None]:
!pip install gradio

In [None]:
import gradio as gr
import numpy as np

# Define the function to predict chunk tags for custom inputs
def predict_chunk_tags_custom(pos_tags):
    """Predict a chunk tag for each POS tag in a user-supplied string.

    Args:
        pos_tags: Whitespace-separated integer POS tags, each in 1..4.

    Returns:
        String with one predicted tag character ("0" or "1") per input tag,
        using the global ``rnn`` model and a ``>= 0.5`` decision threshold.
        An empty or whitespace-only input yields an empty string.

    Raises:
        ValueError: If a token is not an integer or lies outside 1..4.
    """
    pos_tags = [int(tag) for tag in pos_tags.split()]
    if not pos_tags:
        return ""

    # Reject out-of-range tags up front: 0 or a negative value would silently
    # wrap to another one-hot slot, and values above 4 would raise IndexError.
    invalid = [tag for tag in pos_tags if not 1 <= tag <= 4]
    if invalid:
        raise ValueError(f"POS tags must be integers from 1 to 4, got: {invalid}")

    sentence_x = []

    for i in range(len(pos_tags)):
        current_pos = pos_tags[i]
        # The first tag has no predecessor: use 0 as the start-of-sentence tag.
        previous_pos = pos_tags[i - 1] if i > 0 else 0

        current_pos_one_hot = np.zeros((4, 1))
        current_pos_one_hot[current_pos - 1] = 1
        previous_pos_one_hot = np.zeros((5, 1))
        previous_pos_one_hot[previous_pos] = 1

        # Previous-tag one-hot stacked on top of current-tag one-hot.
        input_data = np.vstack((previous_pos_one_hot, current_pos_one_hot))
        sentence_x.append(input_data)

    sentence_x = np.array(sentence_x)
    outputs = rnn.forward(sentence_x)
    predictions = (outputs >= 0.5).astype(int)
    predicted_chunk_tags = "".join(map(str, predictions))

    return predicted_chunk_tags

# Wrap the predictor in a text-in / text-out Gradio interface and start it.
chunk_tag_demo = gr.Interface(
    fn=predict_chunk_tags_custom,
    inputs=gr.Textbox(label="Enter POS tags (space-separated)"),
    outputs=gr.Text(label="Predicted Chunk Tags"),
)
chunk_tag_demo.launch(debug=False)
