# Import the dependencies

In [1]:
import re
import random
import numpy as np
import pandas as pd

from tqdm import tqdm
from collections import Counter

import tensorflow as tf
import tensorflow_addons as tfa
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, LSTM, Conv1D, GlobalAveragePooling1D
from keras.callbacks import ModelCheckpoint

import tensorflow as tf
from transformers import BertConfig
from transformers import RobertaTokenizerFast

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer

from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import explained_variance_score

from matplotlib import pyplot as plt
from matplotlib import rcParams

import sys
import time


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.10.1 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [2]:
# Record which GPU devices TensorFlow can see in this kernel.
physical_devices = tf.config.list_physical_devices('GPU')
print(f"Available GPUs: {physical_devices}")

Available GPUs: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [3]:
# Every term is padded / truncated to this many tokens when it is tokenised.
sequence_len = 512
# Number of examples per batch in the tf.data pipelines built further down.
batch_size = 64
# NOTE(review): `epochs` and `config` are not referenced by the evaluation
# cells visible in this notebook (the models are rebuilt from JSON files);
# presumably they are leftovers from the training notebook - confirm.
epochs = 30

config = BertConfig(
    vocab_size=9,
    hidden_size=84,
    num_hidden_layers=2,
    num_attention_heads=6,
    intermediate_size=64,
    max_position_embeddings=sequence_len,
    num_labels=2,
)

# Load & Prepare the data

In [4]:
# Evaluation split: term strings together with their LO / RI step counts.
all_data = pd.read_csv("prepare_data/data_steps/steps_vars_term_str.csv", delimiter=',')

# Keep a single row per distinct term string.
print(f"Count all terms: {len(all_data)}")
all_data = all_data.drop_duplicates(subset="vars_terms").reset_index(drop=True)
print(f"Count original terms: {len(all_data)}\n")

# Shuffle with a fixed seed so the row order is repeatable.
all_data = shuffle(all_data, random_state=33).reset_index(drop=True)

# A step count of 1000 marks a term that the strategy could not reduce;
# keep only rows that both strategies reduced.
print(f"number samples: {len(all_data)}")
all_data = all_data[
    (all_data["RI_steps_num"] != 1000) & (all_data["LO_steps_num"] != 1000)
].reset_index(drop=True)
print(f"number samples only reducable: {len(all_data)}\n")

print(f"max RI steps count: {max(all_data['RI_steps_num'])}")
print(f"max LO steps count: {max(all_data['LO_steps_num'])}")

x_test = all_data["vars_terms"].tolist()
y_lo_test = all_data["LO_steps_num"].tolist()
y_ri_test = all_data["RI_steps_num"].tolist()
# Label is 1 when RI needs strictly fewer steps than LO, otherwise 0.
y_test = [int(lo_ > ri_) for lo_, ri_ in zip(y_lo_test, y_ri_test)]

print(f"Count TESTING samples: {len(y_test)}")

Count all terms: 4282
Count original terms: 4282

number samples: 4282
number samples only reducable: 4251

max RI steps count: 386
max LO steps count: 219
Count TESTING samples: 4251


In [5]:
# Training split: term strings together with their LO / RI step counts.
all_data = pd.read_csv("prepare_data/data_steps/steps_vars_term_str_train.csv", delimiter=',')

# Keep a single row per distinct term string.
print(f"Count all terms: {len(all_data)}")
all_data = all_data.drop_duplicates(subset="vars_terms").reset_index(drop=True)
print(f"Count original terms: {len(all_data)}\n")

# Shuffle with a fixed seed so the row order is repeatable.
all_data = shuffle(all_data, random_state=33).reset_index(drop=True)

# A step count of 1000 marks a term that the strategy could not reduce;
# keep only rows that both strategies reduced.
print(f"number samples: {len(all_data)}")
all_data = all_data[
    (all_data["RI_steps_num"] != 1000) & (all_data["LO_steps_num"] != 1000)
].reset_index(drop=True)
print(f"number samples only reducable: {len(all_data)}\n")

