In [1]:
from pytorch_lightning import seed_everything, Trainer

In [2]:
pwd

'c:\\disk\\AI\\2022_AI_Chanllenage\\maneuver-classification\\hopular-main\\hopular'

In [3]:
from hopfield_layers_master.hflayers import Hopfield

In [4]:
from torch.utils.data import ConcatDataset

In [5]:
from torch.utils.data import dataset

In [6]:
from pytorch_lightning import LightningDataModule

In [7]:
from abc import ABCMeta, abstractmethod

In [8]:
from torch.utils.data import ConcatDataset, DataLoader, Dataset, TensorDataset

In [9]:
from enum import Enum

In [10]:
import torch

In [11]:
from typing import Any, Dict, List, Optional, Tuple

In [12]:
class BaseDataset(Dataset, metaclass=ABCMeta):
    """
    Abstract base class of a dataset to be used in Hopular.
    """

    class CheckpointMode(Enum):
        """
        Enumeration of available checkpoint modes used during the training and validation of Hopular.
        """
        MIN = r'min'
        MAX = r'max'

    def encode_sample(self,
                      sample: torch.Tensor) -> torch.Tensor:
        """
        Encode all features of a single sample depending on their respective feature type.

        :param sample: sample to be encoded
        :return: encoded sample
        """
        sample = sample.view(-1)
        assert len(sample) == len(self.feature_numeric) + len(self.feature_discrete), r'Invalid sample to encode!'

        # Encode sample features according to feature type.
        sample_encoded = []
        for index, (datum, size) in enumerate(zip(sample, self.sizes)):
            if index in self.feature_numeric:
                sample_encoded.append(datum.view(-1))
            else:
                sample_encoded.append(torch.zeros(size).view(-1))
                sample_encoded[-1][datum.int().item()] = 1
        return torch.cat(sample_encoded, dim=0)

    @property
    def feature_mean(self) -> Optional[torch.Tensor]:
        return None

    @property
    def feature_stdv(self) -> Optional[torch.Tensor]:
        return None

    @property
    @abstractmethod
    def shape(self) -> Tuple[int, ...]:
        pass

    @property
    @abstractmethod
    def sizes(self) -> Tuple[int, ...]:
        return

    @property
    @abstractmethod
    def split_train(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def split_validation(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def split_test(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def feature_numeric(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def feature_discrete(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def target_numeric(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def target_discrete(self) -> torch.Tensor:
        pass

    @property
    @abstractmethod
    def checkpoint_mode(self) -> CheckpointMode:
        pass

In [13]:
from pytorch_lightning import LightningDataModule

In [14]:
class DataModule(LightningDataModule):
    """
    Data module encapsulating a dataset to be used in Hopular, providing data loading and masking capabilities.
    """

    def __init__(self,
                 dataset: BaseDataset,
                 batch_size: Optional[int] = None,
                 super_sample_factor: int = 1,
                 noise_probability: float = 0.15,
                 mask_probability: float = 0.80,
                 replace_probability: float = 0.10,
                 num_workers: int = 0):
        """
        Initialize a data module from a dataset.

        :param dataset: dataset to be encapsulated by the data module
        :param batch_size: sample count of a single mini-batch
        :param super_sample_factor: multiplicity of the training set
        :param noise_probability: probability of selecting an input feature for the self-supervised loss
        :param mask_probability: probability of masking out a selected input feature
        :param replace_probability: probability of replacing a selected input feature with a randomly drawn feature
        :param num_workers: worker count of the data loaders
        """
        
        
        self.dataset = dataset
        self.__batch_size = None if batch_size < 1 else batch_size
        self.__super_sample_factor = super_sample_factor
        self.__noise_probability = noise_probability
        self.__mask_probability = mask_probability
        self.__replace_probability = replace_probability
        self.__num_workers = num_workers
        assert 0 <= self.__noise_probability <= 1.0, r'Invalid noise probability!'
        assert 0 <= self.__mask_probability <= 1.0, r'Invalid mask probability!'
        assert 0 <= self.__replace_probability <= 1.0, r'Invalid replacement probability!'
        assert (self.__mask_probability + self.__replace_probability) <= 1.0, r'Invalid mask/replacement probabilities!'

        self.dims = self.dataset.shape[1:]
        self.memory = None
        self._has_setup_memory = False
        self.__data_train = None
        self.__data_validation = None
        self.__data_test = None

        # Register hyperparameters for logging.
        self.save_hyperparameters(ignore=[r'dataset'])

    @staticmethod
    def scale_and_noise_collate(
            samples: List[Tuple[torch.Tensor, ...]],
            mean: Optional[torch.Tensor],
            stdv: Optional[torch.Tensor],
            sizes: torch.Tensor,
            noise_probability: float,
            mask_probability: float,
            replace_probability: float,
            target_discrete: torch.Tensor,
            target_numeric: torch.Tensor,
            feature_discrete: torch.Tensor,
            exclude_targets: bool) -> Tuple[torch.Tensor, ...]:
        """
        Pre-process samples to be used in Hopular training and inference.

        :param samples: collection of samples to be pre-processed
        :param mean: feature means used for feature shifting
        :param stdv: feature standard deviations used for feature scaling
        :param sizes: feature sizes (class count for discrete features)
        :param noise_probability: probability of selecting an input feature for the self-supervised loss
        :param mask_probability: probability of masking out a selected input feature
        :param replace_probability: probability of replacing a selected input feature with a randomly drawn feature
        :param target_discrete: indices of discrete targets
        :param target_numeric: indices of continuous targets
        :param feature_discrete: indices of discrete features
        :param exclude_targets: completely mask out targets
        :return: masked samples, masking positions, unmasked samples, missing positions and original sample indices
        """
        samples_collated = {}
        for sample in samples:
            for sample_index, sample_element in enumerate(sample):
                samples_collated.setdefault(sample_index, []).append(sample_element)
        samples_collated = tuple(torch.stack(
            samples_collated[sample_index], dim=0
        ) for sample_index in sorted(samples_collated))
        feature_boundaries = torch.cumsum(torch.as_tensor([0] + sizes), dim=0)
        feature_boundaries = zip(feature_boundaries[:-1], feature_boundaries[1:])

        # Compute noise mask.
        noise_mask = torch.ones(samples_collated[0].shape[0], len(sizes))
        if noise_probability > 0:
            noise_mask = torch.dropout(noise_mask, p=1.0 - noise_probability, train=True)
            noise_mask = noise_mask != 0
        else:
            noise_mask = noise_mask == 0

        # Scale sample features according to feature statistics and add optional noise.
        samples_modified = []
        for index, (start, end) in enumerate(feature_boundaries):

            # Standardize attributes.
            if index not in feature_discrete:
                if mean is not None:
                    assert not np.isnan(mean[index])
                    samples_collated[0][:, start:end] = samples_collated[0][:, start:end] - mean[index]
                if stdv is not None and stdv[index] > 0:
                    assert not np.isnan(stdv[index])
                    samples_collated[0][:, start:end] = samples_collated[0][:, start:end] / stdv[index]

            # Encode features and targets accordingly and introduce optional noise.
            if exclude_targets and (index in target_discrete or index in target_numeric):
                current_sample = torch.cat((
                    torch.zeros(len(samples_collated[0]), end - start),
                    torch.ones(len(samples_collated[0]), 1)
                ), dim=1)
                samples_modified.append(current_sample)
            else:
                samples_modified.append(torch.cat((
                    samples_collated[0][:, start:end],
                    torch.zeros(len(samples_collated[0]), 1)
                ), dim=1))

                if noise_mask[:, index].any():
                    current_mask = noise_mask[:, index]
                    noise_feature = torch.rand(current_mask.sum())
                    noise_zero = noise_feature < mask_probability
                    noise_replace = mask_probability <= noise_feature
                    noise_replace.logical_and_(noise_feature < (mask_probability + replace_probability))
                    if noise_zero.any():
                        current_feature = samples_modified[-1][current_mask]
                        current_feature[noise_zero, :-1] = 0.0
                        current_feature[noise_zero, -1] = 1.0
                        samples_modified[-1][current_mask] = current_feature
                    if noise_replace.any():
                        current_feature = samples_modified[-1][current_mask]
                        if index in feature_discrete:
                            current_feature[noise_replace, :-1] = torch.nn.functional.one_hot(
                                input=torch.randint(low=0, high=end - start, size=(noise_replace.sum(),)),
                                num_classes=sizes[index]
                            ).to(dtype=samples_modified[-1].dtype)
                            samples_modified[-1][current_mask] = current_feature
                        else:
                            current_feature[noise_replace, :-1] = torch.randn(
                                noise_replace.sum(), end - start)
                            samples_modified[-1][current_mask] = current_feature

            # Mask out missing features.
            missing_mask = torch.as_tensor([]) if len(samples_collated) <= 1 else samples_collated[1][:, index]
            missing_mask_count = missing_mask.sum()
            if missing_mask_count > 0:
                missing_sample = torch.zeros(missing_mask_count, end - start + 1)
                samples_modified[-1][missing_mask] = missing_sample

        # Adapt noise mask to include targets.
        if len(target_discrete) > 0:
            noise_mask[:, target_discrete] = True
        if len(target_numeric) > 0:
            noise_mask[:, target_numeric] = True

        return torch.cat(samples_modified, dim=1), noise_mask, *samples_collated

    def _get_subset(self,
                    indices: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Get the specified subset from the dataset.

        :param indices: indices of the subset samples
        :return: specified subset
        """
        data, data_missing = [], []
        for index in indices:
            current_sample = self.dataset[index]
            data.append(current_sample[0])
            data_missing.append(current_sample[1])
        return torch.stack(data, dim=0), torch.stack(data_missing, dim=0)

    def setup(self,
              stage: Optional[str] = None) -> None:
        """
        Set up the specified stage of the data module.

        :param stage: stage to set up
        :return: None
        """
        if stage in (TrainerFn.FITTING, r'memory'):
            assert self.dataset.split_train is not None, r'No training samples specified!'
            data_train = self._get_subset(indices=self.dataset.split_train)
            self.__data_train = ConcatDataset([
                TensorDataset(*data_train, self.dataset.split_train) for _ in range(self.__super_sample_factor)
            ])
            if self.memory is None:
                self.memory = self.scale_and_noise_collate(
                    samples=list(zip(*data_train)),
                    mean=self.dataset.feature_mean,
                    stdv=self.dataset.feature_stdv,
                    sizes=self.dataset.sizes,
                    noise_probability=0.0,
                    mask_probability=0.0,
                    replace_probability=0.0,
                    target_discrete=self.dataset.target_discrete,
                    target_numeric=self.dataset.target_numeric,
                    feature_discrete=self.dataset.feature_discrete,
                    exclude_targets=False
                )[0]
        if stage in (TrainerFn.FITTING, TrainerFn.VALIDATING):
            assert self.dataset.split_validation is not None, r'No validation samples specified!'
            data_validation = self._get_subset(indices=self.dataset.split_validation)
            self.__data_validation = TensorDataset(*data_validation)
        elif stage == TrainerFn.TESTING:
            assert self.dataset.split_test is not None, r'No test samples specified!'
            data_test = self._get_subset(indices=self.dataset.split_test)
            self.__data_test = TensorDataset(*data_test)

    def train_dataloader(self) -> DataLoader:
        """
        Prepare and get the data loader for the training subset.

        :return: data loader for the training subset
        """
        return DataLoader(
            dataset=self.__data_train,
            batch_size=len(self.__data_train) if self.__batch_size is None else self.__batch_size,
            pin_memory=self.trainer.gpus is not None,
            num_workers=self.__num_workers,
            persistent_workers=self.__num_workers > 0,
            collate_fn=partial(
                self.scale_and_noise_collate,
                mean=self.dataset.feature_mean,
                stdv=self.dataset.feature_stdv,
                sizes=self.dataset.sizes,
                noise_probability=self.__noise_probability,
                mask_probability=self.__mask_probability,
                replace_probability=self.__replace_probability,
                target_discrete=self.dataset.target_discrete,
                target_numeric=self.dataset.target_numeric,
                feature_discrete=self.dataset.feature_discrete,
                exclude_targets=True
            )
        )

    def val_dataloader(self) -> DataLoader:
        """
        Prepare and get the data loader for the validation subset.

        :return: data loader for the validation subset
        """
        return DataLoader(
            dataset=self.__data_validation,
            batch_size=len(self.__data_validation),
            pin_memory=self.trainer.gpus is not None,
            num_workers=self.__num_workers,
            persistent_workers=self.__num_workers > 0,
            collate_fn=partial(
                self.scale_and_noise_collate,
                mean=self.dataset.feature_mean,
                stdv=self.dataset.feature_stdv,
                sizes=self.dataset.sizes,
                noise_probability=0.0,
                mask_probability=0.0,
                replace_probability=0.0,
                target_discrete=self.dataset.target_discrete,
                target_numeric=self.dataset.target_numeric,
                feature_discrete=self.dataset.feature_discrete,
                exclude_targets=True
            )
        )

    def test_dataloader(self) -> DataLoader:
        """
        Prepare and get the data loader for the test subset.

        :return: data loader for the test subset
        """
        return DataLoader(
            dataset=self.__data_test,
            batch_size=len(self.__data_test),
            pin_memory=self.trainer.gpus is not None,
            num_workers=self.__num_workers,
            persistent_workers=self.__num_workers > 0,
            collate_fn=partial(
                self.scale_and_noise_collate,
                mean=self.dataset.feature_mean,
                stdv=self.dataset.feature_stdv,
                sizes=self.dataset.sizes,
                noise_probability=0.0,
                mask_probability=0.0,
                replace_probability=0.0,
                target_discrete=self.dataset.target_discrete,
                target_numeric=self.dataset.target_numeric,
                feature_discrete=self.dataset.feature_discrete,
                exclude_targets=True
            )
        )


In [15]:
def from_data_module(cls,
                        data_module: DataModule,
                        **kwargs: Dict[str, Any]) -> r'Hopular':
    """
    Initialize Hopular from a pre-instantiated data module.

    :param data_module: module encapsulating a dataset
    :param kwargs: additional keyword arguments used for initializing Hopular
    :return: new Hopular instance
    """
    data_module.setup(stage=r'memory')
    return cls(
        input_sizes=data_module.dataset.sizes,
        target_discrete=data_module.dataset.target_discrete.tolist(),
        target_numeric=data_module.dataset.target_numeric.tolist(),
        feature_discrete=data_module.dataset.feature_discrete.tolist(),
        memory=data_module.memory,
        memory_ids=data_module.dataset.split_train,
        feature_mean=data_module.dataset.feature_mean,
        feature_stdv=data_module.dataset.feature_stdv,
        **kwargs
    )

In [16]:
from auxiliary.data import find_dataset, list_datasets



In [17]:
# args.mode = r'optim'


In [18]:
dataset = find_dataset(name='GlassIdentificationDataset')

In [19]:
batch_size = 16
super_sample_factor = 0.5
noise_probability=0.5
mask_probability=0.5
replace_probability=0.5
num_workers=8

In [20]:
split_index, num_splits = 0, 1

In [21]:
data_module = DataModule(
        dataset=dataset(split_index=split_index),
        batch_size=batch_size,
        super_sample_factor=super_sample_factor,
        noise_probability=noise_probability,
        mask_probability=mask_probability,
        replace_probability=replace_probability,
        num_workers=num_workers
    )

In [23]:
feature_size=32
hidden_size=0
hidden_size_factor=1.0
num_heads=8
scaling_factor=1.0
input_dropout=0.1
lookup_dropout=0.1
output_dropout=0.01
memory_ratio=1.0
num_blocks=8

# Optimizer-specific arguments.
initial_feature_loss_weight=1.0
final_feature_loss_weight=0.0
learning_rate=1e-3
gamma=1.0
betas=(0.9, 0.999)
weight_decay=0.1
lookup_steps=1
lookup_ratio=0.005
warmup_ratio=0.0
num_steps_per_cycle=10000
cold_restart=True
synchronous_weights = False
asynchronous_weights=not synchronous_weights

In [24]:
hopular_model = from_data_module(

                # Hopular specific arguments.
                data_module=data_module,
                feature_size=feature_size,
                hidden_size=hidden_size,
                hidden_size_factor=hidden_size_factor,
                num_heads=num_heads,
                scaling_factor=scaling_factor,
                input_dropout=input_dropout,
                lookup_dropout=lookup_dropout,
                output_dropout=output_dropout,
                memory_ratio=memory_ratio,
                num_blocks=num_blocks,

                # Optimizer-specific arguments.
                initial_feature_loss_weight=initial_feature_loss_weight,
                final_feature_loss_weight=final_feature_loss_weight,
                learning_rate=learning_rate,
                gamma=gamma,
                betas=betas,
                weight_decay=weight_decay,
                lookup_steps=lookup_steps,
                lookup_ratio=lookup_ratio,
                warmup_ratio=warmup_ratio,
                num_steps_per_cycle=10000,
                cold_restart=cold_restart,
                asynchronous_weights=asynchronous_weights,
                synchronous_weights=synchronous_weights
            )
hopular_model.reset_parameters()

TypeError: from_data_module() missing 1 required positional argument: 'cls'