This reverts commit 1ee6f59.
🥇
@@ -197,6 +197,7 @@ def get_updates(self, job_id: uuid.UUID) -> HistoryByScalar:
    @Pyro4.expose
    def stop(self):
        if self.__was_shutdown:
            LOGGER.warning("Agent API was shutdown twice.")
When does this happen?
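For context on the question: a stop can run twice when, for example, a signal handler and an explicit shutdown call both reach it. A minimal sketch of the idempotent-stop guard that the diff adds a warning to (class and attribute names here are illustrative, not the project's actual code):

```python
import logging

LOGGER = logging.getLogger(__name__)

class AgentApi:
    """Sketch: the guard turns a second stop() into a logged no-op
    instead of, e.g., double-releasing resources."""

    def __init__(self):
        self._was_shutdown = False

    def stop(self):
        if self._was_shutdown:
            LOGGER.warning("Agent API was shut down twice.")
            return
        self._was_shutdown = True
        # ... release sockets, join worker threads, etc.

api = AgentApi()
api.stop()
api.stop()  # Second call only logs a warning
```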
@@ -91,8 +106,8 @@ def get_job_status(self, job_name) -> JobStatus:
        :raises JobNotFoundException: If job was not found.
        :return: Job status
        """
        self.check_or_build_connection()
        with self.lock:
Why a thread lock? Are there any race conditions?
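One race the lock can prevent: lazy check-then-build of a connection, where two threads both observe "not built yet" and build the client twice. A self-contained sketch (names are illustrative stand-ins for the `SageMakerHelper` pattern described in the PR):

```python
import threading

class LazyConnection:
    """Sketch: the lock guards the check-then-act sequence so the
    expensive client is built exactly once across threads."""

    def __init__(self):
        self.lock = threading.Lock()
        self.client = None
        self.build_count = 0

    def check_or_build_connection(self):
        with self.lock:  # Without this, two threads can both see client is None
            if self.client is None:
                self.build_count += 1
                self.client = object()  # Stand-in for an expensive client

conn = LazyConnection()
threads = [threading.Thread(target=conn.check_or_build_connection) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(conn.build_count)  # 1 — built exactly once
```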
            LOGGER.exception("Polling for updates failed")
            # Ignore

# TODO Remove duplicate code with Scheduler
Goes back to perhaps having a common base class with Scheduler
🤔
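One shape the suggested common base class could take, moving the duplicated poll-and-notify loop into one place (a sketch only; `JobMonitorBase` and its hooks are hypothetical names from the review, not the project's API):

```python
from abc import ABC, abstractmethod

class JobMonitorBase(ABC):
    """Sketch: the shared polling loop lives here once; concrete
    monitors (e.g. a Scheduler-like class) implement the two hooks."""

    def poll_once(self, job_id):
        try:
            updates = self.query_updates(job_id)
        except Exception:
            # Mirrors the diff: log and ignore polling failures
            return []
        if updates:
            self.notify(job_id, updates)
        return updates

    @abstractmethod
    def query_updates(self, job_id):
        """Fetch new scalar values for the job."""

    @abstractmethod
    def notify(self, job_id, updates):
        """Push updates to listeners."""

class FakeMonitor(JobMonitorBase):
    def __init__(self):
        self.notified = []

    def query_updates(self, job_id):
        return [0.5, 0.25]

    def notify(self, job_id, updates):
        self.notified.append((job_id, updates))

monitor = FakeMonitor()
monitor.poll_once("job-1")
print(monitor.notified)  # [('job-1', [0.5, 0.25])]
```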
@@ -66,6 +67,8 @@ def daemonize(self, serialized):
        pid = os.fork()
        if pid > 0:  # Close parent process
            return
        if not self.terminate_daemon:
            self.terminate_daemon = multiprocessing.get_context("spawn").Event()
`multiprocessing.get_context("spawn")` should also be abstracted away somehow so we don't duplicate it everywhere.
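One way to centralize it: a tiny helper module so the `"spawn"` string and context creation appear exactly once (a sketch; the helper names are invented for illustration):

```python
import multiprocessing

# Single place that knows we use the "spawn" start method.
_SPAWN_CTX = multiprocessing.get_context("spawn")

def spawn_event():
    """Event usable with spawn-started processes."""
    return _SPAWN_CTX.Event()

def spawn_process(target, *args):
    """Process created via the shared spawn context."""
    return _SPAWN_CTX.Process(target=target, args=args)

ev = spawn_event()
ev.set()
print(ev.is_set())  # True
```

Call sites would then use `spawn_event()` instead of repeating `multiprocessing.get_context("spawn").Event()`.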
Sorry for the huge PR, it took quite some effort to get stuff working.
- `sagemaker.analytics` appends new rows to the end of the DataFrame once new values emerge. This should be checked
- `job.add_scalar` and `Scheduler` for querying and posting job metrics; these should probably go to `TrackerBase` or `JobMonitorBase` to avoid duplicating code
- `threading.Lock` in initializing `SageMakerHelper` connections to avoid race conditions
- `SageMakerJobMonitor`: it only notifies when new values are available, though
- `meeshkan stop` to not raise an exception when used before starting the agent
- `terminate_daemon` event in the daemon process; this should be more robust than setting it before spawning and forking (?)
- `maxParallel` to one. Also changed `terminate_daemon` to be created in the API process. The process can only be stopped via the Pyro proxy anyway in real usage, as only tests re-use the `Service` instance for `start` -> `stop` etc.
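The "only notifies when new values are available" behavior can be sketched like this: since the metric frame only grows by appended rows, remembering how many rows have been seen lets each poll emit just the delta (stand-in data structures; not the real `sagemaker.analytics` API):

```python
class ScalarPoller:
    """Sketch: track the number of rows already seen so a poll
    reports only newly appended (metric, value) rows."""

    def __init__(self):
        self._seen = 0

    def new_rows(self, rows):
        """rows: the full, append-only list of (metric, value) tuples so far."""
        fresh = rows[self._seen:]
        self._seen = len(rows)
        return fresh

poller = ScalarPoller()
print(poller.new_rows([("loss", 0.9), ("loss", 0.5)]))   # both rows are new
print(poller.new_rows([("loss", 0.9), ("loss", 0.5), ("loss", 0.3)]))  # only the third
```

A notifier built on this would skip the callback whenever `new_rows(...)` comes back empty.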