Description
Feature Request / Improvement
Problem
PyIceberg currently relies on multiprocessing in Table.plan_files and the pyarrow interface. Unfortunately, multiprocessing depends on /dev/shm, which serverless runtimes like AWS Lambda and AWS Fargate do not provide. In effect, relying on /dev/shm assumes the user has control over the host environment, which disqualifies PyIceberg from use in serverless environments.
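For context, the failure is not specific to PyIceberg's code paths: in an environment without /dev/shm, even constructing a pool from the standard library fails. A minimal sketch:

```python
# Minimal sketch, assuming an environment where /dev/shm is unavailable
# (e.g. a container started with --ipc="none"). Even ThreadPool allocates a
# POSIX semaphore for its internal change notifier, so construction itself
# raises OSError: [Errno 38] Function not implemented.
from multiprocessing.pool import ThreadPool

with ThreadPool() as pool:
    print(pool.map(str, range(3)))
```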
Apparently, one way to emulate Lambda or Fargate container runtimes locally is by running a container with --ipc="none". This will disable the /dev/shm mount and cause the multiprocessing module to fail with the following error:
[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
File "/var/task/app.py", line 27, in handler
result = scan.to_arrow().slice_length(limit)
File "/var/task/pyiceberg/table/__init__.py", line 819, in to_arrow
self.plan_files(),
File "/var/task/pyiceberg/table/__init__.py", line 776, in plan_files
with ThreadPool() as pool:
File "/var/lang/lib/python3.10/multiprocessing/pool.py", line 930, in __init__
Pool.__init__(self, processes, initializer, initargs)
File "/var/lang/lib/python3.10/multiprocessing/pool.py", line 196, in __init__
self._change_notifier = self._ctx.SimpleQueue()
File "/var/lang/lib/python3.10/multiprocessing/context.py", line 113, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/var/lang/lib/python3.10/multiprocessing/queues.py", line 341, in __init__
self._rlock = ctx.Lock()
File "/var/lang/lib/python3.10/multiprocessing/context.py", line 68, in Lock
return Lock(ctx=self.get_context())
File "/var/lang/lib/python3.10/multiprocessing/synchronize.py", line 162, in __init__
SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/var/lang/lib/python3.10/multiprocessing/synchronize.py", line 57, in __init__
sl = self._semlock = _multiprocessing.SemLock(

Interesting note from multiprocessing.pool.ThreadPool:
A ThreadPool shares the same interface as Pool, which is designed around a pool of processes and predates the introduction of the concurrent.futures module [...] Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns concurrent.futures.Future instances that are compatible with many other libraries, including asyncio.
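To make that recommendation concrete, here is a rough sketch of what the ThreadPool() call shown in the traceback could become with concurrent.futures; the manifests and evaluate_manifest names are placeholders, not PyIceberg's actual API:

```python
# Sketch only: a plan_files-style fan-out using concurrent.futures.ThreadPoolExecutor
# instead of multiprocessing.pool.ThreadPool. The argument names are hypothetical.
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

M = TypeVar("M")  # a manifest
T = TypeVar("T")  # a scan task produced from a manifest

def plan_files_with_threads(manifests: Iterable[M], evaluate_manifest: Callable[[M], List[T]]) -> List[T]:
    # ThreadPoolExecutor never allocates POSIX semaphores, so it does not need /dev/shm.
    with ThreadPoolExecutor() as executor:
        return [task for tasks in executor.map(evaluate_manifest, manifests) for task in tasks]
```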
Proposal
Perhaps PyIceberg should support multiple concurrency strategies, allowing the user to configure which is most appropriate for their runtime/resources.
Instead of using multiprocessing directly, we could depend on a concrete implementation of Executor from the concurrent.futures module wherever we need concurrency. The user could then select the appropriate implementation via configuration:
- PYICEBERG__CONCURRENCY=process uses ProcessPoolExecutor (default, same as the current implementation)
- PYICEBERG__CONCURRENCY=thread uses ThreadPoolExecutor (appropriate for serverless environments)
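A minimal sketch of how that switch could be wired up, assuming the PYICEBERG__CONCURRENCY environment variable above (the get_executor helper is illustrative, not an existing PyIceberg API):

```python
# Illustrative sketch of the proposed configuration hook; not PyIceberg's actual API.
import os
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def get_executor() -> Executor:
    strategy = os.environ.get("PYICEBERG__CONCURRENCY", "process")
    if strategy == "process":
        # Default: process-based pool, in line with the current multiprocessing usage.
        return ProcessPoolExecutor()
    if strategy == "thread":
        # Thread-based pool: no POSIX semaphores, so it works without /dev/shm
        # on AWS Lambda and AWS Fargate.
        return ThreadPoolExecutor()
    raise ValueError(f"Unsupported PYICEBERG__CONCURRENCY value: {strategy!r}")
```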
This might even allow PyIceberg to support other concurrency models in the future, e.g., user-defined implementations of Executor.
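For example, anything implementing the concurrent.futures.Executor interface could plug into the same hook; the SerialExecutor below is a hypothetical user-defined executor that simply runs tasks inline:

```python
# Hypothetical user-defined executor that runs every task inline in the caller's
# thread. Subclassing concurrent.futures.Executor gives it map() and the
# context-manager protocol for free, so it drops in wherever an Executor is expected.
from concurrent.futures import Executor, Future

class SerialExecutor(Executor):
    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            future.set_exception(exc)
        return future
```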
I reproduced this problem and have a fix on a fork, confirming that this approach at least works. Depending on feedback from the community, I can tidy things up and submit a PR. 😃
Query engine
None