This notebook best viewed here: https://nbviewer.jupyter.org

This notebook was used to explore data of the pipeline where SNPs were called/filtered across varieties. We probably won't be using the combined data, so I'm keeping this records only

This notebooke encompasses
- sending files to start varscan_pipeline on server
- maf filtering
- LD pruning to get SNPs for structure estimation in GEA

In [1]:
from pythonimports import *

# copy over fastq and md5 files to compute canada server

In [2]:
DIR = '/data/fastq/mengmeng/CoAdapTree_DouglasFir/received_2019_Sep10'
fastqs = fs(DIR, pattern='.fastq')
len(fastqs)

352

In [4]:
cmdtext = op.join(DIR, 'cp_to_graham_cmds.txt')
with open(cmdtext, 'w') as o:
    cmds = []
    for fastq in fastqs:
        cmds.append(f'rsync -avz {fastq} graham:/scratch/lindb/DF_pooled/')
    o.write("%s" % '\n'.join(cmds))

In [8]:
lview, dview = get_client('default')

56 56


In [9]:
def exe(cmd):
    import os
    os.system(cmd)

In [10]:
len(cmds)

352

In [11]:
jobs = make_jobs(cmds, exe, lview)
watch_async(jobs)

61
352


KeyboardInterrupt: 

In [12]:
cmdtext

'/data/fastq/mengmeng/CoAdapTree_DouglasFir/received_2019_Sep10/cp_to_graham_cmds.txt'

In [13]:
for j in jobs:
    x = j.r

In [18]:
needed = []
for cmd in cmds:
    fq = op.basename(cmd.split()[2])
    for x in ['NS.1195.001.D707---D504.DF_p54_cap25_kit3_R1.fastq.gz',
              'NS.1195.001.D707---D504.DF_p54_cap25_kit3_R2.fastq.gz',
              'NS.1195.001.D707---D505.DF_p85_cap27_kit3_R1.fastq.gz']:
        if fq == x:
            needed.append(cmd)
len(needed)

3

In [20]:
jobs = make_jobs(needed, exe, lview)
watch_async(jobs)

3
3


# filter VarScan output for natural pops only

I ran the pipeline that included orchard pops, so I'm filtering based on natural pops

In [4]:
lview,dview = get_client()

56 56


In [28]:
# modified from filter_VariantsToTable.py to only pull out baseline-filtered snps based on variety

# I've copied this from 001_DF_pooled_data_explore where I separate on variety, but here I'm ...
# ... hacking it so that 'variety' is just all natural pops

# modifications are marked with ########## (other than imports)
def pklload(path):
    import pickle
    pkl = pickle.load(open(path, 'rb'))
    return pkl
dview['pklload'] = pklload

def get_varscan_names(df, tablefile):                                          ############ added tablefile arg
    """Convert generic sample/pool names from varscan to something meaningful."""
    print('renaming varscan columns ...')
    import os 
    pool = os.path.basename(os.path.dirname(os.path.dirname(os.path.dirname(tablefile))))          ############ added
    parentdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(tablefile))))      ############
    
    # get order of samps used to create varscan cmds (same order as datatable)
    samps = pklload(os.path.join(parentdir, f'{pool}/pkl_files/poolsamps.pkl'))[pool]                 ############ 
    # create a list of names that varscan gives by default
    generic = ['Sample%s' % (i+1) for i in range(len(samps))]
    # create a map between generic and true samp names
    dic = dict((gen, samp) for (gen, samp) in zip(generic, samps))
    # rename the columns in df
    cols = []
    for col in df:
        if '.' in col:
            gen, rest = col.split(".")
            samp = dic[gen]
            col = '.'.join([samp, rest])
        cols.append(col)
    df.columns = cols
    return df
dview['get_varscan_names'] = get_varscan_names

def load_data(tablefile, variety):
    """
    Load the VariantsToTable output.
    
    Positional arguments:
    tablefile - path to VariantsToTable output - used to find ploidy etc
    
    Returns:
    df - pandas.dataframe; VariantsToTable output
    tf - basename of tablefile
    """
    import os
    import pandas 
    
    tf = os.path.basename(tablefile)

    # load the data, create a column with CHROM-POS for locusID
    df = pandas.read_csv(tablefile, sep='\t')
    print(f'{tf} has {len(df.index)} rows (includes multiallelic)')
    df['locus'] = ["%s-%s" % (contig, pos) for (contig, pos) in zip(df['CHROM'].tolist(), df['POS'].tolist())]
    df = get_varscan_names(df, tablefile)
    
    # keep only columns for this variety
    cols = [col for col in df.columns if '.' not in col or col.split(".")[0] in varlist[variety]]  ### added
    df = df[[col for col in df.columns if col in cols]].copy()                                     ### added
    
    return df, tf
dview['load_data'] = load_data

def write_file(tablefile, df, tipe, variety):
    import pandas
    import os
    """Write filtered pandas.dataframe to file using args to create file name."""
#     newfile = tablefile.replace(".txt", f"_{tipe}.txt")                   ########## commented out
    dirdict = {'both': 'combined_varieties'}
    dname = os.path.basename(os.path.dirname(tablefile))
    write_dir = os.path.dirname(os.path.dirname(tablefile)) + f"/{dirdict[variety]}/{dname}_{variety}"                  ########## added
    print('write_dir = ', write_dir)
    bname = os.path.basename(tablefile).replace(".txt", f"_{tipe}_{variety}.txt")##### added   
    newfile = os.path.join(write_dir, bname)                              ########## added
    print(f'{tipe}_path = ', newfile)                                     ########## added
    
    df.to_csv(newfile, index=False, sep='\t')
    print('finished filtering VariantsToTable file: %s' % newfile)
dview['write_file'] = write_file

def adjust_freqs(smalldf):
    """
    For loci with REF=N, set freqs of pools with REF=N in GT to numpy.nan.
    Set alt freqs with respect to the second alt allele.
    
    Positional arguments:
    smalldf - pandas.dataframe; df with only REF=N
    
    Returns:
    ndf - smalldf with adjusted freqs in zeroth row
    """
    import pandas
    import numpy
    gtcols = [col for col in smalldf.columns if 'GT' in col]

    for col in gtcols:
        gt = smalldf.loc[1, col]
        if isinstance(gt, str):
            freqcol = col.split(".")[0] + '.FREQ'
            if not gt == 'N/N':
                freq = smalldf.loc[0, freqcol]
                if isinstance(freq, str):
                    if "%" in freq:
                        newfreq = "%s%%" % (100 - float(freq.split("%")[0]))
                        smalldf.loc[0, freqcol] = newfreq
            else:
                # if gt = N/N, adjust to undefined
                smalldf.loc[1, freqcol] = numpy.nan
        gt2 = smalldf.loc[0, col]
        if isinstance(gt2, str):
            if gt == 'N/N':
                # if gt = N/N, adjust to undefined
                smalldf.loc[0, freqcol] = numpy.nan
    return smalldf
dview['adjust_freqs'] = adjust_freqs

def get_refn_snps(df, tipe, ndfs=None):
    """
    Isolate polymorphisms with REF=N but two ALT single nuleodite alleles.
    
    Positional arguments:
    df - pandas.dataframe; current filtered VariantsToTable output
    
    Returns:
    dfs - list of loci (pandas.dataframes) with REF=N and two ALT alleles, counts with respect to second ALT
    ndfs - return from pandas.conat(dfs)
    """
    import pandas
    # as far as I can tell, crisp output from convert_pooled_vcf.py will not output REF = N
    ndf = df[df['REF'] == 'N'].copy()
    ndf = ndf[ndf['TYPE'] == tipe].copy()
    ncount = table(ndf['locus'])
    nloci = [locus for locus in ncount if ncount[locus] == 2]
    ndf = ndf[ndf['locus'].isin(nloci)].copy()
    dfs = []
    for locus in uni(ndf['locus']):
        smalldf = ndf[ndf['locus'] == locus].copy()
        if len(smalldf.index) == 2:
            smalldf.index = range(len(smalldf.index))
            smalldf = adjust_freqs(smalldf)
            smalldf.loc[0,'ALT'] = "%s+%s" % (smalldf.loc[0,'ALT'], smalldf.loc[1,"ALT"])
            dfs.append(pandas.DataFrame(smalldf.loc[0,:]).T)
    if len(dfs) > 0:
        ndfs = pandas.concat(dfs)
    return (dfs, ndfs)
dview['get_refn_snps'] = get_refn_snps

