In [None]:
from torch.utils.data import Dataset, DataLoader


def create_dataloader(dataset: Dataset, dataset_opt: dict) -> DataLoader:
    """Create dataloader.

    Args:
        dataset (torch.utils.data.Dataset): Dataset.
        dataset_opt (dict): Dataset options. It contains the following keys:
            phase (str): 'train' or 'val'.
            num_worker_per_gpu (int): Number of workers for each GPU.
            batch_size_per_gpu (int): Training batch size for each GPU.
        num_gpu (int): Number of GPUs. Used only in the train phase.
            Default: 1.
        dist (bool): Whether in distributed training. Used only in the train
            phase. Default: False.
        sampler (torch.utils.data.sampler): Data sampler. Default: None.
        seed (int | None): Seed. Default: None
    """
    phase = dataset_opt["phase"]
    if phase == "train":
        dataloader_args = dict(
            dataset=dataset,
            batch_size=dataset_opt["batch_size"],
            shuffle=dataset_opt["shuffle"],
            num_workers=dataset_opt["num_worker"],
            drop_last=True,
        )

    elif phase in ["val", "test"]:  # validation
        dataloader_args = dict(
            dataset=dataset, batch_size=1, shuffle=False, num_workers=0
        )
    else:
        raise ValueError(
            f"Wrong dataset phase: {phase}. "
            "Supported ones are 'train', 'val' and 'test'."
        )

    dataloader_args["pin_memory"] = dataset_opt.get("pin_memory", False)

    #  prefetch_mode=None: Normal dataloader
    # prefetch_mode='cuda': dataloader for CUDAPrefetcher
    return DataLoader(**dataloader_args)

In [None]:
import os
from pathlib import Path
from torchvision.io import read_image, ImageReadMode
from torch import Tensor
from typing import Any

from src.lib.class_load import LoadFiles

handler_files = LoadFiles()


class CustomDataset(Dataset):
    def __init__(
        self, dataset_path: Path | str, channel: int = 3, train: bool = True
    ) -> None:
        if train:
            self.noisy_path = Path(dataset_path).joinpath("Noisy/train") 
            self.gt_path = Path(dataset_path).joinpath("GTruth/train")
        else:
            self.noisy_path = Path(dataset_path).joinpath("Noisy/test") 
            self.gt_path = Path(dataset_path).joinpath("GTruth/test")
        self._noisy_images = handler_files.search_load_files_extencion(
            path_search=self.noisy_path.as_posix(), ext=["png"]
        )["png"][1]
        self._gt_imges = handler_files.search_load_files_extencion(
            path_search=self.gt_path.as_posix(), ext=["png"]
        )["png"][1]
        self.channel = channel

    def __getitem__(self, index) -> dict[str, Any]:
        if self.channel == 3:
            img_gt: Tensor = read_image(
                self._gt_imges[index], mode=ImageReadMode.RGB
            )  # ImageReadMode.UNCHANGED
            img_lq: Tensor = read_image(
                self._noisy_images[index], mode=ImageReadMode.RGB
            )
        elif self.channel == 1:
            img_gt: Tensor = read_image(
                self._gt_imges[index], mode=ImageReadMode.GRAY
            )
            img_lq: Tensor = read_image(
                self._noisy_images[index], mode=ImageReadMode.GRAY
            )
        return {
            "lq": img_lq,
            "gt": img_gt,
            "lq_path": self._noisy_images[index],
            "gt_path": self.gt_path[index],
        }

    def __len__(self):
        return len(self._noisy_images)


# ImageReadMode.UNCHANGED

In [None]:
import math

def load_dataloader(opt):
    if opt["dataloader"]["phase"] == "train":
        sar_dataset = CustomDataset(
            dataset_path=opt["preprocessing"]["output"], channel=3, train=True
        )
        dataloader = create_dataloader(
            dataset=sar_dataset, dataset_opt=opt["dataloader"]
        )
    elif opt["dataloader"]["phase"] == "val":
        sar_dataset = CustomDataset(
            dataset_path=opt["preprocessing"]["output"], channel=3, train=False
        )
        dataloader = create_dataloader(
            dataset=sar_dataset,
            dataset_opt=opt["dataloader"],
        )
    num_iter_per_epoch = math.ceil(
        len(sar_dataset)  / opt["dataloader"]["batch_size_per_gpu"])
    total_iters = int(opt["dataloader"]["iters"])
    total_epochs = math.ceil(total_iters / (num_iter_per_epoch))

    return dataloader, total_epochs, total_iters

In [None]:
import yaml

file_config = Path(
    "/home/arthemis/Documents/pytorch_env/pytorch_env/transformers_ruben/src/data/config/config.yml"
)
configuration = yaml.safe_load(file_config.open("r"))

dataloader_, total_epochs, total_iters = load_dataloader(configuration)

In [None]:
total

In [None]:
next(iter(dataloader_))

In [None]:


if configuration["path"]["resume_state"]:  # resume training
    check_resume(opt, resume_state["iter"])
    model = create_model(opt)
    model.resume_training(resume_state)  # handle optimizers and schedulers
   
    start_epoch = resume_state["epoch"]
    current_iter = resume_state["iter"]
else:
    model = create_model(opt)
    start_epoch = 0
    current_iter = 0

In [None]:
import importlib
from os import path as osp
from basicsr.utils import scandir
file_ = '/home/arthemis/Documents/pytorch_env/pytorch_env/transformers_ruben/basicsr/models/archs/__init__.py'

arch_folder = osp.dirname(osp.abspath(file_))
arch_filenames = [
    osp.splitext(osp.basename(v))[0] for v in scandir(arch_folder)
    if v.endswith('_arch.py')
]
# import all the arch modules
_arch_modules = [
    importlib.import_module(f'basicsr.models.archs.{file_name}')
    for file_name in arch_filenames
]

In [None]:
arch_filenames

In [None]:
_arch_modules

In [None]:
def parse_options():
    import yaml
    from pathlib import Path

    file_config = Path(
        "/home/arthemis/Documents/pytorch_env/pytorch_env/transformers_ruben/src/data/config/config.yml"
    )
    return yaml.safe_load(file_config.open("r"))
from basicsr.models.archs import define_network
from copy import deepcopy


In [None]:
opt = parse_options()
net = define_network(deepcopy(opt['network_g']))

In [49]:
net.get_parameter()

TypeError: Module.get_parameter() missing 1 required positional argument: 'target'

In [50]:
total_params = sum(p.numel() for p in net.parameters())

In [51]:
total_params

16855780

In [55]:
net

Restormer(
  (patch_embed): OverlapPatchEmbed(
    (proj): Conv2d(1, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  )
  (encoder_level1): Sequential(
    (0): TransformerBlock(
      (norm1): LayerNorm(
        (body): BiasFree_LayerNorm()
      )
      (attn): Attention(
        (qkv): Conv2d(48, 144, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (qkv_dwconv): Conv2d(144, 144, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=144, bias=False)
        (project_out): Conv2d(48, 48, kernel_size=(1, 1), stride=(1, 1), bias=False)
      )
      (norm2): LayerNorm(
        (body): BiasFree_LayerNorm()
      )
      (ffn): FeedForward(
        (project_in): Conv2d(48, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (dwconv): Conv2d(96, 96, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=96, bias=False)
        (project_out): Conv2d(48, 48, kernel_size=(1, 1), stride=(1, 1), bias=False)
      )
    )
    (1): TransformerBlock(
      