Skip to content

Commit

Permalink
feat(reporter-service): test results reporter
Browse files Browse the repository at this point in the history
CI component Reporter Service to display test results if needed by requesting clients or users. As
of now, this is not exposed via any API. However, this reporter service will be receiving test
results from test runner instances & will write them to a file or display them (feature not
available). This takes away this feature from dispatcher service, allowing more decoupling & an
easier role for dispatcher. Letting the Dispatcher Server only handling dispatching commit_ids to
test runners & reporter service to handle test results
  • Loading branch information
BrianLusina committed Jun 2, 2020
1 parent ec32ab8 commit fdac153
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 54 deletions.
47 changes: 1 addition & 46 deletions ci/dispatcher/dispatcher_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
from ci.logger import logger
from .utils import dispatch_tests

basedir = os.path.abspath(os.path.dirname(__file__))


class DispatcherHandler(socketserver.BaseRequestHandler):
"""
Expand All @@ -30,7 +28,7 @@ class DispatcherHandler(socketserver.BaseRequestHandler):
status:
:cvar command_re: Compiled Regex of the command to handle for incoming request
:cvar BUG_SIZE: buffer size
:cvar BUF_SIZE: buffer size
"""

command_re = re.compile(r"([b])'(\w+)(:.+)*'")
Expand All @@ -44,7 +42,6 @@ def handle(self):
"status": self.check_status,
"register": self.register,
"dispatch": self.dispatch,
"results": self.results,
}

if not self.command_groups:
Expand Down Expand Up @@ -98,45 +95,3 @@ def dispatch(self):
# we can dispatch tests, we have at least 1 test runner available
self.request.sendall(b"OK")
dispatch_tests(self.server, commit_id, branch)

def results(self):
"""
This command is used by the test runners to post back results to the dispatcher server
it is used in the format:
`results:<commit ID>:<length of results in bytes>:<results>`
<commit ID> is used to identify which commit ID the tests were run against
<length of results in bytes> is used to figure out how big a buffer is needed for the results data
<results> holds actual result output
"""

logger.info("Received test results from Test Runner")

results = self.command_groups.group(3)[1:]
results = results.split(":")

commit_id = results[0]
length_msg = int(results[1])

# 3 is the number of ":" in the sent command
remaining_buffer = self.BUF_SIZE - (
len("results") + len(commit_id) + len(results[1]) + 3
)

if length_msg > remaining_buffer:
self.data += self.request.recv(length_msg - remaining_buffer).strip()

del self.server.dispatched_commits[commit_id]

test_results_path = f"{basedir}/test_results"

if not os.path.exists(test_results_path):
os.makedirs(test_results_path)

with open(f"{test_results_path}/{commit_id}", "w") as f:
data = f"{self.data}".split(":")[3:]
data = "\n".join(data)
f.write(data)

self.request.sendall(b"OK")
32 changes: 32 additions & 0 deletions ci/reporter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
This is the Reporter Service.
This is responsible for Reporting test results as received from test runners
"""
import time
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread

from ci.logger import logger
from ci.utils import communicate


from .threading_tcp_server import ThreadingTCPServer
from .reporter_handler import ReporterHandler


def reporter_service(host, port):
"""
Entry point to Reporter Service
"""

server = ThreadingTCPServer((host, int(port)), ReporterHandler)

logger.info(f"Reporter Service running on address {host}:{port}")

try:
# Run forever unless stopped
server.serve_forever()
except (KeyboardInterrupt, Exception):
# in case it is stopped or encounters any error
server.dead = True
104 changes: 104 additions & 0 deletions ci/reporter/reporter_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
Dispatch Handler
This is responsible for handling all requests that come in for the dispatch server
"""

import socketserver
import re
import os

from ci.logger import logger

basedir = os.path.abspath(os.path.dirname(__file__))


class ReporterHandler(socketserver.BaseRequestHandler):
"""
Inherits from socketserver's BaseRequestHandler
This overrides the hande method to execute the various commands that come in from connections to the
dispatch server.
Compilation of the command from a Regex if first used to check if there are commands to execute
& if nothing compiles, returns a response stating an invalid command was requested
It then proceeds to handle the commands if the command is available & can be handled
4 Commands are handled
status:
:cvar command_re: Compiled Regex of the command to handle for incoming request
:cvar BUF_SIZE: buffer size
"""

command_re = re.compile(r"([b])'(\w+)(:.+)*'")
BUF_SIZE = 1024

def handle(self):
self.data = self.request.recv(self.BUF_SIZE).strip()
self.command_groups = self.command_re.match(f"{self.data}")

self.commands = {
"status": self.check_status,
"results": self.results,
}

if not self.command_groups:
self.invalid_command()
return

command = self.command_groups.group(2)

# Handle commands, if none match, handle invalid command
self.commands.get(command, self.invalid_command)()

def invalid_command(self):
self.request.sendall(b"Invalid command")

def check_status(self):
"""
Checks the status of the dispatcher server
"""
logger.info("Checking Reporter Service Status")
self.request.sendall(b"OK")

def results(self):
"""
This command is used by the test runners to post back results to the dispatcher server
it is used in the format:
`results:<commit ID>:<length of results in bytes>:<results>`
<commit ID> is used to identify which commit ID the tests were run against
<length of results in bytes> is used to figure out how big a buffer is needed for the results data
<results> holds actual result output
"""

logger.info("Received test results from Test Runner")

results = self.command_groups.group(3)[1:]
results = results.split(":")

commit_id = results[0]
length_msg = int(results[1])

