Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create async monitoring service #2034

Merged
merged 144 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
144 commits
Select commit Hold shift + click to select a range
c8e103b
WIP: Create initial async monitoring service
joj0s Jun 24, 2022
0afe496
Cancel all asyncio monitoring tasks on thread stop
joj0s Jun 29, 2022
694b2c3
Move asyncio loop initialization to monitoring class
joj0s Jun 29, 2022
094eda4
Add aiofile to project requirements
joj0s Jun 30, 2022
95295f8
Merge branch 'develop' into new_monitoring_service
egede Jul 25, 2022
a4a6ba3
Asyncio compatible tests (#2040)
joj0s Aug 3, 2022
3136d4e
Merge branch 'develop' into new_monitoring_service
joj0s Aug 5, 2022
836d099
Create a testing dummy remote backend
joj0s Jul 22, 2022
7ff5d7e
Enable job finalisation simulation in dummy remote backend
joj0s Aug 5, 2022
fdbda50
Add dunder all method to dummy remote init module
joj0s Aug 5, 2022
376b7ab
Add aiohttp to project dependecies
joj0s Aug 5, 2022
62aa3c7
Add job id logging info to dummy remote server
joj0s Aug 5, 2022
4d2549d
Make dummy server a singleton and move initialization
joj0s Aug 9, 2022
79fa425
Move test backend handlers to GangaTest package
joj0s Aug 10, 2022
da24c2e
Import job templates from localhost in dummy backend
joj0s Aug 10, 2022
d742127
Remove testing handlers from executable.py
joj0s Aug 11, 2022
e3f06da
Merge pull request #2047 from ganga-devs/dummy_remote_backend
joj0s Aug 16, 2022
3bfead0
Merge branch 'develop' into new_monitoring_service
joj0s Aug 18, 2022
a7a76ec
Implement per backend poll rate for new mon service
joj0s Sep 25, 2022
7b3df5a
Improve logging and read poll rates from config
joj0s Sep 26, 2022
3619656
Merge branch 'develop' into new_monitoring_service
joj0s Sep 27, 2022
ca5cd02
Remove duplicate method
joj0s Sep 27, 2022
b38ac24
Simplify poll rate fallback
joj0s Sep 27, 2022
cca37f8
Revert "Simplify poll rate fallback"
joj0s Sep 27, 2022
27ea3c4
Test debugging: revert to standard poll rate
joj0s Sep 27, 2022
75b0b50
Decrease default backend poll rate
joj0s Sep 28, 2022
027ce96
Create initial DIRAC subprocess executor
joj0s Sep 20, 2022
2b9c2f9
Enable sandbox download with new DIRAC concurrency model
joj0s Sep 21, 2022
3bc6500
Remove debugging code
joj0s Sep 21, 2022
d12a6f1
Handle multiple DIRAC envs
joj0s Sep 21, 2022
cc350a2
Remove arg str conversion in DIRAC status monitoring
joj0s Sep 21, 2022
0c6920a
Wrap DIRAC commands in dict
joj0s Sep 28, 2022
70ed3c6
Remove debugging print statements
joj0s Sep 28, 2022
c95ce27
Move DIRAC app status fetching to job finalisation
joj0s Sep 28, 2022
65cc280
Remove cprofile wrapper from DiracBase
joj0s Sep 28, 2022
69a5985
Fix env creation in DIRAC subprocess
joj0s Sep 28, 2022
b5ea041
Add psutil to project requirements
joj0s Sep 28, 2022
7a19375
autopep8 action fixes
joj0s Sep 28, 2022
d03ac70
PEP8 fixes
Oct 3, 2022
4605642
PEP8
Oct 3, 2022
ee0d069
PEP8
Oct 3, 2022
738e780
PEP8
Oct 3, 2022
829740c
Add backend tear down in monitoring service
joj0s Oct 8, 2022
2b73213
autopep8 action fixes
joj0s Oct 8, 2022
083da57
Format IBackend and GangaCore init.py manually
joj0s Oct 8, 2022
f5a3702
Merge branch 'new_monitoring_service' into variable_poll_rate
joj0s Oct 8, 2022
4eba7ff
Merge branch 'variable_poll_rate' into autopep8-patches/variable_poll…
joj0s Oct 8, 2022
94e3ef5
autopep8 action fixes
joj0s Oct 8, 2022
0865e00
Merge branch 'autopep8-patches/variable_poll_rate' of github.com:gang…
joj0s Oct 8, 2022
6afbfe6
Fix assert keyword missing whitespace
joj0s Oct 8, 2022
af8578d
Remove commented out code from DIRAC boot file
joj0s Oct 8, 2022
f2a9365
Remove redefined method in old DIRAC process
joj0s Oct 8, 2022
584e35d
Make setup.py whitespace more readable
joj0s Oct 8, 2022
81023dc
Ignore missing DiracAdmin imports in DIRAC commands file
joj0s Oct 8, 2022
293fe6d
Merge pull request #2070 from ganga-devs/autopep8-patches/dirac_concu…
joj0s Oct 11, 2022
4d4dd16
Merge pull request #2068 from ganga-devs/autopep8-patches/variable_po…
joj0s Oct 11, 2022
e338b81
Merge pull request #2069 from ganga-devs/dirac_concurrency
joj0s Oct 11, 2022
fb49062
Merge pull request #2066 from ganga-devs/variable_poll_rate
joj0s Oct 11, 2022
dde026a
Add DIRAC executor process logging
joj0s Oct 19, 2022
a765677
Merge branch 'develop' into new_monitoring_service
joj0s Oct 27, 2022
e53d3ca
TO BE REVERTED: Test DIRAC path
joj0s Nov 1, 2022
25cc257
WIP: Add sys prefixes
joj0s Nov 2, 2022
a793b08
Set sys variables from DIRACOS env variable
joj0s Nov 2, 2022
262c551
Add attribute error check when decoding DIRAC env
joj0s Nov 3, 2022
f048e5f
Shut down mon thread explicitly
joj0s Nov 5, 2022
89b44d5
Merge pull request #2088 from ganga-devs/shutdown_mon_thread
joj0s Nov 7, 2022
659ebc0
Handle monitoring service autostart (#2089)
joj0s Nov 14, 2022
4b6673d
Add DIRAC executor process tear down (#2075)
joj0s Nov 14, 2022
e24dae6
Remove numParalellJobs option and move it to DIRAC only (#2086)
joj0s Nov 14, 2022
ca1833e
Fix aio mon service check interval and add sys import
joj0s Dec 3, 2022
1a4dba2
Add completing status check and asyncio task creation to Dirac
joj0s Dec 3, 2022
4bfc0a8
Import sys exit in dirac executor process
joj0s Dec 3, 2022
c308da4
Fix active dirac process management dictionary
joj0s Dec 3, 2022
f7d5ab4
Replace AioQueue with regular mp queue in Dirac subprocess
joj0s Dec 4, 2022
69168f4
Add 'been queued' check and remove old 'completing' one in dirac base
joj0s Dec 4, 2022
3e688db
Fix 'completing' job cleanup when restarting Ganga session
joj0s Jan 25, 2023
0de8d12
Terminate DIRAC subprocess via event
joj0s Jan 25, 2023
b93737d
Replace shared manager dict with Ultradict in DIRAC subprocess
joj0s Jan 25, 2023
1061ed9
Make all dirac finalization tasks in DiracBase async
joj0s Jan 25, 2023
32955ec
Fix monitoring teardown order in shutdown manager
joj0s Jan 25, 2023
ea4e986
Add runtime error handler in DIRAC subprocess
joj0s Jan 25, 2023
ed6875d
Increase max number of concurrently monitorable DIRAC jobs to 1000
joj0s Jan 26, 2023
2ed9883
Add ultradict to project requirements
joj0s Jan 26, 2023
20afe2e
Reduce GangaDiracError logger level in DIracBase
joj0s Jan 26, 2023
3830bd4
Fix mon service 'completing' job cleanup when all subjobs are dirty
joj0s Jan 26, 2023
8f89e1d
Add cred_req option to all DIRAC finalization tasks
joj0s Jan 30, 2023
de4547d
Update project dependencies
joj0s Jan 31, 2023
0f936ae
Move DiracProcessManager to avoid test failures
joj0s Feb 8, 2023
c6f0741
Revert "Move DiracProcessManager to avoid test failures"
joj0s Feb 11, 2023
69ce510
Execute global backend teardown hooks in monitoring shutdown method
joj0s Feb 11, 2023
956732d
Install gcc in Dirac tests
joj0s Feb 11, 2023
5f4c0b5
Include python3-devel in Dirac CI setup
joj0s Feb 11, 2023
ab01b93
Include both gcc and python3-devel in DIRAC CI setup
joj0s Feb 11, 2023
93c83e5
Install correct python3-rh devel package in DIRAC setup
joj0s Feb 11, 2023
f6e2cbc
Fix typo in DIRAC CI setup
joj0s Feb 11, 2023
5530c6a
Add lazy loading of job status for cleanup
joj0s Feb 25, 2023
595219b
Merge branch 'develop' into new_monitoring_service
joj0s Feb 25, 2023
49a4b8f
Merge branch 'new_monitoring_service' into bug_fixes
joj0s Feb 25, 2023
f046535
Add Dirac dependency installation step to LHCb integration tests
joj0s Feb 25, 2023
207be70
Set up gcc and compiled dependency installation in LHCb tests
joj0s Feb 27, 2023
47c39e4
Tweak glibc installation in LHCb CI
joj0s Feb 27, 2023
337dbe2
Add jobs.select call to get only active jobs in dirty job cleanup
joj0s Mar 2, 2023
60371e4
Send secondary exceptions to logger in monitoring serice
joj0s Mar 7, 2023
7194776
Fix DIRAC job finalization handling and resubmission on fail
joj0s Apr 27, 2023
3e53354
Remove debugging prints in DIRAC job wrapper
joj0s Apr 27, 2023
e4b2750
Pipe all DiracExecutorProcess output to main logger
joj0s May 28, 2023
d32bff9
Change default Dirac subprocess logging level to DEBUG
joj0s May 28, 2023
1b2935e
Merge branch 'develop' into new_monitoring_service
egede Jun 9, 2023
39b96a1
Merge branch 'new_monitoring_service' into bug_fixes
egede Jun 9, 2023
5a8dcd9
Fix graceful shutdown of async monitoring thread
joj0s Jun 17, 2023
d5f4c99
Add check for dirty outputdirs when loading mon service
joj0s Jun 17, 2023
6772936
Improve DIRAC and UltraDict exception handling
joj0s Jun 17, 2023
32c11d8
conflict resolution
mesmith75 Jul 3, 2023
b36c3da
fix import
mesmith75 Jul 3, 2023
9b471fc
Fix AlreadyClosed exception logging in DiracProcessManager
joj0s Jul 16, 2023
4abed91
Upgrade aiohttp version for 3.11 compatibility
joj0s Jul 18, 2023
c6b49d6
Change aiohttp version to 3.8.4
joj0s Jul 18, 2023
0be9de8
conflict resolution
mesmith75 Aug 7, 2023
1709460
autopep8 action fixes
mesmith75 Aug 7, 2023
5b16f7b
Update __init__.py
mesmith75 Aug 7, 2023
c35cec4
Fix autopep8 warnings for DiracBase
joj0s Aug 8, 2023
eb9c7f9
Fix autopep8 warning in logging module
joj0s Aug 8, 2023
787b15f
Merge pull request #2094 from ganga-devs/autopep8-patches/bug_fixes
joj0s Aug 8, 2023
2950bb3
Fix linting violations
joj0s Aug 8, 2023
40917a7
Merge pull request #2093 from ganga-devs/bug_fixes
joj0s Aug 8, 2023
00d1835
Merge branch 'develop' into new_monitoring_service
mesmith75 Aug 8, 2023
40cf786
Update ultradict and psutil dependencies
joj0s Aug 10, 2023
be4197d
Remove ultradict from main dependencies
joj0s Aug 10, 2023
ea38194
Merge branch 'develop' into new_monitoring_service
mesmith75 Oct 9, 2023
0d08aaf
Merge branch 'new_monitoring_service' of github.com:ganga-devs/ganga …
joj0s Oct 19, 2023
cd0d2f5
Add DIRAC CVMFS path dynamically when initializing DIRAC subprocess
joj0s Dec 6, 2023
f5f18ae
Merge branch 'develop' into new_monitoring_service
mesmith75 Dec 8, 2023
c091549
Remove fixed version dependencies from setup.py
joj0s Dec 8, 2023
4e7e49e
Merge branch 'develop' into new_monitoring_service
mesmith75 Dec 12, 2023
c6b44a0
Merge branch 'develop' into new_monitoring_service
mesmith75 Dec 18, 2023
c6f6074
Merge branch 'develop' into new_monitoring_service
mesmith75 Jan 8, 2024
8c132f7
Update setup.py
mesmith75 Jan 8, 2024
5f8e469
Update setup.py
mesmith75 Jan 8, 2024
be82cb9
Update setup.py
mesmith75 Jan 8, 2024
36c286a
Update setup.py
mesmith75 Jan 8, 2024
160de44
Update setup.py
mesmith75 Jan 8, 2024
69ff4a0
Merge branch 'develop' into new_monitoring_service
mesmith75 Jan 10, 2024
4d9aaee
Update setup.py
mesmith75 Jan 10, 2024
316957f
Update setup.py
mesmith75 Jan 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 2 additions & 12 deletions ganga/GangaCore/Core/InternalServices/Coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,9 @@ def _checker(): return True


def disableMonitoringService():

# disable the mon loop
log.debug("Shutting down the main monitoring loop")
from GangaCore.Core.MonitoringComponent.Local_GangaMC_Service import _purge_actions_queue, stop_and_free_thread_pool
_purge_actions_queue()
stop_and_free_thread_pool()
log.debug("Disabling the central Monitoring")
from GangaCore.Core import monitoring_component
monitoring_component.disableMonitoring()
monitoring_component.disable()


def disableInternalServices():
Expand Down Expand Up @@ -155,11 +149,7 @@ def disableInternalServices():

def enableMonitoringService():
from GangaCore.Core import monitoring_component
monitoring_component.alive = True
monitoring_component.enableMonitoring()
from GangaCore.Core.MonitoringComponent.Local_GangaMC_Service import _makeThreadPool, ThreadPool
if not ThreadPool or len(ThreadPool) == 0:
_makeThreadPool()
monitoring_component.enable()
global servicesEnabled
servicesEnabled = True

Expand Down
8 changes: 6 additions & 2 deletions ganga/GangaCore/Core/InternalServices/ShutdownManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# Ganga imports
from GangaCore.Core.GangaThread import GangaThreadPool
from GangaCore.Core.GangaThread.WorkerThreads import _global_queues, shutDownQueues
from GangaCore.Core import monitoring_component

from GangaCore.Core.InternalServices import Coordinator
from GangaCore.Runtime import Repository_runtime, bootstrap
from GangaCore.Utility import stacktracer
Expand Down Expand Up @@ -62,13 +62,17 @@ def _unprotected_ganga_exitfuncs():
setConfigOption('Configuration', 'DiskIOTimeout', 1)

# Stop the monitoring loop from iterating further
from GangaCore.Core import monitoring_component

if monitoring_component is not None:
try:
getStackTrace()
if monitoring_component.alive:
monitoring_component.disableMonitoring()
logger.debug('Disabling and joining monitoring thread...')
monitoring_component.disable()
monitoring_component.stop()
monitoring_component.join()
logger.debug('Monitoring thread stopped')
except Exception as err:
logger.exception("Exception raised while stopping the monitoring: %s" % err)

Expand Down
228 changes: 228 additions & 0 deletions ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
import functools
import shutil
import traceback

from GangaCore.Core.GangaRepository.Registry import RegistryKeyError, RegistryLockError
from GangaCore.Core.GangaThread import GangaThread
from GangaCore.GPIDev.Base.Proxy import getName, stripProxy
from GangaCore.GPIDev.Lib.Job.Job import lazyLoadJobBackend, lazyLoadJobStatus
from GangaCore.Utility.Config import getConfig
from GangaCore.Utility.logging import getLogger
from GangaCore.GPIDev.Lib.Job.utils import lazyLoadJobObject

config = getConfig("PollThread")
THREAD_POOL_SIZE = config['update_thread_pool_size']
POLL_RATE = config['base_poll_rate'] # in seconds
log = getLogger()


async def log_exceptions(awaitable):
try:
return await awaitable
except Exception:
log.warn(traceback.format_exc())


class AsyncMonitoringService(GangaThread):
def __init__(self, registry_slice=None):
GangaThread.__init__(self, name="AsyncMonitoringService")
self.daemon = True
self.loop = asyncio.new_event_loop()
self.enabled = False
self.alive = True
self.registry_slice = registry_slice
self.active_backends = {}
self.scheduled_backend_checks = {}
self.thread_executor = None

def run(self):
asyncio.set_event_loop(self.loop)
self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
self.loop.run_forever()

def _check_active_backends(self, job_slice=None):
if not self.enabled:
return

if job_slice:
fixed_ids = job_slice.ids()
else:
fixed_ids = self.registry_slice.ids()

found_active_backends = {}
for i in fixed_ids:
try:
# This is safe as it's addressing a job which _better_ be in the job repo
j = stripProxy(self.registry_slice(i))
job_status = lazyLoadJobStatus(j)
if job_status in ['submitted', 'running'] or (j.master and (job_status in ['submitting'])):
backend_obj = lazyLoadJobBackend(j)
backend_name = getName(backend_obj)
found_active_backends.setdefault(backend_name, [])
found_active_backends[backend_name].append(j)

except RegistryKeyError as err:
log.debug("RegistryKeyError: The job was most likely removed")
log.debug("RegError %s" % str(err))
except RegistryLockError as err:
log.debug("RegistryLockError: The job was most likely removed")
log.debug("Reg LockError%s" % str(err))

# If a backend is newly found as active, trigger its monitoring
previously_active_backends = self.active_backends
self.active_backends = found_active_backends
for backend_name in self.active_backends:
if backend_name not in previously_active_backends:
log.debug(f'Adding {backend_name} to list of backends to monitor.')
self._check_backend(backend_obj)

self._log_backend_summary(found_active_backends)

if previously_active_backends:
self._cleanup_finished_backends(previously_active_backends, found_active_backends)

self.loop.call_later(POLL_RATE, self._check_active_backends)

def _log_backend_summary(self, active_backends):
summary = "{"
for backend, these_jobs in active_backends.items():
summary += '"' + str(backend) + '" : ['
for this_job in these_jobs:
summary += str(stripProxy(this_job).id) + ', ' # getFQID('.')) + ', '
summary += '], '
summary += '}'
log.debug(f"Active backends: {summary}")

def run_monitoring_task(self, monitoring_task, jobs):
if not self.enabled:
return
if asyncio.iscoroutinefunction(monitoring_task):
self.loop.create_task(log_exceptions(monitoring_task(jobs)))
else:
self.loop.create_task(self._run_in_threadpool(monitoring_task, jobs))

def _check_backend(self, backend):
if not self.enabled:
return
backend_name = getName(backend)
if backend_name not in self.active_backends:
return

job_slice = self.active_backends[backend_name]
backend.master_updateMonitoringInformation(job_slice)
self._schedule_next_backend_check(backend)

def _schedule_next_backend_check(self, backend):
backend_name = getName(backend)
if backend_name in config:
pRate = config[backend_name]
else:
pRate = config['default_backend_poll_rate']
timer_handle = self.loop.call_later(pRate, self._check_backend, backend)
self.scheduled_backend_checks.setdefault(backend_name, [])
self.scheduled_backend_checks[backend_name].append(timer_handle)

async def _run_in_threadpool(self, task, jobs):
await self.loop.run_in_executor(
self.thread_executor, functools.partial(task, jobs=jobs))

def _cleanup_finished_backends(self, previously_active_backends, found_active_backends):
# Check for backends which have no more jobs to monitor and trigger their cleanup.
for backend, jobs in previously_active_backends.items():
if backend not in found_active_backends:
for job in jobs:
if job.status == 'completing':
self.loop.call_later(1, self._cleanup_finished_backends,
previously_active_backends, found_active_backends)
return
log.debug(f'Removing {getName(backend)} from active backends')
backend_obj = lazyLoadJobBackend(jobs[0])
self._cleanup_backend(backend_obj)

def _cleanup_backend(self, backend):
backend_name = getName(backend)
if backend_name not in self.scheduled_backend_checks:
return
for timer_handle in self.scheduled_backend_checks[backend_name]:
timer_handle.cancel()
del self.scheduled_backend_checks[backend_name]
try:
backend.tear_down_monitoring()
except NotImplementedError:
pass

def _cleanup_scheduled_tasks(self):
scheduled_tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task(self.loop)]
for task in scheduled_tasks:
task.cancel()

def _cleanup_dirty_jobs(self):
"""
Check for jobs that may have been interrupted for completing and revert them back to the
submitted status.
"""
running_slice = self.registry_slice.select(status='running')
completing_slice = self.registry_slice.select(status='completing')
submitted_slice = self.registry_slice.select(status='submitted')

for job in chain(running_slice, completing_slice, submitted_slice):
j = stripProxy(self.registry_slice(job.id))
status = lazyLoadJobStatus(j)
subjobs = lazyLoadJobObject(j, 'subjobs')
if not subjobs and status == 'completing':
self._remove_dirty_outputdir(j)
j.status = 'running'
continue
if subjobs:
if status == 'completing':
j.status = 'running'
for sj in j.subjobs:
if lazyLoadJobStatus(sj) == 'completing':
self._remove_dirty_outputdir(sj)
sj.status = 'running'

def _remove_dirty_outputdir(self, job):
if not job.outputdir:
log.warn(f'Tried to reset sandbox download for job {job.id} due to dirty state but found no outputdir')
return
try:
shutil.rmtree(job.outputdir)
log.debug(f'Removed outputdir {job.outputdir} due to dirty state.')
except FileNotFoundError:
log.warn(f'Tried to reset sandbox download for job {job.id} due to dirty state but found no outputdir')

def disable(self):
if not self.alive:
log.error("Cannot disable monitoring loop. It has already been stopped")
return False
self.thread_executor.shutdown()
self._cleanup_scheduled_tasks()
self.enabled = False
return True

def enable(self):
if not self.alive:
log.error("Cannot start monitoring loop. It has already been stopped")
return False
self.enabled = True
self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
self._cleanup_dirty_jobs()
self.loop.call_soon_threadsafe(self._check_active_backends)
return True

def stop(self):
self.alive = False
self.enabled = False
self.thread_executor.shutdown()
try:
for backend_name, active_jobs in self.active_backends.items():
log.debug(f'Cleaning up {backend_name} before monitoring shutdown')
backend_obj = lazyLoadJobBackend(active_jobs[0])
self._cleanup_backend(backend_obj)
except Exception as err:
log.error(err)
self._cleanup_scheduled_tasks()
self.loop.call_soon_threadsafe(self.loop.stop)
15 changes: 6 additions & 9 deletions ganga/GangaCore/Core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ def bootstrap(reg_slice, interactive_session, my_interface=None):
it to GangaCore.GPI
"""
# Must do some Ganga imports here to avoid circular importing
from GangaCore import GANGA_SWAN_INTEGRATION
from GangaCore.Core.MonitoringComponent.Local_GangaMC_Service import \
JobRegistry_Monitor
from GangaCore.Runtime.GPIexport import exportToInterface
from GangaCore.Core.MonitoringComponent.MonitoringService import AsyncMonitoringService
from GangaCore.Utility.Config import getConfig
from GangaCore.Runtime.GPIexport import exportToInterface
from GangaCore.Utility.logging import getLogger

global monitoring_component

# start the monitoring loop
monitoring_component = JobRegistry_Monitor(reg_slice)
monitoring_component = AsyncMonitoringService(registry_slice=reg_slice)
monitoring_component.start()

# override the default monitoring autostart value with the setting from interactive session
Expand All @@ -49,13 +48,11 @@ def bootstrap(reg_slice, interactive_session, my_interface=None):

# Enable job monitoring if requested
if config['autostart']:
monitoring_component.enableMonitoring()
monitoring_component.enable()

# export the runMonitoring function to the public interface
if not my_interface:
import GangaCore.GPI
my_interface = GangaCore.GPI

exportToInterface(my_interface, 'runMonitoring', monitoring_component.runMonitoring, 'Functions')
if GANGA_SWAN_INTEGRATION:
exportToInterface(my_interface, 'reloadJob', monitoring_component.reloadJob, 'Functions')
exportToInterface(my_interface, 'runMonitoring', monitoring_component.enable, 'Functions')
5 changes: 3 additions & 2 deletions ganga/GangaCore/Core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def __init__(self, repo=None, what=''):
logger.error("Shutting Down Repository_runtime")
from GangaCore.Runtime import Repository_runtime
Repository_runtime.shutdown()
except:
except BaseException:
logger.error("Unable to disable Internal services, they may have already been disabled!")


Expand Down Expand Up @@ -194,7 +194,8 @@ def __init__(self, *args, **kwds):

class GangaTypeError(GangaException, TypeError):
"""
Class analogous to GangaKeyError. This class wraps TypeError so that users are prevented from seeing stack traces from known good exceptions thrown in Ganga code.
Class analogous to GangaKeyError. This class wraps TypeError so that users are prevented from seeing stack traces
from known good exceptions thrown in Ganga code.
"""

def __init__(self, *args, **kwds):
Expand Down