/
ensemble_director
executable file
·456 lines (363 loc) · 18 KB
/
ensemble_director
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
#!/usr/bin/python3
import os
import json
import uuid
import copy
import time
import socket
import datetime
import argparse
import threading
import encryption
import communication
import ensemble_enums
import database_access
import ensemble_logging
import ensemble_constants
import dateutil.parser as parser
from time import sleep
from ast import literal_eval
# Command-line interface for the director.
# NOTE: this rebinds the module-level name ``parser`` (the ``dateutil.parser``
# alias created by the imports above) to the argparse parser.
parser = argparse.ArgumentParser(description="Ensemble Agent")
parser.add_argument(
    "--debug",
    action=argparse.BooleanOptionalAction,
    help="Puts the agent in debug mode where all logged events are outputed to console",
)
parser.add_argument(
    "--config-file",
    required=True,
    help="Ensemble config file. Required to run the application. Please see github readme for more information",
)
args = parser.parse_args()

# BooleanOptionalAction yields True for --debug, False for --no-debug and None
# when the flag is absent. Only an explicit --debug may enable debug logging;
# comparing against None would also enable it for --no-debug.
if args.debug:
    ensemble_logging.logLevel = 2

# Config file loaded from disk
APP_CONFIG = {}

# The ip address that the director will listen on
HOST_IP = ''

# Port that agents call in on to register
AGENT_REGISTRATION_PORT = 0

KEY_EXISTS = False

agents = {}

# Maps a job id to the list of agent ips the job was sent to
agent_jobs = {}
def run_command(command):
    """Run ``command`` through the shell and return its stripped stdout.

    Blocks until the command exits. A stream event carrying the child's
    process id is recorded before the output is read.

    Args:
        command: Shell command line to execute. It is passed to the shell
            verbatim, so it must never be built from untrusted input.

    Returns:
        The command's standard output with surrounding whitespace removed.
    """
    # Local import keeps this block self-contained.
    import subprocess

    ensemble_logging.log_message(f"Running Command: {command}")
    # Popen (rather than os.popen) exposes the real pid, and the context
    # manager closes the pipe and reaps the child even if logging fails.
    with subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, text=True) as process:
        ensemble_logging.log_message(f"Process started with pid {process.pid}")
        database_access.insert_stream_event({
            "EventType": ensemble_enums.StreamEvent.Director.value,
            "Event": f"Running Command: {command}",
            "EventId": process.pid,
        })
        result = process.stdout.read()
    return result.strip()
def initialize():
    """Bootstrap the director.

    In order: makes sure the log directory and the web portal cert/key exist,
    launches the web server in its own thread, loads the config file,
    initialises the encryption and database layers, writes the (possibly
    updated) config back to disk, and finally starts the recurring workers.
    """
    global APP_CONFIG

    if not os.path.exists(ensemble_constants.LOGS_DIR):
        # Create the directory synchronously. A fire-and-forget shell call
        # could still be running when the next log message is written.
        os.makedirs(f"./{ensemble_constants.LOGS_DIR}", exist_ok=True)
        ensemble_logging.log_message("Creating log directory")

    cert_path = f"./{ensemble_constants.CERT_PEM_FILENAME}"
    key_path = f"./{ensemble_constants.KEY_PEM_FILENAME}"
    if not os.path.exists(cert_path) or not os.path.exists(key_path):
        # generate cert and key for web portal
        run_command(ensemble_constants.OPENSSL_GENERATE_CERT_AND_KEY_COMMAND)

    # run_command blocks until the web server exits, hence the dedicated thread.
    if ensemble_logging.logLevel == 1:
        ensemble_logging.log_message("Starting Ensemble Web Server")
        threading.Thread(
            target=run_command,
            args=(f"./{ensemble_constants.ENSEMBLE_WEB_FILE}",),
        ).start()
    else:
        ensemble_logging.log_message("Starting Ensemble Web Server with Debugging Enabled")
        threading.Thread(
            target=run_command,
            args=(f"./{ensemble_constants.ENSEMBLE_WEB_FILE} --debug",),
        ).start()
    # NOTE(review): source indentation was lost; this message is assumed to be
    # logged unconditionally after the web server is started.
    ensemble_logging.log_message("PEMs created")

    ensemble_logging.log_message("Importing config file")
    import_config()
    # initialize the encryption lib with the key from the config or a new key
    APP_CONFIG = encryption.initialize(APP_CONFIG)
    database_access.initialize()
    # write updated config to disk
    write_config_to_disk()

    # Each worker below re-arms itself with a threading.Timer when it finishes.
    check_for_new_jobs()
    #register_api_callbacks()
    threading.Timer(5.0, check_job_status).start()
    threading.Thread(target=send_agent_health_check).start()
    threading.Thread(target=start_agent_listener_server).start()
    threading.Thread(target=check_for_agent_commands).start()
    threading.Thread(target=check_for_scheduled_jobs).start()
