# core

> These functions are designed to be used for working with genetic data.

In [None]:
#| default_exp dna

In [None]:
#| hide
from nbdev.showdoc import *

##  g2fc 
`# originally from nbs/01.04_g2fc_explore_genetic_data.ipynb`

Here I'm adapting an approach I took in `dlgwas` for accessing panzea data to the G2F genotype data release. The table is too big to easily load in an manipulate in pandas so as a work around to easily access specific genomes, I split the table into a separate file for the header and each genome and renamed each so that these files can be read piecemeal. See the Readme below for more details.

In [None]:
geno_path = '../data/zma/g2fc/genotypes/'

In [None]:
from EnvDL.core import print_txt # imports must be in a separate cell https://github.com/fastai/nbdev/blob/master/README.md#q-what-is-the-warning-found-a-cell-containing-mix-of-imports-and-computations-please-use-separate-cells

In [None]:
print_txt(path = '../data/zma/g2fc/genotypes/'+'split_and_rename.sh')

#!/usr/bin/bash
input_table='5_Genotype_Data_All_Years.txt'

#mkdir temp snps 

#split -l 1 "$input_table" ./temp/

files_path='./temp/'
out_path='./snps/'

files=$(ls "$files_path")

for file in $files
do
    taxa=$(awk '{print $1}' <<< cat "$files_path$file")
    taxa_sub=$(sed -r 's/[/]/__/g' <<< "$taxa") # replace the genotype's parent/parent with something that we can write.
    echo $files_path 
    echo $file
    echo $out_path
    echo $taxa_sub
    cp $files_path$file $out_path$taxa_sub
done

rm -r temp



In [None]:
import pandas as pd

In [None]:
# Other than listing the taxa this isn't expected to be of much use for our purposes.
geno_taxa=pd.read_table('../data/zma/g2fc/genotypes/'+'5_Genotype_Data_All_Years_TaxaSummary.txt')
geno_taxa.head()

Unnamed: 0,Taxa,Taxa Name,Number of Sites,Gametes Missing,Proportion Missing,Number Heterozygous,Proportion Heterozygous,Inbreeding Coefficient,Inbreeding Coefficient Scaled by Missing
0,0,2369/DK3IIH6,437214,11756,0.01344,137088,0.31782,Inbreeding Coefficient,ICSBM
1,1,2369/PHN82,437214,6990,0.00799,132208,0.30482,Inbreeding Coefficient,ICSBM
2,2,2369/PHZ51,437214,8974,0.01026,133888,0.30941,Inbreeding Coefficient,ICSBM
3,3,2FACC/DK3IIH6,437214,16866,0.01929,122678,0.28611,Inbreeding Coefficient,ICSBM
4,4,4N506/DK3IIH6,437214,14416,0.01649,135930,0.31611,Inbreeding Coefficient,ICSBM


In [None]:
#| export

def taxa_to_filename(taxa = '05-397/250007467', delim = '/'): return(taxa.replace(delim, '__'))

In [None]:
(taxa_to_filename(taxa = '05-397/250007467', delim = '/'),
 taxa_to_filename(taxa = '05-397:250007467', delim = ':'))

('05-397__250007467', '05-397__250007467')

In [None]:
#| export

def exists_geno(
    taxa, # should be the desired taxa or a regex fragment (stopping before the __). E.g. 'B73' or 'B\d+'
    **kwargs # optionally pass in a genome list (this allows for a different path or precomputing if we're finding a lot of genomes)
             # optionally pass in a different path to the snp table folder
    ):
    if 'genome_files_path' not in kwargs.keys():
        genome_files_path = '../data/zma/g2fc/genotypes/snps/'
    else:
        genome_files_path = kwargs['genome_files_path']
    
    if 'genome_files' not in kwargs.keys():
        import os
        genome_files = os.listdir(genome_files_path)
    else:
        genome_files = kwargs['genome_files']
        
    return(True in [True for e in genome_files if e == taxa])

In [None]:
(exists_geno(taxa = 'W10004_0171__PHZ51'),
 exists_geno(taxa = 'not_real'))

(True, False)

In [None]:
#| export
def find_geno(
    taxa, # should be the desired taxa or a regex fragment (stopping before the __). E.g. 'B73' or 'B\d+'
    **kwargs # optionally pass in a genome list (this allows for a different path or precomputing if we're finding a lot of genomes)
             # optionally pass in a different path to the snp table folder
    ):
    "Search for existing marker sets __"
    if 'genome_files_path' not in kwargs.keys():
        genome_files_path = '../data/zma/g2fc/genotypes/snps/'
    else:
        genome_files_path = kwargs['genome_files_path']
    
    if 'genome_files' not in kwargs.keys():
        import os
        genome_files = os.listdir(genome_files_path)
    else:
        genome_files = kwargs['genome_files']
    import re
    return( [e for e in genome_files if re.match(taxa+'__.+', e)] )

