Update tester to add new DAGs through API
michael-kotliar committed Jul 8, 2020
1 parent 9c98fc0 commit ebc49bc
Showing 3 changed files with 47 additions and 77 deletions.
64 changes: 42 additions & 22 deletions cwl_airflow/components/test/conformance.py
@@ -11,8 +11,8 @@

 from time import sleep
 from shutil import rmtree
+from urllib.parse import urljoin
 from collections import OrderedDict
-from airflow.settings import DAGS_FOLDER
 from cwltest.utils import compare, CompareFail
 from http.server import SimpleHTTPRequestHandler

Expand All @@ -24,7 +24,9 @@
)
from cwl_airflow.utilities.cwl import (
load_job,
DAG_TEMPLATE
embed_all_runs,
convert_to_workflow,
fast_cwl_load
)


@@ -141,37 +143,53 @@ def load_test_suite(args):
     return suite_data_filtered


-def create_dags(suite_data, dags_folder=None):
+def create_dags(suite_data, args, dags_folder=None):
     """
-    Iterates over suite_data and exports DAG files into dags_folder.
-    If a file with the same name has already been exported, skip it.
-    If dags_folder is not set, use DAGS_FOLDER from airflow.settings
-    TODO: maybe I will need to copy packed cwl file into the dags_folder too.
+    Gets the list of all DAGs available in Airflow, iterates over
+    "suite_data", and creates new DAGs for those that don't exist yet.
+    Everything is done through the API. The Airflow Scheduler will parse
+    all DAGs at the end of the next "dag_dir_list_interval" from airflow.cfg
     """

-    dags_folder = get_dir(DAGS_FOLDER if dags_folder is None else dags_folder)
+    # TODO: think how safe it is to force the scheduler to reload DAGs
+
+    r = requests.get(url=urljoin(args.api, "/api/experimental/dags"))  # get list of all DAGs in Airflow. Never fails unless the API is unavailable
+    dag_ids = [item["dag_id"] for item in r.json()["dags"]]

     for test_data in suite_data.values():
-        dag_location = os.path.join(
-            dags_folder,
-            test_data["dag_id"] + ".py"
+        if test_data["dag_id"] in dag_ids:  # do not create a DAG with the same name
+            continue
+        workflow_tool = fast_cwl_load(test_data["tool"])
+        workflow_path = os.path.join(
+            args.tmp,
+            os.path.basename(test_data["tool"])
         )
-        if not os.path.isfile(dag_location):
-            with open(dag_location, "w") as output_stream:
-                output_stream.write(
-                    DAG_TEMPLATE.format(
-                        test_data["tool"],
-                        test_data["dag_id"]
-                    )
-                )
+        if workflow_tool["class"] == "Workflow":
+            embed_all_runs(
+                workflow_tool=workflow_tool,
+                location=workflow_path
+            )
+        else:
+            convert_to_workflow(
+                command_line_tool=workflow_tool,
+                location=workflow_path
+            )
+        with open(workflow_path, "rb") as input_stream:
+            logging.info(f"Add DAG {test_data['dag_id']}")
+            r = requests.post(  # create new DAG
+                url=urljoin(args.api, "/api/experimental/dags"),
+                params={
+                    "dag_id": test_data["dag_id"]
+                },
+                files={"workflow": input_stream}
+            )


 def trigger_dags(suite_data, args):
     for run_id, test_data in suite_data.items():
         logging.info(f"Trigger DAG {test_data['dag_id']} as {run_id}")
         r = requests.post(
-            url=args.endpoint,
+            url=urljoin(args.api, "/api/experimental/dag_runs"),
             params={
                 "run_id": run_id,
                 "dag_id": test_data["dag_id"],
@@ -201,12 +219,14 @@ def run_test_conformance(args):
     Runs conformance tests
     """

+    # TODO: do not forget to remove args.tmp
+
     # Load test suite data, setup a queue to keep results
     suite_data = load_test_suite(args)
     results_queue = queue.Queue(maxsize=len(suite_data))

     # Create new dags
-    create_dags(suite_data)  # TODO: should call API instead of manually creating new DAGs
+    create_dags(suite_data, args)

     # Start thread to listen for status updates
     listener = get_listener_thread(
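Taken together, the new flow replaces writing DAG python files into the DAGs folder with three API calls: list the existing DAGs, upload the (possibly converted) workflow, and trigger a run. Below is a minimal standalone sketch against a running CWL-Airflow API; the dag_id, run_id, and workflow.cwl values are hypothetical, and the real tester passes additional trigger parameters that are collapsed in the diff above.

import requests
from urllib.parse import urljoin

api = "http://127.0.0.1:8081"  # base URL, matching the new --api default

# 1. List the DAGs Airflow already knows about
r = requests.get(url=urljoin(api, "/api/experimental/dags"))
dag_ids = [item["dag_id"] for item in r.json()["dags"]]

# 2. Upload a new DAG unless one with the same dag_id already exists
if "my-workflow" not in dag_ids:
    with open("workflow.cwl", "rb") as input_stream:
        requests.post(
            url=urljoin(api, "/api/experimental/dags"),
            params={"dag_id": "my-workflow"},
            files={"workflow": input_stream}
        )

# 3. Trigger a run of that DAG
requests.post(
    url=urljoin(api, "/api/experimental/dag_runs"),
    params={"run_id": "my-run-1", "dag_id": "my-workflow"}
)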
10 changes: 5 additions & 5 deletions cwl_airflow/utilities/parser.py
@@ -136,11 +136,11 @@ def get_parser():
              Default: 3070"
     )
     test_parser.add_argument(
-        "--endpoint",
+        "--api",
         type=str,
-        default="http://127.0.0.1:8081/api/experimental/dag_runs",
-        help="Set CWL-Airflow 'api' endpoint to create and trigger DAGs. \
-              Default: http://127.0.0.1:8081/api/experimental/dag_runs"
+        default="http://127.0.0.1:8081",
+        help="Set the base URL (IP address and port) of the CWL-Airflow API. \
+              Default: http://127.0.0.1:8081"
     )
     test_parser.add_argument(
         "--range",
@@ -234,7 +234,7 @@ def parse_arguments(argsl, cwd=None):
     args, _ = get_parser().parse_known_args(argsl)
     args = get_normalized_args(
         args,
-        ["func", "port", "host", "endpoint", "range", "spin"],
+        ["func", "port", "host", "api", "range", "spin"],
         cwd
     )
     assert_and_fix_args(args)
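The base-URL style works because urljoin with an absolute path ("/api/...") keeps only the scheme and host:port from the base, which is also why --api should carry just the address and port. A quick illustration with the two endpoints used above:

from urllib.parse import urljoin

base = "http://127.0.0.1:8081"
print(urljoin(base, "/api/experimental/dags"))      # -> http://127.0.0.1:8081/api/experimental/dags
print(urljoin(base, "/api/experimental/dag_runs"))  # -> http://127.0.0.1:8081/api/experimental/dag_runs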
50 changes: 0 additions & 50 deletions tests/test_conformance.py
@@ -18,56 +18,6 @@
 tempfile.tempdir = "/private/tmp"


-@pytest.mark.parametrize(
-    "args, control_dag_list",
-    [
-        (
-            [
-                "test",
-                "--suite", "./conformance/test_suite_1.yaml",
-                "--range", "1-3,5"
-            ],
-            ["bam-bedgraph-bigwig.py"]
-        ),
-        (
-            [
-                "test",
-                "--suite", "./conformance/test_suite_1.yaml",
-                "--range", "1-3,6"
-            ],
-            [
-                "bam-bedgraph-bigwig.py",
-                "bam-bedgraph-bigwig-single.py"
-            ]
-        ),
-        (
-            [
-                "test",
-                "--suite", "./conformance/test_suite_1.yaml"
-            ],
-            [
-                "bam-bedgraph-bigwig.py",
-                "bam-bedgraph-bigwig-single.py",
-                "bam-bedgraph-bigwig-subworkflow.py"
-            ]
-        )
-    ]
-)
-def test_create_dags(args, control_dag_list):
-    parsed_args = parse_arguments(args, DATA_FOLDER)
-    try:
-        suite_data = load_test_suite(parsed_args)
-        create_dags(suite_data, parsed_args.tmp)
-        dags_list = get_files(parsed_args.tmp, ".*\.py$").keys()
-    except Exception as err:
-        assert False, f"Failed to run test, {err}"
-    finally:
-        rmtree(parsed_args.tmp)
-
-    assert sorted(dags_list) == sorted(control_dag_list), \
-        "Failed to create python file for DAG"
-
-
 @pytest.mark.parametrize(
     "args, control_ids",
     [
