# YOLO-TsetlinMachine object detection and symbol prediction walkthrough

### Download datasets and requirements

In [None]:
import os

# Resolve the account name first: this cell and every later cell build their
# data paths from it.
user = os.environ['USER']

# Fetch the datasets only when the YOLO label directory is not there yet.
if not os.path.exists(f"/home/{user}/data/math_expression_data/yolo_data/JSON"):
    # Every step is chained with `&&`: without the separator after `cd`, the
    # shell treats "chmod +x download_data.sh" as extra arguments to `cd`,
    # the `cd` fails and the download script is never started.
    exit_status = os.system("cd ~/tm-yolo-mathreader && chmod +x download_data.sh && ./download_data.sh")
    if exit_status != 0:
        # Surface the failure here instead of a confusing missing-file error
        # in the next cell.
        print(f"WARNING: download_data.sh did not finish cleanly (status {exit_status})")

## Load data

In [None]:
import json

# Locations of the raw YOLO data: one JSON annotation file plus the folder
# holding the rendered images.
root = f"/home/{user}/data/math_expression_data/yolo_data"
images = os.path.join(root, "background_images")
labels = os.path.join(root, "JSON", "kaggle_data_1.json")

# `labels` first names the annotation file and is then rebound to its parsed
# content (indexed by position and read via "filename"/"image_data" below).
with open(labels, "rb") as file:
    labels = json.load(file)

## Show image with bounding boxes

In [None]:
from PIL import Image, ImageDraw
import numpy as np

# Pick a random annotated image. The upper bound follows the label list so
# the index stays valid for annotation files of any size.
rndimg = np.random.randint(0, len(labels))

_img = labels[rndimg]["filename"]
_img_path = os.path.join(images, _img)

# Look the file up directly instead of scanning the whole image directory.
if os.path.exists(_img_path):
    img = Image.open(_img_path)
    draw = ImageDraw.Draw(img)

    _image_data = labels[rndimg]["image_data"]
    xmin = _image_data["xmins_raw"]
    xmax = _image_data["xmaxs_raw"]
    ymin = _image_data["ymins_raw"]
    ymax = _image_data["ymaxs_raw"]

    # One red rectangle per annotated box (raw pixel coordinates).
    for x_min, x_max, y_min, y_max in zip(xmin, xmax, ymin, ymax):
        draw.rectangle([x_min, y_min, x_max, y_max], outline="red")

    display(img)
else:
    print(f"Image {_img} not found in {images}")

## Prepare YOLO dataset environment

In [None]:
# Target layout:
#   <dataset_root>/train/{images,labels}
#   <dataset_root>/valid/{images,labels}
dataset_root = f"/home/{user}/data/math_expression_data/yolo_dataset_formated"
root_dir = f"/home/{user}/tm-yolo-mathreader"

train_dir = os.path.join(dataset_root, "train")
val_dir = os.path.join(dataset_root, "valid")

train_x_dir = os.path.join(train_dir, "images")
val_x_dir = os.path.join(val_dir, "images")

train_y_dir = os.path.join(train_dir, "labels")
val_y_dir = os.path.join(val_dir, "labels")

# Creating the four leaf folders also creates dataset_root, train and valid;
# exist_ok=True makes the cell safe to re-run without exists/mkdir pairs.
for _directory in (train_x_dir, val_x_dir, train_y_dir, val_y_dir):
    os.makedirs(_directory, exist_ok=True)

## Functions:
- Convert xmin, xmax, ymin, ymax to center_x, center_y, bbox_width, bbox_height. This is necessary for YOLO training.

- Get image classes

In [None]:
import pickle

def convert_to_yolo_bbox(image_width, image_height, xmin, xmax, ymin, ymax):
    """Turn corner coordinates into a normalised centre/size box.

    Returns a tuple ``(center_x, center_y, bbox_width, bbox_height)`` where
    the x values are divided by ``image_width`` and the y values by
    ``image_height``.
    """
    # Box extent in the same units as the inputs.
    box_w = xmax - xmin
    box_h = ymax - ymin

    # Centre = lower corner plus half the extent.
    mid_x = xmin + box_w / 2
    mid_y = ymin + box_h / 2

    return (
        mid_x / image_width,
        mid_y / image_height,
        box_w / image_width,
        box_h / image_height,
    )


