In [4]:
!pip install lifelines
!pip install Pgenlib
!pip install numpy 
!pip install sci_palettes
!pip install SciencePlots

Collecting sci_palettes
  Using cached sci_palettes-1.0.1-py3-none-any.whl (7.8 kB)
Collecting seaborn (from sci_palettes)
  Using cached seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Using cached seaborn-0.13.2-py3-none-any.whl (294 kB)
Installing collected packages: seaborn, sci_palettes
Successfully installed sci_palettes-1.0.1 seaborn-0.13.2
Collecting SciencePlots
  Using cached SciencePlots-2.1.1-py3-none-any.whl.metadata (11 kB)
Using cached SciencePlots-2.1.1-py3-none-any.whl (16 kB)
Installing collected packages: SciencePlots
Successfully installed SciencePlots-2.1.1


In [1]:
import pgenlib as pg
import numpy as np

In [3]:
test_pvar_path = "test/ANGPTL3.pvar"
test_pgen_path = "test/ANGPTL3.pgen"

pfile = pg.PgenReader(bytes(test_pgen_path, "utf-8"))
print("pgen file have rows (ind): ", pfile.get_raw_sample_ct())
print("pgen file have columns (snp) : ", pfile.get_variant_ct())
print("pgen file is hard call: ", pfile.hardcall_phase_present())

pgen file have rows (ind):  467491
pgen file have columns (snp) :  427
pgen file is hard call:  False


In [4]:
pvar = pg.PvarReader(bytes(test_pvar_path, "utf-8"))
print("pvar variants counts:", pvar.get_variant_ct())
print("pvar variant 1:", pvar.get_variant_id(0).decode())

pvar variants counts: 427
pvar variant 1: 1:62597543:T:C


`PgenReader`是最为主要的对象

```python
class PgenReader:
    """
    .pgen 或 .bed 文件的读取器。

    Parameters:
        filename (bytes): 文件名，如果文件不存在或无效，则会引发异常。
        raw_sample_ct (int, optional): .bed 文件所需的原始样本数，如果不提供将引发异常；对于 .pgen 文件，可选。如果为 .pgen 文件提供了该参数，则如果样本数不匹配 .pgen 中的值，则会引发异常。
        variant_ct (int, optional): 变异位点数，始终可选。如果不提供，则会检查 .bed 文件的大小或 .pgen 中显式存储的值，如果不匹配则引发异常。
        sample_subset (np.ndarray[np.uint32], optional): （从零开始的）索引数组，告诉读取器仅在加载或计算基因型时考虑指定的样本。如果不提供，则表示包括所有样本。如果提供了值，则会检查数组中的值是否在0到(raw_sample_ct - 1)的范围内，并且是否严格递增，如果不是则引发异常。
        allele_idx_offsets (np.ndarray[np.uintp], optional): 多等位基因型的偏移量数组。当提供时，应该是一个长度为 (variant_ct+1) 的 np.uintp 数组，其中 allele_idx_offsets[i+1] - allele_idx_offsets[i] 是 0-based 变异位点 i 的等位基因数目。
        pvar (object, optional): 如果提供了 pvar，则从中提取 variant_ct 和 allele_idx_offsets。（不能同时提供 pvar 和 variant_ct/allele_idx_offsets）

    Attributes:
        filename (bytes): 文件名。
        raw_sample_ct (int): .bed 文件所需的原始样本数。
        variant_ct (int): 变异位点数。
        sample_subset (np.ndarray[np.uint32]): 样本子集索引数组。
        allele_idx_offsets (np.ndarray[np.uintp]): 多等位基因型的偏移量数组。

    Raises:
        ValueError: 如果文件不存在、无效或参数不匹配要求时引发异常。
    """
    def __init__(self, filename: bytes, raw_sample_ct: int = None, variant_ct: int = None, 
                 sample_subset: np.ndarray[np.uint32] = None, allele_idx_offsets: np.ndarray[np.uintp] = None, 
                 pvar: object = None) -> None:
        pass  # 初始化函数，打开 .pgen 或 .bed 文件

```

