In [29]:
import boto3
import pandas as pd
from ete3 import NCBITaxa
import subprocess
import itertools
import os
import s3fs
import numpy as np
from lca_functions import *

In [30]:
s3 = boto3.resource('s3')
client = boto3.client('s3')
bucket_name = "czbiohub-mosquito"
bucket = s3.Bucket(bucket_name)
contig_folders = [x["Prefix"] for x in client.list_objects(Bucket=bucket_name, Prefix="contigs/", Delimiter="/")["CommonPrefixes"]]
contig_quality_folders = [x["Prefix"] for x in client.list_objects(Bucket=bucket_name, Prefix="contig_quality/", Delimiter="/")["CommonPrefixes"] if "Mos" not in x["Prefix"]]

ncores = os.cpu_count()



In [6]:
%%bash
download_file() {
    tax_db=$1
    if [ ! -f "${tax_db}.zip" ]; then
        curl -O ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump_archive/$tax_db.zip
        unzip -d $tax_db $tax_db.zip
        cd $tax_db
        tar -czvf ../$tax_db.tar.gz *
        cd ..
        rm -rf $tax_db $tax_db.zip
    fi
}
export -f download_file
download_file "new_taxdump_2019-01-01" ## latest taxonomy release that still contains original virus taxonomy
download_file "new_taxdump_2019-06-01"


Archive:  new_taxdump_2019-01-01.zip
  inflating: new_taxdump_2019-01-01/citations.dmp  
  inflating: new_taxdump_2019-01-01/delnodes.dmp  
  inflating: new_taxdump_2019-01-01/division.dmp  
  inflating: new_taxdump_2019-01-01/fullnamelineage.dmp  
  inflating: new_taxdump_2019-01-01/gencode.dmp  
  inflating: new_taxdump_2019-01-01/host.dmp  
  inflating: new_taxdump_2019-01-01/merged.dmp  
  inflating: new_taxdump_2019-01-01/names.dmp  
  inflating: new_taxdump_2019-01-01/nodes.dmp  
  inflating: new_taxdump_2019-01-01/rankedlineage.dmp  
  inflating: new_taxdump_2019-01-01/taxidlineage.dmp  
  inflating: new_taxdump_2019-01-01/typematerial.dmp  
  inflating: new_taxdump_2019-01-01/typeoftype.dmp  
Archive:  new_taxdump_2019-06-01.zip
  inflating: new_taxdump_2019-06-01/citations.dmp  
  inflating: new_taxdump_2019-06-01/delnodes.dmp  
  inflating: new_taxdump_2019-06-01/division.dmp  
  inflating: new_taxdump_2019-06-01/fullnamelineage.dmp  
  inflating: new_taxdump_2019-06-01/genco

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0 96.0M    0  231k    0     0   157k      0  0:10:25  0:00:01  0:10:24  157k  4 96.0M    4 4181k    0     0  1674k      0  0:00:58  0:00:02  0:00:56 1673k  9 96.0M    9 9332k    0     0  2683k      0  0:00:36  0:00:03  0:00:33 2682k 16 96.0M   16 16.0M    0     0  3677k      0  0:00:26  0:00:04  0:00:22 3676k 25 96.0M   25 24.7M    0     0  4594k      0  0:00:21  0:00:05  0:00:16 5066k 33 96.0M   33 31.9M    0     0  5040k      0  0:00:19  0:00:06  0:00:13 6472k 39 96.0M   39 38.4M    0     0  5261k      0  0:00:18  0:00:07  0:00:11 7061k 48 96.0M   48 46.2M    0     0  5591k      0  0:00:17  0:00:08  0:00:09 7615k 57 96.0M   57 55.2M    0     0  5975k      0  0:00

In [31]:
ncbi = NCBITaxa()
#ncbi.update_taxonomy_database(taxdump_file=="new_taxdump_2019-06-01.tar.gz")

### functions

