From 7b90d1f6aadfe196a2654cc4a11117756c966c2c Mon Sep 17 00:00:00 2001 From: Rui Ban Date: Thu, 18 Oct 2018 17:20:27 +0800 Subject: [PATCH 1/3] fix job console missing --- src/scheduler/kubernetes/scheduler.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/scheduler/kubernetes/scheduler.py b/src/scheduler/kubernetes/scheduler.py index a1137e40f..1e8d29e3e 100644 --- a/src/scheduler/kubernetes/scheduler.py +++ b/src/scheduler/kubernetes/scheduler.py @@ -817,28 +817,26 @@ def handle_timeouts(self): def upload_console(self, job_id): cursor = self.conn.cursor() + cursor.execute("begin") cursor.execute(""" SELECT output FROM console WHERE job_id = %s - ORDER BY date + ORDER BY date FOR UPDATE """, [job_id]) lines = cursor.fetchall() - cursor.close() output = "" for l in lines: output += l[0] - cursor = self.conn.cursor() + if not output: + cursor.execute("commit;") + return + cursor.execute(""" UPDATE job SET console = %s WHERE id = %s; DELETE FROM console WHERE job_id = %s; """, [output, job_id, job_id]) - cursor.close() - - cursor = self.conn.cursor() - cursor.execute(""" - DELETE FROM console WHERE job_id = %s - """, [job_id]) + cursor.execute("commit") cursor.close() def handle_orphaned_jobs(self): From 14ec155b34b782dd85df4b6ddd8726cc8f18e68c Mon Sep 17 00:00:00 2001 From: Rui Ban Date: Fri, 19 Oct 2018 14:59:43 +0800 Subject: [PATCH 2/3] also update handle_timeouts and handle_aborts --- src/scheduler/kubernetes/scheduler.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/scheduler/kubernetes/scheduler.py b/src/scheduler/kubernetes/scheduler.py index 1e8d29e3e..ed06e0e9f 100644 --- a/src/scheduler/kubernetes/scheduler.py +++ b/src/scheduler/kubernetes/scheduler.py @@ -735,16 +735,13 @@ def schedule(self): self.schedule_job(job_id, cpu, memory) def handle_aborts(self): - cluster_name = os.environ['INFRABOX_CLUSTER_NAME'] - cursor = self.conn.cursor() cursor.execute(''' SELECT j.id, a.user_id FROM abort a JOIN job j ON a.job_id = j.id - AND j.cluster_name = %s - ''', [cluster_name]) + ''') aborts = cursor.fetchall() cursor.close() @@ -812,15 +809,14 @@ def handle_timeouts(self): cursor = self.conn.cursor() cursor.execute(""" UPDATE job SET state = 'error', end_date = current_timestamp, message = 'Aborted due to timeout' - WHERE id = %s""", (job_id,)) + WHERE id = %s and state = 'running' """, (job_id,)) cursor.close() def upload_console(self, job_id): cursor = self.conn.cursor() - cursor.execute("begin") cursor.execute(""" - SELECT output FROM console WHERE job_id = %s - ORDER BY date FOR UPDATE + SELECT output FROM console WHERE job_id = %s + ORDER BY date """, [job_id]) lines = cursor.fetchall() @@ -829,16 +825,16 @@ def upload_console(self, job_id): output += l[0] if not output: - cursor.execute("commit;") + cursor.close() return cursor.execute(""" - UPDATE job SET console = %s WHERE id = %s; - DELETE FROM console WHERE job_id = %s; - """, [output, job_id, job_id]) - cursor.execute("commit") + UPDATE job SET console = %s WHERE id = %s; + DELETE FROM console WHERE job_id = %s; + """, [output, job_id, job_id]) cursor.close() + def handle_orphaned_jobs(self): self.logger.debug("Handling orphaned jobs") From 77e198250a11016a229a04a2c5c30324f4b3a34c Mon Sep 17 00:00:00 2001 From: Rui Ban Date: Fri, 19 Oct 2018 15:10:41 +0800 Subject: [PATCH 3/3] use transaction to protect console output --- src/scheduler/kubernetes/scheduler.py | 36 +++++++++++++++------------ 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/scheduler/kubernetes/scheduler.py b/src/scheduler/kubernetes/scheduler.py index ed06e0e9f..271eaa574 100644 --- a/src/scheduler/kubernetes/scheduler.py +++ b/src/scheduler/kubernetes/scheduler.py @@ -814,25 +814,29 @@ def handle_timeouts(self): def upload_console(self, job_id): cursor = self.conn.cursor() - cursor.execute(""" - SELECT output FROM console WHERE job_id = %s - ORDER BY date - """, [job_id]) - lines = cursor.fetchall() + cursor.execute("begin") + try: + cursor.execute(""" + SELECT output FROM console WHERE job_id = %s + ORDER BY date FOR UPDATE + """, [job_id]) + lines = cursor.fetchall() - output = "" - for l in lines: - output += l[0] + output = "" + for l in lines: + output += l[0] - if not output: + if output: + cursor.execute(""" + UPDATE job SET console = %s WHERE id = %s; + DELETE FROM console WHERE job_id = %s; + """, [output, job_id, job_id]) + cursor.execute("commit") + except Exception as e: + self.logger.error(e) + cursor.execute("rollback") + finally: cursor.close() - return - - cursor.execute(""" - UPDATE job SET console = %s WHERE id = %s; - DELETE FROM console WHERE job_id = %s; - """, [output, job_id, job_id]) - cursor.close() def handle_orphaned_jobs(self):