Hi,
My current setup has Airflow running on Celery for task scheduling and I use Dask as the HPC layer, on a Mesos cluster.
One thing I noticed is that the Dask workers are using cached functions even though Airflow sees the right code.
The code is on a read-only NFS share and it is mounted across every worker node, and the package is installed with the editable option (-e).
The way I am currently trying to solve this issue is with a plugin that follows but I'm not sure why it isn't working as intended. Thoughts? Here's my current code.
class RestartIdleWorkerPlugin(SchedulerPlugin):
def __init__(self, scheduler):
self.scheduler = scheduler
self.worker_to_task = defaultdict(set)
self.task_to_worker_state = {}
def transition(self, key, start, finish, *args, **kwargs):
if finish not in ('forgotten', 'memory'):
return
if finish == 'memory':
worker = kwargs['worker']
worker_state = self.scheduler.workers[worker]
self.task_to_worker_state[key] = worker_state
self.worker_to_task[kwargs['worker']].add(key)
return
if finish == 'forgotten':
worker = self.task_to_worker_state[key]
del self.task_to_worker_state[key]
if len(worker.processing) == 0 and len(worker.has_what) == 0:
self._restart_worker(worker)
def _restart_worker(self, worker, timeout=3):
""" Restart worker. Reset local state. """
logger.info(f'Plugin is restarting worker {worker}')
nanny_address = self.scheduler.get_worker_service_addr(worker, 'nanny')
nanny = rpc(nanny_address, connection_args=self.scheduler.connection_args)
nanny.restart(close=True, timeout=timeout * 0.8, executor_wait=True)
nanny.close_rpc()
Hi,
My current setup has Airflow running on Celery for task scheduling and I use Dask as the HPC layer, on a Mesos cluster.
One thing I noticed is that the Dask workers are using cached functions even though Airflow sees the right code.
The code is on a read-only NFS share and it is mounted across every worker node, and the package is installed with the editable option (-e).
The way I am currently trying to solve this issue is with a plugin that follows but I'm not sure why it isn't working as intended. Thoughts? Here's my current code.