#### Kraken2


In [None]:
import traceback

import pysam
import os
from hops import hdfs
import utils
import sys
from pyspark import SparkContext
import subprocess
import stat
import gzip
import shutil
import pandas as pd

# Reuse the already-running SparkContext if there is one, otherwise start one
# with the default configuration.
sc = SparkContext.getOrCreate(conf=None)

In [None]:
# Load the run configuration from the command-line arguments; the result is
# indexed below by utils.* keys and by tool sections such as 'Kraken'.
args_full = utils.load_arguments(sys.argv)

In [None]:
# Temporary input folder: when the input is a file list, renameFiles prefixes
# each listed entry with this path (note the trailing slash, since the prefix
# is joined by plain string concatenation).
NONHUMAN_DIR = "/Projects/HPV_meta/Backup/TCGA/tcga/nonhuman_all/"

#### Load arguments

In [None]:


OUTPUT_DATASET = args_full[utils.OUTPUT_DATASET]
INPUT_ROOT_PATH = args_full[utils.INPUT_ROOT_PATH]
RUN_FOLDER = args_full[utils.RUN_FOLDER]
WORK_PATH = os.path.join(OUTPUT_DATASET, RUN_FOLDER)
# Kraken-specific section of the run configuration.
args = args_full['Kraken']

# An explicit (truthy) input/output override wins; otherwise the roots are
# resolved relative to the run's working folder.
inputRoot = (args_full.get(utils.INPUT_OVERRIDE)
             or os.path.join(WORK_PATH, args['INPUT_ROOT']))
outputRoot = (args_full.get(utils.OUTPUT_OVERRIDE)
              or os.path.join(WORK_PATH, args['OUTPUT_ROOT']))

kraken_path = args['KRAKEN_PATH']
# Name of the local folder the kraken tool ends up in after copying from HDFS.
tool = os.path.basename(kraken_path)
kk_db_path = args['KRAKEN_DB_PATH']
# NOTE(review): used as a boolean flag; a string value such as "false" coming
# from the config would be truthy — confirm the type load_arguments yields.
is_save_all_outputs = args['SAVE_FULL_OUTPUT']
threads = args['THREADS']

#### Helper functions

In [None]:
# install kraken from hdfs source
def load_kraken(kraken_path):
    """
    Install the kraken2 tool locally by copying its folder from HDFS.

    After the copy the folder is expected at ``./<basename of kraken_path>``;
    the ``kraken2`` and ``classify`` executables inside it are then given the
    owner-execute permission bit (other mode bits are preserved).

    :param kraken_path: HDFS path of the kraken2 installation folder.
    """
    tool_dir = os.path.basename(kraken_path)

    hdfs.copy_to_local(kraken_path)

    for binary in ('kraken2', 'classify'):
        binary_path = tool_dir + '/' + binary
        current_mode = os.stat(binary_path).st_mode
        os.chmod(binary_path, current_mode | stat.S_IEXEC)



def compress_file(file, compresslevel=1):
    """
    Gzip-compress a local file, writing the result next to it.

    The source file is left in place; deleting it is up to the caller.

    :param file: path of the file to compress.
    :param compresslevel: gzip compression level (0-9). Defaults to 1, the
        fastest setting, which keeps the per-file overhead low.
    :return: path of the compressed file, i.e. ``file + '.gz'``.
    """
    gz_path = file + '.gz'
    # Stream in chunks so large outputs are never held in memory at once.
    with open(file, 'rb') as f_in, \
            gzip.open(gz_path, 'wb', compresslevel=compresslevel) as f_out:
        shutil.copyfileobj(f_in, f_out)
    return gz_path

#### Map function


