In [18]:

import sqlite3
import multiprocessing as mp
from abc import ABC, abstractmethod
from typing import List, Tuple, Dict, Any, Optional
import logging
from concurrent.futures import ProcessPoolExecutor
import time
import os

# Root logging setup: INFO and above, each record stamped with time, logger name and level
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Logger used by the functions and classes defined in this cell
logger = logging.getLogger(__name__)


class DatabaseConnector(ABC):
    """
    Interface that every database backend has to implement.

    Every method declared here is abstract, so the class cannot be
    instantiated directly; a concrete subclass supplies connection handling
    plus the specimen and primer-match operations.
    """

    @abstractmethod
    def connect(self):
        """Open the connection to the database."""

    @abstractmethod
    def close(self):
        """Release the connection to the database."""

    @abstractmethod
    def get_total_specimens(self) -> int:
        """Count the specimens stored in the database."""

    @abstractmethod
    def get_specimens_batch(self, batch_size: int, offset: int) -> List[Tuple[int, str]]:
        """Fetch one page of specimens as (id, sequence) tuples."""

    @abstractmethod
    def get_specimens_with_primer_match(self, primer: str, batch_size: int, offset: int) -> List[Tuple[int, str, Optional[int]]]:
        """Fetch one page of specimens together with any match already stored for `primer`."""

    @abstractmethod
    def insert_primer_matches(self, matches: List[Tuple[int, str, int]]):
        """Store a list of new primer matches in bulk."""

    @abstractmethod
    def create_primer_matches_table(self):
        """Make sure the primer_matches table is present."""


