Commit aa5712e

Assorted fixes to the hackernews assets demo (#7030)
* Assorted hacker news asset improvements

* Move dbt metadata collection into snowflake io manager
sryza committed Mar 15, 2022
1 parent 8ad1195 commit aa5712e
Showing 13 changed files with 37 additions and 38 deletions.
==================================================
@@ -1,15 +1,9 @@
 import json
 import os
 
-import pandas as pd
 from dagster_dbt import dbt_cli_resource
 from dagster_dbt.asset_defs import load_assets_from_dbt_manifest
-from hacker_news_assets.resources.snowflake_io_manager import (
-    SHARED_SNOWFLAKE_CONF,
-    connect_snowflake,
-)
 
-from dagster import MetadataValue
 from dagster.utils import file_relative_path
 
 DBT_PROJECT_DIR = file_relative_path(__file__, "../../hacker_news_dbt")
@@ -22,18 +16,7 @@
 )
 
 
-def asset_metadata(_context, model_info):
-    config = dict(SHARED_SNOWFLAKE_CONF)
-    config["schema"] = model_info["schema"]
-    with connect_snowflake(config=config) as con:
-        df = pd.read_sql(f"SELECT * FROM {model_info['name']} LIMIT 5", con=con)
-        num_rows = con.execute(f"SELECT COUNT(*) FROM {model_info['name']}").fetchone()
-
-    return {"Data sample": MetadataValue.md(df.to_markdown()), "Rows": num_rows[0]}
-
-
 assets = load_assets_from_dbt_manifest(
     json.load(open(os.path.join(DBT_PROJECT_DIR, "target", "manifest.json"))),
-    runtime_metadata_fn=asset_metadata,
     io_manager_key="warehouse_io_manager",
 )
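Note: with runtime_metadata_fn gone, nothing in this file queries Snowflake anymore; the data-sample and row-count metadata is collected by the warehouse IO manager instead (see the snowflake_io_manager hunk further down). A minimal sketch of how these dbt assets might be wired to that IO manager with the 0.14-era AssetGroup API that appears elsewhere in this diff — the group name and resource binding here are assumptions, not code from this commit:

    from dagster import AssetGroup

    # Hypothetical wiring: "warehouse_io_manager" must resolve to the
    # SnowflakeIOManager for the dbt outputs' metadata to be recorded there.
    dbt_asset_group = AssetGroup(
        assets,  # the dbt-derived assets loaded above
        resource_defs={"warehouse_io_manager": snowflake_io_manager_prod},  # assumed name
    )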
==================================================
@@ -6,7 +6,7 @@
 @asset(
     ins={
         "stories": AssetIn(metadata={"columns": ["id"]}),
-        "comments": AssetIn(metadata={"columns": ["id", "by", "parent"]}),
+        "comments": AssetIn(metadata={"columns": ["id", "user_id", "parent"]}),
     },
     io_manager_key="warehouse_io_manager",
 )
@@ -16,7 +16,7 @@ def comment_stories(stories: DataFrame, comments: DataFrame) -> DataFrame:
     Owners: sandy@elementl.com, owen@elementl.com
     """
-    comments.rename(columns={"by": "commenter_id", "id": "comment_id"}, inplace=True)
+    comments.rename(columns={"user_id": "commenter_id", "id": "comment_id"}, inplace=True)
     comments = comments.set_index("comment_id")[["commenter_id", "parent"]]
     stories = stories.set_index("id")[[]]
 
==================================================
@@ -56,6 +56,8 @@ def _id_range_for_time(start: int, end: int, hn_client):
 
     def _get_item_timestamp(item_id):
         item = hn_client.fetch_item_by_id(item_id)
+        if not item:
+            raise ValueError(f"No item with id {item_id}")
         return item["time"]
 
     max_item_id = hn_client.fetch_max_item_id()
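Note: the added guard covers lookups that miss — the HN API returns null for deleted or missing ids — where the old code failed with an opaque TypeError on item["time"]. A tiny self-contained illustration of the failure mode (StubClient is hypothetical, not part of this repo):

    class StubClient:
        """Stands in for an HN client whose item lookups can miss."""

        def fetch_item_by_id(self, item_id):
            return None  # e.g. a deleted item

    def get_item_timestamp(hn_client, item_id):
        item = hn_client.fetch_item_by_id(item_id)
        if not item:  # same guard as the hunk above
            raise ValueError(f"No item with id {item_id}")
        return item["time"]

    try:
        get_item_timestamp(StubClient(), 123)
    except ValueError as err:
        print(err)  # No item with id 123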
==================================================
@@ -62,4 +62,4 @@ def comments(items: SparkDF) -> SparkDF:
 
 @asset(io_manager_key="warehouse_io_manager", partitions_def=hourly_partitions)
 def stories(items: SparkDF) -> SparkDF:
-    return items.where(items["type"] == "stories")
+    return items.where(items["type"] == "story")
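Note: Hacker News items carry singular type names ("story", "comment", "job", ...), so the old predicate matched nothing and the stories asset came out empty. A quick standalone check of the fix (local SparkSession; assumes a pyspark install):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    items = spark.createDataFrame([(1, "story"), (2, "comment")], ["id", "type"])

    assert items.where(items["type"] == "story").count() == 1    # fixed predicate
    assert items.where(items["type"] == "stories").count() == 0  # old predicate: empty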
==================================================
@@ -13,13 +13,13 @@ def recommender_model(user_story_matrix: IndexedCooMatrix):
     """
     An SVD model for collaborative filtering-based recommendation.
     """
-    n_components = random.randint(90, 110)
+    n_components = min(random.randint(90, 110), len(user_story_matrix.col_index) - 1)
     svd = TruncatedSVD(n_components=n_components)
     svd.fit(user_story_matrix.matrix)
 
     total_explained_variance = svd.explained_variance_ratio_.sum()
 
-    yield Output(
+    return Output(
         svd,
         metadata={
             "Total explained variance ratio": total_explained_variance,
@@ -60,7 +60,7 @@ def component_top_stories(
         {"component_index": Series(components_column), "title": Series(titles_column)}
     )
 
-    yield Output(
+    return Output(
         component_top_stories,
         metadata={
             "Top component top stories": MetadataValue.md(
@@ -80,7 +80,7 @@ def top_components_to_markdown(component_top_stories: DataFrame) -> str:
         component_markdowns.append(
             "\n".join(
                 [f"Component {i}"]
-                + ["- " + row["title"] for _, row in component_i_top_5_stories.iterrows()]
+                + ["- " + str(row["title"]) for _, row in component_i_top_5_stories.iterrows()]
             )
         )
 
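Note: scikit-learn's TruncatedSVD requires n_components to be strictly less than the number of features, so the unclamped randint(90, 110) failed whenever the matrix had few story columns; the other change swaps yield Output(...) for return Output(...) so each asset function returns its single output directly. A self-contained sketch of the clamp (synthetic matrix; assumes numpy/scipy/sklearn installs):

    import numpy as np
    from scipy.sparse import coo_matrix
    from sklearn.decomposition import TruncatedSVD

    rng = np.random.default_rng(0)
    # A small user/story matrix with only 20 story columns.
    dense = (rng.random((50, 20)) < 0.1).astype(float)
    matrix = coo_matrix(dense)

    n_components = min(100, matrix.shape[1] - 1)  # the clamp: 19, not 100
    TruncatedSVD(n_components=n_components).fit(matrix)  # n_components=100 would raise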
==================================================
@@ -4,7 +4,7 @@
 from pandas import DataFrame, Series
 from scipy.sparse import coo_matrix
 
-from dagster import asset
+from dagster import Output, asset
 
 
 @dataclass
@@ -23,7 +23,7 @@ class IndexedCooMatrix:
 
 
 @asset
-def user_story_matrix(comment_stories: DataFrame) -> IndexedCooMatrix:
+def user_story_matrix(comment_stories: DataFrame):
     """
     A sparse matrix where the rows are users, the columns are stories, and the values
     are whether the user commented on the story.
@@ -39,10 +39,13 @@ def user_story_matrix(comment_stories: DataFrame) -> IndexedCooMatrix:
     sparse_cols = story_col_indices[deduplicated["story_id"]]
     sparse_data = np.ones(len(sparse_rows))
 
-    return IndexedCooMatrix(
-        matrix=coo_matrix(
-            (sparse_data, (sparse_rows, sparse_cols)), shape=(len(users), len(stories))
+    return Output(
+        IndexedCooMatrix(
+            matrix=coo_matrix(
+                (sparse_data, (sparse_rows, sparse_cols)), shape=(len(users), len(stories))
+            ),
+            row_index=Series(user_row_indices.index.values, index=user_row_indices),
+            col_index=Series(story_col_indices.index.values, index=story_col_indices),
         ),
-        row_index=Series(user_row_indices.index.values, index=user_row_indices),
-        col_index=Series(story_col_indices.index.values, index=story_col_indices),
+        metadata={"# Rows (users)": len(users), "# Cols (stories)": len(stories)},
     )
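Note: user_story_matrix now returns an Output wrapper so it can attach row/column counts as metadata, and the -> IndexedCooMatrix annotation is dropped because the function no longer returns the bare value. A minimal self-contained example of the same return-Output pattern (toy asset; assumes a dagster install):

    from dagster import Output, asset

    @asset
    def my_toy_asset():
        values = [1, 2, 3]
        # Attach metadata without changing what downstream assets receive.
        return Output(values, metadata={"# values": len(values)})

Direct invocations of the underlying op now get an Output back, which is why the test at the bottom of this diff unwraps it with .value.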
==================================================
@@ -6,7 +6,12 @@
 def make_activity_stats_job(asset_group: AssetGroup) -> JobDefinition:
     return asset_group.build_job(
         name="activity_stats",
-        selection=["comment_daily_stats", "story_daily_stats", "activity_daily_stats"],
+        selection=[
+            "comment_daily_stats",
+            "story_daily_stats",
+            "activity_daily_stats",
+            "activity_forecast",
+        ],
     )
 
 
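Note: a job built from an AssetGroup only materializes the assets named in selection, so activity_forecast was silently left out of the activity_stats job until it was added here. A reduced sketch of that behavior (toy assets; assumes the 0.14-era AssetGroup API used in this file):

    from dagster import AssetGroup, asset

    @asset
    def activity_daily_stats():
        return 1

    @asset
    def activity_forecast(activity_daily_stats):
        return activity_daily_stats + 1

    group = AssetGroup([activity_daily_stats, activity_forecast])
    job = group.build_job(
        name="activity_stats",
        selection=["activity_daily_stats", "activity_forecast"],  # both now included
    )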
==================================================
@@ -53,7 +53,6 @@
     "parquet_io_manager": s3_partitioned_parquet_io_manager,
     "warehouse_io_manager": snowflake_io_manager_prod,
     "pyspark": configured_pyspark,
-    "warehouse_loader": snowflake_io_manager_prod,
     "hn_client": hn_api_subsample_client.configured({"sample_rate": 10}),
     "dbt": dbt_prod_resource,
 }
@@ -70,7 +69,6 @@
     "parquet_io_manager": s3_partitioned_parquet_io_manager,
     "warehouse_io_manager": snowflake_io_manager_staging,
     "pyspark": configured_pyspark,
-    "warehouse_loader": snowflake_io_manager_staging,
     "hn_client": hn_api_subsample_client.configured({"sample_rate": 10}),
     "dbt": dbt_staging_resource,
 }
@@ -80,7 +78,6 @@
     "parquet_io_manager": local_partitioned_parquet_io_manager,
     "warehouse_io_manager": local_partitioned_parquet_io_manager,
     "pyspark": configured_pyspark,
-    "warehouse_loader": snowflake_io_manager_prod,
     "hn_client": hn_snapshot_client,
     "dbt": ResourceDefinition.none_resource(),
 }
==================================================
@@ -73,7 +73,7 @@ def __init__(self, config):
         self._config = config
 
     def handle_output(self, context: OutputContext, obj: Union[PandasDataFrame, SparkDataFrame]):
-        schema, table = "hackernews", context.asset_key.path[-1]
+        schema, table = DB_SCHEMA, context.asset_key.path[-1]
 
         time_window = context.asset_partitions_time_window if context.has_asset_partitions else None
         with connect_snowflake(config=self._config, schema=schema) as con:
@@ -83,6 +83,15 @@ def handle_output(self, context: OutputContext, obj: Union[PandasDataFrame, SparkDataFrame]):
             yield from self._handle_spark_output(obj, schema, table)
         elif isinstance(obj, PandasDataFrame):
             yield from self._handle_pandas_output(obj, schema, table)
+        elif obj is None:  # dbt
+            config = dict(SHARED_SNOWFLAKE_CONF)
+            config["schema"] = DB_SCHEMA
+            with connect_snowflake(config=config) as con:
+                df = read_sql(f"SELECT * FROM {context.name} LIMIT 5", con=con)
+                num_rows = con.execute(f"SELECT COUNT(*) FROM {context.name}").fetchone()
+
+            yield MetadataEntry.md(df.to_markdown(), "Data sample")
+            yield MetadataEntry.int(num_rows, "Rows")
         else:
            raise Exception(
                "SnowflakeIOManager only supports pandas DataFrames and spark DataFrames"
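Note: this is where the dbt metadata collection moved. dbt materializes its tables itself, so the IO manager receives obj is None for those outputs and only queries the warehouse for a data sample and a row count. A stand-in demo of those two queries, using SQLite instead of Snowflake so it runs anywhere (table name and rows are made up):

    import sqlite3
    import pandas as pd

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE comment_stories (comment_id INT, story_id INT)")
    con.execute("INSERT INTO comment_stories VALUES (1, 10), (2, 11)")

    # The same two queries the dbt branch above issues against Snowflake.
    df = pd.read_sql("SELECT * FROM comment_stories LIMIT 5", con=con)
    num_rows = con.execute("SELECT COUNT(*) FROM comment_stories").fetchone()
    print(df)
    print("Rows:", num_rows[0])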
==================================================
@@ -11,7 +11,7 @@
     ],
 )
 def test_comment_stories(comments, stories, expected):
-    comments = DataFrame(comments, columns=["id", "parent", "by"])
+    comments = DataFrame(comments, columns=["id", "parent", "user_id"])
     stories = DataFrame(stories, columns=["id"])
     result = comment_stories.op(stories=stories, comments=comments)
     expected = DataFrame(expected, columns=["comment_id", "story_id", "commenter_id"]).set_index(
==================================================
@@ -24,6 +24,6 @@ def test_user_story_matrix(comment_stories, expected):
     comment_stories_df = DataFrame(
         comment_stories, columns=["comment_id", "story_id", "commenter_id"]
     )
-    indexed_matrix = user_story_matrix.op(comment_stories=comment_stories_df)
+    indexed_matrix = user_story_matrix.op(comment_stories=comment_stories_df).value
 
     assert indexed_matrix.matrix.toarray().tolist() == expected
