In [1]:
# default_exp pipeline

# Pipelines

<br>

### Imports

In [2]:
#exports
from powerdict import download, construct, update

import os
from typing import Any

from dagster import execute_pipeline, pipeline, solid, Field

<br>

### End-to-End Dataset Generation

We're now going to combine the dictionary generation steps into a pipeline using dagster, first we'll create the individual components.

In [3]:
#exports
@solid()
def download_source_data(context, raw_data_dir: str):
    try:
        download.download_opsd_power_plants_data(raw_data_dir)
        context.log.info('The latest source data was successfully retrieved')
    except:
        context.log.info('Source data could not be updated, will proceed with existing raw data sources')
    
    return 

@solid()
def construct_intermediate_dataset(_, definitions_dir: str, raw_data_dir: str, intermediate_data_dir: str) -> Any:
    df = construct.construct_output_df(definitions_dir, raw_data_dir)
    df.to_csv(f'{intermediate_data_dir}/power_stations.csv')
    
    return df

@solid()
def update_dataset_updates(_, df: Any, updates_data_dir: str) -> Any:
    df = update.apply_updates(df, updates_data_dir)
    
    return df

@solid()
def clean_output_dataset(_, df: Any, definitions_dir: str) -> Any:
    df = update.check_and_apply_output_defs(df, definitions_dir)
    
    return df

@solid()
def save_output_dataset(_, df: Any, output_data_dir: str):
    if not os.path.exists(output_data_dir):
        os.makedirs(output_data_dir)
        
    df.to_csv(f'{output_data_dir}/power_stations.csv')
    
    return

<br>

Then we'll combine them in a pipeline

In [4]:
#exports
@pipeline
def generate_output_dataset_pipeline():  
    download_source_data()
    
    df = construct_intermediate_dataset()
    df = update_dataset_updates(df)
    df = clean_output_dataset(df)
    
    save_output_dataset(df)

<br>

Which we can then run with `execute_pipeline` whilst also specifying the run_config 

In [6]:
run_config = {
    'solids': {
        'download_source_data': {
            'inputs': {
                'raw_data_dir': '../data/raw'
            },
        },
        'construct_intermediate_dataset': {
            'inputs': {
                'definitions_dir': '../data/definitions',
                'raw_data_dir': '../data/raw',
                'intermediate_data_dir': '../data/intermediate'
            },
        },
        'update_dataset_updates': {
            'inputs': {
                'updates_data_dir': '../data/updates'
            },
        },
        'clean_output_dataset': {
            'inputs': {
                'definitions_dir': '../data/definitions'
            },
        },
        'save_output_dataset': {
            'inputs': {
                'output_data_dir': '../data/output'
            },
        },
    }
}

execute_pipeline(generate_output_dataset_pipeline, run_config=run_config)

[32m2021-07-06 09:06:27[0m - dagster - [34mDEBUG[0m - generate_output_dataset_pipeline - 12c39666-77ee-4296-8f76-28716b97c61a - 8424 - ENGINE_EVENT - Starting initialization of resources [asset_store].
[32m2021-07-06 09:06:27[0m - dagster - [34mDEBUG[0m - generate_output_dataset_pipeline - 12c39666-77ee-4296-8f76-28716b97c61a - 8424 - ENGINE_EVENT - Finished initialization of resources [asset_store].
[32m2021-07-06 09:06:27[0m - dagster - [34mDEBUG[0m - generate_output_dataset_pipeline - 12c39666-77ee-4296-8f76-28716b97c61a - 8424 - PIPELINE_START - Started execution of pipeline "generate_output_dataset_pipeline".
[32m2021-07-06 09:06:27[0m - dagster - [34mDEBUG[0m - generate_output_dataset_pipeline - 12c39666-77ee-4296-8f76-28716b97c61a - 8424 - ENGINE_EVENT - Executing steps in process (pid: 8424)
[32m2021-07-06 09:06:27[0m - dagster - [34mDEBUG[0m - generate_output_dataset_pipeline - 12c39666-77ee-4296-8f76-28716b97c61a - 8424 - construct_intermediate_dataset.com

<dagster.core.execution.results.PipelineExecutionResult at 0x21834141970>

In [None]:
#hide
from nbdev.export import *
notebook2script()