In [None]:
# Define a Ranger class that uses Enum to represent three comparison modes (ABOVE, UNDER and IN_RANGE); its is_in_range method
# checks where a value lies relative to an expected value (expected_value) and a tolerance percentage (range_perc).
import math
from enum import Enum

class Ranger(Enum):
  """Comparison modes used to test a measurement against an expected value.

  ABOVE    -- the value must be strictly greater than the expected value.
  UNDER    -- the value must be strictly smaller than the expected value.
  IN_RANGE -- the value must lie within a percentage tolerance around the
              expected value (both bounds inclusive).
  """
  ABOVE    = 1
  UNDER    = 2
  IN_RANGE = 3

  @staticmethod
  def is_in_range(value, expected_value, range_perc, range_type):
    """Check `value` against `expected_value` according to `range_type`.

    Declared as a static method so that it behaves the same whether it is
    called on the class (`Ranger.is_in_range(...)`) or on a member
    (`Ranger.ABOVE.is_in_range(...)`).

    Args:
      value: the measured scalar value.
      expected_value: the reference scalar value.
      range_perc: tolerance, as a percentage of `expected_value`; only used
        when `range_type` is `Ranger.IN_RANGE`.
      range_type: one of the `Ranger` members.

    Returns:
      A truthy value when the check passes, a falsy one otherwise. An
      unrecognised `range_type` yields False.
    """
    if range_type == Ranger.ABOVE:
      return value > expected_value
    if range_type == Ranger.UNDER:
      return value < expected_value
    if range_type == Ranger.IN_RANGE:
      # The tolerance is rounded down to a whole number of units.
      range_diff = math.floor((expected_value / 100) * range_perc)
      return (value >= (expected_value - range_diff)) and (value <= (expected_value + range_diff))
    return False

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.models import load_model
import numpy as np
import gzip
import os
import struct