def keep_snps(df, tf):
    """
    Count CHROM-POS (locus) and keep only those with one ALT.
    
    Positional arguments:
    df - pandas.dataframe; currently filtered VariantsToTable output
    tf - basename of path to VariantsToTable output
    Returns:
    df - pandas.dataframe; non-multiallelic-filtered VariantsToTable output
    """
    import pandas
    loccount = table(df['locus'])
    goodloci = [locus for locus in loccount if loccount[locus] == 1]
    print(f'{tf} has {len(goodloci)} good loci (non-multiallelic)')

    # filter df for multiallelic (multiple lines), REF != N
    df = df[df['locus'].isin(goodloci)].copy()
    df = df[df['REF'] != 'N'].copy()
    return df
dview['keep_snps'] = keep_snps

def filter_missing_data(df, tf, tipe):
    """
    Remove loci with < 25% missing data.
    Count numpy.nan in .FREQ col to assess % missing data.
    
    Positional arguments:
    df - pandas.dataframe; VariantsToTable output
    tf - str; basename of tablefile
    tipe - str; one of either "SNP" or "INDEL"
    
    Returns:
    df - pandas.dataframe; missing data-filtered VariantsToTable output
    """
    import tqdm
    import pandas
    import math
    freqcols = [col for col in df.columns if '.FREQ' in col]
    copy = get_copy(df, freqcols)
    keepers = []
    # else statement for running single pos.path.(megagamtos.path.yte) through:
    thresh = math.floor(0.25 * len(freqcols)) if len(freqcols) > 1 else 1
    for locus in tqdm.tqdm(copy.columns):
        # if there is less than 25% missing data:
        # the only time x != x is when x is nan (fastest way to count it)
        count = sum(1 for x in copy[locus] if x != x)
        if count < thresh:
            keepers.append(locus)
    df = df[df.index.isin(keepers)].copy()
    df.index = range(len(df.index))
    return df
dview['filter_missing_data'] = filter_missing_data

def get_copy(df, cols):
    """
    Transpose dataframe using specific columns (that will be index after transformation).
    Doing so helps speed things up.
    """
    import pandas
    return df[cols].T.copy()
dview['get_copy'] = get_copy

def get_variety_freq_cutoffs(variety, ploidy):
    """
    Use number of pops per variety to determine lowfreq, highfreq.
    Differs from pipeline.
    """
    lowfreq = 1/sum([popploidy for pop,popploidy in ploidy.items() if pop in varlist[variety]])
    ###############                                               ##### note diffs with get_freq_cutoffs(tablefile)
    highfreq = 1 - lowfreq
    return lowfreq, highfreq
dview['get_variety_freq_cutoffs'] = get_variety_freq_cutoffs

def filter_freq(df, tf, tipe, tablefile, variety):
    """
    Keep fixed loci.
    
    Positional arguments:
    df - pandas.dataframe; VariantsToTable output
    tablefile - path to VariantsToTable output - used to find ploidy etc
    tf - str; basename of tablefile
    tipe - str; one of either "SNP" or "INDEL"
    
    Returns:
    df - pandas.dataframe; freq-filtered VariantsToTable output
    """
    import tqdm
    import pandas
    import os
    import math
    # believe it or not, it's faster to do qual and freq filtering in two steps vs an 'and' statement
#     lowfreq, highfreq = get_freq_cutoffs(tablefile)                                         ############ removed
#     print(f'filtering for global frequency ({lowfreq}, {highfreq})...')                     ############ moved
    df.reset_index(drop=True, inplace=True)
    
    # prep for filtering
    freqcols = [col for col in df.columns if '.FREQ' in col]
    pool = os.path.basename(os.path.dirname(os.path.dirname(os.path.dirname(tablefile))))     ############ changed
    parentdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(tablefile)))) ############
    ploidy = pklload(os.path.join(parentdir, f'{pool}/pkl_files/ploidy.pkl'))[pool]           ############
    lowfreq, highfreq = get_variety_freq_cutoffs(variety, ploidy)                             ############ added
    print(f'filtering for global frequency ({lowfreq}, {highfreq})...')                       ## moved from above
    
    # carry on with poolseq datas
    filtloci = []
    afs = []
    copy = get_copy(df, freqcols)
    for locus in tqdm.tqdm(copy.columns):
        freqs = dict((samp.replace(".FREQ",""),freq) for (samp,freq)
                     in copy[locus].str.rstrip('%').astype('float').items()
                     if not math.isnan(freq))  # faster than .str.rstrip('%').astype('float').dropna()
        if len(freqs) > 0:  # avoid loci with all freqs masked (avoid ZeroDivisionError)
            # calc globfreq using the samps/ploidy that are present for this locus
            globfreq = sum([ploidy[samp]*(freq/100)
                            for (samp,freq) in freqs.items()]) / sum([ploidy[samp] for samp in freqs])
            if lowfreq <= globfreq <= highfreq:
                filtloci.append(locus)
                # since we're going in order of rows in df ...
                # ... we can use afs to replace AF col later since we reduce df to filtloci
                afs.append(globfreq)
                # which is about 40x faster than: df.loc[locus, 'AF'] = globfreq
    print(f'{tf} has {len(filtloci)} {tipe}s that have global MAF > {lowfreq*100}%')
    df = df[df.index.isin(filtloci)].copy()
    df.index = range(len(df.index))
    df['AF'] = afs
    return df
dview['filter_freq'] = filter_freq

def filter_qual(df, tf, tipe, tablefile, variety):
    """
    mask freqs that have GQ < 20.
    
    Positional arguments:
    df - pandas.dataframe; VariantsToTable output
    tf - str; basename of tablefile
    tipe - str; one of either "SNP" or "INDEL"
    
    Returns: pandas.dataframe; quality-filtered VariantsToTable output
    - FREQ and GT are masked (numpy.nan) if GQ < 20
    """
    import tqdm
    import pandas
    import numpy
    gqcols = [col for col in df.columns if '.GQ' in col]
    print(f'masking bad freqs for {len(gqcols)} pools...')
    for col in tqdm.tqdm(gqcols):
        freqcol = col.replace(".GQ", ".FREQ")
#         gtcol = col.replace(".GQ", ".GT")  # pretty sure this is depricated
        # badloci True if qual < 20
#         df.loc[df[col] < 20, [freqcol, gtcol]] = np.nan
        df.loc[df[col] < 20, freqcol] = numpy.nan

    print('filtering for missing data ...')
    df = filter_missing_data(df, tf, tipe)

    if len(df.index) > 0:
        print(f'{tf} has {len(df.index)} {tipe}s that have GQ >= 20 and < 25% missing data')
        df = filter_freq(df, tf, tipe, tablefile, variety)
        df.index = range(len(df.index))
    else:
        print(f'{tf} did not have any {tipe}s that have GQ >= 20 for >= 75% of pops' +
              '\nnot bothering to filter for freq')
#         df = drop_freq_cols(df)
    return df
dview['filter_qual'] = filter_qual


def main(tablefile, tipe='SNP', parentdir=None, ret=True, variety=None):   ########## changed default args
    import sys
    import pandas
    import numpy
    import math
    import tqdm
    import os
    from collections import Counter
    # load the data
    df, tf = load_data(tablefile, variety)
    
    # filter only SNPs
    df = df[df['TYPE'] == tipe].copy()

    # determine loci with REF=N but biallelic otherwise
    if tipe == 'SNP':
        dfs, ndfs = get_refn_snps(df, tipe)

        # determine which loci are multiallelic
        df = keep_snps(df, tf)
    
    if len(df.index) == 0:
        if ret is True:
            return df
        else:
            # save
            write_file(tablefile, df, tipe)

    # add in loci with REF=N but biallelic otherwise
    if tipe == 'SNP' and len(dfs) > 0:
        print(f'{tf} has {len(ndfs.index)} biallelic {tipe}s with REF=N')
        dfs.append(df)
        df = pandas.concat(dfs)

    # filter for quality and missing data
    df.index = range(len(df.index))
    if 'varscan' in tf and tipe == 'SNP':
        # if we allow to continue for INDEL, each line is treated as a locus (not true for INDEL)
        df = filter_qual(df, tf, tipe, tablefile, variety)


########################################################################################################
    # look for filtering options called at 00_start.py
    if parentdir is not None and tipe == 'SNP':
        # translate stitched (if called at 00_start)   ############## no need to translate for DF

        # remove repeats (if called at 00_start) - want to remove repeats before paralogs
        df = remove_repeats(df.copy(),
                            parentdir,
                            tablefile,
                            os.path.basename(os.path.dirname(os.path.dirname(os.path.dirname(tablefile)))), ## added
                            variety)  ###### added