In [3]:
test_pvar_path = 'pgen/ANGPTL3.pvar'
test_pgen_path = 'pgen/ANGPTL3.pgen'

pfile = pg.PgenReader(bytes(test_pgen_path, 'utf-8'))
print("pgen file have rows (ind): ", pfile.get_raw_sample_ct())
print("pgen file have columns (snp) : ", pfile.get_variant_ct())
print("pgen file is hard call: ", pfile.hardcall_phase_present())

pgen file have rows (ind):  467491
pgen file have columns (snp) :  427
pgen file is hard call:  False


In [6]:
pfile = pg.PgenReader(bytes(test_pgen_path, 'utf-8'), pvar = pg.PvarReader(bytes(test_pvar_path, 'utf-8')))

pfile.get_variant_id(32)

AttributeError: 'pgenlib.PgenReader' object has no attribute 'get_variant_id'

array([0, 0, 0, ..., 0, 0, 0], dtype=int8)

- read function

read(int32_t variant_idx, np.ndarray[np.int{8,32,64}_t] geno_int_out,
       uint32_t allele_idx = 1)
- variant_idx:int32, 接受索引
- geno_int_out: np.array(dtype=np.int{8,32,64}) 用来作为buf，接受输出，最终的输出结果存放在这个变量里面
- allele_idx: int32, default 1 ，1 返回的计数是基于alt的，0则是基于REF

```python
def read(variant_idx: int, geno_int_out: np.ndarray[np.int8 | np.int32 | np.int64], allele_idx: int = 1) -> None:
    """
    读取指定索引的变异位点信息。

    Parameters:
        variant_idx (int): 变异位点的索引，使用 int32 类型表示。
        geno_int_out (np.ndarray): 用于存储基因型信息的 numpy 数组，dtype 为 np.int8、np.int32 或 np.int64。
        allele_idx (int, optional): 等位基因索引，默认为 1。当 allele_idx 为 1 时，返回的基因型信息是基于替代等位基因的计数；当 allele_idx 为 0 时，返回的基因型信息是基于参考等位基因的计数。

    Returns:
        None: 函数没有返回值，但是会将结果存储在 geno_int_out 中。
    """
```

In [77]:
# 读取一个人的全部SNP
buf = np.empty(pfile.get_raw_sample_ct(), np.int8)  # why use this ? - Author tried the interface without the required outbuffer, and it could take twice as long due to all the extra allocation/deallocation. 
pfile.read(0, buf) # 返回结果存放在buf里面
buf

array([0, 0, 0, ..., 0, 0, 0], dtype=int8)

```python
def read_dosages(variant_idx: int, floatarr_out: np.ndarray[np.float32 | np.float64], allele_idx: int = 1) -> None:
    """
    读取指定索引的变异位点剂量信息。

    Parameters:
        variant_idx (int): 变异位点的索引，使用 uint32_t 类型表示。
        floatarr_out (np.ndarray): 用于存储剂量信息的 numpy 数组，dtype 为 np.float32 或 np.float64。
        allele_idx (int, optional): 等位基因索引，默认为 1。当 allele_idx 为 1 时，返回的剂量信息是基于替代等位基因的剂量；当 allele_idx 为 0 时，返回的剂量信息是基于参考等位基因的剂量。

    Returns:
        None: 函数没有返回值，但是会将结果存储在 floatarr_out 中。
    """


```

In [79]:
buf = np.empty(pfile.get_raw_sample_ct(), np.float32) 
pfile.read_dosages(0, buf)
buf

array([0., 0., 0., ..., 0., 0., 0.], dtype=float32)

