Add log_links to GetTaskResponse #2021

Merged
merged 7 commits into from
Dec 15, 2023

Member

pingsutw commented Dec 5, 2023

Tracking issue

flyteorg/flyte#3936

Why are the changes needed?

To show the log link on FlyteConsole

What changes were proposed in this pull request?

  • Add log_links to GetTaskResponse
  • Show the log link in local execution
  • BigQuery returns a log link
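
The three changes above fit together roughly as follows. This is a minimal sketch that uses plain dataclasses as stand-ins for the real `TaskLog` and `GetTaskResponse` messages; the names `uri`, `name`, and `log_links` appear in this PR, while `build_console_link`, `format_links`, the `state` string, and the URL pattern are hypothetical and only there for illustration.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TaskLog:
    """Stand-in for the TaskLog message (only the fields used here)."""
    uri: str
    name: str


@dataclass
class GetTaskResponse:
    """Stand-in for the agent's GetTaskResponse, extended with log_links."""
    state: str
    log_links: List[TaskLog] = field(default_factory=list)


def build_console_link(project: str, location: str, job_id: str) -> TaskLog:
    # Hypothetical URL pattern; the real agent decides what the link looks like.
    uri = f"https://console.example.com/bigquery?project={project}&job={location}:{job_id}"
    return TaskLog(uri=uri, name="BigQuery Console")


def get(project: str, location: str, job_id: str) -> GetTaskResponse:
    # An agent's get() can attach links alongside the task state.
    return GetTaskResponse(
        state="RUNNING",
        log_links=[build_console_link(project, location, job_id)],
    )


def format_links(res: GetTaskResponse) -> List[str]:
    # Local execution can surface each link as "name: uri".
    return [f"{link.name}: {link.uri}" for link in res.log_links]
```

A console or local runner would then only need to iterate over `res.log_links`, without knowing which backend produced them.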

How was this patch tested?

Setup process

  1. Enable BigQuery agent
  2. Run a BigQuery Task
import pandas as pd
import pyarrow as pa
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask
from typing_extensions import Annotated

MyDataset = Annotated[StructuredDataset, kwtypes(name=str)]


@task
def create_bq_table() -> StructuredDataset:
    df = pd.DataFrame(data={"name": ["Alice", "bob"], "age": [5, 6]})
    return StructuredDataset(
        dataframe=df, uri="bq://dogfood-gcp-dataplane:dataset.flyte_table3"
    )


bigquery_task_templatized_query = BigQueryTask(
    name="bigquery",
    inputs=kwtypes(version=int),
    output_structured_dataset_type=MyDataset,
    task_config=BigQueryConfig(ProjectID="dogfood-gcp-dataplane"),
    query_template="SELECT * from dataset.flyte_table3;",  # type: ignore
)


@task
def convert_bq_table_to_arrow_table(sd: MyDataset) -> pa.Table:
    t = sd.open(pa.Table).all()
    print(t)
    return t


@workflow
def wf(version: int = 10) -> pa.Table:
    create_bq_table()
    sd = bigquery_task_templatized_query(version=version)
    return convert_bq_table_to_arrow_table(sd=sd)


if __name__ == "__main__":
    wf()

Screenshots

[Two screenshots; images not included.]

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Signed-off-by: Kevin Su <pingsutw@apache.org>

codecov bot commented Dec 5, 2023

Codecov Report

Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (f0d4f13) 85.84% compared to head (8f66243) 84.61%.

| Files | Patch % | Lines |
| --- | --- | --- |
| flytekit/models/core/execution.py | 60.00% | 1 Missing and 1 partial ⚠️ |
| ...lytekit-bigquery/flytekitplugins/bigquery/agent.py | 75.00% | 1 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2021      +/-   ##
==========================================
- Coverage   85.84%   84.61%   -1.24%     
==========================================
  Files         306      306              
  Lines       22897    22905       +8     
  Branches     3470     3471       +1     
==========================================
- Hits        19657    19382     -275     
- Misses       2645     2919     +274     
- Partials      595      604       +9     

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Comment on lines 219 to 221
print(f"{self._entity.name} task state: {State.Name(int(state))}, message: {res.resource.message or None}")
for link in res.log_links:
    print(f"{link.name}: {link.uri}")
Collaborator

should we use the logger here?
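
For comparison, here is a minimal sketch of what routing the same output through Python's standard `logging` module could look like. The function `report_task_status`, its parameters, and the logger name are hypothetical and not part of this PR; links are passed as plain `(name, uri)` tuples to keep the example self-contained.

```python
import logging
from typing import Iterable, Optional, Tuple

logger = logging.getLogger("agent_example")  # hypothetical logger name


def report_task_status(
    task_name: str,
    state_name: str,
    message: Optional[str],
    links: Iterable[Tuple[str, str]],
    log: logging.Logger = logger,
) -> None:
    # Same information as the print() calls, but emitted through logging
    # so callers can control level, format, and destination.
    log.info("%s task state: %s, message: %s", task_name, state_name, message or None)
    for name, uri in links:
        log.info("%s: %s", name, uri)
```

One trade-off to keep in mind: `print` always reaches the terminal during local execution, whereas `info` records are dropped unless the logger is configured at `INFO` level or lower, since Python's root logger defaults to `WARNING`.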

@@ -215,7 +216,9 @@ async def _get(self, resource_meta: bytes) -> GetTaskResponse:
else:
    res = self._agent.get(grpc_ctx, resource_meta)
state = res.resource.state
logger.info(f"Task state: {state}, State message: {res.resource.message}")
print(f"{self._entity.name} task state: {State.Name(int(state))}, message: {res.resource.message or None}")
Contributor

here too

Contributor

you probably want this in the agent executor mixin?

Member Author

it is in the mixin.

@@ -79,12 +80,20 @@ def create(
    def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse:
        client = bigquery.Client()
        metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
        log_links = [
            TaskLog(
Contributor

missing ttl?

Member Author

what default value should we set here
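
The thread does not settle on a default. As one possible shape, and not what this PR necessarily adopted, the sketch below treats `ttl` as an optional duration on a stand-in `TaskLog` dataclass, with `None` meaning that no expiry is stated. The `describe` helper and the choice of `None` as the default are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class TaskLog:
    """Stand-in for TaskLog with an optional time-to-live."""
    uri: str
    name: str
    # Assumption: None means "no expiry stated" rather than "expires immediately".
    ttl: Optional[timedelta] = None


def describe(link: TaskLog) -> str:
    # Mention the validity window only when a ttl was actually provided.
    if link.ttl is None:
        return f"{link.name}: {link.uri}"
    return f"{link.name}: {link.uri} (valid for {int(link.ttl.total_seconds())}s)"
```

Leaving the field unset avoids committing to an arbitrary duration for links that do not expire.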

Contributor

kumare3 commented Dec 8, 2023

lgtm, couple changes

Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Member

Future-Outlier left a comment

LGTM, I've tested it in this PR.
flyteorg/flyte#4529

Future-Outlier previously approved these changes Dec 11, 2023
Member

Future-Outlier left a comment

need to add tests, LGTM!

Signed-off-by: Kevin Su <pingsutw@apache.org>
Member

Future-Outlier left a comment

+1

pingsutw self-assigned this Dec 13, 2023
pingsutw merged commit f15c140 into master Dec 15, 2023
72 of 74 checks passed
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Rafael Raposo <rafaelraposo@spotify.com>
4 participants