In [None]:
#!python -m pip install --user --upgrade pip
#!pip3 install pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1 scikit-learn==0.22 tensorflow==2.3 keras==2.4.3 --user

In [None]:
#!pip3 install kfp --upgrade --user

In [1]:
# import libraries for pipeline
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [2]:
# create  directory for outputs.
output_dir = "/home/jovyan/data/"

In [8]:
def data_download(store_path):
    
    import pickle
    import sys, subprocess
    subprocess.run([sys.executable, "-m", "pip", "install", "pandas"])
    import pandas as pd
    
    
    data_path = "http://archive.ics.uci.edu/ml/machine-learning-databases/00235/household_power_consumption.zip"
    
    subprocess.run(["wget", "-O", "household_power_consumption.zip", data_path ])
    subprocess.run(["unzip", "household_power_consumption.zip"])
    subprocess.run(["unzip", "*.zip"])
    
    print(' Unzipping Done! ')
    
    subprocess.call(["rm", "-r", "*.zip"])
    data = pd.read_csv('household_power_consumption.txt',sep=';',parse_dates={'date_time' : ['Date', 'Time']}, infer_datetime_format=True,na_values=['nan','?'])
    
    with open(f'{store_path}/downloaded_data','wb') as f:
        
        pickle.dump((data),f)
    print(' Complete')

In [9]:
data_download(output_dir)

 Unzipping Done! 
 Complete


In [15]:
def data_preprocessing(store_path):
    
    import pickle
    import sys, subprocess
    subprocess.run([sys.executable, "-m", "pip", "install", "pandas"])
    import pandas as pd
    
    
    with open(f"{store_path}/downloaded_data", "rb") as f:
        data = pickle.load(f)
        
    # making sure duplicates are zero.
        
    assert data.duplicated().sum() == 0,"There are duplicates"
    
    # replace null values
    for i in range(1,8):
        data.iloc[:,i]=data.iloc[:,i].fillna(data.iloc[:,i].value_counts().index[0])
        
    # making sure the null values are completely taken care of.    
    assert data['date_time'].isnull().sum() == 0, " Date time has null values"
    assert data['Global_active_power'].isnull().sum() == 0, " Global active power has null values"
    assert data['Global_reactive_power'].isnull().sum() == 0, " Global reactive power has null values"
    assert data['Global_intensity'].isnull().sum() == 0, " Global intensity null values"
    assert data['Sub_metering_1'].isnull().sum() == 0, " sub meter 1 has null values"
    assert data['Sub_metering_2'].isnull().sum() == 0, " sub meter 2 has null values"
    assert data['Sub_metering_3'].isnull().sum() == 0, " sub meter 3 has null values"
   
    # store clean data
    with open(f'{store_path}/clean_data','wb') as f:
        
        pickle.dump((data),f)
    
    print('Done!')
    

In [16]:
data_preprocessing(output_dir)

Done!


In [24]:
def forecasting(store_path):
    import pickle
    import sys, subprocess;
    subprocess.run([sys.executable, "-m", "pip", "install", "pandas"])
    subprocess.run([sys.executable, "-m", "pip", "install", "numpy"])
    subprocess.run([sys.executable, '-m', 'pip', 'install','scikit-learn==0.23.1'])
    subprocess.run([sys.executable, "-m", "pip", "install", "tensorflow==2.3", "keras==2.4.3"])
    
    import pandas as pd
    import numpy as np
    
    with open(f"{store_path}/clean_data", "rb") as f:
        data = pickle.load(f)
        
    data = data[['date_time','Global_active_power','Global_reactive_power', 'Voltage']]
    data = data.set_index('date_time')
    
    # split data
    train = data[:2075000]
    test = data[2075000:]
    
    #normalize data
    from sklearn.preprocessing import MinMaxScaler
    sc =MinMaxScaler(feature_range = (0,1))
    scaled_data = sc.fit_transform(train)
    
    
    top = len(scaled_data)
    time_steps = 30
    
    # creating the time steps
    x_train = []
    y_train = []

    for i in range(time_steps, top):
        x_train.append(scaled_data[i-time_steps:i, :])  
        y_train.append(scaled_data[i,:])
        
    x_train, y_train = np.array(x_train), np.array(y_train)
    
    # reshaping the data
    x_train = np.reshape(x_train,(x_train.shape[0], x_train.shape[1], 3))
    
    # building the model
    
    from keras.models import Sequential
    from keras.layers import Dense
    from keras.layers import LSTM
    from keras.layers import Dropout
    
    
    trend_detector = Sequential()
    
    trend_detector.add(LSTM(units = 30, return_sequences = True, input_shape = (x_train.shape[1], 3)))
    trend_detector.add(Dropout(0.3))
    trend_detector.add(LSTM(units = 30, return_sequences =True))
    trend_detector.add(Dropout(0.3))
    trend_detector.add(LSTM(units = 30))
    trend_detector.add(Dropout(0.3))
    trend_detector.add(Dense(units=3))
    trend_detector.compile(optimizer = 'RMSprop', loss = 'mean_squared_error', metrics = ['accuracy'])
    
    #fitting the model
    trend_detector.fit(x_train, y_train, epochs = 1, batch_size = 10)
    # I used one epoch because of time, i.e how long it takes the model to run
    
    # predictions and analysis
    total = pd.concat((train, test), axis = 0)

    new_data = total[len(total) - len(test) - time_steps :]
    new_data = sc.transform(new_data)
    test_count = len(test)
    val = time_steps + test_count


    x_test = []
    for i in range(time_steps, val): 
        x_test.append(new_data[i - time_steps: i, : ])

    x_test= np.array(x_test)
    x_test = np.reshape(x_test,(x_test.shape[0], x_test.shape[1], 3))

    predicted_values_calc = trend_detector.predict(x_test)
    predicted_values = sc.inverse_transform(predicted_values_calc)

    pred_df = pd.DataFrame(predicted_values, columns= data.columns)


    assert len(test) == len(pred_df), " length of actaul values is not the the same with predicted values."

    hold = test.index.values
    pred_df['date_time'] = hold
    pred_df = pred_df.set_index('date_time')

    print('Predicted ... ')
    pred_df.head()

    print('Actual ...')
    test.head()
    
    # write predictions to results.txt
    with open(f'{store_path}/results.txt','w') as result:
        result.write(f'Prediciton: {pred_df} | Actual {x_test}')
    
    print('Forecasting Complete ...')


In [25]:
forecasting(output_dir)

Predicted ... 
Actual ...
Forecasting Complete ...


### Components

In [26]:
import kfp
from kfp import dsl
import kfp.components as comp

In [27]:
!which dsl-compile

In [28]:
data_download_op = comp.func_to_container_op(data_download, base_image= "python:3.7")
data_preprocessing_op = comp.func_to_container_op(data_preprocessing, base_image= "python:3.7")
forecasting_op = comp.func_to_container_op(forecasting, base_image="tensorflow/tensorflow:latest-gpu-py3")

### Pipeline

In [29]:
# create client that would enable communication with the Pipelines API server 
client = kfp.Client()

In [32]:
@dsl.pipeline(name ="Electric Power Consumption",
        description = "Electric Power Consumption Forecasting Pipeline")

def electric_power_consumption(store_path:str):
    
    volume_op = dsl.VolumeOp(
        name="data_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO)
    
    # Create data download components.
    data_download_container = data_download_op(store_path).add_pvolumes({store_path:volume_op.volume})

    # Create data preprocessing component.
    data_preprocessing_container = data_preprocessing_op(store_path).add_pvolumes({store_path: data_download_container.pvolume})
        
    # Create Forecasting Component.
    forecasting_conatiner = forecasting_op(store_path)\
                                        .add_pvolumes({store_path:data_preprocessing_container.pvolume})

In [35]:
#DATA_PATH ="/home/jovyan/data/"
#DATA_PATH = "/mnt"
STORE_PATH  = "/mnt"



pipeline_func = electric_power_consumption

experiment_name = 'electric_power_consumption_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {"store_path":STORE_PATH}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,  
  '{}.zip'.format(experiment_name))

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)