```python
def read_range(variant_idx_start: int, variant_idx_end: int,
               geno_int_out: np.ndarray[np.int8 | np.int32 | np.int64, mode='c', ndim=2],
               allele_idx: int = 1, sample_maj: int = 0) -> None:
    """
    读取指定范围内的变异位点信息。

    Parameters:
        variant_idx_start (int): 变异位点的起始索引，使用 uint32_t 类型表示。
        variant_idx_end (int): 变异位点的结束索引，使用 uint32_t 类型表示。
        geno_int_out (np.ndarray): 用于存储基因型信息的二维 numpy 数组，dtype 为 np.int8、np.int32 或 np.int64。mode 为 "c"，ndim 为 2
        allele_idx (int, optional): 等位基因索引，默认为 1。当 allele_idx 为 1 时，返回的基因型信息是基于替代等位基因的计数；当 allele_idx 为 0 时，返回的基因型信息是基于参考等位基因的计数。
        sample_maj (int, optional): 不知道功能，没有文档解释

    Returns:
        None: 函数没有返回值，但是会将结果存储在 geno_int_out 中。
    """


```

In [96]:
start =0 
end = 10 
nums = end - start

buf = np.empty(pfile.get_raw_sample_ct(), np.int8).reshape(1, -1).repeat(nums, axis=0) # shape (num, sample)
print(buf.shape)
pfile.read_range(start, end, buf)
buf

(10, 467491)


array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]], dtype=int8)

```python
def read_list(variant_idxs: np.ndarray[np.uint32], 
              geno_int_out: np.ndarray[np.int8 | np.int32 | np.int64, mode='c', ndim=2], 
              allele_idx: int = 1, sample_maj: int = 0) -> None:
    """
    读取指定索引列表中的变异位点信息。

    Parameters:
        variant_idxs (np.ndarray): 变异位点索引的数组，dtype 为 np.uint32。
        geno_int_out (np.ndarray): 用于存储基因型信息的二维 numpy 数组，dtype 为 np.int8、np.int32 或 np.int64。mode 为 "c"，ndim 为 2。
        allele_idx (int, optional): 等位基因索引，默认为 1。当 allele_idx 为 1 时，返回的基因型信息是基于替代等位基因的计数；当 allele_idx 为 0 时，返回的基因型信息是基于参考等位基因的计数。
        sample_maj (int, optional): 样本主要等位基因的索引，默认为 0。

    Returns:
        None: 函数没有返回值，但是会将结果存储在 geno_int_out 中。
    """

```