def write_config_to_disk():
    """Write APP_CONFIG as JSON to the config path provided on startup."""
    # The context manager guarantees the handle is closed (and the data
    # flushed) even if serialisation or the write raises.
    with open(args.config_file, "w") as config:
        config.write(json.dumps(APP_CONFIG))
def import_config():
    """Load the config file given on startup into the module globals.

    Populates APP_CONFIG, HOST_IP and AGENT_REGISTRATION_PORT.

    Raises:
        SystemExit: with status 1 when the config file cannot be found or
            lacks one of the required fields.
    """
    global APP_CONFIG
    global HOST_IP
    global AGENT_REGISTRATION_PORT

    # Check the path on disk as well, so a missing file produces the intended
    # message instead of an unhandled FileNotFoundError.
    if args.config_file is None or not os.path.isfile(args.config_file):
        ensemble_logging.log_message(f"Unable to find {args.config_file} in the working directory. Cannot continue without a config file")
        raise SystemExit(1)

    with open(args.config_file, "r") as config_file:
        APP_CONFIG = json.loads(config_file.read())

    required_fields = (
        ensemble_constants.CONFIG_FILE_HOST_IP,
        ensemble_constants.CONFIG_FILE_AGENT_REG_PORT,
        ensemble_constants.CONFIG_FILE_AGENT_COM_PORT,
    )
    if any(field not in APP_CONFIG for field in required_fields):
        ensemble_logging.log_message(f"Unable to find all required fields in the config file. Config must contain '{ensemble_constants.CONFIG_FILE_HOST_IP}', '{ensemble_constants.CONFIG_FILE_AGENT_REG_PORT}', and '{ensemble_constants.CONFIG_FILE_AGENT_COM_PORT}' at a minimum")
        raise SystemExit(1)

    HOST_IP = APP_CONFIG[ensemble_constants.CONFIG_FILE_HOST_IP]
    AGENT_REGISTRATION_PORT = APP_CONFIG[ensemble_constants.CONFIG_FILE_AGENT_REG_PORT]
def start_agent_listener_server():
    """Run the agent registration listener; never returns normally.

    Binds a TCP socket to HOST_IP:AGENT_REGISTRATION_PORT, reads up to 1024
    bytes from every incoming connection and hands the payload to
    handle_agent_registration_request on a new thread.
    """
    ensemble_logging.log_message("Starting Ensemble Agent Listening Server")
    bindAddress = HOST_IP
    bindPort = int(AGENT_REGISTRATION_PORT)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serverSocket:
        serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ensemble_logging.log_message(f"Agent Listener Server Binding to {bindAddress} on port {bindPort}")
        serverSocket.bind((bindAddress, bindPort))
        serverSocket.listen(100)
        database_access.insert_stream_event({
            "EventType": ensemble_enums.StreamEvent.Director.value,
            "Event": f"Agent Listener Server Binding to {bindAddress} on port {bindPort}",
            "EventId": "",
        })

        while True:
            connection, address = serverSocket.accept()
            try:
                # Only the first read is used, so close the connection right
                # after it; otherwise every registration leaks a descriptor.
                with connection:
                    request = connection.recv(1024)
                    ensemble_logging.log_message(f"Message Received {connection} {address} {request}")
            except OSError as error:
                # A misbehaving client must not take the listener down.
                ensemble_logging.log_message(f"Error while receiving agent registration from {address}: {error}")
                continue
            threading.Thread(
                target=handle_agent_registration_request,
                args=(request, address,),
            ).start()