#                             op.basename(pooldir))  # commented out

        # remove paralog SNPs (if called at 00_start)
        df = remove_paralogs(df.copy(), parentdir, tablefile,
                             os.path.basename(os.path.dirname(os.path.dirname(os.path.dirname(tablefile)))),## added
                             variety) ###### added
########################################################################################################

        
    if ret is True:
        print('returning df')
        return df
    else:
        # save
        write_file(tablefile, df, tipe, variety)

dview['main'] = main
dview['uni'] = uni
from pythonimports import table # in case I use 'table' in an iteration
dview['table'] = table

In [29]:
def remove_paralogs(snps, parentdir, snpspath, pool, variety):
    """
    Remove sites from snptable that are thought to have multiple gene copies align to this position.
    
    # assumes
    # paralog file has 'CHROM' and 'locus' in the header (best if this is the only data, reads in quicker)
    #   where CHROM is the reference chromosome/scaffold
    #   where locus is hyphen-separated CHROM-POS
    
    # paralog file is created from calling SNPs on haplotype data as diploid
    #   no need to worry about translating stiched -> unstitched if SNPs called on same reference.
    """
    import os, pandas
    parpkl = os.path.join(parentdir, f'{pool}/pkl_files/paralog_snps.pkl')
    if os.path.exists(parpkl):
        # read in paralogfile
#         paralogdict = pklload(parpkl)                                            ############ commented out
#         if paralogdict[pool] is not None:                                        ############ commented out
        if True:                                                                   ########## added
            print('Removing paralogs sites ...')
#             paralogs = pd.read_csv(paralogdict[pool], sep='\t')                  ############ commented out
            refdir = '/data/database/DouglasFir_ref_genome'                        ############ added
            paralogfile = os.path.join(refdir, 'DF_mega-varscan_all_bedfiles_SNP_paralog_snps.txt')# added
            paralogs = pandas.read_table(paralogfile)                                  ############ added
            # remove and isolate paralogs from snps
            truths = snps['locus'].isin(paralogs['locus'])
            found_paralogs = snps[truths].copy()
            snps = snps[~truths].copy()
            snps.index = range(len(snps.index))

            # write paralogs to a file
#             parafile = snpspath.replace(".txt", "_PARALOGS.txt")                 ########## commented out
            dirdict = {'both': 'combined_varieties'}  ## added
            dname = os.path.basename(os.path.dirname(snpspath))  ## added
            write_dir = os.path.dirname(os.path.dirname(snpspath)) + f"/{dirdict[variety]}/{dname}_{variety}"  ## added  
#             write_dir = os.path.dirname(snpspath) + f"_{variety}"                  ########## prev added, now commented
            bname = os.path.basename(snpspath).replace(".txt", f"_PARALOGS_{variety}.txt")### added   
            parafile = os.path.join(write_dir, bname)                              ########## added
            print('paralog_path = ', parafile)                                     ########## added
            
            found_paralogs.to_csv(parafile, sep='\t', index=False)
            print(f'{os.path.basename(snpspath)} has {len(snps.index)} non-paralog SNPs')
    return snps
dview['remove_paralogs'] = remove_paralogs


def remove_repeats(snps, parentdir, snpspath, pool, variety):
    """
    Remove SNPs that are found to be in repeat-masked regions.
    
    # assumes
    # that the positions have been translated BEFORE removing repeats
        # took forever to create unstitched repeat regions, don't want to translate repeat file
        # this way I can just use unstitched chrom if reference is stitched
    # repeat file has a header ('CHROM', 'start', 'stop')
    # start and stop positions of repeat regions are 1-based
    """
    import pandas
    import tqdm
    import os
    reppkl = os.path.join(parentdir, f'{pool}/pkl_files/repeat_regions.pkl')
    if os.path.exists(reppkl):
        # read in repeat regions
#         repeatdict = pklload(reppkl)                                             ########## commented out
#         if repeatdict[pool] is not None:                                         ########## commented out
        if True:                                                                   ########## added
            print('Removing repeat regions ...')
            # if user selected translation be applied to this pool
#             repeats = pd.read_csv(repeatdict[pool], sep='\t')                    ########## commented out
            repeats = pandas.read_table('/data/database/DouglasFir_ref_genome/DF_ref_edit_repeats.txt')   #### added
            # figure out if data is from stitched or not
            if 'unstitched_chrom' in snps.columns:
                # then the snps have been translated: stitched -> unstitched
                chromcol = 'unstitched_chrom'
                poscol = 'unstitched_pos'
                print('\tsnps have been translated')
            else:
                # otherwise SNPs were called on unstitched reference
                chromcol = 'CHROM'
                poscol = 'POS'
                print('\tsnps have not been translated')
            # reduce repeats to the chroms that matter (helps speed up lookups)
            repeats = repeats[repeats['CHROM'].isin(snps[chromcol].tolist())].copy()

            # isolate SNPs in repeat regions
            repeat_snps = []
            for chrom in tqdm.tqdm(uni(snps[chromcol])):
                reps = repeats[repeats['CHROM'] == chrom].copy()
                mysnps = snps[snps[chromcol] == chrom].copy()
                if len(reps.index) > 0 and len(mysnps.index) > 0:
                    for row in mysnps.index:
                        pos = snps.loc[row, poscol]  # index is maintained from snps to mysnsps
                        df = reps[reps['stop'].astype(int) >= int(pos)].copy()
                        df = df[df['start'].astype(int) <= int(pos)].copy()
                        if len(df.index) > 0:
                            assert len(df.index) == 1
                            repeat_snps.append(row)

            # save repeats
            print(f'\tSaving {len(repeat_snps)} repeat regions')
#             repeat_path = snpspath.replace(".txt", "_REPEATS.txt")               ########## comm ented out
            dirdict = {'both': 'combined_varieties'}  ## added
            dname = os.path.basename(os.path.dirname(snpspath))  ## added
            write_dir = os.path.dirname(os.path.dirname(snpspath)) + f"/{dirdict[variety]}/{dname}_{variety}"  ## added  
#             write_dir = os.path.dirname(snpspath) + f"_{variety}"                  ########## prev added, now commented
            bname = os.path.basename(snpspath).replace(".txt", f"_REPEATS_{variety}.txt")### added   
            repeat_path = os.path.join(write_dir, bname)                           ########## added
            print('repeat_path = ', repeat_path)                                   ########## added
            
            myrepeats = snps[snps.index.isin(repeat_snps)].copy()
            myrepeats.to_csv(repeat_path, sep='\t', index=False)

            # remove SNPs in repeat regions
            snps = snps[~snps.index.isin(repeat_snps)].copy()
            snps.index = range(len(snps.index))

            print(f'{os.path.basename(snpspath)} has {len(snps.index)} SNPs outside of repeat regions')

    return snps
dview['remove_repeats'] = remove_repeats

In [11]:
# envdata has variety ID - THIS IS CHANGED FROM WHEN SORTING INDIVIDUAL VARIETIES
envdata = pd.read_table('/data/projects/pool_seq/environemental_data/df_ALL-naturalpops_std_env-19variables.txt')
envdata = envdata[envdata['our_id']==envdata['our_id']]  # removes irrelevant pop with our_id=nan

pool2var = {}
varlist = {}
for row in envdata.index:
    pool = envdata.loc[row, 'our_id']
#     variety = envdata.loc[row, 'Variety']
    variety = 'both'   # THIS IS THE PART I CHANGED TO GRAB BOTH VARIETIES
    pool2var[pool] = variety
    if variety not in varlist:
        varlist[variety] = []
    varlist[variety].append(pool)
for variety,pops in varlist.items():
    print(variety, len(pops))
dview['varlist'] = varlist
varlist

both 73


{'both': ['DF_p1',
  'DF_p2',
  'DF_p3',
  'DF_p4',
  'DF_p5',
  'DF_p6',
  'DF_p7',
  'DF_p8',
  'DF_p9',
  'DF_p10',
  'DF_p11',
  'DF_p12',
  'DF_p13',
  'DF_p14',
  'DF_p15',
  'DF_p16',
  'DF_p17',
  'DF_p18',
  'DF_p19',
  'DF_p20',
  'DF_p23',
  'DF_p24',
  'DF_p25',
  'DF_p26',
  'DF_p27',
  'DF_p28',
  'DF_p29',
  'DF_p30',
  'DF_p31',
  'DF_p32',
  'DF_p33',
  'DF_p34',
  'DF_p35',
  'DF_p36',
  'DF_p37',
  'DF_p38',
  'DF_p39',
  'DF_p40',
  'DF_p41',
  'DF_p42',
  'DF_p43',
  'DF_p44',
  'DF_p45',
  'DF_p46',
  'DF_p47',
  'DF_p48',
  'DF_p49',
  'DF_p50',
  'DF_p51',
  'DF_p52',
  'DF_p53',
  'DF_p54',
  'DF_p55',
  'DF_p56',
  'DF_p57',
  'DF_p58',
  'DF_p59',
  'DF_p60',
  'DF_p61',
  'DF_p62',
  'DF_p72',
  'DF_p73',
  'DF_p74',
  'DF_p75',
  'DF_p76',
  'DF_p77',
  'DF_p78',
  'DF_p79',
  'DF_p80',
  'DF_p81',
  'DF_p82',
  'DF_p83',
  'DF_p84']}