class Classifier:
    """Small CNN character classifier trained on IDX-formatted (EMNIST-style) data.

    The instance either trains a fresh model and saves it to the given path,
    or loads an already saved model from that path.
    """

    def __init__(self, model = "", dataset_prefix = "", labels_mapping = None, labels_cnt = 0, labels_diff = 0, train = False, use_compression = False, enable_debug = False):
        """Create the classifier and make `self.model` ready for inference.

        Args:
            model: path of the Keras model file; it is written when `train`
                is True and read otherwise. Must not be empty.
            dataset_prefix: common prefix of the four IDX dataset files
                (`<prefix>-train-labels-idx1-ubyte`, ...).
            labels_mapping: dict mapping a predicted class index to its
                character. Defaults to an empty dict.
            labels_cnt: number of output classes of the network.
            labels_diff: value subtracted from every raw label so that the
                labels start at 0.
            train: train and save a new model instead of loading one.
            use_compression: the dataset files are gzip-compressed and carry
                an extra `.gz` suffix.
            enable_debug: show the resized image before classification.

        Raises:
            AssertionError: if `model` is an empty string.
        """
        assert (model != ""), "a model path is required"
        self.model = None
        self.dataset_prefix = dataset_prefix
        # A fresh dict per instance: a mutable default would be shared between instances.
        self.labels_mapping = labels_mapping if labels_mapping is not None else {}
        self.labels_cnt = labels_cnt
        self.labels_diff = labels_diff
        self.use_compression = use_compression
        self.enable_debug = enable_debug
        if train:
            self.train_model()
            # Save under the requested path (the trained model object is not a file name).
            self.save_model(model)
        else:
            self.model = load_model(model)

    def train_model(self):
        """Build, train and evaluate the CNN; the result is stored in `self.model`."""
        # Detect and configure the best available hardware (TPU, GPU, or CPU)
        try:
            # Try to detect a TPU
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            strategy = tf.distribute.TPUStrategy(resolver)
            print("Running on TPU")
        except ValueError:
            # Fallback to GPU or CPU
            if tf.config.list_physical_devices('GPU'):
                strategy = tf.distribute.MirroredStrategy()  # Use all available GPUs
                print("Running on GPU")
            else:
                strategy = tf.distribute.get_strategy()  # Default strategy for CPU
                print("Running on CPU")

        # Load and preprocess the training data
        x_train, y_train = self.load_emnist(
            f"{self.dataset_prefix}-train-labels-idx1-ubyte",
            f"{self.dataset_prefix}-train-images-idx3-ubyte"
        )
        # One global permutation of the samples (done on the compact uint8 data).
        # The per-epoch shuffle buffer below is much smaller than the dataset, so
        # without this a label-ordered file would yield near single-class batches.
        order = np.random.permutation(len(x_train))
        x_train, y_train = x_train[order], y_train[order]
        x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0  # Normalize and reshape
        y_train = y_train - self.labels_diff  # Adjust labels to 0 index

        x_test, y_test = self.load_emnist(
            f"{self.dataset_prefix}-test-labels-idx1-ubyte",
            f"{self.dataset_prefix}-test-images-idx3-ubyte"
        )
        x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0  # Normalize and reshape
        y_test = y_test - self.labels_diff  # Adjust labels to 0 index

        with strategy.scope():
            # Build the CNN model
            self.model = tf.keras.Sequential([
                tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
                tf.keras.layers.BatchNormalization(),
                tf.keras.layers.MaxPooling2D((2, 2)),
                tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
                tf.keras.layers.BatchNormalization(),
                tf.keras.layers.MaxPooling2D((2, 2)),
                tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
                tf.keras.layers.BatchNormalization(),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(256, activation='relu'),
                tf.keras.layers.Dropout(0.4),
                tf.keras.layers.Dense(self.labels_cnt, activation='softmax')
            ])

            # Compile the model
            self.model.compile(
                optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )

        # Prepare the data pipeline
        batch_size = 128 if isinstance(strategy, tf.distribute.TPUStrategy) else 64  # Adjust batch size for TPU
        train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) \
            .shuffle(1024).batch(batch_size).prefetch(tf.data.AUTOTUNE)
        test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)) \
            .batch(batch_size).prefetch(tf.data.AUTOTUNE)

        # Train the model
        self.model.fit(train_dataset, epochs=5, validation_data=test_dataset)

        # Evaluate the model on test data
        test_loss, test_accuracy = self.model.evaluate(test_dataset)
        print(f"Test Accuracy: {test_accuracy:.4f}")

    def save_model(self, model_name):
        """Save the current model to the path `model_name`."""
        self.model.save(model_name)

    # Classify a single processed image
    def classify_image(self, image):
        """Classify one single-channel image and print the prediction.

        The image is resized to 28x28 and scaled by 1/255, the same scaling
        that `train_model` applies to the training data.
        NOTE(review): this assumes `image` holds 0-255 pixel values, as the
        binary crops produced in this notebook do -- confirm for other callers.

        Returns:
            A `(predicted_label, predicted_char)` tuple.
        """
        if self.model is None: self.train_model()

        # Load and preprocess the image
        image = cv2.resize(image, (28, 28)).astype('float32')  # Resize to 28x28
        if self.enable_debug:
            print("Resized Image:")
            cv2_imshow(image)
        # Reshape to the model input shape and normalize like the training data
        image = image.reshape(1, 28, 28, 1) / 255.0

        # Predict the class
        prediction = self.model.predict(image)
        predicted_label = int(np.argmax(prediction))
        predicted_char = self.labels_mapping[predicted_label]  # Map label to character
        print(f"Predicted Label: {predicted_label}, Predicted Character: {predicted_char}")
        return predicted_label, predicted_char

    def _read_dataset_file(self, path):
        """Return the raw (decompressed when `use_compression` is set) bytes of `path`."""
        opener = gzip.open if self.use_compression else open
        with opener(path, 'rb') as f:
            return f.read()

    def load_emnist(self, label_path, image_path):
        """Load an IDX label file and the matching IDX image file.

        When `use_compression` is set, `.gz` is appended to both paths and
        the files are gunzipped.

        Returns:
            `(images, labels)`: a uint8 array of shape `(count, rows, cols)`
            and a uint8 array of labels.

        Raises:
            AssertionError: if a file does not start with the expected IDX
                magic number (2049 for labels, 2051 for images).
        """
        if self.use_compression:
            label_path += ".gz"
            image_path += ".gz"

        # Label file: big-endian magic and item count, then one byte per label.
        labels_data = self._read_dataset_file(label_path)
        magic, size = struct.unpack('>II', labels_data[:8])
        assert (magic == 2049), f"unexpected label file magic: {magic}"
        print(f"magic: {magic}, size: {size}")
        labels = np.frombuffer(labels_data[8:8 + size], dtype=np.uint8)

        # Image file: big-endian magic, item count, rows and cols, then the pixels.
        images_data = self._read_dataset_file(image_path)
        magic, size, rows, cols = struct.unpack('>IIII', images_data[:16])
        assert (magic == 2051), f"unexpected image file magic: {magic}"
        num_pixels = size * rows * cols
        image_data = np.frombuffer(images_data[16:16+num_pixels], dtype=np.uint8)
        images = image_data.reshape((size, rows, cols))

        return images, labels




