# Task Queue

> Simple async task queue with background worker and concurrency control

In [None]:
#| default_exp queue

In [None]:
#| hide
from nbdev.showdoc import *

## The Problem

When handling HTTP requests that trigger long-running tasks, you want to:
1. Return immediately (non-blocking)
2. Process the work in the background
3. Limit concurrent execution

This is exactly what we need for scraping store data - we don't want to block the HTTP response while waiting for Playwright to finish.

In [None]:
#| export
import asyncio

## Basic Queue Operations

Python's `asyncio.Queue` is the foundation.

In [None]:
# Create a queue (no maxsize given, so it is unbounded)
q = asyncio.Queue()

# Put items; with no maxsize the queue never fills, so put() does not have to wait
await q.put("task1")
await q.put("task2")

# Both items are now buffered in the queue
assert q.qsize() == 2

In [None]:
# Get items (suspends this coroutine until an item is available)
# Items come out in FIFO order, so the first one put is the first one returned
item = await q.get()
assert item == "task1"

## Semaphore for Concurrency Limiting

A semaphore limits how many operations run concurrently.

In [None]:
# Allow max 2 concurrent tasks
sem = asyncio.Semaphore(2)

# Number of tasks that have entered the guarded section so far
count = 0

async def task():
    """Increment the shared counter while holding the semaphore, then sleep briefly."""
    global count
    async with sem:  # Waits here while 2 other tasks already hold the semaphore
        count += 1
        await asyncio.sleep(0.1)

# Start 5 tasks - only 2 run at once
await asyncio.gather(*[task() for _ in range(5)])
# All 5 tasks eventually ran; note this checks completion, not the concurrency limit itself
assert count == 5

## The Pattern: Background Worker

This is the core pattern from our main.py:

In [None]:
#| export
class TaskQueue:
    """Async task queue with a background dispatcher and a concurrency cap.

    Tasks are enqueued as argument tuples. A single background dispatcher
    pulls them off the queue and runs ``handler(*args)`` for each one in its
    own asyncio task; a semaphore ensures at most ``max_workers`` handlers
    execute at the same time.

    Attributes:
        max_workers: Maximum number of handler calls running concurrently.
        handler: Async callable invoked as ``await handler(*args)``.
        Q: The underlying ``asyncio.Queue`` holding pending argument tuples.
        SEM: The ``asyncio.Semaphore`` guarding handler execution.
    """

    def __init__(self, max_workers, handler):
        """Create the queue.

        Args:
            max_workers: Maximum number of concurrently running handlers.
            handler: Async callable that processes one task's arguments.
        """
        self.max_workers = max_workers
        self.handler = handler
        self.Q = asyncio.Queue()
        self.SEM = asyncio.Semaphore(max_workers)
        # The event loop only keeps weak references to tasks, so we hold
        # strong references here to stop them being garbage-collected
        # before they finish.
        self._worker = None
        self._tasks = set()

    async def start(self):
        """Start the background dispatcher that processes tasks from the queue.

        Calling this again while the dispatcher is still running is a no-op,
        so duplicate dispatchers are never created.
        """
        if self._worker is not None and not self._worker.done():
            return
        self._worker = asyncio.create_task(self._dispatch())

    async def _dispatch(self):
        """Pull queued argument tuples forever and spawn one task per item."""
        while True:
            task_data = await self.Q.get()
            task = asyncio.create_task(self._guarded_execute(task_data))
            self._tasks.add(task)
            # Drop the reference once the task is finished.
            task.add_done_callback(self._tasks.discard)

    async def _guarded_execute(self, task_data):
        """Execute handler with semaphore guard to limit concurrency.

        Args:
            task_data: Tuple of positional arguments for the handler.
        """
        try:
            async with self.SEM:
                await self.handler(*task_data)
        finally:
            # Mark the item processed even when the handler raises, so that
            # Q.join() / join() cannot hang on a failed task.
            self.Q.task_done()

    async def enqueue(self, *args):
        """Add a task to the queue; ``args`` are later passed to the handler."""
        await self.Q.put(args)

    async def join(self):
        """Wait until every task enqueued so far has been fully processed."""
        await self.Q.join()

### Test it

In [None]:
# Collects the sum computed by each handler call, in completion order
results = []

async def my_handler(x, y):
    """Simulate a short piece of async work, then record x + y."""
    await asyncio.sleep(0.01)
    results.append(x + y)

# Create queue with handler
tq = TaskQueue(max_workers=2, handler=my_handler)

# Start worker
await tq.start()

# Enqueue tasks (non-blocking)
await tq.enqueue(1, 2)
await tq.enqueue(3, 4)
await tq.enqueue(5, 6)

# Wait for processing
# NOTE(review): this is a timing-based wait; it assumes all three handlers
# (0.01s each, at most 2 at a time) finish well within 0.1s.
await asyncio.sleep(0.1)

# NOTE(review): the exact order presumably holds because tasks are dispatched
# FIFO and every handler sleeps the same duration - confirm if handlers change.
assert results == [3, 7, 11]

## Usage

Here's how you use it in your FastAPI app:

In [None]:
# In your app:
# task_queue = TaskQueue(max_workers=3, handler=process_user_data)
#
# @app.on_event("startup")
# async def startup():
#     await task_queue.start()
#
# @app.post('/v0/user/data')
# async def user_data(...):
#     task = tasks.insert(...)
#     await task_queue.enqueue(task.id, email, password, store)
#     return {'id': task.id, 'status': 'queued'}

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()