In [12]:
def run_lca_analysis (input_file_name, output_dir, bucket_name, blast_type, default=False, ncores=8):
    # First list the folders (there is a limit of 1000 files output by AWS by default)
    list_of_folders = [client.list_objects(Bucket=bucket_name, Prefix=x["Prefix"]+input_file_name) \
                       for x in client.list_objects(Bucket=bucket_name, Prefix="contigs/", Delimiter="/")["CommonPrefixes"]]
    list_of_folders = ["s3://"+bucket_name+"/"+x["Prefix"] for x in list_of_folders if "Contents" in x.keys()]
    filenames = pd.DataFrame(list_of_folders, columns=["blast_"+blast_type])
    output_string = output_dir
    if not default:
        output_string += "/ident"+str(ident_cutoff)+"align"+str(align_cutoff)+"bitscore"+str(bitscore_cutoff)
    filenames = filenames.assign(filtered_blast=filenames["blast_"+blast_type].str.replace("contigs", output_string).str.replace(".m9", "_filtered.m9"))
    filenames = filenames.assign(excluded_contigs=filenames["filtered_blast"].apply(os.path.dirname).apply(lambda x: os.path.join(x, "exclude_contigs_"+blast_type+".txt")))
    filenames = filenames.assign(lca=filenames["filtered_blast"].str.replace("blast_"+blast_type, "lca_"+blast_type).str.replace("_filtered", ""))
    filenames = filenames.assign(reads=filenames["blast_"+blast_type].str.replace("blast_"+blast_type+".m9", "contig_stats.json").replace(""))
    #filenames.loc[~filenames["reads"].str.contains("ater"), "reads"] = filenames.loc[~filenames["reads"].str.contains("ater"), "reads"].str.replace("bowtie", "Mos/bowtie")
    commands = filenames.apply(lambda x: "python lca_analysis.py"+\
                               " --blast_type "+blast_type+\
                               " --fpath "+x.iloc[0]+\
                               " --filtered_blast_path "+x.iloc[1]+\
                               " --excluded_contigs_path "+x.iloc[2]+\
                               " --outpath "+x.iloc[3]+\
                               " --read_count_path "+x.iloc[4]+\
                               " --verbose True", axis=1)
    print (commands)
    commands_csv_filename = "lca_"+blast_type+"_commands"
    commands.to_csv(commands_csv_filename, index=False)
    if (len(filenames) < ncores):
        ncores = len(filenames)
    command_str = "parallel -a "+commands_csv_filename+" -j "+str(ncores)
    print (command_str)
#     process = subprocess.Popen(command_str.split(), stdout=subprocess.PIPE)
#     output, error = process.communicate()
#     return (output, error)


### nt hits
The nt hits of contigs from each sample are filtered with ident_cutoff=0.9 and align_len_cutoff=0.9

In [13]:
run_lca_analysis(input_file_name="blast_nt.m9", output_dir="contig_quality", \
                 bucket_name=bucket_name, blast_type="nt", default=True, ncores=ncores)

0      python lca_analysis.py --blast_type nt --fpath...
1      python lca_analysis.py --blast_type nt --fpath...
2      python lca_analysis.py --blast_type nt --fpath...
3      python lca_analysis.py --blast_type nt --fpath...
4      python lca_analysis.py --blast_type nt --fpath...
5      python lca_analysis.py --blast_type nt --fpath...
6      python lca_analysis.py --blast_type nt --fpath...
7      python lca_analysis.py --blast_type nt --fpath...
8      python lca_analysis.py --blast_type nt --fpath...
9      python lca_analysis.py --blast_type nt --fpath...
10     python lca_analysis.py --blast_type nt --fpath...
11     python lca_analysis.py --blast_type nt --fpath...
12     python lca_analysis.py --blast_type nt --fpath...
13     python lca_analysis.py --blast_type nt --fpath...
14     python lca_analysis.py --blast_type nt --fpath...
15     python lca_analysis.py --blast_type nt --fpath...
16     python lca_analysis.py --blast_type nt --fpath...
17     python lca_analysis.py -