In [14]:
# create directories to save files
dirdict = {'both': 'combined_varieties'}
for variety in varlist.keys():
    print(variety, dirdict[variety])
    makedir(f'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/{dirdict[variety]}/01_unfiltered_{variety}')

both combined_varieties


In [19]:
# test out filtering
tablefile = '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/01_unfiltered/DF_pooled_varscan_bedfile_0391_table.txt'
df = main(tablefile, tipe='SNP', parentdir="/data/projects/pool_seq/DF_datasets/DF_pooled_GEA",
          ret=False, variety='both')

DF_pooled_varscan_bedfile_0391_table.txt has 11890 rows (includes multiallelic)
renaming varscan columns ...


  0%|          | 0/73 [00:00<?, ?it/s]

DF_pooled_varscan_bedfile_0391_table.txt has 10219 good loci (non-multiallelic)
masking bad freqs for 73 pools...


100%|██████████| 73/73 [00:00<00:00, 720.12it/s]


filtering for missing data ...


100%|██████████| 10216/10216 [00:00<00:00, 20272.40it/s]
  5%|▍         | 210/4380 [00:00<00:01, 2091.53it/s]

DF_pooled_varscan_bedfile_0391_table.txt has 4380 SNPs that have GQ >= 20 and < 25% missing data
filtering for global frequency (0.00017283097131005876, 0.99982716902869)...


100%|██████████| 4380/4380 [00:02<00:00, 2117.00it/s]


DF_pooled_varscan_bedfile_0391_table.txt has 4351 SNPs that have global MAF > 0.017283097131005877%
Removing repeat regions ...


  0%|          | 0/29 [00:00<?, ?it/s]

	snps have not been translated


100%|██████████| 29/29 [00:05<00:00,  5.60it/s]


	Saving 61 repeat regions
repeat_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0391_table_REPEATS_both.txt
DF_pooled_varscan_bedfile_0391_table.txt has 4290 SNPs outside of repeat regions
Removing paralogs sites ...
paralog_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0391_table_PARALOGS_both.txt
DF_pooled_varscan_bedfile_0391_table.txt has 4285 non-paralog SNPs
write_dir =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both
SNP_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0391_table_SNP_both.txt
finished filtering VariantsToTable file: /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/0

In [25]:
# read in dataframe and make sure the total pops make sense
df = pd.read_table('/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0391_table_SNP_both.txt')
len(varlist['both']), len([col for col in df.columns if 'FREQ' in col]) == len(varlist['both'])

(73, True)

#### now do in parallel

In [26]:
len(lview)

56

In [27]:
# get all of the varscan outputs
files = fs('/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/01_unfiltered',
           endswith='table.txt')
len(files)

932

In [30]:
# filter across both varieties
jobs = []
for f in files:
    jobs.append(lview.apply_async(main, f, **{'tipe':'SNP',
                                              'parentdir':"/data/projects/pool_seq/DF_datasets/DF_pooled_GEA",
                                              'ret':False,
                                              'variety':'both'}))
watch_async(jobs)

928
932


KeyboardInterrupt: 

In [32]:
for i,j in enumerate(jobs):
    if not j.ready():
        main(files[i], **{'tipe':'SNP',
                          'parentdir':"/data/projects/pool_seq/DF_datasets/DF_pooled_GEA",
                          'ret':False,
                          'variety':'both'})

DF_pooled_varscan_bedfile_0006_table.txt has 23483 rows (includes multiallelic)
renaming varscan columns ...
DF_pooled_varscan_bedfile_0006_table.txt has 20077 good loci (non-multiallelic)


100%|██████████| 73/73 [00:00<00:00, 649.89it/s]

masking bad freqs for 73 pools...
filtering for missing data ...



100%|██████████| 20077/20077 [00:01<00:00, 19584.86it/s]


DF_pooled_varscan_bedfile_0006_table.txt has 9801 SNPs that have GQ >= 20 and < 25% missing data
filtering for global frequency (0.00017283097131005876, 0.99982716902869)...


100%|██████████| 9801/9801 [00:04<00:00, 1994.66it/s]


DF_pooled_varscan_bedfile_0006_table.txt has 9722 SNPs that have global MAF > 0.017283097131005877%
Removing repeat regions ...


100%|██████████| 28/28 [00:00<00:00, 309.85it/s]

	snps have not been translated
	Saving 0 repeat regions
repeat_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0006_table_REPEATS_both.txt





DF_pooled_varscan_bedfile_0006_table.txt has 9722 SNPs outside of repeat regions
Removing paralogs sites ...
paralog_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0006_table_PARALOGS_both.txt
DF_pooled_varscan_bedfile_0006_table.txt has 9722 non-paralog SNPs
write_dir =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both
SNP_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0006_table_SNP_both.txt
finished filtering VariantsToTable file: /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0006_table_SNP_both.txt
DF_pooled_varscan_bedfile_0090_table.txt has 28318 rows (includes multiallelic)
renaming varscan columns ...
DF_pooled_varscan_bedfile_0090

100%|██████████| 73/73 [00:00<00:00, 617.92it/s]

masking bad freqs for 73 pools...
filtering for missing data ...



100%|██████████| 23769/23769 [00:01<00:00, 19283.85it/s]


DF_pooled_varscan_bedfile_0090_table.txt has 13035 SNPs that have GQ >= 20 and < 25% missing data
filtering for global frequency (0.00017283097131005876, 0.99982716902869)...


100%|██████████| 13035/13035 [00:06<00:00, 2013.59it/s]


DF_pooled_varscan_bedfile_0090_table.txt has 12924 SNPs that have global MAF > 0.017283097131005877%
Removing repeat regions ...


  0%|          | 0/32 [00:00<?, ?it/s]

	snps have not been translated


100%|██████████| 32/32 [00:09<00:00,  3.32it/s]


	Saving 294 repeat regions
repeat_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0090_table_REPEATS_both.txt
DF_pooled_varscan_bedfile_0090_table.txt has 12630 SNPs outside of repeat regions
Removing paralogs sites ...
paralog_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0090_table_PARALOGS_both.txt
DF_pooled_varscan_bedfile_0090_table.txt has 12630 non-paralog SNPs
write_dir =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both
SNP_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0090_table_SNP_both.txt
finished filtering VariantsToTable file: /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varietie

100%|██████████| 73/73 [00:00<00:00, 595.41it/s]

masking bad freqs for 73 pools...
filtering for missing data ...



100%|██████████| 18495/18495 [00:01<00:00, 17857.47it/s]


DF_pooled_varscan_bedfile_0337_table.txt has 9049 SNPs that have GQ >= 20 and < 25% missing data
filtering for global frequency (0.00017283097131005876, 0.99982716902869)...


100%|██████████| 9049/9049 [00:04<00:00, 2020.84it/s]


DF_pooled_varscan_bedfile_0337_table.txt has 8980 SNPs that have global MAF > 0.017283097131005877%
Removing repeat regions ...


  8%|▊         | 3/38 [00:00<00:01, 27.02it/s]

	snps have not been translated


100%|██████████| 38/38 [00:10<00:00,  3.62it/s]


	Saving 447 repeat regions
repeat_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0337_table_REPEATS_both.txt
DF_pooled_varscan_bedfile_0337_table.txt has 8533 SNPs outside of repeat regions
Removing paralogs sites ...
paralog_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0337_table_PARALOGS_both.txt
DF_pooled_varscan_bedfile_0337_table.txt has 8533 non-paralog SNPs
write_dir =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both
SNP_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0337_table_SNP_both.txt
finished filtering VariantsToTable file: /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/

100%|██████████| 73/73 [00:00<00:00, 558.98it/s]

masking bad freqs for 73 pools...
filtering for missing data ...



100%|██████████| 21742/21742 [00:01<00:00, 17632.77it/s]


DF_pooled_varscan_bedfile_0507_table.txt has 10456 SNPs that have GQ >= 20 and < 25% missing data
filtering for global frequency (0.00017283097131005876, 0.99982716902869)...


100%|██████████| 10456/10456 [00:05<00:00, 2010.06it/s]


