Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data-tool/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ OUT ?=
RESET_EXTRACT_POSTGRES ?=
run-extract-load:
. $(VENV_DIR)/bin/activate && \
python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" $(if $(strip $(OUT)), --out "$(OUT)",) --run-dbschemacli --dbschemacli-cmd "$${DBSCHEMACLI_CMD:-dbschemacli}"
python flows/refresh_extract_subset_flow.py --mode load

run-extract-refresh:
. $(VENV_DIR)/bin/activate && \
python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" $(if $(strip $(OUT)), --out "$(OUT)",) --mode refresh --run-dbschemacli --dbschemacli-cmd "$${DBSCHEMACLI_CMD:-dbschemacli}"
python flows/refresh_extract_subset_flow.py --mode refresh

run-update-colin-ar-ind: ## Run update COLIN AR indicator flow
. $(VENV_DIR)/bin/activate && \
Expand Down
12 changes: 6 additions & 6 deletions data-tool/flows/common/colin_queries.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@

from sqlalchemy import text


def get_updated_identifiers(timestamp: str, corp_list: str) -> str:
if not str(corp_list).strip():
raise ValueError('empty corp_list')
Expand All @@ -19,10 +22,9 @@ def get_updated_identifiers(timestamp: str, corp_list: str) -> str:
ORDER BY e.event_timestmp DESC, e.event_id DESC
) AS rn
FROM event e
JOIN (SELECT column_value AS corp_num
FROM corp_list c
JOIN corp_list c
ON c.corp_num = e.corp_num
WHERE e.event_timestmp > TIMESTAMP '{timestamp}' - INTERVAL '1' HOUR
WHERE e.event_timestmp > TIMESTAMP '{timestamp}' - INTERVAL '5' HOUR
AND NOT (
EXISTS (
SELECT 1
Expand All @@ -36,9 +38,7 @@ def get_updated_identifiers(timestamp: str, corp_list: str) -> str:
WHERE cea.corp_num = e.corp_num
)
)
)

SELECT le.EVENT_ID,
) SELECT le.EVENT_ID,
le.corp_num,
le.event_typ_cd,
le.event_timestmp,
Expand Down
45 changes: 45 additions & 0 deletions data-tool/flows/common/query_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,51 @@
import pandas as pd
from typing import Dict, Iterable, List, Sequence
import re

BC_PREFIX_RE = re.compile(r"^BC(\d+)$", re.IGNORECASE)

def convert_result_set_to_dict(rs):
    """Convert a SQLAlchemy result set into a list of per-row dicts.

    Builds the dicts directly instead of round-tripping through a pandas
    DataFrame: this avoids pandas' dtype coercion (e.g. integer columns
    containing NULLs being promoted to float, ``None`` becoming ``NaN``)
    and the cost of materialising a DataFrame only to discard it.

    :param rs: SQLAlchemy result (iterable of row tuples, exposing ``keys()``).
    :return: list of ``{column_name: value}`` dicts, one per row; ``[]`` for
        an empty result set.
    """
    columns = list(rs.keys())
    return [dict(zip(columns, row)) for row in rs]

def corpnum_to_oracle_ids(target_ids: str | bytes | tuple | list | None) -> List[str]:
"""
Convert TARGET/Postgres corp ids into Oracle corporation.corp_num values.

For ids like BC0460007 -> 0460007
Otherwise leave as-is (A1234567 -> A1234567)

De-dupe while preserving order (avoid wasting Oracle IN-list slots).
"""
if target_ids is None:
return None

Check warning on line 22 in data-tool/flows/common/query_utils.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "list[str]" instead of "NoneType" or update function "corpnum_to_oracle_ids" type hint.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie2x-KYRk1DYV3yFX&open=AZ4ie2x-KYRk1DYV3yFX&pullRequest=4388

if isinstance(target_ids, (tuple, list)) and len(target_ids) == 1:
target_ids = target_ids[0]

if isinstance(target_ids, bytes):
target_ids = target_ids.decode()

raw = str(target_ids).strip()
if not raw:
return None

Check warning on line 32 in data-tool/flows/common/query_utils.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "list[str]" instead of "NoneType" or update function "corpnum_to_oracle_ids" type hint.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie2x-KYRk1DYV3yFY&open=AZ4ie2x-KYRk1DYV3yFY&pullRequest=4388
parsed = re.findall(r"'((?:''|[^'])*)'", raw)
if not parsed:
return None

Check warning on line 35 in data-tool/flows/common/query_utils.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "list[str]" instead of "NoneType" or update function "corpnum_to_oracle_ids" type hint.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie2x-KYRk1DYV3yFZ&open=AZ4ie2x-KYRk1DYV3yFZ&pullRequest=4388
target_ids = [p.strip() for p in parsed if p.strip()]

