# Dataset for the Arithmetic Equation Correction (AEC) Task

In [1]:
#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ = 'Shining'
__email__ = 'mrshininnnnn@gmail.com'

In [2]:
# dependency
# public
import os
import numpy as np
from tqdm import tqdm
# private
from utils import save_txt

## Simulated Error Types

1. Deletion  
2. Insertion  
3. Substitution  

In [3]:
# Class to generate dataset for Arithmetic Equation Correction (AEC)
class ArithmeticEquationCorrection():
    """Generate (corrupted, correct) equation pairs for the AEC task.

    Correct equations look like ``- 5 + 10 + 7 - 10 == 2``: a chain of
    operands joined by operators, followed by ``==`` and the integer value.
    Corrupted inputs are obtained by applying random token-level edits
    (deletion, insertion, substitution) to the correct equations.

    Args:
        operators: list of operator tokens (e.g. ``['+', '-', '*', '/']``).
        num_size: controls the operand range. Positive operands are
            ``2 .. num_size + 1``; negative operands (only used in the
            leading position) are ``-num_size .. -2``.
    """

    def __init__(self, operators, num_size):
        super().__init__()
        self.operators = operators
        self.pos_digits = np.arange(2, num_size+2).tolist()
        self.neg_digits = np.arange(-num_size, -1).tolist()
        self.digits = self.pos_digits + self.neg_digits

        # Token-level edit operations. Each one mutates the token list in
        # place at position ``idx`` and returns the same list.
        def delete(tk_y, idx):
            # mark as empty; empty tokens are dropped when joining
            tk_y[idx] = ''
            return tk_y

        def insert(tk_y, idx):
            # prepend a random operator / positive operand to the token
            tk_y[idx] = str(np.random.choice(self.operators+self.pos_digits)) + ' ' + tk_y[idx]
            return tk_y

        def sub(tk_y, idx):
            # replace the token by a random operator / positive operand
            tk_y[idx] = str(np.random.choice(self.operators+self.pos_digits))
            return tk_y

        self.trans_funs = [delete, insert, sub]

    def gen_base_dict(self):
        """Return ``{value: []}`` with one key per positive operand (as str)."""
        return {str(i): [] for i in self.pos_digits}

    def gen_operation(self, seq_len):
        """Draw a random operation with ``seq_len`` operands.

        The first operand may be negative; all following operands are
        positive. Returns a token list such as ``['-5', '+', '10']``.

        Raises:
            ValueError: if ``seq_len`` is smaller than 1.
        """
        if seq_len < 1:
            raise ValueError('seq_len must be >= 1, got {}'.format(seq_len))
        # Iterative construction: same order of random draws as a recursive
        # left-to-right build, without being bounded by the recursion limit.
        tokens = [str(np.random.choice(self.digits))]
        for _ in range(seq_len - 1):
            o = np.random.choice(self.operators)
            b = np.random.choice(self.pos_digits)
            tokens += [o, str(b)]
        return tokens

    def gen_operation_list(self, seq_len, data_size):
        """Fill ``self.value_dict`` with ``data_size`` unique valid operations.

        An operation is kept when it has not been drawn before, evaluates to
        a whole number, and that number is a key of ``self.value_dict``
        (which keeps the vocabulary fixed).

        NOTE: the inner loop retries until a valid operation is found, so it
        does not terminate if ``data_size`` exceeds the number of distinct
        valid operations for the given settings.
        """
        # every operation drawn so far, valid or not, to avoid duplicates
        operations_pool = set()
        for _ in tqdm(range(data_size)):
            while True:
                operation = self.gen_operation(seq_len)
                expression = ''.join(operation)
                if expression in operations_pool:
                    continue
                operations_pool.add(expression)
                try:
                    # NOTE: eval is applied to internally generated tokens
                    # only; never pass untrusted strings as operators.
                    value = eval(expression)
                except ArithmeticError:
                    # e.g. zero division; only arithmetic failures are
                    # skipped so that KeyboardInterrupt and genuine bugs
                    # are no longer silently swallowed
                    continue
                # keep whole-number results only
                if value % 1 != 0.:
                    continue
                # float to int to string
                value = str(int(value))
                # to keep vocab size
                if value in self.value_dict:
                    self.value_dict[value].append(operation)
                    break

    def gen_equation_list(self):
        """Render every stored operation as ``'<tokens> == <value>'``.

        A negative leading operand such as ``'-5'`` is split into the two
        tokens ``'-'`` and ``'5'``. The stored operations are not modified.
        """
        ys = []
        for v in self.value_dict:
            for y in self.value_dict[v]:
                y = y[0].replace('-', '- ').split() + y[1:]
                y += ["=="] + [v]
                ys.append(' '.join(y))
        return ys

    def transform(self, tk_y, idxes):
        """Apply one randomly chosen edit at each index; edits ``tk_y`` in place."""
        for idx in idxes:
            f = np.random.choice(self.trans_funs)
            tk_y = f(tk_y, idx)
        return tk_y

    def random_transform(self, ys, num_errors):
        """Corrupt each equation in ``ys`` with 0..``num_errors`` random edits.

        The final token (the equation value) is never edited; every other
        token, including ``==``, may be. The number of edits is clamped to
        the number of editable tokens so that short equations no longer
        raise when sampling positions without replacement.
        """
        xs = []
        for y in ys:
            tk_y = y.split()
            # all tokens but the last one are candidates for an edit
            y_len = max(len(tk_y) - 1, 0)
            max_errors = min(num_errors, y_len)
            num_idxes = np.random.choice(range(max_errors+1))
            idxes = sorted(np.random.choice(range(y_len), num_idxes, False))
            tk_x = self.transform(tk_y, idxes)
            # deleted tokens were replaced by '' and are dropped here
            xs.append(' '.join([x for x in tk_x if len(x) > 0]))
        return xs

    def generate(self, seq_len, data_size, num_errors):
        """Generate the dataset.

        Args:
            seq_len: number of operands per equation (excluding the value).
            data_size: number of equation pairs to generate.
            num_errors: maximum number of edits per corrupted equation.

        Returns:
            ``(xs, ys)``: corrupted input sequences and the corresponding
            correct output sequences, index-aligned.
        """
        self.value_dict = self.gen_base_dict()
        self.gen_operation_list(
            seq_len=seq_len,
            data_size=data_size)
        ys = self.gen_equation_list()
        xs = self.random_transform(ys, num_errors)

        return xs, ys

