Skip to content

Tags: goadesign/pulse

Tags

v1.4.0

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
pool: fix job payload cleanup during shutdown (#52)

* pool: fix job payload cleanup during shutdown

Fixes a bug where job payloads weren't being properly cleaned up during
node shutdown, causing subsequent DispatchJob calls to fail with
ErrJobExists even though the jobs were no longer running.

Changes:
- Add cleanup of job payloads when removing workers
- Run periodic cleanup of orphaned job payloads
- Fix potential race condition in tickers if created after the node is shutdown

* Upgrade to Go 1.24

v1.3.0

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
pool: Enhance worker lifecycle management and job reliability (#51)

* Filter out stale events early

Ensure two nodes can't process stale workers concurrently

* Centralize worker stream cache

Instead of each worker having their own cache,
have the parent node hold the cache to avoid
duplicate caching.

* pool: fix cleanup of stale workers

The original code used future timestamps in workerKeepAliveMap to prevent
concurrent cleanup operations. This made stale workers appear active and
could permanently prevent cleanup if a node crashed during the process.

Fixed by:
- Added dedicated cleanupMap to track workers being cleaned up
- Implemented proper concurrency handling using SetIfNotExists/TestAndSet
- Added retry logic with exponential backoff for requeuing jobs
- Ensured cleanup map is properly closed during node shutdown
- Updated worker.go to handle new processRequeuedJobs retry parameter

The fix ensures stale workers and their jobs are reliably cleaned up even
in case of node failures or concurrent cleanup attempts.

* Properly ack stale event to avoid re-publish

* fix(pool): prevent worker leaks from context cancellation

- Use background context for worker goroutines to prevent premature termination
- Preserve logging context while making worker lifecycle independent of caller
- Rename maps for better clarity (e.g. jobsMap -> jobMap)
- Improve node stream management with nodeStreams map
- Clean up error handling and logging patterns

This fixes an issue where workers could be leaked when the caller's context
was cancelled before proper cleanup could occur.

* refactor(pool): Ensure eventual consistency of job assignments

Improves the Node component's ability to detect and reassign jobs from stale
or deleted workers by:
1. Adding explicit orphaned job detection for workers missing keep-alive entries
2. Centralizing worker cleanup logic to ensure consistent job reassignment
3. Simplifying worker state validation to catch edge cases in distributed scenarios

This ensures that no jobs are lost when workers become unavailable, maintaining
eventual consistency of job assignments across the worker pool.

* pool: improve worker cleanup and job requeuing reliability

Enhances worker cleanup mechanism by handling stale cleanup locks and
adding cleanup verification. Key changes:

* Add detection and cleanup of stale worker cleanup locks
* Clean up jobs from jobMap after successful requeue
* Improve logging around worker cleanup and job requeuing
* Upgrade requeue log level to Info for better operational visibility

This improves reliability of the distributed job system by preventing
orphaned jobs and stale locks from accumulating over time.

* Prevent streams from being recreated after destroy

* Prevent re-creation of worker streams

* Add proper options to missed node stream add call

* streaming: cleanup stale consumers during sink initialization

Add cleanup of stale consumers during sink initialization to prevent accumulation
of stale consumers in Redis. Previously stale consumers were only cleaned up
periodically, which could lead to a buildup if sinks did not shut down cleanly.

Also refactor the stale consumer cleanup logic to:
1. Extract common cleanup code into deleteStreamStaleConsumers
2. Improve error handling and logging
3. Properly clean up all related data structures (Redis consumer group,
   keep-alive map, and consumers map)

* Add event ID to requeue log entries

* pool: refactor worker cleanup logic

Improve the worker cleanup implementation by:
1. Split cleanupWorker into smaller, focused functions:
   - acquireCleanupLock: handles cleanup lock management
   - requeueWorkerJobs: handles job requeuing
   - cleanupWorker: orchestrates the cleanup process

2. Simplify cleanupInactiveWorkers:
   - Use activeWorkers() to get list of active workers
   - Combine jobMap and workerMap checks into a single loop
   - Skip workers being actively cleaned up

3. Rename isActive to isWithinTTL to better reflect its purpose
   - Function checks if a timestamp is within TTL duration
   - Used consistently across node and worker cleanup

* pool: Add periodic cleanup of stale pending jobs

This commit adds a new background process to clean up stale entries in the
pending jobs map. Previously, stale entries were only cleaned up when
attempting to dispatch a job with the same key. Now, a dedicated goroutine
runs at the ackGracePeriod frequency to proactively remove expired entries.

Additional changes:
- Fix jobPendingMap comment to clarify it's indexed by job key not worker ID
- Add debug logs for worker shutdown in handleEvents and keepAlive
- Refactor timestamp validation to use isWithinTTL helper across the codebase
- Improve error handling in cleanupStalePendingJobs using TestAndDelete

The periodic cleanup helps prevent memory leaks from abandoned dispatch
attempts and makes the job dispatch system more reliable.

* pool: simplify job requeuing logic

Remove the requeueJob helper function and directly use dispatchJob for
requeueing jobs during worker cleanup and rebalancing.

* Properly delete stale workers with requeued jobs

* Fix deadlock from blocked ichan notifications

Non-read notifications on the ichan channel were blocking writes and causing deadlocks during Set operations. This commit removes ichan and replaces it with a waiter-based mechanism using dedicated per-call channels, ensuring notifications are delivered without blocking.

v1.2.0

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Enforce unique job keys (#50)

* Proper support for empty string values in rmap

Setting an empty string value then calling Get now
returns empty string and true.

Also remove the need for using a sentinel value ("*")
when doing a reset.

* Update dependencies

* Enforce unique job keys

This commit does a couple of things:
1. It adds a new `SetIfNotExists` methods to replicated maps that will set a value only if it isn't already in the map.
2. It ensures the pool node `DispatchJob` method returns an the new `ErrJobExists` error when attempting to queue a job with a key that already exists.

It was previously the responsibility of the client to ensure no two calls to DispatchJob used the same key, now the pool package enforces it.

* Encode rmap string content in publish

So that we can ensure decoding all the parts in Go.
This replaces the previous algo that was using : to separate key and values
which was not working if a key or a value also contained :.

* Protect rmap.SetAndWait against corner cases

Fix all tests

v1.1.0

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Fix a few issues (#48)

* Fix a few issues

* Make sure that adding a dispatch return event to a node stream creates the stream if needed.
  This could cause jobs that are added right on startup to fail to be dispatched.
* Fix potential panic in cleanup goroutine when the pending jobs map contains nil values
* Make sure to close all maps when closing the node.
* Properly log event information when discarding stale ack events.
* More consistent logging

* Fix a few additional issues

* Ensure stale node streams are gced
* Make close and shutdown code cleaner
* Cleanup all maps properly on node close

v1.0.6

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Fix/lost job on requeue (#45)

* Do not remove requeued jobs from Redis

* Simplify usage of pool package

Remove options that should not be changed.
Fix a potential race condition on requeue where a job payload may get deleted too early.

* Fix race condition in test

v1.0.5

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Reduce lock contention (#44)

In particular the changes remove the need to lock when calling user code.

v1.0.5-rc1

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Delete stale consumers not associated with sinks (#43)

v1.0.4

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Timeout DispatchJob if no dispatch return is received (#42)

This is to avoid blocking in case of error.

v1.0.3

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Log panics occuring in event handler goroutines (#41)

v1.0.2

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Make sure all timestamps are UTC (#40)