In [None]:
find_geno(taxa = 'W10004_0171')

['W10004_0171__PHP02', 'W10004_0171__PHZ51', 'W10004_0171__PHK76']

In [None]:
#| export

def get_geno( 
    taxa,
    **kwargs 
    ):
    "Retrieve an existing marker set"
    if 'genome_files_path' not in kwargs.keys():
        genome_files_path = '../data/zma/g2fc/genotypes/snps/'
    else:
        genome_files_path = kwargs['genome_files_path']
        
    with open(genome_files_path+taxa, 'r') as f:
        data = f.read()    
    data = data.split('\t')
    return(data)


In [None]:
get_geno('W10004_0171__PHZ51')[0:10]

['W10004_0171/PHZ51', 'C', 'C', 'C', 'R', 'W', 'G', 'C', 'G', 'R']

In addition to returning a specific taxa, the table's headers can be retieved with "taxa".

In [None]:
get_geno(taxa = 'Taxa')[0:4]

['Taxa', '120931', '121343', '122279']

Converting between site and chromosome/position requires the `AGPv4_site` dataframe. A given record contains the taxa as well as the nucleotides, so with that entry excluded the chromosome / position can be paired up.

In [None]:
len(get_geno(taxa = 'Taxa'))

437215

In [None]:
#| export

def list_to_ACGT(
    in_seq, # This should be a list with strings corresponding to IUPAC codes e.g. ['A', 'C', 'Y']
    progress = False
):
    import numpy as np
    import tqdm 
    from tqdm import tqdm

    # Convert IUPAC codes into pr ACGT -------------------------------------------
    encode_dict = {
        #     https://www.bioinformatics.org/sms/iupac.html
        #     A     C     G     T
        'A': [1,    0,    0,    0   ],
        'C': [0,    1,    0,    0   ],
        'G': [0,    0,    1,    0   ],
        'T': [0,    0,    0,    1   ],
        'K': [0,    0,    0.5,  0.5 ],
        'M': [0.5,  0.5,  0,    0   ],
        'N': [0.25, 0.25, 0.25, 0.25],
        'R': [0.5,  0,    0.5,  0   ],
        'S': [0,    0.5,  0.5,  0   ],
        'W': [0.5,  0,    0,    0.5 ],
        'Y': [0,    0.5,  0,    0.5 ],
        #     Other values (assumed empty)
        #     A     C     G     T
         '': [0,    0,    0,    0   ],
        '-': [0,    0,    0,    0   ],
        '0': [0,    0,    0,    0   ],
    }


    # Cleanup -- 
    # Any newlines need to be removed
    in_seq = [e.replace('\n', '') for e in in_seq]

    # Check if there's anything that should be in the dictionary but is not.
    not_in_dict = [e for e in list(set(in_seq)) if e not in list(encode_dict.keys())]

    if not_in_dict != []:
        print("Waring: The following are not in the encoding dictionary and will be set as missing.\n"+str(not_in_dict))

    in_seq = [e if e not in not_in_dict else '' for e in in_seq] 

    # output matrix
    GMat = np.zeros(shape = [len(in_seq), 4])

    # convert all nucleotides to probabilities
    if progress == True:
        for nucleotide in tqdm(encode_dict.keys()):
            mask = [True if e == nucleotide else False for e in  in_seq]
            GMat[mask, :] = encode_dict[nucleotide]    
    else:
        for nucleotide in encode_dict.keys():
            mask = [True if e == nucleotide else False for e in  in_seq]
            GMat[mask, :] = encode_dict[nucleotide]

    return(GMat)


In [None]:
list_to_ACGT(in_seq = get_geno('W10004_0171__PHZ51')[1:] )[0:10]

array([[0. , 1. , 0. , 0. ],
       [0. , 1. , 0. , 0. ],
       [0. , 1. , 0. , 0. ],
       [0.5, 0. , 0.5, 0. ],
       [0.5, 0. , 0. , 0.5],
       [0. , 0. , 1. , 0. ],
       [0. , 1. , 0. , 0. ],
       [0. , 0. , 1. , 0. ],
       [0.5, 0. , 0.5, 0. ],
       [0.5, 0. , 0.5, 0. ]])

In [None]:
#| export

def calc_needed_hilbert_p(n_needed = 1048576,
                          max_p = 20):
    out = None
    for i in range(1, max_p):
        if 4**i > n_needed:
            out = i
            break
    return(out)

In [None]:
#| export

