In [1]:
#default_exp pipeline

# Pipeline

<br>

### Imports

In [2]:
#exports
import numpy as np
import pandas as pd

from dagster import execute_pipeline, pipeline, solid, Field

from batopt import clean

<br>

### End-to-End

We're now going to combine these steps into a pipeline using dagster. First, we'll create the individual components.

In [13]:
@solid()
def load_data(_, raw_data_dir: str):
    """Load the raw PV, demand and weather training datasets.

    Parameters
    ----------
    _ :
        Execution context passed in by dagster (unused).
    raw_data_dir : str
        Directory handed to `clean.load_training_dataset` for each dataset.

    Returns
    -------
    dict
        The loaded datasets keyed by `'pv'`, `'demand'` and `'weather'`.
    """
    # The weather dataset is the only one loaded with an explicit 'H'
    # datetime-index frequency; the others use the loader's default.
    return {
        'pv': clean.load_training_dataset(raw_data_dir, 'pv'),
        'demand': clean.load_training_dataset(raw_data_dir, 'demand'),
        'weather': clean.load_training_dataset(raw_data_dir, 'weather', dt_idx_freq='H'),
    }

@solid()
def clean_data(_, loaded_data, intermediate_data_dir: str):
    """Clean the loaded datasets and save them as CSVs.

    Parameters
    ----------
    _ :
        Execution context passed in by dagster (unused).
    loaded_data : dict
        Datasets keyed by `'pv'`, `'demand'` and `'weather'`.
    intermediate_data_dir : str
        Directory the cleaned CSVs are written to. It is created (including
        any missing parents) if it does not already exist.

    Returns
    -------
    dict
        The cleaned datasets keyed by `'pv'`, `'weather'` and `'demand'`.
    """
    # Local import keeps this definition self-contained
    import os

    # Cleaning
    cleaned_data = dict()

    cleaned_data['pv'] = (loaded_data['pv']
                          .pipe(clean.interpolate_missing_panel_temps, loaded_data['weather'])
                          .pipe(clean.interpolate_missing_site_irradiance, loaded_data['weather'])
                          .pipe(clean.interpolate_missing_site_power)
                         )
    # NOTE(review): this is given the raw `loaded_data['pv']`, not the cleaned
    # PV data produced above - confirm that is intended.
    cleaned_data['weather'] = clean.interpolate_missing_weather_solar(loaded_data['pv'], loaded_data['weather'])
    # Demand is passed through unchanged
    cleaned_data['demand'] = loaded_data['demand']


    # Saving
    # `to_csv` does not create missing parent directories, so make sure the
    # output directory exists rather than failing after the cleaning work
    os.makedirs(intermediate_data_dir, exist_ok=True)

    for dataset_name in ('pv', 'demand', 'weather'):
        cleaned_data[dataset_name].to_csv(f'{intermediate_data_dir}/cleaned_{dataset_name}.csv')

    return cleaned_data

<br>

Then we'll combine them in a pipeline.

In [14]:
@pipeline
def end_to_end_pipeline():
    """Wire the solids together: the output of `load_data` feeds `clean_data`."""
    clean_data(load_data())

<br>

We'll now do a test run of the pipeline.

In [15]:
# Inputs supplied to each solid, keyed by solid name
solid_inputs = {
    'load_data': {'raw_data_dir': '../data/raw'},
    'clean_data': {'intermediate_data_dir': '../data/intermediate'},
}

# Nest the inputs in the layout passed to dagster: solids -> <solid name> -> inputs
run_config = {
    'solids': {
        solid_name: {'inputs': inputs}
        for solid_name, inputs in solid_inputs.items()
    }
}

execute_pipeline(end_to_end_pipeline, run_config=run_config)

[32m2021-01-29 01:28:26[0m - dagster - [34mDEBUG[0m - end_to_end_pipeline - dcacb44f-a4a2-4759-be05-a750516d08a5 - 912 - ENGINE_EVENT - Starting initialization of resources [asset_store].
[32m2021-01-29 01:28:26[0m - dagster - [34mDEBUG[0m - end_to_end_pipeline - dcacb44f-a4a2-4759-be05-a750516d08a5 - 912 - ENGINE_EVENT - Finished initialization of resources [asset_store].
[32m2021-01-29 01:28:26[0m - dagster - [34mDEBUG[0m - end_to_end_pipeline - dcacb44f-a4a2-4759-be05-a750516d08a5 - 912 - PIPELINE_START - Started execution of pipeline "end_to_end_pipeline".
[32m2021-01-29 01:28:26[0m - dagster - [34mDEBUG[0m - end_to_end_pipeline - dcacb44f-a4a2-4759-be05-a750516d08a5 - 912 - ENGINE_EVENT - Executing steps in process (pid: 912)
[32m2021-01-29 01:28:26[0m - dagster - [34mDEBUG[0m - end_to_end_pipeline - dcacb44f-a4a2-4759-be05-a750516d08a5 - 912 - load_data.compute - STEP_START - Started execution of step "load_data.compute".
[32m2021-01-29 01:28:26[0m - dagster

<dagster.core.execution.results.PipelineExecutionResult at 0x21b565a2ca0>

<br>

Finally, we'll export the relevant code to our `batopt` module.

In [None]:
#hide
from nbdev.export import notebook2script
    
# Run nbdev's export step so that cells flagged for export are written out
# to the library's Python modules
notebook2script()