Merge pull request #929 from Flowminder/flowetl-config-MA
Minor flowetl tweaks
maxalbert committed Jun 13, 2019
2 parents 9a74291 + 3c96e82 commit 6ee09e5
Showing 12 changed files with 161 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -582,7 +582,7 @@ jobs:
name: run flowetl integration tests
command: |
cd flowetl
TAG=$CIRCLE_SHA1 pipenv run pytest --junit-xml=test_results/pytest/results_integration.xml ./tests
FLOWETL_TESTS_CONTAINERS_TAG=$CIRCLE_SHA1 pipenv run pytest --junit-xml=test_results/pytest/results_integration.xml ./tests
- run:
name: run etl module unit tests
command: |
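The renamed variable only changes the name under which the image tag reaches the integration tests. A rough, hypothetical sketch of how a test suite might pick it up (the fixture name and the "latest" fallback are assumptions, not taken from this PR):

```python
# Hypothetical sketch: reading the renamed environment variable in a
# session-scoped pytest fixture. Fixture name and fallback are assumptions.
import os

import pytest


@pytest.fixture(scope="session")
def flowetl_containers_tag():
    return os.environ.get("FLOWETL_TESTS_CONTAINERS_TAG", "latest")
```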
1 change: 1 addition & 0 deletions flowetl/Pipfile
@@ -12,6 +12,7 @@ docker = "*"
ipython = "*"
etl = {editable = true,path = "./etl"}
pytest-cov = "*"
structlog = "*"

[packages]

18 changes: 9 additions & 9 deletions flowetl/Pipfile.lock

Some generated files are not rendered by default.

7 changes: 4 additions & 3 deletions flowetl/dags/etl.py
@@ -6,8 +6,8 @@
"""
Skeleton specification for ETL DAG
"""
import logging
import os
import structlog

from pathlib import Path

@@ -22,20 +22,21 @@
from etl.etl_utils import construct_etl_dag, CDRType
from etl.config_parser import validate_config, get_config_from_file

logger = structlog.get_logger("flowetl")
default_args = {"owner": "flowminder", "start_date": parse("1900-01-01")}

# Determine if we are in a testing environment - use dummy callables if so
if os.environ.get("TESTING", "") == "true":
task_callable_mapping = TEST_ETL_TASK_CALLABLES
logging.info("running in testing environment")
logger.info("running in testing environment")
dag = construct_etl_dag(
task_callable_mapping=task_callable_mapping,
default_args=default_args,
cdr_type="testing",
)
else:
task_callable_mapping = PRODUCTION_ETL_TASK_CALLABLES
logging.info("running in production environment")
logger.info("running in production environment")

# read and validate the config file before creating the DAGs
global_config_dict = get_config_from_file(
12 changes: 8 additions & 4 deletions flowetl/dags/etl_sensor.py
@@ -3,10 +3,12 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

# -*- coding: utf-8 -*-
import logging
import os
import structlog

# need to import and not use so that airflow looks here for a DAG
from airflow import DAG # pylint: disable=unused-import

from airflow import DAG
from pendulum import parse

from etl.etl_utils import construct_etl_sensor_dag
@@ -15,15 +17,17 @@
PRODUCTION_ETL_SENSOR_TASK_CALLABLE,
)

logger = structlog.get_logger("flowetl")

default_args = {"owner": "flowminder", "start_date": parse("1900-01-01")}

if os.environ.get("TESTING", "") == "true":
logging.info("running in testing environment")
logger.info("running in testing environment")
dag = construct_etl_sensor_dag(
callable=TEST_ETL_SENSOR_TASK_CALLABLE, default_args=default_args
)
else:
logging.info("running in production environment")
logger.info("running in production environment")
dag = construct_etl_sensor_dag(
callable=PRODUCTION_ETL_SENSOR_TASK_CALLABLE, default_args=default_args
)
21 changes: 21 additions & 0 deletions flowetl/etl/etl/__init__.py
@@ -3,3 +3,24 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

# -*- coding: utf-8 -*-

import json
import structlog


structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(serializer=json.dumps),
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
cache_logger_on_first_use=True,
)
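With this configuration, any logger obtained through structlog is routed via the standard library and rendered as one JSON object per event. A minimal usage sketch, assuming the configuration above has already run (for example because the etl package was imported); the event name and field are invented for illustration:

```python
# Minimal usage sketch; event name and fields are illustrative only.
import logging

import structlog

# A stdlib handler is still needed to actually emit the rendered JSON.
logging.basicConfig(level=logging.INFO, format="%(message)s")

logger = structlog.get_logger("flowetl")
logger.info("found_file", file_name="CALLS_20190613.csv.gz")
# Prints roughly:
# {"file_name": "CALLS_20190613.csv.gz", "event": "found_file",
#  "logger": "flowetl", "level": "info", "timestamp": "2019-06-13T..."}
```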
10 changes: 6 additions & 4 deletions flowetl/etl/etl/dummy_task_callables.py
@@ -8,21 +8,23 @@
"""

import os
import logging
import structlog

from uuid import uuid1
from pendulum import utcnow
from airflow.models import DagRun, TaskInstance
from airflow.api.common.experimental.trigger_dag import trigger_dag

logger = structlog.get_logger("flowetl")

# pylint: disable=unused-argument
def dummy__callable(*, dag_run: DagRun, task_instance: TaskInstance, **kwargs):
"""
Dummy python callable - raises an exception if the environment variable
TASK_TO_FAIL is set to the name of the current task, otherwise succeeds
silently.
"""
logging.info(dag_run)
logger.info(dag_run)
if os.environ.get("TASK_TO_FAIL", "") == task_instance.task_id:
raise Exception

@@ -31,7 +33,7 @@ def dummy_failing__callable(*, dag_run: DagRun, **kwargs):
"""
Dummy python callable raising an exception
"""
logging.info(dag_run)
logger.info(dag_run)
raise Exception


@@ -40,5 +42,5 @@ def dummy_trigger__callable(*, dag_run: DagRun, **kwargs):
In test env we just want to trigger the etl_testing DAG with
no config.
"""
logging.info(dag_run)
logger.info(dag_run)
trigger_dag("etl_testing", run_id=str(uuid1()), execution_date=utcnow())
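Because the dummy callables compare TASK_TO_FAIL against the current task id, a test can force exactly one task in the testing DAG to fail. A hedged sketch of what such a test might look like (the test name and assertion are hypothetical; "extract" is one of the task ids listed in success_branch__callable further down):

```python
# Hypothetical test-side usage: make only the dummy "extract" task raise.
def test_extract_failure_is_quarantined(monkeypatch):
    monkeypatch.setenv("TASK_TO_FAIL", "extract")
    ...  # run the etl_testing DAG here and assert the quarantine branch ran
```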
15 changes: 9 additions & 6 deletions flowetl/etl/etl/production_task_callables.py
@@ -6,8 +6,8 @@
"""
Contains the definition of callables to be used in the production ETL dag.
"""
import logging
import shutil
import structlog

from pathlib import Path
from uuid import uuid1
@@ -26,6 +26,9 @@
generate_table_names,
)

logger = structlog.get_logger("flowetl")


# pylint: disable=unused-argument
def render_and_run_sql__callable(
*,
@@ -138,7 +141,7 @@ def success_branch__callable(*, dag_run: DagRun, **kwargs):
for task_id in ["init", "extract", "transform", "load"]
]

logging.info(dag_run)
logger.info(dag_run)

if any(previous_task_failures):
branch = "quarantine"
@@ -166,24 +169,24 @@ def trigger__callable(
"""

found_files = find_files(dump_path=dump_path)
logging.info(found_files)
logger.info(found_files)

# remove files that either do not match a pattern
# or have been processed successfully already...
filtered_files = filter_files(
found_files=found_files, cdr_type_config=cdr_type_config
)
logging.info(filtered_files)
logger.info(filtered_files)

# what to do with these!?
bad_files = list(set(found_files) - set(filtered_files))
logging.info(bad_files)
logger.info(bad_files)

configs = [
(file, get_config(file_name=file.name, cdr_type_config=cdr_type_config))
for file in filtered_files
]
logging.info(configs)
logger.info(configs)

for file, config in configs:

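The handling of unexpected files in trigger__callable is just a set difference between everything found and everything that matched a known pattern. A tiny self-contained illustration of that intent (the filename pattern and example files are invented; this is not the real filter_files implementation):

```python
# Illustration only: how "bad_files" falls out of a set difference.
import re
from pathlib import Path

found_files = [Path("CALLS_20190613.csv.gz"), Path("notes.txt")]
pattern = re.compile(r"CALLS_\d{8}\.csv\.gz")  # hypothetical CDR pattern

filtered_files = [f for f in found_files if pattern.fullmatch(f.name)]
bad_files = list(set(found_files) - set(filtered_files))
# bad_files == [Path("notes.txt")]; the callable currently only logs these.
```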
2 changes: 1 addition & 1 deletion flowetl/etl/setup.py
@@ -13,5 +13,5 @@
include_package_data=True,
zip_safe=False,
# pinning airflow version so that it is the same as in the Dockerfile
install_requires=["apache-airflow[postgres]==1.10.3"],
install_requires=["apache-airflow[postgres]==1.10.3", "structlog"],
)
6 changes: 3 additions & 3 deletions flowetl/etl/tests/test_config.py
@@ -86,14 +86,14 @@ def test_find_files_default_filter(tmpdir):
tmpdir.join("B.txt").write("content")
tmpdir.join("README.md").write("content")

tmpdir_path_obj = Path(dump_path=tmpdir)
tmpdir_path_obj = Path(tmpdir)

files = find_files(tmpdir_path_obj)
files = find_files(dump_path=tmpdir_path_obj)

assert set([file.name for file in files]) == set(["A.txt", "B.txt"])


def test_find_files_default_filter(tmpdir):
def test_find_files_non_default_filter(tmpdir):
"""
Test that find files returns correct files
with non-default filter argument.
