# Build and Execute Kubeflow Pipeline

## Build Pipeline

In [15]:
# Upgrade pip, install kfp, and restart kernel

!python -m pip install --user --upgrade pip
!pip install --user --upgrade kfp
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Requirement already up-to-date: pip in ./.local/lib/python3.7/site-packages (20.2.4)
Requirement already up-to-date: kfp in ./.local/lib/python3.7/site-packages (1.0.4)


In [2]:
# Import Kubeflow SDK
import os
import sys
import kfp
import kfp.dsl as dsl
import kfp.components as comp

### Build Container Components

#### Load Data

***Note:*** We intentionally do not import the `sts.data.loader.load_california_electricity_demand` function here. Instead, this component demonstrates building on a plain base image and installing its packages at runtime.

In [38]:
def load_data(data_path, api_key):
    """Download California hourly electricity demand from the EIA API, caching it on disk.

    This function is turned into a Kubeflow component with
    ``func_to_container_op`` on a plain Python base image, so it must be
    self-contained: every import and helper lives inside it, and third-party
    dependencies are pip-installed at runtime.

    Args:
        data_path: Directory (the shared pipeline volume mount) in which the raw
            API response is cached as ``demand.json``. If that file already
            exists, no download happens.
        api_key: The EIA API key itself. If empty, the container's
            ``EIA_API_KEY`` environment variable is used as a fallback.

    Returns:
        None. The output of this step is the ``demand.json`` file on the volume.

    Raises:
        ValueError: if no API key is available, or the API response has no
            ``series`` payload (so an error body is never cached).
        requests.HTTPError: if the EIA API returns an HTTP error status.
        subprocess.CalledProcessError: if a dependency fails to install.
    """
    # Since we are using a base python Docker image for this component, we need to
    # install dependencies manually. The other option would be to build a Docker
    # image specifically for each component in the pipeline (which we'll do later).
    import os
    import sys
    import json
    import subprocess

    def install(name):
        # Install with the interpreter running this function so the package lands
        # in its environment; check_call fails fast instead of a later ImportError.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', name])

    # install load_data dependencies (nothing here builds a DataFrame, so no pandas)
    install('requests==2.22.0')

    import requests

    def read_json(filepath):
        with open(filepath) as f:
            return json.load(f)

    def write_json(data, filepath):
        with open(filepath, 'w') as f:
            json.dump(data, f)

    def resolve_api_key(key):
        # The argument is the key value (not an env-var name); fall back to the
        # container environment only when no key was passed.
        resolved = key or os.getenv('EIA_API_KEY')
        if not resolved:
            raise ValueError(
                'Please provide a valid EIA API key via the `api_key` argument '
                'or the EIA_API_KEY environment variable.'
            )
        return resolved

    def fetch_california_demand(key):
        response = requests.get(
            'http://api.eia.gov/series',
            params={
                'api_key': key,
                'series_id': 'EBA.CAL-ALL.D.H',
                'out': 'json',
            },
            timeout=60,  # seconds; never hang the pipeline step indefinitely
        )
        response.raise_for_status()
        payload = response.json()
        # The downstream preprocess step reads payload['series'][0]['data'].
        # Refuse to cache anything else (e.g. an error body), because an
        # existing cache file is never re-downloaded.
        series = payload.get('series') if isinstance(payload, dict) else None
        if not series:
            raise ValueError(
                'Unexpected EIA API response without a "series" payload; '
                'refusing to cache it.'
            )
        return payload

    def read_or_download_data(filepath, key):
        if not os.path.exists(filepath):
            write_json(fetch_california_demand(resolve_api_key(key)), filepath)
            print(f'------- SUCCESSFULLY SAVED DATA TO: {filepath} -------')
        # Re-read so a corrupt cache file fails here rather than downstream.
        return read_json(filepath)

    # ---------------- EXECUTION -----------------

    read_or_download_data(os.path.join(data_path, 'demand.json'), api_key)