def handle_agent_registration_request(request, address):
    """Register the agent that sent ``request``.

    Args:
        request: Raw (encrypted) bytes received from the agent.
        address: ``(ip, port)`` tuple of the remote peer; only the ip is used.

    Failures are logged and swallowed so the worker thread exits quietly.
    """
    try:
        # Decrypt before touching the database so a payload that cannot be
        # decrypted never results in a registered agent.
        # NOTE(review): presumably decrypt_string raises on payloads that were
        # not encrypted with the shared key - confirm.
        decryptedRequest = encryption.decrypt_string(request)
        database_access.register_agent(address[0])
        ensemble_logging.log_message(decryptedRequest)
        ensemble_logging.log_message(f"Agent at {address[0]} registered")
        ensemble_logging.log_message(f"{database_access.get_agent_count()} Total Agents Online")
        database_access.add_message({
            "MessageType": ensemble_enums.MessageType.NOTIFICATION.value,
            "Message": f"Agent Online:{address[0]}",
        })
        database_access.insert_stream_event({
            "EventType": ensemble_enums.StreamEvent.Agent.value,
            "Event": f"Agent at {address[0]} registered",
            "EventId": address[0],
        })
    except Exception as error:
        # Keep the best-effort behaviour but record why the request failed.
        ensemble_logging.log_message(f"Bad agent request received from {address}: {error}")
def job_to_string(job):
    """Return the wire form of ``job``: request tag, message break, JSON body."""
    header = ensemble_constants.JOB_REQUEST + ensemble_constants.MESSAGE_BREAK
    return header + json.dumps(job)
def handle_job_result_api_request(request):
    """Collect the results of job ``request`` from every agent it was sent to.

    Possibly deprecated API entry point.

    Returns:
        ``str()`` of a dict mapping agent ip to that agent's result; the dict
        is empty when the job id is unknown.
    """
    if request not in agent_jobs:
        return str({})
    collected = {
        ip: send_job_result_request(ip, request)
        for ip in agent_jobs[request]
    }
    return str(collected)
def send_agent_command_log_request(agentIp, command):
    """Send an admin command to the agent at ``agentIp``.

    Args:
        agentIp: Address of the target agent.
        command: Command text; the first known command token it contains
            (checked in the order clear-logs, restart, stop-jobs, kill)
            decides which payload is sent.

    Nothing is sent when ``command`` contains no known token.
    """
    # (payload token, label used in the user-facing message), in match order.
    knownCommands = (
        (ensemble_constants.CLEAR_LOGS, "Clear log"),
        (ensemble_constants.RESTART, "Restart"),
        (ensemble_constants.STOP_JOBS, "Stop jobs"),
        (ensemble_constants.KILL, "Kill"),
    )
    matched = next(
        ((token, label) for token, label in knownCommands if token in command),
        None,
    )
    if matched is None:
        # Previously an empty payload was encrypted and sent; skip it instead.
        ensemble_logging.log_message(f"Ignoring unsupported agent command {command} for {agentIp}")
        return

    commandPayload, label = matched
    database_access.add_message({
        "MessageType": ensemble_enums.MessageType.INFORMATION.value,
        "Message": f"{label} command sent to {agentIp}",
    })
    encryptedPayload = encryption.encrypt_string(commandPayload)
    # Use the configured agent communication port, as the job and health
    # senders do; 5682 stays as the fallback.
    agentPort = int(APP_CONFIG.get(ensemble_constants.CONFIG_FILE_AGENT_COM_PORT, 5682))
    communication.tx(agentIp, agentPort, encryptedPayload)
