
feat(job): refactor job logging so it isn't a bottleneck #1256

@mihow

Description

Observation

During a moderately-sized async ML job (~742 images), the Celery worker running process_nats_pipeline_result begins producing a recurring pair of errors once multiple NATS results are in flight:

ERROR Failed to save logs for job #N: SoftTimeLimitExceeded()
ERROR Failed to save logs for job #N: sending query failed: another command is already in progress
ERROR Error processing pipeline result for job N: sending query failed: another command is already in progress. NATS will re-deliver the task message.

The same 5-minute window also shows Django containers logging "terminating connection due to idle-in-transaction timeout". Errors are spread across all ForkPoolWorker-* processes (not clustered on one) and are only ever attached to the single actively-processing job row. A live pg_stat_activity snapshot during the error window showed no lock contention and plenty of connection headroom (~90 of 500 connections in use) — i.e. the DB is not saturated.

This appears (tentatively) to be a logical within-connection collision, not a capacity problem.

Current implementation

ami/jobs/models.py:335–361, JobLogHandler.emit:

def emit(self, record: logging.LogRecord):
    logger.log(record.levelno, self.format(record))
    try:
        self.job.refresh_from_db(fields=["logs"])          # SELECT
        # ...in-memory insert at position 0, dedupe check, truncate to 1000...
        self.job.save(update_fields=["logs"], update_progress=False)  # UPDATE
    except Exception as e:
        logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")

The refresh_from_db exists for a real reason (see PR #1162): each concurrent worker holds its own stale in-memory JobLogs pydantic object, and without the refresh the last save() wins and earlier entries are silently dropped. So the current design is trading concurrency-safety of log entries for concurrency-risk at the connection level.
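The lost-update behavior the refresh guards against is easy to reproduce outside Django. A minimal sketch (fake_db, the barrier, and the worker bodies are stand-ins for the jobs_job row and two concurrent emit calls, not project code):

```python
import threading

# A single "row" holding the JSON logs blob, as in Job.logs.
fake_db = {"logs": []}
barrier = threading.Barrier(2)

def worker(entry):
    # 1. Read: what a stale in-memory JobLogs object holds without refresh_from_db.
    snapshot = list(fake_db["logs"])
    barrier.wait()  # force both workers to read before either writes
    # 2. Mutate in memory and write the whole blob back, as save() does.
    snapshot.insert(0, entry)
    fake_db["logs"] = snapshot  # last write wins; the other worker's entry vanishes

t1 = threading.Thread(target=worker, args=("entry from worker 1",))
t2 = threading.Thread(target=worker, args=("entry from worker 2",))
t1.start(); t2.start(); t1.join(); t2.join()

print(fake_db["logs"])  # only ONE of the two entries survives
```

Both workers read the empty list before either writes, so whichever save lands last silently drops the other's entry; refresh_from_db narrows (but does not close) that window.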

Likely failure path (hypothesis)

_update_job_progress (same file, ~L179–209) wraps a transaction.atomic() + Job.objects.select_for_update().get(pk=job_id) and then calls job.logger.info(...) inside that block. JobLogHandler.emit then issues:

  1. SELECT (refresh_from_db) on the same connection that already holds a SELECT … FOR UPDATE cursor on the same row
  2. An UPDATE on the same row for logs only

Under ~180 result tasks per 5 minutes × concurrency 8, several workers end up waiting on the FOR UPDATE lock. They stall past Postgres's idle_in_transaction_session_timeout (Postgres terminates the connection server-side), or they stall past Celery's 300s soft_time_limit (soft timeout interrupts the task mid-transaction). The resulting exception is caught by the bare except Exception in emit, which then calls logger.error(...) — which re-enters emit — which issues another query on the same now-broken connection. That produces the "another command is already in progress" cascade.

What still needs to be verified: whether CONN_MAX_AGE > 0 + CONN_HEALTH_CHECKS = False on the celeryworker is causing Django to hand dead server-killed connections to subsequent tasks; and the exact value of idle_in_transaction_session_timeout on the Postgres instances used for async jobs. Both would affect how often this fires in practice.
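For reference, the two Django knobs in question look like this in settings, with illustrative values only (the project's real configuration is exactly what needs verifying; the Postgres side can be read with SHOW idle_in_transaction_session_timeout;):

```python
# settings.py on the celeryworker -- illustrative values, not the real config
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        # Persistent connections outlive the task that used them...
        "CONN_MAX_AGE": 60,
        # ...and with health checks off, Django hands the next task a
        # connection without noticing the server already terminated it.
        "CONN_HEALTH_CHECKS": False,
    }
}
```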

Consequence

  • Log entries are dropped during bursts (the inner save is the exception site)
  • Connection churn / soft-timeouts make individual process_nats_pipeline_result tasks much slower than they otherwise would be, which compounds queue backlog
  • The bare-except cascade can convert a single DB blip into dozens of error lines per task

Proposals

Ordered by scope.

A. Separate JobLog table (recommended)

class JobLog(models.Model):
    job = models.ForeignKey(Job, on_delete=models.CASCADE, related_name="log_entries")
    level = models.CharField(max_length=10)
    message = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)

    class Meta:
        indexes = [models.Index(fields=["job", "-created_at"])]

emit becomes a single INSERT:

def emit(self, record):
    logger.log(record.levelno, self.format(record))
    try:
        JobLog.objects.create(
            job_id=self.job.pk,
            level=record.levelname,
            message=self.format(record),
        )
    except Exception as e:
        logger.error(f"Failed to save log for job #{self.job.pk}: {e}")

Why this is the right shape:

  • Pure INSERT — no read-before-write, no nested cursor, no row-level FOR UPDATE contention
  • Natural concurrency: multiple workers inserting into the same child table is a solved pattern (already the case for Detection, Classification, etc.)
  • Removes the arbitrary 1000-entry truncation; logs become pageable and queryable by time range
  • Removes the if msg not in stdout dedupe scan (which is O(n) per write, ~n=1000)
  • Log levels become first-class (filter by ERROR for the "stderr" view; filter by INFO+ for the full view)

Costs:

  • One migration + a compatibility shim
  • Frontend read path changes, if it consumes Job.logs.stdout / .stderr directly

Proposed rollout to avoid a hard cutover:

  1. Add JobLog model + migration
  2. Dual-write: keep the existing JSON path; also JobLog.objects.create(...) in emit
  3. Switch Job.logs.stdout / .stderr to @property that reads the last N rows from JobLog (keeps existing consumers working with no API contract change)
  4. Verify frontend, then stop writing to the JSON field
  5. Migrate existing data + drop the JSON column in a separate release
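Step 3's compatibility shim can be sketched as a plain function first (hypothetical helper; the name split_log_views and the (level, message) pair shape are assumptions, standing in for the last-N JobLog rows before wiring it into an actual @property):

```python
ERROR_LEVELS = {"ERROR", "CRITICAL"}

def split_log_views(entries, limit=1000):
    """Rebuild the legacy stdout/stderr lists from JobLog-style rows.

    `entries` is newest-first (level, message) pairs, standing in for
    JobLog.objects.filter(job=...).order_by("-created_at")[:limit].
    """
    recent = entries[:limit]
    stdout = [msg for _level, msg in recent]                        # full view
    stderr = [msg for level, msg in recent if level in ERROR_LEVELS]  # error view
    return stdout, stderr

stdout, stderr = split_log_views([
    ("ERROR", "Failed to save logs for job #3"),
    ("INFO", "Processing batch 2/8"),
    ("INFO", "Processing batch 1/8"),
])
print(stderr)  # -> ['Failed to save logs for job #3']
```

Because the shim only reads, existing consumers of Job.logs.stdout / .stderr see the same shapes while the write path has already moved to the new table.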

B. Atomic jsonb_insert (no schema change)

Replace the refresh+mutate+save with a single SQL statement:

UPDATE jobs_job
SET logs = jsonb_set(
    COALESCE(logs, '{}'::jsonb),
    '{stdout}',
    jsonb_insert(COALESCE(logs->'stdout', '[]'::jsonb), '{0}', to_jsonb(%s::text), false)
)
WHERE id = %s;

This eliminates the read-before-write and therefore the nested cursor bug. But:

  • Truncation to 1000 entries becomes awkward (subquery on jsonb_array_length, two statements or a CTE)
  • Dedup (if msg not in stdout) disappears
  • Critically, the UPDATE still targets the same jobs_job row that _update_job_progress holds a SELECT FOR UPDATE on — so we trade a connection-state bug for a straight row-lock wait. Doesn't fix the contention, only the specific cursor error.

C. Redis-buffered + periodic flush

emit does LPUSH to a per-job Redis list; a celerybeat task flushes into the DB every N seconds.

  • Fully decouples log writes from the request/task path
  • Batched writes = much higher throughput
  • Adds eventual-consistency (logs lag real-time by up to N seconds)
  • More moving parts; loss-risk if the key is evicted before flush

Could also be layered on top of (A) as an optimisation later.
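The flush step can be sketched with the client injected, so the batching logic is visible (FakeRedis, the job:{id}:logs key naming, and the function names are assumptions; a real deployment would use redis-py's lpush/lrange/ltrim with the same shapes):

```python
class FakeRedis:
    """In-memory stand-in exposing the three list ops the flusher needs."""
    def __init__(self):
        self.lists = {}
    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)
    def lrange(self, key, start, stop):
        return self.lists.get(key, [])[start:stop + 1]
    def ltrim(self, key, start, stop):
        items = self.lists.get(key, [])
        end = None if stop == -1 else stop + 1  # mirror Redis's inclusive -1
        self.lists[key] = items[start:end]

def emit(client, job_id, message):
    # Task-path side: O(1) push, no DB connection touched.
    client.lpush(f"job:{job_id}:logs", message)

def flush(client, job_id, batch_size=500):
    """Beat-task side: drain up to batch_size entries into one DB write."""
    key = f"job:{job_id}:logs"
    batch = client.lrange(key, 0, batch_size - 1)
    if batch:
        # Single bulk INSERT here in the real thing (e.g. JobLog.objects.bulk_create),
        # then drop only what was flushed.
        client.ltrim(key, batch_size, -1)
    return batch

r = FakeRedis()
for i in range(3):
    emit(r, 42, f"entry {i}")
print(flush(r, 42))  # -> ['entry 2', 'entry 1', 'entry 0']
print(flush(r, 42))  # -> []
```

Note the loss window the bullet above describes: anything pushed but not yet flushed lives only in Redis, so eviction or restart before the beat task runs drops it.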

Recommendation

Go with A, rolled out dual-write as above. The nested-cursor bug is the visible symptom, but the deeper issue is that log writes share the row that the job-progress path locks. Moving them to their own table removes the coupling entirely and is consistent with how every other high-volume write in the model lives today.

B is tempting if we want something shipped this week — but because the remaining row-lock contention on jobs_job is a real source of stall, it only partially addresses the symptom set we saw on the ~742-image job.

C is a good follow-up once the write path is a plain insert; at that point it becomes an incremental throughput optimisation rather than a bug fix.

Adjacent quick wins worth considering in the same PR

  • Set CONN_HEALTH_CHECKS = True (Django ≥4.1) on the worker so a server-killed connection is detected before the next query reuses it
  • Move job.logger.* calls in _update_job_progress outside the transaction.atomic() block regardless of which proposal lands — logging shouldn't be inside a held row lock
  • Tighten the bare except Exception in emit to a narrower exception class, so a broken connection propagates instead of being swallowed and then triggering another emit on the same broken connection
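On the last point, one way to guarantee the failure path cannot re-enter emit, regardless of which exception class is caught, is a thread-local guard. A hypothetical sketch, not the current handler (fail_next is a test hook simulating the broken connection):

```python
import logging
import threading

class GuardedJobLogHandler(logging.Handler):
    """Sketch: emit() failures may still log an error, but the guard turns
    any resulting re-entry into a silent drop instead of a second DB query."""

    def __init__(self):
        super().__init__()
        self._local = threading.local()
        self.saved = []
        self.fail_next = False  # hypothetical hook simulating a broken connection

    def emit(self, record):
        if getattr(self._local, "in_emit", False):
            return  # already inside emit on this thread; do not recurse
        self._local.in_emit = True
        try:
            if self.fail_next:
                self.fail_next = False
                raise RuntimeError("simulated: another command is already in progress")
            self.saved.append(record.getMessage())  # the "DB write"
        except Exception as e:
            # This call would normally re-enter emit(); the guard makes it a no-op.
            logging.getLogger("jobs.sketch").error(f"Failed to save log: {e}")
        finally:
            self._local.in_emit = False

log = logging.getLogger("jobs.sketch")
log.propagate = False
log.setLevel(logging.INFO)
handler = GuardedJobLogHandler()
log.addHandler(handler)

handler.fail_next = True
log.info("first")   # save fails; the error line is dropped, no cascade
log.info("second")  # subsequent writes work normally
print(handler.saved)  # -> ['second']
```

One failed save produces at most one error line instead of the observed cascade, whichever exception class the except clause ends up naming.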
