In [None]:
import datetime as dt
import io

import requests
import dagster
import pandas as pd
import plotly
import plotly.express as px
from pathlib import Path

## Local example setup
We have a **fake source data server** running on http://localhost:8000 (using _python -m http.server 8000_)

In [None]:
# Peek at the raw metadata endpoint. The timeout keeps the cell from hanging if
# the demo server is not running; raise_for_status surfaces HTTP errors instead
# of a confusing JSON decode failure.
response = requests.get("http://localhost:8000/metadata.json", timeout=10)
response.raise_for_status()
response.json()

In [None]:
# Peek at one raw monthly CSV. The timeout keeps the cell from hanging if the
# demo server is not running; raise_for_status stops an HTTP error page from
# being parsed as CSV.
response = requests.get("http://localhost:8000/2023-01-01.csv", timeout=10)
response.raise_for_status()
pd.read_csv(io.BytesIO(response.content))

## Custom Resources
We can define our own resources to pass to the op.

In [None]:
class CoolApiClient:
    """Minimal HTTP client for the source data server.

    Args:
        root_url: Base URL of the server, e.g. ``"http://localhost:8000"``.
            A trailing slash is tolerated.
        timeout: Seconds to wait for the server on each request. Without a
            timeout, ``requests`` would wait indefinitely on an unresponsive
            server.
    """

    def __init__(self, root_url: str, timeout: float = 10.0):
        self.root_url = root_url
        self.timeout = timeout

    def _get(self, resource: str) -> "requests.Response":
        """GET ``<root_url>/<resource>`` and raise on an HTTP error status."""
        # strip a trailing slash so "http://host/" does not produce "http://host//x"
        url = f"{self.root_url.rstrip('/')}/{resource}"
        response = requests.get(url, timeout=self.timeout)
        response.raise_for_status()
        return response

    def get_metadata(self) -> dict:
        """Return the parsed JSON content of ``metadata.json``.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        return self._get("metadata.json").json()

    def get_month_data(self, month: str) -> pd.DataFrame:
        """Return ``<month>.csv`` as a DataFrame with ``Timestamp`` parsed as dates.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        response = self._get(f"{month}.csv")
        return pd.read_csv(io.BytesIO(response.content), parse_dates=["Timestamp"])

In [None]:
# Point the client at the local demo server used throughout this notebook
api_client = CoolApiClient("http://localhost:8000")

In [None]:
# Fetch metadata.json through the client instead of a raw requests call
api_client.get_metadata()

In [None]:
# "2023-01-01" is used as the file stem: the client requests 2023-01-01.csv
api_client.get_month_data("2023-01-01")

## Writing our Op

In [None]:
@dagster.op(required_resource_keys={"api_client"})
def generate_monthly_summary(
    context: dagster.OpExecutionContext,
    month: str,
) -> pd.Series:
    """Compute the cumulative energy per turbine for one month.

    Args:
        context: Op context; must expose an ``api_client`` resource with a
            ``get_month_data(month)`` method.
        month: Month identifier passed straight to the api client.

    Returns:
        Series named ``"Energy (kWh)"``, indexed by ``TurbineName``. Writing
        the result anywhere is left to the configured IOManager.
    """
    # the custom resource is injected through the context
    month_df = context.resources.api_client.get_month_data(month)

    # the dagster logger is also available on the context
    row_count = month_df.shape[0]
    context.log.debug(f"retrieved {row_count} rows")

    # business logic: sum the power readings of each turbine ...
    power_per_turbine = month_df.groupby("TurbineName")["Power (kW)"].sum()
    # ... and convert kW-10min to kWh (assumes 10-minute readings, i.e. 6 per hour)
    energy_per_turbine = power_per_turbine.div(6)

    return energy_per_turbine.rename("Energy (kWh)")

## Testing
Being able to inject dependencies makes the tests easy to set up, without any need for patching.

In [None]:
from unittest.mock import Mock