def np_2d_to_hilbert(
    in_seq, # This should be a 2d numpy array with dimensions of [sequence, channels] 
    **kwargs # for silent
):
    import numpy as np
    import tqdm
    from tqdm import tqdm
    
    import hilbertcurve
    from hilbertcurve.hilbertcurve import HilbertCurve
    
    import EnvDL
    from EnvDL.dna import calc_needed_hilbert_p
    
    n_snps = in_seq.shape[0]
    n_channels = in_seq.shape[-1]
    temp = in_seq

    p_needed = calc_needed_hilbert_p(n_needed=n_snps)
    
    # Data represented need not be continuous -- it need only have int positions
    # a sequence or a sequence with gaps can be encoded
    hilbert_curve = HilbertCurve(
        p = p_needed, # iterations i.e. hold 4^p positions
        n = 2    # dimensions
        )

    points = hilbert_curve.points_from_distances(range(n_snps))

    dim_0 = np.max(np.array(points)[:, 0])+1 # add 1 to account for 0 indexing
    dim_1 = np.max(np.array(points)[:, 1])+1
    temp_mat = np.zeros(shape = [dim_0, dim_1, n_channels])
    temp_mat[temp_mat == 0] = np.nan         #  empty values being used for visualization

    if "silent" in kwargs:
        for i in range(n_snps):
            temp_mat[points[i][0], points[i][1], :] = temp[i]
    else:
        for i in tqdm(range(n_snps)):
            temp_mat[points[i][0], points[i][1], :] = temp[i]

    return(temp_mat)

In [None]:
#| export
def np_3d_to_hilbert(
    in_seq, # This should be a 3d numpy array with dimensions of [samples, sequence, channels] 
    **kwargs
):
    "This is the 3d version of `np_2d_to_hilbert`. The goal is to process all of the samples of an array in one go."
    import numpy as np
    import tqdm
    from tqdm import tqdm
    
    import hilbertcurve
    from hilbertcurve.hilbertcurve import HilbertCurve

    import EnvDL
    from EnvDL.dna import calc_needed_hilbert_p
    
    n_snps = in_seq.shape[1]
    n_channels = in_seq.shape[-1]
    temp = in_seq

    p_needed = calc_needed_hilbert_p(n_needed=n_snps)
    
    # Data represented need not be continuous -- it need only have int positions
    # a sequence or a sequence with gaps can be encoded
    hilbert_curve = HilbertCurve(
        p = p_needed, # iterations i.e. hold 4^p positions
        n = 2    # dimensions
        )

    points = hilbert_curve.points_from_distances(range(n_snps))

    dim_0 = np.max(np.array(points)[:, 0])+1 # add 1 to account for 0 indexing
    dim_1 = np.max(np.array(points)[:, 1])+1
    temp_mat = np.zeros(shape = [in_seq.shape[0], dim_0, dim_1, n_channels])
    temp_mat[temp_mat == 0] = np.nan         #  empty values being used for visualization
    
    if "silent" in kwargs:
        for i in range(n_snps):
            temp_mat[:,                          # sample
                     points[i][0], points[i][1], # x, y
                     :] = temp[:, i]             # channels
    else:
        for i in tqdm(range(n_snps)):
            temp_mat[:,                          # sample
                     points[i][0], points[i][1], # x, y
                     :] = temp[:, i]             # channels

    return(temp_mat)

In [None]:
#| export

def torch_2d_to_hilbert(
    in_seq, # This should be a 2d numpy array or torch tensor with dimensions of [sequence, channels] 
    **kwargs # for verbose
):
    import numpy
    import torch
    import tqdm
    from tqdm import tqdm
    
    import hilbertcurve
    from hilbertcurve.hilbertcurve import HilbertCurve
    
    import EnvDL
    from EnvDL.dna import calc_needed_hilbert_p

    if isinstance(in_seq, numpy.ndarray):
        in_seq = torch.from_numpy(in_seq)
    
    n_snps = in_seq.shape[0]
    n_channels = in_seq.shape[-1]
    temp = in_seq

    p_needed = calc_needed_hilbert_p(n_needed=n_snps)
    
    # Data represented need not be continuous -- it need only have int positions
    # a sequence or a sequence with gaps can be encoded
    hilbert_curve = HilbertCurve(
        p = p_needed, # iterations i.e. hold 4^p positions
        n = 2    # dimensions
        )

    points = hilbert_curve.points_from_distances(range(n_snps))

    dim_0 = torch.Tensor(points)[:, 0].max()+1 # add 1 to account for 0 indexing
    dim_1 = torch.Tensor(points)[:, 1].max()+1
    temp_mat = torch.zeros((dim_0.int().item(), # convert to int, get item
                            dim_1.int().item(), 
                            n_channels))
    temp_mat[temp_mat == 0] = torch.nan         #  empty values being used for visualization

    verbose = False
    if 'verbose' in kwargs:
        if kwargs['verbose'] == True:
            verbose = True

    if verbose:
        for i in tqdm(range(n_snps)):
            temp_mat[points[i][0], points[i][1], :] = temp[i] 
    else:        
        for i in range(n_snps):
            temp_mat[points[i][0], points[i][1], :] = temp[i]
    return(temp_mat)