DF_pooled_varscan_bedfile_0507_table.txt has 10390 SNPs that have global MAF > 0.017283097131005877%
Removing repeat regions ...


  0%|          | 0/37 [00:00<?, ?it/s]

	snps have not been translated


100%|██████████| 37/37 [00:12<00:00,  2.89it/s]


	Saving 376 repeat regions
repeat_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0507_table_REPEATS_both.txt
DF_pooled_varscan_bedfile_0507_table.txt has 10014 SNPs outside of repeat regions
Removing paralogs sites ...
paralog_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0507_table_PARALOGS_both.txt
DF_pooled_varscan_bedfile_0507_table.txt has 10014 non-paralog SNPs
write_dir =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both
SNP_path =  /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both/DF_pooled_varscan_bedfile_0507_table_SNP_both.txt
finished filtering VariantsToTable file: /data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varietie

In [None]:
# make sure no errors
for j in jobs:
    x = j.r

In [33]:
# check to see how many files were produced per bedfile
dirs = ['/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/01_unfiltered_both']
for d in dirs:
    filtfiles = fs(d, endswith='.txt')
    bedtofiles = {}
    for f in filtfiles:
        bed = f.split("bedfile_")[1].split("_table")[0]
        assert float(bed) == int(bed)
        if bed not in bedtofiles:
            bedtofiles[bed] = []
        bedtofiles[bed].append(f)

    missing = []
    for i in range(int(max(bedtofiles.keys()))):
        bed = str(i).zfill(4)
        if not bed in bedtofiles.keys():
            missing.append(files[i])
        elif len(bedtofiles[bed]) != 3:
            missing.append(files[i])
    print(op.basename(d), len(missing))

01_unfiltered_both 0


#### combine dataframes

In [35]:
varlist.keys()

dict_keys(['both'])

In [36]:
# make new dirs
for variety in varlist:
    makedir(f'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/{dirdict[variety]}/02_baseline_filtered_{variety}')

In [38]:
lview,dview = get_client()

56 56


In [42]:
def read_df(f):
    import pandas
    return pandas.read_table(f)

def save_df(df, f):
    import pandas
    df.to_csv(f, sep='\t', index=False)
    
def concat_files(d, dstdir, variety):
    """Read in filtered dataframes, concat into single df, write to file."""
    import pandas
    import os
    outfiles = []
    lview,dview = get_client()
    for tipe in ['PARALOGS', 'REPEATS', 'SNP']:
        files = fs(d, pattern=tipe, endswith='.txt')
        jobs = make_jobs(files, read_df, lview)
        watch_async(jobs)
        outfile = os.path.join(dstdir, f'DF_pooled-varscan_all_bedfiles_{tipe}_{variety}.txt')
        outfiles.append(outfile)
        file = os.path.join(outfile)
        for i,j in enumerate(jobs):
            if i == 0:
                j.r.to_csv(file, sep='\t', index=False)
            else:
                j.r.to_csv(file, sep='\t', index=False, header=False, mode='a')
        del jobs
    return outfiles
dview['fs'] = fs
dview['make_jobs'] = make_jobs
dview['get_client'] = get_client
dview['watch_async'] = watch_async
dview['read_df'] = read_df

In [41]:
varlist.keys()

dict_keys(['both'])

In [43]:
# read in dfs in parallel, write to final file
variety = 'both'
d = f'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/{dirdict[variety]}/01_unfiltered_{variety}'
dstdir = f'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/{dirdict[variety]}/02_baseline_filtered_{variety}'
assert op.exists(dstdir)
assert op.exists(d)
concat_files(d, dstdir, variety)

932
932


