# Dagster Example

This is an example in the notebook.

In [8]:
from dagster import (
    InputDefinition,
    Output,
    OutputDefinition,
    execute_pipeline,
    pipeline,
    solid
)
from dagster_pandas import DataFrame

from darts import models
import pandas as pd

from soam.forecaster import Forecaster
from soam.savers import CSVSaver

In [10]:
@solid(output_defs=[OutputDefinition(name="forecaster")])
def crete_model(context) -> Forecaster:
    my_model = models.Prophet(weekly_seasonality=False, daily_seasonality=False)
    forecaster = Forecaster(my_model)
    return forecaster

In [None]:
@solid(output_defs=[OutputDefinition(name="retail_dataset", dagster_type=DataFrame)])
def read_csv(context):
    return pd.read_csv("./example_retail_sales.csv")

In [None]:
@solid(input_defs=[
       InputDefinition("forecaster"),
       InputDefinition("retail_dataset")
       ])
def forecast_save(context, forecaster, retail_dataset):
    prediction = forecaster.run(raw_series=retail_dataset, output_length=7)
    prediction.to_csv("./example_retail_predictions.csv", index=False)

In [14]:
@pipeline
def serial_pipeline():
    forecast_save(crete_model(),read_csv())

In [15]:
execute_pipeline(serial_pipeline)

2020-08-21 10:23:11 - dagster - DEBUG - serial_pipeline - a0f411d7-c849-4bfe-a26a-adf3db748551 - PIPELINE_START - Started execution of pipeline "serial_pipeline".
                 pid = 7672
2020-08-21 10:23:11 - dagster - DEBUG - serial_pipeline - a0f411d7-c849-4bfe-a26a-adf3db748551 - ENGINE_EVENT - Executing steps in process (pid: 7672)
 event_specific_data = {"error": null, "marker_end": null, "marker_start": null, "metadata_entries": [["pid", null, ["7672"]], ["step_keys", null, ["['crete_model.compute', 'read_csv.compute', 'forecast_save.compute']"]]]}
                 pid = 7672
2020-08-21 10:23:11 - dagster - DEBUG - serial_pipeline - a0f411d7-c849-4bfe-a26a-adf3db748551 - STEP_START - Started execution of step "crete_model.compute".
                 pid = 7672
               solid = "crete_model"
    solid_definition = "crete_model"
            step_key = "crete_model.compute"
2020-08-21 10:23:11 - dagster - DEBUG - serial_pipeline - a0f411d7-c849-4bfe-a26a-adf3db748551 - STEP

<dagster.core.execution.results.PipelineExecutionResult at 0x7f909de31210>