def test_generate_monthly_summary():
    """Run the op against a mocked api client and check the per-turbine energy."""
    month = "2023-01-01"

    # setup mocks: three readings for a single turbine, 500 + 200 + 300 = 1000 kW
    fake_data = pd.DataFrame({
        "Timestamp": pd.date_range("2023-01-01", periods=3, freq="10Min"),
        "TurbineName": "1",
        "Wind speed (m/s)": [5, 2, 3],
        "Power (kW)": [500., 200., 300.],
    })
    mock_api_client = Mock(spec=CoolApiClient)
    mock_api_client.get_month_data.return_value = fake_data

    # build the context with mocked resources
    context = dagster.build_op_context(resources={"api_client": mock_api_client})

    # run the op with the mocked context
    actual = generate_monthly_summary(context, month=month)

    # the mock returns fake_data for any argument, so explicitly check that
    # the op asked the client for the month it was given
    mock_api_client.get_month_data.assert_called_once_with(month)

    # check results
    expected = pd.Series(
        [1000 / 6],
        name="Energy (kWh)",
        index=pd.Index(["1"], name="TurbineName"),
    )
    pd.testing.assert_series_equal(actual, expected)


test_generate_monthly_summary()

## Local Run
When running against a real local resource, we just need to pass a real instance of the API client.

In [None]:
# A real client pointed at the local server takes the place of the mock used in the test
local_api_client = CoolApiClient(root_url='http://localhost:8000')
context = dagster.build_op_context(resources={"api_client": local_api_client})

# NOTE: despite the `_df` suffix, the op is annotated to return a pd.Series
monthly_energy_df = generate_monthly_summary(context, month="2023-01-01")

monthly_energy_df

## Defining a second Op
We'll define a second op that generates an interactive map (returned as an HTML string) based on the output of the previous op.

In [None]:
@dagster.op(required_resource_keys={"api_client"}, out=dagster.Out(io_manager_key="html_io_manager"))
def generate_map_visualisation(
    context: dagster.OpExecutionContext,
    monthly_energy_by_turbine: pd.Series,
) -> str:
    """Plot the per-turbine energy on a map and return the figure as HTML.

    Args:
        context: Op context; must expose an ``api_client`` resource with a
            ``get_metadata()`` method.
        monthly_energy_by_turbine: Energy values to plot; the plot reads them
            from a column called ``"Energy (kWh)"``.

    Returns:
        The plotly figure rendered with ``fig.to_html()``. Persisting it is
        delegated to the ``html_io_manager``.
    """
    turbine_metadata = pd.DataFrame(context.resources.api_client.get_metadata())

    # preparing data for map: put the energy values next to the metadata,
    # aligned on the turbine name, then move the index back into a column
    metadata_by_turbine = turbine_metadata.set_index("TurbineName")
    map_df = pd.concat([metadata_by_turbine, monthly_energy_by_turbine], axis=1)
    map_df = map_df.reset_index()

    # create map: marker size and colour both encode the energy
    plot_options = dict(
        lat="Latitude",
        lon="Longitude",
        hover_name="TurbineName",
        mapbox_style="open-street-map",
        zoom=12,
        size="Energy (kWh)",
        color="Energy (kWh)",
    )
    fig = px.scatter_mapbox(map_df, **plot_options)

    return fig.to_html()

## Defining a custom IOManager

In [None]:
class MyLocalHtmlIoManager(dagster.IOManager):
    """IOManager that writes an op's HTML string output to ``<directory>/viz.html``.

    Args:
        directory: Folder the HTML file is written to. It is wrapped in
            ``Path`` on write, so a plain path string works as well. The
            folder is created on write if it does not exist yet.
    """

    def __init__(self, directory: Path):
        self.directory = directory

    def handle_output(self, context: dagster.OutputContext, obj: str) -> None:
        """Write ``obj`` to ``viz.html`` inside the configured directory."""
        out_dir = Path(self.directory)
        # on a fresh checkout the output folder does not exist yet, and
        # open(..., "w") would fail with FileNotFoundError
        out_dir.mkdir(parents=True, exist_ok=True)

        fp = out_dir / "viz.html"
        with open(fp, "w", encoding="utf8") as f:
            f.write(obj)
        context.log.info(f"HTML file stored here: {fp}")

    def load_input(self, context: dagster.InputContext) -> None:
        """Not supported: this manager only writes outputs."""
        raise NotImplementedError()