def get_img_info(all_img_data, filename, keep=None):
    """Return the kept symbol classes and raw boxes of one annotated image.

    Args:
        all_img_data: iterable of annotation records. Each record has a
            "filename" key and an "image_data" dict providing "width",
            "height", "visible_latex_chars" and the parallel lists
            "xmins_raw", "xmaxs_raw", "ymins_raw", "ymaxs_raw".
        filename: value of the "filename" key to look up.
        keep: optional collection of class names to retain. When omitted it
            is unpickled from ``{current_dir}/keep_classes.pkl`` as before;
            passing it avoids re-reading that file on every call.

    Returns:
        ``[img_classes, width, height, xmin, xmax, ymin, ymax]`` where the
        five lists only contain the entries whose class is in ``keep``.

    Raises:
        AssertionError: if no record matches ``filename``.
    """
    if keep is None:
        # NOTE(review): `current_dir` is not defined in the visible part of
        # this notebook; it has to exist at notebook level for this fallback.
        with open(f"{current_dir}/keep_classes.pkl", "rb") as file:
            keep = pickle.load(file)

    # Hash-based membership instead of scanning the class list per symbol.
    keep_set = set(keep)

    for imginfo in all_img_data:

        if imginfo["filename"] != filename:
            continue

        img = imginfo["image_data"]
        chars = img["visible_latex_chars"]

        # Positions of the symbols whose class we want to keep.
        keep_classes = [i for i, lab in enumerate(chars) if lab in keep_set]

        img_classes = [chars[i] for i in keep_classes]
        xmin = [img["xmins_raw"][i] for i in keep_classes]
        xmax = [img["xmaxs_raw"][i] for i in keep_classes]
        ymin = [img["ymins_raw"][i] for i in keep_classes]
        ymax = [img["ymaxs_raw"][i] for i in keep_classes]

        return [img_classes, img["width"], img["height"], xmin, xmax, ymin, ymax]

    # Raised explicitly (same exception type and message as the former
    # `assert False`) so the check survives `python -O`.
    raise AssertionError(f"Image {filename} not found")
        

## Train test split

In [None]:
from sklearn.model_selection import train_test_split

# os.listdir returns entries in arbitrary, filesystem-dependent order, so the
# listing is sorted first; otherwise random_state=42 would not yield the same
# split on every machine.
train, test = train_test_split(sorted(os.listdir(images)), test_size=0.2, random_state=42)

## Main function.

In [None]:
from tqdm import tqdm

def make_data(files : list, x_path : str, y_path : str, settype : str):
    """Write one YOLO label file and one image copy per annotated file.

    Args:
        files: image file names, relative to the notebook-level ``images`` dir.
        x_path: output directory for the images.
        y_path: output directory for the label text files.
        settype: file-name prefix; outputs are ``{settype}_{i}.jpg`` and
            ``{settype}_{i}.txt`` with ``i`` the position of the file in
            ``files``.

    Files without any kept class are skipped, so the output numbering can
    contain gaps. Relies on the notebook-level ``images``, ``labels`` and
    ``current_dir`` names.
    """
    # The class list does not change between images: load it once instead of
    # once per file, and index it so each lookup is a dict access.
    with open(f"{current_dir}/keep_classes.pkl", "rb") as f:
        keep = pickle.load(f)

    class_index = {}
    for idx, class_name in enumerate(keep):
        # setdefault keeps the first position, matching list.index().
        class_index.setdefault(class_name, idx)

    for i, filename in enumerate(tqdm(files)):

        img_classes, width, height, xmins, xmaxs, ymins, ymaxs = get_img_info(labels, filename)

        if not img_classes:
            continue

        # One "<class> <cx> <cy> <w> <h>" line per kept bounding box.
        label_lines = []
        for img_class, xmin, xmax, ymin, ymax in zip(img_classes, xmins, xmaxs, ymins, ymaxs):
            c_x, c_y, b_w, b_h = convert_to_yolo_bbox(width, height, xmin, xmax, ymin, ymax)
            label_lines.append(f"{class_index[img_class]} {c_x} {c_y} {b_w} {b_h}\n")

        with open(os.path.join(y_path, f"{settype}_{i}.txt"), "w") as f:
            f.writelines(label_lines)

        # Open the image only when it is actually exported, and close the
        # handle right after saving.
        with Image.open(os.path.join(images, filename)) as img:
            img.save(os.path.join(x_path, f"{settype}_{i}.jpg"))