def send_job_result_request(agentIp, id):
    """Ask the agent at ``agentIp`` for the result of job ``id``.

    Args:
        agentIp: Address of the agent that ran the job.
        id: Job identifier; converted with ``str()`` for the request.

    Returns:
        The decrypted response from the agent.
    """
    payload = ensemble_constants.JOB_RESULTS + ensemble_constants.MESSAGE_BREAK + str(id)
    ensemble_logging.log_message(f"Payload {payload}")
    encryptedPayload = encryption.encrypt_string(payload)
    ensemble_logging.log_message(f"Sending Job Result Request {encryptedPayload}")
    # Use the configured agent communication port, as the job and health
    # senders do; 5682 stays as the fallback.
    agentPort = int(APP_CONFIG.get(ensemble_constants.CONFIG_FILE_AGENT_COM_PORT, 5682))
    response = communication.txrx(agentIp, agentPort, encryptedPayload)
    decryptedResponse = encryption.decrypt_string(response)
    ensemble_logging.log_message(f"Result for job {id} is {decryptedResponse}")
    return decryptedResponse
# Set to True the first time check_job_status runs.
JOB_STATUS_CHECKER_RUNNING = False


def check_job_status():
    """Poll every running job once, then re-arm a 5 second timer.

    For each running job the owning agent is asked for its status. A status of
    1 means the job finished: its result is fetched and stored. Any error
    while handling a job marks it as failed and resets it via retry_job.
    """
    global JOB_STATUS_CHECKER_RUNNING
    try:
        JOB_STATUS_CHECKER_RUNNING = True
        for runningJob in database_access.get_all_running_jobs():
            agentIp = runningJob["AgentIp"]
            jobId = runningJob["JobId"]
            # Handle failures per job: one unreachable agent must not stop
            # the remaining jobs from being checked, and the handler must only
            # reference the job that actually failed.
            try:
                status = send_job_status_request(agentIp, jobId)
                if int(status) == 1:
                    ensemble_logging.log_message(f"Job completed for {agentIp}")
                    database_access.insert_stream_event({
                        "EventType": ensemble_enums.StreamEvent.Job.value,
                        "Event": f"Job completed for {agentIp}",
                        "EventId": jobId,
                    })
                    result = send_job_result_request(agentIp, jobId)
                    database_access.update_job_result(agentIp, jobId, result)
                    database_access.add_message({
                        "MessageType": ensemble_enums.MessageType.NOTIFICATION.value,
                        "Message": f"Job Completed for Job ID:{jobId}",
                    })
                else:
                    ensemble_logging.log_message(f"Job not completed yet for {agentIp}")
            except Exception as error:
                ensemble_logging.log_message(f"Error while checking job status {error}")
                database_access.add_message({
                    "MessageType": ensemble_enums.MessageType.WARNING.value,
                    "Message": f"Job Failed for Job ID:{jobId}",
                })
                retry_job(agentIp, jobId)
    except Exception as error:
        # Reached when the job list cannot be fetched or a row is malformed.
        ensemble_logging.log_message(f"Error while checking job status {error}")
    finally:
        threading.Timer(5.0, check_job_status).start()
def retry_job(agentIp, jobId):
    """Announce a retry of ``jobId`` and reset its status in the database.

    ``agentIp`` is accepted for call-site symmetry but is not used.
    """
    notice = {
        "MessageType": ensemble_enums.MessageType.INFORMATION.value,
        "Message": f"Retrying Job ID:{jobId}",
    }
    database_access.add_message(notice)
    database_access.update_job_reset_job_status(jobId)
