In [5]:
import os, shutil, random

import numpy as np

In [2]:
# Project root: two directory levels above the current working directory.
base_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
# Directory that holds every sequence database.
seq_db = os.path.join(base_dir, "input", "sequence_db")

# Paths to the benchmark databases (both live inside seq_db)
db_len_path = os.path.join(seq_db, "BM_db_e6")
db_nb_path = os.path.join(seq_db, "BM_db_100")

# Path to the modules
db_code = os.path.join(base_dir, "code")

# Format preparation

Prepare a database of 20 genomes for benchmarking runtime as a function of the number of genomes.

In [7]:
def build_db(input_db : str, size_db : int = 20):
    """
    Build the benchmark database by copying a given number of genomes to another directory.

    ``size_db`` genome directories are drawn at random (without replacement)
    from ``seq_db/input_db`` and copied into ``seq_db/BM_db_<size_db>``.
    A destination directory that already exists is deleted first.

    Parameters
    ----------
    input_db : str
        Name of the source database directory inside ``seq_db``.
    size_db : int
        Number of genome directories to copy (default 20).

    Raises
    ------
    ValueError
        If the source and destination are the same directory, or if the
        source contains fewer than ``size_db`` genome directories.
    """
    path_input = os.path.join(seq_db, input_db)
    destination_path = os.path.join(seq_db, f"BM_db_{size_db}")

    # Wiping the destination would otherwise destroy the very genomes we are
    # about to copy.
    if os.path.abspath(path_input) == os.path.abspath(destination_path):
        raise ValueError(
            f"Source and destination databases are the same directory: {path_input}"
        )

    # Only directories are copied below, so only directories may be sampled;
    # sampling stray files would silently yield fewer than size_db genomes.
    # Sorting makes the draw reproducible when the NumPy global seed is set,
    # because os.listdir returns entries in arbitrary order.
    candidates = sorted(
        entry for entry in os.listdir(path_input)
        if os.path.isdir(os.path.join(path_input, entry))
    )
    if size_db > len(candidates):
        raise ValueError(
            f"Cannot sample {size_db} genomes: only {len(candidates)} "
            f"genome directories found in {path_input}"
        )

    list_genomes = np.random.choice(candidates, size=size_db, replace=False).tolist()

    # All validation is done; it is now safe to replace the previous database.
    if os.path.exists(destination_path):
        shutil.rmtree(destination_path)
    os.makedirs(destination_path, exist_ok=True)

    for item in list_genomes:
        shutil.copytree(
            os.path.join(path_input, item),
            os.path.join(destination_path, item),
        )

# Build the benchmark database from "healthy_microbiota" using the default size.
build_db(input_db="healthy_microbiota")

# Format preparation

Prepare temporary databases for benchmarking signature computation runtime as a function of genome size.

In [3]:
def create_tmp_db(input_db: str, name_tmp : str) -> str:
    """
    Clone the genome directories of a database into a fresh temporary database.

    The copy is written to ``seq_db/tmp_BM_db_<name_tmp>``; anything already
    present at that location is removed beforehand. Entries of the source
    database that are not directories are not copied.

    Returns the name (not the full path) of the temporary database.
    """
    tmp_name = f"tmp_BM_db_{name_tmp}"
    src_root = os.path.join(seq_db, input_db)
    dst_root = os.path.join(seq_db, tmp_name)

    # Always start from an empty destination directory.
    if os.path.exists(dst_root):
        shutil.rmtree(dst_root)
    os.makedirs(dst_root, exist_ok=True)

    for entry in os.listdir(src_root):
        src_entry = os.path.join(src_root, entry)
        if not os.path.isdir(src_entry):
            continue
        shutil.copytree(src_entry, os.path.join(dst_root, entry))

    return tmp_name

def generate_random_sequence(length: int) -> str:
    """
    Return a random DNA string of ``int(length)`` characters.

    Each character is drawn independently from the alphabet ``ACGT`` using
    the global ``random`` module state.
    """
    alphabet = 'ACGT'
    bases = []
    for _ in range(int(length)):
        bases.append(random.choice(alphabet))
    return ''.join(bases)

