In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [1]:
!unzip -o '/content/drive/MyDrive/new_project/D-Fire.zip' -d '/content/D-Fire dataset'

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
  inflating: /content/D-Fire dataset/train/labels/WEB04443.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04444.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04445.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04446.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04447.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04448.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04449.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04450.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04451.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04452.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04453.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04454.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04455.txt  
  inflating: /content/D-Fire dataset/train/labels/WEB04456.txt  
  inflating: /con

In [2]:
# Download .yaml file
!cp /content/drive/MyDrive/new_project/dfire_config.yaml /content/dfire_config.yaml

In [3]:
import os
import random
import shutil

random.seed(10)

train_images_folder = "D-Fire dataset/train/images"
train_labels_folder = "D-Fire dataset/train/labels"
val_images_folder = "D-Fire dataset/val/images"
val_labels_folder = "D-Fire dataset/val/labels"

# Create the validation folders if they don't exist
os.makedirs(val_images_folder, exist_ok=True)
os.makedirs(val_labels_folder, exist_ok=True)

# Get the list of image files in the train set
image_files = os.listdir(train_images_folder)

# Calculate the number of images to move to the validation set
num_val_images = int(0.1 * len(image_files))

# Randomly select the images to move
val_image_files = random.sample(image_files, num_val_images)

# Move the selected images and their corresponding labels to the validation set
for image_file in val_image_files:
   # Move image file
   image_src = os.path.join(train_images_folder, image_file)
   image_dst = os.path.join(val_images_folder, image_file)
   shutil.move(image_src, image_dst)

   # Move label file
   label_file = image_file.replace(".jpg", ".txt")
   label_src = os.path.join(train_labels_folder, label_file)
   label_dst = os.path.join(val_labels_folder, label_file)
   shutil.move(label_src, label_dst)

In [7]:
import numpy as np


def polynomial_space(x, X=None):
    """
    Construct a polynomial feature space from input vector x.

    Each column of the output corresponds to x raised to a power
    specified in X.

    Parameters
    ----------
    x : np.ndarray
        Input vector or matrix. If x is 1D, it is treated as a column vector.
    X : array-like, optional
        Sequence of exponents. If None, exponents range from
        0 to size(x, 0) - 1, matching the MATLAB default behavior.

    Returns
    -------
    Px : np.ndarray
        Polynomial space matrix where:
        Px[:, i] = x ** X[i]
    """
    x = np.asarray(x)

    # Ensure column vector behavior like MATLAB
    if x.ndim == 1:
        x = x.reshape(-1, 1)

    if X is None:
        X = np.arange(x.shape[0])

    X = np.asarray(X)
    Px = np.zeros((x.shape[0], len(X)), dtype=x.dtype)

    for i, exp in enumerate(X):
        Px[:, i] = np.power(x[:, 0], exp)

    return Px


In [8]:
import numpy as np


def truth_space(d, G):
    """
    Construct the truth space for a given depth and group size.

    This function generates all combinations of values in the range
    [0, G-1] with depth `d`, arranged to match the original MATLAB
    construction order.

    Parameters
    ----------
    d : int
        Depth of the truth space (number of variables).
    G : int
        Group size (number of discrete values per variable).

    Returns
    -------
    x : np.ndarray
        Matrix of shape (G**d, d) representing the truth space.
        If d == 0, returns an empty array.
    """
    if d == 0:
        return np.empty((0, 0), dtype=int)

    x = np.empty((0, 0), dtype=int)

    for id_ in range(d):
        xi = np.tile(np.arange(G), (G ** id_, 1))
        xi = xi.reshape(-1, 1)

        if x.size == 0:
            x = xi
        else:
            x = np.hstack([xi, np.tile(x, (G, 1))])

    return x


In [9]:
import numpy as np
from itertools import combinations
from math import ceil, log


def mai_train(xh, yh, options=None):
    """
    Train a Modular Artificial Intelligence (MAI) model.

    Parameters
    ----------
    xh : np.ndarray
        Input training samples of shape (S, N).
    yh : np.ndarray
        Output training samples of shape (S,).
    options : dict, optional
        Training options:
        - verbose (int)
        - refined (bool)
        - group_size (int)
        - check (bool)
        - modulus (int)
        - input_step (int)
        - best_error_reduction (bool)

    Returns
    -------
    model : dict
        Trained MAI model.
    info : dict
        Training information and statistics.
    """
    if options is None:
        options = {}

    options.setdefault("verbose", 0)
    options.setdefault("refined", True)
    options.setdefault("group_size", 2)
    options.setdefault("check", False)
    options.setdefault("modulus", 0)
    options.setdefault("input_step", 0)
    options.setdefault("best_error_reduction", True)

    # -----------------------------
    # Basic parameters
    # -----------------------------
    S, N = xh.shape
    Ns = options["input_step"] or N
    G = options["group_size"]

    M = next_prime(
        max(
            S,
            2 ** min(G, Ns),
            int(xh.max()) + 1,
            int(yh.max()) + 1,
        )
    )

    # -----------------------------
    # Fixed modulus refinement
    # -----------------------------
    if options["modulus"]:
        if options["modulus"] < M:
            Mp = options["modulus"]
            rmax = ceil(log(max(xh.max(), yh.max()) + 1) / log(Mp))
            M = Mp ** rmax

            refiners = []
            xhr = np.zeros((S, 0), dtype=int)

            for r in range(1, rmax + 1):
                refiners.append(
                    lambda x, r=r: np.floor(
                        (x * Mp ** (r - 1)) % M / ceil(M / Mp)
                    ).astype(int)
                )
                xhr = np.hstack([xhr, refiners[-1](xh)])

            options["input_step"] = N
            options["modulus"] = 0

            model, info = mai_train(xhr, yh, options)

            model["refiners"] = refiners
            model["infer"] = lambda x: model["decoder"](
                infer(
                    model["encoder"](x),
                    model["inputs"],
                    model["refiners"],
                    model["refined"],
                    model["parameters"],
                    model["exponents"],
                    options["group_size"],
                )
            )
            return model, info
        else:
            M = next_prime(options["modulus"])

    # -----------------------------
    # Encoding / decoding
    # -----------------------------
    xlim = (xh.min(), xh.max())
    if not np.all(xh.astype(int) == xh):
        encoder = lambda x: np.floor(
            (x - xlim[0]) / (xlim[1] - xlim[0]) * M
        ).astype(int)
    else:
        encoder = lambda x: x.astype(int)

    ylim = (yh.min(), yh.max())
    if not np.all(yh.astype(int) == yh):
        yh = np.floor(
            (yh - ylim[0]) / (ylim[1] - ylim[0]) * M
        ).astype(int)
        decoder = lambda y: y / (M - 1) * (ylim[1] - ylim[0]) + ylim[0]
    else:
        decoder = lambda y: y

    xh = modular_residue(encoder(xh), M)
    yh = modular_residue(yh, M)

    # -----------------------------
    # Initialization
    # -----------------------------
    Xh = [xh]
    ey = yh

    info = {
        "modulus": M,
        "samples": S,
        "grouping": G,
        "inputs": [],
        "groups": [],
        "parameters": [],
        "energy": 0,
        "error": [ey.residue.sum()],
    }

    model = {
        "parameters": modular_residue(np.zeros((S, 0)), M),
        "inputs": [[list(range(N))]],
        "refined": [[True] * N],
        "refiners": [],
        "exponents": np.arange(S),
        "encoder": encoder,
        "decoder": decoder,
        "error": ey,
        "cascade": Xh,
    }

    Ni = N

    # -----------------------------
    # Training loop
    # -----------------------------
    while Ni > 0:
        gmax = min(Ns, Ni, G)
        groups = list(combinations(range(Ni), gmax))
        info["groups"].append(len(groups))

        Xh.append(modular_residue(np.zeros((S, 0)), M))
        model["inputs"].append([])
        model["refined"].append([])

        info["parameters"].append(0)
        info["error"].append(ey.residue.sum())

        if options["verbose"]:
            print(f"\n{Ni:3d} inputs starting with error {info['error'][-1]:6d}")

        X = subsizes(M, gmax)
        xs = scale(Xh[-2].residue, M, X)

        for g in groups:
            xhe = group(xs[:, g], M, X)
            xhe, yhe = prune_error(xhe, ey)
            dy = yhe.residue.sum()

            if dy <= 0:
                continue

            P = polynomial_space(xhe, model["exponents"])
            w = np.linalg.lstsq(P, yhe.residue, rcond=None)[0]
            yw = (P @ w).astype(int)

            eyn = ey - yw
            err_new = eyn.residue.sum()

            if err_new < info["error"][-1]:
                ey = eyn
                Xh[-1] = np.column_stack([Xh[-1], yw])
                model["parameters"] = np.column_stack(
                    [model["parameters"], w]
                )
                model["inputs"][-1].append(list(g))
                model["refined"][-1].append(False)
                info["error"][-1] = err_new
                info["parameters"][-1] += np.count_nonzero(w)

            if err_new == 0:
                break

        info["inputs"].append(Xh[-1].shape[1])
        if info["error"][-1] == 0 or Ni == 1:
            break

        Ni = Xh[-1].shape[1]

    # -----------------------------
    # Final inference function
    # -----------------------------
    model["infer"] = lambda x: decoder(
        infer(
            encoder(x),
            model["inputs"],
            model["refiners"],
            model["refined"],
            model["parameters"],
            model["exponents"],
            options["group_size"],
        )
    )

    return model, info


