# This pipeline is meant to run only on a Kubernetes cluster. Do not run it directly on your local machine, to avoid causing complications in your development environment.

# Install the required libraries

### Run this notebook on a Kubernetes cluster: minikube, a Multipass virtual machine with MicroK8s installed (on a laptop or personal computer), or a cloud-hosted Kubernetes engine.

In [1]:
!python -m pip install --user --upgrade pip

!pip3 install pandas==0.24.2 matplotlib==3.2.2 scipy==1.4.1 statsmodels==0.12.0 scikit-learn==0.23.1 tensorflow==2.1.0 keras==2.3.1 --user

In [None]:
!pip3 install kfp --upgrade --user

In [7]:
# Check that the installation was successful

!which dsl-compile

# Build the components

In [1]:
# Import Kubeflow SDK
import kfp
import kfp.dsl as dsl
import kfp.components as comp

# Directory where the pipeline steps read and write their outputs
data_path = "pipe_data"

# Data Ingestion

In [6]:
def data_injestion(data_path):
    """Download the weather summary CSV and pickle it for the next step.

    The function is also turned into a container component via
    ``comp.func_to_container_op``, so every import it needs is done inside
    the function body.

    Args:
        data_path: Directory shared between pipeline steps. The raw DataFrame
            is written to ``{data_path}/inj_data``.
    """
    import os
    import pickle
    import subprocess
    import sys

    # Same pandas version that the later steps install before unpickling.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])

    import pandas as pd

    df_weather = pd.read_csv("https://raw.githubusercontent.com/Chizzy-codes/08-weather-conditions/master/data/Cleanedweathersummary.csv")

    # When the function is called directly from the notebook the directory may
    # not exist yet; on the cluster it is the mount point of the shared volume.
    os.makedirs(data_path, exist_ok=True)

    # Save the ingested data as a pickle file for the data transformation step.
    with open(f'{data_path}/inj_data', 'wb') as f:
        pickle.dump(df_weather, f)

In [7]:
idata = data_injestion(data_path)

  if (await self.run_code(code, result,  async_=asy)):


# Data Transformation

In [8]:
def data_transformation(data_path):
    """Turn the ingested data into scaled, LSTM-ready train/val/test arrays.

    Reads the DataFrame pickled by ``data_injestion`` from
    ``{data_path}/inj_data``. It keeps the rows of station ``'30001'``, indexed
    by date, and scales every remaining column to [0, 1]. It then frames the
    series as a one-step-ahead supervised problem. Finally it pickles
    ``(train_X, train_y, test_X, test_y, val_X, val_y, values, X_test)`` to
    ``{data_path}/preprocessed_data``.

    The function is also used as a container component via
    ``comp.func_to_container_op``, so all imports are done inside it.

    Args:
        data_path: Directory shared between pipeline steps.
    """
    import pickle
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.23.1'])

    import pandas as pd
    from pandas import DataFrame, concat
    from sklearn.preprocessing import MinMaxScaler

    # Hold-out sizes: the last ``n_obs`` rows form the test window and the
    # ``n_obs1 - n_obs`` rows before them form the validation window.
    n_obs = 7
    n_obs1 = 14

    # Load the ingested data.
    with open(f'{data_path}/inj_data', 'rb') as f:
        df_weather = pickle.load(f)
    df_weather.drop(['YR', 'MO', 'DA'], axis=1, inplace=True)

    df_weather['datetime'] = pd.to_datetime(df_weather['Date'])
    # Index by station code ('STA') so that a single station can be selected.
    df_weather = df_weather.set_index('STA')
    df_weather.drop(['Date'], axis=1, inplace=True)

    df_weather = df_weather.loc['30001']
    df_weather = df_weather.set_index('datetime')

    df_weather.drop(['Snowfall', 'PoorWeather', 'Precip'], axis=1, inplace=True)

    # X_test keeps the datetime index of the test window; train() uses it
    # to label its forecasts.
    X_train, X_test = df_weather[0:-n_obs], df_weather[-n_obs:]
    print(X_train.shape)
    print(X_test.shape)

    def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
        """Frame a (multivariate) series as a supervised-learning table.

        The columns are ``n_in`` lagged copies (``varJ(t-i)``) followed by the
        current and ``n_out - 1`` future copies (``varJ(t)``, ``varJ(t+i)``)
        of every variable. Rows left incomplete by the shifts are dropped
        when ``dropnan`` is true.
        """
        n_vars = 1 if isinstance(data, list) else data.shape[1]
        df = DataFrame(data)
        cols, names = [], []
        # Input sequence (t-n, ... t-1).
        for i in range(n_in, 0, -1):
            cols.append(df.shift(i))
            names += ['var%d(t-%d)' % (j + 1, i) for j in range(n_vars)]
        # Forecast sequence (t, t+1, ... t+n).
        for i in range(n_out):
            cols.append(df.shift(-i))
            if i == 0:
                names += ['var%d(t)' % (j + 1) for j in range(n_vars)]
            else:
                names += ['var%d(t+%d)' % (j + 1, i) for j in range(n_vars)]
        agg = concat(cols, axis=1)
        agg.columns = names
        if dropnan:
            agg.dropna(inplace=True)
        return agg

    values = df_weather.values.astype("float32")
    n_features = values.shape[1]

    # NOTE(review): the scaler is fit on the full series, test rows included.
    # train() refits an identical scaler on ``values`` to invert its forecasts.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(values)

    # One lag in, one step out: each row holds the t-1 features followed by
    # the t features.
    reframed = series_to_supervised(scaled, 1, 1)

    # Chronological split into train / validation / test.
    values_r = reframed.values
    train = values_r[:-n_obs1, :]
    val = values_r[-n_obs1:-n_obs, :]
    test = values_r[-n_obs:, :]

    # Inputs are the lagged features and targets are the current-step features.
    # Deriving the width from the data (rather than hard-coding 3) keeps the
    # split correct if the set of kept columns changes.
    train_X, train_y = train[:, :-n_features], train[:, -n_features:]
    val_X, val_y = val[:, :-n_features], val[:, -n_features:]
    test_X, test_y = test[:, :-n_features], test[:, -n_features:]

    # Reshape inputs to 3D [samples, timesteps, features] for the LSTM.
    train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
    test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
    val_X = val_X.reshape((val_X.shape[0], 1, val_X.shape[1]))
    print(train_X.shape, train_y.shape, test_X.shape, test_y.shape, val_X.shape, val_y.shape)

    # Save the preprocessed data as a pickle file for the model component.
    with open(f'{data_path}/preprocessed_data', 'wb') as f:
        pickle.dump((train_X, train_y, test_X, test_y, val_X, val_y, values, X_test), f)

