Commit 325df17

Looks like I have a tool to run conformance tests

michael-kotliar committed Jul 6, 2020
1 parent e7f1e24 commit 325df17
Showing 12 changed files with 476 additions and 205 deletions.
4 changes: 2 additions & 2 deletions cwl_airflow/components/api/backend.py
@@ -22,7 +22,7 @@
from airflow.api.common.experimental import trigger_dag

from cwl_airflow.utilities.helpers import get_version, get_dir
from cwl_airflow.utilities.airflow import conf_get, DAG_TEMPLATE
from cwl_airflow.utilities.cwl import conf_get, DAG_TEMPLATE


class CWLApiBackend():
@@ -209,7 +209,7 @@ def wes_collect_attachments(self, run_id):
tempdir = tempfile.mkdtemp(
dir=get_dir(
path.abspath(
conf_get("cwl", "tmp_folder", os.path.join(AIRFLOW_HOME, "cwl_temp_folder"))
conf_get("cwl", "tmp_folder", os.path.join(AIRFLOW_HOME, "cwl_tmp_folder"))
)
),
prefix="run_id_"+run_id+"_"
183 changes: 151 additions & 32 deletions cwl_airflow/components/test/conformance.py
@@ -1,41 +1,141 @@
import os
import logging
import sys
import uuid

from queue import Queue
import json
import queue
import requests
import itertools
import threading
import socketserver

from time import sleep
from shutil import rmtree
from collections import OrderedDict

from airflow.settings import DAGS_FOLDER
from cwltest.utils import compare, CompareFail
from http.server import SimpleHTTPRequestHandler

from cwl_airflow.utilities.helpers import (
load_yaml,
get_dir,
get_absolute_path,
get_rootname
)
from cwl_airflow.utilities.airflow import DAG_TEMPLATE
from cwl_airflow.utilities.cwl import (
load_job,
DAG_TEMPLATE
)


def get_listener_thread(
results_queue,
port,
daemon
):
httpd = socketserver.TCPServer(("", port), CustomHandler)
httpd.results_queue = results_queue # to have access to results_queue from CustomHandler through self.server.results_queue
return threading.Thread(
target=httpd.serve_forever,
daemon=daemon
)


def get_spinner_thread():
return threading.Thread(target=spin, daemon=True)


def get_checker_thread(
suite_data,
results_queue,
daemon
):
return threading.Thread(target=check_result,
daemon=daemon,
kwargs={
"suite_data": suite_data,
"results_queue": results_queue
})


class CustomHandler(SimpleHTTPRequestHandler):

def do_POST(self):
self.send_response(200)
self.end_headers()
if "status" in self.path:
return None
payload = json.loads(
self.rfile.read(
int(self.headers["Content-Length"])
).decode("UTF-8")
)["payload"]
if payload.get("results", None) or payload.get("state", None) == "failed":
self.server.results_queue.put({
"run_id": payload["run_id"],
"dag_id": payload["dag_id"],
"results": payload.get("results", None)
})
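
For reference, a status update POST that this handler would accept might look roughly like the sketch below; the field names come from the parsing code above, while the port, URL path and concrete values are assumptions made for illustration.

import requests

# Hypothetical status update as CustomHandler parses it: a JSON body whose
# top-level "payload" carries run_id, dag_id and either "results" or a failed
# "state". The port (args.port) and the path are made up for this example.
status_update = {
    "payload": {
        "run_id": "8c1f3a2e-9f4d-4f7a-b2e1-3d5c6a7b8c9d",
        "dag_id": "bwa-index",
        "state": "success",
        "results": {"index_folder": {"class": "Directory", "location": "/tmp/outputs/index"}}
    }
}
requests.post("http://127.0.0.1:3070/results", json=status_update)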


def spin():
spinner = itertools.cycle(['-', '/', '|', '\\'])
while True:
sys.stdout.write(next(spinner))
sleep(0.1)
sys.stdout.flush()
sys.stdout.write('\b')


def check_result(suite_data, results_queue):
processed = 0
while processed < len(suite_data):
try:
item = results_queue.get()
except queue.Empty:
continue
processed = processed + 1
run_id = item["run_id"]
try:
compare(suite_data[run_id]["output"], item["results"])
except CompareFail as ex:
suite_data[run_id]["error"] = str(ex)
finally:
rmtree(suite_data[run_id]["job"]["outputs_folder"])


def load_test_suite(args):
"""
Loads tests from the provided --suite file.
Selects tests based on the indices from --range.
Updates all tool and job locations to be absolute.
Updates tool locations to be absolute, loads
jobs and updates all input file locations to
be absolute too. Adds "outputs_folder" to the job.
Adds run_id's as keys for easy access and proper
test identification when test is run.
test identification when receiving results.
"""

suite_data = load_yaml(args.suite)
suite_dir = os.path.dirname(args.suite)
suite_data_filtered = OrderedDict() # use OrderedDict just to keep it similar to suite_data
suite_data_filtered = OrderedDict() # use OrderedDict just to keep it similar to suite_data
for i in args.range:
test_data = suite_data[i]
run_id = str(uuid.uuid4())
tool_location = get_absolute_path(test_data["tool"], suite_dir)
job_data = load_job(
tool_location,
get_absolute_path(test_data["job"], suite_dir)
)
job_data["outputs_folder"] = get_dir(os.path.join(args.tmp, run_id))

test_data.update({
"job": get_absolute_path(test_data["job"], suite_dir),
"tool": get_absolute_path(test_data["tool"], suite_dir),
"job": job_data, # already parsed, includes "outputs_folder"
"tool": tool_location,
"dag_id": get_rootname(test_data["tool"])
})
run_id = str(uuid.uuid4())
suite_data_filtered[run_id] = test_data

suite_data_filtered[run_id] = test_data # use "run_id" as a key for fast access when checking results
return suite_data_filtered
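
A rough sketch of a single entry of the returned dictionary after this processing; the keys mirror the code above ("tool", "job", "dag_id", plus the suite's original "output" that check_result compares against), while the paths and values are invented for illustration.

# Hypothetical entry of suite_data_filtered, keyed by the generated run_id
suite_data_filtered = {
    "8c1f3a2e-9f4d-4f7a-b2e1-3d5c6a7b8c9d": {
        "tool": "/home/user/tests/workflows/bwa-index.cwl",   # absolute tool location
        "job": {                                              # parsed job, no longer a file path
            "fasta_file": {"class": "File", "location": "/home/user/tests/data/chr22.fa"},
            "outputs_folder": "/tmp/conformance/8c1f3a2e-9f4d-4f7a-b2e1-3d5c6a7b8c9d"
        },
        "dag_id": "bwa-index",                                # rootname of the tool file
        "output": {"index_folder": {"class": "Directory"}}    # expected outputs from the suite file
    }
}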


@@ -48,50 +148,69 @@ def create_dags(suite_data, dags_folder=None):
TODO: maybe I will need to copy packed cwl file into the dags_folder too.
"""

if dags_folder is None:
dags_folder = get_dir(DAGS_FOLDER)
else:
dags_folder = get_dir(dags_folder)
dags_folder = get_dir(DAGS_FOLDER if dags_folder is None else dags_folder)

for test_data in suite_data.values():
tool_location = test_data["tool"]
dag_id = get_rootname(tool_location)
dag_location = os.path.join(
dags_folder,
dag_id + ".py"
test_data["dag_id"] + ".py"
)
if not os.path.isfile(dag_location):
with open(dag_location, 'w') as output_stream:
with open(dag_location, "w") as output_stream:
output_stream.write(
DAG_TEMPLATE.format(
tool_location,
dag_id
test_data["tool"],
test_data["dag_id"]
)
)
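
DAG_TEMPLATE itself is defined in cwl_airflow.utilities.cwl and is not shown in this diff; judging from the format() call above it is probably close to the sketch below, with the packed workflow location substituted first and the DAG id second (the template text and the CWLDAG constructor arguments are assumptions).

# Assumed shape of DAG_TEMPLATE; the real template may differ
DAG_TEMPLATE = """#!/usr/bin/env python3
from cwl_airflow.extensions.cwldag import CWLDAG
dag = CWLDAG(workflow="{0}", dag_id="{1}")
"""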


def trigger_dags(suite_data, args):
for run_id, test_data in suite_data.items():
r = requests.post(
url=args.endpoint,
params={
"run_id": run_id,
"dag_id": test_data["dag_id"],
"conf": json.dumps(
{
"job": test_data["job"]
}
)
}
)


def run_test_conformance(args):
"""
Runs conformance tests
"""

# Load test suite data, setup a queue for tests
# Load test suite data, setup a queue to keep results
suite_data = load_test_suite(args)
suite_queue = Queue(maxsize=len(suite_data))
results_queue = queue.Queue(maxsize=len(suite_data))

# Create new dags
create_dags(suite_data)

# Start status update listener
listener = get_listener_thread(queue=queue, port=args.port, daemon=True)
create_dags(suite_data) # TODO: should call API instead of manually creating new DAGs

# Start thread to listen for status updates
listener = get_listener_thread(
results_queue=results_queue,
port=args.port,
daemon=True
)
listener.start()

# Start checker thread
checker = get_checker_thread(data=data_dict, daemon=False)
# Start checker thread to evaluate received results
checker = get_checker_thread(
suite_data=suite_data,
results_queue=results_queue,
daemon=False
)
checker.start()

# Trigger all dags
trigger_dags(data_dict, args)
trigger_dags(suite_data, args)

# Display spinner if --spin
if args.spin:
@@ -101,5 +220,5 @@ def run_test_conformance(args):
# Wait until all triggered dags return results
checker.join()

if any(item.get("error", None) for item in data_dict.values()):
if any(item.get("error", None) for item in suite_data.values()):
sys.exit(1)
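
Putting the pieces together, a hypothetical direct call to run_test_conformance could look like the sketch below; the attribute names match what the functions above read from args (suite, range, tmp, port, endpoint, spin), but the command line parser that builds this namespace is not part of this commit and all values are made up.

from argparse import Namespace

# Hypothetical arguments; the real CLI wiring and defaults may differ
args = Namespace(
    suite="./conformance/test_suite.yaml",  # YAML file with tool/job/output entries
    range=[0, 1, 2],                        # indices of the tests to run
    tmp="/tmp/cwl_conformance",             # base folder for per-run outputs_folder
    port=3070,                              # port for the status update listener
    endpoint="http://127.0.0.1:8080/api/experimental/dag_runs",  # DAG trigger endpoint
    spin=True                               # show a console spinner while waiting
)
run_test_conformance(args)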
88 changes: 15 additions & 73 deletions cwl_airflow/extensions/cwldag.py
@@ -5,11 +5,12 @@
from cwltool.argparser import get_default_args
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.configuration import AIRFLOW_HOME

from cwl_airflow.utilities.helpers import get_dir
from cwl_airflow.utilities.airflow import conf_get
from cwl_airflow.utilities.cwl import fast_cwl_load, get_items
from cwl_airflow.utilities.cwl import (
fast_cwl_load,
get_items,
get_default_cwl_args
)
from cwl_airflow.extensions.operators.cwlstepoperator import CWLStepOperator
from cwl_airflow.extensions.operators.cwljobdispatcher import CWLJobDispatcher
from cwl_airflow.extensions.operators.cwljobgatherer import CWLJobGatherer
@@ -60,82 +61,23 @@ def __setup_params(self, kwargs, workflow):
to be queued longer than half a year :)
"""

# default args provided by user.
# Use deepcopy to prevent from changing in place
# default args provided by user. Use deepcopy to prevent from changing in place
user_default_args = deepcopy(kwargs.get("default_args", {}))

# cwl args provided by user within default_args.
# Use deepcopy to prevent from changing in place
user_cwl_args = deepcopy(user_default_args.get("cwl", {}))

# default arguments required by cwltool
required_cwl_args = get_default_args()

# update default arguments required by cwltool with those that are provided by user
required_cwl_args.update(user_cwl_args)

# update default arguments required by cwltool with those that
# might be updated based on higher priority of airflow configuration file.
# If airflow configuration file doesn't include correspondent parameters,
# use those that were provided by user, or defaults
required_cwl_args.update(
{
"workflow": workflow,
"tmp_folder": get_dir(
conf_get(
"cwl", "tmp_folder",
user_cwl_args.get(
"tmp_folder", os.path.join(AIRFLOW_HOME, "cwl_temp_folder")
)
)
),
"outputs_folder": get_dir( # to store outputs if "outputs_folder" is not overwritten in job
conf_get(
"cwl", "outputs_folder",
user_cwl_args.get(
"outputs_folder", os.path.join(AIRFLOW_HOME, "cwl_outputs_folder")
)
)
),
"pickle_folder": get_dir(
conf_get(
"cwl", "pickle_folder",
user_cwl_args.get(
"pickle_folder", os.path.join(AIRFLOW_HOME, "cwl_pickle_folder")
)
)
),
"use_container": conf_get(
"cwl", "use_container",
user_cwl_args.get("use_container", True) # execute jobs in a docker containers
),
"no_match_user": conf_get(
"cwl", "no_match_user",
user_cwl_args.get("no_match_user", False) # disables passing the current uid to "docker run --user"
),
"skip_schemas": conf_get(
"cwl", "skip_schemas",
user_cwl_args.get("skip_schemas", True) # it looks like this doesn't influence anything in the latest cwltool
),
"strict": conf_get(
"cwl", "strict",
user_cwl_args.get("strict", False)
),
"quiet": conf_get(
"cwl", "quiet",
user_cwl_args.get("quiet", False)
),
"rm_tmpdir": True,
"move_outputs": "move"
}
# get all the parameters required by cwltool with preset by user defaults
required_cwl_args = get_default_cwl_args(
preset_cwl_args=user_default_args.get("cwl", {})
)

# update default args provided by user with updated cwl args
# need to add path to the workflow
required_cwl_args["workflow"] = workflow

# update default args provided by user with required by cwltool args
user_default_args.update({
"cwl": required_cwl_args
})

# default arguments required by CWL-Airflow
# default arguments required by CWL-Airflow (no need to put it in a separate function so far)
required_default_args = {
"start_date": days_ago(180),
"email_on_failure": False,
@@ -145,7 +87,7 @@ def __setup_params(self, kwargs, workflow):
"on_retry_callback": task_on_retry
}

# Updated default arguments required by CWL-Airflow with those that are provided by user
# Updated default arguments required by CWL-Airflow with those that are provided by user for cwltool
required_default_args.update(user_default_args)

# update kwargs with correct default_args and callbacks if those were not set by user
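
The new get_default_cwl_args helper is imported from cwl_airflow.utilities.cwl and its body is not part of this diff; below is a minimal sketch of what it presumably does, reconstructed from the inline block it replaces (the actual signature, parameter set and defaults may differ).

import os

from cwltool.argparser import get_default_args
from airflow.configuration import AIRFLOW_HOME

from cwl_airflow.utilities.helpers import get_dir
from cwl_airflow.utilities.cwl import conf_get  # in the real module conf_get is defined alongside this helper


def get_default_cwl_args(preset_cwl_args=None):
    # start from cwltool defaults, apply the user's presets, then let airflow.cfg override,
    # falling back to the presets when the configuration file has no corresponding option
    preset_cwl_args = preset_cwl_args or {}
    required_cwl_args = get_default_args()
    required_cwl_args.update(preset_cwl_args)
    required_cwl_args.update({
        "tmp_folder": get_dir(conf_get("cwl", "tmp_folder",
            preset_cwl_args.get("tmp_folder", os.path.join(AIRFLOW_HOME, "cwl_tmp_folder")))),
        "outputs_folder": get_dir(conf_get("cwl", "outputs_folder",
            preset_cwl_args.get("outputs_folder", os.path.join(AIRFLOW_HOME, "cwl_outputs_folder")))),
        "pickle_folder": get_dir(conf_get("cwl", "pickle_folder",
            preset_cwl_args.get("pickle_folder", os.path.join(AIRFLOW_HOME, "cwl_pickle_folder")))),
        "use_container": conf_get("cwl", "use_container", preset_cwl_args.get("use_container", True)),
        "no_match_user": conf_get("cwl", "no_match_user", preset_cwl_args.get("no_match_user", False)),
        "strict": conf_get("cwl", "strict", preset_cwl_args.get("strict", False)),
        "quiet": conf_get("cwl", "quiet", preset_cwl_args.get("quiet", False)),
        "rm_tmpdir": True,
        "move_outputs": "move"
    })
    return required_cwl_args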
6 changes: 4 additions & 2 deletions cwl_airflow/extensions/operators/cwljobgatherer.py
@@ -2,8 +2,10 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from cwl_airflow.utilities.airflow import collect_reports
from cwl_airflow.utilities.cwl import relocate_outputs
from cwl_airflow.utilities.cwl import (
relocate_outputs,
collect_reports
)
from cwl_airflow.utilities.report import post_status