In [10]:
def subsizes(M, N):
    """Return fair subsizes for modular grouping."""
    return int(M ** (1 / N))


def scale(x, M, X):
    """Scale modular inputs to group domain."""
    return np.floor(x / (M - 1) * X).astype(int)


def group(x, M, X):
    """Group variables into modular indices."""
    return modular_residue(subs2ind(X, x), M)


def subs2ind(S, I):
    """Convert subscripts to linear indices."""
    if np.isscalar(S):
        offsets = np.cumprod([1] + [S] * I.shape[1])
    else:
        offsets = np.cumprod([1] + list(S))

    idx = np.zeros(I.shape[0], dtype=int)
    for d in range(I.shape[1]):
        idx += I[:, d] * offsets[d]
    return idx


def prune_error(xh, yh):
    """Prune duplicated inputs keeping minimal error."""
    _, inv = np.unique(xh.residue, return_inverse=True)
    y = np.zeros(inv.max() + 1, dtype=int)
    for i in range(len(y)):
        y[i] = yh.residue[inv == i].min()
    return xh, modular_residue(y[inv], xh.modulus)


In [11]:
def is_prime(n):
    """
    Check whether a number is prime.

    Parameters
    ----------
    n : int

    Returns
    -------
    bool
        True if n is prime, False otherwise.
    """
    if n < 2:
        return False
    if n in (2, 3):
        return True
    if n % 2 == 0:
        return False
    i = 3
    while i * i <= n:
        if n % i == 0:
            return False
        i += 2
    return True

def next_prime(n, s=1):
    """
    Return the next prime number greater than or equal to `n`.

    The function increments the value by `s` until a prime number
    is found.

    Parameters
    ----------
    n : int
        Starting integer value.
    s : int, optional
        Step size used to increment `n` (default is 1).

    Returns
    -------
    int
        The first prime number p >= n such that p increases by steps of s.
    """
    p = int(n)
    while not is_prime(p):
        p += s
    return p


In [12]:
import os
import numpy as np
from imageio import imread
from math import atan2, degrees


def d_fire_train(folder="./", options=None):
    """
    Train a Modular Artificial Intelligence (MAI) model for the DIP angle dataset.

    This function loads paired image and label files, assembles them into
    modular input/output representations, and trains a MAI model.

    Parameters
    ----------
    folder : str, optional
        Path to the dataset folder containing images and labels.
    options : dict, optional
        Training options:
        - verbose (int): verbosity level
        - samples (int): maximum number of samples
        - size (tuple): expected image size (H, W, C)
        - modulus (int): modular arithmetic base

    Returns
    -------
    model : object
        Trained MAI model with an attached inference function.
    info : dict
        Training metadata and statistics.
    """
    if options is None:
        options = {}

    options.setdefault("verbose", 0)
    options.setdefault("samples", np.inf)
    options.setdefault("size", (64, 64, 3))
    options.setdefault("modulus", next_prime(255))

    label_extension = ".txt"
    image_extension = ".jpg"

    # Ensure trailing slash
    if not folder.endswith(os.sep):
        folder += os.sep

    # Match image and label files by name
    label_files = remove_extension(
        [f for f in os.listdir(folder) if f.endswith(label_extension)]
    )
    image_files = remove_extension(
        [f for f in os.listdir(folder) if f.endswith(image_extension)]
    )

    common = sorted(set(label_files).intersection(image_files))
    label_files = [folder + f + label_extension for f in common]
    image_files = [folder + f + image_extension for f in common]

    # Cardinalities
    S = int(min(options["samples"], len(label_files)))  # number of samples
    G = 2  # input group size
    M = next_prime(max(S, 2 ** G, 256))
    N = options["size"]

    # Dataset assembly
    xh = assemble_input(image_files[:S], M, N, options["verbose"])
    yh = assemble_output(label_files[:S], M, options["verbose"])

    # Training
    mai_options = {
        "verbose": options["verbose"],
        "modulus": M,
        "group_size": G,
    }

    model, info = mai_train(xh, yh, mai_options)

    # Attach inference helper
    model.infer_angle = lambda file: infer(file, model, M, N)
    info["images"] = len(image_files)

    return model, info