print(f"max RI steps count: {max(all_data['RI_steps_num'])}")
print(f"max LO steps count: {max(all_data['LO_steps_num'])}")

x_train = all_data["vars_terms"].tolist()
y_lo_train = all_data["LO_steps_num"].tolist()
y_ri_train = all_data["RI_steps_num"].tolist()
# Label is 1 when RI needs strictly fewer steps than LO, otherwise 0.
y_train = [int(lo_ > ri_) for lo_, ri_ in zip(y_lo_train, y_ri_train)]

print(f"Count TRAINING samples: {len(y_train)}")

Count all terms: 45038
Count original terms: 45038

number samples: 45038
number samples only reducable: 42912

max RI steps count: 400
max LO steps count: 308
Count TRAINING samples: 42912


In [6]:
# Split each term on the lambda sign, parentheses, dots and spaces; what is
# left are the variable names (plus empty strings between adjacent
# separators, which the next cell ignores).
test_vars = {
    token_
    for term_ in x_test
    for token_ in re.split(r"[λ(). ]", term_)
}

train_vars = {
    token_
    for term_ in x_train
    for token_ in re.split(r"[λ(). ]", term_)
}

In [7]:
# Build the table that collapses suffixed variable names such as "x_1"
# into a single symbol.

list_vars = "x y z a b c d e j i n m t r q w u o p s f g h k l v".split()
# Despite their names, `greek_small` ends with two capital letters and
# `greek_big` ends with the digits 1-4, so that each list has one entry
# per element of `list_vars`.
greek_small = "α β γ δ ε ζ η θ ι κ λ μ ν ξ ο π ρ σ τ υ φ χ ψ ω Α Β".split()
greek_big = "Γ Δ Ε Ζ Η Θ Ι Κ Λ Μ Ν Ξ Ο Π Ρ Σ Τ Υ Φ Χ Ψ Ω 1 2 3 4".split()

# Suffix -> replacement alphabet. zip() stops at the shorter sequence, so the
# six digits of "_4" only cover the first six base names.
suffix_alphabets = (
    ("_1", "X Y Z A B C D E J I N M T R Q W U O P S F G H K L V".split()),
    ("_2", greek_small),
    ("_3", greek_big),
    ("_4", "5 6 7 8 9 0".split()),
)

# Register the plain names first so the fallback pass below does not treat
# them as unknown; they are removed again at the end because a plain name
# needs no substitution.
vars_to_sym_dict = {name_: name_ for name_ in list_vars}

for suffix_, alphabet_ in suffix_alphabets:
    for key_, val_ in zip(list_vars, alphabet_):
        vars_to_sym_dict[key_ + suffix_] = val_

# Fallback: any other name seen in the data maps to its first character.
# The names are visited in sorted order instead of raw set order, because set
# iteration order of strings changes between interpreter runs; that made both
# the printed table and the later substitution order non-reproducible.
for x_ in sorted(test_vars | train_vars):
    if x_ and x_ not in vars_to_sym_dict:
        vars_to_sym_dict[x_] = x_[0]

for x_ in list_vars:
    del vars_to_sym_dict[x_]

print(vars_to_sym_dict)