# Build both splits only while neither output folder holds more files than
# its threshold (7000 for train, 1000 for val).
if len(os.listdir(train_x_dir)) <= 7000 and len(os.listdir(val_x_dir)) <= 1000:
    for _files, _x_dir, _y_dir, _prefix in (
        (train, train_x_dir, train_y_dir, "train"),
        (test, val_x_dir, val_y_dir, "val"),
    ):
        make_data(_files, _x_dir, _y_dir, _prefix)

## Create config file
- For yolo to find the classes

- For yolo to find the train and val images

In [None]:
# Folder holding the YOLO helper files (class list, config, base weights).
run_dir = os.path.join(root_dir, "src/yolo")
config_path = os.path.join(run_dir, "config.yaml")

# Class names the detector is trained on.
with open(os.path.join(run_dir, "keep_classes.pkl"), "rb") as file:
    keep = pickle.load(file)

# An existing config is left untouched; otherwise write the dataset
# locations, the class count and the class names.
if not os.path.exists(config_path):
    config_lines = [
        f"train: {train_x_dir}\n",
        f"val: {val_x_dir}\n",
        f"nc: {len(keep)}\n",
        f"names: {keep}",
    ]
    with open(config_path, "w") as f:
        for config_line in config_lines:
            f.write(config_line)

## Train yolo model

In [None]:
from ultralytics import YOLO
import shutil

# Start from the base weights stored next to the config file.
model = YOLO(os.path.join(run_dir, "yolov8n.pt"))

# All runs are collected under ~/runs; create the folder on first use.
model_runs_dir = f"/home/{user}/runs"
if not os.path.exists(model_runs_dir):
    os.mkdir(model_runs_dir)

# Remove a previous "r1" run so the new one is written to the same folder.
r1 = os.path.join(model_runs_dir, "r1")
if os.path.exists(r1):
    shutil.rmtree(r1)

_train_settings = {
    "data": config_path,
    "epochs": 10,
    "imgsz": 640,
    "batch": 32,
    "device": 0,
    "project": model_runs_dir,
    "name": "r1",
    "patience": 100,
}
model.train(**_train_settings)

## Inference

In [None]:
# Weights of the "r1" run started by the training cell (project ~/runs,
# name "r1"); presumably the best checkpoint of that run — confirm against
# the trainer output. Rebinds `model` to the fine-tuned detector.
model_path = f"/home/{user}/runs/r1/weights/best.pt"
model = YOLO(model_path)

In [None]:
# Run the detector on a single test image from the repository.
img = Image.open(f"/home/{user}/tm-yolo-mathreader/src/yolo/test_imgs/math_6.png")

# predict() returns a sequence of results; only the first entry is used and
# detections below a confidence of 0.3 are dropped.
result = model.predict(source=img, conf=0.3)[0]

# Pixel data as stored on the result; the next cell crops symbols out of it.
img_array = result.orig_img

# Annotated visualisation. Its last axis is reversed before it is handed to
# PIL (presumably BGR -> RGB — confirm against the ultralytics docs).
im_array = result.plot()  
yolores = Image.fromarray(im_array[..., ::-1])
display(yolores)

In [None]:
# Cut every detected symbol out of the original image.
imgs = []
for box in result.boxes.xyxy.cpu().numpy().astype(int):

    # box = (x_min, y_min, x_max, y_max) in pixels.
    img = img_array[box[1]:box[3], box[0]:box[2]]
    imgs.append([img, box])

# Order the crops left to right by their x_min.
imgs = sorted(imgs, key=lambda x: x[1][0])

for img, box in imgs:
    # The crops come from result.orig_img, which ultralytics keeps in BGR
    # order; reverse the channel axis for PIL, exactly as is done for the
    # result.plot() output in the previous cell.
    im = Image.fromarray(img[..., ::-1])
    display(im)

# Tsetlin Machine Training:

### Step 1. Initialize dataset

In [None]:
# this was all done in vscode connected to fe server
import os
import cv2
import pickle
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from collections import Counter
from tqdm import tqdm

cwd = os.getcwd()
data_dir = f"/home/{os.environ['USER']}/data/math_expression_data/tm_data"
dataset_dir = os.path.join(data_dir, "dataset")

# Every sub-directory of dataset_dir is one class. Plain files are ignored:
# they would otherwise crash the os.listdir(class_dir) call below.
# NOTE: the id -> name mapping follows os.listdir order and is persisted in
# class_names_map.pkl; retrain the model whenever this cell rewrites it.
class_names = [
    name for name in os.listdir(dataset_dir)
    if os.path.isdir(os.path.join(dataset_dir, name))
]

images = []
labels = []

