Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HRQB 13 - Pipeline structures and CLI commands #14

Merged
merged 5 commits into from
May 10, 2024

Conversation

ghukill
Copy link
Collaborator

@ghukill ghukill commented May 7, 2024

Purpose and background context

This PR fully realizes "pipelines" in this HRQBClient, with CLI commands to invoke and manage them.

Most of the building blocks were there with base classes PandasPickleTask and QuickbaseUpsertTask, but this completes the picture by extending functionality for base class HRQBPipelineTask, and wiring up CLI commands that invoke these special "pipeline" Tasks.

A related change was made during this work defining a required method PandasPickleTask.get_dataframe(...). This mirrors the QUickbaseUpsertTask.get_records(...) pattern, where it's preferable to have a "worker" method on a Luigi Task class, such that you can test its output directly without the complexities of the .run() method which will write to a Target.

In both cases, run() can be overridden if the Task requires it, but most tasks will be doing one of the following, and the defult run() method is well suited for this:

  • creating new dataframes from external sources (Extract Tasks)
  • taking an input dataframe and modifying the data somehow (Transform Tasks)
  • taking an input dataframe and writing to Quickbase (Load Tasks)

Apologies for the large commit, though the vast majority of changes are tests and test fixtures. The most relevant new functionality can be seen here:

How can a reviewer manually see the effects of these changes?

While there are not any real pipelines defined yet (these will populate hrqb.tasks.* when created), fixtures have been created for automated testing that are demonstrative of how they will be structured.

You may need some env vars, as they are considered required, but you can set fake/placeholder values:

WORKSPACE=dev
SENTRY_DSN=None
LUIGI_CONFIG_PATH=hrqb/luigi.cfg
QUICKBASE_API_TOKEN=abc123
QUICKBASE_APP_ID=def456

Structure of tests.fixtures.tasks

This fixtures folder structure mirrors what Tasks will look like in the hrqb.tasks module:

tests/fixtures/tasks
├── __init__.py
├── extract.py
    --> ExtractAnimalColors
    --> ExtractAnimalNames
├── load.py
    --> LoadAnimals
    --> LoadAnimalsDebug
├── pipelines.py
    --> Animals
    --> AnimalsDebug
└── transform.py
    --> PrepareAnimals

While this mirrors how Tasks will actually be defined in hrqb.tasks, it's not very helpful for an intuitive sense of how they wire to each other.

Inspect full, annotated pipeline code

The fixture file tests/fixtures/full_annotated_pipeline.py exists to hold a full, annotated pipeline. This can be useful to see how Tasks are largely defined by:

  • indicating what tasks they depend on
  • providing a get_dataframe() or get_records() method as appropriate

The next section shows how to run this via the CLI, but it can also be interacted with directly in a python REPL:

from tests.fixtures.full_annotated_pipeline import *

pipeline_task = AlphaNumeric()

print(pipeline_task.pipeline_as_ascii())
"""
├── INCOMPLETE: AlphaNumeric()
   ├── INCOMPLETE: CombineLettersAndNumbers(table_name=, pipeline=AlphaNumeric, stage=Transform)
      ├── INCOMPLETE: MultiplyNumbers(table_name=, pipeline=AlphaNumeric, stage=Transform)
         ├── INCOMPLETE: GenerateNumbers(table_name=, pipeline=AlphaNumeric, stage=Extract)
      ├── INCOMPLETE: GenerateLetters(table_name=, pipeline=AlphaNumeric, stage=Extract)
"""

Run CLI command

The ultimate goal of the changes from this PR are to support CLI level commands to invoke pipelines, which is now possible.

NOTE: without setting TARGETS_DIRECTORY env var, the default location will be the app's output/ folder.

1- Run status command for a tests fixture pipeline:
pipenv run hrqb --verbose \
pipeline \
-pm tests.fixtures.tasks.pipelines \
-p AnimalsDebug \
status

Note the output showing all tasks are INCOMPLETE. This status output also gives a high level picture of the pipeline structure, showing all Tasks and how they rely on one another (where going down the tree, each Task relies on the Task above it). This is the same output as pipeline_task.pipeline_as_ascii() from above.