def rm_tmp(seq_db : str):
    """
    Remove the temporary databases stored in ``seq_db``.

    Every directory directly inside ``seq_db`` whose name starts with
    ``tmp_`` is deleted recursively. Regular files and symbolic links with
    that prefix are left in place: ``shutil.rmtree`` raises on them, which
    would otherwise abort the cleanup halfway through.

    Parameters
    ----------
    seq_db : str
        Path of the directory to clean.
    """
    for folder in os.listdir(seq_db):
        if not folder.startswith('tmp_'):
            continue
        folder_path = os.path.join(seq_db, folder)
        # os.path.isdir follows symlinks, so links are excluded explicitly.
        if os.path.isdir(folder_path) and not os.path.islink(folder_path):
            shutil.rmtree(folder_path)

def process_fasta(file_path: str, desired_length: int):
    """
    Rewrite a fasta file in place so that its sequence has the right length.

    The first line is kept as the header. The sequence is the concatenation
    of all following lines up to (not including) the next line starting
    with ``>``, so line-wrapped fasta files are read in full. A sequence
    longer than ``desired_length`` is truncated; a shorter one is padded
    with random bases from ``generate_random_sequence``. The file is
    overwritten with exactly two lines: header and sequence.

    Parameters
    ----------
    file_path : str
        Path of the fasta file to rewrite.
    desired_length : int
        Target sequence length. Converted with ``int()``, so whole-number
        floats such as ``5e5`` are accepted.

    Raises
    ------
    AssertionError
        If the file has fewer than two lines (header + sequence).
    """
    # A float (e.g. 5e5) cannot be used as a slice bound, so normalise first.
    desired_length = int(desired_length)

    with open(file_path, 'r') as file:
        lines = file.readlines()

    # Header plus one sequence line is valid fasta; it is also what this
    # function writes, so the check must accept it for re-runs to work.
    assert len(lines) >= 2, "Invalid fasta file format"

    header = lines[0].strip()

    # Gather every line of the first record, not just the first one.
    chunks = []
    for line in lines[1:]:
        stripped = line.strip()
        if stripped.startswith('>'):
            break
        chunks.append(stripped)
    sequence = ''.join(chunks)

    if len(sequence) > desired_length:
        sequence = sequence[:desired_length]

    elif len(sequence) < desired_length:
        sequence += generate_random_sequence(desired_length - len(sequence))

    with open(file_path, 'w') as file:
        file.write(f"{header}\n{sequence}\n")

def create_tmp(input_db : str, sizes=None):
    """
    Create one temporary database per target genome size.

    For each size, ``input_db`` is cloned with ``create_tmp_db`` (the clone
    is named after ``str(size)``). Then, in every genome folder of the
    clone, one fasta file is resized to that length with ``process_fasta``.

    Parameters
    ----------
    input_db : str
        Name of the source database directory inside ``seq_db``.
    sizes : iterable of numbers, optional
        Target sequence lengths. Defaults to
        ``[5e5, 1e6, 3e6, 5e6, 7e6, 1e7]``.
    """
    if sizes is None:
        sizes = [5e5, 1e6, 3e6, 5e6, 7e6, 1e7]

    # The directory name keeps the original str(size) spelling (e.g.
    # "500000.0"), but the length handed to process_fasta must be an int:
    # it is used there as a slice bound.
    tmp_dbs = [(create_tmp_db(input_db, str(size)), int(size)) for size in sizes]

    for tmp_name, target_length in tmp_dbs:
        path_tmp = os.path.join(seq_db, tmp_name)
        for folder in os.listdir(path_tmp):
            folder_path = os.path.join(path_tmp, folder)
            # NOTE(review): takes the first entry returned by os.listdir;
            # presumably each genome folder holds a single fasta file - confirm.
            file_path = os.path.join(folder_path, os.listdir(folder_path)[0])
            process_fasta(file_path, target_length)

# Generate the temporary, size-adjusted copies of the "BM_db_e6" database.
create_tmp(input_db="BM_db_e6")