# map class names to integers
class_names_map = {i: class_name for i, class_name in enumerate(class_names)}
print(class_names_map)
with open(os.path.join(data_dir, "class_names_map.pkl"), "wb") as _fh:
    pickle.dump(class_names_map, _fh)

for class_id, class_name in tqdm(class_names_map.items()):
    class_dir = os.path.join(dataset_dir, class_name)

    for filename in os.listdir(class_dir):
        if not filename.endswith(".jpg"):
            continue

        image_path = os.path.join(class_dir, filename)

        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            print(f"Skipping unreadable image: {image_path}")
            continue

        # Otsu threshold with inversion (THRESH_BINARY_INV): result is 0/255.
        image = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]

        images.append(image)
        labels.append(class_id)

# Save the dataset to a pickle file
dataset = {
    'images': images,
    'labels': labels
}
print("DATASET LABELS:",set(labels))
df = pd.DataFrame(dataset)

with open(os.path.join(data_dir, "dataset.pkl"), "wb") as _fh:
    pickle.dump(dataset, _fh)

# Read the file back so the following cells work on what is stored on disk.
with open(os.path.join(data_dir, "dataset.pkl"), "rb") as _fh:
    dataset = pickle.load(_fh)

images = dataset['images']
labels = dataset['labels']
print(set(labels))

In [None]:
# Spot check: show two randomly chosen samples together with their labels.
_sample_ids = random.sample(range(len(images)), 2)
for i in _sample_ids:
    print(labels[i])
    _preview = Image.fromarray(images[i])
    display(_preview)
    

### Step 2: Balance the dataset

In [None]:
# balance dataset to 5000 images per class

with open(os.path.join(data_dir, "dataset.pkl"), "rb") as _fh:
    dataset = pickle.load(_fh)

images = dataset['images']
labels = dataset['labels']
print(set(labels))

class_counts = Counter(labels)
print(class_counts)

# Upper bound on the number of images kept per class.
target_count = 5000

# Group the sample positions by class in one pass instead of rescanning the
# whole label list for every class.
_indices_by_class = {}
for _pos, _label in enumerate(labels):
    _indices_by_class.setdefault(_label, []).append(_pos)

balanced_images = []
balanced_labels = []

for class_name in tqdm(class_counts.keys()):
    class_indices = _indices_by_class[class_name]
    class_count = len(class_indices)

    if class_count > target_count:
        # replace=False: np.random.choice draws WITH replacement by default,
        # which would put duplicate images into the down-sampled class.
        images_to_keep_indices = np.random.choice(class_indices, target_count, replace=False)
    else:
        # Small classes are kept completely; nothing is up-sampled.
        images_to_keep_indices = class_indices

    balanced_images.extend(images[i] for i in images_to_keep_indices)
    balanced_labels.extend([class_name] * len(images_to_keep_indices))

# Save the balanced dataset
balanced_dataset = {
    'images': balanced_images,
    'labels': balanced_labels
}
with open(os.path.join(data_dir, "balanced_dataset.pkl"), "wb") as _fh:
    pickle.dump(balanced_dataset, _fh)

# Count the number of images per class
balanced_class_counts = Counter(balanced_labels)
print(balanced_class_counts)


### Step 3: Split up the dataset into train and val datasets

In [None]:
# Split the dataset into training and validation sets
from sklearn.model_selection import train_test_split
import pickle
import os
import matplotlib.pyplot as plt

with open(os.path.join(data_dir, "balanced_dataset.pkl"), "rb") as _fh:
    balanced_dataset = pickle.load(_fh)

images = balanced_dataset['images']
labels = balanced_dataset['labels']

# 80/20 split with a fixed seed.
train_images, val_images, train_labels, val_labels = train_test_split(images, labels, test_size=0.2, random_state=42)

# Save the training and validation sets
train_dataset = {
    'images': train_images,
    'labels': train_labels
}

val_dataset = {
    'images': val_images,
    'labels': val_labels
}

# Quick visual check of one training sample.
plt.imshow(train_dataset['images'][3],cmap='gray')

# Context managers close the pickle files as soon as they are written.
with open(os.path.join(data_dir, "train_dataset.pkl"), "wb") as _fh:
    pickle.dump(train_dataset, _fh)
with open(os.path.join(data_dir, "val_dataset.pkl"), "wb") as _fh:
    pickle.dump(val_dataset, _fh)


### Step 4: Training the TM

In [None]:
from time import perf_counter
import random
import pickle
import os 
import uuid

import numpy as np
import pandas as pd
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
import tqdm

