
[BUG] Failed to run BQ task when cache is enabled #2864

Closed
2 tasks done
pingsutw opened this issue Sep 12, 2022 · 8 comments
Labels
bug (Something isn't working), flytekit (FlyteKit Python related issue), flytepropeller

Comments


pingsutw commented Sep 12, 2022

Describe the bug

Slack Thread

Failed to run BQ task when the cache is enabled because type validation is failing.

{"json":{"exec_id":"atfkcwwv5cfr7wzhqq94","node":"n1","ns":"flytesnacks-
development","res_ver":"5501760","routine":"worker-3","tasktype":"python-
task","wf":"flytesnacks:development:example_test.wf"},"level":"error","msg":"DataCatalog failed to get outputs from 
artifact 45bd1d68-a013-43b1-a56b-b7597b559125, err: unexpected artifactData: [o0] type: 
[structured_dataset_type:\u003c\u003e ] does not match any task output type: 
[structured_dataset_type:\u003cformat:\"parquet\" \u003e ]","ts":"2022-09-12T06:56:41Z"}

When the cache is enabled, we'll retrieve artifacts from datacatalog and check if the structured dataset's schema and format match the expected type.

However, the default format of the structured dataset in the expected type is always Parquet, but the format of the output structured dataset is "".

@task(cache=True, cache_version="1.0")
def t2(sd: StructuredDataset) -> StructuredDataset:  # the declared StructuredDataset type defaults to the Parquet format
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    return StructuredDataset(df, uri=bq_uri)  # the format of the returned structured dataset is ""

There are two ways to fix it:

  1. Change these lines to
if len(structuredDatasetType.Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
		return false
	}
  2. Change the default format of the expected type to "" in flytekit, and change these lines to the below. However, this will break existing users: if users upgrade flytekit, they have to upgrade propeller as well.
if len(t.literalType.GetStructuredDatasetType().Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
		return false
	}

Here, structuredDatasetType is the input type and t.literalType.GetStructuredDatasetType() is the expected type.
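The difference between the current check and the two proposed fixes can be sketched in plain Python (hypothetical helper names; propeller's actual logic is the Go shown above):

```python
def matches_current(actual: str, expected: str) -> bool:
    # Current propeller behavior: case-insensitive equality on the format
    # strings, roughly what strings.EqualFold does in the Go snippet above.
    return actual.lower() == expected.lower()

def matches_fix1(actual: str, expected: str) -> bool:
    # Fix 1: skip the comparison when the artifact's (input) format is empty.
    return actual == "" or actual.lower() == expected.lower()

def matches_fix2(actual: str, expected: str) -> bool:
    # Fix 2: skip the comparison when the expected format is empty. This only
    # helps if flytekit also starts defaulting the expected format to "".
    return expected == "" or actual.lower() == expected.lower()

# The cached artifact's format is "", while the task's expected format is "parquet":
print(matches_current("", "parquet"))  # False -> the DataCatalog error above
print(matches_fix1("", "parquet"))     # True  -> cache lookup succeeds
print(matches_fix2("", "parquet"))     # False -> still fails until flytekit is upgraded too
```

This is also why fix 2 couples the upgrades: an old flytekit still emits "parquet" as the expected format, so the expected side never goes empty.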

Expected behavior

The BQ task should run successfully even when the cache is enabled.

Additional context to reproduce

import uuid
import pandas as pd
from typing_extensions import Annotated
from flytekit import task, workflow, StructuredDataset, kwtypes


@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)


@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> StructuredDataset:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://flyte-test-340607.dataset.{table_id}"
    return StructuredDataset(df, uri=bq_uri)


@workflow
def wf() -> StructuredDataset:
    return t2(sd=t1())


if __name__ == "__main__":
    wf()

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@pingsutw pingsutw added bug Something isn't working flytekit FlyteKit Python related issue flytepropeller labels Sep 12, 2022
@pingsutw (Member Author)

cc @wild-endeavor

@hamersaw (Contributor)

I'm not sure I have enough context on StructuredDatasets to completely understand this issue. It sounds like this type supports many different formats (i.e. parquet, csv, etc.), but the column definitions are not known at compile time, so currently we type-check only the format. The issue is that sometimes the format can be defined as empty (i.e. "")? Given how opinionated Flyte is about statically typing data, allowing an empty format to satisfy everything is only asking for problems downstream. Where does the default parquet format come from? And where exactly does the empty format come from?

@pingsutw (Member Author)

Type validation happens at compile (register) time and at runtime.

@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    ...
    return StructuredDataset(df=df, uri="bq://...")

@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> Annotated[StructuredDataset, "csv"]:
    return StructuredDataset(df=df)

@workflow
def wf():
    t2(sd=t1())
  • Compile time
    In the above example, flytekit serializes StructuredDataset to StructuredDatasetType(columns="", format="parquet"), which means the dataframe will be serialized to a parquet file by default. At compile time, propeller checks whether the output literal type (column schema and format) of t1 matches the input literal type of t2.

  • Runtime (if the cache is enabled)
    Flytekit converts StructuredDataset(df=df, uri="bq://...") (a Python value) to StructuredDataset(uri=bq://, metadata=structured_dataset_type(column="", format="")) (a Literal). We don't serialize df to a file, so we don't have a format here.
    Propeller then fetches the output of t1 from datacatalog and checks whether the metadata (structured_dataset_type(column="", format="")) matches the input type of t2 (structured_dataset_type(column="", format="parquet") in this case). The error occurs because the format in the metadata differs from the format in t2's input type.
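The runtime cache check described above can be modeled with a small sketch (a simplified stand-in, not propeller's actual implementation; the type and function names here are illustrative):

```python
from dataclasses import dataclass

@dataclass
class SDType:
    # Simplified stand-in for StructuredDatasetType: just columns and format.
    columns: str = ""
    format: str = ""

def cache_hit(artifact: SDType, expected: SDType) -> bool:
    # The cache lookup compares the cached artifact's metadata against the
    # consuming task's declared input type, including the format.
    return (artifact.columns == expected.columns
            and artifact.format.lower() == expected.format.lower())

# t1 returned StructuredDataset(df=df, uri="bq://...") without serializing df,
# so its literal metadata carries format "":
artifact = SDType(columns="", format="")
# flytekit serialized t2's input type with the default parquet format:
expected = SDType(columns="", format="parquet")
print(cache_hit(artifact, expected))  # False -> "does not match any task output type"
```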

@hamersaw (Contributor)

OK, last two questions:
(1) Does the format ever matter in a StructuredDataset? It seems it can be used arbitrarily, and if it doesn't matter, why do we include it in type checking?
(2) If I understand correctly, this breaks because the function returns a different type; although not to the same extent, this is the same as:

@task
def foo() -> int:
    return "a"

In this case it will obviously break in local execution. When executed on a cluster, how does this break? Will flytekit try to read/write the data and fail? If we just remove the runtime check on the StructuredDataset format, will it fail the same way?

@hamersaw (Contributor)

Quick update - the following example:

@task
def say_hello() -> str:
    return 0

@workflow
def my_wf() -> str:
    res = say_hello()
    return res

fails with:

[2/2] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 526, in dispatch_execute
        raise TypeError(

Message:

    Failed to convert return value for var o0 for function flyte.workflows.example.say_hello with error <class 'flytekit.core.type_engine.TypeTransformerFailedError'>: Type of Val '0' is not an instance of <class 'str'>

SYSTEM ERROR! Contact platform administrators.

I'm still wondering if it makes sense to remove the runtime check on the data type during the cache lookup and just let flytekit fail if there's an issue?!?!

@pingsutw (Member Author)

I'm still wondering if it makes sense to remove the runtime check on the data type during the cache lookup and just let flytekit fail if there's an issue?!?!

There are still some benefits to doing a runtime check on the data type during the cache lookup. For example, if the error happens on the propeller side, we don't need to spend additional time and resources to run the task.

we just remove the runtime check on StructuredDataset format

I think you're probably right.

@task(cache=True, cache_version="2.0")
def t1() -> Annotated[StructuredDataset, "csv"]:
    ... 
    return StructuredDataset(df=df) # here we already have "format" and "URI" in StructuredDatasetMetadata

@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset):
    ...     

@workflow
def wf():
    t2(sd=t1())

In the above example, it fails at compile time because the format in the SD doesn't match.
However, I think it is not necessary for users to specify the format when using an SD as input, because we already have the SD metadata in the upstream literal, and the SD transformer is able to find the specific decoder to deserialize the SD.

As a result, I think we can remove this line, i.e. just remove the check on the format.

cc @wild-endeavor

@gitgraghu

Any update on the fix for this issue?
