In [None]:
#!/usr/bin/env python3

import os
import pandas as pd
import numpy as np
import re
import time
from ete3 import NCBITaxa
from Bio import Entrez
import logging

# ----------------------------
# Configuration and Setup
# ----------------------------

# Configure logging
logging.basicConfig(
    filename='blast_lca_processing.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Set your email and API key for NCBI Entrez
Entrez.email = "Ifeanyi.omah@ed.ac.uk"
api_key = "d155c4478aa27128073f178361b921d2e407"

# Initialize NCBITaxa
ncbi = NCBITaxa()
update_tax_database = False  # Set to True to update the taxonomy database
if update_tax_database:
    logging.info("Updating NCBI taxonomy database...")
    try:
        ncbi.update_taxonomy_database()
        logging.info("Taxonomy database updated.")
    except Exception as e:
        logging.error(f"Failed to update taxonomy database: {e}")

# Load older taxonomy databases if available
old_ncbi_taxa = [
    NCBITaxa(dbfile=x)
    for x in os.listdir()
    if x.endswith(".sqlite") and x.startswith("taxdump")
]
if old_ncbi_taxa:
    logging.info(f"Loaded {len(old_ncbi_taxa)} older taxonomy databases.")
else:
    logging.info("No older taxonomy databases found.")

# ----------------------------
# Taxonomy Helper Functions
# ----------------------------

def parse_blast_chunk(chunk, col_names):
    """
    Assign column names to the chunk and add 'blast_type'.
    """
    if isinstance(col_names, list):
        if len(col_names) != len(chunk.columns):
            logging.error("Number of provided column names does not match number of columns in the chunk.")
            print("Number of provided column names does not match number of columns in the chunk.")
            return pd.DataFrame()
        chunk.columns = col_names
    elif col_names is None:
        pass  # Do not change column names
    else:
        logging.error("Invalid value for col_names. Must be a list of names or None.")
        print("Invalid value for col_names. Must be a list of names or None.")
        return pd.DataFrame()
    
    # Assign blast_type
    chunk = chunk.assign(blast_type="nt")
    
    return chunk

def get_taxid(acc, db):
    """
    For a given accession number, find the corresponding TaxID from NCBI's Taxonomy Database.
    """
    try:
        handle = Entrez.esummary(id=str(acc), db=db, api_key=api_key)
        records = Entrez.read(handle)
        handle.close()
        taxid = int(records[0]["TaxId"])
        time.sleep(0.1)  # To respect NCBI's usage policies
        return taxid
    except Exception as e:
        logging.error(f"Error fetching TaxID for accession {acc} from db {db}: {e}")
        return None

def get_gb(acc, db):
    """
    For a given accession number, return the GenBank record.
    """
    try:
        if "|" in acc:
            acc = acc.split('|')[1]
        handle = Entrez.efetch(id=str(acc), db=db, rettype="gb", retmode="text", api_key=api_key)
        gb_record = list(handle)
        handle.close()
        time.sleep(0.1)  # To respect NCBI's usage policies
        return gb_record
    except Exception as e:
        logging.error(f"Error fetching GenBank record for accession {acc} from db {db}: {e}")
        return []

def find_missing_taxid(acc, db):
    """
    If an accession number is not associated with a TaxID, try to find that information elsewhere,
    such as in a related NCBI record.
    """
    try:
        gb_records = get_gb(acc, db)
        match_1 = [get_taxid(re.search(r"gi:(\d+)", x).group(1), db) for x in gb_records if "replace" in x]
        if len(match_1) == 1:
            return int(match_1[0])
        else:
            return None
    except Exception as e:
        logging.error(f"Error finding missing TaxID for accession {acc}: {e}")
        return None

def ncbi_older_db(taxid, method, current_taxdb=ncbi, older_taxdb=old_ncbi_taxa):
    """
    Utility function to handle taxonomy queries using current and older taxonomy databases.
    """
    try:
        return eval(f"current_taxdb.{method}({taxid})")
    except Exception as e:
        logging.error(f"Error performing '{method}' on TaxID {taxid} in current taxonomy: {e}")
        for db in older_taxdb:
            try:
                return eval(f"db.{method}({taxid})")
            except Exception as ex:
                logging.error(f"Error performing '{method}' on TaxID {taxid} in older taxonomy: {ex}")
                continue
    return None

def get_lca(taxids):
    """
    Find the lowest common ancestor (LCA) for a given set of taxonomic IDs.
    Returns the LCA TaxID if resolvable, otherwise returns 'Unknown'.
    """
    unique_taxids = set(taxids)
    if len(unique_taxids) == 0:
        return "Unknown"
    if len(unique_taxids) == 1:
        return list(unique_taxids)[0]
    try:
        lca = ncbi.get_common_ancestor(list(unique_taxids))
        return lca
    except Exception as e:
        logging.warning(f"Failed to get LCA for TaxIDs {taxids}: {e}")
        # Attempt a fallback to “root” or mark as Unknown
        try:
            # If certain sequences are synthetic or unresolvable, fallback:
            lca = "Unknown"
        except Exception as ex:
            logging.error(f"Error handling unresolved LCA for TaxIDs {taxids}: {ex}")
            lca = "Unknown"
    return lca

def filter_by_taxid(df, db, taxid):
    """
    Filter contigs or blast hits for contigs based on matches to a taxonomic id.
    This is just an example function of how you might filter by a given taxid.
    """
    if len(df) == 0:
        return df
    # In production, you'd apply your desired filtering logic.
    # For now, we simply return the same df.
    return df

def select_taxids_for_lca(df, db="nucleotide", return_taxid_only=True, digits=4):
    """
    Select taxonomic IDs to perform LCA analysis on based on BLAST results.
    Uses bitscore to filter top hits instead of 'qcov'.
    """
    if len(df.index) > 1:
        # Use bitscore to determine the best hits
        best_bitscore = df["bitscore"].max()
        threshold = best_bitscore * 0.8  # 80% of the best bitscore as threshold
        df = df[df["bitscore"] >= threshold]

    if "taxid" in df.columns:
        df["taxid"] = pd.to_numeric(df["taxid"], errors='coerce').astype('Int64')
        if return_taxid_only:
            return list(set(df["taxid"].dropna()))
        else:
            return df
    else:
        logging.error("Column 'taxid' not found in DataFrame.")
        return list()

# ----------------------------
# LCA Calculation Function
# ----------------------------

def calculate_lca(data):
    """
    Perform LCA calculation for each query in the BLAST results.
    """
    results = []
    grouped = data.groupby("qseqid")

    for qseqid, group in grouped:
        # Ensure numeric types for calculations
        for col in ["alignment_length", "identity", "mismatches", "bitscore", "taxid"]:
            if col in group.columns:
                group[col] = pd.to_numeric(group[col], errors="coerce")
            else:
                logging.warning(f"Column '{col}' not found in group for query {qseqid}. Skipping.")
                continue

        # Drop rows with NaN in critical columns
        group = group.dropna(subset=["alignment_length", "taxid", "identity", "mismatches", "bitscore"])

        # Calculate aligned bases
        group["aligned_bases"] = group["alignment_length"] * group["identity"] / 100

        # Drop rows where aligned_bases is NaN
        group = group.dropna(subset=["aligned_bases"])

        # Skip empty groups
        if group.empty:
            logging.info(f"No valid data for query: {qseqid}")
            continue

        # Find the best hit based on bitscore
        best_hit_idx = group["bitscore"].idxmax()
        best_hit = group.loc[best_hit_idx]
        best_bitscore = best_hit["bitscore"]

        # Filter valid hits based on bitscore threshold
        valid_hits = group[group["bitscore"] >= best_bitscore * 0.8].reset_index(drop=True)

        # Extract TaxIDs
        taxids = select_taxids_for_lca(valid_hits)

        # Compute LCA
        if not taxids:
            lca_taxid = "Unknown"
        else:
            lca_taxid = get_lca(taxids)

        # Extract contig length from qseqid if it exists in the format: "_length_12345_"
        try:
            contig_length = int(re.search(r'_length_(\d+)_', qseqid).group(1))
        except (AttributeError, ValueError):
            contig_length = None
            logging.warning(f"Unable to extract contig_length from qseqid: {qseqid}")

        # 'superkingdom' is not retrieved directly in this pipeline; label as 'Unknown'.
        superkingdom = "Unknown"

        # Capture best-hit details
        if not group["bitscore"].empty:
            best_hit = group.loc[group["bitscore"].idxmax()]
            best_hit_sciname = best_hit.get("sci_name", "Unknown")
            bitscore = best_hit["bitscore"]
            aligned_bases = best_hit["aligned_bases"]
            tax_id = best_hit["taxid"]
            subject_title = best_hit.get("subject_title", "Unknown")
            identity = best_hit.get("identity", "Unknown")
        else:
            best_hit_sciname = "Unknown"
            bitscore = 0
            aligned_bases = 0
            tax_id = None
            subject_title = "Unknown"
            identity = "Unknown"

        results.append({
            "qseqid": qseqid,
            "lca_taxid": lca_taxid,
            "best_hit_sciname": best_hit_sciname,
            "subject_title": subject_title,
            "identity": identity,
            "aligned_bases": aligned_bases,
            "contig_length": contig_length,
            "superkingdom": superkingdom,
            "bitscore": bitscore, 
            "tax_id": tax_id
        })

    return pd.DataFrame(results)

# ----------------------------
# File Processing Function
# ----------------------------

def process_files(directory, files, chunksize=1_000_000):
    """
    Process specific BLAST result files in chunks and compute LCA.
    """
    for file in files:
        input_file = os.path.join(directory, file)
        output_file = os.path.join(directory, f"LCA_{file}")

        logging.info(f"Processing file: {file}")
        print(f"\nProcessing file: {file}")

        # Define column names
        column_headings = [
            "qseqid", "sseqid", "identity", "alignment_length", "mismatches", "gap_opens", 
            "qstart", "qend", "sstart", "send", "evalue", "bitscore", 
            "taxid", "sci_name", "common_name", "subject_title"
        ]

        # Initialize an empty list to collect LCA results
        lca_results = []

        try:
            # Read the BLAST file in chunks
            for chunk_number, chunk in enumerate(
                pd.read_csv(
                    input_file,
                    sep="\t",
                    header=None,
                    comment="#",
                    chunksize=chunksize,
                    engine='c',
                    on_bad_lines='warn',
                    low_memory=False
                ),
                start=1
            ):
                print(f"  Processing chunk {chunk_number}...")
                logging.info(f"Processing chunk {chunk_number} of {file}")

                # Assign column names and blast_type
                chunk = parse_blast_chunk(chunk, column_headings)

                if chunk.empty:
                    logging.warning(f"Chunk {chunk_number} of {file} is empty after parsing.")
                    continue

                # Drop rows with NaN in essential columns
                essential_columns = [
                    "qseqid", "sseqid", "identity", "alignment_length", "mismatches",
                    "gap_opens", "qstart", "qend", "sstart", "send", "evalue",
                    "bitscore", "taxid", "subject_title"
                ]
                chunk = chunk.dropna(subset=essential_columns)

                if chunk.empty:
                    logging.warning(f"Chunk {chunk_number} of {file} has no valid data after dropping NaNs.")
                    continue

                # Perform LCA analysis on the chunk
                chunk_lca = calculate_lca(chunk)

                if not chunk_lca.empty:
                    lca_results.append(chunk_lca)
                else:
                    logging.info(f"No LCA results for chunk {chunk_number} of {file}.")

        except Exception as e:
            logging.error(f"Failed to process {file}: {e}")
            print(f"Failed to process {file}: {e}")
            continue

        # Concatenate all LCA results
        if lca_results:
            final_lca = pd.concat(lca_results, ignore_index=True)
            try:
                final_lca.to_csv(output_file, sep="\t", index=False)
                logging.info(f"LCA results saved to: {output_file}")
                print(f"LCA results saved to: {output_file}")
            except Exception as e:
                logging.error(f"Error saving results to {output_file}: {e}")
                print(f"Error saving results to {output_file}: {e}")
        else:
            logging.warning(f"No valid LCA results for {file}. Saving empty file.")
            print(f"No valid LCA results for {file}. Saving empty file.")
            pd.DataFrame().to_csv(output_file, sep="\t", index=False)

# ----------------------------
# Main Execution
# ----------------------------

if __name__ == "__main__":
    # Change this to the directory containing your *.m9 files
    directory = "/Volumes/aine_store/SENZOR_project/SPlited_SENSOR_porject/IDseq/Blast_NT"

    # Dynamically gather all .m9 files from the directory
    files_to_process = [
        f for f in os.listdir(directory)
        if f.endswith(".m9") and os.path.isfile(os.path.join(directory, f))
    ]

    if not files_to_process:
        logging.error(f"No .m9 files found in {directory}")
        print(f"No .m9 files found in {directory}. Exiting...")
    else:
        print("Found the following .m9 files to process:")
        for f in files_to_process:
            print(" -", f)

        # Verify that all files exist (they should if we found them, but just in case)
        missing_files = [
            f for f in files_to_process
            if not os.path.isfile(os.path.join(directory, f))
        ]
        if missing_files:
            logging.error("The following files were not found in the directory:")
            print("The following files were not found in the directory:")
            for f in missing_files:
                logging.error(f" - {f}")
                print(f" - {f}")
            print("Please ensure all files are present before running the script.")
        else:
            # Process each .m9 file in that directory
            process_files(directory, files_to_process)

Found the following .m9 files to process:
 - AIAMAAGR001_LIV_S46_662105_blast_nt.m9
 - AIAMACAT001_S69_657769_blast_nt.m9
 - AIAMADOG001_S40_657732_blast_nt.m9
 - AIAMAGOAT003_S50_657720_blast_nt.m9
 - AIAMAPIG001_S63_657713_blast_nt.m9
 - AIAMAPIG002_S64_657714_blast_nt.m9
 - AIAMAR001_LIV_S21_662106_blast_nt.m9
 - AIAMAR002_LIV_S22_662107_blast_nt.m9
 - AIAMAR003_LIV_S23_662108_blast_nt.m9
 - AIAMAR004_LIV_S24_1_662161_blast_nt.m9
 - AIAMAR005_LIV_S25_1_662162_blast_nt.m9
 - AIAMAR006_LIV_S26_2_662165_blast_nt.m9
 - AIAMAR007_LIV_S27_2_662166_blast_nt.m9
 - AIAMAR008_LIV_S28_1_662167_blast_nt.m9
 - AIAMAR009_LIV_S29_1_662168_blast_nt.m9
 - AIAMAR010_LIV_S30_663038_blast_nt.m9
 - AIAMAR011_LIV_S31_663039_blast_nt.m9
 - AIAMAR012_LIV_S32_663040_blast_nt.m9
 - AIAMAR013_LIV_S33_663041_blast_nt.m9
 - AIAMAR014_LIV_S34_663042_blast_nt.m9
 - AIAMAR015_LIV_S35_663046_blast_nt.m9
 - AIAMAR016_LIV_S36_663047_blast_nt.m9
 - AIAMAR017_LIV_S37_663048_blast_nt.m9
 - AIAMAR018_LIV_S38_663049_blast

In [None]:
import os
import pandas as pd
import numpy as np
import re
import time
from ete3 import NCBITaxa
from Bio import Entrez
import logging

# ----------------------------
# Configuration and Setup
# ----------------------------

# Configure logging
logging.basicConfig(
    filename='blast_lca_processing.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Set your email and API key for NCBI Entrez
Entrez.email = "Ifeanyi.omah@ed.ac.uk"  # Replace with your actual email
api_key = "d155c4478aa27128073f178361b921d2e407"  # Replace with your actual API key

# Initialize NCBITaxa
ncbi = NCBITaxa()
update_tax_database = False  # Set to True to update the taxonomy database
if update_tax_database:
    logging.info("Updating NCBI taxonomy database...")
    try:
        ncbi.update_taxonomy_database()
        logging.info("Taxonomy database updated.")
    except Exception as e:
        logging.error(f"Failed to update taxonomy database: {e}")

# Load older taxonomy databases if available
old_ncbi_taxa = []
for filename in os.listdir():
    if filename.endswith(".sqlite") and filename.startswith("taxdump"):
        try:
            db = NCBITaxa(dbfile=filename)
            old_ncbi_taxa.append(db)
            logging.info(f"Loaded older taxonomy database: {filename}")
        except Exception as e:
            logging.error(f"Failed to load taxonomy database {filename}: {e}")

if not old_ncbi_taxa:
    logging.info("No older taxonomy databases found.")

# ----------------------------
# Taxonomy Helper Functions
# ----------------------------

def parse_blast_chunk(chunk, col_names, blast_type):
    """
    Assign column names to the chunk and add 'blast_type'.

    Parameters:
    - chunk (pd.DataFrame): A DataFrame chunk from the BLAST results.
    - col_names (List[str]): List of column names to assign.
    - blast_type (str): Type of BLAST ('nr').

    Returns:
    - pd.DataFrame: Parsed DataFrame with assigned column names and 'blast_type'.
    """
    if isinstance(col_names, list):
        if len(col_names) != len(chunk.columns):
            logging.error(f"Column mismatch: Expected {len(col_names)} columns, got {len(chunk.columns)}.")
            print(f"Column mismatch: Expected {len(col_names)} columns, got {len(chunk.columns)}.")
            return pd.DataFrame()
        chunk.columns = col_names
    elif col_names is None:
        pass  # Do not change column names
    else:
        logging.error("Invalid value for col_names. Must be a list of names or None.")
        print("Invalid value for col_names. Must be a list of names or None.")
        return pd.DataFrame()
    
    # Assign blast_type
    chunk = chunk.assign(blast_type=blast_type)
    
    return chunk

def get_taxid(acc, db):
    """
    For a given accession number, find the corresponding TaxID from NCBI's Taxonomy Database.

    Parameters:
    - acc (str): Accession number.
    - db (str): Database name ('protein' for nr).

    Returns:
    - int or None: TaxID if found, else None.
    """
    try:
        handle = Entrez.esummary(id=str(acc), db=db, api_key=api_key)
        records = Entrez.read(handle)
        handle.close()
        taxid = int(records[0]["TaxId"])
        time.sleep(0.1)  # To respect NCBI's usage policies
        return taxid
    except Exception as e:
        logging.error(f"Error fetching TaxID for accession {acc} from db {db}: {e}")
        return None

def get_gb(acc, db):
    """
    For a given accession number, return the GenBank record.

    Parameters:
    - acc (str): Accession number.
    - db (str): Database name ('protein' for nr).

    Returns:
    - List[str]: GenBank record lines if successful, else empty list.
    """
    try:
        if "|" in acc:
            acc = acc.split('|')[1]
        handle = Entrez.efetch(id=str(acc), db=db, rettype="gb", retmode="text", api_key=api_key)
        gb_record = list(handle)
        handle.close()
        time.sleep(0.1)  # To respect NCBI's usage policies
        return gb_record
    except Exception as e:
        logging.error(f"Error fetching GenBank record for accession {acc} from db {db}: {e}")
        return []

def find_missing_taxid(acc, db):
    """
    If an accession number is not associated with a TaxID, try to find that information elsewhere,
    such as in a related NCBI record.

    Parameters:
    - acc (str): Accession number.
    - db (str): Database name ('protein' for nr).

    Returns:
    - int or None: TaxID if found, else None.
    """
    try:
        gb_records = get_gb(acc, db)
        taxids = []
        for line in gb_records:
            if "db_xref" in line:
                match = re.search(r"taxon:(\d+)", line)
                if match:
                    taxids.append(int(match.group(1)))
        if len(taxids) == 1:
            return taxids[0]
        else:
            return None
    except Exception as e:
        logging.error(f"Error finding missing TaxID for accession {acc}: {e}")
        return None

def get_lca(taxids):
    """
    Find the lowest common ancestor (LCA) for a given set of taxonomic IDs.
    Returns the LCA TaxID if resolvable, otherwise returns 'Unknown'.
    """
    unique_taxids = set(taxids)
    if len(unique_taxids) == 0:
        return "Unknown"
    if len(unique_taxids) == 1:
        return list(unique_taxids)[0]
    try:
        lca = ncbi.get_common_ancestor(list(unique_taxids))
        return lca
    except Exception as e:
        logging.warning(f"Failed to get LCA for TaxIDs {taxids}: {e}")
        # Handle cases like synthetic constructs or unresolved taxonomy
        try:
            other_seq_taxid = ncbi_older_db(["other sequences"], "get_name_translator")["other sequences"][0]
            other_seq_descendants = ncbi_older_db(other_seq_taxid, "get_descendant_taxa")
            if any([i in other_seq_descendants for i in taxids]):
                lca = ncbi_older_db(["root"], "get_name_translator")["root"][0]
            else:
                lca = "Unknown"
        except Exception as ex:
            logging.error(f"Error handling unresolved LCA for TaxIDs {taxids}: {ex}")
            lca = "Unknown"
    return lca

def select_taxids_for_lca(df, db_type="protein"):
    """
    Select taxonomic IDs to perform LCA analysis on based on BLAST results.
    Uses alignment criteria to filter relevant hits.

    Parameters:
    - df (pd.DataFrame): DataFrame containing BLAST hits for a single contig.
    - db_type (str): Type of BLAST database ('protein').

    Returns:
    - List[int]: List of selected TaxIDs for LCA computation.
    """
    if df.empty:
        return []
    
    # Calculate aligned_bases using .loc to avoid SettingWithCopyWarning
    df.loc[:, "aligned_bases"] = df["alignment_length"] * df["identity"] / 100
    
    # Identify the best hit based on bitscore
    if df["bitscore"].empty:
        return []
    
    best_hit = df.loc[df["bitscore"].idxmax()]
    best_aligned_bases = best_hit["aligned_bases"]
    mismatches_best_hit = best_hit["mismatches"]
    
    # Compute threshold
    threshold = best_aligned_bases - mismatches_best_hit
    
    # Select hits meeting the threshold
    selected_hits = df[df["aligned_bases"] >= threshold]
    
    # Extract unique TaxIDs
    selected_taxids = selected_hits["taxid"].dropna().unique().tolist()
    
    return selected_taxids

# ----------------------------
# LCA Calculation Function
# ----------------------------

def calculate_lca(data, db_type="protein"):
    """
    Perform LCA calculation for each query in the BLAST results.

    Parameters:
    - data (pd.DataFrame): A DataFrame containing BLAST hits.
    - db_type (str): Type of BLAST database ('protein').

    Returns:
    - pd.DataFrame: A DataFrame with LCA results for each contig, including subject descriptions and identity.
    """
    results = []
    grouped = data.groupby("qseqid")

    for qseqid, group in grouped:
        # Ensure numeric types for calculations
        numeric_cols = ["alignment_length", "identity", "mismatches", "bitscore", "taxid"]
        for col in numeric_cols:
            if col in group.columns:
                group[col] = pd.to_numeric(group[col], errors="coerce")
            else:
                logging.warning(f"Column '{col}' not found in group for query {qseqid}. Skipping.")
                continue

        # Drop rows with NaN in critical columns
        group = group.dropna(subset=numeric_cols)

        # Select TaxIDs based on alignment criteria
        taxids = select_taxids_for_lca(group, db_type=db_type)

        if not taxids:
            lca_taxid = "Unknown"
        else:
            lca_taxid = get_lca(taxids)

        # Extract additional details
        contig_length = None
        match = re.search(r'_length_(\d+)_', qseqid)
        if match:
            contig_length = int(match.group(1))
        else:
            logging.warning(f"Unable to extract contig_length from qseqid: {qseqid}")

        # 'superkingdom' is not present, set to 'Unknown'
        superkingdom = "Unknown"

        # Best hit details
        if not group["bitscore"].empty:
            best_hit = group.loc[group["bitscore"].idxmax()]
            best_hit_sciname = best_hit.get("sci_name", "Unknown")
            bitscore = best_hit["bitscore"]
            aligned_bases = best_hit["aligned_bases"]
            tax_id = best_hit["taxid"]
            subject_title = best_hit.get("subject_title", "Unknown")  # Extract subject_title
            identity = best_hit.get("identity", "Unknown")  # Extract identity
        else:
            best_hit_sciname = "Unknown"
            bitscore = 0
            aligned_bases = 0
            tax_id = None
            subject_title = "Unknown"
            identity = "Unknown"

        results.append({
            "qseqid": qseqid,
            "lca_taxid": lca_taxid,
            "best_hit_sciname": best_hit_sciname,
            "subject_title": subject_title,  # Included in previous modification
            "identity": identity,  # New field
            "aligned_bases": aligned_bases,
            "contig_length": contig_length,
            "superkingdom": superkingdom,
            "bitscore": bitscore, 
            "tax_id": tax_id
        })

    return pd.DataFrame(results)

# ----------------------------
# File Processing Function
# ----------------------------

def process_files(directory, files, blast_type="nr", chunksize=1000000):
    """
    Process specific BLAST result files in chunks and compute LCA.

    Parameters:
    - directory (str): Path to the directory containing BLAST result files.
    - files (List[str]): List of filenames to process.
    - blast_type (str): Type of BLAST ('nr' for protein).
    - chunksize (int): Number of rows per chunk.

    Returns:
    - None
    """
    # Define database based on blast_type
    if blast_type == "nt":
        db_name = "nuccore"
    elif blast_type == "nr":
        db_name = "protein"
    else:
        logging.error(f"Unsupported blast_type: {blast_type}. Skipping processing.")
        print(f"Unsupported blast_type: {blast_type}. Skipping processing.")
        return

    # Define column names based on blast_type
    if blast_type == "nr":
        column_headings = [
            "qseqid", "sseqid", "identity", "alignment_length", "mismatches", "gap_opens", 
            "qstart", "qend", "sstart", "send", "evalue", "bitscore", 
            "taxid", "sci_name", "subject_title"
        ]
    else:
        # For 'nt' BLAST results, which we're not handling here
        column_headings = []

    for file in files:
        input_file = os.path.join(directory, file)
        output_file = os.path.join(directory, f"LCA_{file}")

        logging.info(f"Processing file: {file}")
        print(f"\nProcessing file: {file}")

        # Initialize an empty list to collect LCA results
        lca_results = []

        try:
            # Read the BLAST file in chunks
            for chunk_number, chunk in enumerate(pd.read_csv(
                input_file,
                sep="\t",
                header=None,
                comment="#",
                chunksize=chunksize,
                engine='c',
                on_bad_lines='warn',
                low_memory=False
            ), start=1):
                print(f"Processing chunk {chunk_number}...")
                logging.info(f"Processing chunk {chunk_number} of {file}")

                # Assign column names and blast_type
                chunk = parse_blast_chunk(chunk, column_headings, blast_type=blast_type)

                if chunk.empty:
                    logging.warning(f"Chunk {chunk_number} of {file} is empty after parsing.")
                    continue

                # Drop rows with NaN in essential columns
                essential_columns = [
                    "qseqid", "sseqid", "identity", "alignment_length", "mismatches",
                    "gap_opens", "qstart", "qend", "sstart", "send", "evalue",
                    "bitscore", "taxid", "sci_name", "subject_title"
                ]
                chunk = chunk.dropna(subset=essential_columns)

                if chunk.empty:
                    logging.warning(f"Chunk {chunk_number} of {file} has no valid data after dropping NaNs.")
                    continue

                # Perform LCA analysis on the chunk
                chunk_lca = calculate_lca(chunk, db_type=("protein" if blast_type == "nr" else "unknown"))

                if not chunk_lca.empty:
                    lca_results.append(chunk_lca)
                else:
                    logging.info(f"No LCA results for chunk {chunk_number} of {file}.")

        except Exception as e:
            logging.error(f"Failed to process {file}: {e}")
            print(f"Failed to process {file}: {e}")
            continue

        # Concatenate all LCA results
        if lca_results:
            final_lca = pd.concat(lca_results, ignore_index=True)
            try:
                final_lca.to_csv(output_file, sep="\t", index=False)
                logging.info(f"LCA results saved to: {output_file}")
                print(f"LCA results saved to: {output_file}")
            except Exception as e:
                logging.error(f"Error saving results to {output_file}: {e}")
                print(f"Error saving results to {output_file}: {e}")
        else:
            logging.warning(f"No valid LCA results for {file}. Saving empty file.")
            print(f"No valid LCA results for {file}. Saving empty file.")
            pd.DataFrame().to_csv(output_file, sep="\t", index=False)

# ----------------------------
# Main Execution
# ----------------------------

if __name__ == "__main__":
    # Define directories and files for nr BLAST results
    blast_data = [
        {
            "directory": "/Volumes/aine_store/SENZOR_project/SPlited_SENSOR_porject/IDseq/Test_blast_SENZOR/Blast_nr/",
            "files": [
                "AIAMADOG001_S40_657732_modified_diamond_blast_nr.m9",
                "AIAMACAT001_S69_657769_modified_diamond_blast_nr.m9",
                "AIAMAGOAT003_S50_657720_modified_diamond_blast_nr.m9"
            ],
            "blast_type": "nr"
        }
    ]

    for dataset in blast_data:
        directory = dataset["directory"]
        files_to_process = dataset["files"]
        blast_type = dataset["blast_type"]

        # Verify that all specified files exist
        missing_files = [f for f in files_to_process if not os.path.isfile(os.path.join(directory, f))]
        if missing_files:
            logging.error(f"The following {blast_type} BLAST files were not found in the directory:")
            print(f"The following {blast_type} BLAST files were not found in the directory:")
            for f in missing_files:
                logging.error(f" - {f}")
                print(f" - {f}")
            print(f"Please ensure all {blast_type} BLAST files are present before running the script.\n")
            continue
        else:
            # Process the specified files
            process_files(directory, files_to_process, blast_type=blast_type)

In [None]:
# import os
import pandas as pd
import numpy as np
import re
import time
from ete3 import NCBITaxa
from Bio import Entrez
import logging

# ----------------------------
# Configuration and Setup
# ----------------------------

# Configure logging
logging.basicConfig(
    filename='blast_lca_processing.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Set your email and API key for NCBI Entrez
Entrez.email = "Ifeanyi.omah@ed.ac.uk"  # Replace with your actual email
api_key = "d155c4478aa27128073f178361b921d2e407"  # Replace with your actual API key

# Initialize NCBITaxa
ncbi = NCBITaxa()
update_tax_database = False  # Set to True to update the taxonomy database
if update_tax_database:
    logging.info("Updating NCBI taxonomy database...")
    try:
        ncbi.update_taxonomy_database()
        logging.info("Taxonomy database updated.")
    except Exception as e:
        logging.error(f"Failed to update taxonomy database: {e}")

# Load older taxonomy databases if available
old_ncbi_taxa = []
for filename in os.listdir():
    if filename.endswith(".sqlite") and filename.startswith("taxdump"):
        try:
            db = NCBITaxa(dbfile=filename)
            old_ncbi_taxa.append(db)
            logging.info(f"Loaded older taxonomy database: {filename}")
        except Exception as e:
            logging.error(f"Failed to load taxonomy database {filename}: {e}")

if not old_ncbi_taxa:
    logging.info("No older taxonomy databases found.")

# ----------------------------
# Taxonomy Helper Functions
# ----------------------------

def parse_blast_chunk(chunk, col_names, blast_type):
    """
    Assign column names to the chunk and add 'blast_type'.

    Parameters:
    - chunk (pd.DataFrame): A DataFrame chunk from the BLAST results.
    - col_names (List[str]): List of column names to assign.
    - blast_type (str): Type of BLAST ('nr').

    Returns:
    - pd.DataFrame: Parsed DataFrame with assigned column names and 'blast_type'.
    """
    if isinstance(col_names, list):
        if len(col_names) != len(chunk.columns):
            logging.error(f"Column mismatch: Expected {len(col_names)} columns, got {len(chunk.columns)}.")
            print(f"Column mismatch: Expected {len(col_names)} columns, got {len(chunk.columns)}.")
            return pd.DataFrame()
        chunk.columns = col_names
    elif col_names is None:
        pass  # Do not change column names
    else:
        logging.error("Invalid value for col_names. Must be a list of names or None.")
        print("Invalid value for col_names. Must be a list of names or None.")
        return pd.DataFrame()
    
    # Assign blast_type
    chunk = chunk.assign(blast_type=blast_type)
    
    return chunk

def get_taxid(acc, db):
    """
    For a given accession number, find the corresponding TaxID from NCBI's Taxonomy Database.

    Parameters:
    - acc (str): Accession number.
    - db (str): Database name ('protein' for nr).

    Returns:
    - int or None: TaxID if found, else None.
    """
    try:
        handle = Entrez.esummary(id=str(acc), db=db, api_key=api_key)
        records = Entrez.read(handle)
        handle.close()
        taxid = int(records[0]["TaxId"])
        time.sleep(0.1)  # To respect NCBI's usage policies
        return taxid
    except Exception as e:
        logging.error(f"Error fetching TaxID for accession {acc} from db {db}: {e}")
        return None

def get_gb(acc, db):
    """
    For a given accession number, return the GenBank record.

    Parameters:
    - acc (str): Accession number.
    - db (str): Database name ('protein' for nr).

    Returns:
    - List[str]: GenBank record lines if successful, else empty list.
    """
    try:
        if "|" in acc:
            acc = acc.split('|')[1]
        handle = Entrez.efetch(id=str(acc), db=db, rettype="gb", retmode="text", api_key=api_key)
        gb_record = list(handle)
        handle.close()
        time.sleep(0.1)  # To respect NCBI's usage policies
        return gb_record
    except Exception as e:
        logging.error(f"Error fetching GenBank record for accession {acc} from db {db}: {e}")
        return []

def find_missing_taxid(acc, db):
    """
    If an accession number is not associated with a TaxID, try to find that information elsewhere,
    such as in a related NCBI record.

    Parameters:
    - acc (str): Accession number.
    - db (str): Database name ('protein' for nr).

    Returns:
    - int or None: TaxID if found, else None.
    """
    try:
        gb_records = get_gb(acc, db)
        taxids = []
        for line in gb_records:
            if "db_xref" in line:
                match = re.search(r"taxon:(\d+)", line)
                if match:
                    taxids.append(int(match.group(1)))
        if len(taxids) == 1:
            return taxids[0]
        else:
            return None
    except Exception as e:
        logging.error(f"Error finding missing TaxID for accession {acc}: {e}")
        return None

def get_lca(taxids):
    """
    Find the lowest common ancestor (LCA) for a given set of taxonomic IDs.
    Returns the LCA TaxID if resolvable, otherwise returns 'Unknown'.

    Parameters:
    - taxids (List[int]): List of TaxIDs.

    Returns:
    - int or str: LCA TaxID or 'Unknown'.
    """
    unique_taxids = set(taxids)
    if not unique_taxids:
        return "Unknown"
    if len(unique_taxids) == 1:
        return unique_taxids.pop()
    try:
        lca = ncbi.get_common_ancestor(list(unique_taxids))
        return lca
    except Exception as e:
        logging.warning(f"Failed to get LCA for TaxIDs {taxids}: {e}")
        # Attempt to use older taxonomy databases
        for db in old_ncbi_taxa:
            try:
                lca = db.get_common_ancestor(list(unique_taxids))
                return lca
            except Exception as ex:
                logging.error(f"Error getting LCA from older taxonomy database: {ex}")
                continue
        return "Unknown"

def select_taxids_for_lca(df, db_type="protein"):
    """
    Select taxonomic IDs to perform LCA analysis on based on BLAST results.
    Uses alignment criteria to filter relevant hits.

    Parameters:
    - df (pd.DataFrame): DataFrame containing BLAST hits for a single contig.
    - db_type (str): Type of BLAST database ('protein').

    Returns:
    - List[int]: List of selected TaxIDs for LCA computation.
    """
    if df.empty:
        return []
    
    # Calculate aligned_bases using .loc to avoid SettingWithCopyWarning
    df.loc[:, "aligned_bases"] = df["alignment_length"] * df["identity"] / 100
    
    # Identify the best hit based on bitscore
    if df["bitscore"].empty:
        return []
    
    best_hit = df.loc[df["bitscore"].idxmax()]
    best_aligned_bases = best_hit["aligned_bases"]
    mismatches_best_hit = best_hit["mismatches"]
    
    # Compute threshold
    threshold = best_aligned_bases - mismatches_best_hit
    
    # Select hits meeting the threshold
    selected_hits = df[df["aligned_bases"] >= threshold]
    
    # Extract unique TaxIDs
    selected_taxids = selected_hits["taxid"].dropna().unique().tolist()
    
    return selected_taxids

# ----------------------------
# LCA Calculation Function
# ----------------------------

def calculate_lca(data, db_type="protein"):
    """
    Perform LCA calculation for each query in the BLAST results.

    Parameters:
    - data (pd.DataFrame): A DataFrame containing BLAST hits.
    - db_type (str): Type of BLAST database ('protein').

    Returns:
    - pd.DataFrame: A DataFrame with LCA results for each contig.
    """
    results = []
    grouped = data.groupby("qseqid")

    for qseqid, group in grouped:
        # Ensure numeric types for calculations
        numeric_cols = ["alignment_length", "identity", "mismatches", "bitscore", "taxid"]
        for col in numeric_cols:
            if col in group.columns:
                group[col] = pd.to_numeric(group[col], errors="coerce")
            else:
                logging.warning(f"Column '{col}' not found in group for query {qseqid}. Skipping.")
                continue

        # Drop rows with NaN in critical columns
        group = group.dropna(subset=numeric_cols)

        # Select TaxIDs based on alignment criteria
        taxids = select_taxids_for_lca(group, db_type=db_type)

        if not taxids:
            lca_taxid = "Unknown"
        else:
            lca_taxid = get_lca(taxids)

        # Extract additional details
        contig_length = None
        match = re.search(r'_length_(\d+)_', qseqid)
        if match:
            contig_length = int(match.group(1))
        else:
            logging.warning(f"Unable to extract contig_length from qseqid: {qseqid}")

        # 'superkingdom' is not present, set to 'Unknown'
        superkingdom = "Unknown"

        # Best hit details
        if not group["bitscore"].empty:
            best_hit = group.loc[group["bitscore"].idxmax()]
            best_hit_sciname = best_hit.get("sci_name", "Unknown")
            bitscore = best_hit["bitscore"]
            aligned_bases = best_hit["aligned_bases"]
            tax_id = best_hit["taxid"]
        else:
            best_hit_sciname = "Unknown"
            bitscore = 0
            aligned_bases = 0
            tax_id = None

        results.append({
            "qseqid": qseqid,
            "lca_taxid": lca_taxid,
            "best_hit_sciname": best_hit_sciname,
            "aligned_bases": aligned_bases,
            "contig_length": contig_length,
            "superkingdom": superkingdom,
            "bitscore": bitscore, 
            "tax_id": tax_id
        })

    return pd.DataFrame(results)

# ----------------------------
# File Processing Function
# ----------------------------

def process_files(directory, files, blast_type="nr", chunksize=1000000):
    """
    Process specific BLAST result files in chunks and compute LCA.

    Parameters:
    - directory (str): Path to the directory containing BLAST result files.
    - files (List[str]): List of filenames to process.
    - blast_type (str): Type of BLAST ('nr' for protein).
    - chunksize (int): Number of rows per chunk.

    Returns:
    - None
    """
    # Define database based on blast_type
    if blast_type == "nt":
        db_name = "nuccore"
    elif blast_type == "nr":
        db_name = "protein"
    else:
        logging.error(f"Unsupported blast_type: {blast_type}. Skipping processing.")
        print(f"Unsupported blast_type: {blast_type}. Skipping processing.")
        return

    # Define column names based on blast_type
    if blast_type == "nr":
        column_headings = [
            "qseqid", "sseqid", "identity", "alignment_length", "mismatches", "gap_opens", 
            "qstart", "qend", "sstart", "send", "evalue", "bitscore", 
            "taxid", "sci_name", "subject_title"
        ]
    else:
        # For 'nt' BLAST results, which we're not handling here
        column_headings = []

    for file in files:
        input_file = os.path.join(directory, file)
        output_file = os.path.join(directory, f"LCA_{file}")

        logging.info(f"Processing file: {file}")
        print(f"\nProcessing file: {file}")

        # Initialize an empty list to collect LCA results
        lca_results = []

        try:
            # Read the BLAST file in chunks
            for chunk_number, chunk in enumerate(pd.read_csv(
                input_file,
                sep="\t",
                header=None,
                comment="#",
                chunksize=chunksize,
                engine='c',
                on_bad_lines='warn',
                low_memory=False
            ), start=1):
                print(f"Processing chunk {chunk_number}...")
                logging.info(f"Processing chunk {chunk_number} of {file}")

                # Assign column names and blast_type
                chunk = parse_blast_chunk(chunk, column_headings, blast_type=blast_type)

                if chunk.empty:
                    logging.warning(f"Chunk {chunk_number} of {file} is empty after parsing.")
                    continue

                # Drop rows with NaN in essential columns
                essential_columns = [
                    "qseqid", "sseqid", "identity", "alignment_length", "mismatches",
                    "gap_opens", "qstart", "qend", "sstart", "send", "evalue",
                    "bitscore", "taxid", "sci_name", "subject_title"
                ]
                chunk = chunk.dropna(subset=essential_columns)

                if chunk.empty:
                    logging.warning(f"Chunk {chunk_number} of {file} has no valid data after dropping NaNs.")
                    continue

                # Perform LCA analysis on the chunk
                chunk_lca = calculate_lca(chunk, db_type=("protein" if blast_type == "nr" else "unknown"))

                if not chunk_lca.empty:
                    lca_results.append(chunk_lca)
                else:
                    logging.info(f"No LCA results for chunk {chunk_number} of {file}.")

        except Exception as e:
            logging.error(f"Failed to process {file}: {e}")
            print(f"Failed to process {file}: {e}")
            continue

        # Concatenate all LCA results
        if lca_results:
            final_lca = pd.concat(lca_results, ignore_index=True)
            try:
                final_lca.to_csv(output_file, sep="\t", index=False)
                logging.info(f"LCA results saved to: {output_file}")
                print(f"LCA results saved to: {output_file}")
            except Exception as e:
                logging.error(f"Error saving results to {output_file}: {e}")
                print(f"Error saving results to {output_file}: {e}")
        else:
            logging.warning(f"No valid LCA results for {file}. Saving empty file.")
            print(f"No valid LCA results for {file}. Saving empty file.")
            pd.DataFrame().to_csv(output_file, sep="\t", index=False)

# ----------------------------
# Main Execution
# ----------------------------

if __name__ == "__main__":
    # Define directories and files for nr BLAST results
    blast_data = [
        {
            "directory": "/Volumes/aine_store/SENZOR_project/SPlited_SENSOR_porject/IDseq/Test_blast_SENZOR/Blast_nr/",
            "files": [
                "AIAMADOG001_S40_657732_modified_diamond_blast_nr.m9",
                "AIAMACAT001_S69_657769_modified_diamond_blast_nr.m9",
                "AIAMAGOAT003_S50_657720_modified_diamond_blast_nr.m9"
            ],
            "blast_type": "nr"
        }
    ]

    for dataset in blast_data:
        directory = dataset["directory"]
        files_to_process = dataset["files"]
        blast_type = dataset["blast_type"]

        # Verify that all specified files exist
        missing_files = [f for f in files_to_process if not os.path.isfile(os.path.join(directory, f))]
        if missing_files:
            logging.error(f"The following {blast_type} BLAST files were not found in the directory:")
            print(f"The following {blast_type} BLAST files were not found in the directory:")
            for f in missing_files:
                logging.error(f" - {f}")
                print(f" - {f}")
            print(f"Please ensure all {blast_type} BLAST files are present before running the script.\n")
            continue
        else:
            # Process the specified files
            process_files(directory, files_to_process, blast_type=blast_type)


In [10]:
import os
import pandas as pd
import numpy as np
import re
import time
import logging
from ete3 import NCBITaxa
from Bio import Entrez

# ----------------------------
# Configuration and Setup
# ----------------------------
logging.basicConfig(
    filename='blast_lca_processing.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

Entrez.email = "Ifeanyi.omah@ed.ac.uk"
Entrez.api_key = "d155c4478aa27128073f178361b921d2e407"
ncbi = NCBITaxa()

# Column definitions
NT_COLUMNS = [
    "qseqid", "sseqid", "pident", "length", "mismatch", "gapopen",
    "qstart", "qend", "sstart", "send", "evalue", "bitscore"
]
NR_COLUMNS = [
    "qseqid", "sseqid", "identity", "alignment_length", "mismatches", "gap_opens",
    "qstart", "qend", "sstart", "send", "evalue", "bitscore",
    "taxid", "sci_name", "subject_title"
]

# Helper functions
def parse_blast_chunk(chunk: pd.DataFrame, columns: list, blast_type: str) -> pd.DataFrame:
    if len(columns) != len(chunk.columns):
        logging.error(f"Column mismatch for {blast_type}: expected {len(columns)}, got {len(chunk.columns)}")
        return pd.DataFrame()
    chunk.columns = columns
    chunk['blast_type'] = blast_type
    return chunk


def get_taxid(acc: str, db: str) -> int:
    try:
        handle = Entrez.esummary(id=acc, db=db)
        records = Entrez.read(handle)
        handle.close()
        time.sleep(0.1)
        return int(records[0]['TaxId'])
    except Exception as e:
        logging.warning(f"Failed retrieving taxid for {acc}: {e}")
        return None


def get_lca(taxids: list) -> int or str:
    unique = set([t for t in taxids if t is not None])
    if not unique:
        return 'Unknown'
    if len(unique) == 1:
        return unique.pop()
    try:
        return ncbi.get_common_ancestor(list(unique))
    except Exception as e:
        logging.warning(f"LCA failure for {unique}: {e}")
        return 'Unknown'


def select_taxids_for_lca(df: pd.DataFrame) -> list:
    df = df.copy()
    if 'alignment_length' in df and 'identity' in df:
        df['aligned_bases'] = df['alignment_length'] * df['identity'] / 100
    else:
        df['aligned_bases'] = df['length'] * df['pident'] / 100
    best = df.loc[df['bitscore'].idxmax()]
    mismatches = best.get('mismatches', best.get('mismatch', 0))
    threshold = best['aligned_bases'] - mismatches
    hits = df[df['aligned_bases'] >= threshold]
    return hits.get('taxid', pd.Series(dtype=float)).dropna().unique().tolist()


def calculate_lca(df: pd.DataFrame) -> pd.DataFrame:
    results = []
    for qseqid, group in df.groupby('qseqid'):
        for col in ['alignment_length', 'identity', 'mismatches', 'bitscore', 'length', 'pident']:
            if col in group:
                group[col] = pd.to_numeric(group[col], errors='coerce')
        group = group.dropna(subset=['bitscore'])
        taxids = select_taxids_for_lca(group)
        lca = get_lca(taxids)
        best = group.loc[group['bitscore'].idxmax()]
        results.append({
            'qseqid': qseqid,
            'lca_taxid': lca,
            'best_hit_sciname': best.get('sci_name', best.get('sseqid')),
            'bitscore': best['bitscore'],
            'aligned_bases': best.get('aligned_bases'),
            'blast_type': best['blast_type']
        })
    return pd.DataFrame(results)


def process_files(directory: str, blast_type: str, columns: list, db_name: str):
    files = [f for f in os.listdir(directory) if f.endswith('.m9')]
    total = len(files)
    print(f"Found {total} {blast_type.upper()} files in {directory}")

    for idx, file in enumerate(files, start=1):
        input_path = os.path.join(directory, file)
        output_path = os.path.join(directory, f"LCA_{file}")
        print(f"[{idx}/{total}] Processing {blast_type.upper()}: {file}")
        logging.info(f"Start {blast_type} processing: {file}")

        try:
            reader = pd.read_csv(
                input_path,
                sep='\t',
                header=None,
                comment='#',
                chunksize=500000,
                engine='c',
                on_bad_lines='warn'
            )
        except pd.errors.EmptyDataError:
            print(f"  - Skipped empty file: {file}")
            logging.warning(f"Empty file skipped: {file}")
            continue

        parsed_chunks = []
        for cidx, chunk in enumerate(reader, start=1):
            print(f"    Chunk {cidx}...")
            parsed = parse_blast_chunk(chunk, columns, blast_type)
            if parsed.empty:
                continue
            if blast_type == 'nr' and 'taxid' in parsed.columns:
                parsed['taxid'] = parsed.apply(
                    lambda r: r['taxid'] if pd.notna(r['taxid']) else get_taxid(r['sseqid'], db_name),
                    axis=1
                )
            parsed_chunks.append(parsed)

        if not parsed_chunks:
            print(f"  - No valid data in {file}, skipping output." )
            logging.warning(f"No valid data in {file}")
            continue

        df_all = pd.concat(parsed_chunks, ignore_index=True)
        print(f"  - Calculating LCA on concatenated data ({len(df_all)} rows)")
        lca_df = calculate_lca(df_all)
        lca_df.to_csv(output_path, sep='\t', index=False)
        print(f"  - Saved LCA to {output_path}\n")
        logging.info(f"Saved LCA to {output_path}")


if __name__ == '__main__':
    base_path = '/Volumes/aine_store/Blast_nr'
    process_files(
        os.path.join(base_path, 'virus_NT'),
        blast_type='nt',
        columns=NT_COLUMNS,
        db_name='nuccore'
    )
    process_files(
        os.path.join(base_path, 'Virus_Blast_nr'),
        blast_type='nr',
        columns=NR_COLUMNS,
        db_name='protein'
    )

Found 78 NT files in /Volumes/aine_store/Blast_nr/virus_NT
[1/78] Processing NT: 1_01_23_0574_S41_virus_contigs_blast_nt.m9
    Chunk 1...
  - No valid data in 1_01_23_0574_S41_virus_contigs_blast_nt.m9, skipping output.
[2/78] Processing NT: 1_01_23_0590_S42_virus_contigs_blast_nt.m9
    Chunk 1...
  - No valid data in 1_01_23_0590_S42_virus_contigs_blast_nt.m9, skipping output.
[3/78] Processing NT: 1_01_24_0050_S45_virus_contigs_blast_nt.m9
    Chunk 1...
    Chunk 2...
    Chunk 3...
    Chunk 4...
  - No valid data in 1_01_24_0050_S45_virus_contigs_blast_nt.m9, skipping output.
[4/78] Processing NT: 1_01_24_0188_S38_virus_contigs_blast_nt.m9
    Chunk 1...
  - No valid data in 1_01_24_0188_S38_virus_contigs_blast_nt.m9, skipping output.
[5/78] Processing NT: 1_01_24_0191_S43_virus_contigs_blast_nt.m9
    Chunk 1...
    Chunk 2...
    Chunk 3...
    Chunk 4...
    Chunk 5...
  - No valid data in 1_01_24_0191_S43_virus_contigs_blast_nt.m9, skipping output.
[6/78] Processing NT: 1_01

ParserError: Error tokenizing data. C error: Calling read(nbytes) on source failed. Try engine='python'.

Found 147 NT files in /Volumes/aine_store/Blast_nr/virus_NT
[1/147] NT: 1_01_23_0574_S41_virus_contigs_blast_nt.m9
  - 486302 rows, computing LCA
    [1/247] LCA for k141_10155
    [2/247] LCA for k141_10216
    [3/247] LCA for k141_10476
    [4/247] LCA for k141_10554
    [5/247] LCA for k141_10564
    [6/247] LCA for k141_10614
    [7/247] LCA for k141_1071
    [8/247] LCA for k141_11053
    [9/247] LCA for k141_11085
    [10/247] LCA for k141_11271
    [11/247] LCA for k141_11551
    [12/247] LCA for k141_11559
    [13/247] LCA for k141_1156
    [14/247] LCA for k141_11775
    [15/247] LCA for k141_1184
    [16/247] LCA for k141_11895
    [17/247] LCA for k141_1210
    [18/247] LCA for k141_12150
    [19/247] LCA for k141_12262
    [20/247] LCA for k141_12281
    [21/247] LCA for k141_12439
    [22/247] LCA for k141_12628
    [23/247] LCA for k141_12711
    [24/247] LCA for k141_12990
    [25/247] LCA for k141_13012
    [26/247] LCA for k141_13168
    [27/247] LCA for k141_13198
   

KeyboardInterrupt: 