# How much gets filtered when removing rRNA, tRNA, and mtDNA/RNA? 
# Notebook 1: fetching the data
I'm making this notebook as a n intermission from preparing rolypoly external database. So far I've used a combination of NCBI +SILVA rRNAs to remove contaminating rRNA reads from RNA-seq data. Subsequently, the organisms whose rRNAs were most matched, are then also fetched (or their transcriptomes, if available) to remove any remaining reads that may have come from those organisms.  
THis is messy, requires NCBI taxdump, taxonkit, and ncbi-datasets. So I started removing this dependency by using a set of rRNAs for which I can generate a prebuilt table containing the FTP addresses of the hosts' genomes/transcriptomes.  
While doing so, I realised the step above could be split - quick rRNA mapping to get rough taxonomic breakdown of the sample, and then a more thorough removal of rRNA, tRNA, and mtDNA/RNA using a more comprehensive database. The question is then how much will these diffrent combinations filter, how much more time, and would masking the fasta for subsequences shared with RNA viruses change the results significantly?

To test these, below are how I got the data, how I named the sets. For simplicity, the [trrna_fetch.ipynb](trrna_fetch.ipynb) has the code for the actual filtering using these sequence typs, graphs and so on. 

*Note*: Parts of this script are from [build_data.py](../src/rolypoly/commands/misc/build_data.py) and [filter_reads.py](../src/rolypoly/commands/reads/filter_reads.py) scripts in the rolypoly repository.

Loading libraries and defining paths to sets already created/downloaded:

In [2]:
import datetime
import json
import logging
import shutil
import subprocess
import tarfile
from pathlib import Path as pt
import aria2p
import tempfile
import os
import time
import ftplib
from ftplib import FTP
import glob
from collections import defaultdict, deque
import heapq

import polars as pl
import requests
from tqdm.notebook import tqdm
from bbmapy import bbduk, bbmask, kcompress
from rich.console import Console
from rich_click import command, option

from rolypoly.utils.bio.alignments import (
    hmmdb_from_directory,
    mmseqs_profile_db_from_directory,
)
from rolypoly.utils.bio.sequences import (
    filter_fasta_by_headers,
    write_fasta_file,
    remove_duplicates
)

from rolypoly.utils.bio.trees import TaxonomyTree

from rolypoly.utils.bio.polars_fastx import from_fastx_eager

from rolypoly.utils.logging.citation_reminder import remind_citations
from rolypoly.utils.logging.loggit import get_version_info, setup_logging
from rolypoly.utils.various import fetch_and_extract, run_command_comp,extract

### DEBUG ARGS (for manually building, not entering via CLI):
threads = 6
log_file = "notebooks/Exprimental/trrna.log"
data_dir = "/clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data"

global rrna_dir
global contam_dir

logger = setup_logging(log_file)
print(f"Starting data preparation to : {data_dir}")

contam_dir = os.path.join(data_dir, "contam")
os.makedirs(contam_dir, exist_ok=True)

rrna_dir = os.path.join(contam_dir, "rrna")
os.makedirs(rrna_dir, exist_ok=True)

trna_dir = os.path.join(contam_dir, "trna")
os.makedirs(trna_dir, exist_ok=True)

masking_dir = os.path.join(contam_dir, "masking")
os.makedirs(masking_dir, exist_ok=True)

# taxonomy_dir = os.path.join(data_dir, "taxdump")
# os.makedirs(taxonomy_dir, exist_ok=True)

reference_seqs = os.path.join(data_dir, "reference_seqs")
os.makedirs(reference_seqs, exist_ok=True)

mmseqs_ref_dir = os.path.join(reference_seqs, "mmseqs")
os.makedirs(mmseqs_ref_dir, exist_ok=True)

rvmt_dir = os.path.join(reference_seqs, "RVMT")
os.makedirs(rvmt_dir, exist_ok=True)

ncbi_ribovirus_dir = os.path.join(reference_seqs, "ncbi_ribovirus")
os.makedirs(ncbi_ribovirus_dir, exist_ok=True)

# Masking sequences preparation
rvmt_fasta_path = os.path.join(
    data_dir, "reference_seqs", "RVMT", "RVMT_cleaned_contigs.fasta"
)
ncbi_ribovirus_fasta_path = os.path.join(
    data_dir,
    "reference_seqs",
    "ncbi_ribovirus",
    "refseq_ribovirus_genomes.fasta",
)

rna_viruses_entropy_masked_path = os.path.join(
    masking_dir, "combined_entropy_masked.fasta"
)

Starting data preparation to : /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data


## Fetching data (SILVA, NCBI rRNAs, tRNAs, mtDNAs):

In [4]:
silva_release = "138.2"

# Download SILVA rRNA sequences (SSU and LSU)
silva_ssu_path = os.path.join(
    rrna_dir, f"SILVA_{silva_release}_SSURef_NR99_tax_silva.fasta"
)
silva_lsu_path = os.path.join(
    rrna_dir, f"SILVA_{silva_release}_LSURef_NR99_tax_silva.fasta"
)
# skipping downloading (already done)
fetch_and_extract(
    f"https://www.arb-silva.de/fileadmin/silva_databases/release_{silva_release.replace('.', '_')}/Exports/SILVA_{silva_release}_SSURef_NR99_tax_silva.fasta.gz",
    fetched_to=os.path.join(rrna_dir, "tmp_ssu.fasta.gz"),
    extract_to=rrna_dir,
    rename_extracted=silva_ssu_path,
    logger=logger,
)
fetch_and_extract(
    f"https://www.arb-silva.de/fileadmin/silva_databases/release_{silva_release.replace('.', '_')}/Exports/SILVA_{silva_release}_LSURef_NR99_tax_silva.fasta.gz",
    fetched_to=os.path.join(rrna_dir, "tmp_lsu.fasta.gz"),
    extract_to=rrna_dir,
    rename_extracted=silva_lsu_path,
    logger=logger,
)

