Templated results are overwritten on raised LOOP #3619

Closed
marvin-robot opened this issue Nov 5, 2020 · 2 comments · Fixed by #3643

Opened from the Prefect Public Slack Community

me1548: Hey, I’m trying to use LOOP and write the results on each iteration, as documented here (https://docs.prefect.io/core/advanced_tutorials/using-results.html#looping). However, it doesn’t seem to work. It only writes the last iteration.

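import prefect
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.signals import LOOP


@task()
def log_output(result):
    logger = prefect.context.get('logger')
    logger.info(result)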


@task(result=LocalResult(dir='./results', location='test-{task_loop_count}.prefect'))
def loop_test():
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    print(n)

    if n > 5:
        return n

    raise LOOP(f'Iteration {n}', result=dict(n=n+1))


with Flow("Postgres -> BigQuery") as flow:
    x = loop_test()
    log_output(x)

See output logging and diagnostics in thread

me1548:

[2020-11-05 07:11:07] INFO - prefect.FlowRunner | Beginning Flow run for 'Postgres -> BigQuery'
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Flow 'Postgres -> BigQuery': Handling state change from Scheduled to Running
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'loop_test': Starting task run...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
1
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
2
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
3
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
4
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
5
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Looped
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Calling task.run() method...
6
[2020-11-05 07:11:07] DEBUG - prefect.LocalResult | Starting to upload result to test-6.prefect...
[2020-11-05 07:11:07] DEBUG - prefect.LocalResult | Finished uploading result to /Users/joell/joell.dev/Scraper/scraper-next/prefect-sync/results/test-6.prefect...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'loop_test': Handling state change from Running to Success
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'loop_test': Finished task run for task with final state: 'Success'
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'log_output': Starting task run...
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Handling state change from Pending to Running
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Calling task.run() method...
[2020-11-05 07:11:07] INFO - prefect.log_output | 6
[2020-11-05 07:11:07] DEBUG - prefect.TaskRunner | Task 'log_output': Handling state change from Running to Success
[2020-11-05 07:11:07] INFO - prefect.TaskRunner | Task 'log_output': Finished task run for task with final state: 'Success'
[2020-11-05 07:11:07] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-11-05 07:11:07] DEBUG - prefect.FlowRunner | Flow 'Postgres -> BigQuery': Handling state change from Running to Success
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__LOGGING__LEVEL",
    "PREFECT__CONTEXT__SECRETS__POSTGRES",
    "PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS",
    "PREFECT__FLOWS__CHECKPOINTING"
  ],
  "flow_information": {
    "environment": {
      "executor": true,
      "labels": true,
      "logger": true,
      "metadata": {
        "image": true
      },
      "on_exit": false,
      "on_start": false,
      "type": "LocalEnvironment"
    },
    "result": {
      "type": "LocalResult"
    },
    "schedule": {},
    "storage": {
      "_flows": {
        "Postgres -> BigQuery": true
      },
      "_labels": false,
      "add_default_labels": true,
      "directory": true,
      "flows": {
        "Postgres -> BigQuery": true
      },
      "path": false,
      "result": true,
      "secrets": false,
      "stored_as_script": false,
      "type": "Local"
    },
    "task_count": 2
  },
  "system_information": {
    "platform": "macOS-10.15.7-x86_64-i386-64bit",
    "prefect_backend": "server",
    "prefect_version": "0.13.13",
    "python_version": "3.8.6"
  }
}

znicholasbrown: Hi <@U01DW286KGC> - thanks for reporting this - I can confirm each iteration of the loop is overwriting the previous result as described here (https://docs.prefect.io/core/advanced_tutorials/using-results.html#looping), despite a templated location being present (this holds true for both strings and callables passed as the location kwarg).

znicholasbrown: <@ULVA73B9P> open "Templated results are overwritten on raised LOOP"

Original thread can be found here.
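
For context, here is a minimal sketch of what the reporter expects from the templated location: one distinct path per loop iteration. It assumes the template is filled from context values with Python's `str.format`, and that a callable location receives the same values as keyword arguments. `resolve_location` and `by_count` are hypothetical names for illustration, not Prefect's API.

```python
from typing import Callable, Union

Location = Union[str, Callable[..., str]]


def resolve_location(location: Location, **context) -> str:
    """Hypothetical helper: fill a templated location from context values."""
    if callable(location):
        return location(**context)
    # str.format ignores keyword arguments the template does not use.
    return location.format(**context)


# String template, as in the report above.
template = "test-{task_loop_count}.prefect"
paths = [resolve_location(template, task_loop_count=n) for n in range(1, 7)]


# Callable location (hypothetical), producing the same names.
def by_count(task_loop_count, **_):
    return f"test-{task_loop_count}.prefect"


callable_paths = [resolve_location(by_count, task_loop_count=n) for n in range(1, 7)]
```

With six iterations, both forms yield six different file names, so a result written on every iteration would not collide with the others.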


joshmeek commented Nov 5, 2020

Hmm I don't think it is overwriting the result at that location each time. It appears to actually only be writing a result on the final iteration of the loop (when transitioning from Running -> Success instead of each iteration going from Running -> Looped). i.e. it's not triggering the result writing step in the task runner pipeline when the task enters a Looped state.

@cicdw Do you remember if we explicitly opt out of writing results for loop iterations if the task is looping in process?


cicdw commented Nov 5, 2020

Yeah, at this time the result is not written when a LOOP signal is raised; here's the relevant code.

I don't think there was an explicit reason for that so we could change it
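
The behaviour discussed above can be illustrated with a toy model. This is only a sketch under stated assumptions, not Prefect's task runner: `Loop`, `run_task`, and the `write_on_loop` flag are hypothetical, the "store" is an in-memory dict rather than files on disk, and writing the loop payload on each iteration is an assumption about what a change might do.

```python
class Loop(Exception):
    """Stand-in for a LOOP signal carrying the payload for the next iteration."""

    def __init__(self, message, result):
        super().__init__(message)
        self.result = result


def loop_test(payload):
    # Mirrors the reported task: count up from 1 and stop once n exceeds 5.
    n = payload.get("n", 1)
    if n > 5:
        return n
    raise Loop(f"Iteration {n}", result={"n": n + 1})


def run_task(fn, location, write_on_loop):
    """Toy runner: re-run fn until it returns instead of raising Loop."""
    store = {}       # maps resolved location -> written value
    payload = {}
    loop_count = 1   # 1-based, matching 'test-6.prefect' in the log above
    while True:
        try:
            value = fn(payload)
        except Loop as signal:
            # Running -> Looped: only written when write_on_loop is set.
            if write_on_loop:
                store[location.format(task_loop_count=loop_count)] = signal.result
            payload = signal.result
            loop_count += 1
            continue
        # Running -> Success: the final value is always written.
        store[location.format(task_loop_count=loop_count)] = value
        return value, store


template = "test-{task_loop_count}.prefect"
final, current = run_task(loop_test, template, write_on_loop=False)
_, proposed = run_task(loop_test, template, write_on_loop=True)
```

In this model, `current` holds only `test-6.prefect`, which matches the log in the report, while `proposed` holds one entry per iteration.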
