In [69]:
from subprocess import check_output
import re
from pandas import DataFrame
import numpy as np
from pathlib import Path

def parse_qstat_line(line):
    job_identifier, *rest = line.split()
    split_job_identifier = list(re.match(r'([0-9]+)\[([^\]]*)\]\.(.*)', job_identifier).groups())
    return split_job_identifier + rest

def qstat():
    cmd_output = check_output(['qstat', '-t'])
    if not cmd_output: return DataFrame([], columns = ['JobId','ArrayId', 'Machine', 'JobName', 'User', 'Time', 'Status', 'Queue'])
    header, line, *body = cmd_output.decode('utf8').split('\n')
    body = [parse_qstat_line(line) for line in body if line]
    return DataFrame(body, columns = ['JobId','ArrayId', 'Machine', 'JobName', 'User', 'Time', 'Status', 'Queue'])
    
def job_summary(jobs): return jobs[jobs.ArrayId != ''].groupby(['JobName', 'Status']).agg({'ArrayId':'count'})

job_summary(qstat())

Unnamed: 0_level_0,Unnamed: 1_level_0,ArrayId
JobName,Status,Unnamed: 2_level_1
phase_diagram_5,R,1


In [66]:
qstat()

Unnamed: 0,JobId,ArrayId,Machine,JobName,User,Time,Status,Queue
0,2303199,,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
1,2303199,727.0,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
2,2303200,,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
3,2303200,728.0,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
4,2303203,,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
5,2303203,729.0,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
6,2303204,,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
7,2303204,951.0,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
8,2303205,,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a
9,2303205,952.0,cx1,phase_diagram_5,tch14,0,Q,v1_throughput2a


In [63]:
def make_jobs_df(array_list, JobName, JobId, Machine = 'cx1'):
    df = DataFrame(columns = ['JobId','ArrayId', 'Machine', 'JobName', 'User', 'Time', 'Status', 'Queue'])
    df.ArrayId = np.array(np.array(array_list))
    df.JobId = JobId
    df.Machine = Machine
    df.JobName = JobName
    return df


def read_logs(working_dir, jobs, errors = True, outputs = False):
    log_dir = working_dir / 'logs'
    for index, job in jobs.iterrows():
        output_file = log_dir / f'{job.JobName}.o{job.JobId}.{job.ArrayId}'
        error_file =  log_dir / f'{job.JobName}.e{job.JobId}.{job.ArrayId}'
        
        job_name = f'{job.JobId}[{job.ArrayId}].{job.Machine}'
        print(f'########## {job_name} ###############')
        if output_file.exists() and outputs: print(output_file.read_text())
        if error_file.exists() and errors: print(error_file.read_text())

def qsub(working_dir, jobs):
    '''Takes a dataframe of jobs'''
    log_dir = working_dir / 'logs'
    for index, job in jobs.iterrows():
        job_name = f'{job.JobId}[{job.ArrayId}].{job.Machine}'
        cmd_output = check_output(['qsub', '-J', f'{job.ArrayId}-{job.ArrayId + 1}:2', '../runscript.sh'], cwd = log_dir, encoding = 'utf8')
        print(f'starting {job_name}: {cmd_output}')

def qdel(jobs):    
    for index, job in jobs.iterrows():
        if job.ArrayId != '':
            job_name = f'{job.JobId}[{job.ArrayId}].{job.Machine}'
            cmd_output = check_output(['qdel', job_name])
            print(f'killing {job_name}: {cmd_output}')

In [64]:
jobs_to_start = make_jobs_df(
    array_list = [951, 952, 953, 954, 1056, 1057, 1058, 1059, 1060, 1463, 1464, 1837, 1838, 1839, 1840, 1904, 1965, 1966, 1967, 5123, 5266, 5267],
    JobName = 'phase_diagram_5',
    JobId = 2299532,
    Machine = 'cx1'
)
#read_logs(Path('./data/phase_diagram_5'), jobs_to_start)
#qsub(Path('./data/phase_diagram_5'), jobs_to_start)