import green_tsetlin as gt

def get_dataset(data_dir, image_side=45, pixel_threshold=200):
    """Load the pickled train/validation sets as binary feature matrices.

    Args:
        data_dir: directory containing ``train_dataset.pkl`` and
            ``val_dataset.pkl``; each holds a dict with "images" and "labels".
        image_side: edge length of the square images (default 45, the value
            that used to be hard-coded).
        pixel_threshold: pixels strictly above this value become 1, all
            others 0 (default 200, as before).

    Returns:
        ``(X_train, X_test, y_train, y_test)``. The X arrays are ``uint8``
        with shape ``(n_samples, image_side * image_side)``, the y arrays are
        ``uint32``.
    """
    t0 = perf_counter()

    def _load(name):
        # Context manager so the pickle file is closed right after reading.
        with open(os.path.join(data_dir, name), "rb") as fh:
            return pickle.load(fh)

    def _binarize(image_list):
        # Flatten every image to one row, then threshold to 0/1.
        arr = np.array(image_list)
        flat = arr.reshape((arr.shape[0], image_side * image_side))
        return np.where(flat > pixel_threshold, 1, 0).astype(np.uint8)

    dataset_train = _load("train_dataset.pkl")
    dataset_test = _load("val_dataset.pkl")

    X_train = _binarize(dataset_train["images"])
    X_test = _binarize(dataset_test["images"])

    y_train = np.array(dataset_train["labels"]).astype(np.uint32)
    y_test = np.array(dataset_test["labels"]).astype(np.uint32)

    print("X_train.shape:{}".format(X_train.shape))
    print("y_train.shape:{}".format(y_train.shape))
    print("X_test.shape:{}".format(X_test.shape))
    print("y_test.shape:{}".format(y_test.shape))

    t1 = perf_counter()
    delta = t1 - t0
    print("getting data time:{}".format(delta))

    return X_train, X_test, y_train, y_test

# Binary feature matrices (xt/xe) and integer labels (yt/ye).
xt, xe, yt, ye = get_dataset(data_dir)

# Model and trainer settings.
n_clauses = 4354
n_literals = xt.shape[1]  # one literal per flattened pixel
n_classes = 41
s = 15.74
n_literal_budget = 50
threshold = 3982.17
n_jobs = 128
seed = 42

tm = gt.TsetlinMachine(
    n_literals=n_literals,
    n_clauses=n_clauses,
    n_classes=n_classes,
    s=s,
    threshold=threshold,
    literal_budget=n_literal_budget,
)

trainer = gt.Trainer(tm, n_epochs=5, seed=seed, n_jobs=n_jobs, progress_bar=True)
trainer.set_train_data(xt, yt)
trainer.set_eval_data(xe, ye)
trainer.train()

# Persist the trained state; the validation cell reloads this file.
tm_save_path = os.path.join(data_dir, "tm_state_v2.npz")
tm.save_state(tm_save_path)

print("--- results ---")
print(trainer.results)
print("--")

# Re-score the evaluation set one sample at a time with a predictor.
predictor = tm.get_predictor()

total = sum(1 for _sample, _target in zip(xe, ye) if predictor.predict(_sample) == _target)
accuracy = total/len(xe)

print("accuracy:{}".format(accuracy))

print("<done>")
    

### Step 5: Validating the TM results with examples from outside the dataset

In [None]:
from time import perf_counter
import random
import pickle
import os 
import uuid

import numpy as np
import pandas as pd
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
import tqdm

# Reload the state file written by the training cell (tm.save_state).
tm_save_path = os.path.join(data_dir, "tm_state_v2.npz")
ds = gt.DenseState.load_from_file(tm_save_path)
    
# Compile the dense state into a rule set.
# NOTE(review): the meaning of the positional False is not visible here —
# confirm against the green_tsetlin API.
rs = gt.RuleSet(False)
rs.compile_from_dense_state(ds)

# Print the first rule and the first weight entry as a sanity check.
print(rs.rules[0])
print(rs.weights[0])

# Build the predictor by hand from the rule set. Both calls below are
# underscore-prefixed (private) methods, so they may change between
# library versions.
predictor = gt.Predictor(explanation="none", multi_label=False)
predictor._set_ruleset(rs)
predictor._allocate_backend()


# Compare prediction and ground truth for one evaluation sample (index 8).
print("predictor.predict(x):", predictor.predict(xe[8]))
print("y true:", ye[8])
print("votes:", predictor._inf.get_votes())

