diff --git a/src/scheduler/kubernetes/scheduler.py b/src/scheduler/kubernetes/scheduler.py index a1137e40f..271eaa574 100644 --- a/src/scheduler/kubernetes/scheduler.py +++ b/src/scheduler/kubernetes/scheduler.py @@ -735,16 +735,13 @@ def schedule(self): self.schedule_job(job_id, cpu, memory) def handle_aborts(self): - cluster_name = os.environ['INFRABOX_CLUSTER_NAME'] - cursor = self.conn.cursor() cursor.execute(''' SELECT j.id, a.user_id FROM abort a JOIN job j ON a.job_id = j.id - AND j.cluster_name = %s - ''', [cluster_name]) + ''') aborts = cursor.fetchall() cursor.close() @@ -812,34 +809,35 @@ def handle_timeouts(self): cursor = self.conn.cursor() cursor.execute(""" UPDATE job SET state = 'error', end_date = current_timestamp, message = 'Aborted due to timeout' - WHERE id = %s""", (job_id,)) + WHERE id = %s and state = 'running' """, (job_id,)) cursor.close() def upload_console(self, job_id): cursor = self.conn.cursor() - cursor.execute(""" - SELECT output FROM console WHERE job_id = %s - ORDER BY date - """, [job_id]) - lines = cursor.fetchall() - cursor.close() + cursor.execute("begin") + try: + cursor.execute(""" + SELECT output FROM console WHERE job_id = %s + ORDER BY date FOR UPDATE + """, [job_id]) + lines = cursor.fetchall() - output = "" - for l in lines: - output += l[0] + output = "" + for l in lines: + output += l[0] - cursor = self.conn.cursor() - cursor.execute(""" - UPDATE job SET console = %s WHERE id = %s; - DELETE FROM console WHERE job_id = %s; - """, [output, job_id, job_id]) - cursor.close() + if output: + cursor.execute(""" + UPDATE job SET console = %s WHERE id = %s; + DELETE FROM console WHERE job_id = %s; + """, [output, job_id, job_id]) + cursor.execute("commit") + except Exception as e: + self.logger.error(e) + cursor.execute("rollback") + finally: + cursor.close() - cursor = self.conn.cursor() - cursor.execute(""" - DELETE FROM console WHERE job_id = %s - """, [job_id]) - cursor.close() def handle_orphaned_jobs(self): self.logger.debug("Handling orphaned jobs")