out: List[str] = []
seen: set[str] = set()

for target_id in target_ids:
m = BC_PREFIX_RE.match(target_id)
oracle_id = m.group(1) if m else target_id

if oracle_id not in seen:
out.append(oracle_id)
seen.add(oracle_id)

if not out:
return None

Check warning on line 50 in data-tool/flows/common/query_utils.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "list[str]" instead of "NoneType" or update function "corpnum_to_oracle_ids" type hint.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie2x-KYRk1DYV3yFa&open=AZ4ie2x-KYRk1DYV3yFa&pullRequest=4388
return ",".join("'" + x.replace("'", "''") + "'" for x in out)

Check warning on line 51 in data-tool/flows/common/query_utils.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "list[str]" instead of "str" or update function "corpnum_to_oracle_ids" type hint.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie2x-KYRk1DYV3yFb&open=AZ4ie2x-KYRk1DYV3yFb&pullRequest=4388
106 changes: 76 additions & 30 deletions data-tool/flows/refresh_extract_subset_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@
from prefect.states import Failed
from flask import current_app
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from datetime import datetime, timezone
from config import get_named_config
from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers, get_updated_identifiers_for_batch
from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch
from common.init_utils import colin_oracle_init, get_config
from common.query_utils import corpnum_to_oracle_ids

_REPO_ROOT = Path(__file__).resolve().parents[2]
_SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py'
_GENERATED_DIR = _REPO_ROOT / 'data-tool' / 'scripts' / 'generated'
_DEFAULT_DDL = _REPO_ROOT / 'data-tool' / 'scripts' / 'colin_corps_extract_postgres_ddl'
_SUBSET = _GENERATED_DIR / 'subset_refresh.sql'

def _resolve_master_script_path(out: str | None) -> Path:
    """Resolve the master-script output path for the generator.

    Falls back to the default ``_SUBSET`` location when *out* is empty/None.
    Absolute paths are honoured as-is; relative paths are anchored at the
    repository root (``_REPO_ROOT``) rather than the process CWD, so the
    result is stable no matter where the flow is launched from.

    :param out: user-supplied output path, or ``None``/empty for the default.
    :return: fully resolved :class:`pathlib.Path` to the master script.
    """
    if not out:
        return _SUBSET.resolve()
    p = Path(out).expanduser()
    if p.is_absolute():
        return p.resolve()
    return (_REPO_ROOT / p).resolve()

def _run_cmd(argv: list[str], env: dict[str, str] | None = None) -> None:
r = subprocess.run(argv, cwd=str(_REPO_ROOT), capture_output=False, text=True, env=env)
Expand Down Expand Up @@ -65,8 +72,8 @@
def cleanup_extract_postgres_db() -> None:
_reset_extract_postgres_db()

@task(name='Get-Updated-Identifiers-Colin')
def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int) -> list[dict]:
@task(name='Get-Updated-Identifiers-Colin', cache_policy=NO_CACHE)
def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine) -> list[dict]:
"""
Get updated corp nums from colin with cutoff timestamp
"""
Expand All @@ -75,10 +82,11 @@
with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn:
row = conn.execute(text(mig_sql)).fetchone()

corp_list = row[0] if row else None
corp_list = corpnum_to_oracle_ids(row[0]) if row else None
# oracle_ids = corpnum_to_oracle_ids(corp_list)

Check warning on line 86 in data-tool/flows/refresh_extract_subset_flow.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this commented out code.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie22AKYRk1DYV3yFc&open=AZ4ie22AKYRk1DYV3yFc&pullRequest=4388

colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list))
with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_ORACLE).connect() as conn:
with colin_oracle_engine.connect() as conn:
result = conn.execute(text(colin_sql))
rows = [dict(row) for row in result.mappings()]
return rows
Expand All @@ -94,6 +102,8 @@
pg_disable_method: str,
out: str | None,
include_cp: bool = False,
target_connection: str = 'ctst_pg',
prefix_numeric_bc: bool = False,
) -> subprocess.CompletedProcess:
"""
Generate Commands
Expand All @@ -115,13 +125,16 @@
'--pg-disable-method',
pg_disable_method,
]
argv.extend(['--target-connection', target_connection])
if pg_fastload:
argv.append('--pg-fastload')
if include_cp:
argv.append('--include-cp')
out_path = Path(out).expanduser().resolve() if out is not None else _SUBSET.resolve()
if prefix_numeric_bc:
argv.append('--prefix-numeric-bc')
out_path = _resolve_master_script_path(out)
out_path.parent.mkdir(parents=True, exist_ok=True)
argv.extend(['--out', str(out)])
argv.extend(['--out', str(out_path)])

return subprocess.run(
argv,
Expand All @@ -144,18 +157,19 @@
)

@flow(name='Extract-Subset-Flow', log_prints=True, persist_result=False)
def extract_pull_flow(

Check failure on line 160 in data-tool/flows/refresh_extract_subset_flow.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 31 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ4ie22AKYRk1DYV3yFd&open=AZ4ie22AKYRk1DYV3yFd&pullRequest=4388
corp_file: str,
mode: str = 'load',
chunk_size: int = 900,
threads: int = 4,
pg_fastload: bool = False,
pg_disable_method: str = 'replica_role',
pg_disable_method: str = 'table_triggers',
out: str | None=None,
run_dbschemacli: bool = False,
dbschemacli_cmd: str = 'dbschemacli',
reset_extract_postgres: bool = True,
include_cp: bool = False,
target_connection: str = 'ctst_pg',
) -> None:
"""
Generate files
Expand All @@ -165,25 +179,56 @@
print('Running in refresh mode: skipping Postgres DB reset')
if reset_extract_postgres:
cleanup_extract_postgres_db()

cutoff = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

config = get_config()
colin_oracle_engine = colin_oracle_init(config)
# Get Identifiers
feed_path: Path | None = None
if mode == 'refresh':
updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=1, colin_oracle_engine=colin_oracle_engine)
print(f'Colin updated identifiers : {len(updated_rows)} rows')
_GENERATED_DIR.mkdir(parents=True, exist_ok=True)
feed_path = _GENERATED_DIR / f'refresh_corp_feed_{os.getpid()}.tmp'
seen = set()
lines = []
for row in updated_rows:
for k, v in row.items():
if k is None or v is None:
continue
if str(k).lower() == 'corp_num':
c = str(v).strip()
if c and c not in seen:
seen.add(c)
lines.append(c)
break
if not lines:
raise ValueError('refresh: no corp_num in updated_rows')
feed_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
corp_file = str(feed_path)
result: subprocess.CompletedProcess | None = None
print(f'Running CPRD subset extract generator {corp_file}')
result = run_cprd_subset_extract_generator(
corp_file=corp_file,
mode=mode,
chunk_size=chunk_size,
threads=threads,
pg_fastload=pg_fastload,
include_cp=include_cp,
pg_disable_method=pg_disable_method,
out=out,
)
if result.returncode != 0:
try:
result = run_cprd_subset_extract_generator(
corp_file=corp_file,
mode=mode,
chunk_size=chunk_size,
threads=threads,
pg_fastload=pg_fastload,
include_cp=include_cp,
pg_disable_method=pg_disable_method,
out=out,
target_connection=target_connection,
prefix_numeric_bc=(mode=='refresh'),
)
finally:
if feed_path is not None:
feed_path.unlink(missing_ok=True)
if result.returncode != 0 and result is not None:
raise RuntimeError(f'Generator exited with code {result.returncode}')
print(f'generator completed successfully')

cutoff = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=128)
print(f'Colin updated identifiers : {len(updated_rows)} rows')

if run_dbschemacli:
master_script = _resolve_master_script_path(out=out)
run_result = run_dbschemacli_task(
Expand All @@ -197,15 +242,16 @@

if __name__ == '__main__':
p = argparse.ArgumentParser(description='Run Extract-Pull flow....')
p.add_argument('corp_file', help='Path to newline-delimited corp identifiers')
p.add_argument('--corp_file', default='../data-tool/scripts/generated/delta_ctst.txt', help='Path to newline-delimited corp identifiers')
p.add_argument('--mode', default='load', choices=('refresh', 'load'))
p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.')
p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads')
p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load')
p.add_argument('--include-cp', action='store_true', help='Include corp type CP in subset extract queries')
p.add_argument('--pg-disable-method', default='replica_role', choices=('table_triggers', 'replica_role'))
p.add_argument('--out', default=None, help='Output path for generated master script.')
p.add_argument('--run-dbschemacli', action='store_true')
p.add_argument('--pg-disable-method', default='table_triggers', choices=('table_triggers', 'replica_role'))
p.add_argument('--out', default='data-tool/scripts/subset/generated/subset_refresh.sql', help='Output path for generated master script.')
p.add_argument('--run-dbschemacli', action='store_false')
p.add_argument('--dbschemacli-cmd', default='dbschemacli')
p.add_argument('--reset-extract-postgres', action='store_false')
p.add_argument('--target-connection', default='ctst_pg')
extract_pull_flow(**vars(p.parse_args()))