In [None]:
#| export
def torch_3d_to_hilbert(
    in_seq, # This should be a 3d numpy array with dimensions of [samples, sequence, channels] 
    **kwargs
):
    "This is the 3d version of `torch_2d_to_hilbert`. The goal is to process all of the samples of an array in one go."
    # import numpy as np
    import numpy
    import torch
    import tqdm
    from tqdm import tqdm
    
    import hilbertcurve
    from hilbertcurve.hilbertcurve import HilbertCurve

    import EnvDL
    from EnvDL.dna import calc_needed_hilbert_p
    
    n_snps = in_seq.shape[1]
    n_channels = in_seq.shape[-1]
    temp = in_seq

    p_needed = calc_needed_hilbert_p(n_needed=n_snps)
    
    # Data represented need not be continuous -- it need only have int positions
    # a sequence or a sequence with gaps can be encoded
    hilbert_curve = HilbertCurve(
        p = p_needed, # iterations i.e. hold 4^p positions
        n = 2    # dimensions
        )

    points = hilbert_curve.points_from_distances(range(n_snps))

    # dim_0 = np.max(np.array(points)[:, 0])+1 # add 1 to account for 0 indexing
    # dim_1 = np.max(np.array(points)[:, 1])+1
    # temp_mat = np.zeros(shape = [in_seq.shape[0], dim_0, dim_1, n_channels])
    # temp_mat[temp_mat == 0] = np.nan         #  empty values being used for visualization
    
    dim_0 = torch.Tensor(points)[:, 0].max()+1 # add 1 to account for 0 indexing
    dim_1 = torch.Tensor(points)[:, 1].max()+1
    temp_mat = torch.zeros((in_seq.shape[0], 
                            dim_0.int().item(), # convert to int, get item
                            dim_1.int().item(), 
                            n_channels))
    temp_mat[temp_mat == 0] = torch.nan         #  empty values being used for visualization

    verbose = False
    if 'verbose' in kwargs:
        if kwargs['verbose'] == True:
            verbose = True

    if verbose:
        for i in tqdm(range(n_snps)):
            temp_mat[:,                 # sample
            points[i][0], points[i][1], # x, y
            :] = temp[:, i]             # channels
    else:        
        for i in range(n_snps):
            temp_mat[:,                 # sample
            points[i][0], points[i][1], # x, y
            :] = temp[:, i]             # channels

    return(temp_mat)

In [None]:
import numpy as np
import plotly.express as px

In [None]:
demo = np_2d_to_hilbert(
    in_seq = np.asarray([np.linspace(1, 100, num= 50),
                         np.linspace(100, 1, num= 50)]).T
)

px.imshow(demo[:,:,0])

100%|██████████| 50/50 [00:00<00:00, 123216.92it/s]


In [None]:
px.imshow(demo[:,:,1])

In [None]:
import torch
demo = torch_2d_to_hilbert(
    in_seq = torch.concat([torch.linspace(1, 100, steps= 50)[:, None],
                           torch.linspace(100, 1, steps= 50)[:, None]], axis = 1)
)

px.imshow(demo[:,:,1])

## Apply to subset of real marker data

Explictly convert a taxa

In [None]:
taxa_to_filename(taxa = 'W10004_0171/PHZ51')

'W10004_0171__PHZ51'

Or search for a taxa

In [None]:
find_geno(taxa = 'W10004_0171')

['W10004_0171__PHP02', 'W10004_0171__PHZ51', 'W10004_0171__PHK76']

Retrieve the sequence data

In [None]:
res = get_geno(taxa_to_filename(taxa = 'W10004_0171/PHZ51')) 
res = res[1:] # drop taxa
res[0:10]

['C', 'C', 'C', 'R', 'W', 'G', 'C', 'G', 'R', 'R']

Convert from characters to encoded nucleotide probabilities

In [None]:
res = list_to_ACGT(in_seq = res)
res = res[0:1000]
res

KeyboardInterrupt: 

Convert the sequence to a hilbert curve

In [None]:
# This will happen under the hood
# calc_needed_hilbert_p(n_needed=res.shape[0])
res_hilb = np_2d_to_hilbert(
    in_seq = res
)

100%|██████████| 1000/1000 [00:00<00:00, 811120.48it/s]


In [None]:
px.imshow( res[0:20, 0:1] )

In [None]:
px.imshow( res_hilb[:, :, 0] )

In [None]:
px.imshow( res_hilb[:, :, 1] )

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()