## 包裹代码
```python
from typing import Union, List, overload
# from 

class PgenReaderFull():
    def __init__(self, pfile_path:str, pgen_path:str =None, pvar_path:str=None, sample_path:str=None ):

        self.pgen_path:str = pgen_path if pgen_path is not None else pfile_path + '.pgen'
        self.pvar_path:str = pvar_path if pvar_path is not None else pfile_path + '.pvar'
        self.sample_path:str = sample_path if sample_path is not None else pfile_path + '.psam'

        self.pvar:pg.PvarReader = pg.PvarReader(bytes(self.pvar_path, 'utf-8'))
        self._init_var() # this may be too large to load
        
        self.sample:pd.DataFrame = pd.read_csv(self.sample_path, sep='\t')
        self._init_sample()

        self.pgen:pg.PgenReader = pg.PgenReader(bytes(self.pgen_path, 'utf-8'), pvar=self.pvar, 
                                                sample_subset=np.arange(len(self.sample_list), dtype=np.uint32)
                                                )


        # basic information
        self.sample_ct:int = self.get_raw_sample_ct() # number of samples as same as get_raw_sample_ct, but only call once
        self.variant_ct:int = self.get_variant_ct() # number of variants as same as get_variant_ct, but only call once


    @overload
    def get_variant_ids(self, variant_idx:List[int])->List[str]:...
    @overload
    def get_variant_ids(self, variant_idx:int)->str:...

    def get_variant_ids(self, variant_idx:Union[int, List[int]])->Union[str, List[str]]:
        """
        from PvarReader.get_variant_id 
        Modified version for interface        
        """
        if isinstance(variant_idx, int):
            return self.pvar.get_variant_id(variant_idx).decode('utf-8')
        else:
            return [self.get_variant_ids(i) for i in variant_idx]
        
    @overload
    def get_variant_idx(self, variant_id:str)->int:...
    @overload
    def get_variant_idx(self, variant_id:List[str])->List[int]:...
    def get_variant_idx(self, variant_id:Union[str, List[str]])->Union[int, List[int]]:
        """
        Modified version for interface
        """
        if isinstance(variant_id, str):
            return self.var_list.index(variant_id)
        else:
            return [self.get_variant_idx(i) for i in variant_id]
    def get_sample_ids(self, sample_idx:Union[int, List[int]])->Union[str, List[str]]:
        """
        TF version for get sample ids
        """
        if isinstance(sample_idx, int):
            return self.sample_list[sample_idx]
        else:
            return [self.get_sample_ids(i) for i in sample_idx]


    def _init_sample(self):
        """
        load from psam and to list[[FID, IID]]
        sample_list = [[FID, IID], ...]
        """
        self.sample_list = pd.read_csv(self.sample_path, sep='\t').iloc[:, :2].values.tolist()
    def _init_var(self):
        """
        load from pvar and to list[variant_id]
        """
        self.var_list = pd.read_csv(self.pvar_path, sep='\t').iloc[:, 2].tolist()

    def get_raw_sample_ct(self) -> int:
        """
        from PgenReader.get_raw_sample_ct
        """
        return self.pgen.get_raw_sample_ct()

    def get_variant_ct(self) -> int:
        """
        from PgenReader.get_variant_ct
        """
        return self.pgen.get_variant_ct()
    
    def hardcall_phase_present(self) -> bool:
        """
        from PgenReader.hardcall_phase_present
        """
        return self.pgen.hardcall_phase_present()
    def read(self, variant_idx, sample_idx=None, allele_idx=1)->np.ndarray:
        """
        from PgenReader.read with modified 

        Parameters
            - variant_idx:int32, index of the variant to read
            - allele_idx (int, optional): 等位基因索引，默认为 1。当 allele_idx 为 1 时，返回的基因型信息是基于ALT等位基因的计数；当 allele_idx 为 0 时，返回的基因型信息是基于REF等位基因的计数。

        """
        
        sample_idx = np.array(sample_idx) if sample_idx is not None else np.array(range(self.sample_ct))

        # use buf to store the result; the author of pgenlib think this is faster 
        buf = np.empty(self.sample_ct, dtype=np.int8)
        self.pgen.read(variant_idx = variant_idx, 
                       geno_int_out = buf,
                       allele_idx = allele_idx,
                       
                       )
        return buf[sample_idx]
    def read_range(self, variant_idx_start, variant_idx_end, allele_idx=1, sample_idx=None)->np.ndarray:
        """
        from PgenReader.read_range
        Parameters
            - variant_idx_start: int32, index of the first variant to read
            - variant_idx_end: int32, index of the last variant to read
            - allele_idx: int32, index of the allele to read, defualt 1 for ALT count, while 0 for REF count
            - sample_idx: int32, index of the sample to read
        """
        sample_idx = np.array(sample_idx) if sample_idx is not None else np.array(range(self.sample_ct))

        buf = np.empty((self.sample_ct, variant_idx_end-variant_idx_start), dtype=np.int8)
        self.pgen.read_range(variant_idx_start = variant_idx_start, 
                             variant_idx_end = variant_idx_end,
                             geno_int_out = buf,
                             allele_idx = allele_idx,
                             )
        
        return buf[:, sample_idx]
    def read_list(self, variant_idxs:list,  allele_idx=1, sample_idx=None) -> np.ndarray:
        """
        from PgenReader.read_list
        """
        sample_idx = np.array(sample_idx) if sample_idx is not None else np.array(range(self.sample_ct))

        buf = np.empty((len(variant_idxs), self.sample_ct), dtype=np.int32) # shape (variant_nums, sample_nums)

        variant_idxs_array = np.array(variant_idxs, dtype=np.uint32)
        self.pgen.read_list(variant_idxs = variant_idxs_array, 
                            geno_int_out = buf,
                            allele_idx = allele_idx,
                            )
        return buf[:, sample_idx]

    @overload
    def extract(self, variant_idx:List[int], variant_ids=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    @overload
    def extract(self, variant_idx:int, variant_ids=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    @overload
    def extract(self, variant_ids:List[str], variant_idx=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    @overload
    def extract(self, variant_ids:str, variant_idx=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    
    def extract(self, variant_idx=None, variant_ids=None, allele_idx=1, sample_idx=None)->np.ndarray:
        """
        TF version for extract data from pgen file 
        Usage:
        pgfull = PgenReaderFull(pfile_path) # suffix of pgen,pvar and psam like plink2 usage do 

        # extract the first 3 variants, the second allele, and the first 3 samples
        snp_idxs = [1,2,3]
        geno, variants, samples = pgfull.extract(variant_idx=[1,2,3])

        # or extract by variant_ids
        geno, variants, samples = pgfull.extract(variant_ids=['rs1','rs2','rs3'])

        # or only part of the samples
        geno, variants, samples = pgfull.extract(variant_idx=[1,2,3], sample_idx=[1,2,3])

        """
        if variant_idx is None and variant_ids is None:
            raise ValueError('variant_idx and variant_ids cannot be None at the same time')
        if variant_idx and variant_ids:
            raise ValueError('variant_idx and variant_ids cannot be assigned at the same time')
        if variant_idx:
            if isinstance(variant_idx, int):
                variant_idx = [variant_idx]

            extracted_geno = self.read_list(variant_idx, allele_idx, sample_idx)
            samples = self.get_sample_ids(sample_idx) if sample_idx else self.sample_list
            variants = self.get_variant_ids(variant_idx)

        if variant_ids:
            if isinstance(variant_ids, str):
                variant_ids = [variant_ids]

            variant_idx = self.get_variant_idx(variant_ids)

            extracted_geno = self.read_list(variant_idx, allele_idx, sample_idx)
            samples = self.get_sample_ids(sample_idx) if sample_idx else self.sample_list
            variants = variant_ids

        # return pd.DataFrame(extracted_geno, index=variants, columns=samples)
        return extracted_geno, variants, samples





    def read_snp_list(self, snp_list, allele_idx=1, sample_idx=None)->np.ndarray:
        pass 


    def read_dosage(self):
        """
        soon modify from PgenReader.read_dosage
        """
        raise NotImplementedError
    def read_alleles(self):
        """
        soon modify from PgenReader.read_alleles
        """
        raise NotImplementedError
    
    def read_alleles_and_phasepresent(self):
        """
        soon modify from PgenReader.read_alleles_and_phasepresent
        """
        raise NotImplementedError
    

    
    def __enter__(self):
        return self
    def __exit__(self):
        self.close()

    def close(self):
        """
        from PgenReader.close
        """
        self.pgen.close()
        self.pvar.close()
```