In [None]:
from skimage.morphology import skeletonize, thin, medial_axis
from skimage import data
import skimage
import matplotlib.pyplot as plt
from skimage.util import invert

import os
import cv2
import pickle
import numpy as np
import polars as pl
import pandas as pd

from collections import Counter
from tqdm import tqdm

def classify_image(test_image_path, data_dir):
    """Classify one symbol image with the notebook-level ``predictor``.

    The image is read as grayscale, Otsu-thresholded with inversion, padded
    to a square, resized to 45x45 and skeletonized; the flattened skeleton is
    the model input. The resized image, its skeleton and a thinned version
    are plotted, and the prediction is printed.

    Args:
        test_image_path: file name, relative to ``data_dir``.
        data_dir: directory containing the test image.

    Returns:
        The class id returned by ``predictor.predict``.

    Raises:
        FileNotFoundError: if the image cannot be read.
    """
    # Load the test image
    test_image_path = os.path.join(data_dir, test_image_path)
    test_image = cv2.imread(test_image_path, cv2.IMREAD_GRAYSCALE)
    if test_image is None:
        # cv2.imread returns None instead of raising for a bad path/file.
        raise FileNotFoundError(f"Could not read image: {test_image_path}")

    h, w = test_image.shape[:2]

    test_image = cv2.threshold(test_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]

    # PADDING: grow the shorter side with background (0) until the image is
    # square. The remainder of an odd difference goes to the second border,
    # so the result is exactly square before resizing.
    if w != h:
        diff = abs(w - h)
        first, second = diff // 2, diff - diff // 2
        if w > h:
            top, bottom, left, right = first, second, 0, 0
        else:
            top, bottom, left, right = 0, 0, first, second

        test_image = cv2.copyMakeBorder(test_image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

    test_image = cv2.resize(test_image, (45, 45), interpolation=cv2.INTER_AREA)

    # INTER_AREA introduces intermediate grey values; hand an explicit
    # foreground mask (every non-zero pixel) to the morphology routines.
    foreground = test_image > 0
    skeleton = skeletonize(foreground)
    thinned_partial = thin(foreground)

    fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(12, 4), sharex=True, sharey=True)
    ax = axes.ravel()

    for axis, picture, title in zip(
        ax,
        (test_image, skeleton, thinned_partial),
        ('original', 'skeleton', 'thinned'),
    ):
        axis.imshow(picture, cmap=plt.cm.gray)
        axis.axis('off')
        axis.set_title(title, fontsize=20)

    fig.tight_layout()
    plt.show()

    # uint8, the dtype get_dataset() produces for the training and
    # evaluation features.
    skeleton = skeleton.astype(np.uint8)
    print(skeleton.shape)

    pred_image = skeleton.flatten()

    # Predict the class of the test
    predicted_class = predictor.predict(pred_image)

    print("PRED CLASS:",predicted_class)
    print("PRED:", class_names_map[predicted_class])
    print("votes:", predictor._inf.get_votes())

    return predicted_class

# id -> class-name mapping saved while the dataset was built. The context
# manager closes the file handle after loading.
with open(os.path.join(data_dir, "class_names_map.pkl"), "rb") as _fh:
    class_names_map = pickle.load(_fh)
print(class_names_map)

data_dir_tests = f"{root_dir}/src/testing"

# Symbol images that are not part of the training data.
for _test_file in (
    "test.jpg",
    "test2.jpg",
    "test4.jpeg",
    "test5.jpeg",
    "test6.jpeg",
    "test7.jpeg",
    "test8.jpeg",
):
    classify_image(_test_file, data_dir_tests)

# Final Application:

In [None]:
import cv2
import requests
import customtkinter as ctk
from PIL import Image

# Window size in pixels; also used below to derive widget padding and the
# size of the camera preview.
appWidth, appHeight = 1920, 1080

