Skip to content

Commit

Permalink
snpeff / vcf / hg19&hg38 post-upload process
Browse files Browse the repository at this point in the history
  • Loading branch information
sirloon committed Nov 29, 2016
1 parent d996a3b commit 09f9e24
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ src/www/static/docs
*checkpoint.ipynb
src/config_prod.py
src/run/*.pickle
src/run/done/*.pickle
1 change: 0 additions & 1 deletion src/dataload/sources/snpeff/snpeff_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ def annotate_by_snpeff(self, varobj_list):
else:
print(item)
print('beyond current capacity')
print("Running %s" % self.snpeff_cmd)
proc = subprocess.Popen(self.snpeff_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate(vcf_stdin.encode())
it = iter(snpeff_valid_id)
Expand Down
39 changes: 33 additions & 6 deletions src/dataload/uploader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import glob, os, math
import glob, os, math, asyncio
from functools import partial

import biothings.dataload.uploader as uploader
from biothings.dataload.storage import IgnoreDuplicatedStorage
from biothings.dataload.storage import UpsertStorage
from biothings.utils.mongo import doc_feeder
import biothings.utils.mongo as mongo

import dataload.sources.snpeff.snpeff_upload as snpeff_upload
import dataload.sources.snpeff.snpeff_parser as snpeff_parser

from utils.hgvs import get_pos_start_end

class SnpeffPostUpdateUploader(uploader.BaseSourceUploader):

def post_update_data(self):

def do_snpeff(self, batch_size=100000):
self.logger.info("Updating snpeff information from source '%s' (collection:%s)" % (self.fullname,self.collection_name))
# select Snpeff uploader to get collection name and src_dump _id
version = self.__class__.__metadata__["assembly"]
Expand All @@ -19,21 +24,43 @@ def post_update_data(self):
assert snpeff_doc, "No snpeff information found, has it been dumped & uploaded ?"
snpeff_dir = snpeff_doc["data_folder"]
cmd = "java -Xmx4g -jar %s/snpEff/snpEff.jar %s" % (snpeff_dir,version)
print(cmd)
# genome files are in "data_folder"/../data
genomes = glob.glob(os.path.join(snpeff_dir,"..","data","%s_genome.*" % version))
assert len(genomes) == 1, "Expected only one genome files for '%s', got: %s" % (version,genomes)
genome = genomes[0]
parser = snpeff_parser.VCFConstruct(cmd,genome)
storage = IgnoreDuplicatedStorage(None,snpeff_class.name,self.logger)
batch_size = 1000000
storage = UpsertStorage(None,snpeff_class.name,self.logger)
col = self.db[self.collection_name]
total = math.ceil(col.count()/batch_size)
cnt = 0
for doc_ids in doc_feeder(col, step=batch_size, inbatch=True, fields={'_id':1}):
ids = [d["_id"] for d in doc_ids]
data = parser.annotate_by_snpeff(ids)
data = annotate_vcf(data,version)
storage.process(data, batch_size)
cnt += 1
self.logger.debug("Processed batch %s/%s [%.1f]" % (cnt,total,(cnt/total*100)))

def post_update_data(self, steps, force, batch_size, job_manager):
# this one will run in current thread, snpeff java prg will
# multiprocess itself, no need to do more
self.do_snpeff(batch_size)


def annotate_vcf(docs, assembly):
for doc in docs:
st,end = None,None
if 'vcf' in doc:
try:
st, end = get_pos_start_end(
chr=None, # not even used in func
pos=doc['vcf']['position'],
ref=doc['vcf']['ref'],
alt=doc['vcf']['alt'])
if st and end:
doc[assembly] = {"start": st, "end": end}
except Exception as e:
pass

yield doc

0 comments on commit 09f9e24

Please sign in to comment.