Description
Bug summary
Hello, I would like to introduce cache isolation at the SERIALIZABLE level in my project, but when trying to do so I ran into an issue that I was able to reproduce with the following code snippet:
import asyncio
import random

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.locking.memory import MemoryLockManager
from prefect.transactions import IsolationLevel

# Cache on inputs and task source, with SERIALIZABLE isolation and an in-memory lock.
cache_policy = (INPUTS + TASK_SOURCE).configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=MemoryLockManager(),
)


@task(cache_policy=cache_policy)
async def my_task(x: int):
    await asyncio.sleep(random.randint(1, 3))
    result = x + random.randint(1, 100)
    print(f"Task result: {result}")
    return result


async def main():
    # Three runs with the same input, so all three share one cache key.
    tasks = [my_task(42) for _ in range(3)]
    results = await asyncio.gather(*tasks)
    print(f"All task results: {results}")


if __name__ == "__main__":
    asyncio.run(main())

When running the code above, all the tasks acquire the lock and run concurrently, producing different results even though they use the same cache key. When two of them then try to release the lock, the error ValueError: No lock held by... is logged. The complete console log is below:
Task result: 80
13:55:59.633 | INFO | Task run 'my_task' - Finished in state Completed()
Task result: 103
13:56:01.631 | ERROR | Task run 'my_task' - An error was encountered while committing transaction '922f24a6e44d5c6d3e1bc2b4cc0ef084'
Traceback (most recent call last):
File "<project_root>/.venv/lib/python3.12/site-packages/prefect/transactions.py", line 338, in commit
self.store.release_lock(self.key)
File "<project_root>/.venv/lib/python3.12/site-packages/prefect/results.py", line 775, in release_lock
return self.lock_manager.release_lock(key, holder)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<project_root>/.venv/lib/python3.12/site-packages/prefect/locking/memory.py", line 178, in release_lock
raise ValueError(
ValueError: No lock held by Mac.local:41014:8727021632:MainThread for transaction with key 922f24a6e44d5c6d3e1bc2b4cc0ef084
13:56:01.638 | INFO | Task run 'my_task' - Finished in state Completed()
13:56:01.643 | ERROR | Task run 'my_task' - An error was encountered while committing transaction '922f24a6e44d5c6d3e1bc2b4cc0ef084'
Traceback (most recent call last):
File "<project_root>/.venv/lib/python3.12/site-packages/prefect/transactions.py", line 338, in commit
self.store.release_lock(self.key)
File "<project_root>/.venv/lib/python3.12/site-packages/prefect/results.py", line 775, in release_lock
return self.lock_manager.release_lock(key, holder)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<project_root>/.venv/lib/python3.12/site-packages/prefect/locking/memory.py", line 178, in release_lock
raise ValueError(
ValueError: No lock held by Mac.local:41014:8727021632:MainThread for transaction with key 922f24a6e44d5c6d3e1bc2b4cc0ef084
13:56:01.644 | INFO | Task run 'my_task' - Finished in state Completed()
Task result: 66
All task results: [103, 80, 66]
Process finished with exit code 0
I noticed that the documentation on cache isolation uses threading in its example, but it does not mention that threading is a requirement when using the SERIALIZABLE level.
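To illustrate what I suspect is happening, here is a minimal sketch that does not use Prefect at all. It assumes that the lock holder is identified per thread (the holder string in the traceback, Mac.local:41014:8727021632:MainThread, ends with the thread name) and that a holder may re-acquire a lock it already holds. ToyLockManager, current_holder, and worker are hypothetical names made up for this illustration; they are not Prefect APIs and may not match the real implementation.

```python
import asyncio
import threading


class ToyLockManager:
    """Hypothetical stand-in for a lock manager; NOT Prefect's MemoryLockManager."""

    def __init__(self):
        self._holders = {}  # maps key -> holder string

    def acquire(self, key, holder):
        current = self._holders.get(key)
        if current is None or current == holder:
            # Assumption: the same holder re-acquiring counts as success.
            self._holders[key] = holder
            return True
        return False

    def release(self, key, holder):
        if self._holders.get(key) != holder:
            raise ValueError(f"No lock held by {holder} for key {key}")
        del self._holders[key]


def current_holder():
    # Assumption: the holder identity depends only on the current thread.
    thread = threading.current_thread()
    return f"{thread.ident}:{thread.name}"


async def worker(manager, key, delay, errors):
    # Every coroutine on the same thread presents the same holder string.
    acquired = manager.acquire(key, current_holder())
    await asyncio.sleep(delay)
    try:
        manager.release(key, current_holder())
    except ValueError as exc:
        errors.append(str(exc))
    return acquired


async def demo():
    manager = ToyLockManager()
    errors = []
    acquired = await asyncio.gather(
        *(worker(manager, "same-key", 0.01 * (i + 1), errors) for i in range(3))
    )
    return acquired, errors


acquired, errors = asyncio.run(demo())
print(acquired)     # [True, True, True]
print(len(errors))  # 2
```

In this toy version all three coroutines "acquire" the lock because they share one thread identity, and the second and third releases fail because the first release already removed the entry. That matches the two ValueError entries in my log, but I have not confirmed that this is the actual cause.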
Is it actually a requirement to run tasks that use the same cache key in different threads?
If it is a requirement, would it be OK to run some tasks in separate threads and others in the main thread using asyncio.to_thread, as in the example below?
import asyncio
import random

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.locking.memory import MemoryLockManager
from prefect.transactions import IsolationLevel

cache_policy = (INPUTS + TASK_SOURCE).configure(
    isolation_level=IsolationLevel.SERIALIZABLE,
    lock_manager=MemoryLockManager(),
)


@task(cache_policy=cache_policy)
async def my_task(x: int):
    await asyncio.sleep(random.randint(1, 3))
    result = x + random.randint(1, 100)
    print(f"Task result: {result}")
    return result


async def main():
    tasks = []
    # Distinct inputs (distinct cache keys): run in the main thread's event loop.
    tasks.extend([my_task(i) for i in range(3)])
    # Same input (same cache key): run each in its own thread with its own event loop.
    tasks.extend([asyncio.to_thread(asyncio.run, my_task(42)) for _ in range(3)])
    results = await asyncio.gather(*tasks)
    print(f"All task results: {results}")


if __name__ == "__main__":
    asyncio.run(main())

Thanks in advance!
Version info
Version: 3.0.11
API version: 0.8.4
Python version: 3.12.6
Git commit: a17ccfcf
Built: Thu, Oct 24, 2024 5:36 PM
OS/Arch: darwin/arm64
Profile: dev-local
Server type: server
Pydantic version: 2.9.2
Integrations:
prefect-docker: 0.6.2
Additional context
No response