In [1]:
# Standard Python library
import os
import shutil
import csv
from collections import Counter

# Third-party libraries
from dotenv import load_dotenv

# BioPython - NCBI Data Processing and API
from Bio import SeqIO
from Bio.SeqFeature import CompoundLocation
from Bio import Entrez


In [2]:
# # Load email and Entrez API Key from .env file for better use of Entrez API for taxonomy fetching
load_dotenv()

# Set Entrez API credentials
Entrez.email = os.getenv("NCBI_EMAIL", None)
Entrez.api_key = os.getenv("NCBI_API_KEY", None)


In [None]:
# # Iterate through the nested structure given by bulk-downloading RefSeq/GenBank
# # .gbff files and rename and un-nest each file to the main directory of the dataset.
# # This assumes that downloaded directories are uncompressed and are placed in \data

for folderLvl1 in os.listdir(".\\data"):
    pathLvl1 = os.path.join(".\\data", folderLvl1)
    if os.path.isdir(pathLvl1):
        destination = pathLvl1
        # print(pathLvl1)
        for folderLvl2 in os.listdir(pathLvl1):
            pathLvl2 = os.path.join(pathLvl1, folderLvl2)
            if os.path.isdir(pathLvl2):
                # print(pathLvl2)
                for folderLvl3 in os.listdir(pathLvl2):
                    pathLvl3 = os.path.join(pathLvl2, folderLvl3)
                    if os.path.isdir(pathLvl3):
                        # print(pathLvl3)
                        for i, folderLvl4 in enumerate(os.listdir(pathLvl3)):
                            pathLvl4 = os.path.join(pathLvl3, folderLvl4)
                            if os.path.isdir(pathLvl4):
                                print(pathLvl4)
                                for item in os.listdir(pathLvl4):
                                    fullPath = os.path.join(pathLvl4, item)
                                    # print(i, fullPath)
                                    try:
                                        newPath = os.path.join(destination, "genome" + str(i - 3) + ".gbff")
                                        shutil.move(fullPath, newPath)
                                        print(f"File '{fullPath}' successfully renamed and moved to '{newPath}'.")
                                    except FileNotFoundError:
                                        print(f"Error: File '{fullPath}' not found.")
                                    except FileExistsError:
                                        print(f"Error: File '{newPath}' already exists.")
                                    except Exception as e:
                                        print(f"An error occurred: {e}")
                                shutil.rmtree(pathLvl4)
                        shutil.rmtree(pathLvl3)
                shutil.rmtree(pathLvl2)


.\data\Actinomyces\ncbi_dataset
.\data\Actinomyces\ncbi_dataset\data\GCF_001278845.1
File '.\data\Actinomyces\ncbi_dataset\data\GCF_001278845.1\genomic.gbff' successfully renamed and moved to '.\data\Actinomyces\genome0.gbff'.
.\data\Actinomyces\ncbi_dataset\data\GCF_001553565.1
File '.\data\Actinomyces\ncbi_dataset\data\GCF_001553565.1\genomic.gbff' successfully renamed and moved to '.\data\Actinomyces\genome1.gbff'.
.\data\Actinomyces\ncbi_dataset\data\GCF_001553935.1
File '.\data\Actinomyces\ncbi_dataset\data\GCF_001553935.1\genomic.gbff' successfully renamed and moved to '.\data\Actinomyces\genome2.gbff'.
.\data\Actinomyces\ncbi_dataset\data\GCF_002076915.1
File '.\data\Actinomyces\ncbi_dataset\data\GCF_002076915.1\genomic.gbff' successfully renamed and moved to '.\data\Actinomyces\genome3.gbff'.
.\data\Actinomyces\ncbi_dataset\data\GCF_002356195.1
File '.\data\Actinomyces\ncbi_dataset\data\GCF_002356195.1\genomic.gbff' successfully renamed and moved to '.\data\Actinomyces\genome4.

