## Nextgenmap
Runs `nextgenmap` (NGM) in paired-end and single-end mode. Paired samples are grouped by the `R1` and `R2` markers in their file names.
If the `R` keyword is not found, the file is treated as a single-end file.


In [1]:
import logging
import os
import subprocess
import sys
import traceback
import stat

from hops import hdfs
from pyspark import SparkContext

import utils
# Reuse the SparkContext of the running session if there is one, otherwise create it.
# `sc` is used below to parallelize the NGM jobs across executors.
sc = SparkContext.getOrCreate()

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
24,application_1674723992507_0129,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [4]:

# Debug helper: to run this notebook interactively, swap sys.argv for an explicit
# argument list such as the one below (settings file, input and output roots).
#args_full=utils.load_arguments([0,"-shdfs:///Projects/HPV_meta/Jupyter/Bio_pipeline/settings/settings_benchmark.yml","-ihdfs:///Projects/HPV_meta/Ainhoa/input","-ohdfs:///Projects/HPV_meta/Ainhoa/output"])
# Parse the pipeline arguments this notebook/job was launched with.
args_full=utils.load_arguments(sys.argv)

#### Load arguments

In [5]:




# --- Run layout -------------------------------------------------------------
OUTPUT_DATASET = args_full[utils.OUTPUT_DATASET]
INPUT_ROOT_PATH = args_full[utils.INPUT_ROOT_PATH]
RUN_FOLDER = args_full[utils.RUN_FOLDER]
WORK_PATH = os.path.join(OUTPUT_DATASET, RUN_FOLDER)
# NGM-specific section of the settings.
args = args_full[utils.KEY_NGM]

# A non-empty override wins; otherwise the roots live under this run's WORK_PATH.
INPUT_ROOT = args_full.get(utils.INPUT_OVERRIDE) or os.path.join(WORK_PATH, args['INPUT_ROOT'])
OUTPUT_ROOT = args_full.get(utils.OUTPUT_OVERRIDE) or os.path.join(WORK_PATH, args['OUTPUT_ROOT'])

# --- NGM parameters ---------------------------------------------------------
VERY_FAST = '--very-fast'
REFERENCE_PATH = args['REFERENCE_FILE']
# NGM CLI values are passed as strings when building the command line.
MIN_I = str(args['MIN-IDENTITY'])
MIN_R = str(args['MIN-RESIDUES'])
THREADS = args['THREADS']
LOG_DIR = args['LOGS_ROOT']
is_very_fast = args['VERY_FAST']
IS_INSTALL_NGM = args['INSTALL_NGM']
SPACE = utils.SPACE

# --- NGM binary -------------------------------------------------------------
# Prebuilt NGM bundle on HDFS (relative path; TODO confirm what it resolves against).
TOOL_PATH = 'Tools/ngm_built/NGM'
# Use the unpacked bundle when installing it ourselves, otherwise `ngm` from PATH.
TOOL = './NGM/bin/ngm-0.5.5/ngm' if IS_INSTALL_NGM else 'ngm'


def chmod_exec(tool):
    """Add the owner-execute bit to the file at ``tool``, leaving all other mode bits as they are."""
    os.chmod(tool, stat.S_IXUSR | os.stat(tool).st_mode)

def install_ngm(tool_path):
    """
    Fetch the prebuilt NGM bundle from HDFS and mark its binaries executable.

    ``tool_path`` is copied from HDFS to local storage, overwriting any
    previous copy. The ``./NGM/...`` paths below assume it lands in the current
    working directory. Then ``ngm-core`` and ``ngm`` under
    ``./NGM/bin/ngm-0.5.5/`` receive the owner-execute bit.
    """
    print('INFO: Installing NGM from path ', tool_path)
    hdfs.copy_to_local(tool_path, overwrite=True)
    ngm_bin_dir = './NGM/bin/ngm-0.5.5/'
    for executable in ('ngm-core', 'ngm'):
        chmod_exec(ngm_bin_dir + executable)

#### Map functions 

In [6]:
### paired files
def apply_ngm_paired(x,REFERENCE_PATH):
    """
    Run NGM in paired-end mode on a single (R1, R2) pair via subprocess.

    Both read files are copied from HDFS to the local working directory. The
    reference is copied too, if not already present locally. NGM is run, and its
    SAM output is copied to OUTPUT_ROOT on HDFS. NGM's stdout/stderr log is
    uploaded to LOG_DIR (best effort) whether or not mapping succeeded. All
    local copies (inputs, output, log) are removed afterwards.

    Args:
        x: pair of HDFS paths ``(r1_path, r2_path)`` for the forward and
            reverse read files.
        REFERENCE_PATH: HDFS path of the reference passed to NGM via ``-r``.

    Returns:
        ``[-1]`` if ``utils.skip_file`` reports the pair should be skipped.
        ``[True, output_file]`` on success. ``False`` if NGM fails, produces
        no output, or an I/O error occurs while running or copying.
    """
    filename_forward_path = x[0]  # r1
    filename_reverse_path = x[1]  # r2
    # Local file names (copy_to_local places the files under their base name).
    filename_forward = os.path.basename(filename_forward_path)
    filename_reverse = os.path.basename(filename_reverse_path)

    # NOTE(review): unlike apply_ngm_single, no '.sam' extension is appended here.
    # It is kept as-is so existing outputs and skip checks keep matching.
    output_file = filename_forward.split('.')[0].replace(utils.R1, '')
    if utils.skip_file(filename_forward, output_file, OUTPUT_ROOT):
        return [-1]

    if IS_INSTALL_NGM and not os.path.exists(os.path.basename(TOOL_PATH)):
        install_ngm(TOOL_PATH)

    # Reference is fetched once per working directory; reads are always refreshed.
    ref = os.path.basename(REFERENCE_PATH)
    if not os.path.exists(ref):
        hdfs.copy_to_local(REFERENCE_PATH)
    hdfs.copy_to_local(filename_forward_path, overwrite=True)
    hdfs.copy_to_local(filename_reverse_path, overwrite=True)

    parameters = {'-i': MIN_I, '-R': MIN_R, '-p': utils.EMPTY, '-r': ref, '-1': filename_forward, '-2': filename_reverse,
                  '--silent-clip': utils.EMPTY, '-o': output_file, '-t': THREADS, '--no-progress': utils.EMPTY}

    cmd = utils.build_command(TOOL, parameters)
    if is_very_fast:
        cmd = cmd + SPACE + VERY_FAST  # final command to run

    # logging formats lazily with %-style args; without a placeholder the
    # extra `cmd` argument makes the record fail to format.
    logging.info('Running nextgenmap with command: %s', cmd)
    log_file = os.path.splitext(output_file)[0] + '.txt'
    log = open(log_file, "w")
    try:
        # check=True: a non-zero exit raises CalledProcessError.
        subprocess.run(cmd.split(), stdout=log, stderr=log, check=True)
        if not os.path.exists(output_file):
            # NGM exited 0 but wrote nothing. Report it as a failure, not None.
            return False
        hdfs.copy_to_hdfs(output_file, OUTPUT_ROOT, overwrite=True)
        return [True, output_file]
    except subprocess.CalledProcessError:
        traceback.print_exc()
        return False
    except IOError:
        traceback.print_exc()
        # Do not leave a possibly partial upload behind.
        utils.hdfs_delete_file(os.path.join(OUTPUT_ROOT, output_file))
        return False
    finally:
        log.close()
        # Upload the log on every outcome; it is most useful when NGM failed.
        # Best effort: an upload error must not mask the mapping result.
        if os.path.exists(log_file):
            try:
                hdfs.copy_to_hdfs(log_file, LOG_DIR, overwrite=True)
            except IOError:
                traceback.print_exc()
            os.remove(log_file)
        # Local cleanup runs outside the try body. A failing local delete can no
        # longer trigger the IOError handler and wipe a good HDFS result.
        for local_file in (filename_forward, filename_reverse, output_file):
            if os.path.exists(local_file):
                os.remove(local_file)

    