In [4]:
# data parameters
# operator tokens that may appear in an equation
operators = list('+-*/')
# maximum number of random edits applied to one equation
num_errors = 3
# operand range: passed to the generator as num_size
N = 10
# integers per equation including the value; L-1 operands are generated
L = 5
# number of equation pairs to generate
D = 10000

In [5]:
# build the generator, then draw D (corrupted, correct) equation pairs
# with L-1 operands each and at most num_errors edits per equation
aec = ArithmeticEquationCorrection(operators=operators, num_size=N)
xs, ys = aec.generate(seq_len=L-1, data_size=D, num_errors=num_errors)

100%|██████████| 10000/10000 [00:10<00:00, 962.11it/s]


In [6]:
# number of generated input (possibly corrupted) sequences
len(xs)

10000

In [7]:
# preview the first 15 input sequences
xs[:15]

['5 - 5 + 10 + 7 - 10 == 2',
 '5 + 5 - 2 - 4 == 2',
 '- 4 + 3 4 - 6 == 2',
 '- 2 / 2 + - - 5 == 2',
 '9 5 6 * 6 - 7 == 2',
 '8 7 - 11 + + 3 == 2',
 '11 - 3 / 3 3 8 2',
 '- + 6 6 + 8 * 8 / 8 == 2',
 '- 7 2 4 + 8 == 2',
 '5 - 10 * 2 + 11 * 5 == 2',
 '7 * 2 - * * 3 == 2',
 '5 - 4 + 8 - 7 == 2',
 '10 - 7 7 4 + 3 == 2',
 '11 - 11 + 7 - 8 / == 2',
 '- 8 + 5 + 10 - 5 == 2']

In [8]:
# number of generated output (correct) sequences
len(ys)

10000

In [9]:
# preview the first 15 output sequences (index-aligned with xs)
ys[:15]

['- 5 + 10 + 7 - 10 == 2',
 '5 + 5 - 4 - 4 == 2',
 '- 4 + 3 * 4 - 6 == 2',
 '- 2 / 2 + 8 - 5 == 2',
 '9 / 6 * 6 - 7 == 2',
 '8 - 11 + 2 + 3 == 2',
 '11 - 3 / 3 - 8 == 2',
 '- 6 + 8 * 8 / 8 == 2',
 '2 - 2 * 4 + 8 == 2',
 '- 10 * 2 + 11 * 2 == 2',
 '7 * 2 - 4 * 3 == 2',
 '5 - 4 + 8 - 7 == 2',
 '10 - 7 - 4 + 3 == 2',
 '- 3 + 7 - 8 / 4 == 2',
 '- 8 + 5 + 10 - 5 == 2']

In [11]:
# share of pairs whose input is identical to its target (no edit applied)
unchanged_mask = np.array(xs) == np.array(ys)
unchanged_mask.sum()/D

0.2442

In [12]:
# train val test split (70% / 15% / remainder) over a random permutation
dataset = np.array(list(zip(xs, ys)))
data_size = dataset.shape[0]
train_size = int(0.7*data_size)
val_size = int(0.15*data_size)
# the test split takes whatever is left after rounding down the other two
test_size = data_size - train_size - val_size
indices = np.random.permutation(data_size)
# cut the shuffled indices at the two split boundaries
train_idxes, val_idxes, test_idxes = np.split(
    indices, [train_size, train_size+val_size])
trainset, valset, testset = (
    dataset[train_idxes], dataset[val_idxes], dataset[test_idxes])
print('train size', train_size, trainset.shape)
print('val size', val_size, valset.shape)
print('test size', test_size, testset.shape)

train size 7000 (7000, 2)
val size 1500 (1500, 2)
test size 1500 (1500, 2)


In [13]:
# to save dataset
# output layout: aec/<N>N/<L>L/<D>D
outdir = 'aec/'
outdir = os.path.join(
    outdir,
    '{}N'.format(N),
    '{}L'.format(L),
    '{}D'.format(D))
# exist_ok=True creates the directory tree only when it is missing and
# avoids the race between a separate existence check and the creation
os.makedirs(outdir, exist_ok=True)
outdir

'aec/10N/5L/10000D'

In [14]:
# write each split as two files: column 0 holds the inputs (x),
# column 1 the targets (y)
split_files = (
    ('train_x.txt', trainset[:, 0]),
    ('train_y.txt', trainset[:, 1]),
    ('val_x.txt', valset[:, 0]),
    ('val_y.txt', valset[:, 1]),
    ('test_x.txt', testset[:, 0]),
    ('test_y.txt', testset[:, 1]),
)
for file_name, column in split_files:
    save_txt(os.path.join(outdir, file_name), column)