# Predicting Like Polynomial Terms

Remember how, in algebra, you had to combine like terms to simplify expressions?

You'd see expressions like `60 + 2x^3 - 6x + x^3 + 17x` in which there are **5** total terms but only **4** are "like terms". 

`2x^3` and `x^3` are like, and `-6x` and `17x` are like, while `60` doesn't have any like siblings.

Can we teach a model to predict that there are `4` like terms in the above expression?

Let's give it a shot using [Mathy](https://mathy.ai) to generate math problems and [thinc](https://github.com/explosion/thinc) to build a regression model that outputs the number of like terms in each input problem.

In [None]:
!pip install thinc mathy

### Encode Text Inputs

Mathy generates ascii-math problems and we have to encode them into integers that the model can process. 

To do this we'll build a vocabulary of all the possible characters we'll see, and map each input character to its index in the list.

For math problems our vocabulary will include all the characters of the alphabet, numbers 0-9, and special characters like `*`, `-`, `.`, etc.

In [2]:
from typing import List
from thinc.types import Array2d
from thinc.api import Ops, get_current_ops

vocab = " .+-/^*()[]-01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"


def encode_input(text: str) -> Array2d:
    ops: Ops = get_current_ops()
    indices: List[List[int]] = []
    for c in text:
        if c not in vocab:
            raise ValueError(f"'{c}' missing from vocabulary in text: {text}")
        indices.append([vocab.index(c)])
    return ops.asarray(indices)

#### Try It

Let's try it out on some fixed data to be sure it works. 

We should expect to see a column of integer vocabulary indices, one row for each character in the input.

In [3]:
outputs = encode_input("4+2")
assert outputs[0][0] == vocab.index("4")
assert outputs[1][0] == vocab.index("+")
assert outputs[2][0] == vocab.index("2")
print(outputs)

[[16]
 [ 2]
 [14]]
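
As a rough, dependency-free sketch of the same encoding idea, the lookup can also be done with a dictionary instead of scanning the string with `vocab.index` for every character. The names `char_to_index`, `encode_chars`, and `decode_chars` are hypothetical and not part of the notebook. Because `vocab` lists `-` and `0` twice, the sketch keeps the first position of each character so that the results agree with `vocab.index`.

```python
from typing import Dict, List

vocab = " .+-/^*()[]-01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

# Hypothetical lookup table. setdefault keeps the FIRST index of a repeated
# character, which is the same answer vocab.index() would give.
char_to_index: Dict[str, int] = {}
for position, char in enumerate(vocab):
    char_to_index.setdefault(char, position)


def encode_chars(text: str) -> List[List[int]]:
    """Map each character to a one-element row holding its vocabulary index."""
    missing = [c for c in text if c not in char_to_index]
    if missing:
        raise ValueError(f"{missing} missing from vocabulary in text: {text}")
    return [[char_to_index[c]] for c in text]


def decode_chars(rows: List[List[int]]) -> str:
    """Invert encode_chars by looking each index back up in the vocabulary."""
    return "".join(vocab[row[0]] for row in rows)


print(encode_chars("4+2"))  # [[16], [2], [14]]
print(decode_chars(encode_chars("x+2x")))  # x+2x
```

Decoding is handy when debugging, since it lets you confirm that an encoded example still corresponds to the problem text you expect.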


### Generate Math Problems

We'll use Mathy to generate random polynomial problems with a variable number of like terms. The generated problems will act as training data for our model.

In [4]:
from typing import List, Optional, Set
import random
from mathy.problems import gen_simplify_multiple_terms

def generate_problems(number: int, exclude: Optional[Set[str]] = None) -> List[str]:
    if exclude is None:
        exclude = set()
    problems: List[str] = []
    while len(problems) < number:
        text, complexity = gen_simplify_multiple_terms(
            random.randint(2, 6),
            noise_probability=1.0,
            noise_terms=random.randint(2, 10),
            op=["+", "-"],
        )
        if text in exclude:
            # Skip duplicates (and anything already excluded) and generate another
            continue
        exclude.add(text)
        problems.append(text)
    return problems

#### Try It

In [5]:
generate_problems(10)

['6r^2 - 6812u + 11.7v + 2j - -8954j - -323w - 159x',
 '6z^4 + 5391h - -7335r + 7080c - 12a^2 - 4008n^3 + -3311c - 1o^3 - 4145.0v^4 - 9.9z^4 - -7421t + 9z^4 - -7614c - 9261p',
 '3.9n - -1527z - 2u + 9327q^4 + 11s^4 - 1m + 2v - 8.1o - 8.2r - 10.0q^4 - 11z + 4.1h',
 '-6726s + 7401.4n + 6y + 656.0x + 5x - 10q + 6r',
 '3.8l + 5q - 6u^3 - 2.2y - 5n + -6488l + 7489z',
 '1439h - -584u + -3858.5g^3 - -2485k^4 - 3x^4 + 11.2a + 3597f - -4273y^4 - 2m - 10d + 2.0d',
 '-487o - 7q - -7447c + -6131l^2 + 6a^4 + 7.5a^4',
 '6863z - 3v - 3143q + 6.0u - -8699j^3 + 0.8k^2 + 3j^3 + -4087k^2 + 8.9j^3 - 2b - 1d',
 '7239.4y^4 - -8988y^4 + -9201w + 9c - 1c + -8147.8f + 4487c + 2.6y^4 + 11l^2',
 '-4180q^3 - -775f^3 - 3.7h + -8275.1a + 4w - 3851d + 10w']

### Count Like Terms

Now that we can generate input problems, we'll need a function that can count the like terms in each one and return the value for use as a label.

To accomplish this we'll use a few helpers from Mathy to enumerate the terms and compare them to see if they're like.

In [6]:
from typing import Optional, List, Dict
from mathy import MathExpression, ExpressionParser, get_terms, get_term_ex, TermEx
from mathy.problems import mathy_term_string

parser = ExpressionParser()

def count_like_terms(input_problem: str) -> int:
    expression: MathExpression = parser.parse(input_problem)
    term_nodes: List[MathExpression] = get_terms(expression)
    node_groups: Dict[str, List[MathExpression]] = {}
    for term_node in term_nodes:
        ex: Optional[TermEx] = get_term_ex(term_node)
        assert ex is not None, f"invalid expression {term_node}"
        key = mathy_term_string(variable=ex.variable, exponent=ex.exponent)
        if key == "":
            key = "const"
        if key not in node_groups:
            node_groups[key] = [term_node]
        else:
            node_groups[key].append(term_node)
    like_terms = 0
    for k, v in node_groups.items():
        if len(v) <= 1:
            continue
        like_terms += len(v)
    return like_terms

#### Try It

In [7]:
assert count_like_terms("4x - 2y + q") == 0
assert count_like_terms("x + x + z") == 2
assert count_like_terms("4x + 2x - x + 7") == 3
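
To see the grouping logic on its own, here is a minimal sketch that does not use Mathy at all. It assumes a much simpler input format than Mathy supports (an optional numeric coefficient, an optional single-letter variable, and an optional integer exponent per term) and finds terms with a regular expression. The name `count_like_terms_regex` and the pattern `TERM_RE` are hypothetical. Signs are ignored because they do not affect whether two terms are like.

```python
import re
from collections import Counter

# Hypothetical, simplified term pattern: either a variable term with an
# optional coefficient and optional exponent, or a bare number (a constant).
TERM_RE = re.compile(r"(?:\d+(?:\.\d+)?)?([a-zA-Z])(?:\^(\d+))?|\d+(?:\.\d+)?")


def count_like_terms_regex(text: str) -> int:
    """Count terms that share a (variable, exponent) key with another term."""
    groups: Counter = Counter()
    for match in TERM_RE.finditer(text):
        variable, exponent = match.group(1), match.group(2)
        # A missing exponent is treated as 1; terms without a variable are constants
        key = f"{variable}^{exponent or 1}" if variable else "const"
        groups[key] += 1
    # Only groups with at least two members contribute like terms
    return sum(size for size in groups.values() if size > 1)


print(count_like_terms_regex("60 + 2x^3 - 6x + x^3 + 17x"))  # 4
```

On the expression from the introduction, the two `x^3` terms and the two `x` terms form groups of two, and the lone constant `60` is skipped.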

### Generate Problem/Answer Pairs

Now that we can generate problems, count the number of like terms in them, and encode their text into arrays of integer indices, we have the pieces required to generate random problems and answers that we can train a neural network on.

Let's write a function that will return a tuple of: the problem text, its encoded example form, and the output label.

In [8]:
from typing import Tuple
from thinc.types import Array2d

def to_example(input_problem: str) -> Tuple[str, Array2d, List[int]]:
    encoded_input = encode_input(input_problem)
    like_terms = count_like_terms(input_problem)
    return input_problem, encoded_input, [like_terms]

#### Try It

In [9]:
text, X, Y = to_example("x+2x")
# text is preserved for debugging/visualization
assert text == "x+2x"
# X contains one vocabulary index per input character
assert X[0] == vocab.index("x")
# Y is the number of like terms
assert Y[0] == 2
print(text, X, Y)

x+2x [[46]
 [ 2]
 [14]
 [46]] [2]


### Build a Model

Now that we can generate X/Y values, let's define our model and verify it can process a single input/output.

For this we'll use Thinc and define a model that takes in a list of `Array2d` examples (one array of character indices per problem) and outputs an `Array2d` with one row, holding a single scalar prediction, for each example input.

In [27]:
from typing import List
from thinc.model import Model
from thinc.types import Array2d, Array1d
from thinc.api import chain, clone, list2ragged, reduce_sum, Mish, with_array, Embed, residual

def to_ints():
    def forward(model: Model[Array1d, Array1d], Y: Array2d, is_train: bool):
        Y = Y.astype("i")
        return Y, lambda dY: dY.astype("f")
    return Model("toint", forward)


def build_model(n_hidden: int, dropout: float = 0.1) -> Model[List[Array2d], Array2d]:
    model: Model[List[Array2d], Array2d] = chain(
        list2ragged(),
        with_array(chain(Embed(n_hidden, len(vocab)), Mish(n_hidden, dropout=dropout))),
        reduce_sum(),
        clone(residual(Mish(n_hidden, normalize=True)), 4),
        Mish(1),
        to_ints(),
    )
    return model

#### Try It

Let's pass an example through the model to make sure we have all the sizes right.

In [28]:
text, X, Y = to_example("14x + 2y - 3x + 7x")
m = build_model(12)
m.initialize([X], m.ops.asarray(Y, dtype="f"))
assert m.predict([X]).shape == (1, 1)
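
The shape flow through the model can be hard to picture, so here is a rough NumPy illustration of it. This is not Thinc's implementation: it leaves out the `Mish` activations, residual blocks, dropout, and the integer cast, and it uses random weights. All names (`embed_table`, `output_weights`, `forward_sketch`) are hypothetical. It only shows how variable-length index arrays become one fixed-size vector per example, and then one scalar per example.

```python
import numpy as np

rng = np.random.default_rng(0)
n_vocab, n_hidden = 75, 12  # 75 is len(vocab) for the vocabulary defined earlier

# Hypothetical random parameters standing in for the learned weights
embed_table = rng.normal(size=(n_vocab, n_hidden))
output_weights = rng.normal(size=(n_hidden, 1))

# Two encoded problems of different lengths: "x+2x" and "4+2"
batch = [
    np.array([[46], [2], [14], [46]]),
    np.array([[16], [2], [14]]),
]


def forward_sketch(examples):
    pooled = []
    for indices in examples:
        vectors = embed_table[indices[:, 0]]  # (n_chars, n_hidden)
        pooled.append(vectors.sum(axis=0))    # sum pooling -> (n_hidden,)
    stacked = np.stack(pooled)                # (n_examples, n_hidden)
    return stacked @ output_weights           # (n_examples, 1)


print(forward_sketch(batch).shape)  # (2, 1)
```

Summing over the character axis is what removes the dependence on input length, which is why the layers after the pooling step can use a fixed width.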

### Generate Training Datasets

Now that we can generate examples and we have a model that can process them, let's generate random unique training and evaluation datasets.

For this we'll write another helper function that can generate (n) training examples and respects an exclude list to avoid letting examples from the training/test sets overlap.

In [12]:
from typing import Callable, Optional, Set, List
from thinc.types import Array2d

# A function that takes in a size and outputs a list of generated problems
ProblemsFunction = Callable[[int, Optional[Set[str]]], List[str]]

# A tuple of (texts, X, Y)
DatasetTuple = Tuple[List[str], List[Array2d], List[Array1d]]


def generate_dataset(
    size: int,
    exclude: Optional[Set[str]] = None,
    problems_fn: ProblemsFunction = generate_problems,
) -> DatasetTuple:
    ops: Ops = get_current_ops()
    # Use the injected generator so callers can swap in a different problem type
    texts: List[str] = problems_fn(size, exclude)
    examples: List[Array2d] = []
    labels: List[Array1d] = []
    for text in texts:
        _, x, y = to_example(text)
        examples.append(ops.asarray(x))
        labels.append(ops.asarray(y))

    return texts, examples, labels

#### Try It

Generate a small dataset to be sure everything is working as expected.

In [13]:
texts, x, y = generate_dataset(10)
assert len(texts) == 10
assert len(x) == 10
assert len(y) == 10
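
The exclude set is what keeps the training and evaluation data from overlapping. The sketch below shows that idea in isolation, using a toy generator in place of Mathy so it runs without any dependencies. `toy_problem` and `generate_unique` are hypothetical names, and the toy problems are far simpler than the ones Mathy produces. In this sketch a text that has already been seen is skipped and a new one is generated.

```python
import random
from typing import List, Optional, Set


def toy_problem(rng: random.Random) -> str:
    """Hypothetical stand-in for a Mathy generator: a short sum of random terms."""
    n_terms = rng.randint(2, 4)
    terms = [f"{rng.randint(1, 12)}{rng.choice('xyz')}" for _ in range(n_terms)]
    return " + ".join(terms)


def generate_unique(
    number: int, exclude: Optional[Set[str]] = None, seed: int = 0
) -> List[str]:
    rng = random.Random(seed)
    if exclude is None:
        exclude = set()
    problems: List[str] = []
    while len(problems) < number:
        text = toy_problem(rng)
        if text in exclude:
            continue  # already used somewhere, so try again
        exclude.add(text)
        problems.append(text)
    return problems


# Sharing one set across both calls guarantees the two splits are disjoint
seen: Set[str] = set()
train_split = generate_unique(50, seen, seed=1)
held_out_split = generate_unique(20, seen, seed=2)
print(set(train_split).isdisjoint(held_out_split))  # True
```

Passing the same set to both calls matters: each call adds its texts to the set, so the second call cannot return anything the first one produced.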

### Evaluate Model Performance

We're almost ready to train our model. We just need a function that checks a trained model against a dataset and returns a score between 0 and 1 indicating the fraction of examples it predicted correctly.

We'll use this function to print the score as training progresses and print final test predictions at the end of training.

In [14]:
from typing import List
from thinc.model import Model
from thinc.types import Array2d, Array1d
from wasabi import msg

def evaluate_model(
    model: Model[List[Array2d], Array2d],
    *,
    print_problems: bool = False,
    texts: List[str],
    X: List[Array2d],
    Y: List[Array1d],
):
    Yeval = model.predict(X)
    correct_count = 0
    print_n = 10
    if print_problems:
        msg.divider(f"eval samples max({print_n})")
    for text, y_answer, y_guess in zip(texts, Y, Yeval):
        y_guess = round(float(y_guess))
        correct = y_guess == int(y_answer[0])
        print_fn = msg.fail
        if correct:
            correct_count += 1
            print_fn = msg.good
        if print_problems and print_n > 0:
            print_n -= 1
            print_fn(f"Answer[{y_answer[0]}] Guess[{y_guess}] Text: {text}")
    if print_problems:
        print(f"Model predicted {correct_count} out of {len(X)} correctly.")
    return correct_count / len(X)

#### Try It

Let's try it out with an untrained model and expect to see a really sad score.

In [15]:
texts, X, Y = generate_dataset(128)
m = build_model(12)
m.initialize(X, m.ops.asarray(Y, dtype="f"))
# Assume the model should do so poorly as to round down to 0
assert round(evaluate_model(m, texts=texts, X=X, Y=Y)) == 0
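
Stripped of printing and model calls, the scoring rule is just "round each prediction and compare it to the label". A minimal sketch of that rule, with the hypothetical name `rounded_accuracy` and made-up numbers:

```python
from typing import Sequence


def rounded_accuracy(guesses: Sequence[float], answers: Sequence[int]) -> float:
    """Fraction of guesses that round to the matching integer answer."""
    correct = sum(
        round(float(guess)) == int(answer) for guess, answer in zip(guesses, answers)
    )
    return correct / len(answers)


# 1.9 -> 2 and 4.2 -> 4 are correct; 0.4 -> 0 and 3.6 -> 4 are not
print(rounded_accuracy([1.9, 4.2, 0.4, 3.6], [2, 4, 1, 3]))  # 0.5
```

Because a guess only counts when it rounds to exactly the right integer, the score can stay low for a while even as the squared error drops.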

### Train/Evaluate a Model

The final helper function we need is one to train and evaluate a model given two input datasets. 

This function does a few things:

 1. Create an Adam optimizer we can use for minimizing the model's prediction error.
 2. Loop over the given training dataset (epochs) number of times.
 3. For each epoch, make batches of (batch_size) examples. For each batch(X), predict the number of like terms (Yh) and subtract the known answers (Y) to get the prediction error. Update the model using the optimizer with the calculated error.
 4. After each epoch, check the model performance against the evaluation dataset.
 5. Save the model weights for the best score out of all the training epochs.
 6. After all training is done, restore the best model and print results from the evaluation set.

In [23]:
from typing import Set, TypeVar
from thinc.api import Adam, fix_random_seed
from thinc.model import Model
from wasabi import msg
import numpy

ModelInput = TypeVar("ModelInput", bound=Model)


def train_and_evaluate(
    model: ModelInput,
    train_tuple: DatasetTuple,
    eval_tuple: DatasetTuple,
    *,
    lr: float = 3e-3,
    batch_size: int = 64,
    epochs: int = 48,
) -> float:
    (train_texts, train_X, train_y) = train_tuple
    (eval_texts, eval_X, eval_y) = eval_tuple
    msg.divider("Train and Evaluate Model")
    msg.info(f"Batch size = {batch_size}\tEpochs = {epochs}\tLearning Rate = {lr}")

    optimizer = Adam(lr)
    best_score: float = 0.0
    best_model: Optional[bytes] = None
    for n in range(epochs):
        loss = 0.0
        batches = model.ops.multibatch(batch_size, train_X, train_y, shuffle=True)
        for X, Y in batches:
            Y = model.ops.asarray(Y, dtype="float32")
            Yh, backprop = model.begin_update(X)
            err = Yh - Y
            backprop(err)
            loss += (err ** 2).sum()
            model.finish_update(optimizer)
        score = evaluate_model(model, texts=eval_texts, X=eval_X, Y=eval_y)
        if score > best_score:
            best_model = model.to_bytes()
            best_score = score
        print(f"{n}\t{score:.2f}\t{loss:.2f}")

    if best_model is not None:
        model.from_bytes(best_model)
    print("Evaluating with best model")
    score = evaluate_model(
        model, texts=eval_texts, print_problems=True, X=eval_X, Y=eval_y
    )
    print(f"Final Score: {score}")
    return score
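
The value `Yh - Y` that gets passed to `backprop` is the gradient of half the squared error, `0.5 * (Yh - Y) ** 2`, with respect to the prediction. The sketch below shows the same loop structure (shuffled batches, error as gradient, summed squared error as the reported loss) on a toy linear model in NumPy. It is an illustration under simplifying assumptions, not the notebook's training code: it uses plain gradient descent instead of Adam, and every name in it is hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy regression data: targets are an exact linear function of the inputs
inputs = rng.normal(size=(256, 3))
true_weights = np.array([[2.0], [-1.0], [0.5]])
targets = inputs @ true_weights

weights = np.zeros((3, 1))
learning_rate, batch_size, epochs = 0.1, 64, 30
losses = []
for epoch in range(epochs):
    order = rng.permutation(len(inputs))  # reshuffle every epoch
    loss = 0.0
    for start in range(0, len(inputs), batch_size):
        idx = order[start : start + batch_size]
        predictions = inputs[idx] @ weights
        err = predictions - targets[idx]  # gradient of 0.5 * (Yh - Y) ** 2
        # Backpropagate through the linear layer and take a plain SGD step
        weights -= learning_rate * inputs[idx].T @ err / len(idx)
        loss += float((err ** 2).sum())
    losses.append(loss)

print(f"first epoch loss {losses[0]:.2f}, last epoch loss {losses[-1]:.6f}")
```

The loss printed per epoch in `train_and_evaluate` is accumulated the same way, as a sum of squared errors over all batches, so it scales with the dataset size and is only meaningful relative to earlier epochs.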

We'll generate the dataset first, so we can iterate on the model without having to spend time generating examples for each run. This also ensures we have the same dataset across different model runs, to make it easier to compare performance.

In [24]:
train_size = 1024 * 8
test_size = 2048
seen_texts: Set[str] = set()
with msg.loading(f"Generating train dataset with {train_size} examples..."):
    train_dataset = generate_dataset(train_size, seen_texts)
msg.good(f"Train set created with {train_size} examples.")
with msg.loading(f"Generating eval dataset with {test_size} examples..."):
    eval_dataset = generate_dataset(test_size, seen_texts)
msg.good(f"Eval set created with {test_size} examples.")
init_x = train_dataset[1][:2]
init_y = train_dataset[2][:2]

✔ Train set created with 8192 examples.
✔ Eval set created with 2048 examples.


Finally, we can create our model and test it with some hyperparameters!

In [29]:
model = build_model(64)
model.initialize(init_x, init_y)
train_and_evaluate(
    model, train_dataset, eval_dataset, lr=2e-3, batch_size=64, epochs=16
)

ℹ Batch size = 64      Epochs = 16     Learning Rate = 0.002
0	0.21	24853.00
1	0.25	18502.00
2	0.26	17255.00
3	0.27	17137.00
4	0.19	16353.00
5	0.27	15548.00
6	0.18	14738.00
7	0.25	14619.00
8	0.27	13296.00
9	0.27	13176.00
10	0.30	12516.00
11	0.31	11760.00
12	0.32	11211.00
13	0.33	10498.00
14	0.32	10465.00
15	0.33	10046.00
Evaluating with best model
✘ Answer[2] Guess[5] Text: -4578.3z + 9105.5u + 8.8k^2 - 9r^3 + 5.0q +
1j^4 + 1q - -3363d
✘ Answer[2] Guess[4] Text: 0.6l - 3.2p^3 - 2.7c + -6226y^2 + 8281.8z +
2k - 2.3z + 4149o - 1543g - 2n - 3880b + 1932m^3
✘ Answer[6] Guess[5] Text: 8.7n^3 - 0.7c + 12g^3 + 5x + 11.4v -
-8389g^3 - 3.7m + 10.1n^3 - 12g^3 + 6.6n^3
✔ Answer[2] Guess[2] Text: 1522n^2 + 8h + 4379p - -7007.9m - 2238.8y -
11n^2 - 8.8t^3
✘ Answer[2] Guess[4] Text: 960q - 8.7c + 5.4f^4 + 7247h^4 - 2461y -
10.2m + 4481m + 10l - 7s + 7.5a + 4.4g
✔ Answer[4] Guess[4] Text: -3939f - -9597w 

0.3330078125

### Intermediate Exercise

The model we built can reach roughly 80% accuracy on the evaluation set given 100 or more epochs. Improve the model architecture so that it trains to a similar accuracy while requiring fewer epochs or a smaller dataset.

In [None]:
from typing import List
from thinc.model import Model
from thinc.types import Array2d, Array1d
from thinc.api import chain, clone, list2ragged, reduce_mean, reduce_sum, Mish, with_array, Embed, residual

def custom_model(n_hidden: int, dropout: float = 0.1) -> Model[List[Array2d], Array2d]:
    return chain(
        list2ragged(),
        with_array(Embed(n_hidden, len(vocab))),
        reduce_sum(),
        Mish(1),
        to_ints(),
    )
model = custom_model(64)
model.initialize(init_x, init_y)
train_and_evaluate(
    model, train_dataset, eval_dataset, lr=2e-3, batch_size=64, epochs=16
)

### Advanced Exercise

Rewrite the model to encode the whole expression with a BiLSTM, and then generate pairs of terms, using the BiLSTM vectors. Over each pair of terms, predict whether the terms are alike or unlike.

In [None]:
from dataclasses import dataclass
from thinc.types import Array2d, Ragged
from thinc.model import Model


@dataclass
class Comparisons:
    data: Array2d  # Batch of vectors for each pair
    indices: Array2d  # Int array of shape (N, 3), showing the (batch, term1, term2) positions

def pairify() -> Model[Ragged, Comparisons]:
    """Create pair-wise comparisons for items in a sequence. For each sequence of N
    items, there will be (N**2-N)/2 comparisons."""
    ...

def predict_over_pairs(model: Model[Array2d, Array2d]) -> Model[Comparisons, Comparisons]:
    """Apply a prediction model over a batch of comparisons. Outputs a Comparisons
    object where the data is the scores. The prediction model should predict over
    two classes, True and False."""
    ...
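
As a starting hint for the `indices` array, here is a small sketch of how the `(batch, term1, term2)` triples could be enumerated with `itertools.combinations`. It only covers the index bookkeeping, not the Thinc layer or the BiLSTM vectors, and `pair_indices` is a hypothetical helper name. For a sequence of N terms it yields (N**2 - N) / 2 pairs, matching the docstring above.

```python
from itertools import combinations
from typing import List, Tuple


def pair_indices(lengths: List[int]) -> List[Tuple[int, int, int]]:
    """List (batch, term1, term2) for every unordered pair of terms in each sequence."""
    return [
        (batch_index, first, second)
        for batch_index, n_terms in enumerate(lengths)
        for first, second in combinations(range(n_terms), 2)
    ]


# A batch with a 3-term and a 4-term expression gives 3 + 6 = 9 comparisons
triples = pair_indices([3, 4])
print(len(triples))  # 9
print(triples[:3])   # [(0, 0, 1), (0, 0, 2), (0, 1, 2)]
```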
