diff --git a/ci/dispatcher/dispatcher.py b/ci/dispatcher/dispatcher.py index 45e2f99..8e7660c 100644 --- a/ci/dispatcher/dispatcher.py +++ b/ci/dispatcher/dispatcher.py @@ -14,9 +14,9 @@ from socket import socket, AF_INET, SOCK_STREAM from threading import Thread -from utils.logger import logger -from utils import communicate -from ..utils import dispatch_tests +from ci.logger import logger +from ci.utils import communicate +from .utils import dispatch_tests from .threading_tcp_server import ThreadingTCPServer from .dispatcher_handler import DispatcherHandler @@ -35,7 +35,7 @@ def manage_commit_lists(runner): for commit, assigned_runner in server.dispathed_commits.items(): if assigned_runner == runner: del server.dispatched_commits[commit] - serve.pending_commits.append(commit) + server.pending_commits.append(commit) break server.runners.remove(runner) @@ -67,32 +67,17 @@ def redistribute(server): time.sleep(5) -def serve(): +def dispatcher(host, port): """ Entry point to dispatch server """ - parser = argparse.ArgumentParser() - parser.add_argument( - "--host", - help="Dispatchers host, defaults to localhost", - default="localhost", - action="store", - ) - parser.add_argument( - "--port", - help="Dispatchers port, defaults to 8000", - default=8000, - action="store", - ) - args = parser.parse_args() + server = ThreadingTCPServer((host, int(port)), DispatcherHandler) - server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler) + logger.info(f"Dispatcher Server running on address {host}:{port}") - logger.info(f"Dispatcher Server running on address {args.host}:{args.port}") - - runner_heartbeat = Thread(target=runner_checker, args=(server)) - redistributor = Thread(target=redistribute, args=(server)) + runner_heartbeat = Thread(target=runner_checker, args=(server,)) + redistributor = Thread(target=redistribute, args=(server,)) try: runner_heartbeat.start() @@ -105,7 +90,3 @@ def serve(): server.dead = True runner_heartbeat.join() redistributor.join() - - -if __name__ == "__main__": - serve() diff --git a/ci/dispatcher/dispatcher_handler.py b/ci/dispatcher/dispatcher_handler.py index fe3bd14..9ecdf35 100644 --- a/ci/dispatcher/dispatcher_handler.py +++ b/ci/dispatcher/dispatcher_handler.py @@ -8,7 +8,7 @@ import re import os -from logger import logger +from ci.logger import logger from .utils import dispatch_tests diff --git a/ci/dispatcher/utils.py b/ci/dispatcher/utils.py index cc9718c..9debcbe 100644 --- a/ci/dispatcher/utils.py +++ b/ci/dispatcher/utils.py @@ -5,8 +5,8 @@ """ import time -from logger import logger -from utils import communicate +from ci.logger import logger +from ci.utils import communicate def dispatch_tests(server, commit_id): diff --git a/ci/repo_observer/observer.py b/ci/repo_observer/observer.py index 93116fc..51a797c 100644 --- a/ci/repo_observer/observer.py +++ b/ci/repo_observer/observer.py @@ -8,7 +8,6 @@ typical repo observer will receive notifications from the Repository, but some VCS systems do not have built in notification systems. """ -import argparse import os import re import socket @@ -18,47 +17,26 @@ import time from .exceptions import RepoObserverError -from ..utils import communicate +from ci.utils import communicate from ci.logger import logger -def poll(): +def observer(dispatcher_host, dispatcher_port, repo, poll, branch): """ - Repo Observer poll function that communicates with the dispatcher server to send tests that are found + Repo Observer that communicates with the dispatcher server to send tests that are found on the repository on new changes commited to the repo. This will watch the repo every 5 seconds for any new commit that is made & make a dispatch to the dispatch server to initiate running of new tests. """ - parser = argparse.ArgumentParser() - parser.add_argument( - "--dispatcher-server", - help="dispatcher host:port , by default it uses localhost:8000", - default="localhost:8000", - action="store", - ) - parser.add_argument( - "--repo", metavar="REPO", type=str, help="path to the repository to observe" - ) - parser.add_argument( - "--poll", help="how long to keep polling repository", default=5, type=int - ) - parser.add_argument( - "--branch", help="which branch to run tests against", default="master", type=str - ) - - args = parser.parse_args() - dispatcher_host, dispatcher_port = args.dispatcher_server.split(":") while True: try: # call the bash script that will update the repo and check # for changes. If there's a change, it will drop a .commit_id file # with the latest commit in the current working directory - logger.info(f"cloning repo {args.repo}") - subprocess.check_output( - ["./scripts/update_repo.sh", args.repo, args.branch] - ) + logger.info(f"cloning repo {repo}") + subprocess.check_output(["./scripts/update_repo.sh", repo, branch]) except subprocess.CalledProcessError as e: - logger.error(f"Failed to update & check repo {args.repo}, err: {e.output}") + logger.error(f"Failed to update & check repo {repo}, err: {e.output}") raise RepoObserverError( f"Could not update & check repository. Err: {e.output}" ) @@ -91,13 +69,9 @@ def poll(): f"Failed to dispatch test to dispatcher. Is Dispatcher OK? err: {response}" ) raise RepoObserverError(f"Could not dispatch test: {response}") - logger.info(f"Dispatched tests for {args.repo}!") + logger.info(f"Dispatched tests for {repo}!") else: # Dispatcher has an issue raise RepoObserverError(f"Could not dispatch test {response}") - time.sleep(args.poll) - - -if __name__ == "__main__": - poll() + time.sleep(poll) diff --git a/run_dispatcher_server.py b/run_dispatcher_server.py new file mode 100644 index 0000000..973acc5 --- /dev/null +++ b/run_dispatcher_server.py @@ -0,0 +1,29 @@ +import argparse +from ci.dispatcher.dispatcher import dispatcher + + +def run_dispatcher(): + """ + Entry point to dispatch server + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + help="Dispatchers host, defaults to localhost", + default="localhost", + action="store", + ) + parser.add_argument( + "--port", + help="Dispatchers port, defaults to 8000", + default=8000, + action="store", + ) + + args = parser.parse_args() + + dispatcher(args.host, int(args.port)) + + +if __name__ == "__main__": + run_dispatcher() diff --git a/run_repo_observer.py b/run_repo_observer.py new file mode 100644 index 0000000..44a5a9d --- /dev/null +++ b/run_repo_observer.py @@ -0,0 +1,34 @@ +import argparse + +from ci.repo_observer.observer import observer + + +def run_observer(): + """ + Run Observer + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--dispatcher-server", + help="dispatcher host:port , by default it uses localhost:8000", + default="localhost:8000", + action="store", + ) + parser.add_argument( + "--repo", metavar="REPO", type=str, help="path to the repository to observe" + ) + parser.add_argument( + "--poll", help="how long to keep polling repository", default=5, type=int + ) + parser.add_argument( + "--branch", help="which branch to run tests against", default="master", type=str + ) + + args = parser.parse_args() + dispatcher_host, dispatcher_port = args.dispatcher_server.split(":") + + observer(dispatcher_host, dispatcher_port, args.repo, args.poll, args.branch) + + +if __name__ == "__main__": + run_observer()