In [None]:
# default_exp datasets.OVPDataset

# OVP dataset

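> A Kedro dataset (`OVPDataset`) plus the file-path schemas it relies on, covering single files, case/control pairs and files split by chromosome.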

In [None]:
#hide
from nbdev.showdoc import *
import pandas as pd
from corradin_ovp_utils.datasets.OVPDataset import OVPDataset
from corradin_ovp_utils.catalog import test_data_catalog, conf_test_data_catalog

In [None]:
#export
from typing import Any, Dict, List, Optional, Literal, Union
from enum import Enum
import numpy as np
from kedro.io import AbstractVersionedDataSet
from pydantic import BaseModel
from pydantic.dataclasses import dataclass
from dataclasses import InitVar, asdict

from kedro.io.core import (
    AbstractVersionedDataSet,
    DataSetError,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)
import fsspec
from copy import deepcopy
from pathlib import Path, PosixPath
from types import SimpleNamespace

  and should_run_async(code)


In [None]:
#export
from corradin_ovp_utils.datasets import genetic_file, sample_file

In [None]:
#export

#modified from kedro.io.core
from kedro.utils import load_obj

# Package prefixes that `parse_class` prepends, in order, to a dotted class path.
# The trailing empty string lets a fully qualified path be tried as written.
_DEFAULT_PACKAGES = ["kedro.io.", "kedro.extras.datasets.", "corradin_ovp_utils.datasets.", ""]

class OVPDataSetError(Exception):
    """Raised when a class path given to an OVP dataset cannot be parsed or resolved."""

def parse_class(key,
    class_obj: Union[str, type]
) -> Any:
    """Resolve a class from a dotted path, or pass an existing object through.

    Args:
        key: Name of the configuration entry being resolved (for example
            ``"file_type"``). It is only used in the error message raised
            for malformed paths.
        class_obj: Either a dotted class path or an already-resolved object.
            A string is combined with each prefix in ``_DEFAULT_PACKAGES``,
            in order, and the first combination that ``_load_obj`` can load
            is returned. Anything that is not a string is returned unchanged.

    Raises:
        OVPDataSetError: If the path starts or ends with a dot, or if none
            of the prefixed paths can be loaded.
        DataSetError: Propagated from ``_load_obj`` when a class is listed by
            its module but cannot be loaded.

    Returns:
        The resolved object (normally a class).
    """
    # Already-resolved objects need no lookup.
    if not isinstance(class_obj, str):
        return class_obj

    if class_obj.startswith(".") or class_obj.endswith("."):
        raise OVPDataSetError(
            f"{key} class path does not support relative "
            "paths or paths ending with a dot."
        )

    # First prefix that yields a loadable object wins.
    for prefix in _DEFAULT_PACKAGES:
        resolved = _load_obj(prefix + class_obj)
        if resolved is not None:
            return resolved

    raise OVPDataSetError(f"Class `{class_obj}` not found.")


def _load_obj(class_path: str) -> Optional[object]:
    """Try to load the object at ``class_path``; return ``None`` when it is absent.

    Args:
        class_path: Dotted path of the form ``<module path>.<object name>``.

    Returns:
        The loaded object, or ``None`` if the module or the attribute cannot
        be found.

    Raises:
        DataSetError: If the module lists the object name in ``__all__`` but
            loading it raises ``AttributeError``; the message points to the
            kedro dependency-installation documentation.
    """
    module_path, _, object_name = class_path.rpartition(".")

    # Look up the module's `__all__` first. It may be unavailable because the
    # module does not exist (every prefix is tried against every path) or
    # because the module simply defines no `__all__`.
    try:
        exported_names = load_obj(f"{module_path}.__all__")
    except (ModuleNotFoundError, AttributeError, ValueError):
        exported_names = None

    try:
        return load_obj(class_path)
    except (ModuleNotFoundError, ValueError):
        return None
    except AttributeError as exc:
        # Only escalate when the module advertises the name but cannot provide it.
        advertised = exported_names and object_name in exported_names
        if not advertised:
            return None
        raise DataSetError(
            f"{exc} Please see the documentation on how to "
            f"install relevant dependencies for {class_path}:\n"
            f"https://kedro.readthedocs.io/en/stable/"
            f"04_kedro_project_setup/01_dependencies.html"
        ) from exc

In [None]:
# A path relative to one of the default package prefixes resolves to the class.
assert parse_class("file_format", "genetic_file.GenFileFormat") == genetic_file.GenFileFormat

In [None]:
# A fully qualified path resolves to the same class.
assert parse_class("file_format", "corradin_ovp_utils.datasets.genetic_file.GenFileFormat") == genetic_file.GenFileFormat

In [None]:
# Show the file objects the case/control catalog entry exposes (one per role).
test_data_catalog.load("genetic_file").files

namespace(case=GenFileFormat(filepath=Path('data/test_data/gen_file/test_CASE_MS_chr22.gen'), prob_n_cols=3, initial_cols=['dashes', 'rsid', 'position', 'ref', 'alt'], rsid_col='rsid', ref_col='ref', alt_col='alt', ref_alt_delim=None, pandas_args={'sep': ' ', 'header': None}, sample_ids=None),
          control=GenFileFormat(filepath=Path('data/test_data/gen_file/test_CONTROL_MS_chr22.gen'), prob_n_cols=3, initial_cols=['dashes', 'rsid', 'position', 'ref', 'alt'], rsid_col='rsid', ref_col='ref', alt_col='alt', ref_alt_delim=None, pandas_args={'sep': ' ', 'header': None}, sample_ids=None))

In [None]:
# Show the file object the single-file catalog entry exposes (under `single_file`).
test_data_catalog.load("genetic_file_single").files

namespace(single_file=GenFileFormat(filepath=Path('data/test_data/gen_file/test_CASE_MS_chr22.gen'), prob_n_cols=3, initial_cols=['dashes', 'rsid', 'position', 'ref', 'alt'], rsid_col='rsid', ref_col='ref', alt_col='alt', ref_alt_delim=None, pandas_args={'sep': ' ', 'header': None}, sample_ids=None))

In [None]:
#export

class SingleFilePathSchema(BaseModel):
    """Folder and file name describing one file, or one file per chromosome.

    Fields:
        folder: Directory part of the path.
        full_file_name: File name including its extension. It is always
            passed through ``str.format(chrom_num=...)``, so it may contain
            a ``{chrom_num}`` placeholder.
        file_name: First dot-separated segment of ``full_file_name``;
            derived when not supplied.
        extension: Last dot-separated segment of ``full_file_name``;
            derived when not supplied.
        split_by_chromosome: When truthy, the schema describes one file per
            chromosome number instead of a single file.
    """
    folder: str
    full_file_name: str
    file_name: Optional[str] = None
    extension: Optional[str] = None
    split_by_chromosome: Optional[bool] = None

    def __init__(self, **data: Any):
        """Validate the fields, then fill in ``file_name``/``extension`` if missing.

        Raises:
            ValueError: If a value has to be derived but ``full_file_name``
                contains no ``.`` and therefore has no extension.
        """
        super().__init__(**data)
        if self.file_name is None or self.extension is None:
            name_parts = self.full_file_name.split(".")
            if len(name_parts) < 2:
                raise ValueError(
                    f"`full_file_name` must include an extension, got {self.full_file_name!r}"
                )
            # Only fill what is missing so an explicitly supplied value is kept.
            if self.file_name is None:
                self.file_name = name_parts[0]
            if self.extension is None:
                self.extension = name_parts[-1]

    def get_full_file_path(self, chrom: Optional[int] = None):
        """Return ``folder / full_file_name`` with ``chrom`` substituted for ``{chrom_num}``.

        Raises:
            ValueError: If the schema is split by chromosome and ``chrom`` is ``None``.
        """
        if self.split_by_chromosome and chrom is None:
            raise ValueError("Need chrom number")
        formatted_file_name = self.full_file_name.format(chrom_num=chrom)
        return Path(self.folder) / formatted_file_name

    @property
    def full_file_path(self):
        """A single ``Path``, or a ``{chrom_num: Path}`` dict for chromosome numbers 1 through 22."""
        if self.split_by_chromosome:
            return {chrom_num: self.get_full_file_path(chrom=chrom_num) for chrom_num in range(1, 23)}
        return self.get_full_file_path()

    @property
    def protocol_and_path(self):
        """Result of ``get_protocol_and_path`` for the file, or ``(None, None)`` when split.

        When the schema is split by chromosome there is no single path to
        inspect, so both elements are ``None``. A pair is returned in both
        cases so that callers can index or unpack the result uniformly.
        """
        if self.split_by_chromosome:
            return (None, None)
        return get_protocol_and_path(self.full_file_path.as_posix())

    @property
    def protocol(self):
        """First element of ``protocol_and_path`` (``None`` when split by chromosome)."""
        return self.protocol_and_path[0]

    # TODO: validate `full_file_name` when split by chromosome
    # (e.g. check that it contains the `{chrom_num}` placeholder).

class MultipleFilePathSchema():
    """Mixin for dataclasses whose fields each describe one file.

    Any attribute that is not found on the instance itself is looked up on
    every field value, and the results are returned as a dict keyed by
    field name.
    """

    def __getattr__(self, attr, **kwargs):
        """Fan an unknown attribute out to every field value.

        ``__getattr__`` only runs after normal lookup has failed. Two kinds
        of names are rejected instead of being forwarded:

        * dunder names, which the interpreter and libraries probe for
          (copy, pickle, ...) and which must fail fast;
        * dataclass field names, which can only be missing while the
          instance is not fully initialised. Forwarding them would call
          ``to_dict`` and recurse without end.

        ``kwargs`` is accepted for signature compatibility and is unused.
        """
        if attr.startswith("__") and attr.endswith("__"):
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {attr!r}")
        if attr in getattr(type(self), "__dataclass_fields__", {}):
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {attr!r}")
        return {
            k: getattr(v, attr) for k, v in self.to_dict().items()
        }

    def to_dict(self):
        """Return the dataclass fields as a dict (see ``dataclasses.asdict``)."""
        return asdict(self)

    def apply_func(self, func, **kwargs):
        """Call ``func(value, **kwargs)`` on every field value.

        Returns:
            A dict mapping each field name to the result of ``func``.
        """
        return {
            k: func(v, **kwargs) for k, v in self.to_dict().items()
        }
    
@dataclass
class CaseControlFilePathSchema(MultipleFilePathSchema):
    """Pair of file-path schemas, one for the case file and one for the control file.

    ``common_folder`` is an init-only argument. When given, it is used as the
    ``folder`` of both schemas, so each entry only has to supply its
    remaining keys.
    """
    case: SingleFilePathSchema
    control: SingleFilePathSchema
    common_folder : InitVar(Optional[str]) = None

    def __post_init__(self, common_folder):
        """Rebuild both schemas with ``common_folder`` as their folder, if one was given."""
        if common_folder is None:
            return
        for role in ("case", "control"):
            # `**` unpacking requires the current value to be a mapping that
            # has no `folder` key of its own.
            # NOTE(review): presumably the raw config dict at this stage; confirm.
            partial_spec = getattr(self, role)
            setattr(self, role, SingleFilePathSchema(folder=common_folder, **partial_spec))

    @property
    def protocol(self):
        """Protocol shared by both files.

        Raises:
            ValueError: If the case and control protocols differ.
        """
        case_protocol = self.case.protocol
        control_protocol = self.control.protocol
        if case_protocol == control_protocol:
            return case_protocol
        raise ValueError(f"Currently only the same file system for case and control file is supported.\n Case is located in {case_protocol} system. Control is located in {control_protocol} ")
    
    
#     def __post_init_post_parse__(self, common_folder):
#         self.protocol, _ = self.case.protocol_and_path

            

---

### Testing single file path

In [None]:
# Raw `file_path` configuration of the single-file catalog entry.
conf_test_data_catalog["genetic_file_single"]["file_path"]

{'folder': 'data/test_data/gen_file',
 'full_file_name': 'test_CASE_MS_chr22.gen'}

In [None]:
# Build the schema from the catalog configuration, then check the resolved
# path and the detected protocol.
test_genetic_file_single_file_path = SingleFilePathSchema(**conf_test_data_catalog["genetic_file_single"]["file_path"])
assert test_genetic_file_single_file_path.full_file_path == Path("data/test_data/gen_file/test_CASE_MS_chr22.gen")
assert test_genetic_file_single_file_path.protocol == "file"

---

### Testing case control file path

In [None]:
# Raw `file_path` configuration of the case/control catalog entry.
conf_test_data_catalog["genetic_file"]["file_path"]

{'case': {'folder': 'data/test_data/gen_file',
  'full_file_name': 'test_CASE_MS_chr22.gen'},
 'control': {'folder': 'data/test_data/gen_file',
  'full_file_name': 'test_CONTROL_MS_chr22.gen'}}

In [None]:
# `full_file_path` is not defined on the case/control schema itself, so the
# lookup is forwarded to both entries and comes back as a dict keyed by role.
test_genetic_file_cc_file_path = CaseControlFilePathSchema(**conf_test_data_catalog["genetic_file"]["file_path"])
test_genetic_file_cc_file_path.full_file_path

{'case': Path('data/test_data/gen_file/test_CASE_MS_chr22.gen'),
 'control': Path('data/test_data/gen_file/test_CONTROL_MS_chr22.gen')}

In [None]:
# The schema as a dict of its per-role `SingleFilePathSchema` entries.
test_genetic_file_cc_file_path.to_dict()

{'case': SingleFilePathSchema(folder='data/test_data/gen_file', full_file_name='test_CASE_MS_chr22.gen', file_name='test_CASE_MS_chr22', extension='gen', split_by_chromosome=None),
 'control': SingleFilePathSchema(folder='data/test_data/gen_file', full_file_name='test_CONTROL_MS_chr22.gen', file_name='test_CONTROL_MS_chr22', extension='gen', split_by_chromosome=None)}

Case and control files can also share a `common_folder`, so that each entry only needs its own `full_file_name`:

In [None]:
# Full catalog entry that uses `common_folder` inside `file_path`.
conf_test_data_catalog["genetic_file_common_folder"]

{'type': 'nbdev_tutorial.datasets.OVPDataset.OVPDataset',
 'file_format': 'genetic_file.GenFileFormat',
 'load_args': {'prob_n_cols': 3,
  'initial_cols': ['dashes', 'rsid', 'position', 'ref', 'alt'],
  'rsid_col': 'rsid',
  'ref_col': 'ref',
  'alt_col': 'alt',
  'pandas_args': {'sep': ' ', 'header': None}},
 'file_type': 'OVPDataset.CaseControlFilePathSchema',
 'file_path': {'common_folder': 'data/test_data/gen_file',
  'case': {'full_file_name': 'test_CASE_MS_chr22.gen'},
  'control': {'full_file_name': 'test_CONTROL_MS_chr22.gen'}}}

In [None]:
# With `common_folder`, both entries are built with that folder; the resolved
# paths match the ones from the explicit per-entry `folder` configuration.
test_genetic_file_cc_file_path_common_folder = CaseControlFilePathSchema(**conf_test_data_catalog["genetic_file_common_folder"]["file_path"])
test_genetic_file_cc_file_path_common_folder.full_file_path

{'case': Path('data/test_data/gen_file/test_CASE_MS_chr22.gen'),
 'control': Path('data/test_data/gen_file/test_CONTROL_MS_chr22.gen')}

In [None]:
# The repr shows that `folder` was filled in on both entries.
test_genetic_file_cc_file_path_common_folder

CaseControlFilePathSchema(case=SingleFilePathSchema(folder='data/test_data/gen_file', full_file_name='test_CASE_MS_chr22.gen', file_name='test_CASE_MS_chr22', extension='gen', split_by_chromosome=None), control=SingleFilePathSchema(folder='data/test_data/gen_file', full_file_name='test_CONTROL_MS_chr22.gen', file_name='test_CONTROL_MS_chr22', extension='gen', split_by_chromosome=None))

### Testing case control file path split by chromosome

In [None]:
# Raw `file_path` configuration using `split_by_chromosome` and a `{chrom_num}` placeholder.
conf_test_data_catalog["genetic_file_split_by_chrom"]["file_path"]

{'common_folder': 'data/test_data/gen_file',
 'case': {'split_by_chromosome': True,
  'full_file_name': 'test_CASE_MS_chr{chrom_num}.gen'},
 'control': {'split_by_chromosome': True,
  'full_file_name': 'test_CONTROL_MS_chr{chrom_num}.gen'}}

In [None]:
# Build the case/control schema for files split by chromosome; the placeholder
# is kept verbatim in `full_file_name` until a path is requested.
test_genetic_file_split_by_chrom = CaseControlFilePathSchema(**conf_test_data_catalog["genetic_file_split_by_chrom"]["file_path"])
test_genetic_file_split_by_chrom

CaseControlFilePathSchema(case=SingleFilePathSchema(folder='data/test_data/gen_file', full_file_name='test_CASE_MS_chr{chrom_num}.gen', file_name='test_CASE_MS_chr{chrom_num}', extension='gen', split_by_chromosome=True), control=SingleFilePathSchema(folder='data/test_data/gen_file', full_file_name='test_CONTROL_MS_chr{chrom_num}.gen', file_name='test_CONTROL_MS_chr{chrom_num}', extension='gen', split_by_chromosome=True))

In [None]:
# Each role maps to a `{chrom_num: Path}` dict with the placeholder filled in.
test_genetic_file_split_by_chrom.full_file_path

{'case': {1: Path('data/test_data/gen_file/test_CASE_MS_chr1.gen'),
  2: Path('data/test_data/gen_file/test_CASE_MS_chr2.gen'),
  3: Path('data/test_data/gen_file/test_CASE_MS_chr3.gen'),
  4: Path('data/test_data/gen_file/test_CASE_MS_chr4.gen'),
  5: Path('data/test_data/gen_file/test_CASE_MS_chr5.gen'),
  6: Path('data/test_data/gen_file/test_CASE_MS_chr6.gen'),
  7: Path('data/test_data/gen_file/test_CASE_MS_chr7.gen'),
  8: Path('data/test_data/gen_file/test_CASE_MS_chr8.gen'),
  9: Path('data/test_data/gen_file/test_CASE_MS_chr9.gen'),
  10: Path('data/test_data/gen_file/test_CASE_MS_chr10.gen'),
  11: Path('data/test_data/gen_file/test_CASE_MS_chr11.gen'),
  12: Path('data/test_data/gen_file/test_CASE_MS_chr12.gen'),
  13: Path('data/test_data/gen_file/test_CASE_MS_chr13.gen'),
  14: Path('data/test_data/gen_file/test_CASE_MS_chr14.gen'),
  15: Path('data/test_data/gen_file/test_CASE_MS_chr15.gen'),
  16: Path('data/test_data/gen_file/test_CASE_MS_chr16.gen'),
  17: Path('data/t

---

In [None]:
#export    
# class FILE_FORMAT_ENUM(Enum):
#     GenFile = genetic_datasets.GenFileFormat
#     SampleFile = sample_file.SampleFileFormat
    
# class FILE_TYPE_ENUM(Enum):
#     CC = CaseControlFilePathSchema
#     S = SingleFilePathSchema
    

In [None]:
#export

class OVPDataset(AbstractVersionedDataSet):
    """Kedro dataset that exposes one or more files through a file-format class.

    The constructor resolves a file-path schema class (``file_type``) and a
    file-format class (``file_format``) from their dotted paths. It then
    builds one file-format object per file and stores them on ``self.files``.
    Loading the dataset returns the dataset object itself.

    Args:
        file_type: Dotted path (or class) of the file-path schema, e.g.
            ``SingleFilePathSchema`` or ``CaseControlFilePathSchema``.
        file_format: Dotted path (or class) of the file-format class that is
            instantiated for every file.
        file_path: Keyword arguments for the file-path schema.
        common_folder: Optional folder forwarded to the file-path schema as
            ``common_folder``.
        load_args: Keyword arguments forwarded to every file-format object.
            ``None`` is treated as no extra arguments.
        version: Stored on ``self._version``.
        credentials: Keyword arguments forwarded to ``fsspec.filesystem``.
        fs_args: Extra keyword arguments for ``fsspec.filesystem``. The
            ``open_args_load`` and ``open_args_save`` entries are split off
            and stored separately.
    """

    def __init__(self,
                 file_type,
                 file_format,
                 file_path,
                 common_folder=None,
                load_args: Dict[str, Any] = None,
                version: Version = None,
                credentials: Dict[str, Any] = None,
                fs_args: Dict[str, Any] = None,
                ):
        # NOTE(review): the AbstractVersionedDataSet initializer is not called
        # here, so base-class attributes are not set. Confirm this is intended.
        self.file_type = file_type
        self._file_path_class = parse_class("file_type", file_type)
        if common_folder is None:
            self._file_path = self._file_path_class(**file_path)
        else:
            self._file_path = self._file_path_class(**file_path, common_folder=common_folder)

        self._version = version

        self._file_format_class = parse_class("file_format", file_format)

        # `load_args` defaults to None; copy it so later changes to the
        # caller's dict do not leak into the dataset.
        self._load_args = deepcopy(load_args) or {}

        if self._file_path_class != SingleFilePathSchema:
            # Multi-file schemas return a dict of paths keyed by role.
            self.files = SimpleNamespace(**{
                role: self._file_format_class(filepath=role_path, **self._load_args)
                for role, role_path in self.full_file_path.items()
            })
        else:
            self.files = SimpleNamespace(
                single_file=self._file_format_class(filepath=self.full_file_path, **self._load_args)
            )

        _fs_args = deepcopy(fs_args) or {}
        _fs_open_args_load = _fs_args.pop("open_args_load", {})
        _fs_open_args_save = _fs_args.pop("open_args_save", {})
        _credentials = deepcopy(credentials) or {}

        self._protocol = self._file_path.protocol
        if self._protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        _fs_open_args_save.setdefault("mode", "w")
        self._fs_open_args_load = _fs_open_args_load
        self._fs_open_args_save = _fs_open_args_save

        self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

    @property
    def full_file_path(self):
        """Path, or dict of paths, resolved by the file-path schema."""
        return self._file_path.full_file_path

    def _load(self):
        """Return the dataset itself; the files are reachable through ``self.files``."""
        return self

    def _save(self, data: Any = None):
        """Do nothing; ``data`` is accepted and ignored."""
        pass

    def _describe(self) -> Dict[str, Any]:
        """Return the settings that identify this dataset."""
        return dict(
            file_type=self.file_type,
            file_format=self._file_format_class,
            file_path=self.full_file_path,
            load_args=self._load_args,
            protocol=self._protocol,
            version=self._version,
        )

  and should_run_async(code)


In [None]:
# Work on a shallow copy so the shared catalog configuration keeps its keys,
# and drop `type`, which is not an `OVPDataset` constructor argument.
test_conf_test_data_catalog = conf_test_data_catalog["genetic_file"].copy()
test_conf_test_data_catalog.pop("type")

'nbdev_tutorial.datasets.OVPDataset.OVPDataset'

In [None]:
# Build the dataset directly from the remaining configuration keys.
test_genetic_dataset = OVPDataset(**test_conf_test_data_catalog)
test_genetic_dataset

<__main__.OVPDataset at 0x7fb80817cfd0>

In [None]:
# `files` holds one file-format object per role of the case/control schema.
test_case_file = test_genetic_dataset.files.case
test_control_file = test_genetic_dataset.files.control

In [None]:
# Genotype lookup table for the case file, one row per rsid.
test_case_file.get_genotypes_df()

Unnamed: 0_level_0,ref,alt,homo_ref,het,homo_alt
rsid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
rs77948203,G,A,GG,AG,AA
rs1014626,C,T,CC,CT,TT
rs9610458,C,T,CC,CT,TT
rs5762201,A,G,AA,AG,GG
rs1004237,C,T,CC,CT,TT
rs134490,C,T,CC,CT,TT
rs4821519,G,C,GG,CG,CC
rs1003500,C,T,CC,CT,TT
rs5756405,A,G,AA,AG,GG


In [None]:
# Genotype lookup table for the control file, one row per rsid.
test_control_file.get_genotypes_df()

  and should_run_async(code)


Unnamed: 0_level_0,ref,alt,homo_ref,het,homo_alt
rsid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
rs77948203,G,A,GG,AG,AA
rs1014626,C,T,CC,CT,TT
rs9610458,C,T,CC,CT,TT
rs5762201,A,G,AA,AG,GG
rs1004237,C,T,CC,CT,TT
rs134490,C,T,CC,CT,TT
rs4821519,G,C,GG,CG,CC
rs1003500,C,T,CC,CT,TT
rs5756405,A,G,AA,AG,GG


In [None]:
# Run `triplicate_converter` over all case samples, then transpose so that
# rows are samples and columns are rsids.
case_all_geno_file = test_case_file.apply_func_to_all_samples(genetic_file.triplicate_converter, 
                                              genotype_df = test_case_file.get_genotypes_df()).T
case_all_geno_file

  and should_run_async(code)


0it [00:00, ?it/s]

rsid,rs77948203,rs1014626,rs9610458,rs5762201,rs1004237,rs134490,rs4821519,rs1003500,rs5756405
sample_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
sample1,GG,TT,TT,GG,CC,,GG,CC,AG
sample10,GG,TT,TT,GG,CC,TT,GG,CC,AG
sample100,GG,TT,TT,GG,CC,TT,GG,CC,AG
sample1000,AG,TT,CT,GG,CC,CT,GG,CC,AA
sample1001,GG,TT,TT,GG,CC,CT,CG,CC,AG
...,...,...,...,...,...,...,...,...,...
sample995,GG,TT,CT,GG,CC,CT,CC,CC,AG
sample996,GG,TT,CT,GG,CC,TT,GG,CC,AG
sample997,GG,TT,CT,AG,CC,TT,GG,CC,AG
sample998,GG,TT,CT,GG,CC,TT,GG,CC,AA


In [None]:
# Same conversion for the control samples, transposed to samples x rsids.
control_all_geno_file = test_control_file.apply_func_to_all_samples(genetic_file.triplicate_converter, 
                                              genotype_df = test_control_file.get_genotypes_df()).T
control_all_geno_file

  and should_run_async(code)


0it [00:00, ?it/s]

rsid,rs77948203,rs1014626,rs9610458,rs5762201,rs1004237,rs134490,rs4821519,rs1003500,rs5756405
sample_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
sample1,GG,TT,TT,AG,CC,CT,GG,CC,AG
sample10,GG,TT,CT,GG,CC,TT,GG,CC,AA
sample100,GG,TT,CT,GG,CC,,GG,CC,AG
sample1000,AG,TT,TT,GG,CC,CT,GG,CC,AA
sample1001,GG,TT,CC,GG,CC,CT,GG,CC,AG
...,...,...,...,...,...,...,...,...,...
sample995,GG,TT,TT,GG,CC,TT,GG,CC,AG
sample996,GG,TT,CT,GG,CC,TT,GG,CC,AA
sample997,GG,TT,CC,GG,CC,,GG,CC,AG
sample998,GG,TT,CT,GG,CC,TT,GG,CC,AG


In [None]:
# Sanity check: the case and control tables must not be the same data.
assert not case_all_geno_file.equals(control_all_geno_file)

  and should_run_async(code)


In [None]:
# Round-trip the case table through the catalog: save it, then load it back for display.
test_data_catalog.save("case_geno_each_sample", case_all_geno_file)
test_data_catalog.load("case_geno_each_sample")

Unnamed: 0_level_0,rs77948203,rs1014626,rs9610458,rs5762201,rs1004237,rs134490,rs4821519,rs1003500,rs5756405
sample_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
sample1,GG,TT,TT,GG,CC,,GG,CC,AG
sample10,GG,TT,TT,GG,CC,TT,GG,CC,AG
sample100,GG,TT,TT,GG,CC,TT,GG,CC,AG
sample1000,AG,TT,CT,GG,CC,CT,GG,CC,AA
sample1001,GG,TT,TT,GG,CC,CT,CG,CC,AG
...,...,...,...,...,...,...,...,...,...
sample995,GG,TT,CT,GG,CC,CT,CC,CC,AG
sample996,GG,TT,CT,GG,CC,TT,GG,CC,AG
sample997,GG,TT,CT,AG,CC,TT,GG,CC,AG
sample998,GG,TT,CT,GG,CC,TT,GG,CC,AA


In [None]:
# Round-trip the control table through the catalog: save it, then load it back for display.
test_data_catalog.save("control_geno_each_sample", control_all_geno_file)
test_data_catalog.load("control_geno_each_sample")

  and should_run_async(code)


Unnamed: 0_level_0,rs77948203,rs1014626,rs9610458,rs5762201,rs1004237,rs134490,rs4821519,rs1003500,rs5756405
sample_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
sample1,GG,TT,TT,AG,CC,CT,GG,CC,AG
sample10,GG,TT,CT,GG,CC,TT,GG,CC,AA
sample100,GG,TT,CT,GG,CC,,GG,CC,AG
sample1000,AG,TT,TT,GG,CC,CT,GG,CC,AA
sample1001,GG,TT,CC,GG,CC,CT,GG,CC,AG
...,...,...,...,...,...,...,...,...,...
sample995,GG,TT,TT,GG,CC,TT,GG,CC,AG
sample996,GG,TT,CT,GG,CC,TT,GG,CC,AA
sample997,GG,TT,CC,GG,CC,,GG,CC,AG
sample998,GG,TT,CT,GG,CC,TT,GG,CC,AG
