-
Notifications
You must be signed in to change notification settings - Fork 0
/
lustre_realtimejobs_torque.py
executable file
·114 lines (97 loc) · 3.64 KB
/
lustre_realtimejobs_torque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python
import sys
import time
import select
import _inotify
sys.path.append("MySQL")
import MySQLObject
# Directory that mainloop() watches with inotify; the file it opens on startup
# is the one in this directory named after today's date (YYYYMMDD).
ROOTDIR="/var/spool/torque/server_priv/accounting"
class Logfile:
    """Follow one accounting file and mirror its job records into the database.

    Each record is a line of the form ``date;type;jobid;message``.  ``S``
    records become job inserts; ``E``, ``A`` and ``D`` records become job-end
    updates, or are folded into the insert when the matching ``S`` record
    arrives in the same read.
    """

    # Text read after the last newline, i.e. a record the writer has not
    # finished yet.  It is prepended to the next read instead of being parsed
    # half-written.
    _pending = ""

    # Layout of the timestamp in the first field of every record.
    _DATE_FORMAT = "%m/%d/%Y %H:%M:%S"

    # A ABORT / D delete do not always produce an E record, so all three are
    # treated as "job ended".
    _END_TYPES = ("E", "A", "D")

    def __init__(self, prefix, path, filename):
        """Open ``path/filename``, connect to the database and import the
        records the file already contains.

        path: directory where the file is
        filename: name of file to read
        prefix: a newly created file is only followed if its name starts
                with this prefix
        """
        self.path = path
        self.prefix = prefix
        self.filename = self.path + "/" + filename
        self.f = open(self.filename, "r")
        self.db = MySQLObject.MySQLObject()
        self.read_from_last_pos_to_end()

    def read_from_last_pos_to_end(self):
        """Read from the current file position to the current end and store
        every complete record found.

        Anything after the last newline is kept in ``self._pending`` until the
        rest of that line shows up in a later read.
        """
        chunk = self._pending + self.f.read()
        complete, _, self._pending = chunk.rpartition("\n")
        self._store_records(complete.split("\n"))

    def _parse_start_fields(self, message):
        """Return ``(start, owner, hosts, node_count)`` from an S record message.

        ``message`` is a whitespace separated list of ``key=value`` tokens.
        Missing keys yield ``-1`` / ``""`` so that values never leak over from
        a previously parsed job.  Raises ValueError if ``start`` is not an
        integer.
        """
        start = -1
        owner = ""
        nodes = []
        for token in message.split():
            key, sep, value = token.partition("=")
            if not sep:
                continue
            # Exact key comparison: a prefix test would also catch unrelated
            # keys such as "start_count".
            if key == "start":
                start = int(value)
            elif key == "user":
                owner = value
            elif key == "exec_host":
                # exec_host looks like "node/slot+node/slot+..."; keep each
                # node once, in order of first appearance.
                nodes = []
                for slot in value.split("+"):
                    node = slot.split("/")[0]
                    if node not in nodes:
                        nodes.append(node)
        return start, owner, ",".join(nodes), len(nodes)

    def _store_records(self, lines):
        """Parse complete record lines, write the resulting inserts and
        updates to the database and commit.

        The database rows are ``(jobid, start, end, owner, hosts, "")`` with
        ``-1`` / ``""`` standing for "not known from this record".
        NOTE(review): the meaning of the trailing empty string is defined by
        MySQLObject.insert_job / update_job - confirm there.
        """
        jobstarts = {}
        jobends = {}
        for line in lines:
            # maxsplit keeps any ';' inside the message part intact.
            fields = line.split(";", 3)
            if len(fields) < 3:
                continue
            datestr, rectype, jobid = fields[0], fields[1], fields[2]
            if rectype == "S":
                if len(fields) < 4:
                    continue
                start, owner, hosts, node_count = self._parse_start_fields(fields[3])
                jobstarts[jobid] = (jobid, start, -1, owner, hosts, "")
                # Same output as the former print statement, but also valid
                # on Python 3.
                print("jobstart: %s owner: %s %d nodes" % (jobid, owner, node_count))
            elif rectype in self._END_TYPES:
                end = int(time.mktime(time.strptime(datestr, self._DATE_FORMAT)))
                jobends[jobid] = (jobid, -1, end, "", "", "")

        # if S and E come together, merge them into one insert
        inserts = []
        for jobid, record in jobstarts.items():
            if jobid in jobends:
                record = record[:2] + (jobends[jobid][2],) + record[3:]
            inserts.append(record)
        updates = [rec for jobid, rec in jobends.items() if jobid not in jobstarts]

        # insert into DB - executemany is hard to achieve, as we need to
        # insert users as well
        for record in inserts:
            self.db.insert_job(*record)
        for record in updates:
            self.db.update_job(*record)
        self.db.commit()

    def switch_file(self, filename):
        """Start following ``filename`` if it is today's file.

        The file is only switched when its name starts with ``self.prefix``
        and contains today's date (YYYYMMDD).  The old file is drained first
        so no record written before the rollover is lost.
        """
        todayfile = time.strftime("%Y%m%d")
        if filename.startswith(self.prefix) and todayfile in filename:
            self.read_from_last_pos_to_end()
            if self._pending:
                # The old file is finished; its unterminated tail will never
                # be completed, so handle it as it is.
                self._store_records([self._pending])
                self._pending = ""
            self.f.close()
            self.filename = self.path + "/" + filename
            self.f = open(self.filename, "r")

    def action(self, e):
        """Handle one inotify event ``e`` (a mapping with "mask" and "name").

        A CREATE event may switch to the new file; a MODIFY event reads what
        was appended to the file currently being followed.
        """
        if e["mask"] & _inotify.CREATE:
            self.switch_file(e["name"])
        if e["mask"] & _inotify.MODIFY:
            self.read_from_last_pos_to_end()
def mainloop():
    """Watch ROOTDIR through inotify and hand every event to a Logfile.

    The Logfile is created for the file named after today's date and the
    function then loops forever; it never returns.
    """
    inotify_fd = _inotify.create()
    # Interested in new files appearing and in existing files growing.
    wanted_events = _inotify.CREATE | _inotify.MODIFY
    _watch = _inotify.add(inotify_fd, ROOTDIR, wanted_events)

    # The startup file is named by date, e.g. "20140320"; put a fixed string
    # here instead to process one particular day.
    current_day = time.strftime("%Y%m%d")
    logfile = Logfile("", ROOTDIR, current_day)

    while True:
        # blocking wait; each event is passed to logfile.action
        _inotify.read_event(inotify_fd, logfile.action)
        time.sleep(0.1)
# Start watching only when executed as a script, not when imported.
if __name__ == "__main__":
    mainloop()