In [None]:
import argparse
import os
import subprocess
import sys
import traceback
from datetime import datetime

from hops import hdfs
from pyspark import SparkContext
from pyspark.sql import SparkSession

import utils

# Reuse the already-running SparkContext, or create one if none exists yet.
sc = SparkContext.getOrCreate()

In [None]:
# Command-line interface: both HDFS roots must be supplied by the caller.
parser = argparse.ArgumentParser(description="Argument Parser for BWA")
parser.add_argument("-i", "--input", help="Input root HDFS path")
parser.add_argument("-o", "--output", help="Output root HDFS path ")
options = parser.parse_args(sys.argv[1:])

# Guard clauses: reject a missing (or empty) path before any work is started.
if not options.input:
    raise ValueError("Input path missing")
INPUT_ROOT = options.input

if not options.output:
    raise ValueError("Output path missing")
OUTPUT_ROOT = options.output


In [None]:


# Destination directory (passed to hdfs.copy_to_hdfs) for the per-file bwa logs.
LOG_DIR = 'Logs/Bwa'

# Reference FASTA that is handed to run_bwa for every input file.
REFERENCE_FASTA_PATH = "References/HPVproteinsincludingnonoficial_201119.faa"

# Naming of the files produced by run_bwa: intermediate alignment (.sai) and
# final output (prefix + stem + .sam).
SAI_FORMAT = '.sai'
SAM_FORMAT = '.sam'
OUTPUT_PREFIX = 'alg_'

In [None]:

def run_bwa(file,ref_path):
    """Align one input file against a reference with ``bwa aln`` + ``bwa samse``.

    The reference and the input file are fetched from HDFS and then accessed
    by their base names (i.e. relative to the current working directory).
    The reference is indexed, ``bwa aln`` writes an intermediate ``.sai`` file
    and ``bwa samse`` turns it into the final SAM file, which is uploaded to
    ``OUTPUT_ROOT``. The stderr of all bwa steps is collected in a log file
    that is uploaded to ``LOG_DIR`` on success *and* on failure. Every local
    temporary (input copy, log, ``.sai``, ``.sam``) is removed before returning.

    Args:
        file: HDFS path of the input file to align.
        ref_path: HDFS path of the reference FASTA.

    Returns:
        The local name of the SAM file (``OUTPUT_PREFIX`` + stem +
        ``SAM_FORMAT``) on success, or ``False`` when ``bwa aln`` or
        ``bwa samse`` exits with a non-zero status.

    Raises:
        Anything that is not a ``subprocess.SubprocessError`` (e.g. HDFS
        errors, or ``FileNotFoundError`` when the ``bwa`` binary is missing)
        propagates to the caller after local cleanup.
    """
    # download the reference FASTA (skipped when a local copy already exists)
    ref=os.path.split(ref_path)[1]
    if not os.path.exists(ref):
        hdfs.copy_to_local(ref_path)

    # download input file
    hdfs.copy_to_local(file, overwrite=True)
    input_file=os.path.basename(file)
    filename=os.path.splitext(os.path.splitext(input_file)[0])[0] # stem with up to two extensions stripped

    outfile=filename+SAI_FORMAT
    outputSam=OUTPUT_PREFIX+filename+SAM_FORMAT
    log_file=os.path.splitext(outfile)[0]+'.txt'

    parameters = { 'aln': ref, '':input_file}
    # NOTE(review): utils.build_command is assumed to return a single
    # whitespace-separated command string (it is split below) - confirm.
    cmd=utils.build_command("bwa",parameters)
    print('INFO: Running bwa with command:', cmd)

    try:
        with open(log_file, "w") as f:
            # index: stderr now goes to the log instead of being discarded.
            # A failure is reported but not fatal here; bwa aln below fails
            # (and is caught) if the index is really unusable.
            index_status=subprocess.run(['bwa','index',ref],stdout=subprocess.DEVNULL,stderr=f)
            if index_status.returncode!=0:
                print('WARNING: bwa index exited with status', index_status.returncode)

            # check=True raises CalledProcessError on a non-zero exit, so no
            # explicit returncode test is needed after either step.
            with open(outfile,"w") as out:
                subprocess.run(cmd.split(),stdout=out,stderr=f,check=True)

            # Argument list instead of a split string: safe for names with spaces.
            with open(outputSam,'w') as f_outSam:
                subprocess.run(['bwa','samse',ref,outfile,input_file],stdout=f_outSam,stderr=f,check=True)

        hdfs.copy_to_hdfs(log_file, LOG_DIR, overwrite=True)
        hdfs.copy_to_hdfs(outputSam,OUTPUT_ROOT,overwrite=True)
        return outputSam
    except subprocess.SubprocessError:
        traceback.print_exc()
        # The log matters most when bwa failed: upload it before the cleanup
        # below deletes it. Best effort, so the caller still gets False.
        try:
            if os.path.exists(log_file):
                hdfs.copy_to_hdfs(log_file, LOG_DIR, overwrite=True)
        except Exception:
            traceback.print_exc()
        return False
    finally:
        # Remove every local temporary, including the intermediates that a
        # failed run would otherwise leave behind.
        for leftover in (input_file, log_file, outfile, outputSam):
            if os.path.exists(leftover):
                os.remove(leftover)