# Download SILVA taxonomy mappings (maps accessions to NCBI taxids)
silva_ssu_taxmap = pl.read_csv(
    "https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/ncbi/taxmap_embl-ebi_ena_ssu_ref_nr99_138.2.txt.gz",
    truncate_ragged_lines=True,
    separator="\t",
    infer_schema_length=123123,
)
silva_lsu_taxmap = pl.read_csv(
    "https://www.arb-silva.de/fileadmin/silva_databases/current/Exports/taxonomy/ncbi/taxmap_embl-ebi_ena_lsu_ref_nr99_138.2.txt.gz",
    truncate_ragged_lines=True,
    separator="\t",
    infer_schema_length=123123,
)
silva_taxmap = pl.concat([silva_lsu_taxmap, silva_ssu_taxmap])
silva_taxmap.write_parquet(os.path.join(rrna_dir, "silva_taxmap_embl-ebi_ena.parquet"))

# Parse SILVA headers and extract accessions
silva_fasta_df = pl.concat(
    [
        from_fastx_eager(silva_ssu_path).with_columns(
            pl.lit("SSU").alias("rRNA_type")
        ),
        from_fastx_eager(silva_lsu_path).with_columns(
            pl.lit("LSU").alias("rRNA_type")
        ),
    ]
)
silva_fasta_df.write_parquet(os.path.join(rrna_dir, "silva99_fasta.parquet"))
print(f"total SILVA99 sequences {silva_fasta_df.height}")
silva_fasta_df.head(4)


total SILVA99 sequences 605774