In [4]:
lca_nt_paths = ["s3://"+bucket_name+"/"+x["Prefix"]+"lca_nt.m9" \
 for x in client.list_objects(Bucket=bucket_name, Prefix="contig_quality/", Delimiter="/")["CommonPrefixes"] if "Mos" not in x["Prefix"]]
blast_nt_paths = [x.replace("lca_nt", "blast_nt_filtered") for x in lca_nt_paths]


In [5]:
for i in range(len(lca_nt_paths)):
    sample_name = os.path.basename(os.path.dirname(lca_nt_paths[i]))
    outfile = lca_nt_paths[i].replace("lca_nt", "blast_lca_nt_filtered")
    try:
        combine_blast_lca (lca_nt_paths[i], blast_nt_paths[i], outfile, sample_name, "nt")
    except:
        print ("error: "+sample_name)



error: CMS001_Water5_RNA_A_S12
error: CMS001_water1_S11
error: CMS001_water5_RNA_A_S12
error: CMS002_016a_Rb_S121_L004
error: CMS002_025d_Rb_S143_L004
error: CMS002_025f_Rb_S145_L004
error: CMS002_0Water8_Rb_S11_L004


### nr hits

Extract blast_nr hits from plast results and save to folder for each sample

In [None]:
%%bash
aws s3 ls s3://czbiohub-mosquito/plast/ | grep '.m8' | awk 'NF>1{print $NF}' | parallel -j 72 python create_blast_nr.py --fpath s3://czbiohub-mosquito/plast/{}
aws s3 ls s3://lucymli/skeeters/blast_nr/ | grep 'CMS00' | awk 'NF>1{print $NF}' | parallel aws s3 sync s3://lucymli/skeeters/blast_nr/{} blast_nr_output/{}
head -n 1 $(find blast_nr_output -type f -name '*.m8' | head -n 1) > header_line 
for x in `ls blast_nr_output`; do 
    mkdir -p blast_nr_output_full/$x
    head -n 1 $(find blast_nr_output -type f -name '*.m8' | head -n 1) > blast_nr_output_full/$x/blast_nr.m9
    ls -d $(find blast_nr_output/$x -type f) | xargs -0 -I file cat file > blast_nr_output_full/$x/blast_nr.m9
done
ls blast_nr_output_full | parallel aws s3 cp blast_nr_output_full/{}/blast_nr.m9 s3://czbiohub-mosquito/contigs/{}/blast_nr.m9

# add missing taxids
python initiate_gi2taxid_database.py
aws s3 ls s3://czbiohub-mosquito/contigs/ --recursive | grep "blast_nr.m9" | awk 'NF>1{print $NF}' | parallel python get_missing_prot_taxa.py s3://czbiohub-mosquito/{}






The nr hits of contigs from each sample are filtered with ident_cutoff=0.9 and align_len_cutoff=0.9

In [6]:
run_lca_analysis(input_file_name="blast_nr.m9", output_dir="contig_quality", \
                 bucket_name=bucket_name, blast_type="nr", default=True, ncores=ncores)

0      python lca_analysis.py --blast_type nr --fpath...
1      python lca_analysis.py --blast_type nr --fpath...
2      python lca_analysis.py --blast_type nr --fpath...
3      python lca_analysis.py --blast_type nr --fpath...
4      python lca_analysis.py --blast_type nr --fpath...
                             ...                        
150    python lca_analysis.py --blast_type nr --fpath...
151    python lca_analysis.py --blast_type nr --fpath...
152    python lca_analysis.py --blast_type nr --fpath...
153    python lca_analysis.py --blast_type nr --fpath...
154    python lca_analysis.py --blast_type nr --fpath...
Length: 155, dtype: object
parallel -a lca_nr_commands -j 128




In [4]:
lca_nr_paths = ["s3://"+bucket_name+"/"+x["Prefix"]+"lca_nr.m9" \
 for x in client.list_objects(Bucket=bucket_name, Prefix="contig_quality/", Delimiter="/")["CommonPrefixes"] if "Mos" not in x["Prefix"]]
blast_nr_paths = [x.replace("lca_nr", "blast_nr_filtered") for x in lca_nr_paths]