def remove_extension(files):
    """
    Remove file extensions from a list of filenames.

    Parameters
    ----------
    files : list of str
        Filenames with extensions.

    Returns
    -------
    list of str
        Filenames without extensions.
    """
    return [os.path.splitext(f)[0] for f in files]


def infer(file, model, M, N):
    """
    Infer the angle (in degrees) from a single image file.

    Parameters
    ----------
    file : str
        Path to the image file.
    model : object
        Trained MAI model.
    M : int
        Modulus used during training.
    N : tuple
        Expected image size.

    Returns
    -------
    float
        Estimated angle in degrees.
    """
    y = model.infer(assemble_input([file], M, N, verbose=0))
    return (y[0] / M) * 360.0


def assemble_input(files, M, N, verbose=0):
    """
    Convert image files into modular input vectors.

    Images are normalized, downsampled, centered, and flattened into
    fixed-size vectors suitable for MAI training.

    Parameters
    ----------
    files : list of str
        Paths to image files.
    M : int
        Modulus for quantization.
    N : tuple
        Target image size (H, W, C).
    verbose : int
        Verbosity level.

    Returns
    -------
    np.ndarray
        Input matrix of shape (samples, prod(N)).
    """
    if verbose:
        print("assembling input...", end="", flush=True)

    xh = np.zeros((len(files), np.prod(N)), dtype=int)

    for s, file in enumerate(files):
        image = imread(file).astype(float)
        image = np.floor(image / 255.0 * (M / (1 + np.finfo(float).eps)))

        # Downsample (equivalent to 1:4:end in MATLAB)
        image = image[::4, ::4, ...]

        if image.ndim == 2:
            image = image[:, :, np.newaxis]

        h, w, c = image.shape

        if c == 1 and N[2] > 1:
            image = np.repeat(image, N[2], axis=2)
            c = N[2]

        if N[2] == 1 and c > 1:
            image = np.round(image.mean(axis=2, keepdims=True))
            c = 1

        icon = np.zeros(N)

        if h <= N[0] and w <= N[1] and c <= N[2]:
            ih = slice((N[0] - h) // 2, (N[0] - h) // 2 + h)
            iw = slice((N[1] - w) // 2, (N[1] - w) // 2 + w)
            icon[ih, iw, :] = image

        xh[s, :] = icon.flatten()

    if verbose:
        print(" ok")

    return xh


def assemble_output(files, M, verbose=0):
    """
    Convert label files into modular output values.

    Each label file contains four integers defining two points.
    The angle between them is computed and mapped into modular space.

    Parameters
    ----------
    files : list of str
        Paths to label files.
    M : int
        Modulus for quantization.
    verbose : int
        Verbosity level.

    Returns
    -------
    np.ndarray
        Output vector of shape (samples,).
    """
    if verbose:
        print("assembling output...", end="", flush=True)

    yh = np.zeros(len(files), dtype=int)

    for s, file in enumerate(files):
        with open(file, "r") as f:
            C = np.array(list(map(int, f.read().split())))

        angle = degrees(atan2(C[1] - C[3], C[2] - C[0]))
        if angle < 0:
            angle += 360

        yh[s] = int(np.floor(angle / 360 * (M / (1 + np.finfo(float).eps))))

    if verbose:
        print(" ok")

    return yh


In [16]:
"""
Example script for training and using the DIP angle MAI model.
"""

def main():
    # --------------------------------------------------
    # Training options
    # --------------------------------------------------
    options = {
        "samples": 64,          # max number of training samples
        "verbose": 2,           # verbosity level
        "size": (32, 32, 1),    # image size (H, W, C)
        "modulus": 31,          # modular arithmetic base
    }

    dataset_folder = "/content/D-Fire dataset/train"

    # --------------------------------------------------
    # Train model
    # --------------------------------------------------
    model, info = d_fire_train(dataset_folder, options)

    print("\n=== Training info ===")
    print(info)

    # --------------------------------------------------
    # Inference on a single image
    # --------------------------------------------------
    test_image = "/content/D-Fire dataset/train/images/AoF00009.jpg"
    angle = model.infer_angle(test_image)

    print("\n=== Inference ===")
    print(f"Predicted angle: {angle:.2f} degrees")


if __name__ == "__main__":
    main()


assembling input... ok
assembling output... ok


ValueError: zero-size array to reduction operation maximum which has no identity