In [None]:
def preprocess_data(data_path):
    """Convert the cached EIA demand JSON into a time-sorted ``ds``/``y`` DataFrame.

    Self-contained for ``func_to_container_op``: imports and helpers live inside
    the function and dependencies are pip-installed at runtime.

    Reads ``<data_path>/demand.json`` and pickles a DataFrame with columns
    ``ds`` (timezone-naive US Pacific wall-clock time) and ``y`` (the series
    value) to ``<data_path>/data_df.pkl``.

    Args:
        data_path: Directory (the shared pipeline volume mount) holding the data files.

    Raises:
        subprocess.CalledProcessError: if a dependency fails to install.
    """
    import os
    import sys
    import json
    import pickle
    import subprocess

    def install(name):
        # Install with the interpreter running this function; fail fast on error.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', name])

    # install preprocess_data dependencies
    install('pandas==1.1.0')

    import pandas as pd

    def json_to_df(data):
        # Rows of series[0]['data'] become rows with integer columns 0 and 1,
        # renamed to ds / y below.
        return pd.DataFrame(data['series'][0]['data'])

    def utc_to_pst(df):
        # Previously referenced but never defined inside this component, which
        # made the step fail with NameError. Converts the UTC stamps in `ds` to
        # US Pacific time (PST/PDT).
        # NOTE(review): assumes EIA hourly UTC stamps like '20201103T08Z' —
        # confirm against the series format.
        utc = pd.to_datetime(df['ds'], format='%Y%m%dT%HZ', utc=True)
        return utc.dt.tz_convert('America/Los_Angeles')

    # ---------------- EXECUTION -----------------

    with open(os.path.join(data_path, 'demand.json')) as f:
        data = json.load(f)

    df = (
        json_to_df(data)
        .rename(columns={0: 'ds', 1: 'y'})
        .assign(ds=utc_to_pst)
        # Drop the tz info so ds holds naive local wall-clock times.
        # NOTE(review): the DST fall-back hour then appears twice.
        .assign(ds=lambda df: df.ds.dt.tz_localize(None))
        .sort_values('ds')
    )

    with open(os.path.join(data_path, 'data_df.pkl'), 'wb') as f:
        pickle.dump(df, f)

In [None]:
def fit_forecast_simple_model(data_path):
    """Fit a default Prophet model and forecast one year of hourly values.

    Self-contained for ``func_to_container_op``: imports and helpers live inside
    the function and dependencies are pip-installed at runtime.

    Reads the pickled DataFrame ``<data_path>/data_df.pkl`` and writes the
    forecast (columns ``ds`` and ``yhat``) to
    ``<data_path>/forecasts/prophet_simple.csv``.

    Args:
        data_path: Directory (the shared pipeline volume mount) holding the data files.

    Raises:
        subprocess.CalledProcessError: if a dependency fails to install.
    """
    import os
    import sys
    import pickle
    import subprocess

    def install(name):
        # Install with the interpreter running this function; fail fast on error.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', name])

    # install fit_forecast_simple_model dependencies
    # (pandas is needed to unpickle the DataFrame even though it is not named here)
    install('pandas==1.1.0')
    install('fbprophet==0.6')

    from fbprophet import Prophet

    # Forecast horizon in hourly steps: 365 days * 24 hours.
    HOURS_PER_YEAR = 8760

    def default_prophet_model(df):
        model = Prophet()
        model.fit(df)
        return model

    # ---------------- EXECUTION -----------------

    # NOTE: pickle.load can execute arbitrary code; only load files this pipeline wrote.
    with open(os.path.join(data_path, 'data_df.pkl'), 'rb') as f:
        df = pickle.load(f)

    model = default_prophet_model(df)

    future = model.make_future_dataframe(periods=HOURS_PER_YEAR, freq='H')
    forecast = model.predict(future)

    # Write the forecast values to csv. The path must be joined so the file lands
    # inside forecasts/ (plain concatenation produced ".../forecastsprophet_simple.csv").
    forecast_dir = os.path.join(data_path, 'forecasts')
    os.makedirs(forecast_dir, exist_ok=True)

    forecast[['ds', 'yhat']].to_csv(
        os.path.join(forecast_dir, 'prophet_simple.csv'), index=False
    )
    

In [39]:
# Create the load, preprocess, and train (Prophet) components. Each runs on the
# same plain Python base image and pip-installs its own dependencies at runtime.
BASE_IMAGE = 'python:3.6.9'
load_data_op = comp.func_to_container_op(load_data, base_image=BASE_IMAGE)
preprocess_data_op = comp.func_to_container_op(preprocess_data, base_image=BASE_IMAGE)
fit_forecast_simple_model_op = comp.func_to_container_op(
    fit_forecast_simple_model, base_image=BASE_IMAGE
)



### Build Kubeflow Pipeline