header,sequence,rRNA_type
str,str,str
"""AY846379.1.1791 Eukaryota;Arch‚Ä¶","""AACCUGGUUGAUCCUGCCAGUAGUCAUAUG‚Ä¶","""SSU"""
"""AY846382.1.1778 Eukaryota;Arch‚Ä¶","""GUUGAUCCUGCCAGUAGUCAUAUGCUUGUC‚Ä¶","""SSU"""
"""AB000393.1.1510 Bacteria;Pseud‚Ä¶","""UGGCUCAGAUUGAACGCUGGCGGCAGGCCU‚Ä¶","""SSU"""
"""AY909590.1.2352 Eukaryota;Arch‚Ä¶","""GACUAAGCCAUGCAUGUCUAAGUAUAAACG‚Ä¶","""SSU"""


In [5]:
# Extract accession from header (format: >accession.version rest_of_header)
silva_fasta_df = silva_fasta_df.with_columns(
    primaryAccession=pl.col("header").str.extract(
        r"^([A-Za-z0-9_]+)(?:\.\d+)*", 1
    ),  # DQ150555.1.2478 -> DQ150555
    accession=pl.col("header").str.extract(
        r"^([A-Za-z0-9_]+(?:\.\d+)?)", 1
    ),  # AY846379 or DQ150555.1
    taxonomy_raw=pl.col("header").str.replace(r"^\S+\s+", ""),
)
    # silva_fasta_df = silva_fasta_df.with_columns(
    #     pl.col("sequence").str.len_chars().alias("seq_length")
    # )
    # silva_taxmap = silva_taxmap.with_columns(
    #     (pl.col("stop") - pl.col("start")).alias("seq_length")
    # )

silva_df = silva_fasta_df.join(
    silva_taxmap.select(
        ["primaryAccession", "ncbi_taxonid", "submitted_path"]
    ).unique(),  # seq_length
    on=["primaryAccession"],
    how="inner",
)
silva_df.write_parquet(os.path.join(rrna_dir, "silva_rrna_sequences.parquet"))
# silva_df.height
silva_df["ncbi_taxonid"].null_count()

# Load SILVA taxonomy mappings
print(
    f"Merged taxonomy for {silva_df.filter(pl.col('ncbi_taxonid').is_not_null()).height} SILVA sequences"
)

unique_taxids = (
    silva_df.filter(pl.col("ncbi_taxonid").is_not_null())
    .select("ncbi_taxonid")
    .unique()["ncbi_taxonid"]
    .to_list()
)
print(
    f"Total of {len(unique_taxids)} unique NCBI taxids found in SILVA sequences"
)


Merged taxonomy for 605774 SILVA sequences
Total of 105645 unique NCBI taxids found in SILVA sequences


In [3]:
# silva_df = pl.read_parquet(os.path.join(rrna_dir, "silva_rrna_sequences.parquet"))

In [None]:
# Generate FTP download URLs for host genomes/transcriptomes
fetch_and_extract(
    url="https://ftp.ncbi.nlm.nih.gov/genomes/genbank/assembly_summary_genbank.txt",
    fetched_to=os.path.join(rrna_dir, "assembly_summary_genbank.txt.gz"),
    extract=False,
)
print("Loading NCBI GenBank assembly summary")
# genbank_summary = pl.read_csv(os.path.join(rrna_dir, "assembly_summary_genbank.txt.gz",),
# infer_schema_length=100020, separator="\t", skip_rows=1,
# null_values=["na","NA","-"],ignore_errors=True,
# has_header=True)
# polars failed me, so using line by line iterator
from gzip import open as gz_open
with gz_open(
    os.path.join(rrna_dir, "assembly_summary_genbank.txt.gz"), "r"
) as f:
    header = None
    records = []
    i = 0
    for line in f:
        if i == 0:
            i += 1
            continue
        line = line.rstrip(b"\n")
        if i == 1:
            header = line.decode()[1:].strip().split("\t")
            i += 1
            continue
        fields = line.decode().strip().split("\t")
        record = dict(zip(header, fields))
        records.append(record)
genbank_summary = pl.from_records(records).rename({"taxid": "ncbi_taxonid"})
genbank_summary.collect_schema()
    # Schema([('assembly_accession', String),
#         ('bioproject', String),
#         ('biosample', String),
#         ('wgs_master', String),
#         ('refseq_category', String),
#         ('ncbi_taxonid', String),
#         ('species_taxid', String),
#         ('organism_name', String),
#         ('infraspecific_name', String),
#         ('isolate', String),
#         ('version_status', String),
#         ('assembly_level', String),
#         ('release_type', String),
#         ('genome_rep', String),
#         ('seq_rel_date', String),
#         ('asm_name', String),
#         ('asm_submitter', String),
#         ('gbrs_paired_asm', String),
#         ('paired_asm_comp', String),
#         ('ftp_path', String),
#         ('excluded_from_refseq', String),
#         ('relation_to_type_material', String),
#         ('asm_not_live_date', String),
#         ('assembly_type', String),
#         ('group', String),
#         ('genome_size', String),
#         ('genome_size_ungapped', String),
#         ('gc_percent', String),
#         ('replicon_count', String),
#         ('scaffold_count', String),
#         ('contig_count', String),
#         ('annotation_provider', String),
#         ('annotation_name', String),
#         ('annotation_date', String),
#         ('total_gene_count', String),
#         ('protein_coding_gene_count', String),
#         ('non_coding_gene_count', String),
#         ('pubmed_id', String)])

genbank_summary.write_csv(
    os.path.join(rrna_dir, "genbank_assembly_summary.tsv"), separator="\t"
)
genbank_summary = pl.read_csv(
    os.path.join(rrna_dir, "genbank_assembly_summary.tsv"),
    infer_schema_length=100020,
    separator="\t",
    null_values=["na", "NA", "-"],
    ignore_errors=True,
    has_header=True,
)

# In [91]: genbank_summary.collect_schema()
# Out[91]: 
# Schema([('assembly_accession', String),
#         ('bioproject', String),
#         ('biosample', String),
#         ('wgs_master', String),
#         ('refseq_category', String),
#         ('ncbi_taxonid', Int64),
#         ('species_taxid', Int64),
#         ('organism_name', String),
#         ('infraspecific_name', String),
#         ('isolate', String),
#         ('version_status', String),
#         ('assembly_level', String),
#         ('release_type', String),
#         ('genome_rep', String),
#         ('seq_rel_date', String),
#         ('asm_name', String),
#         ('asm_submitter', String),
#         ('gbrs_paired_asm', String),
#         ('paired_asm_comp', String),
#         ('ftp_path', String),
#         ('excluded_from_refseq', String),
#         ('relation_to_type_material', String),
#         ('asm_not_live_date', String),
#         ('assembly_type', String),
#         ('group', String),
#         ('genome_size', Int64),
#         ('genome_size_ungapped', Int64),
#         ('gc_percent', Float64),
#         ('replicon_count', Int64),
#         ('scaffold_count', Int64),
#         ('contig_count', Int64),
#         ('annotation_provider', String),
#         ('annotation_name', String),
#         ('annotation_date', String),
#         ('total_gene_count', Int64),
#         ('protein_coding_gene_count', Int64),
#         ('non_coding_gene_count', Int64),
#         ('pubmed_id', String)])

genbank_summary.write_parquet(
    os.path.join(rrna_dir, "genbank_assembly_summary.parquet")
)
genbank_summary.write_csv(
    os.path.join(rrna_dir, "genbank_assembly_summary.tsv"), separator="\t"
)
print(genbank_summary.head(4))

2025-12-01 14:15:15,584 - rolypoly.utils.logging.loggit - INFO - Downloading https://ftp.ncbi.nlm.nih.gov/genomes/genbank/assembly_summary_genbank.txt to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/rrna/assembly_summary_genbank.txt.gz
2025-12-01 14:15:45,465 - rolypoly.utils.logging.loggit - INFO - Successfully downloaded to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/rrna/assembly_summary_genbank.txt.gz


In [4]:
genbank_summary = pl.read_parquet(
    os.path.join(rrna_dir, "genbank_assembly_summary.parquet")
)
print(genbank_summary.head(4))

shape: (4, 38)
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ assembly_ ‚îÜ bioprojec ‚îÜ biosample ‚îÜ wgs_maste ‚îÜ ‚Ä¶ ‚îÜ total_gen ‚îÜ protein_c ‚îÜ non_codin ‚îÜ pubmed_i ‚îÇ
‚îÇ accession ‚îÜ t         ‚îÜ ---       ‚îÜ r         ‚îÜ   ‚îÜ e_count   ‚îÜ oding_gen ‚îÜ g_gene_co ‚îÜ d        ‚îÇ
‚îÇ ---       ‚îÜ ---       ‚îÜ str       ‚îÜ ---       ‚îÜ   ‚îÜ ---       ‚îÜ e_count   ‚îÜ unt       ‚îÜ ---      ‚îÇ
‚îÇ str       ‚îÜ str       ‚îÜ           ‚îÜ str       ‚îÜ   ‚îÜ i64       ‚îÜ ---       ‚îÜ ---       ‚îÜ str      ‚îÇ
‚îÇ           ‚îÜ           ‚îÜ           ‚îÜ           ‚îÜ   ‚îÜ           ‚îÜ i64       ‚îÜ i64       ‚îÜ          ‚îÇ
‚ïû‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ï

In [5]:
# next, for every unique ncbi_taxonid, we select the one that has the most protein_coding_gene_count, then refseq_category, then tie breaking with non_coding_gene_count, tie breaking by latest assembly (by seq_rel_date).
mini_genebank = genbank_summary.sort(
    by=[
        pl.col("refseq_category").reverse(),
        pl.col("protein_coding_gene_count").cast(pl.Int64).reverse(),
        pl.col("non_coding_gene_count").cast(pl.Int64).reverse(),
        pl.col("seq_rel_date").reverse(),
        pl.col("genome_size").reverse(),
    ]
).unique(subset=["ncbi_taxonid"], keep="first")
print(
    f"Filtered GenBank summary to {mini_genebank.height} unique taxid entries for SILVA sequences"
)

in_silva = mini_genebank.filter(pl.col("ncbi_taxonid").is_in(silva_taxmap["ncbi_taxonid"])).unique()
print(in_silva.height)
# print(temp_genbank.height)
# only 30503 out of ~240k?


Filtered GenBank summary to 242839 unique taxid entries for SILVA sequences
30503


Please use `implode` to return to previous behavior.

See https://github.com/pola-rs/polars/issues/22149 for more information.
  in_silva = mini_genebank.filter(pl.col("ncbi_taxonid").is_in(silva_taxmap["ncbi_taxonid"])).unique()


In [6]:
# next, for every unique entry, we create paths for the expected location of the transcripts/transcriptome fasta, the rna fasta, and the genome.
mini_genebank = mini_genebank.with_columns(
        (pl.col("ftp_path") + "/md5checksums.txt").alias("md5sums_path")
)
# fetching the md5sums files for a subset to test time
test_df = mini_genebank[:10].with_columns(
        pl.col("ftp_path").map_elements(
            lambda path: requests.get(path + "/md5checksums.txt").text,return_dtype=pl.Utf8
        ).alias("md5sums_content")
)
test_df["md5sums_content"].first()
# ~2.2s, too slow, so will do it with aria2 using FTP 


'9a59e7252082bf546b2f84a6212753ac  ./annotation_hashes.txt\nca4b9086529f58d55f3d10c3816aabd2  ./GCA_038318695.1_ASM3831869v1_assembly_report.txt\n397cdca0cf495a861533c2fda3008de9  ./GCA_038318695.1_ASM3831869v1_assembly_stats.txt\nfaad4a79de2e96ac4ba06ed789db6d16  ./GCA_038318695.1_ASM3831869v1_cds_from_genomic.fna.gz\n770f9d368a473e0f00ce627f0ef0db6d  ./GCA_038318695.1_ASM3831869v1_feature_count.txt\n0b259388786231db1f212e9cc1011f9a  ./GCA_038318695.1_ASM3831869v1_feature_table.txt.gz\n03be2534ed6cbf9de002af189cf2459f  ./GCA_038318695.1_ASM3831869v1_genomic.fna.gz\ne410d6ee6a2050ea00c31b78ad12ff1e  ./GCA_038318695.1_ASM3831869v1_genomic.gbff.gz\ne69e781c501af782ae27053603c2e7a2  ./GCA_038318695.1_ASM3831869v1_genomic.gff.gz\ne57ed066b6be493266857fb24fea6058  ./GCA_038318695.1_ASM3831869v1_genomic.gtf.gz\n069d32032455e11ffbf4cc3aa489e1f9  ./GCA_038318695.1_ASM3831869v1_protein.faa.gz\nae3785cd2f95bf874bb47924f542f9e4  ./GCA_038318695.1_ASM3831869v1_protein.gpff.gz\na2e7f7b60c56b39c7890

In [None]:

# Create a temporary directory for downloads
temp_dir = tempfile.mkdtemp()

# Start aria2c daemon (assumes aria2c is running with RPC enabled)
api = aria2p.API(
    aria2p.Client(
        host="http://localhost",
        port=6800,
        secret=""
    )
)
# print("Make sure aria2c is running with: aria2c --enable-rpc --rpc-listen-all=true --rpc-allow-origin-all")

def convert_to_ftp_url(http_ftp_path):
    """Convert NCBI HTTP FTP path to proper FTP URL"""
    if http_ftp_path.startswith("https://ftp.ncbi.nlm.nih.gov/"):
        # Convert https://ftp.ncbi.nlm.nih.gov/path to ftp://ftp.ncbi.nlm.nih.gov/path
        return http_ftp_path.replace("https://", "ftp://")
    elif http_ftp_path.startswith("http://ftp.ncbi.nlm.nih.gov/"):
        # Convert http://ftp.ncbi.nlm.nih.gov/path to ftp://ftp.ncbi.nlm.nih.gov/path
        return http_ftp_path.replace("http://", "ftp://")
    elif http_ftp_path.startswith("ftp://"):
        return http_ftp_path
    else:
        # If it doesn't start with a protocol, assume it's a path that needs ftp:// prefix
        if http_ftp_path.startswith("/"):
            return f"ftp://ftp.ncbi.nlm.nih.gov{http_ftp_path}"
        else:
            return f"ftp://ftp.ncbi.nlm.nih.gov/{http_ftp_path}"

# Create download URLs for md5checksum files with unique filenames using FTP protocol
md5_urls = []
url_to_filename = {}  # Map URLs to their unique filenames
filename_to_ftp_path = {}  # Map filenames back to FTP paths

for i, ftp_path in enumerate(mini_genebank["ftp_path"]):  # [:100] Test with first 100 entries - 3.3s, quick, so running on all
    if ftp_path and ftp_path != "na":
        # Convert to proper FTP URL and append md5checksums.txt
        ftp_url = convert_to_ftp_url(ftp_path)
        md5_url = ftp_url + "/md5checksums.txt"
        
        # Create unique filename using index to avoid conflicts
        unique_filename = f"md5checksums_{i:06d}.txt"
        
        md5_urls.append(md5_url)
        url_to_filename[md5_url] = unique_filename
        filename_to_ftp_path[unique_filename] = ftp_path

print(f"Preparing to download {len(md5_urls)} MD5 checksum files in parallel using FTP protocol")
print(f"Sample FTP URL: {md5_urls[0] if md5_urls else 'No URLs generated'}")

# FTP-optimized aria2 options
ftp_options = {
    "dir": temp_dir,
    "max-connection-per-server": "10",  # FTP servers often limit connections
    "split": "1",  # Don't split small text files
    "ftp-reuse-connection": "true",  # Reuse FTP connections for efficiency
    "ftp-pasv": "true",  # Use passive FTP mode (better for firewalls)
    "timeout": "60",  # 60 second timeout for FTP connections
    "retry-wait": "3",  # Wait 3 seconds between retries
    "max-tries": "3",  # Try up to 3 times per file
    "connect-timeout": "15",  # 30 second connection timeout
    "max-concurrent-downloads": "10",  # Limit total concurrent downloads
}

# Queue all downloads with progress bar
downloads = []
print("Queuing FTP downloads...")
for url in tqdm(md5_urls, desc="Queuing FTP downloads"):
    try:
        unique_filename = url_to_filename[url]
        download_options = ftp_options.copy()
        download_options["out"] = unique_filename  # Specify unique output filename
        download = api.add_uris([url], options=download_options)
        downloads.append((url, download, unique_filename))
    except Exception as e:
        print(f"Failed to queue download for {url}: {e}")

print(f"Queued {len(downloads)} FTP downloads. Waiting for aria2 to complete all downloads...")

Preparing to download 242833 MD5 checksum files in parallel using FTP protocol
Sample FTP URL: ftp://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/018/101/015/GCA_018101015.1_ASM1810101v1/md5checksums.txt
Queuing FTP downloads...


Queuing FTP downloads:   0%|          | 0/242833 [00:00<?, ?it/s]

Queued 242833 FTP downloads. Waiting for aria2 to complete all downloads...


In [7]:
# test_df = pl.scan_csv(temp_dir+"/*.txt",  separator=" ", include_file_paths="file_path", has_header=False,schema={"md5":pl.Utf8,"dropme":pl.Boolean,"file_path_on_ftp":pl.Utf8}
# ).collect()
# test_df = test_df.drop("dropme")
# test_df

NameError: name 'temp_dir' is not defined

In [102]:
# # Wait for all aria2 downloads to complete
# # Check aria2 global stats to see when all downloads are done
# for thing in tqdm(enumerate(downloads), desc="waiting for downloads to complete"):
#     stats = api.get_stats()
#     active = int(stats.num_active)
#     waiting = int(stats.num_waiting)
    
#     if active == 0 and waiting == 0:
#         print("All downloads completed!")
#         break
#     tqdm.update()
#     time.sleep(2)
        

In [178]:
# Now read all the downloaded files
ftp_path_md5_content = {}
completed_count = 0
failed_count = 0
failed_exmp =""
print("Processing downloaded files...")
for url, download, unique_filename in tqdm(downloads, desc="Reading downloaded files"):
    file_path = os.path.join(temp_dir, unique_filename)
    
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r') as f:
                content = f.read()
            
            # Map FTP path to content using our mapping
            ftp_path = filename_to_ftp_path[unique_filename]
            ftp_path_md5_content[ftp_path] = content
            completed_count += 1
        except Exception as e:
            print(f"Error reading downloaded file {file_path}: {e}")
            failed_count += 1
            # print(url)
    else:
        # print(f"Downloaded file not found: {file_path}")
        failed_count += 1
        failed_exmp = url
        # print(url)

# # Clean up temporary files
# try:
#     import shutil
#     shutil.rmtree(temp_dir)
# except Exception as e:
#     print(f"Failed to clean up temp directory: {e}")

print(f"\n‚úÖ Successfully processed {completed_count} MD5 checksum files")
print(f"‚ùå Failed/missing files: {failed_count}")
print(f"‚ùå Failed/missing example url: {failed_exmp}")
print(f"üìä Total attempted: {len(md5_urls)}")
print(f"üìã Sample of first entry: {list(ftp_path_md5_content.values())[0][:200] if ftp_path_md5_content else 'No content'}")
print(f"üîó Mapping verification: {len(ftp_path_md5_content)} entries in final dictionary")

Processing downloaded files...


Reading downloaded files:   0%|          | 0/242833 [00:00<?, ?it/s]


‚úÖ Successfully processed 60579 MD5 checksum files
‚ùå Failed/missing files: 182254
‚ùå Failed/missing example url: ftp://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/038/506/625/GCA_038506625.1_ASM3850662v1/md5checksums.txt
üìä Total attempted: 242833
üìã Sample of first entry: ba7bb432c803cec66a415cf43ae54ef6  ./annotation_hashes.txt
d364bab756fa59130f83f23bface8fc1  ./GCA_018101015.1_ASM1810101v1_assembly_report.txt
f501bb76bb9b9d7bf21d5acf8c373195  ./GCA_018101015.1_ASM18
üîó Mapping verification: 60579 entries in final dictionary


In [None]:
mini_genebank = mini_genebank.with_columns(
        (pl.col("ftp_path") + "/md5checksums.txt").alias("md5sums_path")
)


In [None]:
minimini= mini_genebank[:10]
test_df = [pl.scan_csv(md5sums_path, sep="\t", has_header=False) for md5sums_path in minimini["md5sums_path"].to_list()]

In [34]:
len(ftp_path_md5_content)

99

In [None]:
# Merge SILVA sequences and apply entropy masking
print("Merging and masking SILVA sequences")
silva_merged = os.path.join(rrna_dir, "SILVA_merged.fasta")
silva_masked = os.path.join(rrna_dir, "SILVA_merged_masked.fasta")

# Concatenate SILVA files
run_command_comp(
    base_cmd="cat",
    positional_args=[silva_ssu_path, silva_lsu_path],
    positional_args_location="end",
    params={},
    output_file=silva_merged,
    logger=logger,
)

# Apply entropy masking
bbduk(
    in1=silva_merged,
    out=silva_masked,
    entropy=0.6,
    entropyk=4,
    entropywindow=24,
    maskentropy=True,
    ziplevel=9,
)

print(f"Created masked SILVA rRNA database: {silva_masked}")

# # clean up
# try:
#     os.remove(deduplicated_fasta)
#     os.remove(compressed_path)
# except Exception as e:
#     logger.warning(f"Could not remove intermediate files: {e}")

print(f"Masking sequences prepared in {masking_dir}")

# ##### Create rRNA DB #####
# cd $rolypoly/data/
# mkdir rRNA
# cd rRNA
# gzip SILVA_138.1_SSURef_NR99_tax_silva.fasta.gz
# gzip SILVA_138.1_LSURef_NR99_tax_silva.fasta.gz
# cat *fasta > merged.fas

# bbduk.sh -Xmx1g in=merged.fas out=merged_masked.fa zl=9 entropy=0.6 entropyk=4 entropywindow=24 maskentropy
# bbduk.sh -Xmx1g in=rmdup_rRNA_ncbi.fasta  out=rmdup_rRNA_ncbi_masked.fa zl=9 entropy=0.6 entropyk=4 entropywindow=24 maskentropy


## rRNAs, tRNA, mtDNA, and plastid-DNA from NCBI BLAST DBs

This is a little hacky, but works?

rRNAs

In [6]:
# first, getting the contents of https://ftp.ncbi.nlm.nih.gov/blast/db/ and filtering to files containing "16S", "18S", "28S", "ITS" 
# LSU, SSU

terms = ["16S", "18S", "28S", "ITS", "LSU", "SSU", "ribosomal"]
files_2_download = []
FTP_HOST = "ftp.ncbi.nlm.nih.gov"
FTP_DIR = "/blast/db/"

# List to store the file and directory names
file_list = []

# Connect to the FTP host
with FTP(FTP_HOST) as ftp:
    #  Log in anonymously (default behavior for .login())
    ftp.login()
    print(f"Connected to {FTP_HOST}")
    ftp.cwd(FTP_DIR)
    logger.debug(f"Changed directory to {FTP_DIR}")

    # nlst() returns only names, not detailed information
    file_list = ftp.nlst()
    
    print("\n--- Directory Contents ---")
    for item in file_list:
        if any(term in item for term in terms):
            files_2_download.append(item)
            logger.debug(item)
            
            # Download file directly using FTP
            local_path = os.path.join(rrna_dir, item)
            print(f"Downloading {item}")
            with open(local_path, 'wb') as local_file:
                ftp.retrbinary(f'RETR {item}', local_file.write)
            logger.debug(f"Successfully downloaded {item}")

print(f"Downloaded {len(files_2_download)} files matching terms: {terms}")

Connected to ftp.ncbi.nlm.nih.gov

--- Directory Contents ---
Downloading 18S_fungal_sequences.tar.gz
Downloading 18S_fungal_sequences.tar.gz.md5
Downloading 28S_fungal_sequences.tar.gz
Downloading 28S_fungal_sequences.tar.gz.md5
Downloading ITS_RefSeq_Fungi.tar.gz
Downloading ITS_RefSeq_Fungi.tar.gz.md5
Downloading ITS_eukaryote_sequences.tar.gz
Downloading ITS_eukaryote_sequences.tar.gz.md5
Downloading LSU_eukaryote_rRNA.tar.gz
Downloading LSU_eukaryote_rRNA.tar.gz.md5
Downloading LSU_prokaryote_rRNA.tar.gz
Downloading LSU_prokaryote_rRNA.tar.gz.md5
Downloading SSU_eukaryote_rRNA.tar.gz
Downloading SSU_eukaryote_rRNA.tar.gz.md5
Downloading 16S_ribosomal_RNA.tar.gz
Downloading 16S_ribosomal_RNA.tar.gz.md5
Downloading 16S_ribosomal_RNA-nucl-metadata.json
Downloading 18S_fungal_sequences-nucl-metadata.json
Downloading 28S_fungal_sequences-nucl-metadata.json
Downloading ITS_RefSeq_Fungi-nucl-metadata.json
Downloading ITS_eukaryote_sequences-nucl-metadata.json
Downloading LSU_eukaryote_rR

In [7]:
# for each of these, will run blastdbcmd to extract sequences into fasta 
for item in files_2_download:
    if not item.endswith('.tar.gz'):
        continue
    print(f"Processing {item}...")
    extract(archive_path=os.path.join(rrna_dir, item),
        extract_to=rrna_dir,
    )
    # blastdbcmd -entry all -db LSU_prokaryote_rRNA  -out reference.fasta -outfmt "%T;%t;%s" # taxid;header;sequence
    run_command_comp(
        base_cmd="blastdbcmd",
        positional_args=[
            "-entry",
            "all",
            "-db",
            os.path.join(rrna_dir, item.replace('.tar.gz','')),
            "-out",
            os.path.join(rrna_dir, item.replace('.tar.gz','').replace('.tar','') + '.tab'),
            "-outfmt",
            str(f'"%T@%t@%K@%s"') ,
        ],
        positional_args_location="end",
        params={},
        logger=logger,
        output_file=os.path.join(rrna_dir, item.replace('.tar.gz','').replace('.tar','') + '.tab')
    )


Processing 18S_fungal_sequences.tar.gz...


Processing 28S_fungal_sequences.tar.gz...


Processing ITS_RefSeq_Fungi.tar.gz...


Processing ITS_eukaryote_sequences.tar.gz...


Processing LSU_eukaryote_rRNA.tar.gz...


Processing LSU_prokaryote_rRNA.tar.gz...


Processing SSU_eukaryote_rRNA.tar.gz...


Processing 16S_ribosomal_RNA.tar.gz...


In [8]:
# get all files to remove (everything but the .tab files)
all_files_in_rrna = glob.glob(os.path.join(rrna_dir, "*"))
all_tab_files = glob.glob(os.path.join(rrna_dir, "*.tab"))
all_fasta_files = glob.glob(os.path.join(rrna_dir, "*.fasta"))

# Files to keep (tab files, fasta files, and parquet/tsv files)
files_to_keep = set(all_tab_files + all_fasta_files + 
                   glob.glob(os.path.join(rrna_dir, "*.parquet")) +
                   glob.glob(os.path.join(rrna_dir, "*.tsv")) +
                   glob.glob(os.path.join(rrna_dir, "*.gz")))

# Get all files to remove (exclude directories and files to keep)
files_2_remove = [f for f in all_files_in_rrna if os.path.isfile(f) and f not in files_to_keep]

print(f"Found {len(files_2_remove)} files to remove")
print(f"Keeping {len(files_to_keep)} files (.tab, .fasta, .parquet, .tsv, .gz)")

# Remove the files
for file_path in files_2_remove:
    try:
        os.remove(file_path)
        print(f"Removed: {os.path.basename(file_path)}")
    except Exception as e:
        print(f"Failed to remove {os.path.basename(file_path)}: {e}")

print("Cleanup complete")

Found 99 files to remove
Keeping 23 files (.tab, .fasta, .parquet, .tsv, .gz)
Removed: 16S_ribosomal_RNA-nucl-metadata.json
Removed: 16S_ribosomal_RNA.ndb
Removed: 16S_ribosomal_RNA.nhr
Removed: 16S_ribosomal_RNA.nin
Removed: 16S_ribosomal_RNA.nnd
Removed: 16S_ribosomal_RNA.nni
Removed: 16S_ribosomal_RNA.nog
Removed: 16S_ribosomal_RNA.nos
Removed: 16S_ribosomal_RNA.not
Removed: 16S_ribosomal_RNA.nsq
Removed: 16S_ribosomal_RNA.ntf
Removed: 16S_ribosomal_RNA.nto
Removed: 16S_ribosomal_RNA.tar.gz.md5
Removed: 18S_fungal_sequences-nucl-metadata.json
Removed: 18S_fungal_sequences.ndb
Removed: 18S_fungal_sequences.nhr
Removed: 18S_fungal_sequences.nin
Removed: 18S_fungal_sequences.nnd
Removed: 18S_fungal_sequences.nni
Removed: 18S_fungal_sequences.nog
Removed: 18S_fungal_sequences.nos
Removed: 18S_fungal_sequences.not
Removed: 18S_fungal_sequences.nsq
Removed: 18S_fungal_sequences.ntf
Removed: 18S_fungal_sequences.nto
Removed: 18S_fungal_sequences.tar.gz.md5
Removed: 28S_fungal_sequences-nuc

In [9]:
rrna_df = pl.scan_csv(
    os.path.join(rrna_dir, "*.tab"),
    separator="@",
    has_header=False,
    null_values = ["N/A"],
    new_columns=["taxid", "header","name","sequence"],
    include_file_paths="type"
).collect()
rrna_df = rrna_df.drop("name")
rrna_df = rrna_df.with_columns(
    rRNA_type=pl.col("type").str.extract(r"([^/]+)\.tab$", 1)
)
rrna_df = rrna_df.drop("type")
print(f"Read {rrna_df.height} rRNA sequences from")

Read 159996 rRNA sequences from


In [10]:
rrna_df["rRNA_type"].value_counts(sort = True)

rRNA_type,count
str,u32
"""ITS_eukaryote_sequences""",77582
"""16S_ribosomal_RNA""",27708
"""ITS_RefSeq_Fungi""",19541
"""28S_fungal_sequences""",11754
"""SSU_eukaryote_rRNA""",8784
"""LSU_eukaryote_rRNA""",6575
"""LSU_prokaryote_rRNA""",4047
"""18S_fungal_sequences""",4005


In [11]:
rrna_fasta_path = os.path.join(rrna_dir, "ncbi_rRNA_all_sequences.fasta")
write_fasta_file(
    seqs=rrna_df["sequence"].to_list(),
    headers=rrna_df["header"].to_list(),
    output_file=rrna_fasta_path,
)
print(f"Wrote combined rRNA fasta to {rrna_fasta_path}")

Wrote combined rRNA fasta to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/rrna/ncbi_rRNA_all_sequences.fasta


In [214]:
mini_genebank["group"].value_counts(sort=True)

group,count
str,u32
"""bacteria""",125828
"""viral""",88342
"""fungi""",7870
"""invertebrate""",7069
"""vertebrate_other""",4354
…,…
"""plant""",3385
"""vertebrate_mammalian""",1124
"""protozoa""",1123
"""metagenomes""",133


In [76]:
temp_df = rrna_df.join(
    mini_genebank.select(
        ["ncbi_taxonid", "organism_name"]
    ).unique(),  # seq_length
    left_on=["taxid"],
    right_on=["ncbi_taxonid"],
    how="left",
)
temp_df["organism_name"].value_counts(sort=True)


organism_name,count
str,u32
,117964
"""Giardia duodenalis""",127
"""Globisporangium heterothallicu‚Ä¶",93
"""Babesia bovis""",65
"""Globisporangium sylvaticum""",61
…,…
"""Andreaea rupestris""",1
"""Tetraphis pellucida""",1
"""Anncaliia algerae""",1
"""Dictyocoela muelleri""",1


