Skip to content

Commit

Permalink
use generic Output type annotations in hacker news (#7600)
Browse files (browse the repository at this point in the history)
  • Loading branch information
sryza committed May 12, 2022
1 parent 2238f8b commit 0ba6b2a
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 25 deletions.
3 changes: 1 addition & 2 deletions examples/hacker_news/hacker_news/ops/download_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@
"items": Out(
io_manager_key="parquet_io_manager",
metadata={"partitioned": True},
dagster_type=DataFrame,
)
},
required_resource_keys={"hn_client"},
)
def download_items(context, id_range: Tuple[int, int]) -> Output:
def download_items(context, id_range: Tuple[int, int]) -> Output[DataFrame]:
"""
Downloads all of the items for the id range passed in as input and creates a DataFrame with
all the entries.
Expand Down
17 changes: 7 additions & 10 deletions examples/hacker_news/hacker_news/ops/id_range_for_time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timezone
from typing import Tuple

from dagster import Out, Output, Tuple
from dagster import Output
from dagster import _check as check
from dagster import op

Expand Down Expand Up @@ -89,20 +90,16 @@ def _get_item_timestamp(item_id):
return id_range, metadata


@op(
required_resource_keys={"hn_client", "partition_bounds"},
out=Out(
Tuple[int, int],
description="The lower (inclusive) and upper (exclusive) ids that bound the range for the partition",
),
)
def id_range_for_time(context):
@op(required_resource_keys={"hn_client", "partition_bounds"})
def id_range_for_time(context) -> Output[Tuple[int, int]]:
"""
For the configured time partition, searches for the range of ids that were created in that time.
Returns the lower (inclusive) and upper (exclusive) ids that bound the range for the partition.
"""
id_range, metadata = _id_range_for_time(
context.resources.partition_bounds["start"],
context.resources.partition_bounds["end"],
context.resources.hn_client,
)
yield Output(id_range, metadata=metadata)
return Output(id_range, metadata=metadata)
11 changes: 5 additions & 6 deletions examples/hacker_news/hacker_news/ops/recommender_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from dagster.utils import file_relative_path


@op(out=Out(dagster_type=TruncatedSVD, metadata={"key": "recommender_model"}))
def build_recommender_model(user_story_matrix: IndexedCooMatrix):
@op(out=Out(metadata={"key": "recommender_model"}))
def build_recommender_model(user_story_matrix: IndexedCooMatrix) -> Output[TruncatedSVD]:
"""
Trains an SVD model for collaborative filtering-based recommendation.
"""
Expand All @@ -20,7 +20,7 @@ def build_recommender_model(user_story_matrix: IndexedCooMatrix):

total_explained_variance = svd.explained_variance_ratio_.sum()

yield Output(
return Output(
svd,
metadata={
"Total explained variance ratio": total_explained_variance,
Expand Down Expand Up @@ -48,14 +48,13 @@ def build_recommender_model(user_story_matrix: IndexedCooMatrix):
),
},
out=Out(
dagster_type=DataFrame,
io_manager_key="warehouse_io_manager",
metadata={"table": "hackernews.component_top_stories"},
),
)
def build_component_top_stories(
model: TruncatedSVD, user_story_matrix: IndexedCooMatrix, story_titles: DataFrame
):
) -> Output[DataFrame]:
"""
For each component in the collaborative filtering model, finds the titles of the top stories
it's associated with.
Expand All @@ -81,7 +80,7 @@ def build_component_top_stories(
{"component_index": Series(components_column), "title": Series(titles_column)}
)

yield Output(
return Output(
component_top_stories,
metadata={
"Top component top stories": MetadataValue.md(
Expand Down
2 changes: 1 addition & 1 deletion examples/hacker_news/hacker_news/ops/user_story_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class IndexedCooMatrix:


@op(out=Out(metadata={"key": "user_story_matrix"}))
def build_user_story_matrix(comment_stories: DataFrame):
def build_user_story_matrix(comment_stories: DataFrame) -> Output[IndexedCooMatrix]:
"""
Builds a sparse matrix where the rows are users, the columns are stories, and the values
are whether the user commented on the story.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timezone
from typing import Tuple

from hacker_news_assets.partitions import hourly_partitions

Expand Down Expand Up @@ -91,12 +92,12 @@ def _get_item_timestamp(item_id):
description="The lower (inclusive) and upper (exclusive) ids that bound the range for the partition",
partitions_def=hourly_partitions,
)
def id_range_for_time(context):
def id_range_for_time(context) -> Output[Tuple[int, int]]:
"""
For the configured time partition, searches for the range of ids that were created in that time.
"""
start, end = context.output_asset_partitions_time_window()
id_range, metadata = _id_range_for_time(
start.timestamp(), end.timestamp(), context.resources.hn_client
)
yield Output(id_range, metadata=metadata)
return Output(id_range, metadata=metadata)
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
required_resource_keys={"hn_client"},
partitions_def=hourly_partitions,
)
def items(context, id_range_for_time: Tuple[int, int]):
def items(context, id_range_for_time: Tuple[int, int]) -> Output[DataFrame]:
"""Items from the Hacker News API: each is a story or a comment on a story."""
start_id, end_id = id_range_for_time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


@asset
def recommender_model(user_story_matrix: IndexedCooMatrix):
def recommender_model(user_story_matrix: IndexedCooMatrix) -> Output[TruncatedSVD]:
"""
An SVD model for collaborative filtering-based recommendation.
"""
Expand All @@ -35,7 +35,7 @@ def recommender_model(user_story_matrix: IndexedCooMatrix):
)
def component_top_stories(
recommender_model: TruncatedSVD, user_story_matrix: IndexedCooMatrix, stories: DataFrame
):
) -> Output[DataFrame]:
"""
For each component in the collaborative filtering model, the titles of the top stories
it's associated with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class IndexedCooMatrix:


@asset
def user_story_matrix(comment_stories: DataFrame):
def user_story_matrix(comment_stories: DataFrame) -> Output[IndexedCooMatrix]:
"""
A sparse matrix where the rows are users, the columns are stories, and the values
are whether the user commented on the story.
Expand Down

0 comments on commit 0ba6b2a

Please sign in to comment.