## workflow
1. reads assembly with `flye`.
2. reads polish with `racon`.
3. reads polish with `medaka`.
4. contigs polish with `homopolish`.

In [1]:
import os
import shutil
import string
import random
import subprocess
from io import BytesIO
from pathlib import Path
from multiprocessing import Pool
from tempfile import TemporaryDirectory

import docker

In [2]:
import sys
sys.path.append('../src')

from utils import run_cmd

In [3]:
def filter_read(infile, outfile, reference):
    run_cmd(f"nanoq -q 7 -l 50 -f {infile} | NanoLyse -r {reference} > {outfile}", False, False)

def flye_assembly(infile, outdir, threads, is_meta=False):
    cmd = ['flye', '--nano-raw', infile, '-o', outdir, '-t', threads, '--plasmids']
    if is_meta:
        cmd.append('--meta')
    cmd = ' '.join(str(term) for term in cmd)      
    run_cmd(cmd)

def racon_polish(fastq, fasta, outfile, threads):
    prefix = ''.join(random.sample(string.digits + string.ascii_letters, 20))
    paf_file = os.path.join('/tmp/', prefix + '_minimap2.paf')
    run_cmd(f'minimap2 -x map-ont -t {threads} {fasta} {fastq} > {paf_file}')
    run_cmd(f'racon -m 8 -x -6 -g -8 -t {threads} {fastq} {paf_file} {fasta} > {outfile}')
    os.remove(paf_file)

def homopolish_polish(infile, outdir, model, mash_sketch, threads, meta=''):
    os.makedirs(outdir, exist_ok=True)
    prog = '/home/chen1i6c04/Tools/homopolish/homopolish.py'
    log_file = os.path.join(outdir, 'homopolish.log')
    if meta:
        cmd = f'conda run -n homopolish python {prog} polish '\
              f'-a {infile} -m {model} -s {mash_sketch} -o {outdir} -t {threads} --meta > {log_file} 2>&1'
    else:
        cmd = f'conda run -n homopolish python {prog} polish '\
              f'-a {infile} -m {model} -s {mash_sketch} -o {outdir} -t {threads} > {log_file} 2>&1'
    run_cmd(cmd)

def docker_execute(image, volumes, command):
    client = docker.from_env()
    client.containers.run(
        image=image,
        command=command,
        remove=True,
        stderr=True,
        tty=True,
        volumes=volumes,
        user=os.getuid(),
    )
    client.close()

def medaka_polish(basecalls, assembly, outdir, model='r941_min_high_g360', threads=8):
    basecalls_dir, basecalls_filename = os.path.split(basecalls)
    assembly_dir, assembly_filename = os.path.split(assembly)
    volumes = {
        basecalls_dir: {'bind': '/basecalls_dir', 'mode': 'rw'},
        assembly_dir: {'bind': '/assembly_dir', 'mode': 'rw'},
        outdir: {'bind': '/data', 'mode': 'rw'}}
    docker_command = f"medaka_consensus -i /basecalls_dir/{basecalls_filename} -d /assembly_dir/{assembly_filename} -o medaka -t {threads} -m {model}"
    docker_execute('medaka:v1.2.3', volumes, docker_command)

In [4]:
def workflow(basecalls, outdir, medaka_model='r941_min_high_g360', threads=8, is_meta=False, host_genome=""):
    os.makedirs(outdir, exist_ok=True)
    filtered_basecalls = os.path.join(outdir, 'filtefed_reads.fastq')
    
    # filtering sequences quality and length
    filter_read(basecalls, filtered_basecalls, host_genome)
    
    # denovo assembly
    flye_dir = os.path.join(outdir, 'flye')
    flye_output = os.path.join(flye_dir, 'assembly.fasta')
    flye_assembly(filtered_basecalls, flye_dir, threads, is_meta=is_meta)
    if os.path.exists(flye_output) is False:
        os.remove(filtered_basecalls)
        raise
    
    # first polish with racon
    racon_output = os.path.join(outdir, 'racon.fasta')
    racon_polish(filtered_basecalls, flye_output, racon_output, threads)
    
    
    # run medaka with docker container
    medaka_dir = os.path.join(outdir, 'medaka')
    medaka_polish(filtered_basecalls, racon_output, outdir)
    
    medaka_output = os.path.join(medaka_dir, 'consensus.fasta')
    homopolish_input = os.path.join(outdir, 'medaka.fasta')
    shutil.copyfile(medaka_output, homopolish_input)
    shutil.rmtree(medaka_dir)
    os.remove(racon_output + '.fai')
    os.remove(racon_output + '.mmi')
    os.remove(sub_basecalls)
    
    # correcte system error of nanopore with homopolish
    homopolish_dir = os.path.join(outdir, 'homopolish')
    homopolish_polish(
        homopolish_input,
        homopolish_dir,
        '/home/chen1i6c04/Tools/homopolish/R9.4.pkl',
        '/home/chen1i6c04/Tools/homopolish/bacteria.msh',
        threads
    )
    
    homopolish_output = os.path.join(homopolish_dir, 'medaka_homopolished.fasta')
    result_output = os.path.join(outdir, 'assembly.fasta')
    shutil.copyfile(homopolish_output, result_output)
    shutil.rmtree(homopolish_dir)

In [5]:
workflow(
    '/media/NGS/Nanopore_1/mNGS/20210507_clBC/time_filt/fastq/barcode06_2h.fastq',
    '/media/NGS/Nanopore_1/mNGS/20210507_clBC/time_filt/denovo', is_meta=True,
    threads=48, host_genome="/media/NGS/Sequence/GRCh38_genomic"
)

ContainerError: Command 'medaka_consensus -i /basecalls_dir/filtefed_reads.fastq -d /assembly_dir/racon.fasta -o medaka -t 8 -m r941_min_high_g360' in image 'medaka:v1.2.3' returned non-zero exit status 1: b''

In [None]:
dirpath = Path('/media/NGS/Nanopore_1/mNGS/20210507_clBC/QC')
outpath = Path('/media/NGS/Nanopore_1/mNGS/20210507_clBC/denovo')

In [None]:
with Pool(4) as p:
    try:
        for filepath in dirpath.iterdir():
            outdir = outpath/filepath.stem
            if not outdir.exists():
                p.apply_async(workflow, args=(filepath, outdir), kwds={'threads': 8, 'is_meta': True})
        p.close()
        p.join()
    except:
        p.terminate()

In [None]:
dirpath = Path('/media/NGS/Nanopore_1/mNGS/20210507_clBC/denovo')
outpath = Path('/media/NGS/Nanopore_1/mNGS/20210507_clBC/contigs')

In [None]:
for i in dirpath.iterdir():
    src_file = i/'assembly.fasta'
    dst_file = outpath/(i.name + '.fasta')
    if src_file.exists():
        shutil.copyfile(src_file, dst_file)