some 117k nulls, these are the ones that will need to patch to link to cloest related organism that does have genome/transcriptome data on genbank.  
That is done in [rrna_genome_mapping_taxonomy.ipynb](rrna_genome_mapping_taxonomy.ipynb)

## Now tRNAs

In [3]:
file_url = "https://ftp.ebi.ac.uk/pub/databases/Rfam/CURRENT/fasta_files/RF00005.fa.gz"
trna_seqs = os.path.join(trna_dir, "tRNA_sequences.fasta")
gz_filename = "RF00005.fa.gz"
fetch_and_extract(
                url=file_url,
                fetched_to=os.path.join(trna_dir, gz_filename),
                extract_to=trna_dir,
                expected_file=trna_seqs,
            )
print(f"Downloaded tRNA sequences to {trna_seqs}")


2025-12-04 11:45:32,833 - rolypoly.utils.logging.loggit - INFO - Downloading https://ftp.ebi.ac.uk/pub/databases/Rfam/CURRENT/fasta_files/RF00005.fa.gz to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna/RF00005.fa.gz
2025-12-04 11:46:10,628 - rolypoly.utils.logging.loggit - INFO - Successfully downloaded to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna/RF00005.fa.gz
2025-12-04 11:46:10,632 - rolypoly.utils.logging.loggit - INFO - Extracting /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna/RF00005.fa.gz to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna
2025-12-04 11:46:14,336 - rolypoly.utils.logging.loggit - INFO - Successfully extracted to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna/tRNA_sequences.fasta


