Skip to content

Commit

Permalink
feat(run scripts): create run scripts
Browse files Browse the repository at this point in the history
Create run scripts for dispatcher server & repo observer
  • Loading branch information
BrianLusina committed Jun 1, 2020
1 parent 1604b62 commit d249fea
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 65 deletions.
37 changes: 9 additions & 28 deletions ci/dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread

from utils.logger import logger
from utils import communicate
from ..utils import dispatch_tests
from ci.logger import logger
from ci.utils import communicate
from .utils import dispatch_tests

from .threading_tcp_server import ThreadingTCPServer
from .dispatcher_handler import DispatcherHandler
Expand All @@ -35,7 +35,7 @@ def manage_commit_lists(runner):
for commit, assigned_runner in server.dispathed_commits.items():
if assigned_runner == runner:
del server.dispatched_commits[commit]
serve.pending_commits.append(commit)
server.pending_commits.append(commit)
break
server.runners.remove(runner)

Expand Down Expand Up @@ -67,32 +67,17 @@ def redistribute(server):
time.sleep(5)


def serve():
def dispatcher(host, port):
"""
Entry point to dispatch server
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--host",
help="Dispatchers host, defaults to localhost",
default="localhost",
action="store",
)
parser.add_argument(
"--port",
help="Dispatchers port, defaults to 8000",
default=8000,
action="store",
)

args = parser.parse_args()
server = ThreadingTCPServer((host, int(port)), DispatcherHandler)

server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
logger.info(f"Dispatcher Server running on address {host}:{port}")

logger.info(f"Dispatcher Server running on address {args.host}:{args.port}")

runner_heartbeat = Thread(target=runner_checker, args=(server))
redistributor = Thread(target=redistribute, args=(server))
runner_heartbeat = Thread(target=runner_checker, args=(server,))
redistributor = Thread(target=redistribute, args=(server,))

try:
runner_heartbeat.start()
Expand All @@ -105,7 +90,3 @@ def serve():
server.dead = True
runner_heartbeat.join()
redistributor.join()


if __name__ == "__main__":
serve()
2 changes: 1 addition & 1 deletion ci/dispatcher/dispatcher_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
import os

from logger import logger
from ci.logger import logger
from .utils import dispatch_tests


Expand Down
4 changes: 2 additions & 2 deletions ci/dispatcher/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"""
import time

from logger import logger
from utils import communicate
from ci.logger import logger
from ci.utils import communicate


def dispatch_tests(server, commit_id):
Expand Down
42 changes: 8 additions & 34 deletions ci/repo_observer/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
typical repo observer will receive notifications from the Repository, but some VCS systems do not have built in
notification systems.
"""
import argparse
import os
import re
import socket
Expand All @@ -18,47 +17,26 @@
import time

from .exceptions import RepoObserverError
from ..utils import communicate
from ci.utils import communicate
from ci.logger import logger


def poll():
def observer(dispatcher_host, dispatcher_port, repo, poll, branch):
"""
Repo Observer poll function that communicates with the dispatcher server to send tests that are found
Repo Observer that communicates with the dispatcher server to send tests that are found
on the repository on new changes commited to the repo. This will watch the repo every 5 seconds for any
new commit that is made & make a dispatch to the dispatch server to initiate running of new tests.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--dispatcher-server",
help="dispatcher host:port , by default it uses localhost:8000",
default="localhost:8000",
action="store",
)
parser.add_argument(
"--repo", metavar="REPO", type=str, help="path to the repository to observe"
)
parser.add_argument(
"--poll", help="how long to keep polling repository", default=5, type=int
)
parser.add_argument(
"--branch", help="which branch to run tests against", default="master", type=str
)

args = parser.parse_args()
dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

while True:
try:
# call the bash script that will update the repo and check
# for changes. If there's a change, it will drop a .commit_id file
# with the latest commit in the current working directory
logger.info(f"cloning repo {args.repo}")
subprocess.check_output(
["./scripts/update_repo.sh", args.repo, args.branch]
)
logger.info(f"cloning repo {repo}")
subprocess.check_output(["./scripts/update_repo.sh", repo, branch])
except subprocess.CalledProcessError as e:
logger.error(f"Failed to update & check repo {args.repo}, err: {e.output}")
logger.error(f"Failed to update & check repo {repo}, err: {e.output}")
raise RepoObserverError(
f"Could not update & check repository. Err: {e.output}"
)
Expand Down Expand Up @@ -91,13 +69,9 @@ def poll():
f"Failed to dispatch test to dispatcher. Is Dispatcher OK? err: {response}"
)
raise RepoObserverError(f"Could not dispatch test: {response}")
logger.info(f"Dispatched tests for {args.repo}!")
logger.info(f"Dispatched tests for {repo}!")
else:
# Dispatcher has an issue
raise RepoObserverError(f"Could not dispatch test {response}")

time.sleep(args.poll)


if __name__ == "__main__":
poll()
time.sleep(poll)
29 changes: 29 additions & 0 deletions run_dispatcher_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import argparse
from ci.dispatcher.dispatcher import dispatcher


def run_dispatcher():
"""
Entry point to dispatch server
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--host",
help="Dispatchers host, defaults to localhost",
default="localhost",
action="store",
)
parser.add_argument(
"--port",
help="Dispatchers port, defaults to 8000",
default=8000,
action="store",
)

args = parser.parse_args()

dispatcher(args.host, int(args.port))


if __name__ == "__main__":
run_dispatcher()
34 changes: 34 additions & 0 deletions run_repo_observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import argparse

from ci.repo_observer.observer import observer


def run_observer():
"""
Run Observer
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--dispatcher-server",
help="dispatcher host:port , by default it uses localhost:8000",
default="localhost:8000",
action="store",
)
parser.add_argument(
"--repo", metavar="REPO", type=str, help="path to the repository to observe"
)
parser.add_argument(
"--poll", help="how long to keep polling repository", default=5, type=int
)
parser.add_argument(
"--branch", help="which branch to run tests against", default="master", type=str
)

args = parser.parse_args()
dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

observer(dispatcher_host, dispatcher_port, args.repo, args.poll, args.branch)


if __name__ == "__main__":
run_observer()

0 comments on commit d249fea

Please sign in to comment.