class MathExpressionRecognizerApp:
    """Window with a live webcam preview and a Wolfram|Alpha query box.

    Left frame: camera feed. Right frame: text entry, "Calculate" button and
    output label. Pressing the button runs symbol detection on the current
    frame (results are printed) and sends the text entry to Wolfram|Alpha.

    Relies on notebook-level names: ``YOLO``, ``predictor``,
    ``class_names_map``, ``skeletonize``, ``plt``, ``os``, ``display``,
    ``appWidth`` and ``appHeight``.
    """

    def __init__(self, root):
        """Build the widgets, open camera 0 and start the preview loop."""
        self.root = root
        self.root.title("Math Expression Recognizer")
        self.root.geometry(f"{appWidth}x{appHeight}")
        self.root.grid_columnconfigure(0, weight = 1)
        self.root.grid_columnconfigure(1, weight = 1)
        self.root.grid_rowconfigure(0, weight = 1)

        # Create left and right frames
        self.left_frame = ctk.CTkFrame(root, corner_radius=2)
        self.left_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")

        self.right_frame = ctk.CTkFrame(root, corner_radius=2)
        self.right_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")

        self.camera_label = ctk.CTkLabel(self.left_frame, text="")
        self.camera_label.grid(pady=appHeight/4-20)

        # input text box to write the math expression
        self.input_label = ctk.CTkLabel(self.right_frame, text="Input:", font=("Arial", 30))
        self.input_label.grid(pady=10)

        self.input_text = ctk.CTkEntry(self.right_frame, font=("Arial", 30), corner_radius=2)
        self.input_text.grid(pady=10)

        self.execute_button = ctk.CTkButton(self.right_frame, text="Calculate", command=self.execute, font=("Arial", 40), corner_radius=2)
        self.execute_button.grid(padx=(appWidth/4)-100, pady=10)

        self.output_label = ctk.CTkLabel(self.right_frame, text="Output:", font=("Arial", 30))
        self.output_label.grid(pady=10)

        # Detector weights are loaded on the first "Calculate" click and then
        # reused, instead of being re-read from disk on every click.
        self._yolo_model = None
        self._feed_job = None

        # Initialize the video capture object
        self.cap = cv2.VideoCapture(0)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)  # Set camera width
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 360)  # Set camera height

        # Give the camera back when the window is closed.
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)
        self.show_camera_feed()

    def _on_close(self):
        """Stop the preview loop, release the camera and close the window."""
        if self._feed_job is not None:
            self.root.after_cancel(self._feed_job)
            self._feed_job = None
        self.cap.release()
        self.root.destroy()

    def show_camera_feed(self):
        """Show the latest camera frame and reschedule itself in 10 ms."""
        ret, frame = self.cap.read()
        if ret:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = Image.fromarray(frame)
            # Half the window width at a 16:9 ratio (0.5625 = 9 / 16).
            preview_width = appWidth // 2
            image = ctk.CTkImage(image, size=(preview_width, int(preview_width * 0.5625)))

            self.camera_label.configure(image=image)
            # Keep a reference on the widget so the image is not collected.
            self.camera_label.image = image

        # Poll again whether or not a frame was available this time.
        self._feed_job = self.root.after(10, self.show_camera_feed)

    @staticmethod
    def classify_image(test_image, show=False):
        """Return the class name predicted for one symbol image.

        Declared static so that both ``self.classify_image(x)`` and
        ``MathExpressionRecognizerApp.classify_image(x)`` work (the method
        never used instance state and had no ``self`` parameter).

        Args:
            test_image: path to an image file, or an image array. A
                3-dimensional array is treated as BGR and converted to
                grayscale; a 2-dimensional array is used as is.
            show: plot the skeleton with matplotlib when True. Off by
                default so a GUI callback is not held up by a plot window.

        Raises:
            FileNotFoundError: if a path is given and cannot be read.
        """
        if isinstance(test_image, (str, os.PathLike)):
            image_path = os.fspath(test_image)
            test_image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if test_image is None:
                raise FileNotFoundError(f"Could not read image: {image_path}")
        elif test_image.ndim == 3:
            test_image = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY)

        h, w = test_image.shape[:2]

        test_image = cv2.threshold(test_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]

        # PADDING: grow the shorter side with background (0) until the image
        # is square; the remainder of an odd difference goes to the second
        # border.
        if w != h:
            diff = abs(w - h)
            first, second = diff // 2, diff - diff // 2
            if w > h:
                top, bottom, left, right = first, second, 0, 0
            else:
                top, bottom, left, right = 0, 0, first, second

            test_image = cv2.copyMakeBorder(test_image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

        test_image = cv2.resize(test_image, (45, 45), interpolation=cv2.INTER_AREA)

        # Explicit foreground mask: the resize leaves intermediate grey values.
        skeleton = skeletonize(test_image > 0)

        if show:
            plt.imshow(skeleton, cmap=plt.cm.gray)
            plt.show()

        # Flattened 0/1 vector with one entry per pixel.
        pred_image = skeleton.astype("uint8").flatten()

        # Predict the class of the test
        predicted_class = predictor.predict(pred_image)

        return class_names_map[predicted_class]

    def execute(self):
        """Handle a "Calculate" click.

        Detects and classifies the symbols in the current camera frame
        (printed for inspection), then shows the Wolfram|Alpha answer for the
        text in the input box.
        """
        # Capture image from webcam
        ret, frame = self.cap.read()
        if not ret:
            # Without a frame there is nothing to detect on.
            self.output_label.configure(text="Output: no camera frame available")
            return

        model = getattr(self, "_yolo_model", None)
        if model is None:
            # Same location the training step writes to: ~/runs/r1.
            model_path = f"/home/{os.environ['USER']}/runs/r1/weights/best.pt"
            model = self._yolo_model = YOLO(model_path)

        result = model.predict(source=frame, conf=0.5)[0]

        img_array = result.orig_img

        # Boxes as (x_min, y_min, x_max, y_max), ordered left to right.
        boxes = sorted(result.boxes.xyxy.cpu().numpy().astype(int), key=lambda box: box[0])

        recognized_symbols = []

        for box in boxes:
            crop = img_array[box[1]:box[3], box[0]:box[2]]
            if crop.size == 0:
                # Degenerate box: nothing to classify.
                continue
            # The frame comes from OpenCV (BGR); flip channels for display.
            display(Image.fromarray(crop[..., ::-1]))
            tm_res = self.classify_image(crop)
            print(tm_res)
            recognized_symbols.append(tm_res)

        # TODO: feed `recognized_symbols` into the query; for now the query
        # still comes from the text entry.
        math_expression = self.recognize_math_expression(frame)

        # Update output label (the answer is repeated on 20 lines).
        output_string = f"{math_expression}\n" * 20
        self.output_label.configure(text=f"{output_string}")

    @staticmethod
    def _build_query_url(api_key, expression):
        """Return the Wolfram|Alpha query URL for ``expression``.

        Every reserved character is percent-encoded (not just space, "+",
        "/" and "="), and the request uses https so the app id is not sent in
        clear text.
        """
        from urllib.parse import quote, urlencode

        query = urlencode(
            {"appid": api_key, "input": expression, "output": "json"},
            quote_via=quote,
        )
        return f"https://api.wolframalpha.com/v2/query?{query}"

    def recognize_math_expression(self, image):
        """Send the text of the input box to Wolfram|Alpha.

        ``image`` is accepted for interface compatibility but not used.
        Returns what ``get_data`` returns: the answer text or None.
        """
        # NOTE(security): the fallback key is committed to the repository.
        # Prefer setting WOLFRAM_APP_ID, then rotate and remove the literal.
        api_key = os.environ.get("WOLFRAM_APP_ID", "AU2LAE-TE4HQU7YEP")
        input_eq = self.input_text.get()
        api_call = self._build_query_url(api_key, input_eq)
        res = self.get_data(api_call)

        return res

    @staticmethod
    def _extract_answer(json_res):
        """Pull the answer text out of a decoded Wolfram|Alpha response.

        Reads the plaintext of the second pod; when that is the literal
        'line', the third pod is used instead. Returns None when the expected
        pods are missing (for example for a query that produced no pods).
        """
        pods = (json_res.get("queryresult") or {}).get("pods") or []

        def plaintext(index):
            try:
                return pods[index]["subpods"][0]["plaintext"]
            except (IndexError, KeyError, TypeError):
                return None

        res = plaintext(1)
        # if the result is a line, get the plot
        if res == 'line':
            alternative = plaintext(2)
            if alternative is not None:
                res = alternative
        return res

    def get_data(self, api_call):
        """GET ``api_call`` and return the answer text, or None on failure."""
        try:
            # The timeout keeps a stalled connection from freezing the GUI.
            response = requests.get(f"{api_call}", timeout=10)
        except requests.RequestException as exc:
            print(f"An error occured while sending API call: {exc}")
            return None

        if response.status_code == 200:
            print("sucessfully fetched the data")
            try:
                json_res = response.json()
            except ValueError:
                # Body was not valid JSON.
                return None
            return self._extract_answer(json_res)
        else:
            print(f"An error occured while sending API call: {response.status_code}")
            return None

def run_app():
    """Create the main window, attach the recognizer app and run its loop."""
    window = ctk.CTk()
    # Bound to a local name so the app object stays referenced while the
    # event loop is running.
    recognizer = MathExpressionRecognizerApp(window)
    window.mainloop()

# Start the application; run_app() builds the window and enters its main loop.
run_app()