def send_job_status_request(agentIp, id):
    """Ask the agent at ``agentIp`` for the status of job ``id``.

    Args:
        agentIp: Address of the agent running the job.
        id: Job identifier; converted with ``str()`` for the request.

    Returns:
        The decrypted status response; check_job_status treats a value that
        converts to the integer 1 as "completed".
    """
    payload = ensemble_constants.JOB_STATUS + ensemble_constants.MESSAGE_BREAK + str(id)
    payload = encryption.encrypt_string(payload)
    ensemble_logging.log_message(f"Sending Job Status Request {payload}")
    # Use the configured agent communication port, as the job and health
    # senders do; 5682 stays as the fallback.
    agentPort = int(APP_CONFIG.get(ensemble_constants.CONFIG_FILE_AGENT_COM_PORT, 5682))
    response = encryption.decrypt_string(communication.txrx(agentIp, agentPort, payload))
    ensemble_logging.log_message(f"Status for job {id} is {response}")
    return response
def handle_start_job_request_api_request(request):
    """Parse the JSON job description in ``request`` and queue it to be run."""
    database_access.queue_job_request(json.loads(request))
def check_for_new_jobs():
    """Dispatch newly queued jobs to agents, then re-arm a 5 second timer.

    Each row returned by the database is turned into a job dict and sent
    either round-robin (load balanced) or to every agent.
    """
    try:
        jobs = database_access.check_for_new_jobs()
        if len(jobs) > 0:
            ensemble_logging.log_message(f"Jobs found {jobs}")
            database_access.insert_stream_event({
                "EventType": ensemble_enums.StreamEvent.Director.value,
                "Event": f"Jobs found {jobs}",
                "EventId": "",
            })
        for j in jobs:
            # Column 7 (the load-balanced flag) is read below, so a usable row
            # needs at least 8 columns; the previous ">= 5" guard let short
            # rows through to an IndexError.
            if len(j) < 8:
                ensemble_logging.log_message(f'Bad row in job results {j}')
                continue
            # Isolate each row so one unparsable job cannot block the others.
            try:
                job = {
                    "Id": j[1],
                    "Cmd": j[2],
                    "Targets": literal_eval(j[3]),
                    "IsSingleCmd": j[4],
                }
                isLoadBalanced = bool(j[7])
                if isLoadBalanced:
                    start_job_request_request_load_balanced(job)
                else:
                    start_job_request_all_agents(job)
            except Exception as error:
                ensemble_logging.log_message(f'Error while checking for new jobs {error}')
    except Exception as error:
        ensemble_logging.log_message(f'Error while checking for new jobs {error}')
    finally:
        threading.Timer(5.0, check_for_new_jobs).start()
def start_job_request_all_agents(job):
    """Send the same ``job`` to every agent returned by the database."""
    for registeredAgent in database_access.get_all_agents():
        send_job_request(registeredAgent["IpAddress"], job)
def start_job_request_request_load_balanced(job):
    """Split ``job``'s targets across agents round-robin and send each share.

    Targets are dealt out one per agent in order, wrapping back to the first
    agent after the last. A job without targets is sent once, to the first
    agent, with the NA placeholder as its only target. Nothing happens when
    no agents are available.
    """
    workers = database_access.get_all_agents()
    workerTotal = len(workers)
    if workerTotal == 0:
        return

    pending = job["Targets"]
    if pending is None or len(pending) == 0:
        # No real targets: a single placeholder entry goes to the first agent.
        pending = [ensemble_constants.NA]

    # Batch<AgentIp, Target[]>
    assignments = {}
    for position, target in enumerate(pending):
        ownerIp = workers[position % workerTotal]["IpAddress"]
        assignments.setdefault(ownerIp, []).append(target)

    # Every agent receives its own copy of the job carrying only its targets.
    for ownerIp, assigned in assignments.items():
        share = copy.deepcopy(job)
        share["Targets"] = assigned
        send_job_request(ownerIp, share)
