diff --git a/ci/dispatcher/dispatcher_handler.py b/ci/dispatcher/dispatcher_handler.py index 2d316f7..a17b0ec 100644 --- a/ci/dispatcher/dispatcher_handler.py +++ b/ci/dispatcher/dispatcher_handler.py @@ -11,8 +11,6 @@ from ci.logger import logger from .utils import dispatch_tests -basedir = os.path.abspath(os.path.dirname(__file__)) - class DispatcherHandler(socketserver.BaseRequestHandler): """ @@ -30,7 +28,7 @@ class DispatcherHandler(socketserver.BaseRequestHandler): status: :cvar command_re: Compiled Regex of the command to handle for incoming request - :cvar BUG_SIZE: buffer size + :cvar BUF_SIZE: buffer size """ command_re = re.compile(r"([b])'(\w+)(:.+)*'") @@ -44,7 +42,6 @@ def handle(self): "status": self.check_status, "register": self.register, "dispatch": self.dispatch, - "results": self.results, } if not self.command_groups: @@ -98,45 +95,3 @@ def dispatch(self): # we can dispatch tests, we have at least 1 test runner available self.request.sendall(b"OK") dispatch_tests(self.server, commit_id, branch) - - def results(self): - """ - This command is used by the test runners to post back results to the dispatcher server - it is used in the format: - - `results:::` - - is used to identify which commit ID the tests were run against - is used to figure out how big a buffer is needed for the results data - holds actual result output - """ - - logger.info("Received test results from Test Runner") - - results = self.command_groups.group(3)[1:] - results = results.split(":") - - commit_id = results[0] - length_msg = int(results[1]) - - # 3 is the number of ":" in the sent command - remaining_buffer = self.BUF_SIZE - ( - len("results") + len(commit_id) + len(results[1]) + 3 - ) - - if length_msg > remaining_buffer: - self.data += self.request.recv(length_msg - remaining_buffer).strip() - - del self.server.dispatched_commits[commit_id] - - test_results_path = f"{basedir}/test_results" - - if not os.path.exists(test_results_path): - os.makedirs(test_results_path) - - with open(f"{test_results_path}/{commit_id}", "w") as f: - data = f"{self.data}".split(":")[3:] - data = "\n".join(data) - f.write(data) - - self.request.sendall(b"OK") diff --git a/ci/reporter/__init__.py b/ci/reporter/__init__.py new file mode 100644 index 0000000..e250ef4 --- /dev/null +++ b/ci/reporter/__init__.py @@ -0,0 +1,32 @@ +""" +This is the Reporter Service. + +This is responsible for Reporting test results as received from test runners +""" +import time +from socket import socket, AF_INET, SOCK_STREAM +from threading import Thread + +from ci.logger import logger +from ci.utils import communicate + + +from .threading_tcp_server import ThreadingTCPServer +from .reporter_handler import ReporterHandler + + +def reporter_service(host, port): + """ + Entry point to Reporter Service + """ + + server = ThreadingTCPServer((host, int(port)), ReporterHandler) + + logger.info(f"Reporter Service running on address {host}:{port}") + + try: + # Run forever unless stopped + server.serve_forever() + except (KeyboardInterrupt, Exception): + # in case it is stopped or encounters any error + server.dead = True diff --git a/ci/reporter/reporter_handler.py b/ci/reporter/reporter_handler.py new file mode 100644 index 0000000..da31cae --- /dev/null +++ b/ci/reporter/reporter_handler.py @@ -0,0 +1,104 @@ +""" +Dispatch Handler + +This is responsible for handling all requests that come in for the dispatch server +""" + +import socketserver +import re +import os + +from ci.logger import logger + +basedir = os.path.abspath(os.path.dirname(__file__)) + + +class ReporterHandler(socketserver.BaseRequestHandler): + """ + Inherits from socketserver's BaseRequestHandler + + This overrides the hande method to execute the various commands that come in from connections to the + dispatch server. + + Compilation of the command from a Regex if first used to check if there are commands to execute + & if nothing compiles, returns a response stating an invalid command was requested + + It then proceeds to handle the commands if the command is available & can be handled + + 4 Commands are handled + + status: + :cvar command_re: Compiled Regex of the command to handle for incoming request + :cvar BUF_SIZE: buffer size + """ + + command_re = re.compile(r"([b])'(\w+)(:.+)*'") + BUF_SIZE = 1024 + + def handle(self): + self.data = self.request.recv(self.BUF_SIZE).strip() + self.command_groups = self.command_re.match(f"{self.data}") + + self.commands = { + "status": self.check_status, + "results": self.results, + } + + if not self.command_groups: + self.invalid_command() + return + + command = self.command_groups.group(2) + + # Handle commands, if none match, handle invalid command + self.commands.get(command, self.invalid_command)() + + def invalid_command(self): + self.request.sendall(b"Invalid command") + + def check_status(self): + """ + Checks the status of the dispatcher server + """ + logger.info("Checking Reporter Service Status") + self.request.sendall(b"OK") + + def results(self): + """ + This command is used by the test runners to post back results to the dispatcher server + it is used in the format: + + `results:::` + + is used to identify which commit ID the tests were run against + is used to figure out how big a buffer is needed for the results data + holds actual result output + """ + + logger.info("Received test results from Test Runner") + + results = self.command_groups.group(3)[1:] + results = results.split(":") + + commit_id = results[0] + length_msg = int(results[1]) + + # 3 is the number of ":" in the sent command + remaining_buffer = self.BUF_SIZE - ( + len("results") + len(commit_id) + len(results[1]) + 3 + ) + + if length_msg > remaining_buffer: + self.data += self.request.recv(length_msg - remaining_buffer).strip() + + test_results_path = f"{basedir}/test_results" + + if not os.path.exists(test_results_path): + os.makedirs(test_results_path) + + with open(f"{test_results_path}/{commit_id}", "w") as f: + data = f"{self.data}".split(":")[3:] + data = "\n".join(data) + f.write(data) + + self.request.sendall(b"OK") diff --git a/ci/reporter/threading_tcp_server.py b/ci/reporter/threading_tcp_server.py new file mode 100644 index 0000000..4f33544 --- /dev/null +++ b/ci/reporter/threading_tcp_server.py @@ -0,0 +1,22 @@ +""" +Custom Socket TCP Server to handle mutliple connections. Typically TCP servers can only handle +1 connection at a time. This is not ideal in this case, because there could be an instance where +a test runner has a connection open with the dispatcher & a connection comes in from the repo_observer. +The repo observer has to wait for the initial connection to disconnect & close, before it can proceed +with its request. + +In order to handle multiple requests, this adds threading ability to the SocketServer in order to service +multiple connections on different threads. +""" +import socketserver + + +class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + """ + Custom Threading TCP Server which ensures that there is continuous, ordered streams + of data between servers. UDP does not ensure this + + :cvar dead: indicate to other threads that we ain't alive + """ + + dead = False diff --git a/ci/test_runner/__init__.py b/ci/test_runner/__init__.py index 90d22c6..6f18cb2 100644 --- a/ci/test_runner/__init__.py +++ b/ci/test_runner/__init__.py @@ -47,13 +47,19 @@ def dispatcher_checker(server): return -def test_runner_server(host, port, repo, dispatcher_host, dispatcher_port): +def test_runner_server( + host, port, repo, dispatcher_host, dispatcher_port, reporter_host, reporter_port +): """ This invokes the Test Runner server. :param host: host to use :param port: port to use :param repo: repository to watch + :param dispatcher_host: Dispatcher host + :param dispatcher_port: Dispatcher Port + :param reporter_host: Reporter Host + :param reporter_port: Reporter Port """ range_start = 8900 @@ -95,6 +101,7 @@ def test_runner_server(host, port, repo, dispatcher_host, dispatcher_port): server.repo_folder = repo server.dispatcher_server = {"host": dispatcher_host, "port": dispatcher_port} + server.reporter_service = {"host": reporter_host, "port": reporter_port} response = communicate( server.dispatcher_server["host"], int(server.dispatcher_server["port"]), diff --git a/ci/test_runner/test_runner_handler.py b/ci/test_runner/test_runner_handler.py index 88c1e72..30b7df5 100644 --- a/ci/test_runner/test_runner_handler.py +++ b/ci/test_runner/test_runner_handler.py @@ -4,6 +4,7 @@ import subprocess import os import unittest +import socket from ci.logger import logger from ci.utils import communicate @@ -77,10 +78,34 @@ def run_tests(self, commit_id, branch, repo_folder): result_file.close() result_file = open("results", "r") - # send results to dispatcher + # send results to reporter service output = result_file.read() - communicate( - self.server.dispatcher_server["host"], - int(self.server.dispatcher_server["port"]), - f"results:{commit_id}:{len(output)}:{output}", - ) + + # check that reporter service is alive & running before sending output + # TODO: cache/retry logic to send test results to reporter service in case it is unreachable + try: + response = communicate( + self.server.reporter_service["host"], + int(self.server.reporter_service["port"]), + "status", + ) + + if response != b"OK": + logger.warning( + "Reporter Service does not seem ok, Can't send test results..." + ) + return + elif response == b"OK": + logger.info( + "Sending test results to Reporter Service..." + ) + communicate( + self.server.reporter_service["host"], + int(self.server.reporter_service["port"]), + f"results:{commit_id}:{len(output)}:{output}", + ) + except socket.error as e: + logger.error( + "Cannot communicate with Reporter Service..." + ) + return diff --git a/ci/test_runner/threading_tcp_server.py b/ci/test_runner/threading_tcp_server.py index cd7096b..95f4f60 100644 --- a/ci/test_runner/threading_tcp_server.py +++ b/ci/test_runner/threading_tcp_server.py @@ -6,12 +6,14 @@ class ThreadingTCPServer(ThreadingMixIn, TCPServer): Custom TCP Server which allows spawning of threads to allow requests to be handled on different threads :cvar dispatcher_server: Holds the dispatcher server host/port information + :cvar reporter_service: Holds the dispatcher server host/port information :cvar last_communication: Keeps track of last communication from dispatcher :cvar busy: Status flag indicating whether the test runner server is currently busy :cvar dead: Status flag indicating whether this test runner server is dead/unresponsive """ dispatcher_server = None + reporter_service = None last_communication = None busy = False dead = False diff --git a/run_reporter_service.py b/run_reporter_service.py new file mode 100644 index 0000000..78ca8ac --- /dev/null +++ b/run_reporter_service.py @@ -0,0 +1,29 @@ +import argparse +from ci.reporter import reporter_service + + +def run_reporter(): + """ + Entry point to reporter service + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + help="Reporter's host, defaults to localhost", + default="localhost", + action="store", + ) + parser.add_argument( + "--port", + help="Reporter's port, defaults to 8555", + default=8555, + action="store", + ) + + args = parser.parse_args() + + reporter_service(args.host, int(args.port)) + + +if __name__ == "__main__": + run_reporter() diff --git a/run_test_runner.py b/run_test_runner.py index 793ba99..4273321 100644 --- a/run_test_runner.py +++ b/run_test_runner.py @@ -26,6 +26,12 @@ def run_test_runner(): default="localhost:8000", action="store", ) + parser.add_argument( + "--reporter-service", + help="reporter host:port, by default it uses localhost:8555", + default="localhost:8555", + action="store", + ) parser.add_argument( "--repo", metavar="REPO", @@ -39,9 +45,16 @@ def run_test_runner(): runner_port = args.port repository = args.repo dispatcher_host, dispatcher_port = args.dispatcher_server.split(":") + reporter_host, reporter_port = args.reporter_service.split(":") test_runner_server( - runner_host, runner_port, repository, dispatcher_host, dispatcher_port + runner_host, + runner_port, + repository, + dispatcher_host, + dispatcher_port, + reporter_host, + reporter_port, )