├── INCOMPLETE: AnimalsDebug()
   ├── INCOMPLETE: LoadAnimalsDebug(pipeline=AnimalsDebug, stage=Load, table_name=Animals)
      ├── INCOMPLETE: PrepareAnimals(pipeline=AnimalsDebug, stage=Transform, table_name=Animals)
         ├── INCOMPLETE: ExtractAnimalColors(table_name=, pipeline=AnimalsDebug, stage=Extract)
         ├── INCOMPLETE: ExtractAnimalNames(table_name=, pipeline=AnimalsDebug, stage=Extract)
2- Run run command for a tests fixture pipeline:
pipenv run hrqb --verbose \
pipeline \
-pm tests.fixtures.tasks.pipelines \
-p AnimalsDebug \
run

There should be a handful of outputs, and now the pipeline status should show the tasks as green COMPLETE. Looking in the output/ folder, you should also see artifacts from the pipeline run:

output
├── AnimalsDebug__Extract__ExtractAnimalColors.pickle
├── AnimalsDebug__Extract__ExtractAnimalNames.pickle
├── AnimalsDebug__Load__LoadAnimalsDebug.json
├── AnimalsDebug__Transform__PrepareAnimals.pickle

Note the filename structure of <PipelineName>__<Stage>__<TaskName>.<Extension>. By prefixing files with the pipeline name, this allows multiple pipelines to reuse the same Task logic without the risk of getting data from another pipeline. Within the same pipeline, the Task outputs will be reused without re-running the Task (e.g. Data Warehouse queries where perhaps multiple Tasks will reuse that data).

But, what if you do want to reuse the data from another pipeline run? This will be sorted in future PRs that define actual HRQB pipelines, if and when that need arises; could be as simple as a flag to omit the pipeline prefix effectively making it a Pipeline-agnostic Task.

If you immediately re-run the pipeline, you should see some output like this, indicating that no Tasks needed to re-run, as their Targets (files) already existed:

...
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 AnimalsDebug()

Did not run any tasks   <-------------------------------
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
...
2- Run remove-data command for a tests fixture pipeline:
pipenv run hrqb --verbose \
pipeline \
-pm tests.fixtures.tasks.pipelines \
-p AnimalsDebug \
remove-data

Lastly, can run a command that will analyze the pipeline and remove any Targets (files) that may exist for the pipeline. This can also be accomplished as an addon step when running a pipeline with --remove-data.

Looking at output/ folder now, all files should be gone. To double confirm, you can run the status command again and note that all Tasks are now INCOMPLETE again.

Includes new or updated dependencies?

YES

Changes expectations for external applications?

YES

What are the relevant tickets?

Developer

  • All new ENV is documented in README
  • All new ENV has been added to staging and production environments
  • All related Jira tickets are linked in commit message(s)
  • Stakeholder approval has been confirmed (or is not needed)

Code Reviewer(s)

  • The commit message is clear and follows our guidelines (not just this PR message)
  • There are appropriate tests covering any new functionality
  • The provided documentation is sufficient for understanding any new functionality introduced
  • Any manual tests have been performed or provided examples verified
  • New dependencies are appropriate or there were no changes

Why these changes are being introduced:

This commit fully realizes pipelines via the HRQBPipelineTask base
class.  These pipelines can also be checked, managed, and run via
CLI commands.

How this addresses that need:
* HRQBPipelineTask fully defined with methods for accessing pieline
Task hierarchy
* CLI commands created for pipelines:
  * 'status' to get status of the pipeline
  * 'remove-data' to remove all data artifacts from pipeline Tasks
  * 'run' to run the pipeline
* Define PandasPickleTask.get_dataframe method that is required for
Tasks to define, preferred over overriding the run method

Side effects of this change:
* CLI is functional for running pipelines

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/HRQB-13
@ghukill ghukill marked this pull request as ready for review May 7, 2024 20:22
Why these changes are being introduced:

The CLI command group is expecting a pipeline name and optional
pipeline module, both as strings, to find a pipeline task to run.
Because these are strings, it requires loading that actual python
class via imports and dynamic loading.  While functional before,
it was a little opaque to have that complex logic in the CLI, where
it would be better suited as a dedicated method on the HRQBPipelineTask
class which is the target parent class for anything imported.