In [8]:
for i in range(len(lca_nr_paths)):
    sample_name = os.path.basename(os.path.dirname(lca_nr_paths[i]))
    outfile = lca_nr_paths[i].replace("lca_nr", "blast_lca_nr_filtered")
    try:
        combine_blast_lca (lca_nr_paths[i], blast_nr_paths[i], outfile, sample_name, "nr")
    except:
        print ("error: "+sample_name)



In [70]:
blast_type="nt"
fpath="s3://czbiohub-mosquito/contigs/CMS002_038a_Rb_S172_L004/blast_nt.m9"
filtered_blast_path="s3://czbiohub-mosquito/contig_quality/CMS002_038a_Rb_S172_L004/blast_nt_filtered.m9" 
excluded_contigs_path="s3://czbiohub-mosquito/contig_quality/CMS002_038a_Rb_S172_L004/exclude_contigs_nt.txt"
outpath="s3://czbiohub-mosquito/contig_quality/CMS002_038a_Rb_S172_L004/lca_nt.m9" 
read_count_path="s3://czbiohub-mosquito/contigs/CMS002_038a_Rb_S172_L004/contig_stats.json"
verbose=True
                    
                    
                    
                    

In [71]:
import argparse, sys, time

In [72]:
if (read_count_path.endswith(".json")):
    read_counts = load_json(read_count_path, colnames=["query", "read_count"])
else:
    read_counts = pd.read_csv(read_count_path, sep="\t", header=0).rename(columns={"contig_name":"query"})

filtered_contigs_by_read_count = read_counts[read_counts["read_count"]>2]

In [73]:

col_names = ["query", "subject", "identity", "align_length", "mismatches", 
        "gaps", "qstart", "qend", "sstart", "send", "evalue", "bitscore", "taxid", 
        "sci_name", "common_name", "subject_title", "qcov", "hsp_count"]

db = "nucleotide"

In [74]:
blast_results = get_single_hsp(fpath, blast_type, col_names) 

/var/folders/82/tl56r6r132sbl5nnqn9jpytc0000gn/T/tmppnr6fi3f blast file downloaded to this tempfile


In [21]:
blast_results[blast_results["query"]=="NODE_7_length_3154_cov_2211.364316"]

Unnamed: 0,query,subject,identity,align_length,mismatches,gaps,qstart,qend,sstart,send,evalue,bitscore,taxid,sci_name,common_name,subject_title,qcov,hsp_count,blast_type
43,NODE_7_length_3154_cov_2211.364316,KX882764.1,81.25003,2320,252,6,15,1537,3077,1555,0.0,1856.0,1922926,Hubei mosquito virus 2,Hubei mosquito virus 2,Hubei mosquito virus 2 strain 3mos6212 segment...,0.735574,2.0,nt
44,NODE_7_length_3154_cov_2211.364316,KX882873.1,81.034207,2320,255,6,15,1537,3072,1550,0.0,1827.0,1922926,Hubei mosquito virus 2,Hubei mosquito virus 2,Hubei mosquito virus 2 strain spider133708 seg...,0.735574,2.0,nt
45,NODE_7_length_3154_cov_2211.364316,KX882832.1,79.148709,2326,292,14,13,1537,3082,1558,0.0,1584.0,1922926,Hubei mosquito virus 2,Hubei mosquito virus 2,Hubei mosquito virus 2 strain mosZJ35453 segme...,0.737476,2.0,nt
46,NODE_7_length_3154_cov_2211.364316,MH188027.1,100.0,30,0,0,2129,2158,4816,4787,0.006,56.5,2304509,Culex Daeseongdong-like virus,Culex Daeseongdong-like virus,Culex Daeseongdong-like virus strain CDaeVL/Ke...,0.009512,1.0,nt


In [75]:
start_time = time.time()
# obtain a single HSP for each query-subject pairing and read in blast results as a pandas data frame
blast_results = get_single_hsp(fpath, blast_type, col_names) 

