Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions src/scheduler/kubernetes/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,16 +735,13 @@ def schedule(self):
self.schedule_job(job_id, cpu, memory)

def handle_aborts(self):
cluster_name = os.environ['INFRABOX_CLUSTER_NAME']

cursor = self.conn.cursor()
cursor.execute('''
SELECT j.id, a.user_id
FROM abort a
JOIN job j
ON a.job_id = j.id
AND j.cluster_name = %s
''', [cluster_name])
''')

aborts = cursor.fetchall()
cursor.close()
Expand Down Expand Up @@ -812,34 +809,35 @@ def handle_timeouts(self):
cursor = self.conn.cursor()
cursor.execute("""
UPDATE job SET state = 'error', end_date = current_timestamp, message = 'Aborted due to timeout'
WHERE id = %s""", (job_id,))
WHERE id = %s and state = 'running' """, (job_id,))
cursor.close()

def upload_console(self, job_id):
cursor = self.conn.cursor()
cursor.execute("""
SELECT output FROM console WHERE job_id = %s
ORDER BY date
""", [job_id])
lines = cursor.fetchall()
cursor.close()
cursor.execute("begin")
Comment thread
ib-steffen marked this conversation as resolved.
try:
cursor.execute("""
SELECT output FROM console WHERE job_id = %s
ORDER BY date FOR UPDATE
""", [job_id])
lines = cursor.fetchall()

output = ""
for l in lines:
output += l[0]
output = ""
for l in lines:
output += l[0]

cursor = self.conn.cursor()
cursor.execute("""
UPDATE job SET console = %s WHERE id = %s;
DELETE FROM console WHERE job_id = %s;
""", [output, job_id, job_id])
cursor.close()
if output:
cursor.execute("""
UPDATE job SET console = %s WHERE id = %s;
DELETE FROM console WHERE job_id = %s;
""", [output, job_id, job_id])
cursor.execute("commit")
except Exception as e:
self.logger.error(e)
cursor.execute("rollback")
finally:
cursor.close()

cursor = self.conn.cursor()
cursor.execute("""
DELETE FROM console WHERE job_id = %s
""", [job_id])
cursor.close()

def handle_orphaned_jobs(self):
self.logger.debug("Handling orphaned jobs")
Expand Down