In [3]:
def getTaxonomyRanks(taxonID):
    '''
        Uses taxonID (int) as a query id for NCBI's Entrez API to query the Taxonomy database.
        Processes the returned lineage structure into a dictionary containing the full taxonomy
        of that organism's ID
        Returns a dictionary containing each taxonomic level, or an empty dictionary if ID doesn't
        match any organism
    '''
    handle = Entrez.efetch(db="taxonomy", id=taxonID, retmode="xml")
    records = Entrez.read(handle)
    if records:
        lineage = records[0]["LineageEx"]
        ranks = {entry["Rank"]: entry["ScientificName"] for entry in lineage}
        ranks["species"] = records[0]["ScientificName"]
        return {
            "Domain": ranks.get("domain", None),
            "Kingdom": ranks.get("kingdom", None),
            "Phylum": ranks.get("phylum", None),
            "Class": ranks.get("class", None),
            "Order": ranks.get("order", None),
            "Family": ranks.get("family", None),
            "Genus": ranks.get("genus", None),
            "Species": ranks.get("species", None)
        }
    return {}


In [None]:
# # Process GBFF files for processing into CDS file

# Examples:
# Gene types:
# ['source', 'gene', 'CDS', 'regulatory', 'protein_bind', 'rRNA', 'tRNA', 'ncRNA', 'mobile_element', 'tmRNA', 'misc_RNA']

# Qualifiers:
# {
#     'organism': ['Salmonella enterica subsp. enterica serovar Typhimurium str. LT2'], 
#     'mol_type': ['genomic DNA'], 
#     'strain': ['LT2'], 
#     'serovar': ['Typhimurium'], 
#     'sub_species': ['enterica'], 
#     'culture_collection': ['ATCC:700720', 'SGSC:1412'], 
#     'type_material': ['type strain of Salmonella enterica'], 
#     'db_xref': ['taxon:99287'], 
#     'focus': [''],
#     'plasmid': ['pSLT']
# }

dataDirectory = ".\\data"
data = []