Downloaded tRNA sequences to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna/tRNA_sequences.fasta


In [None]:
# remove duplicates
deduplicated_fasta = os.path.join(trna_dir, "tRNA_sequences_deduplicated.fasta")
remove_duplicates(
    input_file=trna_seqs,
    output_file=deduplicated_fasta,
    return_stats=True,
    by = "seq"
)

2025-12-04 11:46:25,181 - rolypoly.utils.logging.loggit - INFO - Processed 5335982 records: 3028115 unique, 2307867 duplicates removed


{'total_records': 5335982,
 'unique_records': 3028115,
 'duplicates_removed': 2307867}

In [16]:
from rolypoly.utils.bio.polars_fastx import compute_aggregate_stats,fasta_stats
info_table = fasta_stats(deduplicated_fasta)
# print(info_table)

some_stats = compute_aggregate_stats(info_table,fields=["length", "gc_content", "n_content"])
some_stats

{'min_length': 33,
 'max_length': 271,
 'mean_length': 72.89565752951918,
 'median_length': 72.0,
 'std_length': 8.179689610946584,
 'total_length': 220736434,
 'min_gc': 0.08823529411764706,
 'max_gc': 0.813953488372093,
 'mean_gc': 0.5516366297753061,
 'median_gc': 0.5535714285714286,
 'std_gc': 0.057055637448841706,
 'total_sequences': 3028115}

