Flowetl - basically working.... #858

Merged: 118 commits merged into master from flowetl-config on Jun 13, 2019

Conversation

@josh-gree (Contributor)

Closes N/A

I have:

  • Formatted any Python files with black
  • Brought the branch up to date with master
  • Added any relevant Github labels
  • Added tests for any new additions
  • Added or updated any relevant documentation
  • Added an Architectural Decision Record (ADR), if appropriate
  • Added an MPLv2 License Header if appropriate
  • Updated the Changelog

Description

Lots to talk about. Just adding this now so we can look at it; I'll add a fuller description once we've had a chat.

@josh-gree (Contributor, Author)

Bye Bye All!

CircleCI

Now using the machine executor in the flowetl test job. This is needed so that we can make use of bind mounts.

Dev Environment

Added some FlowETL defaults: the locations of the local mounts for dump, archive, ingest, quarantine and config, plus the location of these mounts inside the flowetl container, which is specified by MOUNT_HOME. The flowetl PG database is exposed on FLOWETL_POSTGRES_PORT.

docker-compose.yml

Bind volumes added to flowetl. The flowdb connection string is passed to flowetl as AIRFLOW_CONN_FLOWDB, and the flowetl_db port is exposed.

04_schema_other.sql

The table etl.etl, which records the ingestion process, has been renamed to the clearer etl.etl_records, and the PG enums have been removed since the allowed values are enforced in the SQL model. The filename column has also been removed, since (cdr_type, cdr_date) should be unique. You should probably remove the aggregates.aggregates table at some point...
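
For flavour, a minimal sketch of what the renamed table could look like as a SQLAlchemy model (column names, types and defaults here are assumptions for illustration; the actual definition lives in etl/model.py):

```python
# Hypothetical sketch of the etl.etl_records model; the real definition is in
# etl/model.py and may differ in names, types and constraints.
from sqlalchemy import Column, Date, DateTime, Integer, Text, func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class ETLRecord(Base):
    __tablename__ = "etl_records"
    __table_args__ = {"schema": "etl"}

    id = Column(Integer, primary_key=True)
    # cdr_type + cdr_date identify a file, so no filename column is needed.
    cdr_type = Column(Text, nullable=False)  # plain text rather than a PG enum
    cdr_date = Column(Date, nullable=False)
    state = Column(Text, nullable=False)  # e.g. "ingest", "archive", "quarantine"
    timestamp = Column(DateTime(timezone=True), server_default=func.now())
```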

Pipfile

Added pytest-cov and the postgres extra for airflow.

./dags

Zero coverage on these two files, since they are not really Python code but a DAG DSL.

However, both files make use of the tested functions construct_etl_dag and construct_etl_sensor_dag to produce the actual DAG objects. The main logic in these files is switching between the testing environment (where DAGs have dummy, non-functional tasks) and the production environment, where the tasks are as they should be for actual use.

The etl_sensor DAG is pretty straightforward: just the logic for switching between testing and production.

The etl DAG is a little bit more involved...

In production we want an individual etl DAG for each CDR type, which the sensor dispatches to based on the cdr_type of the file it found. Why? Because different cdr_types in the same country can differ in size by an order of magnitude, so it is useful to control how many files we try to ingest concurrently per cdr_type; this is only possible on a DAG-by-DAG basis, by setting each DAG's concurrency level.

So in the etl.py file we construct an identical DAG (other than its dag_id, which becomes etl_{cdr_type}) for each CDR type in etl.etl_utils.CDRType. Before doing this we make sure that the global config.yml file is valid, which means we know what filename patterns we are looking for for each cdr_type. This is accomplished using the function validate_config, which either passes silently if everything is OK or raises an exception and stops Airflow from constructing the various etl DAGs.
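
Roughly, etl.py has this shape (the TESTING env var, the mapping names, the default_args and the exact validate_config signature are assumptions for illustration):

```python
# Hypothetical sketch of ./dags/etl.py; names and signatures are illustrative.
import os
from datetime import datetime
from pathlib import Path

from etl.construct_etl_dag import construct_etl_dag
from etl.dag_task_callable_mappings import (
    PRODUCTION_ETL_TASK_CALLABLES,
    TEST_ETL_TASK_CALLABLES,
)
from etl.etl_utils import CDRType, validate_config

# Fail loudly if the global config.yml is invalid; this stops Airflow from
# constructing any of the etl_{cdr_type} DAGs. (Exact signature assumed.)
validate_config(Path(os.environ["MOUNT_HOME"]) / "config" / "config.yml")

# Dummy callables in the testing environment, real ones in production.
task_callables = (
    TEST_ETL_TASK_CALLABLES
    if os.environ.get("TESTING", "false") == "true"
    else PRODUCTION_ETL_TASK_CALLABLES
)

default_args = {"owner": "airflow", "start_date": datetime(2019, 1, 1)}

# One identical DAG per CDR type, differing only in its dag_id.
for cdr_type in CDRType:  # assumed to be an enum of the known CDR types
    dag_id = f"etl_{cdr_type.value}"
    globals()[dag_id] = construct_etl_dag(
        dag_id=dag_id, default_args=default_args, **task_callables
    )
```

Assigning each constructed DAG into globals() is the standard way of letting Airflow's DAG discovery pick up dynamically constructed DAGs from a single file.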

construct_etl_sensor_dag + construct_etl_dag + dag_task_callable_mappings.py

The two construct_*_dag functions are where the actual structure of the etl_{cdr_type} and etl_sensor DAGs is specified. All tasks are PythonOperator or BranchPythonOperator (which is basically the same, but should emit the task_id of the next task to run). I decided to use PythonOperator rather than more specific Airflow operators because I think it's more flexible, but whether that's the right call is of course up to you in the future. The construction functions specify the structure of the DAG but not the actual python_callable for each task; the callables are passed in as arguments, which allows switching between dummy callables (for tests) and real ones in production.
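
To make the shape concrete, an abbreviated sketch of what construct_etl_dag might look like (the signature, default args and exact topology are assumptions; the point is the pattern of wiring PythonOperator/BranchPythonOperator around injected callables):

```python
# Hypothetical, abbreviated sketch of construct_etl_dag.
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator


def construct_etl_dag(
    *, dag_id, default_args, init, extract, transform, success_branch,
    load, archive, quarantine, clean
):
    with DAG(dag_id=dag_id, default_args=default_args, schedule_interval=None) as dag:

        def make(task_id, fn):
            # Every task is a plain PythonOperator around an injected callable.
            return PythonOperator(task_id=task_id, python_callable=fn, provide_context=True)

        init_t = make("init", init)
        extract_t = make("extract", extract)
        transform_t = make("transform", transform)
        load_t = make("load", load)
        # Runs even if an upstream task failed, and emits the task_id to follow.
        branch_t = BranchPythonOperator(
            task_id="success_branch",
            python_callable=success_branch,
            provide_context=True,
            trigger_rule="all_done",
        )
        archive_t = make("archive", archive)
        quarantine_t = make("quarantine", quarantine)
        clean_t = make("clean", clean)

        # The exact topology is illustrative only.
        init_t >> extract_t >> transform_t >> load_t >> branch_t
        branch_t >> archive_t >> clean_t
        branch_t >> quarantine_t

    return dag
```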

The actual callables for each task are created, using partials for default args, in the file dag_task_callable_mappings.py, then imported into the two files in ./dags and passed to the construction functions.
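
Something along these lines (the dictionary keys, the "flowdb" connection id and the dummy callable are assumptions for illustration):

```python
# Hypothetical sketch of etl/dag_task_callable_mappings.py.
from functools import partial

from airflow.hooks.postgres_hook import PostgresHook

from etl.production_callables import (  # import path illustrative
    move_file_and_record_ingestion_state__callable,
    render_and_run_sql__callable,
    success_branch__callable,
)

flowdb_hook = PostgresHook(postgres_conn_id="flowdb")  # set via AIRFLOW_CONN_FLOWDB

# Real callables for production; task-specific behaviour is baked in via partials.
PRODUCTION_ETL_TASK_CALLABLES = {
    "init": partial(
        move_file_and_record_ingestion_state__callable, from_dir="dump", to_dir="ingest"
    ),
    "extract": partial(
        render_and_run_sql__callable, db_hook=flowdb_hook,
        template_name="extract", fixed_sql=False,
    ),
    "transform": partial(
        render_and_run_sql__callable, db_hook=flowdb_hook,
        template_name="transform", fixed_sql=False,
    ),
    "success_branch": success_branch__callable,
    "load": partial(
        render_and_run_sql__callable, db_hook=flowdb_hook,
        template_name="load", fixed_sql=True,
    ),
    "archive": partial(
        move_file_and_record_ingestion_state__callable, from_dir="ingest", to_dir="archive"
    ),
    "quarantine": partial(
        move_file_and_record_ingestion_state__callable, from_dir="ingest", to_dir="quarantine"
    ),
    "clean": partial(
        render_and_run_sql__callable, db_hook=flowdb_hook,
        template_name="clean", fixed_sql=True,
    ),
}


def dummy__callable(**kwargs):
    """No-op stand-in used in the testing environment."""
    pass


TEST_ETL_TASK_CALLABLES = {task: dummy__callable for task in PRODUCTION_ETL_TASK_CALLABLES}
```

Because the switching happens at the callable level, the DAG structure exercised in tests is identical to the one used in production.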

Production task callables

Only four types of production callable are used throughout the DAGs; the behaviour of each callable is controlled by passing default args via partials. These are specified in etl/etl/production_callables.py.

  1. render_and_run_sql__callable (used in etl_{cdr_type} DAGs)

This function is used for tasks that need to run SQL against flowdb (extract, transform, load and clean); the args template_name and fixed_sql specify the different behaviour of each of these tasks. As an example, if we are in the DAG etl_calls and the task transform, then we specify template_name="transform" and fixed_sql=False, and the task will look in CONFIG_DIR/etl/calls for a SQL file called transform.sql (this should be a jinja template file). That file is templated and then applied to flowdb. The connection to flowdb is specified via the arg db_hook, which is the same for every use; it is of type DbApiHook, which means you could easily change to another type of DB if desired. The fixed_sql arg is only True for the tasks load and clean, because these tasks are agnostic to the type of CDR: loading to the parent tables is the same and cleaning is also the same. When fixed_sql=True, the jinja templated SQL files are looked for in CONFIG_DIR/fixed_sql.
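
A minimal sketch of that callable, assuming the templating is done with jinja2 and that the cdr_type and template context arrive via the triggering dag_run's conf (paths and conf keys are assumptions):

```python
# Hypothetical sketch of render_and_run_sql__callable from
# etl/etl/production_callables.py.
from pathlib import Path

from jinja2 import Template

CONFIG_DIR = Path("/mounts/config")  # assumed mount location


def render_and_run_sql__callable(*, db_hook, template_name, fixed_sql, **context):
    conf = context["dag_run"].conf  # set by the sensor when it triggers the DAG
    cdr_type = conf["cdr_type"]  # e.g. "calls"

    # Fixed SQL (load, clean) is shared across CDR types; the rest is per type.
    if fixed_sql:
        template_path = CONFIG_DIR / "fixed_sql" / f"{template_name}.sql"
    else:
        template_path = CONFIG_DIR / "etl" / cdr_type / f"{template_name}.sql"

    # Render the jinja template with the dag_run's conf and run it against flowdb.
    sql = Template(template_path.read_text()).render(**conf)
    db_hook.run(sql)
```

Since db_hook is just a DbApiHook, pointing the ETL at a different database only means passing a different hook via the partial.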

  2. move_file_and_record_ingestion_state__callable (used in etl_{cdr_type} DAGs)

These callables are used in the tasks init, archive and quarantine; they deal with moving files between the various host locations and recording the state of the ingestion in flowdb, making use of the model defined in etl/model.py. The behaviour of each task is determined by the args from_dir and to_dir, with the name of the target directory used as the state associated with the ETL process; a sketch follows the list below.

init -> from_dir='dump', to_dir='ingest' and state in etl.etl_records will be ingest

archive -> from_dir='ingest', to_dir='archive' and state in etl.etl_records will be archive

quarantine -> from_dir='ingest', to_dir='quarantine' and state in etl.etl_records will be quarantine
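
A minimal sketch of what this callable might look like (the ETLRecord class name, the session argument and the dag_run conf keys are all assumptions for illustration):

```python
# Hypothetical sketch of move_file_and_record_ingestion_state__callable; the
# real implementation uses the model defined in etl/model.py.
import shutil
from pathlib import Path

from etl.model import ETLRecord  # assumed class name

MOUNT_HOME = Path("/mounts")  # assumed mount location inside the container


def move_file_and_record_ingestion_state__callable(*, session, from_dir, to_dir, **context):
    conf = context["dag_run"].conf
    file_name = conf["file_name"]

    # Move the file between host-mounted directories, e.g. dump -> ingest.
    shutil.move(
        str(MOUNT_HOME / from_dir / file_name),
        str(MOUNT_HOME / to_dir / file_name),
    )

    # Record the new state (named after the target directory) in etl.etl_records.
    session.add(
        ETLRecord(cdr_type=conf["cdr_type"], cdr_date=conf["cdr_date"], state=to_dir)
    )
    session.commit()
```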

  3. success_branch__callable (used in etl_{cdr_type} DAGs)

This one is simple: it checks whether any of the upstream (i.e. previous) tasks have failed. If so, it returns the string quarantine and the DAG follows the branch that deals with moving the file to the quarantine location. If all tasks succeeded, it returns the string archive and follows that path.
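
As a rough sketch (the task names and the use of the dag_run's task instances are assumptions about the implementation):

```python
# Hypothetical sketch of success_branch__callable: inspect the tasks that have
# already run in this dag_run and pick the branch to follow.
from airflow.utils.state import State


def success_branch__callable(**context):
    previous_task_ids = ("init", "extract", "transform", "load")

    any_failed = any(
        ti.state == State.FAILED
        for ti in context["dag_run"].get_task_instances()
        if ti.task_id in previous_task_ids
    )

    # BranchPythonOperator expects the task_id of the branch to follow.
    return "quarantine" if any_failed else "archive"
```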

  4. trigger__callable (used in etl_sensor DAG)

This callable is used in the sense task of the etl_sensor DAG. It looks in the dump location for files and, for each file it finds, determines whether the file needs to be processed and what configuration the etl DAGs need.
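
A hedged sketch of how this could work (the per-type "pattern" entry in the config and the conf keys passed to the triggered DAG are assumptions; skipping already-processed files is omitted for brevity):

```python
# Hypothetical sketch of trigger__callable, used by the sense task of etl_sensor.
import re
from uuid import uuid1

from airflow.api.common.experimental.trigger_dag import trigger_dag


def trigger__callable(*, dump_path, cdr_type_config, **context):
    for file_path in dump_path.glob("*"):
        for cdr_type, type_config in cdr_type_config.items():
            # Match the file against the configured filename pattern for this
            # cdr_type, e.g. r"CALLS_(?P<cdr_date>\d{8})\.csv\.gz" from config.yml.
            match = re.fullmatch(type_config["pattern"], file_path.name)
            if match is None:
                continue

            # Dispatch to the per-type DAG with the config the etl tasks need.
            trigger_dag(
                f"etl_{cdr_type}",
                run_id=str(uuid1()),
                conf={
                    "file_name": file_path.name,
                    "cdr_type": cdr_type,
                    "cdr_date": match.group("cdr_date"),
                },
            )
```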

more to follow...

@josh-gree (Contributor, Author)

josh-gree commented May 31, 2019

Don't actually have much to add :-(

I have tests for most (if not all...) functions in etl/tests, and some integration-like tests in tests/integration.

Obvs not being paid anymore, so probably not going to devote much time to this PR from now on, but since it's now open source you can comment and whatever and I will try to answer or make changes as and when I have time....

Much love Joshuaaaaaaaaaaa!

@maxalbert (Contributor) left a comment

This is a great foundation, thanks a lot @josh-gree! Great work on the readability of the code and approach to testing. I had a look through it this past week and am happy to merge this as is so we can build on it in future PRs.

@maxalbert added the ready-to-merge label on Jun 13, 2019
@mergify (bot) merged commit 9a74291 into master on Jun 13, 2019
@mergify (bot) deleted the flowetl-config branch on June 13, 2019 at 10:26
@maxalbert mentioned this pull request on Jun 13, 2019
Labels: FlowETL, ready-to-merge