# Jupyter interface of OrthoSLC (0.2.4)

<b>OrthoSLC</b> is a pipline that perfomrs Reciprocal Best Blast Hit (RBBH) Single Linkage Clustering to obtain Orthologous Genes which assist core genome construction. <br>

**This interface** is python jupyter based to facilitate customized analysis. However, for datasets with 500 or more genomes, we still recommend using the binary files, which is mainly written in C++, for optimal performance.<br>

It is: <br>
* <b>independent</b> of relational database management systems (e.g., MySQL)
* recommend to handle less genomes.

It is recommended for sub-species level single copy core genome construction since RBBH may not work well for missions like Human-Microbe core genome construction. 

<b>Caveat:</b><br>
The pipeline is currently only tested on Ubuntu 20.04 and 18.04 and Debian strech.

<b>Requirement:</b><br>
Python3 (suggest newest stable release or higher),<br>
NCBI Blast+ (suggest 2.12 or higher) <br>

<b>Note:</b><br>
For all steps, users do not need to make the ourput directory manually, program will do that for you.

Bug report: 
* <jingjie.chencharly@gmail.com>

In [None]:
import os,sys
import random
import copy
# import numpy as np
# import pandas as pd

from multiprocessing import Lock, Process, Manager
import subprocess
import shutil

# pip install biopython
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord

from collections import defaultdict

import time
from typing import List

# def mission spliter
def mission_spliter(lst: List, 
                    num_splits: int) -> List[List]:
    n = len(lst)
    random.shuffle(lst)
    
    split_size = n // num_splits
    remainder = n % num_splits
    start = 0
    result = []
    
    for i in range(num_splits):
        end = start + split_size
        if i < remainder:
            end += 1
        if lst[start:end] == []:
                break
        result.append(lst[start:end])
        start = end
        
    return result

## Step 1 Genome information preparation
The pipeline starts with annotated genomes in fasta format. FASTA file name require strain name and extension (e.g., `strain_A.ffn`, `strain_B.fna`, `strain_C.fasta` etc.).<br> 
Step 1 needs the **path to directory of annotated FASTA files** as input, to genereate a header less, tab separated table, in which the 
* first column is a short ID, 
* second column is the strain name, 
* third column as the absolute path. 

The short ID of each genome is generated to save computational resources and storage space. Since reciprocal BLAST generates a large volume of files (millions to billions of rows if large number of genomes participated), each row contains the names of the query and subject. If the user provides input FASTA file names like:

* `GCA_900627445.1_PFR31F05_genomic.fasta`
* `GCA_021980615.1_PDT001237823.1_genomic.fasta`

and such file names become part of gene identifier instead of the short ID used in this program, large size of additional storage will be consumed for intermediate files and even more pressure on computing memory for analysis of 1000~ genomes.

### Set path
* Set variable `raw_annotated_dir_path` to the path of directory of annoated FASTA files
* Set variable `preparation_output_path` to the path of output tab separated files.<br>
Then run the codes below [Step 1 execution](###step-1-execution)

In [None]:
# set your own
# directory path of input fastas
raw_annotated_dir_path = 'test_inputs/'
# path of output tsv
preparation_output_path = 'test_op/Step1_op.txt'

In [None]:
# os.mkdir("test_op")

### Step 1 execution

In [None]:
start_c = 0

with open(preparation_output_path, 'w') as pre_op:
    for file_naam in os.listdir(raw_annotated_dir_path):
        abs_path = os.path.join(raw_annotated_dir_path,
                                file_naam
                               )
        abs_path = os.path.abspath(abs_path)
        
        strain_naam = os.path.basename(abs_path)
        strain_naam = os.path.splitext(strain_naam)[0]
        
        start_cs = str(start_c)
        if len(start_cs) == 4:
            start_cs = '0' + start_cs
        
        pre_op.write(start_cs
                     + '\t'
                     + strain_naam
                     + '\t'
                     + abs_path
                     + '\n'
                    )
        start_c += 1
        
pre_op.close()

## Step2 FASTA dereplication

Step 2 is to remove potential sequence duplication (e.g., copies of tRNA, some cds) within each genome. This dereplication is equivalent to 100% clustering, to obtain single copy.<br>
Step 2 **requires the tab separated table output by Step 1 as input**, and specifying a directory for dereplicated files.

### define functions

In [None]:
def redundancy_rm_a_file(path_n_id_tup, # abs path of an input fasta
                         rred_folder, # path to 100% clustered result
                        ):
    
    in_fasta = SeqIO.parse(path_n_id_tup[0],
                           'fasta'
                          )
    
    strain_naam = os.path.basename(path_n_id_tup[0])
    strain_naam = os.path.splitext(strain_naam)[0]
    
    short_id = path_n_id_tup[1]
    
    added_seq = set()
    to_save = []
    gene_id_c = 0
    
    for SEQ in in_fasta:
        if str(SEQ.seq) in added_seq:
            continue
        else:
            to_save.append(SeqRecord(seq = SEQ.seq,
                                     id = short_id + '-' + str(gene_id_c),
                                     description = SEQ.description
                                    )
                          )
            gene_id_c += 1
            added_seq = added_seq.union(set([str(SEQ.seq)
                                            ] # set a list
                                           ) # add to set
                                       )
    
    SeqIO.write(to_save, 
                os.path.join(os.path.abspath(rred_folder), 
                             strain_naam + '.fasta'
                            ),
                'fasta')

def redundancy_rm_files(m_lst,
                        rred_folder_
                       ):
    
    for ab_fp in m_lst:
        redundancy_rm_a_file(ab_fp, # to provide abs path
                             rred_folder_
                            )

### Set path
* Set variable `preparation_output_path` to the path of output tab separated files.
* Set variable `dereped_dir_path` to the path to directory of dereplicated files.
* Set variable `process_number` as thread number.<br>
Then run the codes below [Step 2 execution](###step-2-execution)

In [None]:
# set your own
# path of output tsv from Step1
preparation_output_path = 'test_op/Step1_op.txt'
# directory path of output
dereped_dir_path = 'test_op/S2_op_dereped'
# how many jobs to parallel
process_number = 16

### Step 2 execution

In [None]:
print(time.asctime())

if __name__ == "__main__":
    
    # mkdir or not ----
    if os.path.exists(dereped_dir_path):
        pass
    else:
        os.mkdir(dereped_dir_path)
        
    # read short id ----
    strain_short_pth_n_id = []
    
    with open(preparation_output_path, 'r') as pre_op:
        rls = pre_op.readlines()
        
        for l in rls:
            l_ = l.split('\t')
            strain_short_pth_n_id.append((l_[2][0: -1], # key remove \n
                                          l_[0]
                                         )
                                        )
        
    pre_op.close()
    
    mission_lst = mission_spliter(strain_short_pth_n_id, 
                                  process_number
                                 )
    # mp ----
    jobs = []
    
    for sub_mission_lst in mission_lst:
    
        p = Process(target = redundancy_rm_files,
                    args = (sub_mission_lst, 
                            dereped_dir_path
                           )
                   )
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
        
print(time.asctime())

### <font color="red">Note before next step</font>
After dereplication, users should give a careful check of <b>size</b> of dereplicated fasta files. It is worth noting that if a fasta file with a very low amount of sequences, getting processed together with all the rest, the final "core" clusters will be heavily affected and may bias your analysis.
    
Since the core genome construction is similar with intersection construction. <font color="red"><b>It is recommend to remove some very small dereplicated fasta files BEFORE NEXT STEP</b>, e.g., remove all dereplicated <i>E.coli </i> genomes with file size lower than 2.5MB as most should be larger than 4.0MB.</font>

In [None]:
!grep -c ">" test_op/S2_op_dereped/* | sort -t: -k2rn

## Step 3 Pre-clustering of using all dereplicated FASTAs and non-redundant genome generation

Step 3 is the new feature since version `0.1` onward comparing with `0.1Beta`.It performs 100% clustering on all dereplicated FASTAs made in Step 2.<br>

The program of Step 3 will take the **dereplicated fasta files made in step 3 as input**, and produce:<br>
* dereplicated concatenated FASTA prepared for reciprocal blast
* length of each non-redundant sequence
* pre-clustered gene id
* each genome that is redundancy-removed
* gene id information: a tab delimited table, first column as short id generated by software, second colums as description line of original fasta 

In previous versions, program would BLAST the concatenated FASTA against each dereplicated genome sequentially, as direct all-vs-all BLAST using concatenated FASTA would be too memory intensive to run (1~ G FASTA direct all-vs-all BLAST cost roughly 120 GB memory using `-mt_mode 1` with 36 threads). <br>

As tested, BLAST the concatenated FASTA against each genome (`-mt_mode 1` and 36 threads) could also be very time consuming:
* 500 <i>Listeria monocytogenes</i> costs > 7 hours.<br>
* 1150 <i>E. coli</i> costs > 2.7 days.<br>

However, when running the program for phylogneticlly close genmoes, there would be a high duplication level in the concatenated FASTA.<br>

As tested, the size of concatenated FASTA could be significantly reduced after dereplication:
* 500 <i>Listeria monocytogenes</i> before dereplication -> ~1.33GB, after -> ~187MB, 
* 1150 <i>E. coli</i> before dereplication -> ~5.2GB, after -> ~986MB

Besides dereplication on concatenated FASTA, this version would also use this dereplicated concatenaion to re-generate each genome without overall redundancy (non-redundant genome). Which would significantly reduce task labor.<br>

As tested, the BLAST time usage after dereplication could be significantly reduced:
* 500 <i>Listeria monocytogenes</i> before dereplication -> ~7 hours, after -> ~22 mins
* 1150 <i>E. coli</i> before dereplication -> ~2.7 days, after -> less than 3.5 hours

### define functions

In [None]:
def save_nr_each(in_genome_lst, fasta_dict, op):
    
    for nr_genome in in_genome_lst:
        saver = []
        
        for id_ in nr_genome: 
            saver.append(fasta_dict[id_])
        
        SeqIO.write(saver, 
                    os.path.join(op,
                                 nr_genome[0].partition("-")[0] + '.fasta'
                                ), 
                    'fasta'
                   )

### Set path
* Set variable `dir_dereped_fasta_path` as path to directory dereplicated FASTAs file made in Step 2.
* Set variable `cated_dereped_fasta_path` as path to dereplicated concatenated FASTA file.
* Set variable `nr_genomes_pth` as path to directory of each non-redundant genome file.
* Set variable `len_infor_path` as path to file of sequence length information.
* Set variable `pre_cluster_path` as path to file pre-clustered gene id.
* Set variable `id_info_path` as path information gene id.
* Set variable `process_number` as thread number.<br>

In [None]:
# path to concatenated fasta made in Step 3
dir_dereped_fasta_path = 'test_op/S2_op_dereped/'
# path to the dereplicated concatenated fasta
cated_dereped_fasta_path = 'test_op/S3_op_dereped_cated.fasta'
# path to the dereplicated of non-redundant genomes
nr_genomes_pth = 'test_op/S3_op_nr_genomes/'
# path to save sequence lenght of each sequence
len_infor_path = 'test_op/S3_op_seq_len.txt'
# path to save pre-cluster
pre_cluster_path = 'test_op/S3_op_pre_cluster.txt'
# path to save gene id info
id_info_path = 'test_op/S3_op_id_info.txt'
# how many jobs to parallel
process_number = 16

In [None]:
print(time.asctime())

length_info = open(len_infor_path, 'w')

pre_cluster = defaultdict(list)
each_d = defaultdict(list)

dereped_cated_dict = {}

dereped_cated = []

id_info = open(id_info_path, 'w')
# derep of all ----
for f_naam in os.listdir(dir_dereped_fasta_path):
    a_fasta = SeqIO.parse(os.path.join(dir_dereped_fasta_path, 
                                       f_naam),
                          'fasta')
    for rec in a_fasta:
        
        id_info.write(rec.id + '\t' + rec.description.partition(" ")[2] + '\n')
        SEQ = str(rec.seq)

        if SEQ in pre_cluster.keys():# if already presence
            pre_cluster[SEQ].append(rec.id) # must after if

        else:
            length_info.write(rec.id 
                              + '\t' 
                              +  str(len(SEQ)) 
                              + '\n')
            pre_cluster[SEQ].append(rec.id)

            dereped_cated.append(rec)
            dereped_cated_dict[rec.id] = rec
            
            each_d[rec.id.partition("-")[0]].append(rec.id)
        
length_info.close()
id_info.close()

# save pre cluster ----
pre_cluster_file = open(pre_cluster_path, 'w')

for _100_clus in pre_cluster.values():
    LEN = len(_100_clus)
    LEN_1 = LEN - 1
    for ele in range(LEN):
        if ele == LEN_1:
            pre_cluster_file.write(_100_clus[ele] + '\n')
        else:
            pre_cluster_file.write(_100_clus[ele] + '\t')

pre_cluster_file.close()

# save dereped_cated ----
SeqIO.write(dereped_cated, 
            cated_dereped_fasta_path,
            'fasta')

# nr genomes ----
if os.path.exists(nr_genomes_pth):
    pass
else:
    os.mkdir(nr_genomes_pth)
    
if __name__ == "__main__":
                                       
        
    mission_lst = list(each_d.values())
    
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    jobs = []
    
    for sub_mission_lst in mission_lst:
        
        p = Process(target = save_nr_each,
                    args = (sub_mission_lst,
                            dereped_cated_dict,
                            nr_genomes_pth
                           )
                   )
            
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
print(time.asctime())

## Step 4 Reciprocal Blast

Step 4 will carry out the Reciprocal Blast using NCBI Blast. You can get it from [NCBI official](https://ftp.ncbi.nlm.nih.gov/blast/executables/blast+/LATEST/)
 
The pipeline will assist you to:<br>
1. Create databases for each of all dereplicated genomes using `makeblastdb`.
2. Using `blastn` or `blastp` to align the `dereplicated concatenated FASTA` against each of the database just made and get tabular output.

### makeblastdb
To create database to BLAST, users should provide **path to directory where all non-redundant FASTA made in Step 3 is**, and a **path to output directory where BLAST database is to store**.<br>

#### define functions

In [None]:
def make_a_db(in_abs_pth, 
              db_dir_abs_pth, 
              dbtype,
              makeblastdb_bin):
#     if db folder exist
    strain_naam = os.path.basename(in_abs_pth)
    strain_naam = os.path.splitext(strain_naam)[0]
    
    save_dest = os.path.join(db_dir_abs_pth, strain_naam)
    
    if os.path.exists(save_dest):
        pass
    else:
        os.mkdir(save_dest)
        
    save_naam = os.path.join(save_dest, strain_naam)
        
    subprocess.run([makeblastdb_bin, 
                    "-dbtype", dbtype, 
                    "-in", in_abs_pth,
                    "-out", save_naam,
                    "-parse_seqids"
                   ],
                   stdout = subprocess.DEVNULL
                  )
    
def make_dbs(in_abs_pth_lst,
             db_dir_abs_pth_,
             dbtype_,
             makeblastdb_bin_):
    for s in in_abs_pth_lst:
        make_a_db(s, 
                  db_dir_abs_pth_, 
                  dbtype_,
                  makeblastdb_bin_)

#### Set path
* Set variable `nr_genomes_pth` as path to directory of each non-redundant genome file.
* Set variable `blastdb_dir_path` as path to directory to store generated databases.
* Set variable `process_number` thread number.
* Set variable `dbt` as the tpye of database to make.
* Set variable `mbdb` to the path to makeblastdb command.

Then run the codes below [makeblastdb](###makeblastdb)

In case you have installed your blast but not exported to `$PATH`, you can simply input `whereis blastn` or `whereis makeblastdb` to get the full path to your blast binary file.<br>

In [None]:
! whereis makeblastdb

In [None]:
# set your own
# directory path of output
nr_genomes_pth = 'test_op/S3_op_nr_genomes/'
# directory path of blastdb
blastdb_dir_path = 'test_op/S4_op_dbs'
# how many jobs to parallel
process_number = 16
# set dbtype: nucl or prot
dbt = 'nucl'
# set path to makblastdb bin file
mbdb = 'makeblastdb'

#### makeblastdb

In [None]:
print(time.asctime())

if __name__ == "__main__":
    
    # mkdir or not
    if os.path.exists(blastdb_dir_path):
        pass
    else:
        os.mkdir(blastdb_dir_path)
    
    mission_lst = os.listdir(nr_genomes_pth)
    
    mission_lst = [os.path.join(nr_genomes_pth, x) for x in mission_lst]
    
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    # mp
    jobs = []
    
    for sub_mission_lst in mission_lst:
    
        p = Process(target = make_dbs,
                    args = (sub_mission_lst, 
                            blastdb_dir_path, 
                            dbt,
                            mbdb
                           )
                   )
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
        
print(time.asctime())

### Reciprocal Blast

To perform reciprocal BLAST, users should provide **path to dereplicated concatenated FASTA producd by step 3**, **path to directory where databases made by `makeblastdb`**, and a **path to output directory where BLAST tabular output** is to store.<br>

The reason to BLAST against each database sequentially rather than directly using an all-vs-all approach is to reduce computational overhead. This can be very useful if the task involves many genomes. For example, if you have 1000 dereplicated genomes to analyze, the total size of concatenated FASTA may reach 5-10 GB. A multi-threaded BLAST job using the `-mt_mode 1` by all-vs-all style could be too memory-intensive to run for such a large dataset.

In addition, sequentially running BLAST will produce one tabular output per database. This will be a better adaptation for the job parallelization of finding reciprocal best hits in later steps, which will apply the hash binning method.

Since version `0.1` onward, program allow users to choose recoprocal BLAST executed under memory efficient mode or not. Under memory efficient mode, the real-time memory usage will be much lower but **much more** time consuming.

Since version `0.2.1` onward, users are allowed to set customized blast output format for other analysis requirement. If a follow up OthoSLC analysis is needed, leave the parameter as unspecified as default - `"6 qseqid sseqid score"` so that follow up command could parse the blast output.

#### define function

In [None]:
def RBB(cmd_lst_,
        db_pth, # alread abs
        blast_res_dir, # already abs
       ):
    strain_naam = db_pth.split('/')[-1]
    
    db_name = os.path.join(db_pth, strain_naam)
    save_dest = os.path.join(blast_res_dir, strain_naam + '.tab')
    
    cmd_lst_l = copy.deepcopy(cmd_lst_)

    cmd_lst_l.extend([
        "-db", db_name,
        "-out", save_dest
    ])
    
    subprocess.run(cmd_lst_l)
    
def RBB_high(cmd_lst_,
             db_pth_lst, # alread abs
             blast_res_dir_, # already abs
             pro_id_, 
             s_t_d,
             st_lock_,
             available_threads):
    for db in db_pth_lst:
        st_lock_.acquire()
        if sum(s_t_d.values()) >= available_threads:
            useable_th = s_t_d[pro_id_]
#             print(str(pro_id_) + ' uses ' + str(useable_th))
        else:
            s_t_d[pro_id_] = s_t_d[pro_id_] + (available_threads - sum(s_t_d.values()
                                                                       )
                                                )
            useable_th = s_t_d[pro_id_]
#             print(str(pro_id_) + ' uses ' + str(useable_th))
        st_lock_.release()
    
        cmd_lst_l = copy.deepcopy(cmd_lst_)
    
        cmd_lst_l.extend(["-num_threads", str(useable_th)])
        RBB(
            cmd_lst_l,
            db, # alread abs
            blast_res_dir_, # already abs
        )

    st_lock_.acquire()
    s_t_d[pro_id_] = 0
#     print(str(pro_id_) + ' finished')
    st_lock_.release()

#### Set path
* Set variable `cated_dereped_fasta_path` as path to dereplicated concatenated FASTA file.
* Set variable `blastdb_dir_path` as path to directory to store generated databases.
* Set variable `process_number` thread number.
* Set variable `E_value` as e value parameter during BLAST.
* Set variable `blast_bin_` to the path to BLAST command.
* Set variable `blast_op_dir` to the path of directory to save BLAST output.
* Set variable `mem_eff` as Ture or False to run under memory efficient mode or not.
* Set variable `outfmt` to specify output format if other format needed.

Then run the codes below [BLAST](###BLAST)

In [None]:
! whereis blastn

In [None]:
# set your own
# path to concatenated fasta
cated_dereped_fasta_path = 'test_op/S3_op_dereped_cated.fasta'
# directory path of blastdb
blastdb_dir_path = 'test_op/S4_op_dbs/'
# how many jobs to parallel
process_number = 16
# set e_value
E_value = 1e-5
# set path to blast bin file
blast_bin_ = 'blastn'
# set blast output directory
blast_op_dir = 'test_op/S4_op_blast_op'
# memory efficient mode 
mem_eff_mode = False
# blast output format
outfmt = str("6 qseqid sseqid score")
# blastp task, select from <'blastp' 'blastp-fast' 'blastp-short'>, unspecified means `'blastp'` as default
blastp_task = ''
# blastp task <blastn' 'blastn-short' 'dc-megablast' 'megablast' 'rmblastn' > , unspecified means `'megablast'` as default
blastn_task = ''

#### BLAST

In [None]:
print(time.asctime())
# mkdir or not
if os.path.exists(blast_op_dir):
    pass
else:
    os.mkdir(blast_op_dir)
    
    
mission_lst = os.listdir(blastdb_dir_path)
mission_lst = [os.path.join(blastdb_dir_path, x) for x in mission_lst]

cmd_lst = [blast_bin_, 
            "-query", cated_dereped_fasta_path,
            # "-db", db_name,
            # "-out", save_dest,
            "-evalue", str(E_value),
            "-max_hsps", "1",
            # "-dust", "no",
            "-outfmt", outfmt,
            "-mt_mode", "1"
            # "-num_threads", str(t_num)
            ]

if 'blastn' in blast_bin_:
    cmd_lst.extend(["-dust", "no"])
    if blastn_task == '':
        pass
    else:
        cmd_lst.extend(["-task", blastn_task])
elif 'blastp' in blast_bin_ and blastp_task == "":
    if blastp_task == '':
        pass
    else:
        cmd_lst.extend(["-task", blastp_task])

if mem_eff_mode:
    blast_low = RBB

    cmd_lst.extend(["-num_threads", str(process_number)])

    for db in mission_lst:
        
        blast_low(cmd_lst,
                  db, 
                  blast_op_dir
                  )
        
else:
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    blast_high = RBB_high

    if __name__ == "__main__":
        with Manager() as manager:
            smart_threads_d = manager.dict()
            
            st_lock = Lock()
            
            for x in range(len(mission_lst)):
                smart_threads_d[x] = 1
        
            # mp
            jobs = []
    
            pro_id = 0
            for db_lst in mission_lst:

                p = Process(target = blast_high,
                            args = (cmd_lst,
                                    db_lst,
                                    blast_op_dir,
                                    pro_id,
                                    smart_threads_d, 
                                    st_lock,
                                    process_number,
                                   )
                           )
                p.start()
                jobs.append(p)
                
                pro_id += 1


            for z in jobs:
                z.join()

## Step 5 query binning

This is the new feature in since version `0.1`. This step is to apply hash binning to bin all presence of a query into same file to facilitate next step filtering.

<font color="red"><b>Set bin level:</b></font><br>
According to the amount of genomes to analysze, user should provide binning level, which is to set how many bins should be used. Level $L$ should be interger within range $0 < L \le 9999$, and will generate $L$ bins. 

Suggestion is that do not set the bin level too high, especially when less than 200 genomes participated. If such amount of genomes participated analysis, bin level from 10 to 100 should work as most efficient way. 

As tested, an analysis of 30 genomes, has 30 BLAST output after step 4. 
* A bin level of 10, takes 7 seconds to finish, 
* a bin level of 100, takes 10 seconds to finish,
* a bin level of 1000, takes 24 seconds to finish,

<font color="red"><b>When to set a high bin level:</b></font><br>
Simply speaking, when you have really larger amount of genomes and not enough memory (e.g., more than 1000 genomes and less than 100 GB memory) <br>

For example, if the output of BLAST for 1000 genomes reach 150 GB in size, and if the bin level is set to 10, there will be 10 bins to evenly distribute the data. On average, each bin will contain 1.5 GB of data, which may be too memory-intensive to process in step 5 (where requires approximately 1.5 GB of memory per bin). However, if the number of bins is increased to 1000, the size of each bin will be reduced to between 100-200 MB, it will then facilitate step 6 parallelization.

<font color="red"><b>No lock mode:</b></font><br>
we provide **no lock mode** in all steps that apply hash binning to speed up the process. We allow users to turn off mutex lock which is to safely write into files when multi-threading. In ours tests, program can generate files without data corruption when multi-threading with no lock (data corruption were rarely observed, the possiblity of data corruption may vary between computation platform).

### define functions

In [None]:
def q_binning(in_f_lst, bin_level, bin_op_dir, no_lock_mode, LOCKS):
    for in_f in in_f_lst:
        f = open(in_f, 'r')

        save_dict = defaultdict(list)

        while True:
            l_ = f.readline()
            if l_ == '':
                break

            query = l_.split('\t')[0]

            bin_ = hash(query) % bin_level

            save_dict[bin_].append(l_)
        f.close()
        
        # saving
        for b in save_dict.keys():
            if not no_lock_mode:
                LOCKS[int(b)].acquire()

            f = open(os.path.join(bin_op_dir, str(b) + '.txt'), 'a')

            f.writelines(save_dict[b])

            f.close()
            if not no_lock_mode:
                LOCKS[int(b)].release()

### Set path
* Set variable `blast_op_dir` to the path of directory to save BLAST output.
* Set variable `process_number` thread number.
* Set variable `b_level` as how many bins to generate.
* Set len_infor_path `query_bin_op_dir_` to the path of output directory.

Then run the codes below [Step 5 execution](###Step-6-exeution)

In [None]:
# set your own
# set blast output directory
blast_op_dir = 'test_op/S4_op_blast_op/'
# how many jobs to parallel
process_number = 16
# set bin level
b_level = 64
# set blast output directory
query_bin_op_dir_ = 'test_op/S5_op'
# set whether turn off lock
no_lock_mode = True

### Step 5 execution

In [None]:
if b_level < 0 or b_level > 9999 or not isinstance(b_level, int):
    print('b_level must be integer within range 0 < L <= 9999')
    sys.exit()
    
# mkdir or not

if os.path.exists(query_bin_op_dir_):
    pass
else:
    os.mkdir(query_bin_op_dir_)
    
print(time.asctime())

if __name__ == "__main__":
    
    mission_lst = os.listdir(blast_op_dir)
    
    mission_lst = [os.path.join(blast_op_dir, x) for x in mission_lst]
    
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    # mp
    locks = [Lock() for i in range(b_level)]
    
    jobs = []
    
    for sub_mission_lst in mission_lst:
    
        p = Process(target = q_binning,
                    args = (sub_mission_lst, 
                            b_level, 
                            query_bin_op_dir_,
                            no_lock_mode,
                            locks
                           )
                   )
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
        
print(time.asctime())

## Step 6 Filtering and binning

This step is to filter the blast output and to apply hash binning, in order to provide best preparation for reciprocal best find.<br>

Step 6 requires **path to directory of query binning output (Step 5)**, **sequence length information, pre-cluster information output by Step 3** as input.

The pipeline will carry out following treatment to BLAST output:
1. Paralog removal: <br>
If query and subject is from same strain, the hit will be skipped, as to remove paralog.
2. Length ratio filtering:<br>
Within a hit, query length $Q$ and subject length $S$, the ratio $v$ of this 2 length

$$v = \frac{Q}{S}$$

should be within a range, according to [L. Salichos et al](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0018755), $r (0 < r \le 1)$ is recommended to be higher than 0.3 which means the shorter sequence should not be shorter than 30% of the longer sequence:

$$r \le v \le \frac{1}{r}$$<br>

If above condition not met, the hit will be removed from analysis.

3. Non-best-hit removal: <br>
* Identical sequences are always regarded as best hit.
* If a query has more than 1 subject hits, only the query-subject pair with highest score will then be kept.
* if pairs are of same score, the pair whose query and subject are of more similar length will be kept.
4. Sorting and binning:<br>
For every kept hit, its query and subject will be sorted using Python or C++ built in sort algorithm. This is because in a sequential blast output file, only "<b>single direction best hit</b>" can be obtained, its "<b>reciprocal best hit</b>" only exist in other files, which poses difficulty doing "<b>repriprocal finding</b>". <br>
However, if a query $a$ and its best suject hit $b$, passed filter above, and form $(a, b)$, and in the mean time we sort its rericprocal hit $(b, a)$ from another file into $(a, b)$, then both $(a, b)$ will generate same hash value. This hashed value with last several digits will allow us to bin them into same new file. Therefore, after this binning, "<b>reciprocal finding</b>" will be turned into "<b>duplication finding</b>" within one same file.<br>

<font color="red"><b>Set bin level:</b></font><br>
According to the amount of genomes to analysze, user should provide binning level, which is to set how many bins should be used. Level $L$ should be interger within range $0 < L \le 9999$, and will generate $L$ bins. 

Suggestion is that do not set the bin level too high, especially when less than 200 genomes participated. If such amount of genomes participated analysis, bin level from 10 to 100 should work as most efficient way. 

<font color="red"><b>When to set a high bin level:</b></font><br>
Simply speaking, when you have really larger amount of genomes and not enough memory (e.g., more than 1000 genomes and less than 100 GB memory) <br>

for example, if the output of BLAST for 1000 genomes reach 200 GB in size, and if the bin level is set to 100, there will be 100 bins to evenly distribute the data. On average, each bin will contain 1.7 GB of data, which may be too memory-intensive to process in step 7, where reciprocal find is performed (which requires approximately 1.7 GB of memory per bin). However, if the number of bins is increased to 1000, the size of each bin will be reduced to between 100-200 MB, which then facilitate step 7 parallelization.

<font color="red"><b>Note:</b></font><br>
This is the one of the most computation and I/O intensive step, use the C++ based binary file to process for better efficiency.

In [None]:
def fileter_n_bin(blast_op_pt_lst, # to avoid redundant copy of  seqlen info, abs pth
                  r, 
                  bin_level, 
                  bin_op_dir, # already abs
                  seq_len_info,
                  bin_spe, # pre_cluster
                  no_lock_mode,
                  LOCKS):
    for fs in blast_op_pt_lst:
        
        f = open(fs, 'r')
            
        q_s_d = defaultdict(dict)
        
        sspe_m = defaultdict(lambda: defaultdict(list
                                                )
                            )
        
        while True:
            l_ = f.readline()
            if l_ == '':
                break
                
#             l_ = l_.rstrip('\n') '\n' do not affect number  
            l_ = l_.split('\t')
            
            query = l_[0]
            subject = l_[1]
            
            if query.partition("-")[0] == subject.partition("-")[0]: # if self hit
                continue
            else:
                score = int(l_[2])
                
                q_s_d[query][subject] = score
#                 {q1: {s1: score,
#                       s2: score
#                      },
#                  q2
#                 }
                
                # add all spe
                for a_spe_of_clus in bin_spe[subject]:
                    sspe_m[query][a_spe_of_clus].append(subject)
#                     {
#                         q1: {
#                             a1_spe_in_cluster_of_sub: [subject1 id,
#                                                     subject2 id
#                                                    ],
#                             a2_spe_in_cluster_of_sub: [subject3 id,
#                                                     subject4 id
#                                                    ],
#                         },
#                         q2
#                     }
        f.close()
        # how to save
        saver = defaultdict(list)
        
        for qs in q_s_d.keys():
            q_len = seq_len_info[qs]
            
            for sub in q_s_d[qs].keys():
                s_spe = sub.partition("-")[0]
                
                if s_spe in bin_spe[qs]:# if qs pre cluster has that spe
                    continue
                    
                else:
                    best_hit = True
                    
                    same_score = []
                    
                    if len(sspe_m[qs][s_spe]) > 1: #if multiple hit in a spe
                        for id_ in sspe_m[qs][s_spe]:
                            if sub == id_: # skip self
                                continue
                                
                            if q_s_d[qs][sub] < q_s_d[qs][id_]: # if there is a better hit
                                best_hit = False
                                break
                            elif q_s_d[qs][sub] == q_s_d[qs][id_]: # same score 
                                same_score.append(id_)
                    
                    if len(same_score) > 0 and best_hit: # need more similar length # print(qs + "<>" + sub)
                        
                        qs_len_diff = abs(q_len - seq_len_info[sub])
                        
                        for id_ in same_score:
                            q_id_len_diff = abs(q_len - seq_len_info[id_])
                            
                            if q_id_len_diff < qs_len_diff:
#                                 print(qs + "< >" + sub + "< >" + id_)
                                best_hit = False
                                break
                                
                    if best_hit: 
                        
                        len_ratio = q_len/seq_len_info[sub]
                        
                        c1 = len_ratio <= r
                        c2 = len_ratio >= (1/r)
                        
                        if c1 or c2: # len ratio filter
#                             print(qs + "< >" + sub)
                            continue
                        else:
                            sorting = sorted((qs, sub)) 
                            
                            bin_ = hash(tuple(sorting
                                             )
                                       ) % bin_level
                            
                            saver[str(bin_)].append(sorting[0] + '\t' + sorting[1] + '\n')
                            
        for b in saver.keys():
            if not no_lock_mode:
                LOCKS[int(b)].acquire()

            f = open(os.path.join(bin_op_dir, b + '.txt'), 'a')

            f.writelines(saver[b])

            f.close()
            
            if not no_lock_mode:
                LOCKS[int(b)].release()

### Set path
* Set variable `process_number` thread number.
* Set variable `b_level` as how many bins to generate.
* Set variable `r_` as the limit of length difference ratio.
* Set variable `blast_op_dir` to the path of directory to save BLAST output.
* Set variable `len_infor_path` as path to file of sequence length information.
* Set variable `pre_cluster_path` as path to file pre-clustered gene id.

Then run the codes below [Step 6 execution](###Step-6-execution)

In [None]:
# set your own
# set blast output directory
qbin_op_dir = 'test_op/S5_op/'
# how many jobs to parallel
process_number = 16
# set bin level
b_level = 48
# set length ration limit r value 
r_ = 0.3
# set blast output directory
bin_op_dir_ = 'test_op/S6_op'
# path save sequence lenght of each sequence
len_infor_path = 'test_op/S3_op_seq_len.txt'
# path to save pre-cluster
pre_cluster_path = 'test_op/S3_op_pre_cluster.txt'
# set whether turn off lock
no_lock_mode = True

### Step 6 execution

In [None]:
if b_level < 0 or b_level > 9999 or not isinstance(b_level, int):
    print('b_level must be integer within range 0 < L <= 9999')
    sys.exit()

# mkdir or not

if os.path.exists(bin_op_dir_):
    pass
else:
    os.mkdir(bin_op_dir_)

# seq len
seq_len_d = {}
seq_len = open(len_infor_path, 'r')
seq_len_rls = seq_len.readlines()
seq_len.close()

for ros in seq_len_rls:
    l_ = ros.rstrip('\n').split('\t')
    seq_len_d[l_[0]] = float(l_[1])


# bin spe
bin_spe = {}

f = open(pre_cluster_path, 'r')

while True:
    l_ = f.readline()
    if l_ == '':
        break
    
    
    l_ = l_.rstrip('\n').split('\t')
    
    bin_ = l_[0]
    
    bin_spe[bin_] = [x.partition("-")[0] for x in l_]
    

f.close()

print(time.asctime())

if __name__ == "__main__":
    
    mission_lst = os.listdir(qbin_op_dir)
    
    mission_lst = [os.path.join(qbin_op_dir, x) for x in mission_lst]
    
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    # mp
    locks = [Lock() for i in range(b_level)]
    
    jobs = []
    
    for sub_mission_lst in mission_lst:
    
        p = Process(target = fileter_n_bin,
                    args = (sub_mission_lst, 
                            r_, 
                            b_level,
                            bin_op_dir_,
                            seq_len_d,
                            bin_spe,
                            no_lock_mode,
                            locks
                           )
                   )
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
        
print(time.asctime())

## Step 7 Reciprocal Best find

This Step is to find reciprocal best hits. In `Step 6`, query-subject pairs had been binned into different files according to their hash value, therefore, pair $(a, b)$ and its reciprocal pair $(b, a)$ (which was sorted into $(a, b)$), will be in the same bin. Thus, a pair found twice in a bin will be reported as a reciprocal best blast pair.

In addition, Step 7 also does hash binning after a reciprocal best hit is comfirmed. Query-subject pairs will be binned by the hash value of query ID, which then put pairs with common elements into same bin to assist faster clustering in next step.

Step 7 requires **path to directory of bins output by Step 6**, and path to output directory.


<font color="red"><b>Set bin level:</b></font><br>
According to the amount of genomes to analysze, user should provide binning level, which is to set how many bins should be used. Level $0 < L \le 9999$, and will generate $L$ bins. 

Suggestion is that do not set the bin level too high, especially when less than 200 genomes participated. If such amount of genomes participated analysis, bin level from 10 to 100 should work as most efficient way. 

As tested, 30 genomes, if 10 bins generated by Step 7: 
* A bin level of 10, takes 1 seconds to finish, 
* a bin level of 100, takes 2 seconds to finish,
* a bin level of 1000, takes 7 seconds to finish,

<font color="red"><b>When to set a high bin level:</b></font><br>
Simply speaking, when you have really larger amount of genomes and not enough memory (e.g., more than 1000 genomes and less than 100 GB memory) <br>

Less bins could make step 7 faster, but step 8 more memory intensive.

<font color="red"><b>Note:</b></font><br>
This is the one of the most computation and I/O intensive step, use the C++ based binary file to process for better efficiency.

### define functions

In [None]:
def dup_find_mem(in_file_lst, RBB_op_dir, bin_level, no_lock_mode, LOCKS):
    for in_file in in_file_lst:
        naam = os.path.basename(in_file)

        f = open(in_file, 'r')

        rec = set()
        d_bin = defaultdict(list)

        while True:
            l_ = f.readline()
            if l_ == '':
                break

            if l_ in rec:
                hash_res = str(hash(l_.split('\t')[0]
                                   ) % bin_level
                              )
                d_bin[hash_res].append(l_)
                rec.remove(l_)
            else:
                rec |= set([l_])

        f.close()

        for b in d_bin.keys():
            if not no_lock_mode:
                LOCKS[int(b)].acquire()

            f2 = open(os.path.join(RBB_op_dir, b + '.txt'), 'a')

            f2.writelines(d_bin[b])

            f2.close()
            if not no_lock_mode:
                LOCKS[int(b)].release()

#### Set path
* Set variable `bin_op_dir_` as path to directory to bins generated by Step 6.
* Set variable `RBB_op_dir_pth` as path to output directory.
* Set variable `process_number` thread number.
* Set variable `b_level` as how many bins to generate

Then run the codes below [Step 7 execution](###Step-7-execution)

In [None]:
# set blast output directory
bin_op_dir_ = 'test_op/S6_op/'
# set reciprocal best output directory
RBB_op_dir_pth = 'test_op/S7_op'
# how many jobs to parallel
process_number = 16
# set bin level
b_level = 36
# set whether turn off lock
no_lock_mode = True

### Step 7 execution 

In [None]:
if b_level < 0 or b_level > 9999 or not isinstance(b_level, int):
    print('b_level must be integer within range 0 < L <= 9999')
    sys.exit()

# mkdir or not
if os.path.exists(RBB_op_dir_pth):
    pass
else:
    os.mkdir(RBB_op_dir_pth)

print(time.asctime())

if __name__ == "__main__":
    
    mission_lst = os.listdir(bin_op_dir_)
    
    mission_lst = [os.path.join(bin_op_dir_, x) for x in mission_lst]
    
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    # mp
    locks = [Lock() for i in range(b_level)
            ]
             
    jobs = []
    
    for sub_mission_lst in mission_lst:
        
        p = Process(target = dup_find_mem,
                    args = (sub_mission_lst,
                            RBB_op_dir_pth,
                            b_level,
                            no_lock_mode,
                            locks
                           )
                   )
            
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
        
print(time.asctime())

## Step 8 Single Linkage Clustering

This step will carry out single linkage clustering on output from step 7. Users may perform "<b>multi-step-to-final</b>" or "<b>one-step-to-final</b>" clustering by adjusting the `compression_size` parameter. In the output files, each row is a cluster (stopped by "\n") and each gene ID is separated by "\t".

In case that large amount genomes participated analysis, it could be memory intensive to reach final cluster in a single step. The pipeline provide ability to extenuate such pressure by reaching final cluster with multiple steps. For example, if `compression_size` = 5 is provided, program will perform clustering using 5 files at a time and shrink the output file number by a factor of 5.

<font color="red"><b>Note Before Start</b></font><br>
User <font color="red"><b>must</b></font> specify the path to `pre-cluster file` produced in `Step 3`, when running the <font color="red"><b>LAST step of multi step to final</b></font>, or when running direct <font color="red"><b>one step to final</b></font>.

### define functions

In [None]:
def connect_bin(bin_ind_lst, ind):
    while bin_ind_lst[ind] != ind:
        ind = bin_ind_lst[ind]
    return ind

def ptr_like_merger(in_p_lst, op_dir):
    
    ptr_coll = dict()
    bin_inds_connector = []
    
    to_save_lst = []
    
    const_ind_every = 0
    
    for in_p in in_p_lst[1]:
        f = open(in_p, 'r')
    
        while True:
            l_ = f.readline()

            if l_ == '':
                break

            l_ = set(l_.rstrip('\n').split('\t'))

            ind_to_go = const_ind_every
            to_save_lst.append(l_)
            bin_inds_connector.append(ind_to_go)

            for ele in l_:
                if ele not in ptr_coll.keys():
                    ptr_coll[ele] = ind_to_go

                else:
                    its_lowest_bin_ind = connect_bin(bin_inds_connector, 
                                                     ptr_coll[ele]
                                                    )

                    if its_lowest_bin_ind == ind_to_go:
                        continue

                    if its_lowest_bin_ind > ind_to_go:
                        its_lowest_bin_ind, ind_to_go = ind_to_go, its_lowest_bin_ind

                    to_save_lst[its_lowest_bin_ind] |= to_save_lst[ind_to_go]
                    to_save_lst[ind_to_go] = None

                    bin_inds_connector[ind_to_go] = its_lowest_bin_ind
                    ind_to_go = its_lowest_bin_ind


            const_ind_every += 1

        f.close()
        
    f2 = open(os.path.join(op_dir, str(in_p_lst[0]) + '.txt'), 'w')
    for i in to_save_lst:
        if i:
            i = tuple(i)
            Len = len(i)
            for j in list(range(Len)):
                if j == Len - 1: 
                    f2.write(i[j] + '\n')
                else:
                    f2.write(i[j] + '\t')
        else:
            pass
    f2.close()
                
def files_clustering_s(in_p_lst_lst, op_dir_):
    
    for inp_lst in in_p_lst_lst:
        ptr_like_merger(inp_lst, op_dir_)
        
def SLC(ip, op, proc_num, c_s, pre_clus_pth = ''):
    if os.path.exists(op):
        pass
    else:
        os.mkdir(op)

    print(time.asctime())

    if __name__ == "__main__":

        mission_lst = os.listdir(ip)

        if c_s > len(mission_lst):
            print('Error: Compression size can only be smaller than ' + str(len(mission_lst)))
        else:

            mission_lst = [os.path.join(ip, x) for x in mission_lst]
            
            if pre_clus_pth != '':
                mission_lst.append(pre_clus_pth)
            
            per = len(mission_lst)//c_s

            mission_lst = mission_spliter(mission_lst, 
                                          per
                                         )

            x = 0
            mission_lst_ = []
            for i in mission_lst:
                mission_lst_.append([x, i])
                x += 1

            if len(mission_lst_) < proc_num:
                proc_num = len(mission_lst_)


            mission_lst_ = mission_spliter(mission_lst_, 
                                           proc_num
                                          )
            # mp
            jobs = []
            

            for sub_mission_lst in mission_lst_:

                p = Process(target = files_clustering_s,
                            args = (sub_mission_lst,
                                    op
                                   )
                           )
                p.start()
                jobs.append(p)


            for z in jobs:
                z.join()

        print(time.asctime())

### Example to reach final cluster by multi step, starting with 1000 input files.

#### Set path
* Set variable `RBB_op_dir_pth` as path to output directory.
* Set variable `SLC_inter_op_dir_pth` as path to output directory.
* Set variable `process_number` thread number.
* Set variable `compression_size` as how many files to cluster into one file.
* Set variable `pre_cluster_path` as path to file pre-clustered gene id.

In [None]:
# set reciprocal best output directory
RBB_op_dir_pth = 'test_op/S7_op/'
# set path to directory of clustering output
SLC_inter_op_dir_pth = 'test_op/SLC_1/'
# how many jobs to parallel
process_number = 16
# Compression size
compression_size = 1

Then run the codes below.

In [None]:
SLC(RBB_op_dir_pth, SLC_inter_op_dir_pth, process_number, compression_size)

#### Set path
* Set variable `ip_dir_pth` as path to output directory of last round clustering.
* Set variable `SLC_inter_op_dir_pth` as path to output directory.
* Set variable `process_number` thread number.
* Set variable `compression_size` as how many files to cluster into one file.

In [None]:
# set path to last step clustering output directory
ip_dir_pth = 'test_op/SLC_1/'
# set path to directory of clustering output
SLC_inter_op_dir_pth = 'test_op/SLC_2/'
# how many jobs to parallel
process_number = 16
# Compression size
compression_size = 2

Then run the codes below.

In [None]:
SLC(ip_dir_pth, SLC_inter_op_dir_pth, process_number, compression_size)

#### Set path
* Set variable `ip_dir_pth` as path to output directory of last round clustering.
* Set variable `SLC_inter_op_dir_pth` as path to output directory.
* Set variable `process_number` thread number.
* Set variable `compression_size` as how many files to cluster into one file.
* Set variable `pre_cluster_path` path to pre-clustered id file made in **step 3**.

In [None]:
# set path to last step clustering output directory
ip_dir_pth = 'test_op/SLC_2/'
# set path to directory of each clustering output
SLC_inter_op_dir_pth = 'test_op/SLC_final/'
# how many jobs to parallel
process_number = 16
# Compression size
compression_size = len(os.listdir(ip_dir_pth))
# path to pre-cluster
pre_cluster_path = 'test_op/S3_op_pre_cluster.txt'

Then run the codes below.

In [None]:
SLC(ip_dir_pth, SLC_inter_op_dir_pth, process_number, compression_size, pre_cluster_path)

## Step 9 Write clusters into FASTA
In Step 9, program will help user to generate FASTA file for each cluster. By providing the final one cluster file generated by Step 8 as input, program produces 3 types of clusters into 3 directories separately.<br>

Noteably, those genomes 
* depreplicated in **Step 2**,  
* not removed because of too low genome size
* participated processes up to this step, 

are used to separate 3 types of clusters.<br>

1. In drectory `accessory_cluster`(a cluster not shared by all genomes), FASTA files of clusters, which **do not have genes from all genomes** participated analysis, will be output in this drectory. For example, there are 100 genomes in analysis, a cluster with less than 100 genes will have its FASTA output here. Also, if a cluster has >= 100 genes, but all these genes are from less than 100 genomes, its FASTA will be in this directory.
2. In drectory `strict_core`, each cluster has **exactly 1 gene from every genome** to analyze. Such clusters will have their FASTA files here.
3. In drectory `surplus_core`, each cluster has **at least 1 gene from every genome** to analyze, and **some genomes has more than 1 genes** in this cluster. Such clusters will have their FASTA files here.

This step also requires the concatenated FASTA made in Step 3 as input.

In [None]:
def write_a_cluster(a_cluster_of_genes, 
                    total_amount, 
                    which_to_write, 
                    op_path,
                    fasta_dict,
                    id_cluster_d,
                    id_info_d
                    ):
    spe_count = len(set([x.partition("-")[0] for x in a_cluster_of_genes]))
    
    if spe_count < total_amount:
        if which_to_write["accessory_cluster"]:
            file_naam = os.path.join(op_path, 'accessory_cluster/' 
                                     + a_cluster_of_genes[0] 
                                     + '.fasta'
                                    )
        else:
            return
    else:
        if len(a_cluster_of_genes) == total_amount:
            if which_to_write["strict_core"]:
                file_naam = os.path.join(op_path, 'strict_core/' 
                                         + a_cluster_of_genes[0] 
                                         + '.fasta'
                                        )
            else:
                return
        else:
            if which_to_write["surplus_core"]:
                file_naam = os.path.join(op_path, 'surplus_core/' 
                                         + a_cluster_of_genes[0] 
                                         + '.fasta'
                                        )
            else:
                return
    
    to_save = []
    for  recs in a_cluster_of_genes:
        to_save.append(SeqRecord(id = recs, 
                                 description = id_info_d[recs],
                                 seq = fasta_dict[id_cluster_d[recs]].seq
                                )
                      )
        
    SeqIO.write(to_save, 
                file_naam, 
                'fasta')

def write_cluster_s(lst_of_a_clusters, 
                    total_amount_, 
                    which_to_write_, 
                    op_path_, 
                    fasta_dict_,
                    id_cluster_d_,
                    id_info_d_):
    
    for a_cl in lst_of_a_clusters:
        write_a_cluster(a_cl, 
                        total_amount_,
                        which_to_write_,
                        op_path_,
                        fasta_dict_,
                        id_cluster_d_,
                        id_info_d_)

#### Set path
* Set variable `final_cluster_path` as path to the final cluster file made by Step 8.
* Set variable `output_path` as path to output directory.
* Set variable `process_number` thread number.
* Set variable `cluster_type` select from `accessory/strict/surplus`, separate by comma (`,`).


* Set variable `dereped_fasta_path` as path to globally dereplicated FASTAs made in Step 3.
* Set variable `id_info_path` path to save gene id info made in Step3.
* Set variable `pre_cluster_path` path to pre-clustered gene id info made in Step3.

In [None]:
# set path to the final cluster file made by Step 7
final_cluster_path = 'test_op/SLC_final/0.txt'
# set path to output directory
output_path = 'test_op/S9_write_fasta'
# how many jobs to parallel
process_number = 16

# select at least 1 from `accessory/strict/surplus`, separate by comma (`,`), 
# specifying types of clusters to write
cluster_type = 'accessory,strict,surplus'

# set path to concatenated dereped FASTA file made in Step 3.
dereped_fasta_path = 'test_op/S3_op_dereped_cated.fasta'
# set path to save gene id info made in Step3
id_info_path = 'test_op/S3_op_id_info.txt'
# set path to save pre-cluster
pre_cluster_path = 'test_op/S3_op_pre_cluster.txt'

* Set variable `total_amount` of the how many genomes participated analysis.

In [None]:
# directory path of output
print(len(os.listdir(dir_dereped_fasta_path)
         )
     )

In [None]:
# how many genomes participated analysis.
total_amount = len(os.listdir(dir_dereped_fasta_path)
                      )

#### run the codes below get final FASTA files

In [None]:
print(time.asctime())
# mkdir or not
if os.path.exists(output_path):
    pass
else:
    os.mkdir(output_path)
    

# readin ----
cated_fasta = SeqIO.to_dict(SeqIO.parse(dereped_fasta_path, 
                                        'fasta')
                           )

# id: cluster_id dict of 100% same seqs ----
f = open(pre_cluster_path, 'r')
id_cluster = f.readlines()
f.close()

id_cluster_dict = {}
for cls in id_cluster:
    l_ = cls.rstrip('\n').split('\t')
    
    for a_id in l_:
        id_cluster_dict[a_id] = l_[0]

# id: sqe_info ----
f = open(id_info_path, 'r')
id_info = f.readlines()
f.close()

id_info_dict = {}
for pair in id_info:
    l_ = pair.rstrip('\n').split('\t')
    id_info_dict[l_[0]] = l_[1]
        
# which to write
types_to_write = {
    'accessory_cluster': False,
    'strict_core': False,
    'surplus_core': False
} 
for x in cluster_type.split(','):
    if x == 'accessory':
        types_to_write['accessory_cluster'] = True
        if os.path.exists(os.path.join(output_path, 'accessory_cluster')):
            pass
        else:
            os.mkdir(os.path.join(output_path, 'accessory_cluster'))
    
    elif x == 'strict':
        types_to_write['strict_core'] = True
        if os.path.exists(os.path.join(output_path, 'strict_core')):
            pass
        else:
            os.mkdir(os.path.join(output_path, 'strict_core'))
            
    elif x == 'surplus':
        types_to_write['surplus_core'] = True
        if os.path.exists(os.path.join(output_path, 'surplus_core')):
            pass
        else:
            os.mkdir(os.path.join(output_path, 'surplus_core'))
        
    
# read cluster
f = open(final_cluster_path, 'r')
CLUSTERS = f.readlines()
f.close()

if __name__ == "__main__":
    
    mission_lst = [x.rstrip('\n').split('\t') for x in CLUSTERS]
    
    mission_lst = mission_spliter(mission_lst, 
                                  process_number
                                 )
    
    # mp
             
    jobs = []
    
    for sub_mission_lst in mission_lst:
        
        p = Process(target = write_cluster_s,
                    args = (sub_mission_lst,
                            total_amount,
                            types_to_write,
                            output_path,
                            cated_fasta,
                            id_cluster_dict,
                            id_info_dict
                           )
                   )
            
        p.start()
        jobs.append(p)


    for z in jobs:
        z.join()
        
print(time.asctime())