# Mathematical Operations Seq2Seq

Seq2Seq models, originally designed for translation tasks, have found applications beyond language. One such application is solving mathematical operations. By training the model on pairs of mathematical expressions and their corresponding results, it learns to understand and predict outcomes. This process involves encoding the expression into a fixed-size representation and decoding it to generate the result. Once trained, the model can accurately predict results for new expressions, offering a versatile solution for mathematical problem-solving.

In [106]:
import numpy as np
import tensorflow as tf
import keras
from keras.layers import Embedding, LSTM, StringLookup, Dense, Input, RepeatVector, TimeDistributed, StringLookup, TextVectorization

## Load Dataset

In [3]:
# Kaggle input locations of the two text files read by read_dataset:
# one with the "hard" expressions and one with the "simple" expressions.
path_hard = '/kaggle/input/mathematical-operations-dataset1/output_hard.txt'
path_simple = '/kaggle/input/mathematical-operations-dataset1/output_simple.txt'

In [4]:
def read_dataset(path):
    """Read a comma-separated file of ``expression,result`` rows.

    Args:
        path: location of a text file with one ``expression,result`` pair
            per line.

    Returns:
        (X, Y): two lists of strings, the first field of every row and the
        second field of every row, in file order.

    Raises:
        IndexError: if a non-blank row contains no comma.
    """
    X = []
    Y = []
    # Explicit encoding so the result does not depend on the platform default.
    with open(path, 'r', encoding='utf-8') as file:
        for row in file:
            row = row.strip()  # Strip off newline and surrounding whitespace
            if not row:
                # Blank lines (e.g. a trailing empty line) carry no sample.
                continue
            values = row.split(',')
            X.append(values[0])
            Y.append(values[1])
    return X, Y

In [5]:
# Load both files; each call returns (expressions, results) as lists of strings.
X_hard, y_hard = read_dataset(path_hard)
X_simple, y_simple = read_dataset(path_simple)

## Visualization

In [6]:
# Preview the first five hard samples as "expression = result".
for idx in range(5):
    expression, result = X_hard[idx], y_hard[idx]
    print(f'{expression} = {result}')

(7+5)-5-8 = -1
1+4*4 = 17
1*(2-2) = 0
2-(7*5) = -33
((6+5)-4+2) = 9


In [7]:
# Preview the first five simple samples as "expression = result".
for idx in range(5):
    expression, result = X_simple[idx], y_simple[idx]
    print(f'{expression} = {result}')

9*1 = 9
9*8 = 72
4-4 = 0
6-8 = -2
0-6 = -6


In [8]:
# Report the number of samples in each dataset (hard first, then simple).
for samples in (X_hard, X_simple):
    print(len(samples))

200000
200000


## Preprocessing

In [9]:
def concate(X_hard, y_hard, X_simple, y_simple):
    """Join the hard and simple datasets into one pair of NumPy arrays.

    In both returned arrays the hard samples come first, followed by the
    simple samples.

    Returns:
        (X, Y): the concatenated expressions and the concatenated results.
    """
    def _stack(first, second):
        # Convert both parts to arrays and append the second to the first.
        return np.concatenate((np.array(first), np.array(second)), axis=0)

    return _stack(X_hard, X_simple), _stack(y_hard, y_simple)

In [10]:
# Merge both datasets so the maxima below are taken over every sample.
X, Y = concate(X_hard, y_hard, X_simple, y_simple)

# Longest expression, in characters.
X_MAX_LEN = max(map(len, X))
print(f'X_train_max_len_ops = {X_MAX_LEN}')
# Longest result, in characters.
Y_MAX_LEN = max(map(len, Y))
print(f'Y_train_max_len_output = {Y_MAX_LEN}')

X_train_max_len_ops = 13
Y_train_max_len_output = 4