def send_job_request(agentIp, job):
    """Record ``job`` as started on ``agentIp`` and transmit it to that agent.

    Args:
        agentIp: Address of the agent that should run the job.
        job: Job dict; reads the "Id" and "Targets" keys. "Targets" may be
            None or empty.
    """
    agent_jobs.setdefault(job["Id"], []).append(agentIp)

    payloadBuilder = job_to_string(job)
    payload = encryption.encrypt_string(payloadBuilder)
    database_access.update_job_started(job["Id"])

    # "Targets" can be None (the load-balanced path checks for it explicitly),
    # which would otherwise raise when iterated here.
    for target in job["Targets"] or []:
        # Placeholder targets are not persisted.
        if ensemble_constants.NA in target:
            continue
        database_access.insert_target(target)
        _target = database_access.get_target_by_target(target)
        database_access.insert_target_job(_target["Id"], job["Id"])

    # NOTE(review): source indentation was lost; the agent/job record is
    # assumed to be written once per call, not once per target.
    database_access.insert_new_agent_job_record(agentIp, job["Id"])
    database_access.add_message({
        "MessageType": ensemble_enums.MessageType.NOTIFICATION.value,
        "Message": f"Job Started for Job ID:{job['Id']}",
    })
    ensemble_logging.log_message(f'Sending Job Request: {agentIp}:{APP_CONFIG[ensemble_constants.CONFIG_FILE_AGENT_COM_PORT]} {payload}')
    communication.tx(agentIp, int(APP_CONFIG[ensemble_constants.CONFIG_FILE_AGENT_COM_PORT]), payload)
def _queue_scheduled_job_if_due(scheduledJob):
    """Queue one scheduled job if its run time has arrived."""
    scheduledJobExists = database_access.check_if_scheduled_job_exists(scheduledJob["id"])
    # Mirrors the original ``== False`` test exactly.
    if not (scheduledJobExists == False):  # noqa: E712
        return

    # One clock reading so both readiness checks use the same instant.
    timeNow = datetime.datetime.now()
    runWindowMargin = datetime.timedelta(minutes=1)

    # An empty 'runTime' / 'runDateTime' means "not set".
    scheduledRunTime = None
    if scheduledJob['runTime'] != '':
        scheduledRunTime = datetime.datetime.strptime(
            f"{timeNow.date()} {scheduledJob['runTime']}",
            "%Y-%m-%d %H:%M:%S.%f",
        )
    scheduledJobRunDateTime = None
    if scheduledJob["runDateTime"] != '':
        scheduledJobRunDateTime = datetime.datetime.strptime(
            scheduledJob["runDateTime"],
            '%Y-%m-%d %H:%M:%S',
        )

    # Recurring job: only a time of day, due within the last minute.
    # The explicit None checks avoid comparing None with a datetime when
    # neither field is set.
    recurringJobThatIsReadyToRun = (
        scheduledJobRunDateTime is None
        and scheduledRunTime is not None
        and (timeNow - runWindowMargin) <= scheduledRunTime <= timeNow
    )
    # One-off job: a full date-time that has passed.
    scheduledJobIsReadyToRun = (
        scheduledRunTime is None
        and scheduledJobRunDateTime is not None
        and scheduledJobRunDateTime <= timeNow
    )
    if not (recurringJobThatIsReadyToRun or scheduledJobIsReadyToRun):
        return

    job = {
        # Stored as text so every database call below sees the same value.
        "Id": str(uuid.uuid4()),
        "Cmd": scheduledJob['cmd'],
        "Targets": scheduledJob['targets'],
        "IsSingleCmd": scheduledJob['isSingleCmd'],
        "IsLoadBalanced": scheduledJob['isLoadBalanced'],
        "WorkspaceId": scheduledJob['workspaceId'],
    }
    database_access.queue_job_request(job)
    database_access.insert_workspace_job(job["WorkspaceId"], job["Id"])
    if scheduledJobRunDateTime:
        # One-off schedules are removed once queued.
        database_access.remove_scheduled_job(scheduledJob["id"])
    # add to scheduled job run mapping table
    database_access.insert_scheduled_job_mapping({
        "JobId": job["Id"],
        "ScheduledJobId": scheduledJob["id"],
    })


