Implement atomic add method and advisory locking in Channel interface#45
Implement atomic add method and advisory locking in Channel interface#45
Conversation
There was a problem hiding this comment.
Pull request overview
This PR extends Graflow’s Channel abstraction with concurrency-focused primitives (atomic_add and an advisory lock context manager) and adds tests + examples documenting safe concurrent update patterns.
Changes:
- Added
Channel.atomic_add()API and implemented it for Memory/Redis/Typed channels. - Added
Channel.lock()advisory context manager (real per-key locking inMemoryChannel, default no-op otherwise). - Added a concurrency example plus a dedicated thread-safety test suite for
MemoryChannel.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
graflow/channels/base.py |
Adds the new atomic_add() abstract method and a default no-op lock() context manager. |
graflow/channels/memory_channel.py |
Implements per-key RLock advisory locking and a lock-backed atomic_add; adds pickle support for lock fields. |
graflow/channels/redis_channel.py |
Implements atomic_add() via Redis INCRBYFLOAT using a counter key namespace. |
graflow/channels/typed.py |
Delegates atomic_add() and lock() through TypedChannel. |
tests/channels/test_memory_channel_thread_safety.py |
Adds concurrency/race reproduction, atomic_add, advisory lock, and serialization tests for MemoryChannel. |
examples/03_data_flow/channel_concurrency.py |
New tutorial-style example showing unsafe vs safe concurrent patterns. |
examples/03_data_flow/README.md |
Updates the data-flow tutorial to include and reference the new concurrency APIs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…d using generic key retrieval
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Makoto YUI <yuin405@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Makoto YUI <yuin405@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Makoto YUI <yuin405@gmail.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.
Comments suppressed due to low confidence (1)
graflow/channels/redis_channel.py:253
__setstate__recreates the Redis client withdecode_responses=Trueunconditionally. This can change behavior compared to the original instance (e.g., if the user constructed the channel withdecode_responses=Falseor supplied a client configured differently). Persist and restore the effectivedecode_responsessetting (or store the original client configuration) to avoid serialization changing runtime semantics.
super().__setstate__(state)
assert redis is not None, "redis package is required for RedisChannel"
self.redis_client = redis.Redis(
host=self._host, port=self._port, db=self._db, decode_responses=True, **self._kwargs
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Makoto YUI <yuin405@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Makoto YUI <yuin405@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Makoto YUI <yuin405@gmail.com>
This pull request introduces thread-safe and atomic operations for channel data in Graflow, enabling robust inter-task communication in concurrent workflows. It adds two key features:
atomic_addfor atomic numeric updates andlockfor advisory per-key locking, with implementations for both in-memory and Redis-backed channels. The documentation and examples have been updated to demonstrate safe data sharing and synchronization patterns.Channel concurrency and atomic operations:
atomic_addmethod to theChannelbase class and implemented it in bothMemoryChannel(thread-safe using per-keyRLock) andRedisChannel(using RedisINCRBYFLOAT), allowing atomic numeric increments/decrements without race conditions. [1] [2] [3]lockcontext manager to theChannelbase class, with concrete implementations inMemoryChannel(per-keyRLock) andRedisChannel(distributed lock via Redis), enabling safe compound read-modify-write operations. [1] [2] [3]Documentation and examples:
examples/03_data_flow/README.mdguide to include new sections and patterns for thread-safe channel operations, atomic counters, and compound updates with locking, clarifying best practices for concurrent workflows. [1] [2] [3] [4] [5]channel_concurrency.pythat demonstrates unsafe parallel increments, safe atomic adds, and compound updates with locks, showing the practical impact of the new features.Internal improvements:
These changes make channel operations in Graflow safe and reliable for concurrent execution, and provide clear guidance and examples for users to adopt these patterns.