class SQLiteConnector(DatabaseConnector):
    """
    SQLite implementation of the DatabaseConnector.

    The connector holds at most one connection/cursor pair at a time. It can
    be driven explicitly (``connect()`` ... ``close()``) or as a context
    manager; ``with connector as db:`` and ``with connector.connect() as db:``
    both use a single connection and close it when the block ends.
    """
    def __init__(self, db_path: str, **kwargs):
        """
        Args:
            db_path: Path of the SQLite database file.
            **kwargs: Accepted but not used by this class.
        """
        self.db_path = db_path
        self.conn = None
        self.cursor = None

    def __enter__(self):
        """Called when entering a 'with' block; returns the connected connector."""
        # connect() reuses an already-open connection, so the cursor and the
        # connection that commit() is called on always belong together.
        return self.connect()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Called when exiting a 'with' block.

        Closes the connection. Nothing is committed here, and exceptions
        raised inside the block are not suppressed.
        """
        self.close()
        return False

    def connect(self):
        """
        Establish a connection to the SQLite database.

        If a connection is already open it is reused instead of being
        replaced, so repeated calls do not leave connections behind.

        Returns:
            This connector, so the call can be used in a 'with' statement.
        """
        if self.conn is None:
            self.conn = sqlite3.connect(self.db_path)
            self.cursor = self.conn.cursor()
        elif self.cursor is None:
            self.cursor = self.conn.cursor()
        return self

    def close(self):
        """Close the SQLite connection; does nothing if none is open."""
        if self.conn is not None:
            try:
                self.conn.close()
            finally:
                # Reset state even if close() raised so connect() starts fresh
                self.conn = None
                self.cursor = None

    def get_total_specimens(self) -> int:
        """Return total number of specimens in the database"""
        self.cursor.execute("SELECT COUNT(*) FROM specimen")
        return self.cursor.fetchone()[0]

    def get_specimens_batch(self, batch_size: int, offset: int) -> List[Tuple[int, str]]:
        """
        Get a batch of specimens (id, sequence), ordered by specimenid.

        NOTE(review): this query reads the `sequence` column while
        get_specimens_with_primer_match reads `nuc_san` - confirm that both
        columns exist in the `specimen` table.
        """
        self.cursor.execute("""
            SELECT specimenid, sequence
            FROM specimen
            ORDER BY specimenid
            LIMIT ? OFFSET ?
        """, (batch_size, offset))
        return self.cursor.fetchall()

    def get_specimens_with_primer_match(self, primer: str, batch_size: int, offset: int) -> List[Tuple[int, str, Optional[int]]]:
        """
        Get specimens with their existing primer matches.

        Returns:
            One (specimenid, nuc_san, primer_start_index) row per specimen in
            the requested page, ordered by specimenid. primer_start_index is
            None when no row for `primer` is stored for that specimen.
        """
        self.cursor.execute("""
            SELECT s.specimenid, s.nuc_san, pm.primer_start_index
            FROM specimen s
            LEFT JOIN primer_matches pm ON s.specimenid = pm.specimenid AND pm.primer_sequence = ?
            ORDER BY s.specimenid
            LIMIT ? OFFSET ?
        """, (primer, batch_size, offset))
        return self.cursor.fetchall()

    def insert_primer_matches(self, matches: List[Tuple[int, str, int]]):
        """
        Bulk insert new primer matches and commit them.

        Args:
            matches: (specimenid, primer_sequence, primer_start_index) tuples.
                An empty list is a no-op.

        Raises:
            sqlite3.Error: If the insert fails; the partial batch is rolled
                back before the error is re-raised.
        """
        if not matches:
            return
        try:
            self.cursor.executemany("""
                INSERT INTO primer_matches (specimenid, primer_sequence, primer_start_index)
                VALUES (?, ?, ?)
            """, matches)
        except sqlite3.Error:
            # Keep the batch atomic: never leave a half-written transaction open
            self.conn.rollback()
            raise
        self.conn.commit()

    def create_primer_matches_table(self, drop_existing: bool = False):
        """
        Create the primer_matches table if it doesn't exist.

        Args:
            drop_existing: When True, discard any existing primer_matches
                table (and every result stored in it) before creating it.
                The default keeps stored results so later runs only compute
                what is missing.
        """
        if drop_existing:
            # IF EXISTS keeps this safe on a database that has no table yet
            self.cursor.execute("DROP TABLE IF EXISTS primer_matches")
        self.cursor.execute("""
        CREATE TABLE IF NOT EXISTS primer_matches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            specimenid INTEGER NOT NULL,
            primer_sequence TEXT NOT NULL,
            primer_start_index INTEGER NOT NULL,
            FOREIGN KEY (specimenid) REFERENCES specimen(specimenid),
            UNIQUE (specimenid, primer_sequence)
        )
        """)

        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_primer_matches ON primer_matches (specimenid, primer_sequence)")
        self.conn.commit()


def find_primer_match(sequence: Optional[str], primer: str) -> int:
    """
    Find the position of primer in sequence.

    The search is an exact, case-sensitive substring search.

    Args:
        sequence: Sequence to search in; may be None or empty.
        primer: Primer sequence to look for.

    Returns:
        0-based index of the first occurrence of `primer` in `sequence`, or
        -1 if it does not occur. Also -1 when `sequence` is None or empty or
        when `primer` is empty.

    This could be enhanced with more sophisticated matching algorithms
    like Smith-Waterman or PCR-specific algorithms.
    """
    # A missing sequence cannot contain anything, and str.find("") would
    # report a meaningless match at index 0 for an empty primer.
    if not sequence or not primer:
        return -1
    return sequence.find(primer)


def process_batch(args: Tuple) -> List[Tuple[int, str, int]]:
    """
    Search one chunk of specimens for a single primer.

    Intended to be executed in a worker process, which is why all inputs
    arrive packed into one tuple.

    Args:
        args: Tuple of (db_path, primer, batch_data). batch_data holds
            (specimenid, sequence, match_position) rows; db_path is unpacked
            but not used here.

    Returns:
        A (specimenid, primer, position) tuple for every row whose
        match_position was None, in input order. Rows that already carry a
        position are left out.
    """
    _db_path, primer, rows = args

    # Only rows without a stored position still need to be searched
    return [
        (specimen_id, primer, find_primer_match(seq, primer))
        for specimen_id, seq, stored_position in rows
        if stored_position is None
    ]


class PrimerMatchProcessor:
    """
    Main class for processing primer matches using multiprocessing.

    For each primer, specimens are read from the database page by page, the
    rows of every page are searched in worker processes, and the results are
    written back. The connector is used only through connect(), close() and
    the query methods declared on DatabaseConnector.
    """
    def __init__(self, db_connector: DatabaseConnector, primers: List[str], batch_size: int = 1000,
                 max_workers: Optional[int] = None):
        """
        Initialize the processor

        Args:
            db_connector: Database connector instance
            primers: List of primer sequences to search for
            batch_size: Size of batches to process
            max_workers: Maximum number of worker processes (defaults to CPU count)

        Raises:
            ValueError: If batch_size is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        self.db_connector = db_connector
        self.primers = primers
        self.batch_size = batch_size
        self.max_workers = max_workers or mp.cpu_count()

    def process(self):
        """
        Process all specimens for all primers
        """
        start_time = time.time()
        logger.info(f"Starting primer match processing with {self.max_workers} workers")

        # Ensure the table exists
        self.db_connector.connect()
        try:
            self.db_connector.create_primer_matches_table()
            total_specimens = self.db_connector.get_total_specimens()
        finally:
            self.db_connector.close()

        logger.info(f"Processing {total_specimens} specimens for {len(self.primers)} primers")

        # Process each primer
        for primer in self.primers:
            self._process_primer(primer, total_specimens)

        elapsed_time = time.time() - start_time
        logger.info(f"Completed processing in {elapsed_time:.2f} seconds")

    def _fetch_batch(self, primer: str, offset: int):
        """Read one page of specimens (with stored matches for `primer`), then close the connection."""
        self.db_connector.connect()
        try:
            return self.db_connector.get_specimens_with_primer_match(primer, self.batch_size, offset)
        finally:
            self.db_connector.close()

    def _store_matches(self, matches: List[Tuple[int, str, int]]):
        """Write a list of computed matches to the database, then close the connection."""
        self.db_connector.connect()
        try:
            self.db_connector.insert_primer_matches(matches)
        finally:
            self.db_connector.close()

    def _split_into_chunks(self, batch_data: list) -> list:
        """Split a batch into at most max_workers contiguous chunks; an empty batch gives no chunks."""
        if not batch_data:
            return []
        # Ceiling division, so the number of chunks never exceeds max_workers
        chunk_size = -(-len(batch_data) // self.max_workers)
        return [batch_data[i:i + chunk_size] for i in range(0, len(batch_data), chunk_size)]

    def _process_primer(self, primer: str, total_specimens: int):
        """
        Process all specimens for a single primer

        Args:
            primer: Primer sequence to search for
            total_specimens: Total number of specimens
        """
        logger.info(f"Processing primer: {primer}")
        db_path = getattr(self.db_connector, 'db_path', None)

        # One worker pool serves every batch of this primer. The database
        # connection is closed again before work is handed to the pool.
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            for offset in range(0, total_specimens, self.batch_size):
                batch_start_time = time.time()

                # Get specimens with their existing matches for this primer
                batch_data = self._fetch_batch(primer, offset)

                # Split the batch into chunks and process them in parallel
                tasks = [(db_path, primer, chunk) for chunk in self._split_into_chunks(batch_data)]
                results = executor.map(process_batch, tasks)

                # Flatten results
                all_matches = [match for sublist in results for match in sublist]

                # Insert results
                if all_matches:
                    self._store_matches(all_matches)

                batch_time = time.time() - batch_start_time
                progress = min(offset + self.batch_size, total_specimens)

                logger.info(f"Processed {progress}/{total_specimens} specimens for primer {primer} " +
                          f"({len(all_matches)} new matches, {batch_time:.2f}s)")


def main(db_path: str = "/mnt/z/Uni/Master Thesis/eyeBOLD/eyeBOLD_mini.db",
         primers: Optional[List[str]] = None,
         batch_size: int = 5000,
         max_workers: Optional[int] = None):
    """
    Example usage: run primer matching against a SQLite database.

    Calling main() without arguments uses the example configuration below.

    Args:
        db_path: Path of the SQLite database file.
        primers: Primer sequences to search for; defaults to the two example
            primers below.
        batch_size: Number of specimens read per database page.
        max_workers: Number of worker processes; defaults to the CPU count,
            capped at 8.
    """
    # Configuration
    if primers is None:
        primers = ["ATTACCGCGGCTGCTGG", "CCTACGGGAGGCAGCAG"]  # Example 16S rRNA primers
    if max_workers is None:
        max_workers = min(mp.cpu_count(), 8)  # Limit to 8 processes max

    # Initialize the database connector
    db_connector = SQLiteConnector(db_path)

    # Create and run the processor
    processor = PrimerMatchProcessor(
        db_connector=db_connector,
        primers=primers,
        batch_size=batch_size,
        max_workers=max_workers
    )

    processor.process()

In [19]:
# Run the example primer-matching pipeline defined in the previous cell
main()

2025-05-11 15:47:05,666 - __main__ - INFO - Starting primer match processing with 4 workers
2025-05-11 15:47:05,721 - __main__ - INFO - Processing 7000 specimens for 2 primers
2025-05-11 15:47:05,722 - __main__ - INFO - Processing primer: ATTACCGCGGCTGCTGG
2025-05-11 15:47:06,552 - __main__ - INFO - Processed 5000/7000 specimens for primer ATTACCGCGGCTGCTGG (5000 new matches, 0.82s)
2025-05-11 15:47:07,430 - __main__ - INFO - Processed 7000/7000 specimens for primer ATTACCGCGGCTGCTGG (2000 new matches, 0.88s)
2025-05-11 15:47:07,432 - __main__ - INFO - Processing primer: CCTACGGGAGGCAGCAG
2025-05-11 15:47:08,233 - __main__ - INFO - Processed 5000/7000 specimens for primer CCTACGGGAGGCAGCAG (5000 new matches, 0.79s)
2025-05-11 15:47:09,375 - __main__ - INFO - Processed 7000/7000 specimens for primer CCTACGGGAGGCAGCAG (2000 new matches, 1.14s)
2025-05-11 15:47:09,377 - __main__ - INFO - Completed processing in 3.71 seconds