In [None]:
# TODO: Convert all these functions into methods of a "Scrutineer" class

def is_near(rect_a, rect_b):
  """Tell whether two rects are close enough to be merged into one.

  Both rects are `(contour, img, x, y, w, h)` tuples. The result is True when
  the narrower rect lies strictly inside the horizontal span of the wider one
  and the vertical offset computed below is between 0 and half of the taller
  rect's height (inclusive).

  NOTE(review): when `rect_a` is not the wider rect the offset is
  `y_a - (y_b - h_a)`, which is not the mirror image of the other branch --
  confirm that this is intended.
  """
  _, _, x_a, y_a, w_a, h_a = rect_a
  _, _, x_b, y_b, w_b, h_b = rect_b

  if w_a > w_b:
    # rect_a is the wider one: rect_b must fit inside it horizontally.
    outer_left, outer_right = x_a, x_a + w_a
    inner_left, inner_right = x_b, x_b + w_b
    offset = y_b - (y_a + h_a)
  else:
    # rect_b is at least as wide: rect_a must fit inside it horizontally.
    outer_left, outer_right = x_b, x_b + w_b
    inner_left, inner_right = x_a, x_a + w_a
    offset = y_a - (y_b - h_a)

  horizontally_contained = outer_left < inner_left and outer_right > inner_right
  if not horizontally_contained:
    return False

  max_offset = max(h_a, h_b) / 2
  return bool(0 <= offset <= max_offset)

def reassemble_rects(img, rects):
  """Merge rects that `is_near` reports as belonging together.

  For each rect, the first later rect that is near it is merged into it: the
  contours are stacked, a new bounding box is computed and the crop is taken
  again from `img`. At most one merge is performed per position.

  Args:
    img: image the merged crop is cut from.
    rects: list of `(contour, img, x, y, w, h)` tuples; modified in place.

  Returns:
    The same `rects` list, after merging.
  """
  i = 0
  # The list shrinks while merging, so its length is re-read on every step.
  while i < len(rects):
    for j in range(i + 1, len(rects)):
      if is_near(rects[j], rects[i]):
        # Merge contours
        merged_contour = np.vstack((rects[i][0], rects[j][0]))
        x, y, w, h = cv2.boundingRect(merged_contour)
        rects.pop(j)
        # Copy only the crop instead of duplicating the whole image first.
        rects[i] = (merged_contour, img[y:y + h, x:x + w].copy(), x, y, w, h)
        break
    i += 1
  return rects