I don't trust this (min len 30, min gc 8% ), so will do some light filteration.

In [17]:
info_table = info_table.filter(
    pl.col("length").is_between(50,250),
    pl.col("gc_content") >= 0.01,
)
some_stats_filtered = compute_aggregate_stats(info_table,fields=["length", "gc_content", "n_content"])
some_stats_filtered
# info_table.write_parquet(os.path.join(trna_dir, "tRNA_sequences_deduplicated_stats.parquet"))

{'min_length': 50,
 'max_length': 248,
 'mean_length': 72.91280093980956,
 'median_length': 72.0,
 'std_length': 8.152576143845652,
 'total_length': 220644708,
 'min_gc': 0.08823529411764706,
 'max_gc': 0.813953488372093,
 'mean_gc': 0.5516374569020241,
 'median_gc': 0.5535714285714286,
 'std_gc': 0.05704648222966359,
 'total_sequences': 3026145}

looks a little better, will just accept that very low or very high GC content tRNAs exist.

In [18]:
write_fasta_file(
    seqs=info_table["sequence"].to_list(),
    headers=info_table["header"].to_list(),
    output_file=os.path.join(trna_dir, "tRNA_sequences_deduplicated_filtered.fasta"),
)
print(f"Wrote filtered tRNA sequences to {os.path.join(trna_dir, 'tRNA_sequences_deduplicated_filtered.fasta')}")