In [9]:
from typing import Union, List, overload
# from 

class PgenReaderFull():
    def __init__(self, pfile_path:str, pgen_path:str =None, pvar_path:str=None, sample_path:str=None ):

        self.pgen_path:str = pgen_path if pgen_path is not None else pfile_path + '.pgen'
        self.pvar_path:str = pvar_path if pvar_path is not None else pfile_path + '.pvar'
        self.sample_path:str = sample_path if sample_path is not None else pfile_path + '.psam'

        self.pvar:pg.PvarReader = pg.PvarReader(bytes(self.pvar_path, 'utf-8'))
        self._init_var() # this may be too large to load
        
        self.sample:pd.DataFrame = pd.read_csv(self.sample_path, sep='\t')
        self._init_sample()

        self.pgen:pg.PgenReader = pg.PgenReader(bytes(self.pgen_path, 'utf-8'), pvar=self.pvar, 
                                                sample_subset=np.arange(len(self.sample_list), dtype=np.uint32)
                                                )


        # basic information
        self.sample_ct:int = self.get_raw_sample_ct() # number of samples as same as get_raw_sample_ct, but only call once
        self.variant_ct:int = self.get_variant_ct() # number of variants as same as get_variant_ct, but only call once


    @overload
    def get_variant_ids(self, variant_idx:List[int])->List[str]:...
    @overload
    def get_variant_ids(self, variant_idx:int)->str:...

    def get_variant_ids(self, variant_idx:Union[int, List[int]])->Union[str, List[str]]:
        """
        from PvarReader.get_variant_id 
        Modified version for interface        
        """
        if isinstance(variant_idx, int):
            return self.pvar.get_variant_id(variant_idx).decode('utf-8')
        else:
            return [self.get_variant_ids(i) for i in variant_idx]
        
    @overload
    def get_variant_idx(self, variant_id:str)->int:...
    @overload
    def get_variant_idx(self, variant_id:List[str])->List[int]:...
    def get_variant_idx(self, variant_id:Union[str, List[str]])->Union[int, List[int]]:
        """
        Modified version for interface
        """
        if isinstance(variant_id, str):
            return self.var_list.index(variant_id)
        else:
            return [self.get_variant_idx(i) for i in variant_id]
    def get_sample_ids(self, sample_idx:Union[int, List[int]])->Union[str, List[str]]:
        """
        TF version for get sample ids
        """
        if isinstance(sample_idx, int):
            return self.sample_list[sample_idx]
        else:
            return [self.get_sample_ids(i) for i in sample_idx]


    def _init_sample(self):
        """
        load from psam and to list[[FID, IID]]
        sample_list = [[FID, IID], ...]
        """
        self.sample_list = pd.read_csv(self.sample_path, sep='\t').iloc[:, :2].values.tolist()
    def _init_var(self):
        """
        load from pvar and to list[variant_id]
        """
        self.var_list = pd.read_csv(self.pvar_path, sep='\t').iloc[:, 2].tolist()

    def get_raw_sample_ct(self) -> int:
        """
        from PgenReader.get_raw_sample_ct
        """
        return self.pgen.get_raw_sample_ct()

    def get_variant_ct(self) -> int:
        """
        from PgenReader.get_variant_ct
        """
        return self.pgen.get_variant_ct()
    
    def hardcall_phase_present(self) -> bool:
        """
        from PgenReader.hardcall_phase_present
        """
        return self.pgen.hardcall_phase_present()
    def read(self, variant_idx, sample_idx=None, allele_idx=1)->np.ndarray:
        """
        from PgenReader.read with modified 

        Parameters
            - variant_idx:int32, index of the variant to read
            - allele_idx (int, optional): 等位基因索引，默认为 1。当 allele_idx 为 1 时，返回的基因型信息是基于ALT等位基因的计数；当 allele_idx 为 0 时，返回的基因型信息是基于REF等位基因的计数。

        """
        
        sample_idx = np.array(sample_idx) if sample_idx is not None else np.array(range(self.sample_ct))

        # use buf to store the result; the author of pgenlib think this is faster 
        buf = np.empty(self.sample_ct, dtype=np.int8)
        self.pgen.read(variant_idx = variant_idx, 
                       geno_int_out = buf,
                       allele_idx = allele_idx,
                       
                       )
        return buf[sample_idx]
    def read_range(self, variant_idx_start, variant_idx_end, allele_idx=1, sample_idx=None)->np.ndarray:
        """
        from PgenReader.read_range
        Parameters
            - variant_idx_start: int32, index of the first variant to read
            - variant_idx_end: int32, index of the last variant to read
            - allele_idx: int32, index of the allele to read, defualt 1 for ALT count, while 0 for REF count
            - sample_idx: int32, index of the sample to read
        """
        sample_idx = np.array(sample_idx) if sample_idx is not None else np.array(range(self.sample_ct))

        buf = np.empty((self.sample_ct, variant_idx_end-variant_idx_start), dtype=np.int8)
        self.pgen.read_range(variant_idx_start = variant_idx_start, 
                             variant_idx_end = variant_idx_end,
                             geno_int_out = buf,
                             allele_idx = allele_idx,
                             )
        
        return buf[:, sample_idx]
    def read_list(self, variant_idxs:list,  allele_idx=1, sample_idx=None) -> np.ndarray:
        """
        from PgenReader.read_list
        """
        sample_idx = np.array(sample_idx) if sample_idx is not None else np.array(range(self.sample_ct))

        buf = np.empty((len(variant_idxs), self.sample_ct), dtype=np.int32) # shape (variant_nums, sample_nums)

        variant_idxs_array = np.array(variant_idxs, dtype=np.uint32)
        self.pgen.read_list(variant_idxs = variant_idxs_array, 
                            geno_int_out = buf,
                            allele_idx = allele_idx,
                            )
        return buf[:, sample_idx]

    @overload
    def extract(self, variant_idx:List[int], variant_ids=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    @overload
    def extract(self, variant_idx:int, variant_ids=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    @overload
    def extract(self, variant_ids:List[str], variant_idx=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    @overload
    def extract(self, variant_ids:str, variant_idx=None, allele_idx=1, sample_idx=None)-> np.ndarray:...
    
    def extract(self, variant_idx=None, variant_ids=None, allele_idx=1, sample_idx=None)->np.ndarray:
        """
        TF version for extract data from pgen file 
        Usage:
        pgfull = PgenReaderFull(pfile_path) # suffix of pgen,pvar and psam like plink2 usage do 

        # extract the first 3 variants, the second allele, and the first 3 samples
        snp_idxs = [1,2,3]
        geno, variants, samples = pgfull.extract(variant_idx=[1,2,3])

        # or extract by variant_ids
        geno, variants, samples = pgfull.extract(variant_ids=['rs1','rs2','rs3'])

        # or only part of the samples
        geno, variants, samples = pgfull.extract(variant_idx=[1,2,3], sample_idx=[1,2,3])

        """
        if variant_idx is None and variant_ids is None:
            raise ValueError('variant_idx and variant_ids cannot be None at the same time')
        if variant_idx and variant_ids:
            raise ValueError('variant_idx and variant_ids cannot be assigned at the same time')
        if variant_idx:
            if isinstance(variant_idx, int):
                variant_idx = [variant_idx]

            extracted_geno = self.read_list(variant_idx, allele_idx, sample_idx)
            samples = self.get_sample_ids(sample_idx) if sample_idx else self.sample_list
            variants = self.get_variant_ids(variant_idx)

        if variant_ids:
            if isinstance(variant_ids, str):
                variant_ids = [variant_ids]

            variant_idx = self.get_variant_idx(variant_ids)

            extracted_geno = self.read_list(variant_idx, allele_idx, sample_idx)
            samples = self.get_sample_ids(sample_idx) if sample_idx else self.sample_list
            variants = variant_ids

        # return pd.DataFrame(extracted_geno, index=variants, columns=samples)
        return extracted_geno, variants, samples





    def read_snp_list(self, snp_list, allele_idx=1, sample_idx=None)->np.ndarray:
        pass 


    def read_dosage(self):
        """
        soon modify from PgenReader.read_dosage
        """
        raise NotImplementedError
    def read_alleles(self):
        """
        soon modify from PgenReader.read_alleles
        """
        raise NotImplementedError
    
    def read_alleles_and_phasepresent(self):
        """
        soon modify from PgenReader.read_alleles_and_phasepresent
        """
        raise NotImplementedError
    

    
    def __enter__(self):
        return self
    def __exit__(self):
        self.close()

    def close(self):
        """
        from PgenReader.close
        """
        self.pgen.close()
        self.pvar.close()