def find_rect_from_bin(img, expected_w_perc, expected_h_perc, expected_diff_perc, range_type, enable_debug):
  """Detect the cells of a BGR image whose content looks like a written sign.

  The image is binarised (Gaussian blur, inverted Otsu threshold, morphological
  closing), its contours are extracted and every bounding box is compared with
  the expected size. Matching boxes are merged with `reassemble_rects`,
  filtered with `is_a_written_sign` and grouped into rows with `group_rects`.

  Args:
    img: BGR image; rejected boxes are drawn on it in place.
    expected_w_perc: expected box width, as a percentage of the image width.
    expected_h_perc: expected box height, as a percentage of the image height.
    expected_diff_perc: tolerance percentage passed to `Ranger.is_in_range`.
    range_type: `Ranger` member selecting the size comparison.
    enable_debug: print and display intermediate results.

  Returns:
    `(img, grouped_rects, discarded)`: the input image, the accepted rects
    grouped by row as `(contour, crop, x, y, w, h)` tuples, and the boxes
    that failed the size check as `(crop, x, y, w, h)` tuples.
  """
  # Ratio above which a crop is considered a written sign, and the size of
  # the erosion kernel used for the second attempt.
  written_sign_threshold = 2.8
  erosion_kernel_size = (18, 18)

  gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # Convert to grayscale

  # Preprocess the image using Otsu's thresholding after Gaussian filtering
  blur = cv2.GaussianBlur(gray, (5,5), 0)
  _, binary = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
  kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
  closing = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)

  if enable_debug:
    print("Closing Image:")
    cv2_imshow(closing)

  # Detect contours
  contours, _ = cv2.findContours(closing, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

  # Determine expected size
  min_expected_h = np.ceil(expected_h_perc * (closing.shape[0] / 100)).astype(int)
  min_expected_w = np.ceil(expected_w_perc * (closing.shape[1] / 100)).astype(int)
  if range_type == Ranger.ABOVE:
    print(f"Expected size: ({min_expected_h}>) x ({min_expected_w}>)")
  elif range_type == Ranger.UNDER:
    print(f"Expected size: ({min_expected_h}<) x ({min_expected_w}<)")
  elif range_type == Ranger.IN_RANGE:
    h_range_diff = np.floor((min_expected_h / 100) * expected_diff_perc).astype(int)
    w_range_diff = np.floor((min_expected_w / 100) * expected_diff_perc).astype(int)
    if enable_debug: print(f"min_expected_h: {min_expected_h}, min_expected_w : {min_expected_w}")
    print(f"Expected size: ({min_expected_h - h_range_diff} - {min_expected_h + h_range_diff}) x ({min_expected_w - w_range_diff} - {min_expected_w + w_range_diff})")

  # Split the bounding boxes into size matches and discarded ones
  rects = []
  discarded = []
  for contour in contours:
    x, y, w, h = cv2.boundingRect(contour)
    # Copy only the crop instead of duplicating the whole image per contour.
    crop = closing[y:y + h, x:x + w].copy()
    if Ranger.is_in_range(w, min_expected_w, expected_diff_perc, range_type) and Ranger.is_in_range(h, min_expected_h, expected_diff_perc, range_type):
      if enable_debug: print(f"Rect match: {h} x {w} at {y} x {x}, index: {len(rects)}")
      rects.append((contour, crop, x, y, w, h))
    else:
      if enable_debug: print(f"Rect found: {h} x {w} at {y} x {x}, index: {len(discarded)}")
      discarded.append((crop, x, y, w, h))

  rects = reassemble_rects(closing, rects)
  erosion_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, erosion_kernel_size)

  print(" -- Check for Written Sign -- ")
  kept = []
  for rect in rects:
    contour, image, x, y, w, h = rect
    # Index the rect has among the rects kept so far (removed rects are not counted).
    rect_idx = len(kept)
    eroded = cv2.erode(image, erosion_kernel, iterations = 1)

    if enable_debug:
      print(f"Testing Rect {rect_idx}")
      cv2_imshow(image)

    if is_a_written_sign(image, written_sign_threshold, enable_debug) or is_a_written_sign(eroded, written_sign_threshold, enable_debug):
      kept.append(rect)
    else:
      # Always report a removal in full, whatever the debug setting.
      print(f" -- Removing Rect {rect_idx} -- ")
      print("Original Image:")
      is_a_written_sign(image, written_sign_threshold, True)
      cv2_imshow(image)
      print("After Erosion:")
      is_a_written_sign(eroded, written_sign_threshold, True)
      cv2_imshow(eroded)
      print(" -- End Rect Removed -- ")
      cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

    if enable_debug: print("-------------------------")

  print("-----------------------------")

  # Sort the Cells and determine the number of elements per row and per column
  grouped_rects = group_rects(kept)

  return img, grouped_rects, discarded

def find_rect_from_img(img_path, expected_w_perc, expected_h_perc, expected_diff_perc, range_type, enable_debug):
  """Load the image at `img_path` and run `find_rect_from_bin` on it.

  All the other arguments are forwarded unchanged, and the return value is
  the one of `find_rect_from_bin`.

  Raises:
    FileNotFoundError: if the image cannot be read.
  """
  # Load the image
  image = cv2.imread(img_path)
  # cv2.imread signals a missing or unreadable file by returning None.
  if image is None:
    raise FileNotFoundError(f"Unable to read image: {img_path}")
  return find_rect_from_bin(image, expected_w_perc, expected_h_perc, expected_diff_perc, range_type, enable_debug)

def get_matrix_list_shape(matrix_list):
  """Return `(rows, cols)` for a list of rows.

  `cols` is the length of the first row; the other rows are not inspected.
  An empty list gives `(0, 0)`.
  """
  if len(matrix_list) == 0:
    return 0, 0
  return len(matrix_list), len(matrix_list[0])

def get_row_y_pos(row):
  """Return the y coordinate of the first rect of `row` (sort key for rows)."""
  first_rect = row[0]
  _, _, _, top, _, _ = first_rect
  return top

def get_rect_x_pos(rect):
  """Return the x coordinate of `rect` (sort key for the rects of a row)."""
  _, _, left, _, _, _ = rect
  return left

