In [1]:
import random
from collections import defaultdict
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
import os
import torch
from torch.utils.data import Dataset, Subset, random_split
import numpy as np

class BaseDataset(Dataset):
    """Map-style dataset that pairs an indexable image store with its targets.

    Args:
        images: Indexable collection of images.
        targets: Indexable collection of targets; its length is the dataset size.
        transform: Optional callable applied to an image each time it is fetched.
        target_transform: Optional callable applied to a target each time it is
            fetched.
    """

    def __init__(
        self,
        images: Union[Sequence, torch.Tensor],
        targets: Union[Sequence, torch.Tensor],
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
    ) -> None:
        self.images, self.targets = images, targets
        self.transform, self.target_transform = transform, target_transform

    def __len__(self) -> int:
        """Number of samples, taken from the length of ``targets``."""
        return len(self.targets)

    def __getitem__(self, idx: int):
        """Fetch the ``idx``-th ``(image, target)`` pair, transformed if configured."""
        sample = self.images[idx]
        label = self.targets[idx]
        # Truthiness (not ``is not None``) decides whether a transform is applied.
        sample = self.transform(sample) if self.transform else sample
        label = self.target_transform(label) if self.target_transform else label
        return sample, label


class DatasetGenerator:
    """Builds multi-digit images by pasting single-digit tiles onto a wide canvas.

    Args:
        single_digit_mnist: Iterable of ``(image, digit)`` pairs. Each image is
            squeezed and pasted as a ``28 x 28`` tile (assumes single-channel,
            MNIST-sized tensors -- TODO confirm for other sources).
        max_length: Maximum number of digits in a generated number. The canvas
            is ``28 * max_length`` pixels wide.
        min_overlap: Lower bound of the per-image overlap fraction between
            neighbouring tiles.
        max_overlap: Upper bound of the per-image overlap fraction.
        padding_index: Value that fills the unused trailing label positions.
        min_length: Minimum number of digits in a generated number. Clipped to
            ``max_length`` so that short ``max_length`` values still work.
        image_height: Height of the generated canvas in pixels.

    Raises:
        ValueError: If ``max_length`` or ``min_length`` is below 1, or if
            ``image_height`` is too small to hold one tile.
    """

    # Key under which the all-zero tile used for left/right padding is stored.
    _BLANK_DIGIT = -1
    # Row at which the top edge of every tile is pasted.
    _TOP_OFFSET = 2

    def __init__(
        self,
        single_digit_mnist: Dataset,
        max_length: int,
        min_overlap: float,
        max_overlap: float,
        padding_index: int,
        min_length: int = 4,
        image_height: int = 96,
    ) -> None:
        if max_length < 1:
            raise ValueError("max_length must be at least 1")
        if min_length < 1:
            raise ValueError("min_length must be at least 1")

        self.single_digit_mnist = single_digit_mnist
        self.max_length = max_length
        self.min_overlap = min_overlap
        self.max_overlap = max_overlap
        self.padding_index = padding_index

        self.mnist_digit_dim = 28
        if image_height < self._TOP_OFFSET + self.mnist_digit_dim:
            raise ValueError(
                "image_height must be at least "
                f"{self._TOP_OFFSET + self.mnist_digit_dim}"
            )
        # A fixed minimum above max_length would leave no valid digit counts.
        self.min_length = min(min_length, max_length)
        self.image_height = image_height
        self.samples_by_digit = self._get_samples_by_digit()

    def _get_samples_by_digit(self) -> Dict[int, List]:
        """Group the source images by digit and add one blank tile for padding."""
        samples_by_digit: Dict[int, List] = defaultdict(list)
        for image, digit in self.single_digit_mnist:
            # int() keeps the keys plain integers even if labels arrive as
            # 0-dim tensors, which would otherwise never match the int lookups.
            samples_by_digit[int(digit)].append(image.squeeze())
        blank_image = torch.zeros((self.mnist_digit_dim, self.mnist_digit_dim))
        samples_by_digit[self._BLANK_DIGIT].append(blank_image)
        return samples_by_digit

    def generate(self, num_samples: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Generate ``num_samples`` multi-digit images with their labels.

        Args:
            num_samples: Number of samples to generate.

        Returns:
            ``(images, labels)``: images of shape
            ``(num_samples, image_height, 28 * max_length)`` and labels of shape
            ``(num_samples, max_length)``, right-padded with ``padding_index``.
        """
        canvas_width = self.mnist_digit_dim * self.max_length
        labels = torch.full((num_samples, self.max_length), self.padding_index)
        images = torch.zeros((num_samples, self.image_height, canvas_width))
        for i in range(num_samples):
            rand_num = self._get_random_number()
            for j, digit in enumerate(str(rand_num)):
                labels[i, j] = int(digit)
            images[i] = self._construct_image_from_number(rand_num)
        return images, labels

    def _get_random_number(self) -> int:
        """Draw a random number with ``min_length`` to ``max_length`` digits.

        The digit count is drawn with weight proportional to the count itself,
        and the number is then drawn uniformly among numbers of that length.
        Sampling integers uniformly instead would make short numbers extremely
        rare, so short lengths are deliberately over-represented.
        """
        num_digits_choices = list(range(self.min_length, self.max_length + 1))
        total = sum(num_digits_choices)  # hoisted: was recomputed per element
        probs = [n / total for n in num_digits_choices]
        num_digits = random.choices(num_digits_choices, weights=probs)[0]
        # Smallest and largest integers that have exactly num_digits digits.
        return random.randint(10 ** (num_digits - 1), 10 ** num_digits - 1)

    def _construct_image_from_number(self, number: int) -> torch.Tensor:
        """Paste one tile per (padded) digit, left to right, with random overlap."""
        overlap = random.uniform(self.min_overlap, self.max_overlap)
        dim = self.mnist_digit_dim
        overlap_width = int(overlap * dim)
        width_increment = dim - overlap_width
        top, bottom = self._TOP_OFFSET, self._TOP_OFFSET + dim
        x = 0  # Left edge of the next tile on the canvas
        digits = self._add_left_and_right_paddings(number)
        multi_digit_image = torch.zeros((self.image_height, dim * self.max_length))
        for digit in digits:
            # Clone so the stored sample is never modified in place.
            digit_image = torch.clone(random.choice(self.samples_by_digit[digit]))
            # In the overlapping strip keep the brighter pixel of the canvas
            # and the new tile, so the earlier digit is not erased.
            digit_image[:, :overlap_width] = torch.maximum(
                multi_digit_image[top:bottom, x : x + overlap_width],
                digit_image[:, :overlap_width],
            )
            multi_digit_image[top:bottom, x : x + dim] = digit_image
            x += width_increment
        return multi_digit_image

    def _add_left_and_right_paddings(self, number: int) -> List[int]:
        """Pad the digit list to ``max_length`` with blanks, randomly split left/right."""
        digits = [int(digit) for digit in str(number)]
        remaining_length = self.max_length - len(digits)
        left_padding = random.randint(0, remaining_length)
        right_padding = remaining_length - left_padding
        blank = self._BLANK_DIGIT
        return [blank] * left_padding + digits + [blank] * right_padding


def split_dataset(dataset: Dataset, fraction: float, seed: int) -> List[Subset]:
    """Randomly partition ``dataset`` into two subsets using a seeded generator.

    Args:
        dataset: Dataset to split.
        fraction: Share of samples assigned to the first subset; the size is
            truncated to an integer and the remainder goes to the second subset.
        seed: Seed for the generator that drives the random assignment.

    Returns:
        The two subsets, first the one sized by ``fraction``.
    """
    total = len(dataset)
    first_size = int(total * fraction)
    lengths = [first_size, total - first_size]
    rng = torch.Generator().manual_seed(seed)
    return random_split(dataset, lengths, generator=rng)

In [2]:
from pathlib import Path
from typing import Dict, Optional, Union

from pytorch_lightning import LightningDataModule
from torch.utils.data import DataLoader, Dataset, Subset


class BaseDataModule(LightningDataModule):
    """Shared plumbing for the data modules defined in this notebook.

    Subclasses are expected to fill ``self.dataset`` under the keys ``"train"``,
    ``"val"`` and ``"test"``; the loader methods read from it.

    Args:
        batch_size: The number of samples to load per batch.
        num_workers: The number of subprocesses to use for data loading.
        pin_memory: Whether to copy Tensors into CUDA pinned memory before
            returning them.
    """

    def __init__(
        self, batch_size: int = 32, num_workers: int = 0, pin_memory: bool = False
    ):
        super().__init__()
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.pin_memory = pin_memory

        # Split name -> dataset; starts empty and is filled by setup().
        self.dataset: Dict[str, Union[Dataset, Subset]] = {}

    @classmethod
    def data_dirname(cls):
        """Root directory for all datasets (the current working directory)."""
        return os.getcwd()

    def prepare_data(self):
        """Download and preprocess datasets. No-op here; subclasses override."""

    def setup(self, stage: Optional[str] = None):
        """Should populate ``self.dataset``. No-op here; subclasses override."""

    def _split_loader(self, split: str, shuffle: bool) -> DataLoader:
        """Wrap ``self.dataset[split]`` in a DataLoader using the module settings."""
        return DataLoader(
            self.dataset[split],
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            pin_memory=self.pin_memory,
        )

    def train_dataloader(self) -> DataLoader:
        """Shuffled loader over the ``"train"`` split."""
        return self._split_loader("train", shuffle=True)

    def val_dataloader(self) -> DataLoader:
        """Unshuffled loader over the ``"val"`` split."""
        return self._split_loader("val", shuffle=False)

    def test_dataloader(self) -> DataLoader:
        """Unshuffled loader over the ``"test"`` split."""
        return self._split_loader("test", shuffle=False)

In [3]:
from typing import Optional

import torchvision.transforms as transforms
from torchvision.datasets import MNIST



# Share of the MNIST training set kept for the "train" split; the remainder
# becomes the "val" split.
TRAIN_FRACTION = 0.8


class SingleDigitMNIST(BaseDataModule):
    """Data module wrapping the standard single-digit MNIST dataset.

    The official training set is divided into train/val splits with a fixed
    seed; the official test set is used as the test split.

    Args:
        kwargs: Keyword arguments forwarded to BaseDataModule.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.transform = transforms.ToTensor()

    def prepare_data(self) -> None:
        """Download both the training and the test portion of MNIST."""
        for is_train in (True, False):
            MNIST(self.data_dirname(), train=is_train, download=True)

    def setup(self, stage: Optional[str] = None) -> None:
        """Populate ``self.dataset`` for the requested stage (all stages if None)."""
        wants_fit = stage is None or stage == "fit"
        wants_test = stage is None or stage == "test"

        if wants_fit:
            mnist_train = MNIST(
                self.data_dirname(), train=True, transform=self.transform
            )
            train_part, val_part = split_dataset(
                mnist_train, TRAIN_FRACTION, seed=42
            )
            self.dataset["train"] = train_part
            self.dataset["val"] = val_part

        if wants_test:
            self.dataset["test"] = MNIST(
                self.data_dirname(), train=False, transform=self.transform
            )

In [4]:
from pathlib import Path
from typing import Optional

import h5py
import torch
import torchvision.transforms as transforms
from PIL import Image
from torchvision.transforms import InterpolationMode


class MultiDigitMNIST(BaseDataModule):
    """Data module for a synthetic multi-digit MNIST dataset.

    ``prepare_data`` generates the images and writes them to an HDF5 file;
    ``setup`` reads that file back into in-memory datasets.

    Args:
        num_train: Number of training samples.
        num_val: Number of validation samples.
        num_test: Number of test samples.
        max_length: Maximum number of digits.
        min_overlap: Minimum proportion of an image being overlapped with another image.
        max_overlap: Maximum proportion of an image being overlapped with another image.
        kwargs: Keyword arguments to BaseDataModule.
    """

    def __init__(
        self,
        num_train: int = 75000,
        num_val: int = 10000,
        num_test: int = 20000,
        max_length: int = 16,
        min_overlap: float = 0.02,
        max_overlap: float = 0.5,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        assert 1 <= max_length
        assert 0 <= min_overlap < max_overlap

        self.num_samples = {"train": num_train, "val": num_val, "test": num_test}
        self.max_length = max_length
        self.min_overlap = min_overlap
        self.max_overlap = max_overlap

        self.padding_index = -1
        self.blank_index = -1
        self.single_digit_mnist = SingleDigitMNIST()
        # Random affine augmentation is applied to training images only.
        self.transform = {
            "train": transforms.Compose(
                [
                    transforms.RandomAffine(
                        degrees=(-0.05, 0.05),
                        scale=(0.7, 1.1),
                        shear=(-30, 30),
                        interpolation=InterpolationMode.BILINEAR,
                        fill=0,
                    ),
                    transforms.ToTensor(),
                ]
            ),
            "val/test": transforms.ToTensor(),
        }

    @property
    def dataset_dirname(self) -> str:
        """Directory to the dataset (same as ``data_dirname()``)."""
        return self.data_dirname()

    def dataset_filename(self, indicator="b1") -> str:
        """Filename of the dataset created by prepare_data.

        Args:
            indicator: Free-form tag appended to the name, so that several
                generated variants of the same configuration can coexist.

        Returns:
            A relative ``.h5`` filename encoding the generation parameters.
        """
        return (
            f"ml_{self.max_length}_o{self.min_overlap:.2f}_{self.max_overlap:.2f}_"
            f"ntr{self.num_samples['train']}_nv{self.num_samples['val']}_"
            f"nte{self.num_samples['test']}_{indicator}.h5"
        )

    def prepare_data(self, indicator="b1") -> None:
        """Create the synthetic dataset and write it to an HDF5 file.

        The file is opened in ``"w"`` mode, so an existing file with the same
        name is overwritten on every call.

        Args:
            indicator: Tag forwarded to ``dataset_filename``.
        """
        self.single_digit_mnist.prepare_data()
        self.single_digit_mnist.setup()
        filename = self.dataset_filename(indicator)
        with h5py.File(filename, "w") as f:
            for split in ("train", "val", "test"):
                print(f"Preparing {split} dataset...")
                image_generator = DatasetGenerator(
                    self.single_digit_mnist.dataset[split],
                    max_length=self.max_length,
                    min_overlap=self.min_overlap,
                    max_overlap=self.max_overlap,
                    padding_index=self.padding_index,
                )
                images, labels = image_generator.generate(self.num_samples[split])
                f.create_dataset(
                    f"X_{split}", data=images, dtype="f4", compression="lzf"
                )
                f.create_dataset(
                    f"y_{split}", data=labels, dtype="i1", compression="lzf"
                )
        # Report the actual filename; formatting the method object itself
        # printed "<bound method ...>".
        print(f"Dataset saved to {filename}")

    def setup(self, stage: Optional[str] = None, indicator="b1") -> None:
        """Load the generated splits from the HDF5 file into ``self.dataset``.

        Args:
            stage: ``"fit"`` loads train/val, ``"test"`` loads test, ``None``
                loads everything.
            indicator: Tag of the file to read; must match the one that was
                passed to ``prepare_data``.
        """
        # dataset_filename is a method, so it has to be called; passing the
        # bound method to h5py.File cannot open the file.
        filename = self.dataset_filename(indicator)

        if stage in ("fit", None):
            with h5py.File(filename, "r") as f:
                X_train, y_train = self._read_split(f, "train")
                X_val, y_val = self._read_split(f, "val")
            self.dataset["train"] = BaseDataset(
                X_train, y_train, transform=self.transform["train"]
            )
            self.dataset["val"] = BaseDataset(
                X_val, y_val, transform=self.transform["val/test"]
            )

        if stage in ("test", None):
            with h5py.File(filename, "r") as f:
                X_test, y_test = self._read_split(f, "test")
            self.dataset["test"] = BaseDataset(
                X_test, y_test, transform=self.transform["val/test"]
            )

    @staticmethod
    def _read_split(h5_file, split: str):
        """Read one split from an open HDF5 file as (PIL images, int32 labels).

        All splits are materialised the same way: ``[:]`` reads the whole
        dataset into memory before the file is closed, and images become PIL
        images so every transform pipeline receives the same input type.
        """
        images = [Image.fromarray(array) for array in h5_file[f"X_{split}"][:]]
        labels = torch.as_tensor(h5_file[f"y_{split}"][:], dtype=torch.int32)
        return images, labels

In [5]:
def set_seed(seed: int = 42) -> None:
    """Seed NumPy, Python's ``random`` and PyTorch, and export ``PYTHONHASHSEED``.

    Args:
        seed: Value handed to every seeding call.
    """
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)
    # NOTE(review): PYTHONHASHSEED is presumably only read at interpreter
    # start-up, so this likely affects subprocesses only -- confirm.
    os.environ["PYTHONHASHSEED"] = str(seed)
    print(f"Random seed set as {seed}")


In [6]:
# Pass the seed explicitly: `set_seed` is defined twice in this notebook with
# different defaults, so a bare call depends on which definition ran last.
set_seed(42)

Random seed set as 42


In [7]:
# NOTE(review): `test` is not read anywhere in the visible cells (the next cell
# builds its own instance) -- confirm whether this cell is still needed.
test = MultiDigitMNIST()


In [8]:
# Generate the synthetic dataset with default settings and write it to the
# HDF5 file tagged with the default indicator "b1".
datamodule = MultiDigitMNIST()
datamodule.prepare_data()


100%|██████████| 9.91M/9.91M [00:00<00:00, 11.6MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 338kB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 3.18MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 5.82MB/s]


Preparing train dataset...
Preparing val dataset...
Preparing test dataset...
Dataset saved to <bound method MultiDigitMNIST.dataset_filename of <__main__.MultiDigitMNIST object at 0x7eee8818c850>>


In [9]:
def set_seed(seed: int = 2025) -> None:
    """Seed NumPy, Python's ``random`` and PyTorch, and export ``PYTHONHASHSEED``.

    This redefinition replaces the earlier ``set_seed`` in the notebook
    namespace; it differs only in the default seed.

    Args:
        seed: Value handed to every seeding call.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    os.environ.update(PYTHONHASHSEED=str(seed))
    print("Random seed set as {}".format(seed))

In [10]:
# Pass the seed explicitly: `set_seed` is defined twice in this notebook with
# different defaults, so a bare call depends on which definition ran last.
set_seed(2025)

Random seed set as 2025


In [11]:
# NOTE(review): `test` is not read anywhere in the visible cells (the next cell
# builds its own instance) -- confirm whether this cell is still needed.
test = MultiDigitMNIST()

In [12]:
# Generate a second dataset with the same settings; the "b2" indicator gives it
# its own HDF5 filename.
datamodule = MultiDigitMNIST()
datamodule.prepare_data(indicator='b2')


Preparing train dataset...
Preparing val dataset...
Preparing test dataset...
Dataset saved to <bound method MultiDigitMNIST.dataset_filename of <__main__.MultiDigitMNIST object at 0x7eee88169250>>