starting 2299532[951].cx1: 2303204[].cx1

starting 2299532[952].cx1: 2303205[].cx1

starting 2299532[953].cx1: 2303206[].cx1

starting 2299532[954].cx1: 2303207[].cx1

starting 2299532[1056].cx1: 2303208[].cx1

starting 2299532[1057].cx1: 2303209[].cx1

starting 2299532[1058].cx1: 2303210[].cx1

starting 2299532[1059].cx1: 2303211[].cx1

starting 2299532[1060].cx1: 2303212[].cx1

starting 2299532[1463].cx1: 2303213[].cx1

starting 2299532[1464].cx1: 2303214[].cx1

starting 2299532[1837].cx1: 2303215[].cx1

starting 2299532[1838].cx1: 2303216[].cx1

starting 2299532[1839].cx1: 2303217[].cx1

starting 2299532[1840].cx1: 2303218[].cx1

starting 2299532[1904].cx1: 2303219[].cx1

starting 2299532[1965].cx1: 2303220[].cx1

starting 2299532[1966].cx1: 2303221[].cx1

starting 2299532[1967].cx1: 2303222[].cx1

starting 2299532[5123].cx1: 2303223[].cx1

starting 2299532[5266].cx1: 2303224[].cx1

starting 2299532[5267].cx1: 2303225[].cx1



In [15]:
jobs = qstat()
jobs[jobs.Status == 'R']

Unnamed: 0,JobId,ArrayId,Machine,JobName,User,Time,Status,Queue


In [19]:
jobs_to_start = pd.DataFrame(columns = ['JobId','ArrayId', 'Machine', 'JobName', 'User', 'Time', 'Status', 'Queue'])
jobs_to_start.ArrayId = np.array([727, 728, 729, 951, 952, 953, 954, 1056, 1057, 1058, 1059, 1060, 1463, 1464, 1837, 1838, 1839, 1840, 1904, 1965, 1966, 1967, 5123, 5266, 5267])

qsub(working_dir = Path('./data/phase_diagram_5'), jobs = to_kill)

NameError: name 'iterable' is not defined

In [34]:
jobs

Unnamed: 0,JobId,ArrayId,Machine,JobName,User,Time,Status,Queue
0,2287114,,cx1,phase_diagram_3,tch14,0,B,v1_throughput2a
1,2287114,1,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
2,2287114,2,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
3,2287114,3,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
4,2287114,4,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
5,2287114,5,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
6,2287114,6,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
7,2287114,7,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
8,2287114,8,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a
9,2287114,9,cx1,phase_diagram_3,tch14,0,X,v1_throughput2a


In [115]:
from collections import Counter
j = job_summary(qstat())
for name, jobs in j.groupby(level=0):
    statuses, nums = np.array([[status, job.ArrayId] for status, job in jobs.groupby(level=1)]).T
    N = 20
    nums = N * np.round(nums / np.sum(nums))
    nums[-1] -= (np.sum(nums) - N)
    print(nums)
    

AttributeError: 'list' object has no attribute 'ArrayId'

True

In [82]:
from slackclient import SlackClient

def slack_message(message, channel):
    token = 'xoxp-525787134705-526388674674-526594178197-c5b1337fd7ba764c07a787297720f650'
    sc = SlackClient(token)

    sc.api_call('chat.postMessage', channel=channel, 
                text=message, username='CMTH bot',
                icon_emoji=':robot_face:')
    
import time
last_qstat = qstat()
while True:
    time.sleep(10)
    print('Checking qstat')
    q = qstat()
    if not q.equals(last_qstat):
        summary = job_summary(qstat())
        slack_message(summary.to_string(), 'bots-notifications')
        last_qstat = q

Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat
Checking qstat


KeyboardInterrupt: 