Skip to content

Commit

Permalink
Vincent/create analytics actions view (#459)
Browse files Browse the repository at this point in the history
Resolve #458
  • Loading branch information
VincentAntoine committed Apr 27, 2023
2 parents 5da5b8d + b6f215c commit d826d2e
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
CREATE MATERIALIZED VIEW public.analytics_actions AS

SELECT
a.id,
a.mission_id,
action_start_datetime_utc,
EXTRACT(year FROM action_start_datetime_utc) AS year,
m.start_datetime_utc AS mission_start_datetime_utc,
m.end_datetime_utc AS mission_end_datetime_utc,
mission_type,
action_type,
COALESCE(m.facade, 'Hors fa莽ade') AS facade,
cu.name AS control_unit,
adm.name AS administration,
CASE WHEN COALESCE(t1->>'theme', '') = '' THEN 'Aucun th猫me' ELSE t1->>'theme' END AS theme_level_1,
CASE WHEN TRIM(BOTH '"' FROM (COALESCE(theme_level_2, 'null')::VARCHAR)) IN ('', 'null') THEN 'Aucun sous-th猫me' ELSE TRIM(BOTH '"' FROM (theme_level_2::VARCHAR)) END AS theme_level_2,
CASE WHEN action_type = 'CONTROL' THEN ST_X(geom_element.geom) END AS longitude,
CASE WHEN action_type = 'CONTROL' THEN ST_Y(geom_element.geom) END AS latitude,
CASE WHEN action_type = 'CONTROL' THEN CASE WHEN jsonb_array_length(a.value->'infractions') > 0 THEN true ELSE false END END AS infraction,
(a.value->'actionNumberOfControls')::DOUBLE PRECISION AS number_of_controls,
CASE WHEN action_type = 'SURVEILLANCE' THEN COALESCE((a.value->>'duration')::DOUBLE PRECISION, EXTRACT(epoch FROM m.end_datetime_utc - m.start_datetime_utc) / 3600) END AS surveillance_duration
FROM env_actions a
LEFT JOIN ST_Dump(a.geom) AS geom_element
ON true
LEFT JOIN LATERAL jsonb_array_elements(a.value->'themes') t1 ON true
LEFT JOIN LATERAL jsonb_array_elements(t1->'subThemes') theme_level_2 ON true
JOIN missions m
ON a.mission_id = m.id
LEFT JOIN LATERAL unnest(mission_types) mission_type ON true
LEFT JOIN missions_control_units mcu
ON mcu.mission_id = m.id
LEFT JOIN control_units cu
ON cu.id = mcu.control_unit_id
LEFT JOIN administrations adm
ON adm.id = cu.administration_id
WHERE
NOT m.deleted AND
m.closed AND
action_type IN ('CONTROL', 'SURVEILLANCE')
ORDER BY action_start_datetime_utc DESC;

CREATE INDEX ON analytics_actions USING BRIN(action_start_datetime_utc);
33 changes: 33 additions & 0 deletions datascience/src/pipeline/flows/refresh_materialized_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pathlib import Path

import pandas as pd
from prefect import Flow, Parameter, case, task
from prefect.executors import LocalDaskExecutor
from sqlalchemy import Table, text

from src.db_config import create_engine
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.infrastructure import get_table


@task(checkpoint=False)
def refresh_view(view: Table) -> pd.DataFrame:

assert isinstance(view, Table)

query = text(f"REFRESH MATERIALIZED VIEW {view.schema}.{view.name}")
e = create_engine("monitorenv_remote")
e.execute(query)


with Flow("Refresh materialized view", executor=LocalDaskExecutor()) as flow:

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):

view_name = Parameter("view_name")
schema = Parameter("schema", "public")
view = get_table(table_name=view_name, schema=schema)
refresh_view(view)

flow.file_name = Path(__file__).name
14 changes: 13 additions & 1 deletion datascience/src/pipeline/flows_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dotenv import dotenv_values
from prefect.executors.dask import LocalDaskExecutor
from prefect.run_configs.docker import DockerRun
from prefect.schedules import CronSchedule
from prefect.schedules import CronSchedule, Schedule, clocks
from prefect.storage.local import Local

from config import (
Expand All @@ -18,11 +18,22 @@
fao_areas,
historic_control_units,
natinfs,
refresh_materialized_view,
regulations,
)

################################ Define flow schedules ################################

refresh_materialized_view.flow.schedule = Schedule(
clocks=[
clocks.CronClock(
"30 12 * * *",
parameter_defaults={
"view_name": "analytics_actions",
},
),
]
)
regulations.flow.schedule = CronSchedule("6,16,26,36,46,56 * * * *")

###################### List flows to register with prefect server #####################
Expand All @@ -33,6 +44,7 @@
fao_areas.flow,
historic_control_units.flow,
regulations.flow,
refresh_materialized_view.flow,
natinfs.flow,
]

Expand Down
35 changes: 35 additions & 0 deletions datascience/src/pipeline/shared_tasks/infrastructure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import prefect
from prefect import task
from sqlalchemy import Table

from src.db_config import create_engine
from src.pipeline import utils


@task(checkpoint=False)
def get_table(
table_name: str,
schema: str = "public",
database: str = "monitorenv_remote",
) -> Table:
"""
Returns a `Table` representing the specified table.
Args:
table_name (str): Name of the table
schema (str, optional): Schema of the table. Defaults to "public".
database (str, optional): Database of the table, can be 'monitorenv_remote'
or 'monitorfish_local'. Defaults to "monitorenv_remote".
Returns:
Table: `sqlalchemy.Table` representing the specified table.
"""

logger = prefect.context.get("logger")

return utils.get_table(
table_name,
schema=schema,
conn=create_engine(database),
logger=logger,
)
2 changes: 1 addition & 1 deletion datascience/src/pipeline/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get_table(

try:
logger.info(f"Searching for table {schema}.{table_name}...")
meta.reflect(only=[table_name])
meta.reflect(only=[table_name], views=True)
table = Table(table_name, meta, mustexist=True)
logger.info(f"Table {schema}.{table_name} found.")
except InvalidRequestError:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW public.analytics_actions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pandas as pd

from src.db_config import create_engine
from src.pipeline.flows.refresh_materialized_view import flow
from src.read_query import read_query
from tests.mocks import mock_check_flow_not_running

flow.replace(
flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running
)


def test_refresh_analytics_actions(reset_test_data):

e = create_engine("monitorenv_remote")
query = """
SELECT *
FROM analytics_actions
ORDER BY id, control_unit
"""

initial_actions = read_query("monitorenv_remote", query)

e.execute("DELETE FROM env_actions WHERE mission_id = 12")

actions_before_refresh = read_query("monitorenv_remote", query)

flow.schedule = None
state = flow.run(view_name="analytics_actions", schema="public")

assert state.is_successful()

actions_after_refresh = read_query("monitorenv_remote", query)

assert len(initial_actions) == 7
assert len(actions_before_refresh) == 7
assert len(actions_after_refresh) == 1

pd.testing.assert_frame_equal(initial_actions, actions_before_refresh)
pd.testing.assert_frame_equal(
initial_actions.query("mission_id != 12").reset_index(drop=True),
actions_after_refresh,
check_dtype=False,
)

0 comments on commit d826d2e

Please sign in to comment.