Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow.py: add ability to modify flow_run_id in flow.run() #3163

Merged
merged 4 commits into from Aug 16, 2020

Conversation

Mastermjr
Copy link

Please describe your work and make sure your PR:

  • [x ] adds new tests (if appropriate):
  • add a changelog entry in the changes/ directory (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

TESTING:

no new tests were added however I tested locally like below in example.py:

#--------------------------------------------------------------
#imports
#--------------------------------------------------------------
#global imports

# FLow imports
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.shell import  ShellTask
from prefect.engine.executors import DaskExecutor


#--------------------------------------------------------------
# Define custom task functions
#--------------------------------------------------------------
#Props: max_retries=3, retry_delay=datetime.timedelta(minutes=10)

@task(log_stdout=True)
def plus_one(x):
    """A task that adds 1 to a number"""
    print("asdf")
    return x + 1

@task(log_stdout=True)
def build_command(name):
    print("FLOW_RUN_ID: {}".format(prefect.context.flow_run_id))
    return 'echo "HELLO, {}!\n"'.format(name)

#--------------------------------------------------------------
# Instantiate task classes
#--------------------------------------------------------------

run_in_bash = ShellTask(name='run a command in bash')

#--------------------------------------------------------------
# Open a Flow context and use the functional API (if possible)
#--------------------------------------------------------------

with Flow('EMAIL') as flow:
    #required run_id from parameter
    out = Parameter('out')

    # store the result of each task call, even if you don't use the result again
    two = plus_one(1)

    # for clarity, call each task on its own line
    cmd = build_command(name=out)
    shell_result = run_in_bash(command=cmd)

    # use the imperative API where appropriate
    shell_result.set_upstream(two)

    # validate flow and test serialization
    flow.validate()
    #flow.serialize()

    # print out flow:
    # flow.visualize()

#TEST IF IT WORKS:
flow.run(out="ASDFASDFASDFASDF",
         flow_run_id="THIS IS A TEST",
         executor=DaskExecutor(
                address='localhost:8786'
                )
         )

In addition, I setup logging the flow_run_id in: ~/.prefect/config.toml be replacing the logging section with the below values:

[logging]

# The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
 level = "DEBUG"
#
# # The log format
format = "[%(asctime)s] %(levelname)s - %(name)s | %(flow_run_id)s | %(message)s"

#dditional log attributes to extract from context
# e.g., log_attributes = "['context_var']"
log_attributes = "['flow_run_id']"

# the timestamp format
datefmt = "%Y-%m-%d %H:%M:%S"

# Send logs to Prefect Cloud
log_to_cloud = false

# Extra loggers for Prefect log configuration
extra_loggers = "[]"

Finally, I ran below:

$python3 --version
Python 3.8.5
$ python3 example.py
[2020-08-14 23:41:34] INFO - prefect.FlowRunner | None | Beginning Flow run for 'EMAIL'
[2020-08-14 23:41:34] INFO - prefect.FlowRunner | THIS IS A TEST | Starting flow run.
[2020-08-14 23:41:34] DEBUG - prefect.FlowRunner | THIS IS A TEST | Flow 'EMAIL': Handling state change from Scheduled to Running
[2020-08-14 23:41:34] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'plus_one': Starting task run...
[2020-08-14 23:41:34] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'plus_one': Handling state change from Pending to Running
[2020-08-14 23:41:34] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'plus_one': Calling task.run() method...
[2020-08-14 23:41:34] INFO - prefect.TaskRunner | THIS IS A TEST | asdf
[2020-08-14 23:41:34] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'plus_one': Handling state change from Running to Success
[2020-08-14 23:41:34] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'plus_one': finished task run for task with final state: 'Success'
[2020-08-14 23:41:34] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'out': Starting task run...
[2020-08-14 23:41:34] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'out': Handling state change from Pending to Running
[2020-08-14 23:41:34] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'out': Calling task.run() method...
[2020-08-14 23:41:34] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'out': Handling state change from Running to Success
[2020-08-14 23:41:35] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'out': finished task run for task with final state: 'Success'
[2020-08-14 23:41:35] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'build_command': Starting task run...
[2020-08-14 23:41:35] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'build_command': Handling state change from Pending to Running
[2020-08-14 23:41:35] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'build_command': Calling task.run() method...
[2020-08-14 23:41:35] INFO - prefect.TaskRunner | THIS IS A TEST | FLOW_RUN_ID: THIS IS A TEST
[2020-08-14 23:41:35] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'build_command': Handling state change from Running to Success
[2020-08-14 23:41:35] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'build_command': finished task run for task with final state: 'Success'
[2020-08-14 23:41:35] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'run a command in bash': Starting task run...
[2020-08-14 23:41:35] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'run a command in bash': Handling state change from Pending to Running
[2020-08-14 23:41:35] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'run a command in bash': Calling task.run() method...
[2020-08-14 23:41:35] DEBUG - prefect.run a command in bash | THIS IS A TEST | HELLO, ASDFASDFASDFASDF!
[2020-08-14 23:41:35] DEBUG - prefect.run a command in bash | THIS IS A TEST |
[2020-08-14 23:41:35] DEBUG - prefect.TaskRunner | THIS IS A TEST | Task 'run a command in bash': Handling state change from Running to Success
[2020-08-14 23:41:35] INFO - prefect.TaskRunner | THIS IS A TEST | Task 'run a command in bash': finished task run for task with final state: 'Success'
[2020-08-14 23:41:35] INFO - prefect.FlowRunner | THIS IS A TEST | Flow run SUCCESS: all reference tasks succeeded
[2020-08-14 23:41:35] DEBUG - prefect.FlowRunner | THIS IS A TEST | Flow 'EMAIL': Handling state change from Running to Success

What does this PR change?

PR adds the ability to use the flow_run_id paramter in the flow.run() function, so the a user armed only with prefect core can control flow_run id's.

Why is this PR important?

I want to control flow run Id's locally using prefect core and this PR enables it, without changing any functionality :)
Everything should work like before

@marvin-robot
Copy link
Member

Here I am, brain the size of a planet and they ask me to welcome you to Prefect.

So, welcome to the community @Mastermjr! 🎉 🎉

src/prefect/core/flow.py Outdated Show resolved Hide resolved
@codecov
Copy link

codecov bot commented Aug 15, 2020

Codecov Report

Merging #3163 into master will increase coverage by 0.00%.
The diff coverage is 100.00%.

src/prefect/core/flow.py Outdated Show resolved Hide resolved
Fixing kwargs to pop to a default of uuid

Co-authored-by: Chris White <white.cdw@gmail.com>
Copy link
Member

@cicdw cicdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome thanks @Mastermjr !

@cicdw cicdw merged commit 69c5243 into PrefectHQ:master Aug 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants