## Merge BAM

This job merges BAM files per sample using samtools. Unique files are found by combining:

* Lanes
* Paired/Unpaired
* All parts, in case the input FASTQ was split into smaller parts

In [None]:
import os
import subprocess
import sys
import traceback

import pysam
from hops import hdfs
from pyspark import SparkContext
import stat

import utils


# Reuse the already-running SparkContext when there is one; otherwise create it.
sc = SparkContext.getOrCreate()

#### Load arguments

In [None]:
# Load the job configuration from the command-line arguments via utils.
args_full = utils.load_arguments(sys.argv)

OUTPUT_DATASET = args_full[utils.OUTPUT_DATASET]
INPUT_ROOT_PATH = args_full[utils.INPUT_ROOT_PATH]
RUN_FOLDER = args_full[utils.RUN_FOLDER]
WORK_PATH = os.path.join(OUTPUT_DATASET, RUN_FOLDER)
# Settings specific to the merge step.
args = args_full[utils.KEY_MERGE]

# An explicit override wins; otherwise the folder is resolved under WORK_PATH.
INPUT_ROOT = (
    args_full.get(utils.INPUT_OVERRIDE)
    or os.path.join(WORK_PATH, args['INPUT_ROOT'])
)
OUTPUT_MERGE = (
    args_full.get(utils.OUTPUT_OVERRIDE)
    or os.path.join(WORK_PATH, args['OUTPUT_MERGE'])
)

# Kept as a string because it is used as a command-line option value.
THREADS = str(args['THREADS'])
# Path handed to hdfs.copy_to_local() when samtools is installed on a worker.
tool_path = args['SAMTOOLS_PATH']
# Relative path of the samtools binary once it has been copied locally.
TOOL = './samtools/bin/samtools'
SPACE = utils.SPACE

def chmod_exec(tool):
    """Add the owner-execute permission bit to the file at ``tool``.

    All permission bits already set on the file are preserved.
    """
    current_mode = os.stat(tool).st_mode
    os.chmod(tool, current_mode | stat.S_IEXEC)

def install_samtools(tool_path):
    """Copy samtools from HDFS to the local working directory and make it executable.

    ``tool_path`` is passed straight to ``hdfs.copy_to_local``; afterwards the
    binary is expected at ``./samtools/bin/samtools``.
    """
    hdfs.copy_to_local(tool_path)
    # The copied binary needs its execute bit set before it can be run.
    chmod_exec('./samtools/bin/samtools')




#### Function to map

In [None]:


def merge_files(file):
    """
    Run 'samtools merge' as a subprocess to merge all files for a given sample name.

    The input files are every entry of ``inputFiles`` whose name contains
    ``file``. A single matching file is copied HDFS-to-HDFS into
    ``OUTPUT_MERGE`` without merging. Otherwise the inputs are copied to the
    local working directory, merged into ``<file>.bam``, and the result is
    uploaded to ``OUTPUT_MERGE``. If samtools fails, the traceback is printed
    and the input files are deleted from ``INPUT_ROOT`` on HDFS.

    Local copies of the inputs and of the merged file are always removed,
    whether the merge succeeded or not.

    Args:
        file: sample name; also the stem of the merged output file.

    Returns:
        ``[-1]`` when ``utils.skip_file`` reports the sample should be skipped,
        otherwise the merged file name ``<file>.bam``. The name is returned
        even when the samtools run failed.
    """

    print("INFO: Run merge files", file)
    merged_file = file + '.bam'

    if not os.path.exists('samtools'):
        install_samtools(tool_path)

    if utils.skip_file(file, merged_file, OUTPUT_MERGE):
        return [-1]

    # All files for the sample name.
    # NOTE(review): this is a substring match, so a sample name that is a
    # substring of another sample's file names would also pick those up --
    # confirm against the naming produced by utils.find_unique_names.
    group_files = [name for name in inputFiles if file in name]

    if len(group_files) == 1:
        # Only a single file: copy it to the HDFS output folder as-is.
        # NOTE(review): the copy keeps the original file name, not merged_file,
        # although merged_file is what gets returned -- confirm this is intended.
        hdfs.cp(
            os.path.join(INPUT_ROOT, group_files[0]),
            os.path.join(OUTPUT_MERGE, group_files[0]),
            overwrite=True,
        )
        return merged_file

    try:
        # Copy all inputs to local disk. This happens inside the try so that a
        # failure part-way through still cleans up what was already copied.
        for name in group_files:
            hdfs.copy_to_local(os.path.join(INPUT_ROOT, name), overwrite=True)

        # The space-joined input names are passed as a key with an empty value
        # so that they end up in the command built by utils.build_command.
        input_names = ' '.join(group_files)
        params = {'merge': merged_file, input_names: '', '-@': THREADS}
        cmd = utils.build_command(TOOL, params)

        try:
            # check=True raises CalledProcessError on a non-zero exit status,
            # so no separate return-code test is needed afterwards.
            subprocess.run(cmd.split(), stdout=subprocess.PIPE, check=True)
        except subprocess.SubprocessError:
            traceback.print_exc()
            # delete corrupted input files
            print('INFO: Input files could be corrupted, deleting input files: ', group_files)
            for name in group_files:
                hdfs.delete(os.path.join(INPUT_ROOT, name))
        else:
            if os.path.exists(merged_file):
                hdfs.copy_to_hdfs(merged_file, OUTPUT_MERGE, overwrite=True)
    finally:
        # Best-effort local cleanup: a file that was never created (failed copy
        # or failed merge) must not raise here and hide the real error.
        for local_name in [*group_files, merged_file]:
            try:
                os.remove(local_name)
            except FileNotFoundError:
                pass

    return merged_file
  

        
    


#### Load input file names

In [None]:
# List the inputs under INPUT_ROOT and keep only each path's base name.
inputFiles = [
    os.path.basename(hdfs_path)
    for hdfs_path in utils.load_file_names(INPUT_ROOT)
]
# Derive the distinct sample names; each one becomes a separate merge task.
uniques = utils.find_unique_names(inputFiles)


#### Run in parallel 

In [None]:
## Merge in parallel: one merge_files call per sample name, with the number of
## slices taken from the "spark.executor.instances" setting.
mergedList = (
    sc.parallelize(uniques, sc.getConf().get("spark.executor.instances"))
    .map(merge_files)
    .collect()
)

