## Load GFF files into memory and into a SQLite database

In [None]:
import pprint
from BCBio.GFF import GFFExaminer, parse
import os
from Bio.SeqRecord import SeqRecord
import logging
from gffutils import Feature, FeatureDB, create_db


In [None]:
# Logger named after this module (`__name__`); used for debug output in this file.
logger = logging.getLogger(__name__)

In [None]:
# https://daler.github.io/gffutils/database-import.html
# https://useast.ensembl.org/info/website/upload/gff.html
def load_db(file_path: str, dirpath: str, read_map: bool = False, close: bool = True):
  """
    Build a `gffutils.FeatureDB` from a `.gff` file.

    The SQLite database is written to `<dirpath>/genome.db`. Because
    `force=True` is passed to `create_db`, an existing file at that path is
    overwritten.

    NOTE(review): the database path depends only on `dirpath`, so two `.gff`
    files loaded with the same `dirpath` share (and overwrite) one `genome.db`.

    Args:
      file_path: Path to the `.gff` annotation file.
      dirpath: Directory in which `genome.db` is created.
      read_map: If `True`, pretty-print the parent/child feature map of the file.
      close: If `True`, close the file handle and return only the database.
        If `False`, also return the open handle; the caller must close it.

    Returns:
      The `FeatureDB` when `close` is `True`, otherwise a
      `(FeatureDB, file handle)` tuple.

    Raises:
      OSError: If `file_path` cannot be opened.
  """
  db_path = os.path.join(dirpath, "genome.db")
  in_handle = open(file_path, "r")
  try:
    db = create_db(
      file_path,
      dbfn=db_path,
      force=True,
      keep_order=True,
      merge_strategy='merge',
      sort_attribute_values=True,
    )

    if read_map:
      examiner = GFFExaminer()
      pprint.pprint(examiner.parent_child_map(in_handle))
      # The examiner reads through the handle; rewind it so a caller that
      # receives the handle can still read the file from the beginning.
      in_handle.seek(0)
  except BaseException:
    # Do not leak the open file when database creation or examination fails.
    in_handle.close()
    raise

  if close:
    in_handle.close()
    return db
  return db, in_handle


def load_gff(list_unpacked: list[str], read_all: bool = False, parse_gff: bool = False):
  """
    Load each `.gff` file into a `gffutils.FeatureDB` and optionally parse it.

    Args:
      list_unpacked: Paths of the `.gff` files to load. Each database is
        created by `load_db` in the directory that contains the file.
      read_all: Passed to `load_db` as `read_map`; if `True`, the parent/child
        map of each file is printed.
      parse_gff: If `True`, also parse each file with `BCBio.GFF.parse` and
        collect the resulting records.

    Returns:
      A `(records, db_list)` tuple. `records` holds the parsed records of all
      files (empty unless `parse_gff` is `True`); `db_list` holds one
      `FeatureDB` per input file, in input order.

    Raises:
      ValueError: If a path does not end with `.gff`.
      TypeError: If `load_db` does not return a `(FeatureDB, handle)` tuple.
  """

  results: list[SeqRecord] = []
  db_list: list[FeatureDB] = []
  for file_path in list_unpacked:
    logger.debug(file_path)

    if not file_path.endswith(".gff"):
      raise ValueError("File is not a .gff file")

    parent_dir = os.path.dirname(file_path)

    # close=False: keep the handle open so the file can be parsed below.
    loaded = load_db(file_path, parent_dir, read_all, False)
    if not isinstance(loaded, tuple):
      raise TypeError("load_db did not return a (FeatureDB, file handle) tuple")

    db, in_handle = loaded
    try:
      logger.debug("FeatureDB")
      db_list.append(db)

      if parse_gff:
        # The handle may already have been read (e.g. when `read_all` is set),
        # so rewind before parsing to avoid silently getting zero records.
        in_handle.seek(0)
        results.extend(parse(in_handle))
    finally:
      # Always release the file, even if parsing fails.
      in_handle.close()

  return results, db_list


## Find 16S rRNA genes

In [None]:
def get_16S_gene(annotation_db: FeatureDB | tuple) -> Feature | None:
  """
    Return the first `rRNA` feature whose first `product` value is
    `"16S ribosomal RNA"`.

    Args:
      annotation_db: Database to search. A tuple is rejected so that the
        `(db, handle)` form is not passed in by mistake.

    Returns:
      The first matching feature, or `None` if there is none.

    Raises:
      TypeError: If `annotation_db` is a tuple.
  """
  if isinstance(annotation_db, tuple):
    raise TypeError("db is not of type gffutils.FeatureDB")

  for feature in annotation_db.features_of_type("rRNA"):
    try:
      product = feature.attributes["product"][0]
    except (KeyError, IndexError):
      # rRNA feature without a (non-empty) `product` attribute: skip it
      # instead of aborting the whole search.
      continue
    if product == "16S ribosomal RNA":
      return feature
  return None

In [None]:
def get_16S_list(gff_paths: list[str] | None = None):
  """
    Collect the 16S rRNA feature of every loaded `.gff` file.

    Args:
      gff_paths: Paths of the `.gff` files to load. When omitted, the
        module-level `unpacked_gff` (defined outside this cell) is used.

    Returns:
      A list with the feature returned by `get_16S_gene` for each database
      that has one; databases without a match contribute nothing.
  """
  if gff_paths is None:
    gff_paths = unpacked_gff # type: ignore

  # Only the databases are needed; nothing is parsed (parse_gff=False).
  _, dbs = load_gff(gff_paths, False, False)
  results: list[Feature] = []
  try:
    for db in dbs:
      db.analyze()
      found_16S = get_16S_gene(db)
      if found_16S is not None:
        results.append(found_16S)
  finally:
    # Close every connection, including when a search fails part-way through.
    for db in dbs:
      if db.conn is not None:
        db.conn.close()
  return results

# Runs when this cell/module executes: stores the features returned by get_16S_list().
list_16S = get_16S_list()