for genusFolder in os.listdir(dataDirectory):
    subPath = os.path.join(dataDirectory, genusFolder)
    print(genusFolder)
    for i, filename in enumerate(os.listdir(subPath)):
        if filename.endswith(".gbff"):
            fullPath = os.path.join(subPath, filename)
            print(f"processing File {i}: {fullPath}")
            # Read the file and parse it as a GenBank record
            with open(fullPath, "r") as input:
                for record in SeqIO.parse(input, "genbank"):
                    accessionNum = record.id
                    genome = record.seq
                    genomeLength = len(genome)
                    description = record.description
                    genomeType = "plasmid" if "plasmid" in description.lower() else "chromo"
                    organism = None
                    subspecies = None
                    moleculeType = None
                    strain = None
                    serovar = None
                    taxonID = None
                    host = None

                    # print(f"Accession: {record.id}")
                    # print(f"Genome Length: {genomeLength} bp")
                    # print(f"Type of genome: {genomeType}")
                    # print(f"Description: {description}")
                                            
                    sourceRegions = None
                    genes = []
                    for gene in record.features:
                        geneType = gene.type
                        geneName = None
                        locusTag = None
                        geneStart = gene.location.start
                        geneEnd = gene.location.end
                        strand = "+" if gene.location.strand == 1 else "-"
                        genomeSequence = genome[geneStart:geneEnd]
                        GCContent = (genomeSequence.count("G") + genomeSequence.count("C")) / len(genomeSequence) if len(genomeSequence) > 0 else 0

                        proteinProduct = None
                        aminoAcidSeq = None
                        regClass = None
                        boundMoiety = None
                        RNAProduct = None
                        ncRNAClass = None
                        elementType = None

                        # Store all `source` regions
                        if geneType == "source":
                            organism = gene.qualifiers.get("organism", [None])[0]
                            subspecies = gene.qualifiers.get("sub_species", [None])[0]
                            moleculeType = gene.qualifiers.get("mol_type", [None])[0]
                            strain = gene.qualifiers.get("strain", [None])[0]
                            serovar = gene.qualifiers.get("serovar", [None])[0]
                            host = gene.qualifiers.get("host", [None])[0]
                            taxonID = gene.qualifiers.get("db_xref", [None])[0]
                            continue
                        
                        if geneType == "mobile_element":
                            elementType = gene.qualifiers.get("mobile_element_type", [None])[0]
                        elif geneType == "protein_bind":
                            boundMoiety = gene.qualifiers.get("bound_moiety", [None])[0]
                        else:
                            geneName = gene.qualifiers.get("gene", [None])[0]
                            locusTag = gene.qualifiers.get("locus_tag", [None])[0]
                            # Extract protein-coding information
                            if geneType == "CDS":
                                proteinProduct = gene.qualifiers.get("product", [None])[0]
                                aminoAcidSeq = gene.qualifiers.get("translation", [None])[0]
                            elif geneType == "regulatory":
                                regClass = gene.qualifiers.get("regulatory_class", [None])[0]
                            elif geneType == "ncRNA":
                                ncRNAClass = gene.qualifiers.get("ncRNA_class", [None])[0]
                            elif geneType != "gene":
                                RNAProduct = gene.qualifiers.get("product", [None])[0]

                        previousGeneEnd = None
                        for prevGene in reversed(genes):  # Search backward
                            if prevGene["Strand"] == strand:  # Only consider same-strand genes from same source
                                previousGeneEnd = prevGene["End"]
                                break
                        intergenicDistance = geneStart - previousGeneEnd if previousGeneEnd is not None else None
                        # print(f"Gene: {geneName}, Location: {geneStart}-{geneEnd}, Strand: {strand}")

                        # Store extracted features
                        genes.append({
                            "GeneType": geneType,
                            "GeneName": geneName,
                            "LocusTag": locusTag,
                            "Start": geneStart,
                            "End": geneEnd,
                            "Strand": strand,
                            "GeneLength": len(genomeSequence),
                            "GCContent": GCContent,
                            "ElementType": elementType,
                            "BoundMoiety": boundMoiety,
                            "ProteinProduct": proteinProduct,
                            "AminoAcidSeq": aminoAcidSeq,
                            "RegulatoryClass": regClass,
                            "ncRNAClass": ncRNAClass,
                            "RNAProduct": RNAProduct,
                            "IntergenicDistance": intergenicDistance,
                        })
                    if taxonID:
                        taxonomy = getTaxonomyRanks(taxonID)
                        
                        domain = taxonomy.get("Domain", None)
                        kingdom = taxonomy.get("Kingdom", None)
                        phylum = taxonomy.get("Phylum", None)
                        className = taxonomy.get("Class", None)
                        order = taxonomy.get("Order", None)
                        family = taxonomy.get("Family", None)
                        genus = taxonomy.get("Genus", None)
                        species = taxonomy.get("Species", None)

                    data.append({
                        "Accession": accessionNum,
                        "Organism": organism,
                        "Subspecies": subspecies,
                        "MoleculeType": moleculeType,
                        "Strain": strain,
                        "Serovar": serovar,
                        "Domain": domain,
                        "Kingdom": kingdom,
                        "Phylum": phylum,
                        "Class": className,
                        "Order": order,
                        "Family": family,
                        "Genus": genus,
                        "Species": species,
                        "GenomeLength": genomeLength,
                        "Sequence": genome,
                        "Description": description,
                        "GenomeType": genomeType,
                        "Host": host,
                        "Genes": genes
                    })
    print(f"Total number of sequences: {len(data)}")
print(f"Final total number of sequences: {len(data)}")