How this addresses that need:
* Adds method HRQBPipelineTask.init_task_from_class_path dedicated
to finding an instance via module and class strings to import and
instantiate

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/HRQB-13
Copy link

@ehanson8 ehanson8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, some questions

README.md Show resolved Hide resolved
hrqb/base/task.py Outdated Show resolved Hide resolved
hrqb/base/task.py Show resolved Hide resolved
hrqb/cli.py Outdated Show resolved Hide resolved
hrqb/cli.py Outdated Show resolved Hide resolved
hrqb/cli.py Show resolved Hide resolved
tests/fixtures/full_annotated_pipeline.py Outdated Show resolved Hide resolved
tests/test_cli.py Show resolved Hide resolved
tests/test_pipelines.py Outdated Show resolved Hide resolved
@ghukill
Copy link
Collaborator Author

ghukill commented May 9, 2024

Thanks for the thoughtful review @ehanson8, particularly given the size of changes. Updates in new commit.

@ghukill ghukill requested a review from ehanson8 May 9, 2024 13:27
Copy link

@ehanson8 ehanson8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, thanks for the changes!

hrqb/base/task.py Show resolved Hide resolved
Copy link

@jonavellecuerdo jonavellecuerdo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ghukill Lots of cool methods implemented here! I agree with @ehanson8 that the HRQBPipelineTask.pipeline_as_ascii() is a really helpful feature. Just had one question / minor suggested change.

import os
from abc import abstractmethod
from collections.abc import Iterator
from typing import Literal

import luigi # type: ignore[import-untyped]
import pandas as pd

from hrqb.base import PandasPickleTarget, QuickbaseTableTarget

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the target attribute be converted to a property on these HRQBTask classes using @property? 🤔

Copy link
Collaborator Author

@ghukill ghukill May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I understanding that you'd want to be able to use self.target instead of self.target() in places like this?

class PandasPickleTask(HRQBTask):    

    def run(self) -> None:
        self.target.write(self.get_dataframe())   # <----------------?

If so, I think it'd get tricky. target() is a luigi method from luigi.Task that HRQBTask extends. Actually, one reason these classes exist is so that actual downstream tasks won't have to interact with self.target() very often. They should like either define get_dataframe() or get_records(), which themselves interact with self.target().

In short: target is a reserved method from luigi.Task, so could be hard to @property-ify.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the conversation on slack. During that, realizing that I misspoke above and target is not a reserved property or method on luigi.Task. As discussed, I think this was me early on in the project thinking it'd be handy to refer directly to a Task's "target" as an object to receive and work with vs what feels like a more opaque output() method which just so happens to return a target class.

Going to dig in a bit and see if some renaming or reworking might make this a little cleaner. Thanks for raising!

Copy link
Collaborator Author

@ghukill ghukill May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonavellecuerdo - this commit was added that addresses your original request / question: 142cfb8.

Overall, I think it's a good one, thanks for raising.

To summarize the commit message a bit: the goal of HRQBTask.target() was to more accurately reflect that it's returning an actual luigi.Target instance, where the default requirement in luigi is to define a <Task>.output() method that does the same. All in all, it's mostly just a project-specific preference to use the word "target" (which feels more accurate to what it returns) over the default luigi "output". But by making it a property, as you asked about, I think it further leans into this opinionation in a good way!

The end result is the ability to do MyTask.target and get the fully instantiated target instead of MyTask.output() which, to me, feels a bit vague as to what it's returning or doing.

Why these changes are being introduced:

In this project, it was decided to include a convenience
attribute 'target' that was a method, to better indicate
it is returning a luigi.Target instance.  Default luigi Tasks
require an output() method that returns a Target.

Converting this to a property further leans into this
project-specific convention of providing a convenience
attribute, making it an even easier to use property.

How this addresses that need:
* Converts HRQBTask.target() into a property
* defines HRQBTask.output() to avoid defining this
in classes that extend this opinionated project class

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/HRQB-13
Copy link

@jonavellecuerdo jonavellecuerdo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making these updates!

@ghukill ghukill merged commit 24676cc into main May 10, 2024
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants