Skip to content

Commit

Permalink
Example of usage of Branching I/O Manager (#11549)
Browse files Browse the repository at this point in the history
### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
schrockn committed Jan 13, 2023
1 parent cfabdb6 commit 8abea5d
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
prod_dagster_home/
prod_storage/
dev_dagster_home/
my-featured-branch_storage/
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Reset both the prod and the dev/branch environments to a clean state.
#
# The storage directory names must match what hn_dagster.py actually writes to:
#   - prod objects go to "prod_storage"
#   - branch objects go to "<DAGSTER_DEPLOYMENT>_storage"; the dev launch
#     script sets DAGSTER_DEPLOYMENT=my-featured-branch
# Previously this script reset "prod" and "my-feature-branch_storage", which
# nothing writes to, so stale pickled assets survived a reset.
rm -rf dev_dagster_home
rm -rf prod_storage
rm -rf prod_dagster_home
rm -rf my-featured-branch_storage

mkdir dev_dagster_home
mkdir prod_storage
mkdir prod_dagster_home
mkdir my-featured-branch_storage
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Reset only the dev/branch environment; prod state is left untouched.
#
# The branch storage directory is "<DAGSTER_DEPLOYMENT>_storage" and the dev
# launch script sets DAGSTER_DEPLOYMENT=my-featured-branch, so that is the
# directory that has to be cleared (not "my-feature-branch_storage").
rm -rf dev_dagster_home
rm -rf my-featured-branch_storage

mkdir dev_dagster_home
mkdir my-featured-branch_storage
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import base64
from io import BytesIO

import matplotlib.pyplot as plt
import pandas as pd
import requests
from tqdm import tqdm
from wordcloud import STOPWORDS, WordCloud


def extract() -> pd.DataFrame:
    """Download the first 100 Hacker News top stories into a DataFrame.

    Fetches the list of top-story ids, then fetches each story item
    individually (with a tqdm progress bar) and stacks the JSON payloads
    into one DataFrame, one row per item.

    Returns:
        A DataFrame built from the item JSON objects returned by the API.

    Raises:
        requests.RequestException: if a request times out, the connection
            fails, or the server answers with an HTTP error status.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # Without a timeout, requests can block forever on a stalled connection.
    request_timeout_seconds = 30

    # One session reuses the underlying HTTPS connection for every request
    # instead of performing a new handshake per story.
    with requests.Session() as session:
        ids_response = session.get(topstories_url, timeout=request_timeout_seconds)
        ids_response.raise_for_status()
        hackernews_topstory_ids = ids_response.json()

        results = []
        for item_id in tqdm(hackernews_topstory_ids[:100]):
            item_response = session.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
                timeout=request_timeout_seconds,
            )
            # Fail loudly on an HTTP error instead of parsing an error body.
            item_response.raise_for_status()
            results.append(item_response.json())

    return pd.DataFrame(results)


def transform(hackernews_topstories: pd.DataFrame) -> str:
    """Render a word cloud of story titles as a Markdown document.

    Args:
        hackernews_topstories: DataFrame with a ``title`` column; every value
            is converted with ``str`` before being joined into one text.

    Returns:
        Markdown text (a heading plus an image tag) with the PNG word cloud
        embedded as a base64 ``data:`` URI. Note this is ``str``, not bytes.
    """
    stopwords = set(STOPWORDS)
    # Prefixes that appear in many titles and would dominate the cloud.
    stopwords.update(["Ask", "Show", "HN"])
    titles_text = " ".join(map(str, hackernews_topstories["title"]))
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

    # Generate the word cloud image
    fig = plt.figure(figsize=(8, 8), facecolor=None)
    try:
        plt.imshow(titles_cloud, interpolation="bilinear")
        plt.axis("off")
        plt.tight_layout(pad=0)

        # Save the image to a buffer
        buffer = BytesIO()
        fig.savefig(buffer, format="png")
    finally:
        # pyplot keeps every figure alive until it is closed explicitly;
        # release it so repeated calls do not accumulate figures in memory.
        plt.close(fig)

    image_data = base64.b64encode(buffer.getvalue())
    return f"""
# Wordcloud of top Hacker News stories
![img](data:image/png;base64,{image_data.decode()})
""".strip()


def load(md_content: str):
    """Write the Markdown content to ``output.md`` in the working directory.

    Args:
        md_content: Text to write; any existing ``output.md`` is overwritten.
    """
    # Pin the encoding so the output does not depend on the platform locale.
    with open("output.md", "w", encoding="utf-8") as f:
        f.write(md_content)


if __name__ == "__main__":
    # Run the pipeline end to end: fetch stories, render the word cloud
    # Markdown, and write it to output.md.
    # Names chosen so the builtin `input` is not shadowed at module level.
    topstories = extract()
    markdown = transform(topstories)
    load(markdown)
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os

import pandas
from dagster import (
AssetKey,
AssetMaterialization,
DagsterInstance,
Definitions,
OpExecutionContext,
Output,
asset,
file_relative_path,
job,
op,
)
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.storage.branching.branching_io_manager import BranchingIOManager
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
from hackernews import extract, transform


# Re-emits, inside the current run, the latest materialization event that the
# prod instance has recorded for each asset in a fixed list, decorated with
# metadata that links back to the originating prod run.
# (Kept as comments on purpose so no docstring is attached to the op.)
@op
def replicate_asset_materializations_for_group(context: OpExecutionContext):
    # The prod instance is loaded from the directory next to this file.
    prod_instance = DagsterInstance.from_config(file_relative_path(__file__, "prod_dagster_home/"))

    for asset_key_str in ("hackernews_source_data", "hackernews_wordcloud", "process_wordcloud"):
        asset_key = AssetKey(asset_key_str)
        context.log.info(
            f"About to call get_latest_materialization_event on {asset_key} in prod instance."
        )
        latest_event_log_entry = prod_instance.get_latest_materialization_event(asset_key)

        # Nothing recorded in prod for this asset: log it and move on.
        if not latest_event_log_entry:
            context.log.info(f"Did not find entry in catalog for {asset_key_str} in prod instance.")
            continue

        # An entry without a materialization payload is skipped silently.
        asset_mat = latest_event_log_entry.asset_materialization
        if not asset_mat:
            continue

        context.log.info(
            f"Inserting {latest_event_log_entry} into dev with metadata"
            f" {asset_mat.metadata}"
        )

        parent_run_id = latest_event_log_entry.run_id
        # Links back to the parent deployment; the host/port is hard-coded
        # (presumably the prod dagit started without -p; confirm).
        lineage_metadata = {
            "parent_asset_catalog_url": MetadataValue.url(
                f"http://127.0.0.1:3000/assets/{asset_key_str}"
            ),
            "parent_run_id": parent_run_id,
            "parent_run_url": MetadataValue.url(f"http://127.0.0.1:3000/runs/{parent_run_id}"),
            "parent_timestamp": latest_event_log_entry.timestamp,
        }

        # Lineage keys are merged last, so they win over same-named prod keys.
        yield AssetMaterialization(
            asset_key=asset_key_str,
            description=asset_mat.description,
            metadata={**asset_mat.metadata, **lineage_metadata},
            partition=asset_mat.partition,
            tags=asset_mat.tags,
        )

    # The op produces no value; it exists only for the events yielded above.
    yield Output(value=None)


# Job consisting of the single op that replicates prod asset materializations.
@job
def ship_asset_materializations():
    # Invoking the op inside the job body adds it to the job.
    replicate_asset_materializations_for_group()


# Asset holding whatever hackernews.extract() returns.
@asset
def hackernews_source_data():
    source_frame = extract()
    return source_frame


# Asset derived from hackernews_source_data by passing it through
# hackernews.transform().
@asset
def hackernews_wordcloud(hackernews_source_data: pandas.DataFrame):
    wordcloud_markdown = transform(hackernews_source_data)
    return wordcloud_markdown


# Downstream asset of hackernews_wordcloud; a placeholder processing step.
@asset
def process_wordcloud(hackernews_wordcloud):
    # No real processing yet: the upstream value is passed through unchanged.
    processed = hackernews_wordcloud
    return processed


def is_prod():
    """Return True when the DAGSTER_DEPLOYMENT environment variable is "prod"."""
    deployment = os.getenv("DAGSTER_DEPLOYMENT")
    return deployment == "prod"


# Branch name used when DAGSTER_DEPLOYMENT is not set at all.
DEFAULT_BRANCH_NAME = "dev"


def get_branch_name():
    """Return the branch name for a non-prod deployment.

    The name is the value of DAGSTER_DEPLOYMENT, or DEFAULT_BRANCH_NAME when
    the variable is unset. Must not be called in prod (guarded by an assert).
    """
    assert not is_prod()
    branch_name = os.environ.get("DAGSTER_DEPLOYMENT", DEFAULT_BRANCH_NAME)
    return branch_name


def construct_branching_io_manager(get_prod_io_manager, get_branch_io_manager):
    """Build the IO manager appropriate for the current deployment.

    Args:
        get_prod_io_manager: Zero-argument factory for the prod IO manager.
        get_branch_io_manager: Factory taking the branch name and returning
            the IO manager for that branch.

    Returns:
        The prod IO manager itself when running in prod; otherwise a
        BranchingIOManager with the prod manager as parent and the branch
        manager for the current branch name.
    """
    if not is_prod():
        current_branch = get_branch_name()
        parent_manager = get_prod_io_manager()
        branch_manager = get_branch_io_manager(current_branch)
        return BranchingIOManager(
            parent_io_manager=parent_manager,
            branch_io_manager=branch_manager,
            branch_name=current_branch,
        )

    return get_prod_io_manager()


def get_prod_io_manager():
    """Return a pickling filesystem IO manager with base_dir "prod_storage"."""
    # Relative path; presumably resolved against the working directory (confirm).
    prod_storage_folder = "prod_storage"
    return PickledObjectFilesystemIOManager(base_dir=prod_storage_folder)


def get_branch_io_manager(branch_name: str):
    """Return a pickling filesystem IO manager for the given branch.

    Args:
        branch_name: Branch name; objects go under "<branch_name>_storage".
    """
    return PickledObjectFilesystemIOManager(base_dir=f"{branch_name}_storage")


# Definitions for this example: the three Hacker News assets, the
# replication job (registered only outside prod), and the default
# "io_manager" resource chosen by construct_branching_io_manager.
dev_defs = Definitions(
    assets=[hackernews_source_data, hackernews_wordcloud, process_wordcloud],
    jobs=[ship_asset_materializations] if not is_prod() else [],
    resources={
        "io_manager": construct_branching_io_manager(
            get_prod_io_manager,
            get_branch_io_manager,
        ),
    },
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Start dagit for the branch deployment "my-featured-branch" on port 3001,
# with its instance state kept in dev_dagster_home under the current directory.
HOME_DIR="$(pwd)"
DAGSTER_DEPLOYMENT=my-featured-branch DAGSTER_HOME="$HOME_DIR/dev_dagster_home/" dagit -f hn_dagster.py -p 3001
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Start dagit for the prod deployment, with its instance state kept in
# prod_dagster_home under the current directory. No -p flag is passed, so
# dagit listens on its default port.
HOME_DIR="$(pwd)"
DAGSTER_DEPLOYMENT=prod DAGSTER_HOME="$HOME_DIR/prod_dagster_home/" dagit -f hn_dagster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dagster
dagit
matplotlib
pandas
wordcloud

0 comments on commit 8abea5d

Please sign in to comment.