Fixed job stutter bug, added debug statements to chase out bug with deep web checks. Changed logging to syslog. Cleaned up some of the log messages.
dichotomy committed Mar 23, 2018
1 parent 9b1dc0b commit 4d9b481
Showing 7 changed files with 88 additions and 52 deletions.
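
The logging change described in the commit message amounts to swapping Twisted's file-based log observer for its syslog observer. A minimal sketch of the before/after setup, assuming the twisted.python.syslog module is available (the "Scorebot" prefix is taken from the diff below; the log message itself is illustrative):

from twisted.python import log, syslog

# Before: Twisted log output was written to a local file.
#log.startLogging(open('log/MonitorCore.log', 'w'))

# After: route Twisted log output to the system syslog daemon,
# tagged with the "Scorebot" prefix.
syslog.startLogging(prefix="Scorebot")
log.msg("Job 1: added")  # illustrative; this now lands in syslog, not a local file
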
3 changes: 2 additions & 1 deletion Monitors/GenSocket.py
@@ -56,6 +56,7 @@ def __init__(self):
self.fqdn = ""
self.port = 0
self.timeout = 30
self.noisy = False

def get_deferred(self, key):
deferred = Deferred()
@@ -66,7 +67,7 @@ def get_deferred(self, key):

def startedConnecting(self, connector):
if self.job:
sys.stderr.write("Job %s: Starting connection test\n" % self.job.get_job_id())
sys.stderr.write("Job %s: Starting connection\n" % self.job.get_job_id())

def buildProtocol(self, addr):
self.addr = addr
20 changes: 16 additions & 4 deletions Monitors/Jobs.py
@@ -32,19 +32,19 @@ def add(self, job_json_str):
self.jobs[self.latest_job_id] = Job(job_json_str, self.debug)
self.jobs[self.latest_job_id].set_job_id(self.latest_job_id)
self.todo.append(self.latest_job_id)
sys.stderr.write("Job %s added: %s\n" % (self.latest_job_id, job_json_str))
sys.stderr.write("Job %s: added %s\n" % (self.latest_job_id, job_json_str))
return self.latest_job_id

def find_done_jobs(self):
for job_id in self.proc:
if self.jobs[job_id].is_done():
sys.stderr.write("Job %s is done, processing." % job_id)
sys.stderr.write("Job %s: is done, processing.\n" % job_id)
self.done.append(job_id)
for job_id in self.done:
if job_id in self.proc:
self.proc.remove(job_id)
else:
sys.stderr.write("WTF? Job %s is done but not in self.proc!" % job_id)
sys.stderr.write("WTF? Job %s is done but not in self.proc!\n" % job_id)
return self.done

def finish_job(self, job_id, reason):
@@ -73,7 +73,7 @@ def submitted_job(self, job_id):
self.pending_submitted.remove(job_id)
del(self.jobs[job_id])
else:
raise Exception("Job %s: marked submitted but not done.")
raise Exception("Job %s: marked submitted but not done.\n")

def get_job(self, job_id=None):
if job_id:
@@ -624,14 +624,26 @@ def __init__(self, json, job):
self.max_index = 0

def verify_page(self, page):
if self.debug:
sys.stderr.write("Checking contents...\n\tChecking size...\n")
if len(page)==self.json["size"]:
if self.debug:
sys.stderr.write("\tSize is good, checking keywords...\n:w")
for keyword in self.json["keywords"]:
if self.debug:
sys.stderr.write("\t\tChecking %s..." % keyword)
if keyword in page:
if self.debug:
sys.stderr.write("Good!\n")
continue
else:
if self.debug:
sys.stderr.write("Bad!\n")
self.invalid()
else:
self.invalid()
if self.debug:
sys.stderr.write("Done content check!\n")
self.success()

def get_size(self):
40 changes: 28 additions & 12 deletions Monitors/MonitorCore.py
@@ -1,18 +1,19 @@
#!/usr/bin/env python2.7
# requires: https://pypi.python.org/pypi/http-parser
from twisted.internet import reactor, protocol, ssl
from twisted.python import log
from http_parser.pyparser import HttpParser
from WebClient import WebServiceCheckFactory, JobFactory
from GenSocket import GenCheckFactory
from DNSclient import DNSclient
from Pingclient import PingProtocol
from FTPclient import FTP_client
from SMTPclient import SMTPFactory
from twisted.python import log
from twisted.python import syslog
#from twisted.python import log
import traceback
import time
import sys
import os

class MonitorCore(object):

@@ -87,25 +88,26 @@ def proc_result(self, job, result):
fileobj = open(filename, "w")
fileobj.write(result)
fileobj.close()
sys.stderr.write("Job %s submitted, SBE response in file %s\n" % (job_id, filename))
sys.stderr.write("Job %s: submitted, SBE response in file %s\n" % (job_id, filename))
else:
sys.stderr.write("Job %s submitted, SBE response: %s\n" % (job_id, result))
sys.stderr.write("Job %s: submitted, SBE response: %s\n" % (job_id, result))
sys.stderr.write("Job %s: submitted: %s\n" % (job_id, job_json))

def job_submit_pass(self, result, job):
job_id = job.get_job_id()
sys.stderr.write("Job %s: successfully submitted %s \n" % (job_id, result))
self.proc_result(job, result)
self.jobs_done.append(job_id)
self.jobs.submitted_job(job_id)

def job_submit_fail(self, failure, job):
job_id = job.get_job_id()
sys.stderr.write("Job %s failed due to %s \n" % (job_id, failure.getErrorMessage()))
sys.stderr.write("Job %s: failed due to %s \n" % (job_id, failure.getErrorMessage()))
if job.get_job_fail():
sys.stderr.write("giving up.\n")
else:
sys.stderr.write("retrying in %s.\n" % self.resubmit_interval)
reactor.callLater(self.resubmit_interval, self.post_job(job))
reactor.callLater(self.resubmit_interval, self.post_job, job)

def dns_fail(self, failure, job, dnsobj):
# Do this if the DNS check failed
@@ -147,13 +149,13 @@ def ping_fail(self, failure, job, pingobj):

def ftp_fail(self, failure, service, job_id):
if "530 Login incorrect" in failure:
sys.stderr.write("Job ID %s: Login failure\n" % job_id)
sys.stderr.write("Job %s: Login failure\n" % job_id)
service.fail_login()
elif "Connection refused" in failure:
sys.stderr.write("Job ID %s: Connection failure\n" % job_id)
sys.stderr.write("Job %s: Connection failure\n" % job_id)
service.fail_conn("refused")
else:
sys.stderr.write("Job ID %s: Failure %s\n" % (job_id, failure))
sys.stderr.write("Job %s: Failure %s\n" % (job_id, failure))
service.fail_conn(failure)

def check_services(self, job):
@@ -187,21 +189,35 @@ def gen_service_connect_pass(self, result, job, service):
proto = service.get_proto()
port = service.get_port()
jobid = job.get_job_id()
sys.stderr.write("Job %s: Service %s/%s passed. %s\n" % (jobid, proto, port, result))
sys.stderr.write("Job %s: Service %s/%s passed. %s\n" % (jobid, port, proto, result))

def gen_service_connect_fail(self, failure, job, service):
service.fail_conn(failure)
proto = service.get_proto()
port = service.get_port()
jobid = job.get_job_id()
sys.stderr.write("Job %s: Service %s/%s failed:\n\t%s\n" % (jobid, proto, port, failure))
sys.stderr.write("Job %s: Service %s/%s failed:\n\t%s\n" % (jobid, port, proto, failure))

def check_dir(dir):
try:
os.stat(dir)
except OSError as e:
if e.errno == 2:
sys.stderr.write("No such directory %s, creating" % dir)
os.mkdir(dir)
else:
sys.stderr.write("Directory %s - Unknown error%s: %s" % (e.errno, e.strerror))


if __name__=="__main__":
# Testing with an artificial job file
from Parameters import Parameters
from Jobs import Jobs

log.startLogging(open('log/MonitorCore.log', 'w'))
for dir in ("log", "raw", "sbe"):
check_dir(dir)
#log.startLogging(open('log/MonitorCore.log', 'w'))
syslog.startLogging(prefix="Scorebot")
params = Parameters()
jobs = Jobs()
mon_obj = MonitorCore(params, jobs)
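
The job-stutter fix mentioned in the commit message is most likely the reactor.callLater change in job_submit_fail above: the old form called self.post_job(job) immediately and handed callLater its return value rather than a callable, so the retry was not scheduled on the intended delay. A minimal sketch of the difference, using an illustrative resubmit_job callable in place of post_job:

import sys
from twisted.internet import reactor

def resubmit_job(job_id):  # stand-in for MonitorCore.post_job
    sys.stderr.write("Job %s: resubmitting\n" % job_id)

# Buggy form: resubmit_job(1) runs right away and callLater receives its
# return value (None) instead of a callable, so no delayed retry is set up.
#reactor.callLater(30, resubmit_job(1))

# Fixed form: pass the callable and its argument separately; the call
# fires only after the 30-second delay.
reactor.callLater(30, resubmit_job, 1)

reactor.callLater(35, reactor.stop)  # stop the demo reactor shortly after
reactor.run()
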
9 changes: 2 additions & 7 deletions Monitors/Parameters.py
@@ -8,14 +8,9 @@ class Parameters(object):
def __init__(self):
self.debug = False
self.timeout = 90
#self.sbe_auth = "0987654321"
#self.sb_ip = "10.200.100.50"
self.sbe_auth = "33ab8ea2-1258-44d8-ac5e-643e88e2a87c"
self.sb_ip = "10.200.100.110"
self.sbe_auth = "2863e4ad-fcc5-4b35-8d31-42112c1c7b5b"
self.sb_ip = "10.150.100.81"
self.sb_port = 80
#self.sb_ip = "10.200.100.205"
#self.sb_ip = "172.25.20.211"
#self.sb_port = 8080
self.job_url = "/api/job"
self.reason = ""
self.headers = {}
3 changes: 1 addition & 2 deletions Monitors/SMTPclient.py
@@ -101,8 +101,7 @@ def clientConnectionLost(self, connector, reason):
sys.stderr.write( "Job %s: clientConnectionLost\t" % self.job.get_job_id())
sys.stderr.write( "given reason: %s\t" % reason)
sys.stderr.write( "self.reason: %s\t" % self.reason)
sys.stderr.write( "\nReceived: %s\n" % self.get_server_headers())
conn_time = self.end - self.start
#sys.stderr.write( "\nReceived: %s\n" % self.get_server_headers())
if self.data:
self.service.set_data(self.data)
if self.fail and self.reason:
57 changes: 35 additions & 22 deletions Monitors/WebClient.py
@@ -37,7 +37,7 @@ def parse_str(self, cookie_str):
elif "HttpOnly":
self.httponly = True
else:
raise Exception("Unknown token %s in Cookie %s!" % (piece, cookie_str))
raise Exception("Unknown token %s in Cookie %s!\n" % (piece, cookie_str))

def get(self):
return "%s=%s" % (self.name, self.value)
@@ -142,7 +142,7 @@ def dataReceived(self, data):
#sys.stderr.write(line)
if self.parser.is_headers_complete():
status = self.parser.get_status_code()
sys.stderr.write("Job %s: Returned status %s\n" % (self.job_id, status))
sys.stderr.write("Job %s: Returned status %s\n" % (self.job_id, status))
if self.authing:
if status != 302:
raise Exception("Job %s: Failed authentication\n" % (self.job_id))
@@ -178,10 +178,10 @@ def dataReceived(self, data):
sys.stderr.write("Current self.body: %s\n" % self.body)
# TODO - find a way to deal with this, SBE jobs currently don't trigger this check, but we need it for health checks
if self.parser.is_message_complete():
sys.stderr.write( "Job %s: ConnID %s: MESSAGE COMPLETE for %s!\n" % (self.job_id, self.factory.get_conn_id(), self.url))
if self.conn:
self.conn.verify_page(self.body)
if self.factory.get_debug():
sys.stderr.write( "Job %s: ConnID %s: MESSAGE COMPLETE!\n" % (self.job_id, self.factory.get_conn_id()))
sys.stderr.write("Job %s: Received this body: %s\n" % (self.job_id, self.body))
self.factory.proc_body(self.body)
# self.factory.proc_body(self.body)
@@ -247,6 +247,9 @@ def get_url(self):
def get_headers(self):
return self.headers

def get_body(self):
return self.body

class JobFactory(WebCoreFactory):

def __init__(self, params, jobs, op, job=None):
@@ -274,7 +277,7 @@ def __init__(self, params, jobs, op, job=None):
raise Exception("Job %s: Unknown operation %s\n" % (self.job_id, op))

def set_code(self, code):
self.code = code
self.code = int(code)

def set_job_fail(self):
self.job_fail = True
@@ -297,26 +300,34 @@ def clientConnectionFailed(self, connector, reason):
sys.stderr.write( "self.reason: %s\t" % self.reason)
if self.debug:
sys.stderr.write( "\nReceived: %s\n" % self.get_server_headers())
self.params.fail_conn("Job %s connection failed\n" %
(self.op), reason.getErrorMessage(), self.get_server_headers())
self.deferreds[connector].errback(reason)
#self.params.fail_conn("Job %s connection failed\n" %
#(self.op), reason.getErrorMessage(), self.get_server_headers())
if connector in self.deferreds:
self.deferreds[connector].errback(reason)

def clientConnectionLost(self, connector, reason):
if "put" in self.op:
job_id = self.job.get_job_id()
sys.stderr.write("Job %s: Received code %s\n" % (job_id, self.code))
if self.code == 202:
self.deferreds[connector].callback(reason)
#elif self.code == 400:
#self.deferreds[connector].errback(reason)
sys.stderr.write("Job %s: submitted.\n" % job_id)
self.deferreds[connector].callback("Connection closed")
return
else:
self.deferreds[connector].errback(reason)
sys.stderr.write( "Job %s: JobFactory Put clientConnectionLost\n" % job_id)
# Todo - restore code that checks to see if the job was successfully submitted. It was removed from SBE
return
sys.stderr.write( "Job %s: JobFactory Put clientConnectionLost, received code %s\n" % (job_id, self.code))
return
elif "get" in self.op:
sys.stderr.write( "Job GET request clientConnectionLost\n")
if self.get_debug():
sys.stderr.write( "Job GET request clientConnectionLost\n")
sys.stderr.write("\nReceived code %s:" % self.code)
if self.debug:
sys.stderr.write( "\nReceived: %s\n" % self.get_server_headers())
if self.code == 403:
# This means that SBE has no running games, so just die quietly.
sys.stderr.write("Got code 403, quitting\n")
sys.stderr.write("\tGot %s from server\n" % self.body)
return
if self.fail:
sys.stderr.write("Fail bit set\n")
sys.stderr.write( "given reason: %s\t" % reason)
@@ -439,7 +450,7 @@ def conn_pass(self, result):
self.service.pass_conn()

def conn_fail(self, failure):
sys.stdout.write("Job %s: Finished content check with result %s: %s/%s | %s\n" % \
sys.stdout.write("Job %s: Failed connect on content check with result %s: %s/%s | %s\n" % \
(self.job.get_job_id(), failure, self.service.get_port(), self.service.get_proto(),
content.get_url))
print failure
@@ -448,15 +459,15 @@ def conn_fail(self, failure):
def content_pass(self, result, content):
content.success()
self.service.pass_conn()
sys.stdout.write("Job %s: Finished content check with result %s: %s/%s | %s\n" % \
(self.job.get_job_id(), result, self.service.get_port(), self.service.get_proto(),
content.get_url))
sys.stdout.write("Job %s: Finished content check for %s/%s | %s\n" % \
(self.job.get_job_id(), self.service.get_port(), self.service.get_proto(),
content.get_url()))

def content_fail(self, failure, content):
content.fail(failure)
sys.stdout.write("Job %s: Finished content check with result %s: %s/%s | %s\n" % \
sys.stdout.write("Job %s: Failed content integrity check with result %s: %s/%s | %s\n" % \
(self.job.get_job_id(), failure, self.service.get_port(), self.service.get_proto(),
content.get_url))
content.get_url()))
print failure

def add_fail(self, reason):
@@ -519,7 +530,8 @@ def clientConnectionLost(self, connector, reason):
self.deferreds[connector].callback(self.job.get_job_id())

if __name__ == "__main__":
from twisted.python import log
#from twisted.python import log
from twisted.python import syslog
from DNSclient import DNSclient
import sys

@@ -564,7 +576,8 @@ def check_job(params, jobs):
query_d.addCallback(check_web, params, job)
query_d.addErrback(job_fail, job)

log.startLogging(open('log/webtest.log', 'w'))
#log.startLogging(open('log/webtest.log', 'w'))
syslog.startLogging(prefix="Scorebot")
jobs = Jobs()
jobfile = open("test_webjob.txt")
sys.stderr.write( "Testing %s\n" % sys.argv[0])
8 changes: 4 additions & 4 deletions scorebot3.0.dia
@@ -5,7 +5,7 @@
<dia:color val="#ffffff"/>
</dia:attribute>
<dia:attribute name="pagebreak">
<dia:color val="#000099"/>
<dia:color val="#ffffff"/>
</dia:attribute>
<dia:attribute name="paper">
<dia:composite type="paper">
@@ -53,7 +53,7 @@
</dia:composite>
</dia:attribute>
<dia:attribute name="color">
<dia:color val="#d8e5e5"/>
<dia:color val="#ffffff"/>
</dia:attribute>
<dia:attribute name="guides">
<dia:composite type="guides">
@@ -3882,7 +3882,7 @@
<dia:point val="85.6632,34.6884"/>
</dia:attribute>
<dia:attribute name="obj_bb">
<dia:rectangle val="49.4158,20.5338;85.7277,34.753"/>
<dia:rectangle val="49.4158,20.5338;85.7277,34.7529"/>
</dia:attribute>
<dia:attribute name="meta">
<dia:composite type="dict"/>
@@ -3944,7 +3944,7 @@
<dia:point val="85.6632,35.4884"/>
</dia:attribute>
<dia:attribute name="obj_bb">
<dia:rectangle val="61.3631,35.4232;85.7284,45.7822"/>
<dia:rectangle val="61.3631,35.4231;85.7285,45.7822"/>
</dia:attribute>
<dia:attribute name="meta">
<dia:composite type="dict"/>