Actinomyces
processing File 0: .\data\Actinomyces\genome0.gbff
processing File 1: .\data\Actinomyces\genome1.gbff
processing File 2: .\data\Actinomyces\genome10.gbff
processing File 3: .\data\Actinomyces\genome11.gbff
processing File 4: .\data\Actinomyces\genome12.gbff
processing File 5: .\data\Actinomyces\genome13.gbff
processing File 6: .\data\Actinomyces\genome14.gbff
processing File 7: .\data\Actinomyces\genome15.gbff
processing File 8: .\data\Actinomyces\genome16.gbff
processing File 9: .\data\Actinomyces\genome17.gbff
processing File 10: .\data\Actinomyces\genome18.gbff
processing File 11: .\data\Actinomyces\genome19.gbff
processing File 12: .\data\Actinomyces\genome2.gbff
processing File 13: .\data\Actinomyces\genome20.gbff
processing File 14: .\data\Actinomyces\genome21.gbff
processing File 15: .\data\Actinomyces\genome22.gbff
processing File 16: .\data\Actinomyces\genome23.gbff
processing File 17: .\data\Actinomyces\genome24.gbff
processing File 18: .\data\Actinomyces\genome25

In [6]:
# # Convert data dictionary into two separate CSV files: One for genome data, one for gene data;
# # Accession Number will be foreign key linking the two datasets together


# Genomes:
# "Accession": accessionNum,
# "Organism": organism,
# "Subspecies": subspecies,
# "MoleculeType": moleculeType,
# "Strain": strain,
# "Serovar": serovar,
# "Domain": domain,
# "Kingdom": kingdom,
# "Phylum": phylum,
# "Class": className,
# "Order": order,
# "Family": family,
# "Genus": genus,
# "Species": species,
# "GenomeLength": genomeLength,
# "Sequence": genome,
# "Description": description,
# "GenomeType": genomeType,
# "Host": host,
# "Genes": genes

# Set up CSV columns for genome dataset
genomeHeader = [
    "Accession", "Organism", "Strain", "Serovar", "MoleculeType",
    "GenomeType", "Host", "Domain", "Kingdom", "Phylum", "Class", "Order", 
    "Family", "Genus", "Species", "GenomeLength", "Description",
    "NumberOfGenes", "NumberOfCDSGenes", "NumberOfRepeatRegions", 
    "NumberOfRegulatoryGenes", "NumberOfrRNAGenes", "NumberOftRNAGenes", 
    "NumberOfncRNAGenes", "NumberOfMobileElementGenes", "NumberOftmRNAGenes", 
    "NumberOfMiscRNAGenes", "NumberOfGenericGeneLabels", "NumberOfOtherGeneTypes", 
    "NumberOfMiscBindingGenes", "NumberOfMiscRNAGenes"
]

# Genes:
# "GeneType": geneType,
# "GeneName": geneName,
# "LocusTag": locusTag,
# "Start": geneStart,
# "End": geneEnd,
# "Strand": strand,
# "GeneLength": len(labelsForGenePresence),
# "GCContent": GCContent,
# "ElementType": elementType,
# "BoundMoiety": boundMoiety,
# "ProteinProduct": proteinProduct,
# "AminoAcidSeq": aminoAcidSeq,
# "RegulatoryClass": regClass,
# "ncRNAClass": ncRNAClass,
# "RNAProduct": RNAProduct,
# "IntergenicDistance": intergenicDistance,

# Set up CSV columns for gene dataset
geneHeader = [
    "Accession", "GeneType", "GeneName", "LocusTag", "StartBasePair", 
    "EndBasePair", "GeneLength", "Strand", "GCContent", "IntergenicDistance", 
    "ElementType", "ProteinProduct", "AminoAcidSeq", "RegulatoryClass", 
    "ncRNAClass", "RNAProduct"
]

genomeCSVPath = "processedData/genomeData.csv"
geneCSVPath = "processedData/geneData.csv"
genomeFileExists = os.path.exists(genomeCSVPath)