In [None]:
def _remove_if_exists(path):
    """Delete the local file *path*, ignoring the case where it does not exist."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def apply_kraken_single(file_path,kk_db_path):
    """
    Run kraken2 on a single input file via subprocess and upload the results to HDFS.

    The input file, the kraken tool (``kraken_path``) and the kraken database
    (``kk_db_path``) are copied from HDFS into the local working directory,
    kraken2 is run, and the results are uploaded under ``outputRoot``:

    * ``report/<sample>_report.txt`` always;
    * when ``is_save_all_outputs`` is truthy, also the gzip-compressed per-read
      output (``output/``) and unclassified reads (``unclassified/``).

    The report doubles as the completion marker: if it already exists on HDFS
    the file is skipped, so resubmitting a partly failed run does not redo
    finished files. For that reason it is uploaded last, after the optional
    outputs.

    All local copies and intermediate files for this sample are removed when
    the function finishes, whether kraken succeeded or not.

    :param file_path: HDFS path of the input reads file.
    :param kk_db_path: HDFS path of the kraken2 database.
    :return: the input file name on success; ``False`` if kraken exited with a
        non-zero status or produced no report; ``None`` if the file was skipped
        (report already present, or input missing on HDFS).

    Exceptions raised by the HDFS copies (anything other than kraken's own
    non-zero exit) propagate to the caller after local cleanup.
    """
    file=os.path.split(file_path)[1]
    # strip up to two extensions, e.g. 'sample.fastq.gz' -> 'sample'
    sample=os.path.splitext(os.path.splitext(file)[0])[0]
    report=sample+'_report.txt'

    if hdfs.exists(os.path.join(outputRoot,'report',report)): # check if output already exists
        print('skipping existing file: ', file)
        return None

    if not hdfs.exists(file_path):
        print("Input file not found, skipping to next")
        return None

    output=sample+'_out.txt'
    unclassified=sample+'_unclassified.txt'

    hdfs.copy_to_local(file_path)
    try:
        # install kraken and fetch the database into the working directory
        load_kraken(kraken_path)
        kk_db=os.path.split(kk_db_path)[1]
        hdfs.copy_to_local(kk_db_path, overwrite=True)

        if is_save_all_outputs: # keep per-read output and unclassified reads
            output_target, unclassified_target = output, unclassified
        else:                   # keep only the report
            output_target, unclassified_target = '/dev/null', '/dev/null'
        # Key order matches the original command layout, which
        # utils.build_command presumably emits in dict order; the input file is
        # a key with an empty value (presumably rendered as a positional argument).
        params={'--db': kk_db, '--threads': threads, '--report': report,
                '--report-minimizer-data': '', '--report-zero-counts': '',
                '--unclassified-out': unclassified_target, file: '',
                '--output': output_target, '--memory-mapping': ''}
        cmd=utils.build_command(tool+'/kraken2',params)
        print(cmd)
        # check=True turns a non-zero exit status into CalledProcessError
        subprocess.run(cmd.split(),stdout=subprocess.PIPE,check=True)

        if not os.path.exists(report):
            print('kraken finished but produced no report for: ', file)
            return False

        if is_save_all_outputs:
            c_output=compress_file(output)
            c_unclassified=compress_file(unclassified)
            hdfs.copy_to_hdfs(c_unclassified,os.path.join(outputRoot,'unclassified'), overwrite=True)
            hdfs.copy_to_hdfs(c_output,os.path.join(outputRoot,'output'), overwrite=True)
        # Upload the report last: its presence on HDFS marks this file as done.
        hdfs.copy_to_hdfs(report,os.path.join(outputRoot,'report'),overwrite=True)
        return file
    except subprocess.CalledProcessError:
        traceback.print_exc()
        return False
    finally:
        # Remove every local file this sample may have produced (only local
        # names, never the '/dev/null' targets). compress_file writes
        # '<name>.gz' next to its input.
        for local_file in (file, report, output, unclassified,
                           output+'.gz', unclassified+'.gz'):
            _remove_if_exists(local_file)

#### Get all input file paths

In [None]:
# Collect the input file paths. inputRoot is either a CSV file listing the
# inputs in its first column, or a folder whose files are all processed.
if hdfs.isfile(inputRoot):
    print(" --- Input: is file ----reading from input file list")
    # NOTE(review): pd.read_csv treats the first row as a header by default, so
    # a list file without a header row would silently lose its first entry —
    # confirm the list files carry a header.
    df = pd.read_csv(inputRoot)
    inputFiles = df[df.columns[0]].tolist()
else:
    print(" --- Input: is folder ----reading from input folder")
    inputFiles = utils.load_file_names(inputRoot)

# len() yields a plain int for both branches (DataFrame.count() would return a
# per-column Series instead).
number_of_files = len(inputFiles)


In [None]:
# Report how many files will be dispatched to the cluster.
print("Number of input files", len(inputFiles))

In [None]:

def renameFiles(x):
    """
    Prefix an input-list entry with ``NONHUMAN_DIR``.

    Plain string concatenation: no separator is inserted, so the result is
    only a valid path because ``NONHUMAN_DIR`` ends with '/'.

    :param x: file name as read from the input list.
    :return: ``NONHUMAN_DIR + x``.
    """
    return NONHUMAN_DIR + x

#### Run in parallel

In [None]:
# One partition per configured executor; if "spark.executor.instances" is not
# set, getConf().get returns None and parallelize falls back to its default.
rdd_names = sc.parallelize(
    inputFiles,
    numSlices=sc.getConf().get("spark.executor.instances"),
)


In [None]:

# Run kraken on every input file in parallel and gather the per-file results
# (the return values of apply_kraken_single) on the driver.
# To resolve bare names from an input list against NONHUMAN_DIR first, map
# renameFiles before apply_kraken_single, e.g.:
#   rdd_names.map(renameFiles).map(lambda x: apply_kraken_single(x, kk_db_path)).collect()
final = rdd_names.map(lambda path: apply_kraken_single(path, kk_db_path)).collect()