In [9]:
new_data = data_transformation(data_path)

(515, 3)
(7, 3)
(507, 1, 3) (507, 3) (7, 1, 3) (7, 3) (7, 1, 3) (7, 3)


# Model Building and Training

In [10]:
def train(data_path):
    """Fit a bidirectional LSTM and store forecasts for the test window.

    Loads the arrays pickled by ``data_transformation`` from
    ``{data_path}/preprocessed_data``. It trains the model and saves it to
    ``{data_path}/model``. It then predicts the test window and pickles
    ``(forecast_df, true_df)`` to ``{data_path}/final_data``; both are in
    original (unscaled) units and indexed by the test window's dates.

    The function is also used as a container component via
    ``comp.func_to_container_op``, so all imports are done inside it.

    Args:
        data_path: Directory shared between pipeline steps.
    """
    import pickle
    import subprocess
    import sys

    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.23.1'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'keras==2.3.1'])

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler

    from keras.models import Sequential
    from keras.layers import Dense, LSTM, Bidirectional, Dropout

    # Target columns; also sets the width of the output layer.
    # NOTE(review): assumes the kept columns come in this order — confirm
    # against the CSV.
    target_columns = ["MaxTemp", "MinTemp", "MeanTemp"]

    with open(f'{data_path}/preprocessed_data', 'rb') as f:
        data = pickle.load(f)
    train_X, train_y, test_X, test_y, val_X, val_y, values, X_test = data

    # Model creation. ``input_shape`` goes on the first layer of the Sequential
    # model, which is the Bidirectional wrapper, not the wrapped LSTM.
    model = Sequential()
    model.add(Bidirectional(LSTM(50),
                            input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(Dropout(rate=0.5))
    model.add(Dense(len(target_columns)))
    model.compile(loss='mae', optimizer='adam')

    # shuffle=False: batches are taken in the existing (chronological) order.
    model.fit(train_X, train_y, epochs=50, batch_size=30,
              validation_data=(val_X, val_y), verbose=2, shuffle=False)

    # Save the model to the shared directory.
    model.save(f'{data_path}/model')

    forecast = model.predict(test_X)

    # Refit a scaler on the same ``values`` that data_transformation scaled,
    # so inverse_transform maps predictions and targets back to original units.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(values)

    forecast_df = pd.DataFrame(scaler.inverse_transform(forecast),
                               columns=target_columns, index=X_test.index)
    true_df = pd.DataFrame(scaler.inverse_transform(test_y),
                           columns=target_columns, index=X_test.index)

    with open(f'{data_path}/final_data', 'wb') as f:
        pickle.dump((forecast_df, true_df), f)

In [11]:
built = train(data_path)

Train on 507 samples, validate on 7 samples
Epoch 1/50
 - 7s - loss: 0.4694 - val_loss: 0.4844
Epoch 2/50
 - 0s - loss: 0.3345 - val_loss: 0.3031
Epoch 3/50
 - 0s - loss: 0.1877 - val_loss: 0.1350
Epoch 4/50
 - 0s - loss: 0.1297 - val_loss: 0.1185
Epoch 5/50
 - 0s - loss: 0.1236 - val_loss: 0.1136
Epoch 6/50
 - 0s - loss: 0.1232 - val_loss: 0.1080
Epoch 7/50
 - 0s - loss: 0.1275 - val_loss: 0.1071
Epoch 8/50
 - 0s - loss: 0.1271 - val_loss: 0.1097
Epoch 9/50
 - 0s - loss: 0.1183 - val_loss: 0.1076
Epoch 10/50
 - 0s - loss: 0.1207 - val_loss: 0.1097
Epoch 11/50
 - 0s - loss: 0.1164 - val_loss: 0.1070
Epoch 12/50
 - 0s - loss: 0.1193 - val_loss: 0.1059
Epoch 13/50
 - 0s - loss: 0.1195 - val_loss: 0.1061
Epoch 14/50
 - 0s - loss: 0.1163 - val_loss: 0.1059
Epoch 15/50
 - 0s - loss: 0.1178 - val_loss: 0.1061
Epoch 16/50
 - 0s - loss: 0.1181 - val_loss: 0.1096
Epoch 17/50
 - 0s - loss: 0.1164 - val_loss: 0.1072
Epoch 18/50
 - 0s - loss: 0.1143 - val_loss: 0.1055
Epoch 19/50
 - 0s - loss: 0.1

# Model Validation

In [13]:
def validate(data_path):
    """Score the test-window forecasts and write the metrics to ``result.txt``.

    Loads ``(forecast_df, true_df)`` pickled by ``train`` from
    ``{data_path}/final_data``. For each of MaxTemp, MinTemp and MeanTemp it
    computes MAPE, ME, MAE, MPE, RMSE, correlation and the min-max error. The
    list of per-target metric dicts is written to ``{data_path}/result.txt``.

    The function is also used as a container component via
    ``comp.func_to_container_op``, so all imports are done inside it.

    Args:
        data_path: Directory shared between pipeline steps.
    """
    import pickle
    import subprocess
    import sys

    # pandas must be importable to unpickle the DataFrames.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.24.2'])
    import numpy as np

    with open(f'{data_path}/final_data', 'rb') as f:
        forecast_df, true_df = pickle.load(f)

    evaluation = []
    for column in ['MaxTemp', 'MinTemp', 'MeanTemp']:
        # Use plain ndarrays on both sides. NumPy-style ``[:, None]`` indexing
        # of a pandas Series is deprecated in pandas 1.0 and removed later.
        forecast = forecast_df[column].values
        actual = true_df[column].values
        error = forecast - actual

        # Element-wise smaller/larger of each (forecast, actual) pair.
        mins = np.minimum(forecast, actual)
        maxs = np.maximum(forecast, actual)

        # NOTE(review): MAPE, MPE and minmax divide by data values, so any
        # zero in ``actual`` / ``maxs`` yields inf or NaN.
        metrics = {
            'mape': np.mean(np.abs(error) / np.abs(actual)) * 100,  # MAPE
            'me': np.mean(error),                                   # ME
            'mae': np.mean(np.abs(error)),                          # MAE
            'mpe': np.mean(error / actual),                         # MPE
            'rmse': np.mean(error ** 2) ** .5,                      # RMSE
            'corr': np.corrcoef(forecast, actual)[0, 1],            # corr
            'minmax': 1 - np.mean(mins / maxs),                     # minmax
        }
        evaluation.append(metrics)

    with open(f'{data_path}/result.txt', 'w') as f:
        f.write(" Evaluation: {}".format(evaluation))

    print('Successfully!')

In [14]:
result = validate(data_path)

[{'mape': 2.803053893148899, 'me': -0.6685578, 'mae': 0.8615728, 'mpe': -0.02160792, 'rmse': 0.9583256278903064, 'corr': 0.5207246156110829, 'minmax': 0.027966201305389404}]
[{'mape': 2.803053893148899, 'me': -0.6685578, 'mae': 0.8615728, 'mpe': -0.02160792, 'rmse': 0.9583256278903064, 'corr': 0.5207246156110829, 'minmax': 0.027966201305389404}, {'mape': 4.561843723058701, 'me': 0.18935367, 'mae': 1.0652953, 'mpe': 0.011268819, 'rmse': 1.4280704880208481, 'corr': 0.16666835849435638, 'minmax': 0.044041335582733154}]
[{'mape': 2.803053893148899, 'me': -0.6685578, 'mae': 0.8615728, 'mpe': -0.02160792, 'rmse': 0.9583256278903064, 'corr': 0.5207246156110829, 'minmax': 0.027966201305389404}, {'mape': 4.561843723058701, 'me': 0.18935367, 'mae': 1.0652953, 'mpe': 0.011268819, 'rmse': 1.4280704880208481, 'corr': 0.16666835849435638, 'minmax': 0.044041335582733154}, {'mape': 1.9842369481921196, 'me': -0.28008762, 'mae': 0.5449333, 'mpe': -0.009974812, 'rmse': 0.7521124732870128, 'corr': 0.48098

In [None]:
# Create components: wrap each step function as a lightweight component.
# All steps share one base image; each step pip-installs its own pinned
# dependencies at run time.
BASE_IMAGE = "tensorflow/tensorflow:latest-gpu-py3"

inj_op, transformation_op, train_op, validate_op = (
    comp.func_to_container_op(step_func, base_image=BASE_IMAGE)
    for step_func in (data_injestion, data_transformation, train, validate)
)


# Build Kubeflow Pipelines

In [81]:
# Create a client to communicate with the Kubeflow Pipelines API server.
client = kfp.Client()

In [None]:
# Define the pipeline
@dsl.pipeline(
    name='Weather Conditions Pipeline',
    description=
    'An ML pipeline that builds and validates a model for the weather conditions dataset.'
)
def weather_conditions_pipeline(data_path: str):
    """Chain ingestion -> transformation -> training -> validation -> print.

    A 1Gi volume is created and mounted at ``data_path`` in every step. Each
    step after ingestion mounts the ``pvolume`` exposed by the step before it.
    Presumably this chaining is also what orders the steps in KFP; confirm
    against the KFP docs. A final bash step ``cat``s ``result.txt``.

    Args:
        data_path: Pipeline parameter used both as the volume mount path and
            as the directory the step functions read and write.
    """
    # Volume used to share data between the steps.
    shared_volume = dsl.VolumeOp(name="create_volume",
                                 resource_name="data-volume",
                                 size="1Gi",
                                 modes=dsl.VOLUME_MODE_RWO)

    # Ingestion mounts the freshly created volume ...
    step = inj_op(data_path).add_pvolumes({data_path: shared_volume.volume})
    # ... and transformation, training and validation each mount the volume
    # handed on by the previous step.
    for make_step in (transformation_op, train_op, validate_op):
        step = make_step(data_path).add_pvolumes({data_path: step.pvolume})

    # Print the result written by the validation step.
    dsl.ContainerOp(
        name="print_validation_result",
        image='library/bash:4.4.23',
        pvolumes={data_path: step.pvolume},
        arguments=['cat', f'{data_path}/result.txt'])

In [None]:
# Value passed as the pipeline's ``data_path`` argument: the same directory
# name the notebook used for the local test runs above.
DATA_PATH = data_path

# Pipeline function that is compiled and submitted below.
pipeline_func = weather_conditions_pipeline

In [None]:
experiment_name = 'weather_conditions_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {"data_path": DATA_PATH}

# Compile the pipeline once into a compressed package; the .zip stays on disk.
package_path = '{}.zip'.format(experiment_name)
kfp.compiler.Compiler().compile(pipeline_func, package_path)

# Submit the package that was just compiled instead of compiling the pipeline
# function a second time, so the run matches the archived package exactly.
run_result = client.create_run_from_pipeline_package(package_path,
                                                     arguments=arguments,
                                                     run_name=run_name,
                                                     experiment_name=experiment_name)