Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Location ids table and locatable events QA check #6164

Merged
merged 8 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Added

- FlowETL now updates a new table `events.location_ids` each time a new day of CDR data is ingested, to record the first and last date that each location ID appears in the data. [#5376](https://github.com/Flowminder/FlowKit/issues/5376)
- New FlowETL QA check "count_locatable_events", which counts the number of added rows whose location ID corresponds to a cell with a known location. [#5289](https://github.com/Flowminder/FlowKit/issues/5289)

### Changed

### Fixed
Expand Down
15 changes: 15 additions & 0 deletions flowdb/bin/build/0020_schema_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ cause a great toll to the ingestion process. IDs are not
mandatory, but a number of features created by `flowmachine`
require this field.

This schema also includes a table 'location_ids', which
records every location ID that appears in the CDR data,
together with the earliest and latest date on which each
location ID appears. The data ingestion pipeline defined
in FlowETL includes a task that updates the 'location_ids'
table when new CDR records are ingested.

-----------------------------------------------------------
*/
CREATE SCHEMA IF NOT EXISTS events;
Expand Down Expand Up @@ -150,3 +157,11 @@ CREATE SCHEMA IF NOT EXISTS events;
country_code NUMERIC

);

-- One row per (location ID, CDR type) ever seen in ingested CDR data,
-- upserted by FlowETL's update_location_ids_table task after each day
-- of data is ingested.
CREATE TABLE IF NOT EXISTS events.location_ids (
    location_id TEXT,        -- cell/site identifier as it appears in the CDR
    cdr_type TEXT,           -- CDR type, e.g. 'calls', 'sms', 'mds', 'topups'
    first_active_date DATE,  -- earliest CDR date on which this ID was seen
    last_active_date DATE,   -- latest CDR date on which this ID was seen
    PRIMARY KEY(location_id, cdr_type)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from flowetl.mixins.fixed_sql_mixin import fixed_sql_operator

UpdateLocationIDsTableOperator = fixed_sql_operator(
class_name="UpdateLocationIDsTableOperator",
sql="""
INSERT INTO events.location_ids (location_id, cdr_type, first_active_date, last_active_date)
SELECT location_id,
'{{ params.cdr_type }}' AS cdr_type,
'{{ ds }}'::DATE AS first_active_date,
'{{ ds }}'::DATE AS last_active_date
FROM {{ final_table }}
WHERE location_id IS NOT NULL
GROUP BY location_id
ON CONFLICT (location_id, cdr_type)
DO UPDATE SET first_active_date = least(EXCLUDED.first_active_date, events.location_ids.first_active_date),
last_active_date = greatest(EXCLUDED.last_active_date, events.location_ids.last_active_date);
""",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- QA check "count_locatable_events": counts the rows ingested for the day
-- ({{ ds }}) whose location_id matches a cell that (a) was in service on that
-- date — a missing first/last service date is treated as an open-ended bound
-- via -infinity/+infinity — and (b) has a known point location (geom_point).
-- NOTE(review): events are counted once per matching in-service cell version;
-- presumably versions of the same cell ID have non-overlapping service
-- periods, so each event matches at most one version — confirm.
SELECT
    count(*)
FROM
    {{ final_table }} ev
    INNER JOIN infrastructure.cells cl
        ON ev.location_id = cl.id
WHERE '{{ ds }}'::date BETWEEN coalesce(cl.date_of_first_service, '-infinity'::timestamptz)
        AND coalesce(cl.date_of_last_service, 'infinity'::timestamptz)
    AND cl.geom_point NOTNULL
7 changes: 7 additions & 0 deletions flowetl/flowetl/flowetl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ def create_dag(
)
from flowetl.operators.extract_from_view_operator import ExtractFromViewOperator
from flowetl.operators.update_etl_table_operator import UpdateETLTableOperator
from flowetl.operators.update_location_ids_table_operator import (
UpdateLocationIDsTableOperator,
)
from flowetl.sensors.data_present_sensor import DataPresentSensor
from flowetl.sensors.file_flux_sensor import FileFluxSensor
from flowetl.sensors.table_flux_sensor import TableFluxSensor
Expand Down Expand Up @@ -367,6 +370,9 @@ def create_dag(
task_id="analyze_parent", target="{{ parent_table }}", pool="postgres_etl"
)
update_records = UpdateETLTableOperator(task_id="update_records")
update_location_ids_table = UpdateLocationIDsTableOperator(
task_id="update_location_ids_table"
)

from_stage = extract

Expand All @@ -389,6 +395,7 @@ def create_dag(
)
attach >> [
update_records,
update_location_ids_table,
*get_qa_checks(additional_qa_check_paths=additional_qa_check_paths),
]
globals()[dag_id] = dag
Expand Down
4 changes: 4 additions & 0 deletions flowetl/tests/integration/test_dags_present.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def test_dags_present():
"latest_timestamp",
"count_imeis",
"count_imsis",
"count_locatable_events",
"count_locatable_location_ids",
"count_null_imeis",
"count_null_imsis",
Expand All @@ -68,6 +69,7 @@ def test_dags_present():
"create_staging_view",
"extract",
"update_records",
"update_location_ids_table",
"wait_for_data",
},
),
Expand All @@ -90,6 +92,7 @@ def test_dags_present():
"latest_timestamp",
"count_imeis",
"count_imsis",
"count_locatable_events",
"count_locatable_location_ids",
"count_null_imeis",
"count_null_imsis",
Expand All @@ -108,6 +111,7 @@ def test_dags_present():
"create_staging_view",
"extract",
"update_records",
"update_location_ids_table",
"wait_for_data",
},
),
Expand Down
21 changes: 20 additions & 1 deletion flowetl/tests/integration/test_full_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_file_pipeline(
6. Table is indexed
7. Table is clustered
8. Table is date constrained
9. All location IDs today are in events.location_ids table

"""
exit_code, output = run_dag(dag_id="filesystem_dag", exec_date="2016-03-01")
Expand Down Expand Up @@ -106,10 +107,28 @@ def test_file_pipeline(
etl_meta_query = "SELECT EXISTS(SELECT * FROM etl.etl_records WHERE cdr_date='2016-03-01' AND state='ingested' and cdr_type='calls');"
assert flowdb_transaction.execute(etl_meta_query).fetchall()[0][0]

# Check all location IDs today are in events.location_ids table

location_ids_query = """
SELECT NOT EXISTS (
SELECT location_id
FROM events.calls_20160301
LEFT JOIN (
SELECT location_id
FROM events.location_ids
WHERE cdr_type = 'calls'
AND '2016-03-01'::date BETWEEN first_active_date AND last_active_date
) active_location_ids
USING (location_id)
WHERE active_location_ids.location_id IS NULL
)
"""
assert flowdb_transaction.execute(location_ids_query).fetchall()[0][0]

# Check qa checks

qa_check_query = "SELECT count(*) from etl.post_etl_queries WHERE cdr_date='2016-03-01' AND cdr_type='calls'"
assert flowdb_transaction.execute(qa_check_query).fetchall()[0][0] == 23
assert flowdb_transaction.execute(qa_check_query).fetchall()[0][0] == 24


def test_file_pipeline_bad_file(
Expand Down
28 changes: 28 additions & 0 deletions flowetl/tests/integration/test_qa.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,34 @@ def test_count_locatable_location_ids(cdr_type, flowdb_transaction, jinja_env):
assert check_result == 1


@pytest.mark.parametrize("cdr_type", ["calls", "sms", "mds", "topups"])
def test_count_locatable_events(cdr_type, flowdb_transaction, jinja_env):
    """
    The count_locatable_events QA check should count only those events whose
    location ID matches a cell version that was in service on the CDR date
    and has a known point location.
    """
    # Day table with four events across three location IDs:
    #   'B'*64 x2 -> locatable, 'C'*64 -> cell known but not locatable on the
    #   CDR date, 'D'*64 -> no matching cell at all.
    create_sql = f"""CREATE TABLE IF NOT EXISTS events.{cdr_type}_20160101 (LIKE events.{cdr_type});"""
    insert_sql = f"""INSERT INTO events.{cdr_type}_20160101(datetime, msisdn, location_id) VALUES
        ('2016-01-01 00:01:00'::timestamptz, '{"A" * 64}', '{"B" * 64}'),
        ('2016-01-01 00:01:00'::timestamptz, '{"A" * 64}', '{"B" * 64}'),
        ('2016-01-01 00:01:00'::timestamptz, '{"A" * 64}', '{"C" * 64}'),
        ('2016-01-01 00:01:00'::timestamptz, '{"A" * 64}', '{"D" * 64}')"""
    # Cell fixtures:
    #   B v0: no service dates (treated as always in service) and a geom_point
    #         -> locatable on 2016-01-01.
    #   C v0: in service on 2016-01-01 but geom_point is NULL -> not locatable.
    #   C v1: has a geom_point but first in service on 2016-01-02 -> out of range.
    cells_sql = f"""
    INSERT INTO
        infrastructure.cells (id, version, date_of_first_service, date_of_last_service, geom_point)
    VALUES
        ('{"B" * 64}', 0, NULL, NULL, 'POINT(0 0)'),
        ('{"C" * 64}', 0, NULL, '2016-01-02'::date, NULL),
        ('{"C" * 64}', 1, '2016-01-02'::date, NULL, 'POINT(0 0)')
    """
    flowdb_transaction.execute(create_sql)
    flowdb_transaction.execute(insert_sql)
    flowdb_transaction.execute(cells_sql)
    check_sql = jinja_env.get_template("count_locatable_events.sql").render(
        cdr_type=cdr_type, final_table=f"events.{cdr_type}_20160101", ds="2016-01-01"
    )

    check_result, *_ = list(flowdb_transaction.execute(check_sql))[0]

    # Only the two events at cell 'B'*64 are locatable on the CDR date.
    assert check_result == 2


@pytest.mark.parametrize("cdr_type", ["calls", "sms", "mds", "topups"])
def test_count_msisdns(cdr_type, flowdb_transaction, jinja_env):
create_sql = f"""CREATE TABLE IF NOT EXISTS events.{cdr_type}_20160101 (LIKE events.{cdr_type});"""
Expand Down
Loading