Merge pull request #22 from MITLibraries/HRQB-17-functional-tasks-and-pipelines-prep

HRQB 17 - Prep for functional tasks and pipelines
ghukill committed May 22, 2024
2 parents 86810bb + 04d1796 commit 1ee91d6
Showing 20 changed files with 383 additions and 24 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -33,6 +33,7 @@ See additional diagrams and documentation in the [docs](docs) folder:
SENTRY_DSN=# If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
WORKSPACE=# Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
LUIGI_CONFIG_PATH=hrqb/luigi.cfg # this env var must be set, pointing to config file in hrqb folder
QUICKBASE_API_URL=# Quickbase API base URL
QUICKBASE_API_TOKEN=# Quickbase API token
QUICKBASE_APP_ID=# Quickbase App ID
DATA_WAREHOUSE_CONNECTION_STRING=# Data Warehouse SQLAlchemy connection string, e.g. oracle+oracledb://user1:pass1@example.org:1521/ABCDE
@@ -42,7 +43,8 @@ DATA_WAREHOUSE_CONNECTION_STRING=# Data Warehouse SQLAlchemy connection string,

```shell
DYLD_LIBRARY_PATH=/usr/local/lib:$DYLD_LIBRARY_PATH # used when developing on arm64 architecture + Rosetta2 environment
TARGETS_DIRECTORY=#Location to store Task Targets, overriding application default of "output"
TARGETS_DIRECTORY=# Location to store Task Targets, overriding application default of "output"
LUIGI_NUM_WORKERS=# Number of processes for luigi to run tasks in parallel. If not set, defaults to 1 in application.
```

## CLI Commands
12 changes: 12 additions & 0 deletions hrqb/base/target.py
@@ -25,6 +25,12 @@ def read(self) -> pd.DataFrame:
def write(self, df: pd.DataFrame) -> None:
df.to_pickle(self.path)

@property
def records_count(self) -> int | None:
if self.exists():
return len(self.read())
return None


class QuickbaseTableTarget(HRQBLocalTarget):
"""Target is upsert to Quickbase table."""
@@ -36,3 +42,9 @@ def read(self) -> dict:
def write(self, data: dict, indent: bool = True) -> int: # noqa: FBT001, FBT002
with open(self.path, "w") as f:
return f.write(json.dumps(data, indent=indent))

@property
def records_count(self) -> int | None:
if self.exists():
return self.read()["metadata"]["totalNumberOfRecordsProcessed"]
return None
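
For orientation, a minimal sketch of how the new `records_count` properties might behave. It assumes the DataFrame-backed target shown above is `PandasPickleTarget`, that it is importable from `hrqb.base.target`, and that its constructor accepts `path` and `table_name`; the path and table name below are hypothetical.

```python
import pandas as pd

from hrqb.base.target import PandasPickleTarget  # assumed import path

# hypothetical target path and table name, for illustration only
target = PandasPickleTarget(
    path="output/Animals__ExtractAnimalNames.pickle", table_name="Animals"
)

print(target.records_count)  # None -- the target file does not exist yet

target.write(pd.DataFrame([{"name": "Fluffy"}, {"name": "Rex"}]))
print(target.records_count)  # 2 -- length of the pickled DataFrame
```
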
16 changes: 15 additions & 1 deletion hrqb/base/task.py
@@ -196,6 +196,10 @@ def run(self) -> None:
Because Load Tasks (upserting data to Quickbase) are so uniform, this run method
can be defined on this base class. All data required for this operation exists
on the Task: data from parent Transform class and QB table name.
Partial successes are possible for Quickbase upserts. This method will log a
warning when errors are detected in the API response, but the task will be
considered complete and ultimately successful.
"""
records = self.get_records()

@@ -208,6 +212,11 @@
)
results = qbclient.upsert_records(upsert_payload)

# log a warning, but consider the task complete even if some errors are present in the API response
if api_errors := results.get("metadata", {}).get("lineErrors"):
message = f"Quickbase API call completed but had errors: {api_errors}"
logger.warning(message)

self.target.write(results)


@@ -229,9 +238,14 @@ def requires(self):
CLI.
"""

parent_pipeline_name = luigi.OptionalStrParameter(default=None, significant=False)

@property
def pipeline_name(self) -> str:
return self.__class__.__name__
output = self.__class__.__name__
if self.parent_pipeline_name:
output = f"{self.parent_pipeline_name}__{output}"
return output

@staticmethod
def init_task_from_class_path(
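
As a quick illustration of the naming behavior added above, here is a standalone sketch that mirrors the new `pipeline_name` property. It reimplements the logic rather than importing `HRQBPipelineTask`, so the class name "Animals" and the parent name "AllPipelines" are stand-ins for real pipeline classes.

```python
# Standalone sketch mirroring the pipeline_name property above.
def pipeline_name(class_name: str, parent_pipeline_name: str | None = None) -> str:
    output = class_name
    if parent_pipeline_name:
        output = f"{parent_pipeline_name}__{output}"
    return output

print(pipeline_name("Animals"))                   # -> "Animals"
print(pipeline_name("Animals", "AllPipelines"))   # -> "AllPipelines__Animals"
```
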
2 changes: 2 additions & 0 deletions hrqb/config.py
@@ -10,13 +10,15 @@ class Config:
"WORKSPACE",
"SENTRY_DSN",
"LUIGI_CONFIG_PATH",
"QUICKBASE_API_URL",
"QUICKBASE_API_TOKEN",
"QUICKBASE_APP_ID",
"DATA_WAREHOUSE_CONNECTION_STRING",
)
OPTIONAL_ENV_VARS = (
"DYLD_LIBRARY_PATH",
"TARGETS_DIRECTORY",
"LUIGI_NUM_WORKERS",
)

def check_required_env_vars(self) -> None:
4 changes: 4 additions & 0 deletions hrqb/tasks/pipelines.py
@@ -3,10 +3,14 @@
import luigi # type: ignore[import-untyped]
from luigi.execution_summary import LuigiRunResult # type: ignore[import-untyped]

from hrqb.config import Config


def run_pipeline(pipeline_task: luigi.WrapperTask) -> LuigiRunResult:
"""Function to run a HRQBPipelineTask via luigi runner."""
return luigi.build(
[pipeline_task],
local_scheduler=True,
detailed_summary=True,
workers=Config().LUIGI_NUM_WORKERS or 1,
)
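
A hedged usage sketch of `run_pipeline`, using the `Animals` fixture pipeline that the test suite imports; it assumes the required env vars (LUIGI_CONFIG_PATH, the Quickbase settings, etc.) are already set.

```python
from hrqb.tasks.pipelines import run_pipeline
from tests.fixtures.tasks.pipelines import Animals  # fixture pipeline used in tests

# run the pipeline; LUIGI_NUM_WORKERS (optional) controls parallelism, defaulting to 1
result = run_pipeline(Animals())
print(result.status)        # luigi run status
print(result.summary_text)  # human-readable execution summary
```
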
Empty file removed hrqb/utils/db.py
Empty file.
69 changes: 63 additions & 6 deletions hrqb/utils/quickbase.py
@@ -2,11 +2,12 @@

import json
import logging
from collections.abc import Callable
from collections.abc import Callable, Iterator

import pandas as pd
import requests
from attrs import define, field
from requests.exceptions import RequestException

from hrqb.config import Config
from hrqb.exceptions import QBFieldNotFoundError
@@ -18,7 +19,7 @@

@define
class QBClient:
api_base: str = field(default="https://api.quickbase.com/v1")
api_base: str = field(factory=lambda: Config().QUICKBASE_API_URL)
cache_results: bool = field(default=True)
_cache: dict = field(factory=dict, repr=False)

@@ -50,15 +51,25 @@ def make_request(
return self._cache[request_hash]

# make API call
results = requests_method(
response = requests_method(
f"{self.api_base}/{path.removeprefix('/')}",
headers=self.request_headers,
**kwargs,
).json()
)

# handle non 2xx responses
if not 200 <= response.status_code < 300: # noqa: PLR2004
message = (
f"Quickbase API error - status {response.status_code}, "
f"content: {response.text}"
)
raise RequestException(message)

data = response.json()
if self.cache_results:
self._cache[request_hash] = results
self._cache[request_hash] = data

return results
return data

def get_app_info(self) -> dict:
"""Retrieve information about the QB app.
@@ -151,3 +162,49 @@ def map_and_format_records_for_upsert(
raise QBFieldNotFoundError(message)
mapped_records.append(mapped_record)
return mapped_records

def query_records(self, query: dict) -> dict:
"""Query for Table Records.
https://developer.quickbase.com/operation/runQuery
"""
return self.make_request(requests.post, "records/query", json=query)

def query_records_mapped_fields_iter(self, query: dict) -> Iterator[dict]:
"""Query for records, yielding records with fields mapped to their labels."""
response = self.make_request(requests.post, "records/query", json=query)
field_map = {f["id"]: f["label"] for f in response["fields"]}
for record in response["data"]:
yield {
field_map[int(field_id)]: field["value"]
for field_id, field in record.items()
}

def get_table_as_df(
self,
table_id: str,
fields: list | None = None,
) -> pd.DataFrame:
"""Retrieve all records for a table as a DataFrame.
If arg 'fields' is passed, results will be limited to only those fields from the
table.
Additionally, by relying on self.query_records_mapped_fields_iter() to iteratively
yield records, this method is safe for large Quickbase tables.
"""
table_fields_df = self.get_table_fields(table_id)
if fields:
table_fields_df = table_fields_df[table_fields_df.label.isin(fields)]

records = self.query_records_mapped_fields_iter(
{
"from": table_id,
"select": list(table_fields_df.id),
}
)

return pd.DataFrame(
records,
columns=table_fields_df.label,
)
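
A brief usage sketch of the new query helpers. The table id and field labels below are the ones used in the test fixtures and are illustrative only; instantiating the client assumes QUICKBASE_API_URL and QUICKBASE_API_TOKEN are configured.

```python
from hrqb.utils.quickbase import QBClient

qbclient = QBClient()  # api_base now comes from Config().QUICKBASE_API_URL

# stream records with field ids mapped to their labels
query = {"from": "bck7gp3q2", "select": [6, 7, 8]}
for record in qbclient.query_records_mapped_fields_iter(query):
    print(record)  # e.g. {"Full Name": "...", "Amount": ..., "Date time": "..."}

# or pull a whole table (optionally limited to specific fields) as a DataFrame
df = qbclient.get_table_as_df("bck7gp3q2", fields=["Full Name", "Amount"])
```
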
79 changes: 76 additions & 3 deletions tests/conftest.py
@@ -2,6 +2,7 @@

import json
import shutil
from unittest import mock

import luigi
import pandas as pd
@@ -21,7 +22,7 @@
SQLQueryWithParameters,
)
from tests.fixtures.tasks.load import LoadAnimals
from tests.fixtures.tasks.pipelines import Animals, AnimalsDebug
from tests.fixtures.tasks.pipelines import Animals, AnimalsDebug, Creatures
from tests.fixtures.tasks.transform import PrepareAnimals


@@ -30,6 +31,7 @@ def _test_env(monkeypatch, targets_directory, data_warehouse_connection_string):
monkeypatch.setenv("SENTRY_DSN", "None")
monkeypatch.setenv("WORKSPACE", "test")
monkeypatch.setenv("LUIGI_CONFIG_PATH", "hrqb/luigi.cfg")
monkeypatch.setenv("QUICKBASE_API_URL", "http://qb.example.org/v1")
monkeypatch.setenv("QUICKBASE_API_TOKEN", "qb-api-acb123")
monkeypatch.setenv("QUICKBASE_APP_ID", "qb-app-def456")
monkeypatch.setenv("TARGETS_DIRECTORY", str(targets_directory))
@@ -142,12 +144,12 @@ def task_load_animals(pipeline_name):


@pytest.fixture
def task_pipeline_animals(pipeline_name):
def task_pipeline_animals():
return Animals()


@pytest.fixture
def task_pipeline_animals_debug(pipeline_name):
def task_pipeline_animals_debug():
return AnimalsDebug()


@@ -156,6 +158,11 @@ def task_extract_sql_query_with_parameters(pipeline_name):
return SQLQueryWithParameters(pipeline=pipeline_name)


@pytest.fixture
def task_pipeline_creatures():
return Creatures()


@pytest.fixture
def task_extract_animal_names_target(targets_directory, task_extract_animal_names):
shutil.copy(
@@ -272,6 +279,72 @@ def mocked_qb_api_upsert(
return api_response


@pytest.fixture
def mocked_query_all_fields_payload():
return {
"from": "bck7gp3q2",
"select": [6, 7, 8],
}


@pytest.fixture
def mocked_query_some_fields_payload():
return {
"from": "bck7gp3q2",
"select": [6, 7],
}


@pytest.fixture
def mocked_qb_api_runQuery_select_all_fields(
qbclient, mocked_table_id, mocked_query_all_fields_payload, global_requests_mock
):
url = f"{qbclient.api_base}/records/query"
with open("tests/fixtures/qb_api_responses/runQuery_all_fields.json") as f:
api_response = json.load(f)
global_requests_mock.register_uri(
"POST",
url,
additional_matcher=lambda req: req.json() == mocked_query_all_fields_payload,
json=api_response,
)
return api_response


@pytest.fixture
def mocked_qb_api_runQuery_select_some_fields(
qbclient, mocked_table_id, mocked_query_some_fields_payload, global_requests_mock
):
url = f"{qbclient.api_base}/records/query"
with open("tests/fixtures/qb_api_responses/runQuery_some_fields.json") as f:
api_response = json.load(f)
global_requests_mock.register_uri(
"POST",
url,
additional_matcher=lambda req: req.json() == mocked_query_some_fields_payload,
json=api_response,
)
return api_response


@pytest.fixture
def mocked_query_table_fields():
return pd.DataFrame(
[
{"label": "Full Name", "id": 6},
{"label": "Amount", "id": 7},
{"label": "Date time", "id": 8},
]
)


@pytest.fixture
def qbclient_with_mocked_table_fields(qbclient, mocked_query_table_fields):
with mock.patch.object(type(qbclient), "get_table_fields") as mocked_table_fields:
mocked_table_fields.return_value = mocked_query_table_fields
yield qbclient


@pytest.fixture
def mocked_transform_pandas_target(tmpdir, mocked_table_name, mocked_upsert_data):
target = PandasPickleTarget(
4 changes: 2 additions & 2 deletions tests/fixtures/cli/status_all_tasks_incomplete.txt
@@ -1,5 +1,5 @@
├── INCOMPLETE: Animals()
├── INCOMPLETE: LoadAnimals(pipeline=Animals, stage=Load, table_name=Animals)
├── INCOMPLETE: PrepareAnimals(pipeline=Animals, stage=Transform, table_name=Animals)
├── INCOMPLETE: ExtractAnimalColors(table_name=, pipeline=Animals, stage=Extract)
├── INCOMPLETE: ExtractAnimalNames(table_name=, pipeline=Animals, stage=Extract)
├── INCOMPLETE: ExtractAnimalColors(pipeline=Animals, table_name=, stage=Extract)
├── INCOMPLETE: ExtractAnimalNames(pipeline=Animals, table_name=, stage=Extract)
4 changes: 2 additions & 2 deletions tests/fixtures/cli/status_extract_tasks_complete.txt
@@ -1,5 +1,5 @@
├── INCOMPLETE: Animals()
├── INCOMPLETE: LoadAnimals(pipeline=Animals, stage=Load, table_name=Animals)
├── INCOMPLETE: PrepareAnimals(pipeline=Animals, stage=Transform, table_name=Animals)
├── COMPLETE: ExtractAnimalColors(table_name=, pipeline=Animals, stage=Extract)
├── COMPLETE: ExtractAnimalNames(table_name=, pipeline=Animals, stage=Extract)
├── COMPLETE: ExtractAnimalColors(pipeline=Animals, table_name=, stage=Extract)
├── COMPLETE: ExtractAnimalNames(pipeline=Animals, table_name=, stage=Extract)
38 changes: 38 additions & 0 deletions tests/fixtures/qb_api_responses/runQuery_all_fields.json
@@ -0,0 +1,38 @@
{
"data": [
{
"6": {
"value": "Andre Harris"
},
"7": {
"value": 10
},
"8": {
"value": "2019-12-18T08:00:00Z"
}
}
],
"fields": [
{
"id": 6,
"label": "Full Name",
"type": "text"
},
{
"id": 7,
"label": "Amount",
"type": "numeric"
},
{
"id": 8,
"label": "Date time",
"type": "date time"
}
],
"metadata": {
"totalRecords": 1,
"numRecords": 1,
"numFields": 3,
"skip": 0
}
}
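
For reference, the field-to-label mapping that `query_records_mapped_fields_iter` applies to a response shaped like this fixture can be sketched standalone; the abbreviated response below reuses values from the fixture above.

```python
# Standalone sketch of the label mapping done by query_records_mapped_fields_iter.
response = {
    "data": [{"6": {"value": "Andre Harris"}, "7": {"value": 10}}],
    "fields": [{"id": 6, "label": "Full Name"}, {"id": 7, "label": "Amount"}],
}

field_map = {f["id"]: f["label"] for f in response["fields"]}
records = [
    {field_map[int(field_id)]: field["value"] for field_id, field in record.items()}
    for record in response["data"]
]
print(records)  # -> [{'Full Name': 'Andre Harris', 'Amount': 10}]
```
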