if "qlen" not in blast_results:
    if ("~" in blast_results["query"].iloc[0]):
        blast_results = blast_results.assign(qlen=blast_results["query"].str.split("~").apply(lambda x: int(x[1].split("_")[3])))
    else:
        blast_results = blast_results.assign(qlen=blast_results["query"].str.split("_").apply(lambda x: int(x[3])))
print_to_stdout("Loaded blast file: "+fpath, start_time, verbose)


# data frame: whether or not each contig was included or excluded from blast analysis, and reason for exclusion
excluded_contigs = blast_results.groupby(["query"]).first().reset_index()[["query", "qlen"]].rename(columns={"qlen":"contig_length"})
if ("~" in excluded_contigs["query"].iloc[0]):
    excluded_contigs = excluded_contigs.assign(sample=excluded_contigs["query"].str.split("~").apply(lambda x: x[0]))
    excluded_contigs["query"] = excluded_contigs["query"].str.split("~").apply(lambda x: x[1])
    if "sample" in read_counts:
        selected_cols = ["sample", "query", "read_count"]
    else:
        selected_cols = ["query", "read_count"]
    excluded_contigs = pd.merge(excluded_contigs, read_counts[selected_cols], how="left").fillna(0)
    excluded_contigs["query"] = excluded_contigs[["sample", "query"]].apply(lambda x: x[0]+"~"+x[1], axis=1)
    queries = filtered_contigs_by_read_count.apply(lambda x: x["sample"]+"~"+x["query"], axis=1)
    excluded_contigs = excluded_contigs.assign(low_read_count=~excluded_contigs["query"].isin(queries))
else:
    excluded_contigs = excluded_contigs.assign(contig_length=excluded_contigs["query"].str.split("_").apply(lambda x: int(x[3])))
    excluded_contigs = pd.merge(excluded_contigs, read_counts, how="left", on="query").fillna(0)
    excluded_contigs = excluded_contigs.assign(low_read_count=~excluded_contigs["query"].isin(filtered_contigs_by_read_count["query"]))


    

# find missing taxids
blast_results["taxid"] = blast_results["taxid"].replace(to_replace=0, value=np.nan) # some synthetic constructs have taxid 0
if (blast_results["taxid"].isnull().any()):
    subjects_to_search = list(blast_results[blast_results["taxid"].isnull()]["subject"].unique())
    print_to_stdout(str(blast_results["taxid"].isnull().sum())+" blast hits corresponding to "+str(len(subjects_to_search))+" accession numbers have taxid 'NA'. Trying to find the taxid for these hits on NCBI.", start_time, verbose)
    subjects_taxids = [find_missing_taxid(x, db=db) for x in subjects_to_search]
    subjects_taxid_dict = dict(zip(subjects_to_search, subjects_taxids))
    blast_results.loc[blast_results["taxid"].isnull(), ["taxid"]] = blast_results[blast_results["taxid"].isnull()]["subject"].apply(lambda x: subjects_taxid_dict[x])
    blast_results = blast_results[~blast_results["taxid"].isnull()]

    
excluded_contigs = excluded_contigs.assign(taxid_na=~excluded_contigs["query"].isin(blast_results["query"]))
print_to_stdout(str(excluded_contigs["taxid_na"].sum())+" contigs were excluded because none of the subject taxids could be found.", start_time, verbose)        

if len(blast_results)==0:
    if (excluded_contigs_path.startswith("s3://")):
        df_to_s3(excluded_contigs, excluded_contigs_path)
    else:
        excluded_contigs.to_csv(excluded_contigs_path, sep="\t", index=False)
    exit()



/var/folders/82/tl56r6r132sbl5nnqn9jpytc0000gn/T/tmp0w1kzbfg blast file downloaded to this tempfile
Loaded blast file: s3://czbiohub-mosquito/contigs/CMS002_038a_Rb_S172_L004/blast_nt.m9| elapsed time: 18.27 seconds
0 contigs were excluded because none of the subject taxids could be found.| elapsed time: 18.29 seconds


In [76]:
# exclude contigs with hits to mosquito
all_hits_queries = list(blast_results["query"].unique())
print_to_stdout("remove contigs if they are likely hexapoda ", start_time, verbose)
subset_blast_hits = blast_results[~blast_results["taxid"].duplicated()]
hexapoda_hits = ncbi.get_descendant_taxa(ncbi.get_name_translator(["Hexapoda"])["Hexapoda"][0])
hexapoda_queries = subset_blast_hits[subset_blast_hits["taxid"].isin(hexapoda_hits)]["query"].unique().tolist()
before = blast_results[blast_results["query"].isin(hexapoda_queries)]
after = before.groupby(["query"], as_index=False).apply(filter_by_taxid, db=db, taxid=ncbi_older_db(["Hexapoda"], "get_name_translator")["Hexapoda"][0])
hexa_contigs = before[~before["query"].isin(after["query"])]["query"].unique()
blast_results = blast_results[~blast_results["query"].isin(hexa_contigs)]
excluded_contigs = excluded_contigs.assign(hexapoda=excluded_contigs["query"].isin(hexa_contigs))
print_to_stdout(str(len(hexa_contigs))+" contigs were likely hexapoda.", start_time, verbose)

remove contigs if they are likely hexapoda | elapsed time: 43.65 seconds


In [78]:
hexa_contigs

['NODE_57_length_1623_cov_3.124838',
 'NODE_66_length_1522_cov_1.734256',
 'NODE_81_length_1424_cov_11.271715',
 'NODE_100_length_1330_cov_4.233041',
 'NODE_110_length_1279_cov_1.802829',
 'NODE_117_length_1264_cov_1.492839',
 'NODE_178_length_1128_cov_3.803996',
 'NODE_294_length_955_cov_2.523918',
 'NODE_372_length_858_cov_2.294494',
 'NODE_687_length_677_cov_1.213333',
 'NODE_777_length_635_cov_3.609319',
 'NODE_792_length_630_cov_0.989150',
 'NODE_867_length_609_cov_1.592105',
 'NODE_1095_length_554_cov_0.909853',
 'NODE_1118_length_546_cov_299.526652',
 'NODE_1168_length_537_cov_1.715217',
 'NODE_1218_length_531_cov_0.632159',
 'NODE_1271_length_520_cov_2.288939',
 'NODE_1319_length_512_cov_2.680460',
 'NODE_1331_length_509_cov_2.708333',
 'NODE_1626_length_470_cov_1.249364',
 'NODE_2056_length_427_cov_1.505714',
 'NODE_2250_length_411_cov_3.212575',
 'NODE_2407_length_399_cov_1.975155',
 'NODE_2963_length_366_cov_1.515571',
 'NODE_4040_length_319_cov_100.458678',
 'NODE_4100_leng

In [79]:
blast_results = blast_results[~blast_results["query"].isin(excluded_contigs["query"][excluded_contigs["low_read_count"]])]






In [101]:
blast_results = blast_results.reset_index().drop(columns="index")
filtered_blast_results = blast_results.groupby(["query"], as_index=False).apply(
    select_taxids_for_lca, db=db,
    return_taxid_only=False
)

In [102]:
lca_results = filtered_blast_results.groupby(["query"]).apply(get_lca)
additional_hexa_contigs = lca_results["query"][lca_results["taxid"].apply(lambda x: ncbi.get_name_translator(["Hexapoda"])["Hexapoda"][0] in ncbi_older_db(x, "get_lineage"))]
excluded_contigs.loc[excluded_contigs["query"].isin(additional_hexa_contigs), "hexapoda"] = True
filtered_blast_results = filtered_blast_results[~filtered_blast_results["query"].isin(additional_hexa_contigs)]
lca_results = lca_results[~lca_results["query"].isin(additional_hexa_contigs)]


In [111]:
excluded_contigs[excluded_contigs["query"]=="NODE_7_length_3154_cov_2211.364316"]

Unnamed: 0,query,contig_length,read_count,low_read_count,taxid_na,hexapoda
338,NODE_7_length_3154_cov_2211.364316,3154,96948.0,False,False,False
