In [None]:
import sqlite3
import os
from pathlib import Path
from datetime import datetime

In [None]:
# my queue class
class Queue:
    def __init__(self, db_path: str):
        """
        Initialize the queue with the given SQLite database path.
        """
        self.db_path = db_path
        self._initialize_queue()

    def _initialize_queue(self):
        """
        Create the queue table if it does not exist.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    period TEXT NOT NULL
                )
            """)
            conn.commit()

    def enqueue(self, period: str):
        """
        Add a new period to the queue only if it does not already exist.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM queue WHERE period = ?", (period,))
            exists = cursor.fetchone()[0]

            if not exists:  # Insert only if the period does not exist
                cursor.execute("INSERT INTO queue (period) VALUES (?)", (period,))
                conn.commit()

    def dequeue(self):
        """
        Remove and return the first period in the queue.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, period FROM queue ORDER BY id LIMIT 1")
            item = cursor.fetchone()
            if item:
                cursor.execute("DELETE FROM queue WHERE id = ?", (item[0],))
                conn.commit()
                return item[1]
        return None

    def peek(self):
        """
        Return the first period in the queue without removing it.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT period FROM queue ORDER BY id LIMIT 1")
            item = cursor.fetchone()
            return item[0] if item else None

    def count(self) -> int:
        """
        Return the number of items in the queue.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM queue")
            return cursor.fetchone()[0]

    def reset(self):
        """
        Clear all contents from the queue and reset the indexing.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("DROP TABLE IF EXISTS queue;")  # Drop table to reset indexing
            conn.commit()
            self._initialize_queue()  # Recreate table

    def view_queue(self):
        """
        View all elements in the queue without removing them.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, period FROM queue ORDER BY id")
            items = cursor.fetchall()
            if items:
                print("Queue contents:")
                for item in items:
                    print(f"ID: {item[0]}, Period: {item[1]}")
            else:
                print("The queue is empty.")

    def count_queue_items(self) -> int:
        """
        Count the number of items in the queue.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM queue")
            count = cursor.fetchone()[0]
        return count

# periods
def generate_periods(start_period: str, end_period: str) -> list:
    # Parse the start and end periods into datetime objects
    start_date = datetime.strptime(start_period, "%Y%m")
    end_date = datetime.strptime(end_period, "%Y%m")
    
    # Initialize the list of periods
    periods = []
    
    # Generate periods by incrementing the month
    while start_date <= end_date:
        periods.append(start_date.strftime("%Y%m"))
        # Increment by one month
        next_month = start_date.month % 12 + 1
        year_increment = start_date.month // 12
        start_date = start_date.replace(year=start_date.year + year_increment, month=next_month)
    
    return periods

**Run initialize**

In [None]:
# set PATH to DB
extracts_config_path = os.path.join('/home/jovyan/workspace/pipelines/dhis2_snis_extract/', "config", ".queue.db")
# local checks
# extracts_config_path = Path("C:\\Users\\blues\\Desktop\\Bluesquare\\Repositories\\openhexa-pipelines-ner-nmdr\\dhis2_nmdr_push\\workspace\\pipelines\\dhis2_snis_extract\\config", ".queue.db")
queue = Queue(extracts_config_path)

**View queue**

In [None]:
# Check items
queue.view_queue()

**Reset DB**

In [None]:
queue.reset()

# Check items
print(f"Total periods in DB: {queue.count_queue_items()}")

**Manually add a specific period**

In [None]:
periods = generate_periods('202406', '202412') # real loop
print(periods)

In [None]:
for p in periods:
    queue.enqueue(period=p) 

In [None]:
queue.view_queue()