In [None]:
# load input data and run bwa on every input file
number_of_files=0
inputFiles=[]

# `spark` is used below but is not defined in the visible cells (only some
# notebook kernels inject it); obtain the session explicitly so the cell
# also works on a fresh kernel or when executed as a job.
spark=SparkSession.builder.getOrCreate()

# One partition per executor. spark.executor.instances is unset (or 0) under
# dynamic allocation; fall back to the default parallelism instead of failing
# on int(None) / a zero partition count.
configured_executors=int(sc.getConf().get("spark.executor.instances","0"))
num_partitions=configured_executors if configured_executors>0 else sc.defaultParallelism

if hdfs.isfile(INPUT_ROOT):
    # INPUT_ROOT is a CSV whose 'sorted_file' column lists the input paths
    print(" --- Input: is file ----reading from input file list")
    df=spark.read.csv(INPUT_ROOT,header=True)
    number_of_files=df.count()
    df_files=df.select('sorted_file').repartition(num_partitions)
    rdd_names=df_files.rdd
    # run
    start=datetime.now()
    mapped_rdd=rdd_names.map(lambda x:run_bwa(x[0],REFERENCE_FASTA_PATH))
    final=mapped_rdd.collect()
    print("time elapsed: " +str(datetime.now() - start))
else:
    print(" --- Input: is folder ----reading from input folder")
    inputFiles=utils.load_file_names(INPUT_ROOT)
    number_of_files=len(inputFiles)
    # run
    rdd=sc.parallelize(inputFiles,num_partitions)
    start=datetime.now()
    final=rdd.map(lambda x: run_bwa(x,REFERENCE_FASTA_PATH) ).collect()
    print("time elapsed: " , datetime.now() - start)

# run_bwa returns False for a file whose bwa run failed; surface that here
# instead of silently dropping the collected results.
failed_count=sum(1 for result in final if result is False)
if failed_count:
    print("WARNING: bwa failed for %d of %d input files" % (failed_count, number_of_files))




In [None]:

# Name the report after the last component of the input path. A trailing '/'
# is stripped first: os.path.basename("dir/") is '', which would make every
# folder input given that way write the same 'reportBwa_.txt'.
report_file='reportBwa_'+os.path.basename(INPUT_ROOT.rstrip('/'))+'.txt'

# Take the timestamp once so the date and the elapsed time are consistent.
# Elapsed time is measured from `start` (set just before the bwa run) until now.
report_time=datetime.now()
with open(report_file,'w') as f:
    f.write(" Date:" +str(report_time) )
    f.write(" \n Number of input files: " +str(number_of_files))
    f.write(" \n Time elapsed: " +str(report_time - start))

hdfs.copy_to_hdfs(report_file,'Experiments/benchmark/report',overwrite=True)