### single files
def apply_ngm_single(x,REFERENCE_PATH):
    """
    Run NGM in single-end mode on one read file via subprocess.

    The read file is copied from HDFS to the local working directory. The
    reference is copied too, if not already present locally. NGM is run, and its
    SAM output is copied to OUTPUT_ROOT on HDFS. NGM's stdout/stderr log is
    uploaded to LOG_DIR (best effort) whether or not mapping succeeded. All
    local copies (input, output, log) are removed afterwards.

    Args:
        x: HDFS path of the single-end read file.
        REFERENCE_PATH: HDFS path of the reference passed to NGM via ``-r``.

    Returns:
        ``[-1]`` if ``utils.skip_file`` reports the file should be skipped.
        ``[True, output_file]`` on success. ``False`` if NGM fails, produces
        no output, or an I/O error occurs while running or copying.
    """
    filename_forward_path = x
    # Local file name (copy_to_local places the file under its base name).
    filename_forward = os.path.basename(filename_forward_path)
    output_file = filename_forward.split('.')[0].replace(utils.R1, '') + '.sam'
    # skip if output exists
    if utils.skip_file(filename_forward, output_file, OUTPUT_ROOT):
        return [-1]

    if IS_INSTALL_NGM and not os.path.exists(os.path.basename(TOOL_PATH)):
        install_ngm(TOOL_PATH)

    # Reference is fetched once per working directory; the read file is always refreshed.
    ref = os.path.basename(REFERENCE_PATH)
    if not os.path.exists(ref):
        hdfs.copy_to_local(REFERENCE_PATH, overwrite=False)
    hdfs.copy_to_local(filename_forward_path, overwrite=True)

    parameters = {'-i': MIN_I, '-R': MIN_R, '-q': filename_forward, '-r': ref,
                  '--silent-clip': utils.EMPTY, '-o': output_file, '-t': THREADS, '--no-progress': utils.EMPTY}

    cmd = utils.build_command(TOOL, parameters)
    if is_very_fast:
        cmd = cmd + SPACE + VERY_FAST  # final command to run
    print("Running command:", cmd)

    log_file = os.path.splitext(output_file)[0] + '.txt'
    log = open(log_file, "w")
    try:
        # check=True: a non-zero exit raises CalledProcessError.
        subprocess.run(cmd.split(), stdout=log, stderr=log, check=True)
        if not os.path.exists(output_file):
            # NGM exited 0 but wrote nothing. Report it as a failure, not None.
            return False
        hdfs.copy_to_hdfs(output_file, OUTPUT_ROOT, overwrite=True)
        return [True, output_file]
    except subprocess.CalledProcessError:
        traceback.print_exc()
        return False
    except IOError:
        traceback.print_exc()
        # Do not leave a possibly partial upload behind.
        utils.hdfs_delete_file(os.path.join(OUTPUT_ROOT, output_file))
        return False
    finally:
        log.close()
        # Best effort: a failed log upload must not replace the return value
        # with an exception raised from inside `finally`.
        if os.path.exists(log_file):
            try:
                hdfs.copy_to_hdfs(log_file, LOG_DIR, overwrite=True)
            except IOError:
                traceback.print_exc()
            os.remove(log_file)
        # Local cleanup runs outside the try body. A failing local delete can no
        # longer trigger the IOError handler and wipe a good HDFS result.
        for local_file in (filename_forward, output_file):
            if os.path.exists(local_file):
                os.remove(local_file)



            

#### List all input files hdfs path

In [7]:


# HDFS paths of every input file under INPUT_ROOT. Both the single-end and the
# paired-end cells below select their inputs from this list.
all_files=utils.load_file_names(INPUT_ROOT)

#### Get the list of all single-end files and run NGM in single-end mode in parallel

In [13]:
### single
# Inputs whose path lacks utils.R_IDENTIFIER are mapped in single-end mode.
single_files = list(filter(lambda path: utils.R_IDENTIFIER not in path, all_files))

print('number of single input  files processing ', len(single_files))
# NOTE(review): the conf value is a string, or None when unset. PySpark's
# parallelize int()-converts it or falls back to defaultParallelism; confirm
# this for the Spark version in use.
n_partitions = sc.getConf().get("spark.executor.instances")
dataRdd = sc.parallelize(single_files, n_partitions)

# run: one NGM invocation per file, results gathered on the driver
ngmSingleFiles = dataRdd.map(lambda path: apply_ngm_single(path, REFERENCE_PATH)).collect()

number of single input  files processing  0

#### Pair R1 and R2 files as tuples in a list and run NGM in paired-end mode in parallel

In [14]:

# Group matching R1/R2 files into (R1, R2) path pairs.
pairedList = utils.group_R1R2(all_files)
print('number of pairs input files processing ', len(pairedList))
# NOTE(review): string or None; see PySpark parallelize numSlices handling.
n_partitions = sc.getConf().get("spark.executor.instances")
dataPairedRdd = sc.parallelize(pairedList, n_partitions)

# run: one paired-end NGM invocation per pair, results gathered on the driver
ngmFiles = dataPairedRdd.map(lambda pair: apply_ngm_paired(pair, REFERENCE_PATH)).collect()



number of pairs input files processing  1