In [40]:
# Define the pipeline function and its input parameters
@dsl.pipeline(
    name='Structural Time-Series Pipeline',
    description='A step-by-step pipeline for forecasting California energy demand.'
)
def sts_container_pipeline(
    data_path,
    api_key
):
    """Load the EIA demand data and preprocess it, sharing files through one volume.

    Args:
        data_path: Mount path of the shared volume inside each component container.
        api_key: EIA API key forwarded to the load_data component.
    """
    # Define and create a volume to share between pipeline components
    vop = dsl.VolumeOp(
        name='create_volume_arr',
        resource_name='sts-data-volume',
        size='0.5Gi',
        modes=dsl.VOLUME_MODE_RWO
    )

    # Add load_data component: downloads demand.json onto the volume
    load_data_container = load_data_op(data_path, api_key) \
        .add_pvolumes({data_path: vop.volume})

    # Add preprocess_data component. Mounting load_data's pvolume (instead of
    # vop.volume) makes this step run after load_data has finished.
    preprocess_data_op(data_path) \
        .add_pvolumes({data_path: load_data_container.pvolume})
    
    


### Run Pipeline

In [41]:
# Mount path of the shared volume inside every component container
DATA_PATH = '/mnt'
# Never hardcode credentials: read the EIA key from the environment; 'xyz' is
# only a placeholder so the notebook still runs without one.
API_KEY = os.environ.get('EIA_API_KEY', 'xyz')

In [42]:
# Alias the pipeline function used by the compile and submit cells below
pipeline_func = sts_container_pipeline

In [43]:
# Experiment and run names shown in the Kubeflow Pipelines UI
experiment_name = 'sts_loaddata_test'
run_name = f'{pipeline_func.__name__} run'

# Runtime values for the pipeline's input parameters
arguments = {
    'data_path': DATA_PATH,
    'api_key': API_KEY,
}

# Compile the pipeline into a zipped workflow definition (can be uploaded in the UI)
package_path = f'{experiment_name}.zip'
kfp.compiler.Compiler().compile(pipeline_func, package_path)

In [44]:
# Connect to the Kubeflow Pipelines API. The endpoint can be overridden with the
# KF_PIPELINES_ENDPOINT environment variable; otherwise the original AI Platform
# Pipelines host is used.
KFP_HOST = os.environ.get(
    'KF_PIPELINES_ENDPOINT',
    '2dba4b335ce47b69-dot-us-east1.pipelines.googleusercontent.com',
)
client = kfp.Client(host=KFP_HOST)

In [45]:
# Submit the pipeline function directly, without uploading the compiled package
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)

In [94]:
# Show the endpoint kfp.Client would fall back to when no host is given.
# The variable is KF_PIPELINES_ENDPOINT; `KF_PIPELINES_ENDPOINT_ENV` is only the
# name of the SDK constant that holds that string.
print(os.environ.get('KF_PIPELINES_ENDPOINT'))

None


In [7]:
# Debug: print the PATH seen by shell (`!`) commands in this kernel
!echo $PATH

/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin


In [None]:
# Debug: same PATH check as the earlier `!echo $PATH` cell (leftover duplicate)
!echo $PATH

In [None]:
# Make the parent of the notebook directory importable (presumably so the
# project's own packages can be imported from this notebook)
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

In [15]:
# Absolute path of the notebook directory's parent
os.path.abspath(os.pardir)

'/home/jovyan/structural-time-series-vol-1/structural-time-series'

In [12]:
# os.path.join with a single argument returns it unchanged; `os.os` was a typo
os.path.join('../')

'../'

In [None]:
# Add the parent of the notebook directory to the import path.
# sys.path.append() requires exactly one argument; the bare call raised TypeError.
project_root = os.path.abspath(os.pardir)
if project_root not in sys.path:
    sys.path.append(project_root)

In [9]:
# Debug: show the working directory seen by shell (`!`) commands
!pwd

/home/jovyan/structural-time-series-vol-1/structural-time-series/nbs


In [11]:
# Debug: the kernel's current working directory, as seen from Python
os.getcwd()

'/home/jovyan/structural-time-series-vol-1/structural-time-series/nbs'

In [None]:
def load_data(data_path):
    
    # since we are using a base tensorflow/python Docker image, we need to manually install dependencies
    # the other option would be to create Docker image specifically for each component in our pipeline
    
    # defining the install function
    import subprocess
    def install(name):
        subprocess.call(['pip', 'install', name])
    
    # func_to_container_op requires packages to be imported inside of the function