In [11]:
def pre_padding(text, max_len):
    """Left-pad every string with spaces to a width of ``max_len``.

    Strings that are already ``max_len`` characters or longer are returned
    unchanged (``str.rjust`` never truncates).

    A new list is built instead of assigning back into ``text``: the caller's
    sequence is left untouched, and a fixed-width NumPy string array passed
    as ``text`` cannot silently truncate the padded values.

    Args:
        text: iterable of strings.
        max_len: target width in characters.

    Returns:
        A new list with the padded strings, in the same order.
    """
    return [item.rjust(max_len) for item in text]

In [12]:
# Left-pad every expression to X_MAX_LEN characters and every result to
# Y_MAX_LEN characters so all samples share the same length.
X_hard = pre_padding(X_hard, X_MAX_LEN)
y_hard = pre_padding(y_hard, Y_MAX_LEN)
X_simple = pre_padding(X_simple, X_MAX_LEN)
y_simple = pre_padding(y_simple, Y_MAX_LEN)

In [13]:
def train_test_val_split(X, Y, percentage):
    """Split aligned sequences into train, test and validation parts.

    The last ``percentage`` percent of the samples are held out. The final
    half of the held-out tail (rounded down) becomes the test set and the
    remainder of the tail becomes the validation set; everything before the
    tail is the training set. No shuffling is done.

    Slicing uses explicit non-negative indices. Negative slices such as
    ``X[:-0]`` evaluate to an empty sequence, which would empty the training
    set whenever the hold-out size rounds down to zero.

    Args:
        X: sliceable sequence of inputs.
        Y: sliceable sequence of targets, expected to have the same length
            as ``X`` (the split points are derived from ``len(X)``).
        percentage: share of the samples to hold out, in percent. Values
            outside 0..100 are clamped to that range.

    Returns:
        ``(X_train, Y_train), (X_test, Y_test), (X_val, Y_val)``.
    """
    total = len(X)
    # Clamp so the split points below always stay inside [0, total].
    holdout = min(max(int(total * (percentage / 100)), 0), total)

    train_end = total - holdout           # first index of the held-out tail
    test_start = total - (holdout // 2)   # first index of the test part

    X_train, Y_train = X[:train_end], Y[:train_end]
    X_val, Y_val = X[train_end:test_start], Y[train_end:test_start]
    X_test, Y_test = X[test_start:], Y[test_start:]

    return (X_train, Y_train), (X_test, Y_test), (X_val, Y_val)


In [14]:
# Hold out 20% of each dataset; train_test_val_split divides that hold-out
# between a test part and a validation part. H_* = hard, S_* = simple.
(H_X_train, H_Y_train), (H_X_test, H_Y_test), (H_X_val, H_Y_val) = train_test_val_split(X_hard, y_hard, percentage=20)
(S_X_train, S_Y_train), (S_X_test, S_Y_test), (S_X_val, S_Y_val) = train_test_val_split(X_simple, y_simple, percentage=20)

In [15]:
# Merge the hard and simple parts of every split. In each resulting array
# the hard samples come first, followed by the simple samples.
X_train, Y_train = concate(H_X_train, H_Y_train, S_X_train, S_Y_train)
X_test, Y_test = concate(H_X_test, H_Y_test, S_X_test, S_Y_test)
X_val, Y_val = concate(H_X_val, H_Y_val, S_X_val, S_Y_val)

In [16]:
# Spot-check one padded training pair by index.
i = 100055
f'{X_train[i]} = {Y_train[i]}'

'        5+1+9 =   15'

In [17]:
# Build the character vocabularies from the WHOLE training split. Sampling
# only a prefix of the data risks missing a character that first appears
# later; such a character would then be mapped to the out-of-vocabulary id.
combined_string = ''.join(X_train)
X_vocabs = sorted(set(combined_string))

combined_string = ''.join(Y_train)
Y_vocabs = sorted(set(combined_string))

# Show the size and content of the output vocabulary ...
print(len(Y_vocabs))
print(Y_vocabs)

# ... and of the input vocabulary.
print(len(X_vocabs))
print(X_vocabs)

12
[' ', '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
16
[' ', '(', ')', '*', '+', '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9']


In [18]:
# Lookup layers between characters and integer ids, one pair per vocabulary.
# invert=True maps ids back to characters; invert=False maps characters to
# ids. mask_token=None means no mask token is added to the vocabulary.
X_ids_to_chars = StringLookup(vocabulary=X_vocabs, mask_token=None, invert=True)
X_chars_to_ids = StringLookup(vocabulary=X_vocabs, mask_token=None, invert=False)

Y_ids_to_chars = StringLookup(vocabulary=Y_vocabs, mask_token=None, invert=True)
Y_chars_to_ids = StringLookup(vocabulary=Y_vocabs, mask_token=None, invert=False)

In [19]:
# Inspect the layer's vocabulary; the list position of each entry is its id.
X_ids_to_chars.get_vocabulary()

['[UNK]',
 ' ',
 '(',
 ')',
 '*',
 '+',
 '-',
 '0',
 '1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9']

## Create Sequences

In [20]:
def create_seq(X, Y):
    """Turn an expression/result pair of strings into character-id sequences.

    Each string is split into its UTF-8 characters, which are then mapped
    to integer ids with X_chars_to_ids (inputs) and Y_chars_to_ids (targets).

    Returns:
        (x, y): the id sequence for X and the id sequence for Y.
    """
    source_ids = X_chars_to_ids(tf.strings.unicode_split(X, 'UTF-8'))
    target_ids = Y_chars_to_ids(tf.strings.unicode_split(Y, 'UTF-8'))
    return source_ids, target_ids

In [21]:
def ids_to_text(ids, ids_to_chars):
    """Decode integer ids back into text.

    Args:
        ids: ids to decode.
        ids_to_chars: callable mapping ids to characters; its result must
            offer ``.numpy()`` returning an array of UTF-8 encoded bytes.

    Returns:
        A list with one string per entry along the first axis: for 2-D ids
        each row is joined into one string, for 1-D ids every id yields its
        own single-character string.
    """
    raw_tokens = ids_to_chars(ids).numpy()
    # Element-wise bytes -> str conversion.
    to_str = np.vectorize(lambda token: token.decode('utf-8'))
    tokens = to_str(raw_tokens)
    return [''.join(entry) for entry in tokens]

In [22]:
# Round-trip check on the first training expression:
# characters -> ids -> characters.
x = tf.strings.unicode_split(X_train[0], 'UTF-8')
x = X_chars_to_ids(x)
ids_to_text(x, X_ids_to_chars)

[' ', ' ', ' ', ' ', '(', '7', '+', '5', ')', '-', '5', '-', '8']

## Create Pipeline

In [23]:
BATCH_SIZE = 256
# X_train holds every hard sample followed by every simple sample, so the
# shuffle buffer has to span the whole split. A small buffer only mixes
# neighbouring rows and would feed the model hard-only batches followed by
# simple-only batches in every epoch.
SHUFFLE_BUFFER = len(X_train)
# tf.data.AUTOTUNE is the non-experimental name of the same constant.
AUTOTUNE = tf.data.AUTOTUNE

# Training input pipeline: strings -> id sequences -> cached -> shuffled ->
# fixed-size batches (the incomplete final batch is dropped) -> prefetched.
dataset = tf.data.Dataset.from_tensor_slices((X_train, Y_train))
training = dataset.map(create_seq, num_parallel_calls=AUTOTUNE)
# Cache after the lookup so the string processing runs only once.
training = training.cache()
training = training.shuffle(SHUFFLE_BUFFER)
training = training.batch(BATCH_SIZE, num_parallel_calls=AUTOTUNE, drop_remainder=True)
training = training.prefetch(AUTOTUNE)

In [24]:
# Validation pipeline: same preprocessing as for training, but without
# shuffling and keeping the final, possibly smaller, batch.
dataset = tf.data.Dataset.from_tensor_slices((X_val, Y_val))
validation = (
    dataset
    .map(create_seq)
    .cache()
    .batch(BATCH_SIZE, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)

# Test pipeline: identical steps applied to the test split.
dataset = tf.data.Dataset.from_tensor_slices((X_test, Y_test))
test = (
    dataset
    .map(create_seq)
    .cache()
    .batch(BATCH_SIZE, num_parallel_calls=AUTOTUNE)
    .prefetch(AUTOTUNE)
)

In [25]:
# Print one training batch (input ids and target ids) as a sanity check.
for x, y in training.take(1):
    print(x , y)

tf.Tensor(
[[ 1  1  1 ...  7  5  8]
 [ 1  1  2 ... 14  3  3]
 [ 1  1  2 ...  6 10  3]
 ...
 [ 1  1  2 ...  4 10  3]
 [ 1  1  1 ... 16  4 10]
 [ 1  1  1 ...  6 12  3]], shape=(256, 13), dtype=int64) tf.Tensor(
[[ 1  1  1  4]
 [ 1  1  4  3]
 [ 1  1  4  5]
 ...
 [ 4 12  7  7]
 [ 1  5  4  9]
 [ 1  1  1  4]], shape=(256, 4), dtype=int64)


## Model

In [31]:
class MYModel(keras.Model):
    """Encoder-decoder LSTM that maps input ids to per-step class probabilities.

    Pipeline: Embedding -> LSTM (keeps only its final output) ->
    RepeatVector(out_seq_len) -> LSTM (returns the full sequence) ->
    time-distributed softmax over ``y_vocab_size`` classes.

    Args:
        in_seq_len: length of the input sequences. Accepted for interface
            compatibility; no layer reads it.
        out_seq_len: number of output steps to produce.
        embd_dim: size of the embedding vectors.
        rnn_units: pair ``[encoder_units, decoder_units]``.
        x_vocab_size: number of distinct input ids.
        y_vocab_size: number of output classes per step.
    """

    def __init__(self, in_seq_len, out_seq_len, embd_dim, rnn_units, x_vocab_size, y_vocab_size):
        super().__init__()

        self.embedding_layer = Embedding(x_vocab_size, embd_dim)
        # Encoder: summarizes the whole input sequence in one vector.
        self.lstm1_layer = LSTM(rnn_units[0])
        # Repeat that vector once per output step to feed the decoder.
        self.repeat_vector = RepeatVector(out_seq_len)
        self.lstm2_layer = LSTM(rnn_units[1], return_sequences=True)
        self.dense_layer = TimeDistributed((Dense(y_vocab_size, activation='softmax')))

    def build(self, input_shape):
        """Create the layer weights by tracing ``call`` on a symbolic input."""
        inputs = Input(shape=input_shape[1:])
        self.call(inputs)
        # Mark the model as built. The base-class constructor must not be
        # invoked again here: that would re-initialize the model's
        # base-class state after the layers have been set up.
        self.built = True

    def call(self, inputs):
        """Run the forward pass and return the per-step class probabilities."""
        x = self.embedding_layer(inputs)
        x = self.lstm1_layer(x)
        x = self.repeat_vector(x)
        x = self.lstm2_layer(x)
        x = self.dense_layer(x)

        return x

In [53]:
# Hyper-parameters. Sequence lengths equal the padded string lengths.
OUTPUT_SEQ_LEN = Y_MAX_LEN
INPUT_SEQ_LEN = X_MAX_LEN
EMBD_DIM = 60
RNN_UNITS = [512, 1024]  # [encoder LSTM units, decoder LSTM units]
# Sizes are taken from the lookup layers, so they count every entry the
# layers report (including '[UNK]').
X_VOCAB_SIZE = len(X_ids_to_chars.get_vocabulary())
Y_VOCAB_SIZE = len(Y_ids_to_chars.get_vocabulary())

model = MYModel(INPUT_SEQ_LEN, OUTPUT_SEQ_LEN, EMBD_DIM, RNN_UNITS, X_VOCAB_SIZE, Y_VOCAB_SIZE)

In [54]:
# Build the model for batches of INPUT_SEQ_LEN-long id sequences (the batch
# dimension is left open) and print the layer overview.
model.build(input_shape=(None, INPUT_SEQ_LEN))
model.summary()

In [55]:
# Adam with learning rate 3e-3. The sparse categorical cross-entropy takes
# integer class ids as targets; the model's last layer already applies
# softmax. 'acc' tracks accuracy during training and evaluation.
model.compile(optimizer=tf.optimizers.Adam(3e-3), loss=tf.losses.SparseCategoricalCrossentropy(), metrics=['acc'])

In [56]:
# Train for 50 epochs, evaluating on the validation pipeline after each one.
model.fit(training, epochs=50, validation_data=validation)

Epoch 1/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 20ms/step - acc: 0.6891 - loss: 0.8818 - val_acc: 0.8420 - val_loss: 1.1510
Epoch 2/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 19ms/step - acc: 0.8468 - loss: 0.4197 - val_acc: 0.9204 - val_loss: 0.3519
Epoch 3/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 19ms/step - acc: 0.8875 - loss: 0.2952 - val_acc: 0.9352 - val_loss: 0.2393
Epoch 4/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 19ms/step - acc: 0.9049 - loss: 0.2464 - val_acc: 0.9367 - val_loss: 0.1935
Epoch 5/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 19ms/step - acc: 0.9235 - loss: 0.1966 - val_acc: 0.9538 - val_loss: 0.1248
Epoch 6/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 19ms/step - acc: 0.9403 - loss: 0.1500 - val_acc: 0.9597 - val_loss: 0.1156
Epoch 7/50
[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m

<keras.src.callbacks.history.History at 0x7ed725bb2260>

## Evaluate

In [57]:
# Loss and accuracy on the held-out test pipeline.
model.evaluate(test)

[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - acc: 0.9822 - loss: 0.0654


[0.037631914019584656, 0.9895312786102295]

In [58]:
# Class probabilities for every test sample and output position.
pred = model.predict(test)
pred.shape

[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step


(40000, 4, 13)

In [59]:
# Pick the most probable class id at every output position, then decode the
# ids back to text. Note that `pred` is rebound from probabilities to strings.
ids_per_prediction = tf.argmax(pred, axis=2)
pred = ids_to_text(ids_per_prediction, Y_ids_to_chars)

In [60]:
# Show the first ten test expressions next to the predicted and true results.
for i in range(10):
    print(f'{X_test[i]} ==> predicted:{pred[i]}, True value {Y_test[i]}')

  ((2*7)-0-5) ==> predicted:   9, True value    9
  (8*4+(6*7)) ==> predicted:  70, True value   74
        7-5*5 ==> predicted: -18, True value  -18
    (2+4-3+8) ==> predicted:  11, True value   11
    (5-1)-2*5 ==> predicted:  -6, True value   -6
      5+(1+8) ==> predicted:  14, True value   14
      0+(5-7) ==> predicted:  -2, True value   -2
        5*2+5 ==> predicted:  15, True value   15
      4+2-2-0 ==> predicted:   4, True value    4
  ((1+7)+0+7) ==> predicted:  15, True value   15


In [103]:
def predict(text):
    """Predict the result string for one or more expressions.

    Args:
        text: a list of expression strings, e.g. ``['((0-7)*1+2)']``, or a
            single expression string. The argument is never modified.

    Returns:
        A list with one decoded prediction string per expression.
    """
    expressions = [text] if isinstance(text, str) else list(text)
    # Left-pad into a fresh list so the caller's list keeps its content.
    padded = [expression.rjust(X_MAX_LEN) for expression in expressions]
    # Encode only the inputs; no placeholder target is needed for inference.
    x = X_chars_to_ids(tf.strings.unicode_split(padded, 'UTF-8'))
    # model.predict is fed a plain NumPy array.
    x = x.numpy()
    p = model.predict(x)
    # Most probable class id at every output position.
    p = tf.argmax(p, axis=2)
    return ids_to_text(p, Y_ids_to_chars)

In [105]:
# Try the model on a single hand-written expression.
text = ['((0-7)*1+2)']
predict(text)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step


['  -5']