{'x_1': 'X', 'y_1': 'Y', 'z_1': 'Z', 'a_1': 'A', 'b_1': 'B', 'c_1': 'C', 'd_1': 'D', 'e_1': 'E', 'j_1': 'J', 'i_1': 'I', 'n_1': 'N', 'm_1': 'M', 't_1': 'T', 'r_1': 'R', 'q_1': 'Q', 'w_1': 'W', 'u_1': 'U', 'o_1': 'O', 'p_1': 'P', 's_1': 'S', 'f_1': 'F', 'g_1': 'G', 'h_1': 'H', 'k_1': 'K', 'l_1': 'L', 'v_1': 'V', 'x_2': 'α', 'y_2': 'β', 'z_2': 'γ', 'a_2': 'δ', 'b_2': 'ε', 'c_2': 'ζ', 'd_2': 'η', 'e_2': 'θ', 'j_2': 'ι', 'i_2': 'κ', 'n_2': 'λ', 'm_2': 'μ', 't_2': 'ν', 'r_2': 'ξ', 'q_2': 'ο', 'w_2': 'π', 'u_2': 'ρ', 'o_2': 'σ', 'p_2': 'τ', 's_2': 'υ', 'f_2': 'φ', 'g_2': 'χ', 'h_2': 'ψ', 'k_2': 'ω', 'l_2': 'Α', 'v_2': 'Β', 'x_3': 'Γ', 'y_3': 'Δ', 'z_3': 'Ε', 'a_3': 'Ζ', 'b_3': 'Η', 'c_3': 'Θ', 'd_3': 'Ι', 'e_3': 'Κ', 'j_3': 'Λ', 'i_3': 'Μ', 'n_3': 'Ν', 'm_3': 'Ξ', 't_3': 'Ο', 'r_3': 'Π', 'q_3': 'Ρ', 'w_3': 'Σ', 'u_3': 'Τ', 'o_3': 'Υ', 'p_3': 'Φ', 's_3': 'Χ', 'f_3': 'Ψ', 'g_3': 'Ω', 'h_3': '1', 'k_3': '2', 'l_3': '3', 'v_3': '4', 'x_4': '5', 'y_4': '6', 'z_4': '7', 'a_4': '8', 'b_4': '9', 'c_

In [8]:
# Load the tokenizer stored under "tokenizer_data" (presumably a local
# directory produced by the training notebook - confirm).
# NOTE(review): recent transformers releases document this limit as
# `model_max_length`; confirm that `max_len` is still honoured by the
# installed version. The tokenizer call in `preprocess` passes `max_length`
# explicitly, so truncation there does not depend on this argument.
tokenizer = RobertaTokenizerFast.from_pretrained("tokenizer_data", max_len=sequence_len)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [9]:
def _encode_term(term, mapping):
    """Rewrite one term string into the compact single-symbol alphabet.

    The lambda sign becomes "@" before the substitutions run (one of the
    replacement symbols in the table is itself "λ"), every key of `mapping`
    is replaced by its value in dictionary order, and finally dots and
    spaces are removed.

    NOTE(review): substitutions are plain substring replacements applied in
    dictionary order, so a key that is a prefix of another key would win -
    confirm no such names occur in the data.
    """
    encoded = term.replace("λ", "@")
    for name_, symbol_ in mapping.items():
        encoded = encoded.replace(name_, symbol_)
    return encoded.replace(".", "").replace(" ", "")


def _step_gap_weight(ri_steps, lo_steps):
    """Relative gap between the two step counts, offset so it is never zero."""
    return (abs(ri_steps - lo_steps) / (max(ri_steps, lo_steps) + 0.001)) + 0.001


x_train_prep = [_encode_term(term_, vars_to_sym_dict) for term_ in x_train]
x_test_prep = [_encode_term(term_, vars_to_sym_dict) for term_ in x_test]

train_df = pd.DataFrame({
    "term_str": x_train_prep,
    "is_ri_best": y_train,
    "lo_steps": y_lo_train,
    "ri_steps": y_ri_train,
    "sample_weights": [_step_gap_weight(ri_, lo_) for ri_, lo_ in zip(y_ri_train, y_lo_train)],
})
test_df = pd.DataFrame({
    "term_str": x_test_prep,
    "is_ri_best": y_test,
    "lo_steps": y_lo_test,
    "ri_steps": y_ri_test,
    "sample_weights": [_step_gap_weight(ri_, lo_) for ri_, lo_ in zip(y_ri_test, y_lo_test)],
})

def preprocess(example):
    """Tokenise a frame of terms and pair the result with its labels.

    Args:
        example: DataFrame with a ``term_str`` column (encoded term strings)
            and an ``is_ri_best`` column (labels).

    Returns:
        A ``(tokenized_texts, labels)`` tuple: the tokenizer output for all
        terms, padded / truncated to ``sequence_len`` and returned as
        TensorFlow tensors, and the labels converted to a tensor.
    """
    term_strings = example['term_str'].to_list()
    encodings = tokenizer(
        term_strings,
        truncation=True,
        padding='max_length',
        max_length=sequence_len,
        return_tensors="tf",
    )
    targets = tf.convert_to_tensor(example["is_ri_best"])
    return encodings, targets


# Tokenise both splits; each result is an (encodings, labels) pair.
tokenized_train_data, tokenized_test_data = (
    preprocess(train_df),
    preprocess(test_df),
)

In [10]:
# Batched (features, label) pipelines. No shuffling is applied here, so
# predictions come back in the same order as the y_lo_* / y_ri_* lists they
# are zipped against later on.
train_dataset = (
    tf.data.Dataset
    .from_tensor_slices((dict(tokenized_train_data[0]), tokenized_train_data[1]))
    .batch(batch_size)
)
test_dataset = (
    tf.data.Dataset
    .from_tensor_slices((dict(tokenized_test_data[0]), tokenized_test_data[1]))
    .batch(batch_size)
)

In [11]:
def calculate_accuracy(actual_labels, predicted_labels):
    """Return the fraction of positions where the two label sequences agree.

    Args:
        actual_labels: Iterable of ground-truth labels.
        predicted_labels: Iterable of predicted labels, same length.

    Returns:
        float: Number of matching positions divided by the number of labels.

    Raises:
        ValueError: If the inputs differ in length (previously the extra
            items were silently ignored or counted as wrong) or are empty
            (previously a bare ZeroDivisionError).
    """
    from itertools import zip_longest

    missing = object()  # sentinel marking the end of the shorter input
    total_predictions = 0
    correct_predictions = 0
    for actual, predicted in zip_longest(actual_labels, predicted_labels, fillvalue=missing):
        if actual is missing or predicted is missing:
            raise ValueError("actual_labels and predicted_labels must have the same length")
        total_predictions += 1
        if actual == predicted:
            correct_predictions += 1

    if total_predictions == 0:
        raise ValueError("cannot compute accuracy of empty label sequences")
    return correct_predictions / total_predictions


from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, classes, class_labels, normalize=False, title=None, cmap=plt.cm.Blues):
    """
    Plot the confusion matrix of a classification model.

    Args:
        y_true (numpy.ndarray): The ground truth labels.
        y_pred (numpy.ndarray): The predicted labels.
        classes (list): The label values, in the order used for rows/columns.
        class_labels: The display names used as tick labels, one per class.
        normalize (bool, optional): If True, each row is divided by its sum so
            cells show per-true-class fractions. Defaults to False.
        title (str, optional): The title of the plot. Defaults to None.
        cmap (matplotlib.colors.Colormap, optional): The colormap to use for the plot. Defaults to plt.cm.Blues.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """

    cm = confusion_matrix(y_true, y_pred, labels=classes)

    if normalize:
        # Reuse the matrix computed above. Rows whose sum is zero (a class
        # that never occurs in y_true) are left at 0 instead of turning into
        # NaN, which would also make the text-colour threshold NaN.
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(
            cm.astype('float'),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.set_xticks(np.arange(len(classes)))
    ax.set_yticks(np.arange(len(classes)))
    ax.set_xticklabels(class_labels)
    ax.set_yticklabels(class_labels)
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    ax.set_title(title)
    fig.colorbar(im)

    # Cells darker than half of the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i in range(len(classes)):
        for j in range(len(classes)):
            ij = float(cm[i, j])
            ax.text(j, i, f"{ij:.2f}", ha='center', va='center', color='white' if ij > thresh else 'black')

    plt.tight_layout()
    plt.show()

# The best & worst possible steps

In [12]:
# Lower bound: for every term take whichever strategy needs fewer steps.
y_test_best_sum = sum(map(min, y_lo_test, y_ri_test))
y_test_best_avg = y_test_best_sum / len(y_lo_test)

y_train_best_sum = sum(map(min, y_lo_train, y_ri_train))
y_train_best_avg = y_train_best_sum / len(y_lo_train)


# Upper bound: for every term take whichever strategy needs more steps.
y_test_worst_sum = sum(map(max, y_lo_test, y_ri_test))
y_test_worst_avg = y_test_worst_sum / len(y_lo_test)

y_train_worst_sum = sum(map(max, y_lo_train, y_ri_train))
y_train_worst_avg = y_train_worst_sum / len(y_lo_train)


# Baseline: always use LO.
y_test_worst_LO_sum = np.sum(y_lo_test)
y_test_worst_LO_avg = np.mean(y_lo_test)

y_train_worst_LO_sum = np.sum(y_lo_train)
y_train_worst_LO_avg = np.mean(y_lo_train)


# Baseline: always use RI.
y_test_worst_RI_sum = np.sum(y_ri_test)
y_test_worst_RI_avg = np.mean(y_ri_test)

y_train_worst_RI_sum = np.sum(y_ri_train)
y_train_worst_RI_avg = np.mean(y_ri_train)


print(f"Test BEST steps: avg={y_test_best_avg:.3f}, sum={y_test_best_sum}")
print(f"Train BEST steps: avg={y_train_best_avg:.3f}, sum={y_train_best_sum}\n")

print(f"Test WORST steps: avg={y_test_worst_avg:.3f}, sum={y_test_worst_sum}")
print(f"Train WORST steps: avg={y_train_worst_avg:.3f}, sum={y_train_worst_sum}\n")

print(f"Test WORST LO steps: avg={y_test_worst_LO_avg:.3f}, sum={y_test_worst_LO_sum}")
print(f"Train WORST LO steps: avg={y_train_worst_LO_avg:.3f}, sum={y_train_worst_LO_sum}\n")

print(f"Test WORST RI steps: avg={y_test_worst_RI_avg:.3f}, sum={y_test_worst_RI_sum}")
print(f"Train WORST RI steps: avg={y_train_worst_RI_avg:.3f}, sum={y_train_worst_RI_sum}\n")

Test BEST steps: avg=15.143, sum=64372
Train BEST steps: avg=13.082, sum=561374

Test WORST steps: avg=23.707, sum=100780
Train WORST steps: avg=23.989, sum=1029427

Test WORST LO steps: avg=16.635, sum=70714
Train WORST LO steps: avg=15.328, sum=657760

Test WORST RI steps: avg=22.215, sum=94438
Train WORST RI steps: avg=21.743, sum=933041


In [13]:
def calc_steps_accuracy(y_predictions, y_lo_steps, y_ri_steps, threshold=0.5):
    """Total and mean step count when the strategy is chosen by the predictions.

    For each term the LO step count is used when its prediction is below
    `threshold`; otherwise the RI step count is used.

    Args:
        y_predictions: Per-term prediction scores.
        y_lo_steps: Per-term LO step counts.
        y_ri_steps: Per-term RI step counts.
        threshold: Score at or above which RI is chosen. Defaults to 0.5.

    Returns:
        tuple: ``(sum_of_steps, sum_of_steps / len(y_lo_steps))``.
    """
    chosen_steps = (
        lo_steps if score < threshold else ri_steps
        for lo_steps, ri_steps, score in zip(y_lo_steps, y_ri_steps, y_predictions)
    )
    total_steps = sum(chosen_steps)
    return total_steps, total_steps / len(y_lo_steps)

# Test the model trained for 90 epochs on all terms with variables

In [14]:
# load the model
from transformers import TFBertModel

with open("./fine_models/model_vars_ri.json", "r") as file:
    loaded_model_json = file.read()

model = tf.keras.models.model_from_json(loaded_model_json, custom_objects={"TFBertModel": TFBertModel})
model.load_weights('./fine_models/model_vars_ri.h5')


y_test_pred = model.predict(test_dataset)
y_train_pred = model.predict(train_dataset)



In [15]:
print(f"Test BEST steps: avg={y_test_best_avg:.3f}, sum={y_test_best_sum}")
print(f"Train BEST steps: avg={y_train_best_avg:.3f}, sum={y_train_best_sum}\n")

# Report the step totals obtained at each decision threshold.
# The `_05` variable names are kept for every threshold, as in the original
# cell; after the loop they hold the results of the last threshold (0.15).
for label_, threshold_ in (("0.5th", 0.5), ("0.35th", 0.35), ("0.15th", 0.15)):
    print(label_)
    y_test_sum_05, y_test_avg_05 = calc_steps_accuracy(
        y_test_pred, y_lo_test, y_ri_test, threshold=threshold_
    )
    y_train_sum_05, y_train_avg_05 = calc_steps_accuracy(
        y_train_pred, y_lo_train, y_ri_train, threshold=threshold_
    )
    print(f"Test steps: avg={y_test_avg_05:.3f}, sum={y_test_sum_05}")
    print(f"Train steps: avg={y_train_avg_05:.3f}, sum={y_train_sum_05}\n")

Test BEST steps: avg=15.143, sum=64372
Train BEST steps: avg=13.082, sum=561374

0.5th
Test steps: avg=17.424, sum=74071
Train steps: avg=13.343, sum=572562

0.35th
Test steps: avg=17.594, sum=74793
Train steps: avg=13.402, sum=575114

0.15th
Test steps: avg=18.511, sum=78690
Train steps: avg=13.620, sum=584445


# Test the best-F1 checkpoint of the model trained for 90 epochs on all terms

In [16]:
# load the model
from transformers import TFBertModel

with open("./fine_models/model_vars_ri.json", "r") as file:
    loaded_model_json = file.read()

model = tf.keras.models.model_from_json(loaded_model_json, custom_objects={"TFBertModel": TFBertModel})
model.load_weights('./fine_models/model_vars_ri_f1.h5')


y_test_pred = model.predict(test_dataset)
y_train_pred = model.predict(train_dataset)



In [17]:
print(f"Test BEST steps: avg={y_test_best_avg:.3f}, sum={y_test_best_sum}")
print(f"Train BEST steps: avg={y_train_best_avg:.3f}, sum={y_train_best_sum}\n")

# Report the step totals obtained at each decision threshold.
# The `_05` variable names are kept for every threshold, as in the original
# cell; after the loop they hold the results of the last threshold (0.15).
for label_, threshold_ in (("0.5th", 0.5), ("0.35th", 0.35), ("0.15th", 0.15)):
    print(label_)
    y_test_sum_05, y_test_avg_05 = calc_steps_accuracy(
        y_test_pred, y_lo_test, y_ri_test, threshold=threshold_
    )
    y_train_sum_05, y_train_avg_05 = calc_steps_accuracy(
        y_train_pred, y_lo_train, y_ri_train, threshold=threshold_
    )
    print(f"Test steps: avg={y_test_avg_05:.3f}, sum={y_test_sum_05}")
    print(f"Train steps: avg={y_train_avg_05:.3f}, sum={y_train_sum_05}\n")

Test BEST steps: avg=15.143, sum=64372
Train BEST steps: avg=13.082, sum=561374

0.5th
Test steps: avg=17.713, sum=75298
Train steps: avg=13.417, sum=575766

0.35th
Test steps: avg=17.869, sum=75963
Train steps: avg=13.489, sum=578834

0.15th
Test steps: avg=18.382, sum=78143
Train steps: avg=13.769, sum=590870