def check_for_scheduled_jobs():
    """Queue every scheduled job that is due, then re-arm a 30 second timer."""
    try:
        # get all scheduled jobs that are recurring or scheduled for today
        for scheduledJob in database_access.get_scheduled_jobs():
            # Isolate each schedule so one malformed entry cannot stop the
            # remaining ones from being evaluated.
            try:
                _queue_scheduled_job_if_due(scheduledJob)
            except Exception as error:
                ensemble_logging.log_message(f'Error while checking for scheduled job {error}')
    except Exception as error:
        ensemble_logging.log_message(f'Error while checking for scheduled job {error}')
    finally:
        threading.Timer(30, check_for_scheduled_jobs).start()
# Commands the director is willing to forward to agents.
supportedCommands = [
    ensemble_constants.CLEAR_LOGS,
    ensemble_constants.STOP_JOBS,
    ensemble_constants.KILL,
    ensemble_constants.RESTART,
]


def check_for_agent_commands():
    """Forward pending agent commands, then re-arm a 2 second timer."""
    try:
        for agentCommand in database_access.get_pending_agent_commands():
            # Isolate each command: an unreachable or unknown agent must not
            # stop the commands queued behind it from being delivered.
            try:
                agent = database_access.get_agent_by_id(agentCommand["AgentId"])
                if agentCommand["Command"] in supportedCommands:
                    ensemble_logging.log_message(f'Sending {agentCommand["Command"]} Request to {agent["IpAddress"]}')
                    database_access.insert_stream_event({
                        "EventType": ensemble_enums.StreamEvent.Agent.value,
                        "Event": f'Sending {agentCommand["Command"]} Request to {agent["IpAddress"]}',
                        "EventId": f"{agentCommand['AgentId']}",
                    })
                    send_agent_command_log_request(agent["IpAddress"], agentCommand["Command"])
                # NOTE(review): source indentation was lost; the command is
                # assumed to be marked complete whether or not it was
                # supported, so unsupported ones are not re-polled forever.
                database_access.complete_all_agent_commands(agentCommand["Id"])
            except Exception as error:
                ensemble_logging.log_message(f'Error while checking agent commands {error}')
    except Exception as error:
        ensemble_logging.log_message(f'Error while checking agent commands {error}')
    finally:
        threading.Timer(2.0, check_for_agent_commands).start()
def send_agent_health_check():
    """Request health data from every agent, then re-arm a 10 second timer.

    An empty response marks the agent inactive and raises a warning; any
    other response is decrypted, parsed as JSON and stored as the agent's
    health status. Errors for one agent are logged and do not affect the rest.
    """
    try:
        for member in database_access.get_all_agents():
            payload = encryption.encrypt_string(ensemble_constants.HEALTH_REQUEST)
            try:
                response = communication.txrx(
                    member["IpAddress"],
                    int(APP_CONFIG[ensemble_constants.CONFIG_FILE_AGENT_COM_PORT]),
                    payload,
                )
                if len(response) != 0:
                    report = json.loads(encryption.decrypt_string(response))
                    database_access.update_agent_health_status({
                        "Id": member["Id"],
                        "MemPct": report["MemInfo"][2],
                        "StgPct": report["StgInfo"][3],
                        "CpuPct": report["CpuInfo"],
                        # A blank or missing log size is stored as 0.
                        "LogSize": report["LogSize"] if not (len(str(report["LogSize"])) == 0 or report["LogSize"] == None) else 0,
                        "RunningProcesses": report["RunningProcesses"],
                    })
                else:
                    database_access.set_agent_inactive(member["Id"])
                    database_access.add_message({
                        "MessageType": ensemble_enums.MessageType.WARNING.value,
                        "Message": f"Agent has gone offline {member['IpAddress']}",
                    })
                    database_access.insert_stream_event({
                        "EventType": ensemble_enums.StreamEvent.Agent.value,
                        "Event": f"Agent has gone offline {member['IpAddress']}",
                        "EventId": f"{member['Id']}",
                    })
            except Exception as error:
                ensemble_logging.log_message(f'Error while sending agent health request {error}')
    except Exception as error:
        ensemble_logging.log_message(f'Error while checking agent health {error}')
    finally:
        threading.Timer(10.0, send_agent_health_check).start()
# Script entry point: bootstrap the director and start its background workers.
initialize()