with open(genomeCSVPath, "a", newline="", encoding="utf-8") as genomeCSV:
    genomeWriter = csv.DictWriter(genomeCSV, fieldnames=genomeHeader)
    if not genomeFileExists:
        genomeWriter.writeheader()
    
    for genome in data:
        genomeRow = {
            "Accession": genome.get("Accession"),
            "Organism": genome.get("Organism"),
            "Strain": genome.get("Strain"),
            "Serovar": genome.get("Serovar"),
            "MoleculeType": genome.get("MoleculeType"),
            "GenomeType": genome.get("GenomeType"),
            "Host": genome.get("Host"),
            "Domain": genome.get("Domain"),
            "Kingdom": genome.get("Kingdom"),
            "Phylum": genome.get("Phylum"),
            "Class": genome.get("Class"),
            "Order": genome.get("Order"),
            "Family": genome.get("Family"),
            "Genus": genome.get("Genus"),
            "Species": genome.get("Species"),
            "GenomeLength": genome.get("GenomeLength"),
            "Description": genome.get("Description"),
            "NumberOfGenes": len(genome["Genes"]) - 1, # -1 for source gene label

            # Get gene counts iteratively via Counter
            "NumberOfCDSGenes": 0,
            "NumberOfRepeatRegions": 0,
            "NumberOfRegulatoryGenes": 0,
            "NumberOfrRNAGenes": 0,
            "NumberOftRNAGenes": 0,
            "NumberOfncRNAGenes": 0,
            "NumberOfMobileElementGenes": 0,
            "NumberOftmRNAGenes": 0,
            "NumberOfMiscRNAGenes": 0,
            "NumberOfGenericGeneLabels": 0,
            "NumberOfOtherGeneTypes": 0,
            "NumberOfMiscBindingGenes": 0,
            "NumberOfMiscRNAGenes": 0
        }

        geneFileExists = os.path.exists(geneCSVPath)
        with open(geneCSVPath, "a", newline="", encoding="utf-8") as geneCSV:
            geneWriter = csv.DictWriter(geneCSV, fieldnames=geneHeader)
            if not geneFileExists:
                geneWriter.writeheader()

            geneCounter = Counter() # Counts each gene type
                
            for gene in genome["Genes"]:
                geneCounter[gene["GeneType"]] += 1
                geneRow = {
                    "Accession": genome.get("Accession"),
                    "GeneType": gene.get("GeneType"),
                    "GeneName": gene.get("GeneName"),
                    "LocusTag": gene.get("LocusTag"),
                    "StartBasePair": gene.get("Start"),
                    "EndBasePair": gene.get("End"),
                    "GeneLength": gene.get("GeneLength"),
                    "Strand": gene.get("Strand"),
                    "GCContent": gene.get("GCContent"),
                    "IntergenicDistance": gene.get("IntergenicDistance"),
                    "ElementType": gene.get("ElementType"),
                    "ProteinProduct": gene.get("ProteinProduct"),
                    "AminoAcidSeq": gene.get("AminoAcidSeq"),
                    "RegulatoryClass": gene.get("RegulatoryClass"),
                    "ncRNAClass": gene.get("ncRNAClass"),
                    "RNAProduct": gene.get("RNAProduct"),
                }
                geneWriter.writerow(geneRow)
                
            # Dynamically set each gene count for genomeRow
            geneTypeToColumn = {
                "CDS": "NumberOfCDSGenes",
                "repeat_region": "NumberOfRepeatRegions",
                "regulatory": "NumberOfRegulatoryGenes",
                "rRNA": "NumberOfrRNAGenes",
                "tRNA": "NumberOftRNAGenes",
                "ncRNA": "NumberOfncRNAGenes",
                "mobile_element": "NumberOfMobileElementGenes",
                "tmRNA": "NumberOftmRNAGenes",
                "misc_RNA": "NumberOfMiscRNAGenes",
                "gene": "NumberOfGenericGeneLabels",
                "misc_feature": "NumberOfOtherGeneTypes",
                "misc_binding": "NumberOfMiscBindingGenes"
            }
            for geneType, columnName in geneTypeToColumn.items():
                genomeRow[columnName] = int(geneCounter.get(geneType, 0))
        genomeWriter.writerow(genomeRow)