Wrote filtered tRNA sequences to /clusterfs/jgi/scratch/science/metagen/neri/code/rolypoly/data/contam/trna/tRNA_sequences_deduplicated_filtered.fasta


## plastid and mitochondria genomes

In [None]:
from rolypoly.commands.misc.build_data import prepare_plastid_data, prepare_mito_data
prepare_plastid_data(data_dir, logger)
prepare_mito_data(data_dir, logger)

## gene2accession (NCBI)

In [None]:
# fetch_and_extract( url="http://ftp.ncbi.nlm.nih.gov/gene/DATA/gene2accession.gz",
#     fetched_to=os.path.join(rrna_dir, "gene2accession.gz"),
#     extract=False,
# )

silva_df = silva_df.with_columns(
    ncbi_taxonid=pl.col("ncbi_taxonid").cast(pl.String)
)

silva_df1 = silva_df.join(
    genbank_summary.select(["ncbi_taxonid", "ftp_path"]),
    on=["ncbi_taxonid"],
    how="left",
)
silva_df1

silva_df = silva_df.with_columns(
    genome_ftp_url=pl.when(pl.col("ncbi_taxonid").is_not_null())
    .then(
        pl.format(
            "https://ftp.ncbi.nlm.nih.gov/genomes/all/refseq/taxid_{}/",
            pl.col("ncbi_taxonid"),
        )
    )
    .otherwise(None),
    datasets_api_url=pl.when(pl.col("ncbi_taxonid").is_not_null())
    .then(
        pl.format(
            "https://api.ncbi.nlm.nih.gov/datasets/v2alpha/genome/taxon/{}/download?include_annotation_type=GENOME_FASTA,RNA_FASTA",
            pl.col("ncbi_taxonid"),
        )
    )
    .otherwise(None),
)