# 3 is the number of ":" in the sent command
remaining_buffer = self.BUF_SIZE - (
len("results") + len(commit_id) + len(results[1]) + 3
)

if length_msg > remaining_buffer:
self.data += self.request.recv(length_msg - remaining_buffer).strip()

test_results_path = f"{basedir}/test_results"

if not os.path.exists(test_results_path):
os.makedirs(test_results_path)

with open(f"{test_results_path}/{commit_id}", "w") as f:
data = f"{self.data}".split(":")[3:]
data = "\n".join(data)
f.write(data)

self.request.sendall(b"OK")
22 changes: 22 additions & 0 deletions ci/reporter/threading_tcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Custom Socket TCP Server to handle mutliple connections. Typically TCP servers can only handle
1 connection at a time. This is not ideal in this case, because there could be an instance where
a test runner has a connection open with the dispatcher & a connection comes in from the repo_observer.
The repo observer has to wait for the initial connection to disconnect & close, before it can proceed
with its request.
In order to handle multiple requests, this adds threading ability to the SocketServer in order to service
multiple connections on different threads.
"""
import socketserver


class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""
Custom Threading TCP Server which ensures that there is continuous, ordered streams
of data between servers. UDP does not ensure this
:cvar dead: indicate to other threads that we ain't alive
"""

dead = False
9 changes: 8 additions & 1 deletion ci/test_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ def dispatcher_checker(server):
return


def test_runner_server(host, port, repo, dispatcher_host, dispatcher_port):
def test_runner_server(
host, port, repo, dispatcher_host, dispatcher_port, reporter_host, reporter_port
):
"""
This invokes the Test Runner server.
:param host: host to use
:param port: port to use
:param repo: repository to watch
:param dispatcher_host: Dispatcher host
:param dispatcher_port: Dispatcher Port
:param reporter_host: Reporter Host
:param reporter_port: Reporter Port
"""
range_start = 8900

Expand Down Expand Up @@ -95,6 +101,7 @@ def test_runner_server(host, port, repo, dispatcher_host, dispatcher_port):
server.repo_folder = repo

server.dispatcher_server = {"host": dispatcher_host, "port": dispatcher_port}
server.reporter_service = {"host": reporter_host, "port": reporter_port}
response = communicate(
server.dispatcher_server["host"],
int(server.dispatcher_server["port"]),
Expand Down
37 changes: 31 additions & 6 deletions ci/test_runner/test_runner_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import subprocess
import os
import unittest
import socket

from ci.logger import logger
from ci.utils import communicate
Expand Down Expand Up @@ -77,10 +78,34 @@ def run_tests(self, commit_id, branch, repo_folder):
result_file.close()
result_file = open("results", "r")

# send results to dispatcher
# send results to reporter service
output = result_file.read()
communicate(
self.server.dispatcher_server["host"],
int(self.server.dispatcher_server["port"]),
f"results:{commit_id}:{len(output)}:{output}",
)

# check that reporter service is alive & running before sending output
# TODO: cache/retry logic to send test results to reporter service in case it is unreachable
try:
response = communicate(
self.server.reporter_service["host"],
int(self.server.reporter_service["port"]),
"status",
)

if response != b"OK":
logger.warning(
"Reporter Service does not seem ok, Can't send test results..."
)
return
elif response == b"OK":
logger.info(
"Sending test results to Reporter Service..."
)
communicate(
self.server.reporter_service["host"],
int(self.server.reporter_service["port"]),
f"results:{commit_id}:{len(output)}:{output}",
)
except socket.error as e:
logger.error(
"Cannot communicate with Reporter Service..."
)
return
2 changes: 2 additions & 0 deletions ci/test_runner/threading_tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ class ThreadingTCPServer(ThreadingMixIn, TCPServer):
Custom TCP Server which allows spawning of threads to allow requests to be handled on different threads
:cvar dispatcher_server: Holds the dispatcher server host/port information
:cvar reporter_service: Holds the dispatcher server host/port information
:cvar last_communication: Keeps track of last communication from dispatcher
:cvar busy: Status flag indicating whether the test runner server is currently busy
:cvar dead: Status flag indicating whether this test runner server is dead/unresponsive
"""

dispatcher_server = None
reporter_service = None
last_communication = None
busy = False
dead = False
29 changes: 29 additions & 0 deletions run_reporter_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import argparse
from ci.reporter import reporter_service


def run_reporter():
"""
Entry point to reporter service
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--host",
help="Reporter's host, defaults to localhost",
default="localhost",
action="store",
)
parser.add_argument(
"--port",
help="Reporter's port, defaults to 8555",
default=8555,
action="store",
)

args = parser.parse_args()

reporter_service(args.host, int(args.port))


if __name__ == "__main__":
run_reporter()
15 changes: 14 additions & 1 deletion run_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ def run_test_runner():
default="localhost:8000",
action="store",
)
parser.add_argument(
"--reporter-service",
help="reporter host:port, by default it uses localhost:8555",
default="localhost:8555",
action="store",
)
parser.add_argument(
"--repo",
metavar="REPO",
Expand All @@ -39,9 +45,16 @@ def run_test_runner():
runner_port = args.port
repository = args.repo
dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
reporter_host, reporter_port = args.reporter_service.split(":")

test_runner_server(
runner_host, runner_port, repository, dispatcher_host, dispatcher_port
runner_host,
runner_port,
repository,
dispatcher_host,
dispatcher_port,
reporter_host,
reporter_port,
)


Expand Down

0 comments on commit fdac153

Please sign in to comment.