Refactor report.py, run only one workflow and unit tests for now
michael-kotliar committed Feb 11, 2021
1 parent 2100e1b commit d6f91b9
Showing 2 changed files with 106 additions and 98 deletions.
78 changes: 39 additions & 39 deletions .travis.yml
@@ -13,8 +13,8 @@ python:

env:
- NTEST=1
- NTEST=2
- NTEST=3
# - NTEST=2
# - NTEST=3
# - NTEST=4
# - NTEST=5
# - NTEST=6
@@ -64,43 +64,43 @@ jobs:
secure: Mji1koR4nyt/KgoycpuvgIp9toFVNYaSxUmNY6EVt0pmIpRb/GKbw6TdyfAdtnSAwH3BcSUC/R1hCwyaXfv1GDPFYqv9Yg1MaNHR1clvo8E8KIIPt1JDqPM47lgPQQFFbwB+Cc6uSV0Nn9oDBkhWEPQqV3kI/GJkSUzSs/yjZqR4C+aZxsJzE+VX2ZzeGCD3x4mzhAAWan4MLrdgANaXQVTHhyHIhTp3l109FblYimMvx8HqKotMiM+32mVFxgwf/pMw/N8gDOFXd4VrtlaOqqHpn4VJko+jSNYuAdKn62N2KFKqExyU39ycvU9ngYaU38nmCjJdibRgNyxfdH6LfndS9xzu3KPY64ACLG1i8Ym+57Q7wSJZAb2WF/b8av1RnkKMUGHHYXBzVIGk7Abvuhde0DsV0lr9XsapQn7XySmhdBWYazZTr+AtgIdsx7AmHV1ug6nPp3tIQzW1+YAOf295Puwqbrn+SF3jYw6167jAl5M1a81kxqli1UTsLgpcaTbTD1ofwLn4gP3VuU1f4fKGzhrxl6ybHW+LpO/wkcN2wJDdBbqz5OQIYfshMQEooIODOw1OonmwbY3vcMATuvi7Hz3mIElqpu3TVxH9aoBzcvL1148wPhZF8u87T8nDgsHeUT66I56ILGcZszASolt2Cb6oPZmxg2jgajTREwk=
on:
tags: true
- name: DAG with embedded workflow (just one test)
script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --embed
- name: DAG with attached workflow using combined API call (just one test)
script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --combine
- name: DAG with embedded workflow using combined API call (just one test)
script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --embed --combine
- name: Test of `init --upgrade`
before_install:
- mkdir -p ~/airflow/dags
- cp ./tests/data/dags/bam_bedgraph_bigwig_single_old_format.py ~/airflow/dags
- cp ./tests/data/workflows/bam-bedgraph-bigwig-single.cwl ~/airflow/dags
install:
- pip install . --constraint ./packaging/constraints/constraints-$TRAVIS_PYTHON_VERSION.txt
before_script:
- cwl-airflow init --upgrade
- rm -f ~/airflow/dags/bam-bedgraph-bigwig-single.cwl
script: airflow dags list # to check if all DAGs are correct
- name: Test packaging for Ubuntu 18.04, Python 3.6
install:
- ./packaging/portable/ubuntu/pack.sh 18.04 3.6 $TRAVIS_BRANCH
- ls ./packaging/portable/ubuntu/build/
- tar xzf "./packaging/portable/ubuntu/build/python_3.6_with_cwl_airflow_${TRAVIS_BRANCH}_ubuntu_18.04.tar.gz"
before_script:
- ./python3/bin_portable/airflow --help # to generate airflow.cfg
- sed -i'.backup' -e 's/^executor.*/executor = LocalExecutor/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^parsing_processes.*/parsing_processes = 1/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^sql_alchemy_pool_enabled.*/sql_alchemy_pool_enabled = False/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^dag_dir_list_interval =.*/dag_dir_list_interval = 60/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^parallelism =.*/parallelism = 1/g' ~/airflow/airflow.cfg
- sed -i'.backup' -e 's/^sql_alchemy_conn.*/sql_alchemy_conn = mysql:\/\/airflow:airflow@127.0.0.1:6603\/airflow/g' ~/airflow/airflow.cfg
- ./python3/bin_portable/cwl-airflow init
- ./python3/bin_portable/airflow connections add process_report --conn-type http --conn-host localhost --conn-port 3070 # to add process_report connection
- ./python3/bin_portable/airflow scheduler > /dev/null 2>&1 &
- ./python3/bin_portable/cwl-airflow api > /dev/null 2>&1 &
- sleep 5 # to let the scheduler parse all dags, otherwise we can't run the following command
- ./python3/bin_portable/airflow dags unpause resend_results
script: ./python3/bin_portable/cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1
# - name: DAG with embedded workflow (just one test)
# script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --embed
# - name: DAG with attached workflow using combined API call (just one test)
# script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --combine
# - name: DAG with embedded workflow using combined API call (just one test)
# script: cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1 --embed --combine
# - name: Test of `init --upgrade`
# before_install:
# - mkdir -p ~/airflow/dags
# - cp ./tests/data/dags/bam_bedgraph_bigwig_single_old_format.py ~/airflow/dags
# - cp ./tests/data/workflows/bam-bedgraph-bigwig-single.cwl ~/airflow/dags
# install:
# - pip install . --constraint ./packaging/constraints/constraints-$TRAVIS_PYTHON_VERSION.txt
# before_script:
# - cwl-airflow init --upgrade
# - rm -f ~/airflow/dags/bam-bedgraph-bigwig-single.cwl
# script: airflow dags list # to check if all DAGs are correct
# - name: Test packaging for Ubuntu 18.04, Python 3.6
# install:
# - ./packaging/portable/ubuntu/pack.sh 18.04 3.6 $TRAVIS_BRANCH
# - ls ./packaging/portable/ubuntu/build/
# - tar xzf "./packaging/portable/ubuntu/build/python_3.6_with_cwl_airflow_${TRAVIS_BRANCH}_ubuntu_18.04.tar.gz"
# before_script:
# - ./python3/bin_portable/airflow --help # to generate airflow.cfg
# - sed -i'.backup' -e 's/^executor.*/executor = LocalExecutor/g' ~/airflow/airflow.cfg
# - sed -i'.backup' -e 's/^parsing_processes.*/parsing_processes = 1/g' ~/airflow/airflow.cfg
# - sed -i'.backup' -e 's/^sql_alchemy_pool_enabled.*/sql_alchemy_pool_enabled = False/g' ~/airflow/airflow.cfg
# - sed -i'.backup' -e 's/^dag_dir_list_interval =.*/dag_dir_list_interval = 60/g' ~/airflow/airflow.cfg
# - sed -i'.backup' -e 's/^parallelism =.*/parallelism = 1/g' ~/airflow/airflow.cfg
# - sed -i'.backup' -e 's/^sql_alchemy_conn.*/sql_alchemy_conn = mysql:\/\/airflow:airflow@127.0.0.1:6603\/airflow/g' ~/airflow/airflow.cfg
# - ./python3/bin_portable/cwl-airflow init
# - ./python3/bin_portable/airflow connections add process_report --conn-type http --conn-host localhost --conn-port 3070 # to add process_report connection
# - ./python3/bin_portable/airflow scheduler > /dev/null 2>&1 &
# - ./python3/bin_portable/cwl-airflow api > /dev/null 2>&1 &
# - sleep 5 # to let the scheduler parse all dags, otherwise we can't run the following command
# - ./python3/bin_portable/airflow dags unpause resend_results
# script: ./python3/bin_portable/cwl-airflow test --suite workflows/tests/conformance_tests.yaml --spin --range 1

before_install:
- git clone https://github.com/datirium/workflows.git --recursive
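The reporting helpers changed below POST through an Airflow HTTP connection that the (now commented) packaging job above registers under the id "process_report". As a minimal sketch (not the actual module code, which this diff does not show), the module-level http_hook could be built with the standard HTTP provider hook:

from airflow.providers.http.hooks.http import HttpHook

# "process_report" matches the connection id added in the CI job above;
# the host and port come from that connection, not from code.
http_hook = HttpHook(method="POST", http_conn_id="process_report")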
126 changes: 67 additions & 59 deletions cwl_airflow/utilities/report.py
@@ -86,7 +86,10 @@ def report_progress(context, from_task=None):
If dag_run failed but this function was run from the task callback,
the error will always be "". The "error" is not "" only when this function
is called from the DAG callback, making it the last and the only
message with a meaningful error description.
message with a meaningful error description. Workflow execution
statistics are generated only when this function is called from the DAG
callback (not from a task). Also, undelivered messages are backed up only
if this function was called from the DAG callback.
"""

from_task = False if from_task is None else from_task
@@ -105,38 +108,7 @@ def report_progress(context, from_task=None):
"error": get_error_category(context) if dag_run.state == State.FAILED and not from_task else ""
}
}
post_progress(message, from_task)


def post_progress(message, from_task, backup=None):
"""
If this function was called not from a task and we failed to send the request
while backup was true (by default it is always true), we need to guarantee that
the message does not get lost, so we back it up into a Variable to be able to
resend it later. We don't back up unsent messages if the user didn't add the
required connection in Airflow, which we detect by catching AirflowNotFoundException.
"""

backup = True if backup is None else backup

try:
http_hook.run(endpoint=ROUTES["progress"], json=message, extra_options={"timeout": 30})
except AirflowNotFoundException as err:
logging.debug(f"Failed to POST progress updates. Skipping \n {err}")
except Exception as err:
logging.debug(f"Failed to POST progress updates. \n {err}")
if backup and not from_task and message["payload"]["progress"] != 100: # we don't need to resend messages with progress == 100
logging.debug("Save the message into the Variables")
dag_id = message["payload"]["dag_id"]
run_id = message["payload"]["run_id"]
Variable.set(
key=f"post_progress__{dag_id}__{run_id}",
value={
"message": message,
"endpoint": ROUTES["progress"]
},
serialize_json=True
)
post_progress(message, not from_task) # no need to back up progress messages that came from tasks
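For orientation, a progress message assembled above might look roughly like the dict below. Only the keys visible in this diff (dag_id, run_id, progress, error) are included, the values are purely illustrative, and the real payload may carry additional fields.

# Illustrative sketch only; extra fields of the real payload are omitted.
message = {
    "payload": {
        "dag_id": "bam_bedgraph_bigwig_single",   # hypothetical DAG id
        "run_id": "manual__2021-02-11T00:00:00",  # hypothetical run id
        "progress": 75,                           # percent complete; 100 means the run finished
        "error": ""                               # non-empty only in the DAG failure callback
    }
}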


def report_results(context):
@@ -146,7 +118,7 @@ def report_results(context):
endless import loop (the file where the CWLJobGatherer class is defined imports
this file). If CWLDAG is constructed with a custom gatherer node, posting results
might not work. We need to handle a missing results file because the same callback
is used for the clean_dag_run DAG.
is used for the clean_dag_run DAG. All undelivered messages will be backed up.
"""

dag_run = context["dag_run"]
@@ -165,14 +137,63 @@ def report_results(context):
}
}
post_results(message)



def report_status(context):
"""
Reports the status of the current task. No message backup is needed.
"""

dag_run = context["dag_run"]
ti = context["ti"]
message = {
"payload": {
"state": ti.state,
"dag_id": dag_run.dag_id,
"run_id": dag_run.run_id,
"task_id": ti.task_id
}
}
post_status(message)


def post_progress(message, backup=None):
"""
If we fail to post a progress report while backup is true (by default it is
always true), we need to guarantee that the message does not get lost, so we
back it up into a Variable to be able to resend it later. We don't back up
unsent messages if the user didn't add the required connection in Airflow,
which we detect by catching AirflowNotFoundException. The function may fail
only when the message is not properly formatted.
"""

backup = True if backup is None else backup

try:
http_hook.run(endpoint=ROUTES["progress"], json=message, extra_options={"timeout": 30})
except AirflowNotFoundException as err:
logging.debug(f"Failed to POST progress updates. Skipping \n {err}")
except Exception as err:
logging.debug(f"Failed to POST progress updates. \n {err}")
if backup:
logging.debug("Save the message into the Variables")
Variable.set(
key=f"post_progress__{message['payload']['dag_id']}__{message['payload']['run_id']}",
value={
"message": message,
"endpoint": ROUTES["progress"]
},
serialize_json=True
)
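Because undelivered progress and results messages are now saved into Variables keyed as post_progress__{dag_id}__{run_id} and post_results__{dag_id}__{run_id}, they can be replayed later (the CI configuration above unpauses a resend_results DAG for that purpose). Below is a minimal sketch of such a replay, assuming the helper name and the session handling shown here, neither of which is part of this diff:

from airflow.models import Variable
from airflow.utils.session import create_session

def resend_backed_up_messages(hook, prefixes=("post_progress__", "post_results__")):
    # Hypothetical helper: find backed-up messages, try to deliver them again,
    # and drop the backup only after a successful POST.
    with create_session() as session:
        keys = [v.key for v in session.query(Variable) if v.key.startswith(prefixes)]
    for key in keys:
        backup = Variable.get(key, deserialize_json=True)  # {"message": ..., "endpoint": ...}
        try:
            hook.run(endpoint=backup["endpoint"], json=backup["message"], extra_options={"timeout": 30})
        except Exception:
            continue  # keep the backup for the next attempt
        Variable.delete(key)  # delivered, remove the backup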


def post_results(message, backup=None):
"""
If we fail to post results while backup is true (by default it is always true),
we need to guarantee that the message does not get lost, so we back it up into a
Variable to be able to resend it later. We don't back up unsent messages if the
user didn't add the required connection in Airflow, which we detect by catching
AirflowNotFoundException. May fail only when the message is not properly formatted.
"""

backup = True if backup is None else backup
@@ -182,12 +203,11 @@ def post_results(message, backup=None):
except AirflowNotFoundException as err:
logging.debug(f"Failed to POST results. Skipping \n {err}")
except Exception as err:
logging.debug(f"Failed to POST results. Save the message into the Variables \n {err}")
logging.debug(f"Failed to POST results. \n {err}")
if backup:
dag_id = message["payload"]["dag_id"]
run_id = message["payload"]["run_id"]
logging.debug("Save the message into the Variables")
Variable.set(
key=f"post_results__{dag_id}__{run_id}",
key=f"post_results__{message['payload']['dag_id']}__{message['payload']['run_id']}",
value={
"message": message,
"endpoint": ROUTES["results"]
@@ -196,24 +216,10 @@ def post_results(message, backup=None):
)


def report_status(context):
dag_run = context["dag_run"]
ti = context["ti"]
message = {
"payload": {
"state": ti.state,
"dag_id": dag_run.dag_id,
"run_id": dag_run.run_id,
"task_id": ti.task_id
}
}
post_status(message)


def post_status(message):
"""
We don't need to back up undelivered status updates, so we
don't save them into Variables
We don't need to back up undelivered status updates,
so we don't save them into Variables. Never fails.
"""

try:
@@ -227,10 +233,11 @@ def clean_up(context):
Loads "cwl" arguments from the DAG, just in case updates them with
all required defaults, and, unless "keep_tmp_data" was set to True,
tries to remove all remporary data and related records in the XCom
table. If this function is called from clean_dag_run or any other
DAG that doesn't have "cwl" in the "default_args" we will catch
KeyError exception
table. If this function is called from the callback of clean_dag_run
or any other DAG that doesn't have "cwl" in the "default_args" we will
catch KeyError exception.
"""

try:
default_cwl_args = get_default_cwl_args(
context["dag"].default_args["cwl"]
@@ -266,6 +273,7 @@ def dag_on_success(context):


def dag_on_failure(context):
# we need to report progress, because we will also report error in it
# we need to report progress, because we will also report
# error and execution statistics in it
report_progress(context)
clean_up(context)
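For context, dag_on_success and dag_on_failure are intended to be used as DAG-level callbacks. A minimal sketch of how they could be attached follows; CWL-Airflow wires this up internally when it builds a CWLDAG, so everything here except the imported callback names is an assumption for illustration.

from datetime import datetime
from airflow import DAG
from cwl_airflow.utilities.report import dag_on_success, dag_on_failure, report_status

with DAG(
    dag_id="example_cwl_dag",                  # hypothetical DAG id
    start_date=datetime(2021, 2, 11),
    schedule_interval=None,
    on_success_callback=dag_on_success,        # reports final progress and cleans up temporary data
    on_failure_callback=dag_on_failure,        # reports progress with the error category and statistics
    default_args={
        "on_success_callback": report_status,  # hedged: per-task status updates
        "on_failure_callback": report_status,
    },
) as dag:
    pass  # workflow tasks would be added here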
