In [1]:
import sqlite3, json, time, threading, subprocess, queue
from datetime import datetime, timedelta, timezone
from pprint import pprint

DB_PATH = "jobs.db"
CONFIG = {"max_retries": 3, "backoff_base": 2}

In [2]:
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS jobs (
        id TEXT PRIMARY KEY,
        command TEXT,
        state TEXT,
        attempts INTEGER,
        max_retries INTEGER,
        created_at TEXT,
        updated_at TEXT,
        next_run_at TEXT,
        last_error TEXT
    )
    """)
    conn.commit()
    conn.close()

def now():
    return datetime.now(timezone.utc).isoformat()

init_db()
print("‚úÖ Database initialized.")

‚úÖ Database initialized.


In [3]:
def enqueue(job_id, command, max_retries=CONFIG["max_retries"]):
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    created = now()
    cur.execute("""
    INSERT OR REPLACE INTO jobs (id, command, state, attempts, max_retries, created_at, updated_at)
    VALUES (?, ?, 'pending', 0, ?, ?, ?)
    """, (job_id, command, max_retries, created, created))
    conn.commit()
    conn.close()
    print(f"‚úÖ Enqueued job: {job_id}")

# Example
enqueue("job1", "echo Hello from Jupyter!")
enqueue("job2", "bash -c 'exit 1'")


‚úÖ Enqueued job: job1
‚úÖ Enqueued job: job2


In [4]:
def fetch_pending_job():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("""
    SELECT id, command, attempts, max_retries FROM jobs
    WHERE state='pending' AND (next_run_at IS NULL OR next_run_at <= ?)
    ORDER BY created_at ASC LIMIT 1
    """, (now(),))
    row = cur.fetchone()
    if not row:
        conn.close()
        return None
    job_id, command, attempts, max_retries = row
    cur.execute("UPDATE jobs SET state='processing', attempts=?, updated_at=? WHERE id=?",
                (attempts + 1, now(), job_id))
    conn.commit()
    conn.close()
    return {"id": job_id, "command": command, "attempts": attempts + 1, "max_retries": max_retries}

def mark_completed(job_id):
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("UPDATE jobs SET state='completed', updated_at=? WHERE id=?", (now(), job_id))
    conn.commit()
    conn.close()

def mark_failed(job_id, attempts, max_retries, error):
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    if attempts >= max_retries:
        cur.execute("UPDATE jobs SET state='dead', last_error=?, updated_at=? WHERE id=?", (error, now(), job_id))
    else:
        delay = CONFIG["backoff_base"] ** attempts
        next_run = datetime.now(timezone.utc) + timedelta(seconds=delay)
        cur.execute("UPDATE jobs SET state='pending', next_run_at=?, last_error=?, updated_at=? WHERE id=?",
                    (next_run.isoformat(), error, now(), job_id))
    conn.commit()
    conn.close()


In [5]:
def worker_loop(worker_id, stop_event):
    print(f"[Worker-{worker_id}] Started.")
    while not stop_event.is_set():
        job = fetch_pending_job()
        if not job:
            time.sleep(1)
            continue
        print(f"[Worker-{worker_id}] Running {job['id']} (Attempt {job['attempts']}/{job['max_retries']})")
        try:
            result = subprocess.run(job["command"], shell=True)
            if result.returncode == 0:
                mark_completed(job["id"])
                print(f"[Worker-{worker_id}] ‚úÖ Completed {job['id']}")
            else:
                mark_failed(job["id"], job["attempts"], job["max_retries"], f"Exit {result.returncode}")
                print(f"[Worker-{worker_id}] ‚ùå Failed {job['id']}")
        except Exception as e:
            mark_failed(job["id"], job["attempts"], job["max_retries"], str(e))
    print(f"[Worker-{worker_id}] Stopped.")


In [6]:
def start_workers(count=2, run_time=10):
    stop_event = threading.Event()
    workers = [threading.Thread(target=worker_loop, args=(i+1, stop_event)) for i in range(count)]
    for w in workers: w.start()
    print(f"üöÄ Started {count} worker(s). Running for {run_time} sec...")
    time.sleep(run_time)
    stop_event.set()
    for w in workers: w.join()
    print("üõë All workers stopped.")

# Run 2 workers for 10 seconds
start_workers(2, run_time=10)


[Worker-1] Started.[Worker-2] Started.

üöÄ Started 2 worker(s). Running for 10 sec...
[Worker-2] Running job1 (Attempt 1/3)
[Worker-1] Running job1 (Attempt 1/3)
[Worker-1] ‚úÖ Completed job1
[Worker-1] Running job2 (Attempt 1/3)
[Worker-2] ‚úÖ Completed job1
[Worker-1] ‚ùå Failed job2
[Worker-1] Running job2 (Attempt 2/3)
[Worker-1] ‚ùå Failed job2
[Worker-1] Running job2 (Attempt 3/3)
[Worker-1] ‚ùå Failed job2
[Worker-2] Stopped.
[Worker-1] Stopped.
üõë All workers stopped.


In [7]:
def show_summary():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("SELECT state, COUNT(*) FROM jobs GROUP BY state")
    print("üìä Job Summary:")
    for state, count in cur.fetchall():
        print(f"  {state:<10}: {count}")
    conn.close()

show_summary()


üìä Job Summary:
  completed : 1
  dead      : 1


In [8]:
def list_dlq():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("SELECT * FROM jobs WHERE state='dead'")
    rows = cur.fetchall()
    conn.close()
    return rows

def retry_dlq(job_id):
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("UPDATE jobs SET state='pending', attempts=0, next_run_at=NULL, last_error=NULL WHERE id=? AND state='dead'", (job_id,))
    conn.commit()
    conn.close()
    print(f"üîÅ Retried DLQ job: {job_id}")

# Example usage
print("üíÄ DLQ Jobs:")
pprint(list_dlq())

# Retry one
if list_dlq():
    retry_dlq(list_dlq()[0][0])


üíÄ DLQ Jobs:
[('job2',
  "bash -c 'exit 1'",
  'dead',
  3,
  3,
  '2025-11-07T04:10:22.747150+00:00',
  '2025-11-07T04:10:35.750590+00:00',
  '2025-11-07T04:10:35.690190+00:00',
  'Exit 1')]
üîÅ Retried DLQ job: job2


In [9]:
import pandas as pd

def display_jobs():
    conn = sqlite3.connect(DB_PATH)
    df = pd.read_sql_query("SELECT * FROM jobs", conn)
    conn.close()
    return df

display_jobs()


Unnamed: 0,id,command,state,attempts,max_retries,created_at,updated_at,next_run_at,last_error
0,job1,echo Hello from Jupyter!,completed,1,3,2025-11-07T04:10:22.736153+00:00,2025-11-07T04:10:29.552119+00:00,,
1,job2,bash -c 'exit 1',pending,0,3,2025-11-07T04:10:22.747150+00:00,2025-11-07T04:10:35.750590+00:00,,
