In [4]:
import gzip
import json
import re
import numpy as np
import os

from concurrent.futures import ProcessPoolExecutor
from needletail import parse_fastx_file

In [5]:
class processHandler:
    """Convert FUBAR JSON results plus codon alignments into gzipped JSONL files.

    Constructing an instance runs the whole pipeline: it ensures the output
    folder exists, pairs every ``*.filtered.fubar.json`` file found in
    ``./output/`` with a FASTA file of the same stem in
    ``./nt_tree_filtered/``, and processes each pair in a worker process,
    writing ``./processed/<stem>.processed.jsonl.gz``.

    Parameters:
        jobs (int): Maximum number of worker processes.

    Raises:
        RuntimeError: If processing failed for one or more inputs.
    """

    # Directory layout and file-name conventions used by the pipeline.
    _JSON_DIR = './output/'
    _FASTA_DIR = './nt_tree_filtered/'
    _PROCESSED_DIR = './processed/'
    _JSON_SUFFIX = ".filtered.fubar.json"
    _FASTA_SUFFIX = ".nt.filtered.fas"
    _PROCESSED_SUFFIX = ".processed.jsonl.gz"

    def __init__(self, jobs=12):
        self.jobs = jobs
        self.create_output_folder()
        self.inputs = self.read_inputs()
        self.process()

    def process(self) -> None:
        """
        Run ``process_one_static`` on every entry of ``self.inputs`` in parallel.

        Every job is allowed to finish; afterwards a single error is raised
        if any of them failed, so failures are never silently dropped.

        Raises:
            RuntimeError: If at least one job raised. The message lists the
                JSON path of every failed job and the first failure is
                chained as the cause.
        """
        with ProcessPoolExecutor(max_workers=self.jobs) as executor:
            submitted = [
                (job, executor.submit(processHandler.process_one_static, job))
                for job in self.inputs
            ]
            # Future.exception() blocks until the job is done and returns the
            # exception it raised (or None), so no failure goes unnoticed.
            outcomes = [(job, future.exception()) for job, future in submitted]

        failures = [(job, exc) for job, exc in outcomes if exc is not None]
        if failures:
            details = "; ".join(f"{job[1]}: {exc!r}" for job, exc in failures)
            raise RuntimeError(
                f"{len(failures)} of {len(outcomes)} inputs failed: {details}"
            ) from failures[0][1]

    def create_output_folder(self, path="processed") -> None:
        """
        Create the output directory ``path`` if it does not already exist.

        Parameters:
            path (str): Directory to create (missing parents are created too).
        """
        # exist_ok avoids the check-then-create race of isdir() + mkdir().
        os.makedirs(path, exist_ok=True)

    def read_inputs(self) -> list:
        """
        Build the list of jobs from the FUBAR JSON files in ``./output/``.

        Entries whose name does not end in ``.filtered.fubar.json`` are
        skipped, because no FASTA or output name can be derived from them.

        Returns:
            list: ``(fasta_path, json_path, output_path)`` tuples, ordered by
            file name.
        """
        inputs = []
        for filename in sorted(os.listdir(self._JSON_DIR)):
            if not filename.endswith(self._JSON_SUFFIX):
                continue
            stem = filename.removesuffix(self._JSON_SUFFIX)
            inputs.append(
                (
                    os.path.join(self._FASTA_DIR, stem + self._FASTA_SUFFIX),
                    os.path.join(self._JSON_DIR, filename),
                    os.path.join(self._PROCESSED_DIR, stem + self._PROCESSED_SUFFIX),
                )
            )
        return inputs

    @staticmethod
    def codon_gap_mask(seq: str) -> tuple[np.ndarray, str]:
        """
        Compute a boolean mask for codons that are not '---',
        and return a filtered nucleotide sequence without those codons.

        Parameters:
            seq (str): Input nucleotide sequence (length must be a multiple of 3).

        Returns:
            tuple[np.ndarray, str]: Boolean mask array (one entry per codon,
            True where the codon is kept) and filtered sequence string.

        Raises:
            ValueError: If the sequence length is not a multiple of 3.
        """
        n = len(seq)
        if n % 3 != 0:
            raise ValueError("Sequence length must be a multiple of 3.")

        # View the sequence as an (n_codons, 3) table of ASCII codes.
        codons = np.frombuffer(seq.encode("ascii"), dtype=np.uint8).reshape(n // 3, 3)
        # A codon is kept unless all three of its positions are '-'.
        mask = (codons != ord("-")).any(axis=1)
        return mask, codons[mask].tobytes().decode("ascii")

    @staticmethod
    def process_one_static(inputs: tuple[str, str, str]):
        """
        Process one alignment and write one JSON line per sequence.

        For each FASTA record the all-gap codons are removed and the rows of
        the FUBAR table at those codon positions are dropped, so the emitted
        per-site lists line up with the cleaned sequence.

        Parameters:
            inputs (tuple[str, str, str]): ``(fasta_path, json_path,
                output_path)`` as produced by ``read_inputs``.
        """
        fasta_path, json_path, output_path = inputs
        with open(json_path, encoding="utf-8") as fi:
            # Table of per-site rows stored under MLE -> content -> "0".
            fubar_data = np.array(json.load(fi)["MLE"]["content"]["0"])

        with gzip.open(output_path, "wt") as fo:
            for record in parse_fastx_file(fasta_path):
                mask, cleaned_seq = processHandler.codon_gap_mask(record.seq.upper())
                masked_fubar = fubar_data[mask]
                # NOTE(review): the column -> statistic mapping below is taken
                # from the key names used here; confirm it against the
                # "headers" entry of the FUBAR JSON.
                json.dump(
                    {
                        "name": record.name,
                        "sequence": cleaned_seq,
                        "alpha": masked_fubar[:, 0].tolist(),
                        "beta": masked_fubar[:, 1].tolist(),
                        "Prob[alpha>beta]": masked_fubar[:, 3].tolist(),
                        "Prob[alpha<beta]": masked_fubar[:, 4].tolist(),
                        "BayesFactor[alpha<beta]": masked_fubar[:, 5].tolist(),
                    },
                    fo,
                )
                fo.write("\n")

In [6]:
# Run the pipeline only when this file is the entry point. Worker processes
# started with the spawn/forkserver methods re-import the main module, and an
# unguarded call here would launch the whole pipeline again inside each worker.
if __name__ == "__main__":
    processHandler()

<__main__.processHandler at 0x7af3c099ae50>