Refactor Harvesting Logic repo to incorporate feedback changes #4577

Closed · 26 tasks
btylerburton opened this issue Jan 4, 2024 · 16 comments
Labels
H2.0/Harvest-Runner Harvest Source Processing for Harvesting 2.0

btylerburton commented Jan 4, 2024

User Story

In order to incorporate feedback from the most recent design sessions, changes need to be made to the datagov-harvesting-logic (DHL) repo in order to fully test a DCAT-US record end-to-end.

Acceptance Criteria

[ACs should be clearly demoable/verifiable whenever possible. Try specifying them using BDD.]

  • GIVEN DHL has been supplied as a dependency to an Airflow instance
    AND an instance of the ETL-Pipeline has been invoked
    WHEN the extract method is called with a valid harvest source config
    THEN I would like to generate a single Source class which contains the incoming source records, the corresponding records for that harvest source in CKAN, and the lifecycle methods needed to perform ETL operations.

  • GIVEN the extract process has extracted the incoming records and their counterparts in CKAN
    THEN I would like to compare the incoming records with the CKAN records and generate lists of items to: CREATE, UPDATE and DESTROY
    AND to return a count, as a dict or a list, of the number of datasets in each.

  • GIVEN I have called the delete method using a valid harvest source config
    THEN I expect the DHL to attempt to delete each item
    AND to return the success/failure metrics of that operation

  • GIVEN I have called the validate method using a valid harvest source config
    THEN I expect the DHL to attempt to validate each item against the current DCAT-US 1.1 schema
    AND to return the success/failure metrics of that operation to Airflow

  • GIVEN I have called the load method using a valid harvest source config
    THEN I expect the DHL to attempt to load each item to be created or updated into CKAN using package_create
    AND to return success/failure metrics of that operation to Airflow

Background

[Any helpful contextual notes or links to artifacts/evidence, if needed]

Security Considerations (required)

[Any security concerns that might be implicated in the change. "None" is OK, just be explicit here!]

Sketch

  • Refactor DHL to accept a CKAN instance URL as an environment variable
  • Refactor DHL to use Source and Record Classes (a rough class sketch follows this list)
  • Refactor the Source class to include the lifecycle methods below:
    • Compare
    • Delete
    • Validate
    • Load
  • Refactor Extract to pull down source records and their corresponding CKAN records
  • Refactor Compare to generate lists of records to CREATE, UPDATE & DELETE, and to return a count, as a dict or a list, of the number of datasets in each.
  • Refactor Delete
    • To accept a list of records
    • Make a RESTful call to the appropriate CKAN instance
    • And return back success/fail metrics of that operation
  • Refactor Validate
    • To accept a list of records
    • Validate the records against the current DCAT-US 1.1 schema
    • And to return back success/fail metrics of that operation
  • Refactor Load
    • To accept a list of records
    • Make a RESTful call to the appropriate CKAN instance
    • And to return back success/fail metrics of that operation
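
As referenced above, here is a minimal sketch of how the Source and Record classes and their lifecycle methods might hang together. The class, field, and method names are placeholders for illustration, not the agreed interface.

# Placeholder sketch only -- names and signatures are illustrative, not the final design.
from dataclasses import dataclass, field


@dataclass
class Record:
    identifier: str
    metadata: dict


@dataclass
class HarvestSource:
    url: str
    records: dict = field(default_factory=dict)       # incoming source records, keyed by identifier
    ckan_records: dict = field(default_factory=dict)  # corresponding records already in CKAN

    def compare(self) -> dict:
        """Return {"create": [...], "update": [...], "delete": [...]} plus counts."""
        ...

    def validate(self, records: list) -> dict:
        """Validate records against the DCAT-US 1.1 schema; return success/failure metrics."""
        ...

    def load(self, records: list) -> dict:
        """Create or update records in CKAN; return success/failure metrics."""
        ...

    def delete(self, records: list) -> dict:
        """Delete records from CKAN; return success/failure metrics."""
        ...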

Reference

diagram

@rshewitt rshewitt self-assigned this Jan 5, 2024

rshewitt commented Jan 5, 2024

@jbrown-xentity and I spoke about the load function and we think we can nix it in the harvesting logic repo. In other words, a DAG won't call something like harvester.load(*args); instead, the load function will reside in the DAG itself. It could look something like this @btylerburton:

def load(harvest_datasets, operation):
    # create the CKAN entrypoint once, then dispatch each dataset to the requested operation
    ckan = harvester.create_ckan_entrypoint(ckan_url, api_key)
    operations = {
        "delete": harvester.purge_ckan_package,
        "create": harvester.create_ckan_package,
        "update": harvester.update_ckan_package,
    }
    for dataset in harvest_datasets:
        operations[operation](ckan, dataset)

@btylerburton

I like that enhancement. Will harvesting logic still return three separate lists of datasets?


rshewitt commented Jan 5, 2024

Yeah, that's what I'm thinking. The reason being that we could potentially expand each load type (i.e. create, update, delete) like this:

compare_result = compare(harvest_source, ckan_source) 
"""
compare_result = {
  "create": [ id1, id2, id3],
  "update": [ id5, id6, id7],
  "delete": [ id10, id15, id14]
}
"""

load.expand( compare_result["create"], "create" ) 
load.expand( compare_result["delete"], "delete" )
load.expand( compare_result["update"], "update" )

Since we don't have a rollback in the event of a partial completion, this could be fine? Do we need a rollback in the event that, let's say, 2/8 dataset creations fail?
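
If this ends up using Airflow's dynamic task mapping, the calls would look slightly different from the pseudocode above: expand() only accepts keyword arguments, per-run constants go through partial(), and each mapped call needs its own task_id. A rough sketch under those assumptions:

# Sketch only (Airflow 2.3+ dynamic task mapping); `compare_result` would be the XCom
# output of an upstream compare task.
from airflow.decorators import task

@task
def load(dataset, operation):
    ...  # dispatch to the appropriate harvester CKAN operation

load.override(task_id="load_create").partial(operation="create").expand(dataset=compare_result["create"])
load.override(task_id="load_update").partial(operation="update").expand(dataset=compare_result["update"])
load.override(task_id="load_delete").partial(operation="delete").expand(dataset=compare_result["delete"])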

@btylerburton

> Since we don't have a rollback in the event of a partial completion, this could be fine? Do we need a rollback in the event that, let's say, 2/8 dataset creations fail?

This brings up a good question around error reporting/tracking. I've been thinking about this in regards to the DCAT pipeline. We should discuss as a team how we want to handle things.

Take this case:

[record to be created] -> validation [fails] -> load [skipped]

or

[record to be transformed] -> transform [fails] -> validation [skipped] -> load [skipped]

Airflow is happiest when I put a skip exception in at the failure step, which allows it to skip any downstream tasks gracefully, but this also means that the pipeline is "green", so we need a way of recording/handling those exceptions.

It's easy enough to log the failures, we just need to know what to do with them.
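
One possible shape for that (sketch only; `validate` and `record_failure` are hypothetical helpers, not existing DHL functions) is to persist the failure somewhere durable first, then raise the skip so downstream tasks are skipped without failing the run:

# Sketch only: record the failure, then skip downstream tasks gracefully.
from airflow.exceptions import AirflowSkipException

def validation_task(record):
    errors = validate(record)  # assumed to return a list of validation errors
    if errors:
        record_failure(record, stage="validate", errors=errors)  # e.g. persist to a harvest job log/table
        raise AirflowSkipException(f"validation failed for {record['identifier']}")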


rshewitt commented Jan 5, 2024

Pipeline test sketch as a reference. The changes are untested; it's meant to serve as an aggregate of @robert-bryson's work with the classes and to show the order of operations similar to how they could be in Airflow (not literally, just more of a workflow).

@jbrown-xentity

> It's easy enough to log the failures, we just need to know what to do with them.

This is what we should do with them: #4582


rshewitt commented Jan 8, 2024

Using a data validator like pydantic against our proposed classes could be valuable. This could add teeth to our type hints as well @robert-bryson.
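
For example, something like this (a sketch only; the fields are illustrative, not the actual DHL schema) would reject a record whose values don't match the declared types:

# Sketch: pydantic enforces the annotated types at runtime, unlike bare type hints.
from pydantic import BaseModel, ValidationError

class Record(BaseModel):
    identifier: str
    metadata: dict

try:
    Record(identifier=None, metadata="not a dict")
except ValidationError as e:
    print(e)  # both fields are reported as invalid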


rshewitt commented Jan 8, 2024

Using dataclasses could be a nice way to treat our classes. The equality and hash methods that come with them seem relevant.
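
A quick illustration of what comes for free (assuming a frozen dataclass to get hashing; the fields are illustrative):

# Sketch: dataclasses generate __eq__ from the fields; frozen=True also makes instances hashable.
from dataclasses import dataclass

@dataclass(frozen=True)
class Record:
    identifier: str
    source_hash: str

a = Record("abc-123", "d41d8cd9")
b = Record("abc-123", "d41d8cd9")

assert a == b            # field-wise equality
assert len({a, b}) == 1  # hashable, so usable in sets/dicts for de-duplication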


rshewitt commented Jan 8, 2024

Making use of the property decorator seems like it could give us more control over how we set, get, and/or delete attributes in our classes. Being more restrictive with setting attributes could mean fewer headaches down the road (or cause more?).
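
For instance, a property could validate an attribute on assignment (sketch only; the attribute name is illustrative):

# Sketch: using a property setter to guard how an attribute is set.
class HarvestSource:
    def __init__(self, url):
        self.url = url  # goes through the setter below

    @property
    def url(self):
        return self._url

    @url.setter
    def url(self, value):
        if not value.startswith(("http://", "https://")):
            raise ValueError(f"invalid harvest source url: {value}")
        self._url = value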

@btylerburton btylerburton changed the title [Placeholder] Refactor Harvesting Logic repo to expose batch download details into interface Refactor Harvesting Logic repo to incorporate feedback changes Jan 9, 2024
@btylerburton btylerburton added the H2.0/Harvest-Runner Harvest Source Processing for Harvesting 2.0 label Jan 10, 2024

rshewitt commented Jan 10, 2024

If we intend to store an instance of a class we need to make sure it's serializable. dataclasses comes with an asdict method which may do what we want. Take for example:

import dataclasses
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class A:
    age: int

@dataclass
class B:
    records: Dict = field(default_factory=dict)

a = A(25)
b = B()

b.records["a"] = a
dataclasses.asdict(b)  # >> {'records': {'a': {'age': 25}}}

The asdict method will conveniently unpack the nested instance of A by its fields. We can then serialize this output into a JSON string. This example is meant to be a simplified version of our Record-in-Source design.

Here's an example of what happens when I try to use asdict on b when its records dict contains something that's not serializable:

import ckanapi

ckan = ckanapi.RemoteCKAN("path/to/ckan/endpoint", apikey="api_key")
b.records["ckan"] = ckan  # >> {'a': A(age=25), 'ckan': <ckanapi.remoteckan.RemoteCKAN object at 0x1010b4cd0>}
dataclasses.asdict(b)  # >> raises "TypeError: ActionShortcut.__getattr__.<locals>.action() takes 0 positional arguments but 1 was given"
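
One way around that (a sketch only, not necessarily how DHL should handle it) is to keep non-serializable entries like the RemoteCKAN client out of what gets dumped:

# Sketch: filter out non-dataclass entries before serializing to JSON.
import json
import dataclasses

def to_json(source):
    safe_records = {k: v for k, v in source.records.items() if dataclasses.is_dataclass(v)}
    return json.dumps(dataclasses.asdict(dataclasses.replace(source, records=safe_records)))

to_json(b)  # >> '{"records": {"a": {"age": 25}}}'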


rshewitt commented Jan 11, 2024

For continuity and organization I'm going to convert our tests to classes. I've also added a CKAN extract test class. Some tests could be...

  • using the wrong url or apikey in ckanapi.RemoteCKAN( url, apikey )
    • there's no verification when you run this function, so I can pass ckanapi.RemoteCKAN( "a", "b" ) and it will return just fine. It will only complain when I try to do something with the return (e.g. ckanapi.RemoteCKAN( "a", "b" ).action.package_create(**kwargs) won't work).
  • ckan being unavailable
  • getting a proper return and converting it to {"identifier": "hash"} format

A test that could be useful for the compare is when CKAN returns nothing, suggesting everything needs to be created.
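
A rough shape for one of those test classes (sketch only; the assertions are illustrative, not the final tests):

# Sketch of a CKAN extract test class; RemoteCKAN does no verification at construction time,
# so the failure only surfaces when an action is actually called.
import pytest
import ckanapi


class TestCKANExtract:
    def test_bad_credentials_only_fail_on_use(self):
        ckan = ckanapi.RemoteCKAN("a", apikey="b")  # constructing with junk values succeeds
        with pytest.raises(Exception):
            ckan.action.package_search(q="test")    # ...but using the client does not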


rshewitt commented Jan 18, 2024

Draft PR. This PR may be open for an extended period of time to allow for fixes addressing issues @btylerburton encounters as he tests the module out in Airflow. Or maybe that should be a separate ticket?

@rshewitt

I'm going to process all our current harvest sources through the harvesting logic code to fix any issues on my end (excluding any real creation, deletion, or updating).

@rshewitt

Ran a test load on catalog-dev admin. The number of datasets increased from 345 to 720. This test was run on my machine and not in Airflow.


rshewitt commented Jan 26, 2024

Log from last night's load test of dcatus. Processing seems to have been held up at 00:03:16,472.
harvest_load.log


nickumia commented Jan 29, 2024

> Airflow is happiest when I put a skip exception in at the failure step, which allows it to skip any downstream tasks gracefully, but this also means that the pipeline is "green", so we need a way of recording/handling those exceptions.

@btylerburton What happens when you use an AirflowFailException? It still does the tasks afterwards?? 😱 Or is the concern that the subsequent tasks show up as failed (even when they're not run)? I ran into a similar issue when running GitHub Actions. This doesn't completely answer the question, but having robust conditions and clear logic is important.

Maybe this reference might provide some ideas to the team: https://www.restack.io/docs/airflow-knowledge-airflow-skip-exception-guide (i.e. having a fallback mechanism to do the email notification might meet the needs?)
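
For the fallback idea, one common pattern (sketch only) is a downstream notification task with a trigger rule that fires when any upstream task fails:

# Sketch: a fallback notification task that runs if any upstream task failed.
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule

@task(trigger_rule=TriggerRule.ONE_FAILED)
def notify_failures():
    ...  # e.g. email the failure report collected by earlier tasks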