Exception in callback BaseAsyncIOLoop._handle_events(83, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(83, 1)>
Traceback (most recent call last):
  File "/data/programs/brandon_anaconda3/envs/py37/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/data/programs/brandon_anaconda3/envs/py37/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 139, in _handle_events
    handler_func(fileobj, events)
  File "/data/programs/brandon_anaconda3/envs/py37/lib/python3.7/site-packages/zmq/eventloop/zmqstream.py", line 456, in _handle_events
    self._handle_recv()
  File "/data/programs/brandon_anaconda3/envs/py37/lib/python3.7/site-packages/zmq/eventloop/zmqstream.py", line 486, in _handle_recv
    self._run_callback(callback, msg)
  File "/data/programs/brandon_anaconda3/envs/py37/lib/python3.7/site-packages/zmq/eventloop/zmqstream.py", line 438, in _run_callback
    callback(*args, **kwargs)
  File "<decorator-gen-153>",

['/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/02_baseline_filtered_both/DF_pooled-varscan_all_bedfiles_PARALOGS_both.txt',
 '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/02_baseline_filtered_both/DF_pooled-varscan_all_bedfiles_REPEATS_both.txt',
 '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/02_baseline_filtered_both/DF_pooled-varscan_all_bedfiles_SNP_both.txt']

In [44]:
len('TATTATTTGGATCATTTATTTGTATTTACCCTTCAxATCATTCTCTTTCTCTCTGTAATCTCAGGATATGA')

71

# filter for MAF 

In [1]:
from pythonimports import *

In [2]:
lview,dview = get_client()

56 56


In [3]:
def get_skipto_df(f, skipto, nrows, cols=None, filter_maf=False, **kwargs):
    """Retrieve dataframe in parallel so that all rows are captured when iterating.
    
    f = filename to open
    skipto = row number to skip, read rows thereafter
    nrows = how many rows to read from f after skipto
    """
    import pandas
    
    if skipto == 0:
        df = pandas.read_table(f, nrows=nrows-1)
    else:
        df = pandas.read_table(f, skiprows=range(1, skipto), nrows=nrows)
    
    if cols is not None:
        if isinstance(cols, str):
            cols = [cols]
        df = df[cols].copy()
    
    if filter_maf is True:
        return maf_filter(df, **kwargs)
    
    return df
dview['get_skipto_df'] = get_skipto_df

def maf_filter(chunk, maf=0.05, **kwargs):
    """filter minor allele frequency >= maf, create maf column, return df."""
    import pandas
    import os

    # filter for MAF
    df = chunk[(chunk['AF'].astype(float) >= maf) & (chunk['AF'].astype(float) <= (1-maf))].copy()
    # create MAF column
    df['MAF'] = df['AF']
    df.loc[df['AF'].astype(float) > 0.5, 'MAF'] = 1 - chunk['AF'][chunk['AF'].astype(float) > 0.5]
    assert sum(df['MAF']<maf) == 0
    
    return df
dview['maf_filter'] = maf_filter

In [4]:
dirdict = {'both': 'combined_varieties'}

In [5]:
# get linenums for each variety and each type
linenums = {}
for variety in ['both']:
    d = f'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/{dirdict[variety]}/02_baseline_filtered_{variety}'
    assert op.exists(d)
    for tipe in ['SNP', 'PARALOGS', 'REPEATS']:
        f = op.join(d, f'DF_pooled-varscan_all_bedfiles_{tipe}_{variety}.txt')
        out = !wc -l $f
        linenums[f] = int(out[0].split()[0])-1
        print(variety, tipe, linenums[f])
linenums

both SNP 9846117
both PARALOGS 1809
both REPEATS 299010


{'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/02_baseline_filtered_both/DF_pooled-varscan_all_bedfiles_SNP_both.txt': 9846117,
 '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/02_baseline_filtered_both/DF_pooled-varscan_all_bedfiles_PARALOGS_both.txt': 1809,
 '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/02_baseline_filtered_both/DF_pooled-varscan_all_bedfiles_REPEATS_both.txt': 299010}

#### begin filtering 

In [9]:
# read in SNPs in parallel
dfs = {}
for f in linenums:
    if '_SNP_' not in f:
        nrows = 50000
        jobs = []
        count = 0
        for skipto in range(0, linenums[f], nrows):
            num = str(count).zfill(4)
            jobs.append(lview.apply_async(get_skipto_df, *(f, skipto, nrows), **{'filter_maf':True, 'maf':0.05}))
            count += 1
        watch_async(jobs)
        dfs[f] = pd.concat([j.r for j in jobs])

6
6


# Recalcuate RD

Looking at our testdata (1 poolseq pop vs indSeq of same individuals), AD/DP was consistent with the frequency prediction from GATK. We saw that adjusting FREQ to AD / (AD + RD) decreased concordance between the two datasets. So that we are consistent with respect to uncorrected and corrected, I'm adjusting RD = DP - AD so we don't have to make adjustments in the future

In [11]:
def recalc_rd(df):
    """Recalculate RD so RD = DP - AD."""
    rdcols = [col for col in df if '.RD' in col]
    for col in nb(rdcols):
        pop = col.split(".")[0]
        df[f'{pop}.RD'] = df[f'{pop}.DP'] - df[f'{pop}.AD']
    return df

def save_file(df, f):
    """Save file to background using one of the ipcluster engines so I can contiue working."""
    import pandas
    df.to_csv(f, sep='\t', index=False)
    return f

In [14]:
# make dirs
newdirs = {}
for variety in ['both']:
    newdirs[variety] = makedir(f'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/{dirdict[variety]}/03_maf-p05_RD-recalculated_{variety}')
    print(newdirs[variety])

/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/03_maf-p05_RD-recalculated_both


In [14]:
recalced = {}
for f,df in dfs.items():
    recalced[f] = recalc_rd(df)

100%|██████████| 73/73 [00:00<00:00, 4440.73it/s]
100%|██████████| 73/73 [00:00<00:00, 1416.71it/s]


In [17]:
# save interior SNP data
combfile = op.join(newdirs['both'], 'DF_pooled-varscan_all_bedfiles_SNP_both-varieties_maf_RD-recalculated.txt')
save_file(combined_recalc, combfile)

'/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/combined_varieties/03_maf-p05_RD-recalculated_both/DF_pooled-varscan_all_bedfiles_SNP_both-varieties_maf_RD-recalculated.txt'

In [17]:
for f,df in recalced.items():
    dst = f.replace("02_baseline_filtered_both" ,"03_maf-p05_RD-recalculated_both").replace(".txt", "_maf_RD-recalculated.txt")
    df.to_csv(dst, sep='\t', index=False)

# filter for MAF

In [None]:
lview,dview = get_client('default')

In [None]:
def maf_filter(f):
    """filter MAF >= 0.01, write to file."""
    import pandas
    import os
    chunks = pandas.read_csv(f, sep='\t', chunksize=10000)
    dfs = []
    for chunk in chunks:
#         if len(dfs) % 5 == 0:
#             update([op.basename(f), len(dfs)])
        df = chunk[(chunk['AF'].astype(float) >= 0.01) & (chunk['AF'].astype(float) <= 0.99)].copy()
        df['MAF'] = df['AF']
        df.loc[df['AF'].astype(float) > 0.5, 'MAF'] = 1 - chunk['AF'][chunk['AF'].astype(float) > 0.5]
        dfs.append(df)
    
    df = pandas.concat(dfs)
    print(df.shape)
    
    out = f.replace(".txt", "_maf.txt").replace('02_baseline_filtered', '03_maf-p01_RD-recalculated')
    makedir(os.path.dirname(out))
    
    df.to_csv(out, sep='\t', index=False)
    print('done!')
dview['maf_filter'] = maf_filter
dview['makedir'] = makedir

In [None]:
# create dirs, get the baseline filtered files
DIR = '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA'
snpdir = op.join(DIR, 'DF_pooled/snpsANDindels')
basedir = op.join(snpdir, '02_baseline_filtered')
basefiles = [f for f in fs(basedir) if '.md5' not in f and 'INDEL' not in f]
basefiles

In [None]:
jobs = make_jobs(basefiles, maf_filter, lview)
watch_async(jobs)

# Recalcuate RD

Looking at our testdata (1 poolseq pop vs indSeq of same individuals), AD/DP was consistent with the frequency prediction from GATK. We saw that adjusting FREQ to AD / (AD + RD) decreased concordance between the two datasets. So that we are consistent with respect to uncorrected and corrected, I'm adjusting RD = DP - AD so we don't have to make adjustments in the future

In [None]:
from pythonimports import *

In [None]:
lview,dview = get_client()

In [None]:
DIR = '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/03_maf-p01_RD-recalculated'
files = fs(DIR, endswith='maf.txt', exclude='README')
files

In [None]:
# read in paralog and repeats, recalc RD
def recalc_rd(df):
    """Recalculate RD so RD = DP - AD."""
    rdcols = [col for col in df if '.RD' in col]
    for col in nb(rdcols):
        pop = col.split(".")[0]
        df[f'{pop}.RD'] = df[f'{pop}.DP'] - df[f'{pop}.AD']
    return df
    
dfs = {}
for f in files[:-1]:
    print(f)
    df = pd.read_table(f)
    dfs[f] = recalc_rd(df)

In [None]:
def get_skipto_df(f, skipto, nrows, cols=None):
    """Retrieve dataframe so that all rows are captured when iterating.
    
    f = filename to open
    skipto = row number to skip, read rows thereafter
    nrows = how many rows to read from f
    """
    import pandas
    
    if skipto == 0:
        df = pandas.read_table(f, nrows=nrows-1)
    else:
        df = pandas.read_table(f, skiprows=range(1, skipto), nrows=nrows)
    
    if cols is not None:
        if isinstance(cols, str):
            cols = [cols]
        df = df[cols].copy()
    
    return df
dview['get_skipto_df'] = get_skipto_df

In [None]:
# get linenums, includes header
linenums = {}
for f in files:
    out = !wc -l $f
    linenums[f] = int(out[0].split()[0])
    print(op.basename(f), linenums[f])

In [None]:
# read in SNPs in parallel, 
nrows = 50000
jobs = []
count = 0
for skipto in range(0, linenums[files[-1]], nrows):
    num = str(count).zfill(4)
    jobs.append(lview.apply_async(get_skipto_df, *(files[-1], skipto, nrows)))
    count += 1
watch_async(jobs)

In [None]:
# recalc_rd for SNPs
dfs[files[-1]] = recalc_rd(pd.concat([j.r for j in jobs]))

In [None]:
for f,df in dfs.items():
    print(f)
    newf = f.replace("_maf.txt", "_maf_RD-recalculated.txt")
    print('\t', newf)
    df.to_csv(newf, sep='\t', index=False)

# Choose SNPs for GEA structure correction in baypass

First choose loci for each variety (coastal and interior) then choose loci across both.

for SNPs with MAF > 0.01 and for all pops: 40 < DP < 1000, randomly choose one snp per contig (for contigs > 1Kbp), then LD prune so no pairwise r2 > 99.9th percentile of r2

#### get the snps

In [None]:
from pythonimports import *

def get_mafdict(afs, roundto):
    # bins for p52 depth
    mafs = [round(1-float(af),roundto) if float(af) > 0.5 else round(float(af),roundto) for af in afs]
    t = table(mafs)
    mafsdict = OrderedDict()
    for k in sorted(t):
        newk = '%.10f' % k
        if not newk in mafsdict:
            mafsdict[newk] = t[k]
        else:
            mafsdict[newk] += t[k]
    retdict = OrderedDict()
    for newk,count in mafsdict.items():
        retdict[newk] = count / sum(mafsdict.values())
    print(len(retdict.keys()))

    return retdict

def subcategorybar(X, vals, width=.9):
    n = len(vals)
    _X = np.arange(len(X))
    for i in range(n):
        plt.bar(_X - width/2. + i/float(n)*width, vals[i], 
                width=width/float(n), align="edge")   
    plt.xticks(_X, X)

def make_mafdict_fig(pruned_snpdict, all_snpdict, title=None):
    fig = plt.figure(figsize=(25,5))
    X = ['%.3f' % float(key) for key in sorted(keys(pruned_snpdict))]
    Y = [pruned_snpdict[freq] for freq in sorted(keys(pruned_snpdict))]
    Z = [all_snpdict[freq] for freq in sorted(keys(all_snpdict))]
    subcategorybar(X, [Y,Z])
    plt.legend(['pruned snps', 'all snps'],fontsize=20)
    plt.ylabel('proportion of SNPs in category',size=20)
    plt.xlabel('minor allele frequency',size=20)
    plt.title(title, size=20)
    plt.show()

In [None]:
lview,dview = get_client()

In [None]:
snpdir = '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels'

In [None]:
def filter_depth(*args):
    """Filter min/max depth, reduce columns."""
    chunk = get_skipto_df(*args)
    dpcols = [col for col in chunk.columns if '.DP' in col]
    freqcols = [col for col in chunk.columns if '.FREQ' in col]
    cols = ['CHROM', 'locus', 'AF', 'MAF'] + dpcols + freqcols
    chunk = chunk[cols].copy()
    for col in dpcols:
        chunk = chunk[chunk[col] >= 20].copy()
        chunk = chunk[chunk[col] < 1000].copy()
    chunk.index = chunk['locus'].tolist()
    return chunk

In [None]:
mafdir = op.join(snpdir, '03_maf-p01_RD-recalculated')
f = op.join(mafdir, 'DF_pooled-varscan_all_bedfiles_SNP_maf_RD-recalculated.txt')
op.exists(f)

In [None]:
out = !wc -l $f
out

In [None]:
linenums = int(out[0].split()[0]) - 1
linenums

In [None]:
linenums = 6728421

In [None]:
# filter for depth in parallel as I read it in
nrows = 50000
jobs = []
for skipto in range(0, linenums, nrows):
    jobs.append(lview.apply_async(filter_depth, *(f, skipto, nrows)))
watch_async(jobs)

In [None]:
snps = pd.concat([j.r for j in jobs])
print(snps.shape)
snps.head()

In [None]:
luni(snps['CHROM'])

#### get a dict to distinguish interior vs coastal

In [None]:
# envdata has variety ID
envdata = pd.read_table('/data/projects/pool_seq/environemental_data/df_std_env-19variables.txt')
envdata = envdata[envdata['our_id']==envdata['our_id']]  # removes irrelevant pop with our_id=nan

pool2var = {}
varlist = {}
for row in envdata.index:
    pool = envdata.loc[row, 'our_id']
    variety = envdata.loc[row, 'Variety']
    pool2var[pool] = variety
    if variety not in varlist:
        varlist[variety] = []
    varlist[variety].append(pool)

In [None]:
for var,lst in varlist.items():
    print(var, len(lst))

In [None]:
len(pool2var)

#### take a look at missing data among varities (we filtered for missing data across both varieties)

In [None]:
cols = [col for col in snps.columns if '.' not in col]  # columns with info relevant to both varieties
varsnps = {}
for variety in uni(keys(varlist)):
    varcols = [col for pop in varlist[variety] for col in snps.columns if col == f'{pop}.FREQ']
    print(variety, len(varcols))
    varsnps[variety] = snps[cols + varcols].copy()
    print(variety, varsnps[variety].shape)

In [None]:
varsnps['FDI'].head()

In [None]:
varsnps['FDC'].head()

In [None]:
lview,dview = get_client()

In [None]:
dview['varsnps'] = varsnps

In [None]:
varsnps.keys()

In [None]:
def calc_missing_data(loci, variety=None):
    df = varsnps[variety]
    missingdata = []
    for locus in loci:
        sums = sum([1 for x in df.loc[locus, [col for col in df.columns if '.FREQ' in col]] if x==x])
        missingdata.append(sums)
    return missingdata

In [None]:
jobs = send_chunks(calc_missing_data, snps.index.tolist(), nrow(snps)/10, lview, {'variety': 'FDI'})
jobs.extend(send_chunks(calc_missing_data, snps.index.tolist(), nrow(snps)/10, lview, {'variety': 'FDC'}))
watch_async(jobs)

In [None]:
missingdata = {}
for i,j in enumerate(jobs):
    if i < 10:
        var = 'FDI'
    else:
        var = 'FDC'
    if var not in missingdata:
        missingdata[var] = []
    for sums in j.r:
        missingdata[var].append(1-(sums/len(varlist[var])))
for var,lst in missingdata.items():
    print(var, len(lst))

In [None]:
for var,lst in missingdata.items():
    plt.hist(lst)
    plt.title(f'{var} missingdata')
    plt.show()

#### reduce those snps on contigs > 1Kbp

In [None]:
# get contig lengths
lengths = pd.read_table('/data/database/DouglasFir_ref_genome/DF_ref_edit.fasta.length', header=None)
lengths.head()

In [None]:
lens = dict((contig,length) for (contig,length) in zip(lengths[0],lengths[1]) if length>1000)
len(lens.keys())

In [None]:
freqcols = [col for col in snps.columns if '.FREQ' in col]
reduced = snps[snps['CHROM'].isin(list(lens.keys()))].copy()
reduced = reduced.loc[:, freqcols + ['CHROM', 'AF', 'MAF']]
reduced.shape, snps.shape

In [None]:
min(lens.values())

In [None]:
make_mafdict_fig(get_mafdict(reduced['AF'], roundto=2),
                 get_mafdict(snps['AF'], roundto=2),
                 title='Doug fir\nafter filtering for contig length >1Kbp')

#### reduce to no missing data  (can't, MAF spectrum doesn't match well with MAF spectrum of all data. Filter for % missing below)

In [None]:
# lview,dview = get_client()

In [None]:
# def reduce_col(col):
#     """Return list of loci with no missing data for pop.FREQ col."""
#     return reduced[~reduced[col].isnull()].index

# dview['reduced'] = reduced

In [None]:
# # jobs = []
# # for col in nb(freqcols):
# #     jobs.append(reduce_col(col))

In [None]:
# # send jobs to reduce_col
# freqcols = [col for col in reduced.columns if '.FREQ' in col]
# jobs = []
# for col in freqcols:
#     jobs.append(lview.apply_async(reduce_col, col))
# watch_async(jobs)

In [None]:
# # combine returns to get loci with no missing data across pop.FREQ cols
# nomissingloci = snps.index.tolist()
# for j in jobs:
#     nomissingloci = list(set(nomissingloci).intersection(j.r))
# len(nomissingloci), luni(nomissingloci)

In [None]:
# nrow(reduced), nrow(snps)

In [None]:
# # # combine returns to get loci with no missing data across pop.FREQ cols
# # nomissingloci = snps.index.tolist()
# # for j in jobs:
# #     nomissingloci = list(set(nomissingloci).intersection(j))
# # len(nomissingloci), luni(nomissingloci)

In [None]:
# # reduce snp table to no missing loci
# reduced = reduced[reduced.index.isin(nomissingloci)]
# reduced.head()

In [None]:
# len(nomissingloci), nrow(reduced)

In [None]:
# nrow(snps)

In [None]:
# plt.hist(reduced['MAF'], bins=100)
# plt.title('SNPs with no missing data\nAnd 20 < DP < 1000\nall contigs')
# plt.show()

In [None]:
# plt.hist(snps['MAF'],bins=100)
# plt.title('all SNPs')
# plt.show()

#### try to reduce to X% missing data ... Result: any filtering resulted in a visibly different MAF spectrum

In [None]:
reduced.shape

In [None]:
lview,dview = get_client()

In [None]:
def filter_perc(loci, perc=0.10):
    """Return only loci with missing data <= perc arg.
    freqcols = all cols with .FREQ in col name
    """
    import pandas
    from tqdm.notebook import tqdm as nb

    keep = []
    for locus in nb(loci):
        sums = sum([1 for freq in reduced.loc[locus,freqcols] if freq!=freq])
        if sums/len(freqcols) <= perc:
            keep.append(locus)
    return keep
dview['reduced'] = reduced
dview['freqcols'] = freqcols

In [None]:
reduced.shape

In [None]:
nrow(reduced)/len(lview)

In [None]:
luni(reduced['CHROM'])

<center> SETTING PERC TO 0.25 MEANS NO LOCI WERE FILTERED BY MAF
    
    if I tried filtering < 25% missing data, the MAF spectrum looked weird, leaving the code inplace.

In [None]:
jobs = send_chunks(filter_perc, reduced.index.tolist(), nrow(reduced)/len(lview), lview, {'perc':0.25})
watch_async(jobs)

In [None]:
perc_above = []
for j in jobs:
    perc_above.extend(j.r)
len(perc_above)

In [None]:
print(reduced.shape)
reduced = reduced[reduced.index.isin(perc_above)].copy()
reduced.shape

In [None]:
# this is what I used (pipeline default)
make_mafdict_fig(get_mafdict(reduced['AF'], roundto=2),
                 get_mafdict(snps['AF'], roundto=2),
                 title='Doug fir\nafter filtering for <= 25 % missing data (pipeline default)')

In [None]:
# this is when trying to filter for less missing data than default (eg perc=0.1)
make_mafdict_fig(get_mafdict(reduced['AF'], roundto=2),
                 get_mafdict(snps['AF'], roundto=2),
                 title='Doug fir\nafter filtering for perc missing data')

#### choose one snp per contig

In [None]:
lview,dview = get_client()

In [None]:
reduced.shape

In [None]:
def choose_random_loci(chrom_list):
    """For each chrom in chrom_list, randomly choose 1 snp."""
    from random import shuffle
    from tqdm.notebook import tqdm as nb
    
    loci = []
    for chrom in nb(chrom_list):
        chrom_snps = list(reduced[reduced['CHROM']==chrom].index)
        shuffle(chrom_snps)
        loci.append(chrom_snps[0])

    return loci
dview['reduced'] = pd.DataFrame(reduced['CHROM'])  # reload reduced

In [None]:
uni_chroms = uni(reduced['CHROM'])
len(uni_chroms)

In [None]:
len(uni_chroms)/len(lview)

In [None]:
# send to choose_random_loci
# jobs = send_chunks(choose_random_loci, uni_chroms, len(uni_chroms)/len(lview), lview)
watch_async(jobs, phase='len(uni_chrom) = %s' % len(uni_chroms))

In [None]:
randomloci = []
for j in jobs:
    randomloci.extend(j.r)
len(randomloci) == len(uni_chroms)

In [None]:
len(randomloci)

In [None]:
reduced = reduced[reduced.index.isin(randomloci)].copy()

In [None]:
reduced.shape

In [None]:
make_mafdict_fig(get_mafdict(reduced['AF'], roundto=2),
                 get_mafdict(snps['AF'], roundto=2),
                 title='Doug fir\nafter choose one SNP per contig for contigs > 1Kb')

#### get an idea of r2 values so we can determine an empirical high-end cutoff

In [None]:
# # skip choosing 1 per contig to see what happens
# randomloci = list(nomissingloci)

In [None]:
lview,dview = get_client()

In [None]:
def getfreqs(myloci):
    """Get the population ALT frequency as a float (reported as a str in table).
    
    Returns
    -------
    list of tuples
        - first element = locus name
        - second element = pandas.Series
    """
    rets = []
    for locus in myloci:
        rets.append((locus, reduced.loc[locus, freqcols].str.replace("%", "").astype(float) / 100))
    return rets
dview['getfreqs'] = getfreqs
dview['freqcols'] = freqcols
dview['reduced'] = reduced[freqcols]

def getr2(myloci):
    from scipy.stats import pearsonr
    from numpy import logical_or
    from tqdm.notebook import tqdm as nb

    freqs = dict((locus, freqs) for (locus,freqs) in getfreqs(myloci))

    r2vals = []
    i = 0
    for locusi in nb(myloci):
        for j,locusj in enumerate(myloci):
            if i < j:
                nas = logical_or(freqs[locusi].isnull(), freqs[locusj].isnull())
                r2 = pearsonr(freqs[locusi][~nas], freqs[locusj][~nas])[0]**2
                r2vals.append(r2)
        i += 1
    return r2vals

In [None]:
jobs = send_chunks(getr2, randomloci, 250, lview)
watch_async(jobs)

In [None]:
r2vals = []
for j in jobs:
    r2vals.extend(j.r)
plt.hist(r2vals, bins=100)
plt.show()

In [None]:
# get 99.9th percentile
r2thresh = sorted(r2vals)[math.ceil(len(r2vals)*.999)]
r2thresh

In [None]:
# what is 99.99th percentile?
sorted(r2vals)[math.ceil(len(r2vals)*.9999)]

In [None]:
# what is 99th percentile?
sorted(r2vals)[math.ceil(len(r2vals)*.99)]

In [None]:
# percentiles are too high, let's see what 0.2 looks like
r2thresh = 0.2

In [None]:
for i,x in enumerate(sorted(r2vals)):
    if x > r2thresh:
        print(i)
        break

In [None]:
i/len(r2vals)  # percentile of r2thresh in r2vals

In [None]:
sorted(r2vals)[math.ceil(len(r2vals)*(i/len(r2vals)))]

#### LD prune random loci¶

In [None]:
reduced.shape

In [None]:
len(randomloci)

In [None]:
snps.shape, reduced.shape

In [None]:
freqcols = [col for col in snps if '.FREQ' in col]

In [None]:
freqs = dict((locus, freqs) for (locus,freqs) in getfreqs(randomloci))
dview['freqs'] = freqs
len(freqs)

In [None]:
dview['reduced'] = None

In [None]:
len(lview)

In [None]:
def prune_em(compareto, locusi=None, r2thresh=0.2):
    from scipy.stats import pearsonr
    from numpy import logical_or
    
    drop = []
    for locusj in compareto:
        nas = logical_or(freqs[locusi].isnull(), freqs[locusj].isnull())
        r2 = pearsonr(freqs[locusi][~nas], freqs[locusj][~nas])[0]**2
#         print('r2 = ', r2)
        if r2 > r2thresh:
            drop.append(locusj)

    return locusi,drop

In [None]:
r2thresh

In [None]:
len(lview)

In [None]:
r2thresh

In [None]:
# send locus along with all needed loci for pairwise comparisons to engines
jobs = []
i = 0
for locusi in tnb(randomloci):
    tosend = randomloci[i+1:]
    jobs.append(lview.apply_async(prune_em, tosend, **{'r2thresh':r2thresh, 'locusi':locusi}))
    i += 1
watch_async(jobs)

In [None]:
# found = {-1: None}  # comment this out after running this the first time in case I want to interrupt and restart
# keep = list(randomloci)  # comment this out after running this the first time in case I want to interrupt and restart
maxx = -1
while maxx < (len(randomloci) - 1):
    i = 0
    for j in tnb(jobs):
        if j.ready() and i-1 in found.keys() and i not in found.keys():
            found[i] = True
            try:
                locusi, drop = j.r
            except: # CancelledError
                continue
            if locusi in keep:
                for locusj in drop:
                    if locusj in keep:
                        job_idx = randomloci.index(locusj)
    #                     print('\tcanceling ', job_idx, '. ', locusj, randomloci[job_idx])
                        jobs[job_idx].cancel()
                        keep.remove(locusj)
        i += 1
    maxx = max(found.keys())
    if maxx+1 < len(randomloci):
        update([(jobs[maxx+1].ready(), maxx, len(found.keys()), len(keep))])
        time.sleep(10)
len(keep)

In [None]:
from pythonimports import *

In [None]:
# pkldump(keep, '/data/home/lindb/temp_dougfir_gea_pruned.pkl')
keep = pklload('/data/home/lindb/temp_dougfir_gea_pruned.pkl')
len(keep)

In [None]:
keeping = snps[snps['locus'].isin(keep)].copy()
keeping.shape

In [None]:
make_mafdict_fig(get_mafdict(keeping['AF'], roundto=2),
                 get_mafdict(snps['AF'], roundto=2),
                 title='Doug fir\nafter pruing for r2 > 0.2')

In [None]:
# this is when I had used a lower threshold of missing data.
make_mafdict_fig(pruned_mafdict, snp_mafdict, 'Doug fir')

In [None]:
len(keep)

In [None]:
luni(keeping['CHROM'])

In [None]:
plt.hist([lens[chrom] for chrom in keeping['CHROM']], bins = 100)
plt.show()

In [None]:
plt.hist([lens[chrom] for chrom in snps['CHROM'] if chrom in lens], bins=100)
plt.show()

In [None]:
len(lens), luni(snps['CHROM'])

In [None]:
# save
baydir = makedir(op.join(mafdir, 'baypass'))
pkldump(keep, op.join(baydir, 'lessthan10perc-missing-data_20-dp-1000_random-snps_1-per-contig-gt1Kbp_r2-lessthan-p30.pkl'))

In [None]:
plt.hist(keeping['MAF'], bins=100)
plt.title('Structure SNPs')
plt.xlabel('MAF')
plt.ylabel('Count')
plt.show()

In [None]:
plt.hist(snps['MAF'], bins=100)
plt.title('All SNPs')
plt.xlabel('MAF')
plt.ylabel('Count')
plt.show()

#### get full snps df to reduce to loci in keep

In [None]:
f

In [None]:
linenums

In [None]:
# read in all SNPs in parallel
nrows = 50000
jobs = []
for skipto in range(0, linenums, nrows):
    jobs.append(lview.apply_async(get_skipto_df, *(f, skipto, nrows)))
watch_async(jobs)

In [None]:
snps = pd.concat([j.r for j in jobs])
snps.shape

In [None]:
snps.index = snps['locus'].tolist()

In [None]:
filtered = snps.loc[keep, :].copy()
filtered.shape

In [None]:
filtered.head()

In [None]:
baydir = '/data/projects/pool_seq/DF_datasets/DF_pooled_GEA/DF_pooled/snpsANDindels/03_maf-p01_RD-recalculated/baypass'
pkldump(keep, op.join(baydir,
                      'baseline-missing-data_20-dp-1000_random-snps_1-per-contig-gt1Kbp_r2-lessthan-p20.pkl'))

In [None]:
# # moved this and all files like it to baydir/bad_maf_spectrum
# filtered.to_csv(op.join(baydir, 'lessthan10perc-missing-data_20-dp-1000_random-snps_1-per-contig-gt1Kbp_r2-lessthan-p30_table.txt'),
#                 sep='\t', index=False)

In [None]:
filtered.to_csv(op.join(baydir, 'baseline-missing-data_20-dp-1000_random-snps_1-per-contig-gt1Kbp_r2-lessthan-p20_table.txt'),
                sep='\t', index=False)

In [None]:
del snps

In [None]:
dview['df'] = None