# Save metadata table
metadata_output = os.path.join(rrna_dir, "rrna_metadata.tsv")
silva_df.write_csv(metadata_output, separator="\t")
print(
    f"Saved rRNA metadata table with {len(silva_df)} entries to {metadata_output}"
)

gene2accession = pl.read_csv(
    os.path.join(rrna_dir, "gene2accession.gz"),
    separator="\t",
    # skip_rows=1,
    # infer_schema_length=100020,
    null_values=["na", "NA", "-"],
    ignore_errors=True,
    has_header=True,
    # n_rows=100
)
gene2accession.write_parquet(os.path.join(rrna_dir, "gene2accession.parquet"))
# gene2accession = pl.read_parquet(os.path.join(rrna_dir, "gene2accession.parquet"))
# gene2accession.collect_schema()
# Schema([('#tax_id', Int64),
#     ('GeneID', Int64),
#     ('status', String),
#     ('RNA_nucleotide_accession.version', String),
#     ('RNA_nucleotide_gi', String),
#     ('protein_accession.version', String),
#     ('protein_gi', Int64),
#     ('genomic_nucleotide_accession.version', String),
#     ('genomic_nucleotide_gi', Int64),
#     ('start_position_on_the_genomic_accession', Int64),
#     ('end_position_on_the_genomic_accession', Int64),
#     ('orientation', String),
#     ('assembly', String),
#     ('mature_peptide_accession.version', String),
#     ('mature_peptide_gi', String),
#     ('Symbol', String)])
gene2accession = gene2accession.rename({"#tax_id": "ncbi_taxonid"})
test_df = gene2accession.filter(pl.col("ncbi_taxonid").is_in(unique_taxids))
test_df.height # 148449745
test_df2 = gene2accession.select(["ncbi_taxonid","assembly"]).unique()
test_df2.height # 52548