## Defining the graph and job

In [None]:
@dagster.graph
def my_processing_graph() -> None:
    """Chain the two ops: monthly summary, then map visualisation.

    The ``month`` input of ``generate_monthly_summary`` is deliberately left
    unconnected here; it is supplied through the run config.
    """
    monthly_energy = generate_monthly_summary()
    generate_map_visualisation(monthly_energy_by_turbine=monthly_energy)

In [None]:
# Bind concrete resources to the keys the ops ask for:
#   "api_client"      -> listed in required_resource_keys of both ops
#   "html_io_manager" -> io_manager_key of generate_map_visualisation's output
processing_job = my_processing_graph.to_job(
    resource_defs={
        "api_client": local_api_client,
        "html_io_manager": MyLocalHtmlIoManager(directory="./data/output-viz"),
    }
)

We can even run it in-process to check that there are no failures.

In [None]:
# `month` is not wired inside the graph, so it is passed here as an op input via run config
result = processing_job.execute_in_process(
    run_config={'ops': {'generate_monthly_summary': {'inputs': {'month': '2023-01-01'}}}}
)

In [None]:
from IPython.display import HTML

# Display the file written by the html_io_manager (its directory + "viz.html")
HTML("./data/output-viz/viz.html")
# which is the same as `HTML(result.output_for_node('generate_map_visualisation'))`

## Schedules 
You can schedule the job to run periodically by defining a `ScheduleDefinition`.

You can use cron syntax "0 0 * * *" but it also accepts "@hourly", "@daily", "@weekly", and "@monthly"

**NOTE**: you need the dagster-daemon running

In [None]:
# Schedule `processing_job` with the "@monthly" preset, evaluated in UTC
dagster.ScheduleDefinition(job=processing_job, cron_schedule="@monthly", execution_timezone="UTC")

## Partitions & Hooks

In [None]:
@dagster.monthly_partitioned_config(start_date=dt.datetime(2023, 1, 1))
def my_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
    """Build the run config for one monthly partition.

    The partition start, formatted as ``YYYY-MM-DD``, becomes the ``month``
    input of ``generate_monthly_summary``; the partition end is unused.
    """
    month = start.strftime("%Y-%m-%d")
    summary_config = {"inputs": {"month": month}}
    return {"ops": {"generate_monthly_summary": summary_config}}

In [None]:
# Text posted to Slack when an op fails. It is filled in with
# `.format(context=...)`, so the fields are attribute lookups on the HookContext.
FAILURE_MESSAGE_TEMPLATE = (
    "Op `{context.op.name}` failed in job `{context.job_name}` "
    "(run {context.run_id}): {context.op_exception}"
)


@dagster.failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: dagster.HookContext) -> None:
    """Post a failure notification to the ``#demo`` Slack channel.

    Requires a ``slack`` resource exposing ``get_client()``.
    """
    context.resources.slack.get_client().chat_postMessage(
        channel="#demo",
        text=FAILURE_MESSAGE_TEMPLATE.format(context=context),
    )

In [None]:
from dagster_slack import SlackResource  # <-- one of the many pre-written integrations

# Same graph and resources as `processing_job`, plus the monthly partitioned
# config and the Slack failure hook (changes are marked "new" below)
my_processing_graph.to_job(
    config=my_partitioned_config,       # new
    hooks={slack_message_on_failure},   # new
    resource_defs={
        "slack": SlackResource(token=dagster.EnvVar("SLACK_TOKEN")),  # new
        "api_client": local_api_client,
        "html_io_manager": MyLocalHtmlIoManager(directory="./data/output-viz"),
    },
)

## What about the UI?
To demo partitions and failure hooks, it is easier to jump to the UI:
http://localhost:3000/locations/windy/jobs/my_processing_graph