def group_rects(rects):
  """Arrange rects into rows ordered top to bottom, each row left to right.

  A rect joins the first existing row whose first rect is vertically closer
  than the taller of the two heights; otherwise it starts a new row.

  Args:
    rects: list of `(contour, img, x, y, w, h)` tuples.

  Returns:
    A list of rows, each row being a list of rects.
  """
  rows = []
  for candidate in rects:
    _, _, _, top, _, height = candidate
    for row in rows:
      _, _, _, row_top, _, row_height = row[0]
      if abs(top - row_top) < max(height, row_height):
        row.append(candidate)
        break
    else:
      # No existing row is close enough: start a new one.
      rows.append([candidate])

  # Rows by the y of their first rect, then every row by x.
  rows.sort(key=get_row_y_pos)
  for row in rows:
    row.sort(key=get_rect_x_pos)

  return rows

def is_a_written_sign(img, threshold, enable_debug):
  """Decide from the pixel counts whether a binary crop holds a written sign.

  Pixels equal to 255 are counted as white, every other pixel as black. The
  crop is accepted when the black/white ratio is greater than `threshold`.

  Args:
    img: rectangular 2-D image (array or nested lists) of pixel values.
    threshold: minimum black/white ratio, exclusive.
    enable_debug: print the pixel counts.

  Returns:
    True when the ratio exceeds `threshold`; False otherwise, including when
    the crop has no white or no black pixel.
  """
  # Vectorised counting instead of a per-pixel Python loop.
  pixels = np.asarray(img)
  total_cnt = int(pixels.size)
  white_cnt = int(np.count_nonzero(pixels == 255))
  black_cnt = total_cnt - white_cnt

  if white_cnt == 0 or black_cnt == 0:
    if enable_debug: print(f"total_cnt: {total_cnt}, black_cnt: {black_cnt}, white_cnt: {white_cnt}")
    return False

  if enable_debug: print(f"total_cnt: {total_cnt}, black_cnt: {black_cnt}, white_cnt: {white_cnt}, percentage_a: {black_cnt/white_cnt}")

  # TODO: Apply more complex and finer strategy to determine if it is noise, a random line or a written sign like a character
  return black_cnt/white_cnt > threshold

In [None]:
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

# Toggle for the verbose prints / intermediate image displays of the classifier and of the cell detection
enable_debug = False

# Class index -> character produced by the classifier
labels_mapping = {0: "g", 1: "y", 2: "b", 3: "t", 4: "invalid"}
# train=True: the model is trained from the gzip-compressed "./my-dataset-*" files and then saved,
# instead of being loaded from "my-small-model.keras"
classifier = Classifier("my-small-model.keras", "./my-dataset", labels_mapping, len(labels_mapping), train=True, use_compression=True, enable_debug=enable_debug)

# Arguments: image path, expected cell width % and height % (of the image size), tolerance %, range type, debug flag
image, grouped_cells, discarded = find_rect_from_img("TestImage_hpencil.png", 7, 7, 100, Ranger.IN_RANGE, enable_debug)

# The call above returns the loaded image (rejected cells are already outlined on it), the detected cells
# grouped by row and sorted left to right, and the boxes discarded by the size check.
# The crop of every cell is then mapped to a character by the classifier.

print("\nResult:")

# NOTE: the column count is taken from the first row only
(rows, cols) = get_matrix_list_shape(grouped_cells)
print("-----------------------------------")
print(f"Matrix shape: ({rows}, {cols})")
print("-----------------------------------")

# Outline every accepted cell in green (BGR) and classify its crop
for idx_row, row in enumerate(grouped_cells):
  print(f"Row {idx_row}:")
  for idx, cell in enumerate(row):
    contour, img, x, y, w, h = cell
    cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
    print(f"Element {idx}:")
    cv2_imshow(img)
    classifier.classify_image(img)
  print("-----------------------------------")

# Outline in red (BGR) the boxes that failed the size check; these tuples carry no contour
for idx, discard in enumerate(discarded):
  img, x, y, w, h = discard
  cv2.rectangle(image, (x, y), (x + w, y + h), (0, 0, 255), 2) # Draw the discarded box on the image
  if enable_debug:
      print(f"Discarded {idx}:")
      cv2_imshow(img)

# Show the image with all the outlines drawn above
print(" -- Image with Contours -- ")
cv2_imshow(image)
print("-----------------------------------")

Running on CPU
magic: 2049, size: 495616
magic: 2049, size: 57344


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/5
[1m7744/7744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m737s[0m 94ms/step - accuracy: 0.9978 - loss: 0.0409 - val_accuracy: 0.6651 - val_loss: 126.1912
Epoch 2/5
[1m5453/7744[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m3:24[0m 89ms/step - accuracy: 0.9923 - loss: 0.4134