In [None]:
import kfp
import kfp.dsl as dsl
from kfp import compiler
from kfp import components
from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
from kfp.components import func_to_container_op

# PIPELINE

In [None]:
def build_model(X_train_path: InputPath(str), Y_train_path: InputPath(str),
                trained_model: OutputPath('TFModel'), epoch_amount: int = 122,
                batches: int = 64, num_prediction: int = 3):
    """Train the LSTM forecasting model and save it for the forecast step.

    Runs as a self-contained Kubeflow component, so every import lives inside
    the function body.

    Args:
        X_train_path: Text file (``np.savetxt`` format) with the training
            windows, one window per row.
        Y_train_path: Text file with the training targets, one per row.
        trained_model: Output path where the trained Keras model is saved.
        epoch_amount: Number of training epochs.
        batches: Training batch size.
        num_prediction: Not used by this step; kept so the component
            interface stays unchanged.

    Raises:
        Exception: Any error during model construction or training is logged
            and re-raised, so the pipeline step fails instead of finishing
            without writing a model.
    """
    import numpy as np
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, LSTM

    print('Libraries imported!')

    # ndmin keeps a 2-D (samples, look_back) shape even for a single window
    # or a look-back of 1, where plain loadtxt would return a 1-D array.
    X_train = np.loadtxt(X_train_path, ndmin=2)
    # Keras LSTM input is [samples, time steps, features]: each window is one
    # time step whose features are the look-back values.
    X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
    Y_train = np.loadtxt(Y_train_path, ndmin=1)

    try:
        print('Training the LSTM model')
        model = Sequential()
        model.add(LSTM(100, input_shape=(X_train.shape[1], X_train.shape[2]),
                       recurrent_dropout=0.2))
        model.add(Dense(60))
        model.add(Dropout(0.2))
        model.add(Dense(1))
        model.compile(loss='mean_squared_error', optimizer='adam')
        # Verbose 0 for cleanliness; no shuffling keeps the time order.
        model.fit(X_train, Y_train, epochs=epoch_amount,
                  batch_size=batches, verbose=0, shuffle=False)
    except Exception as e:
        print("Exception in model training "+ str(e))
        raise

    print('LSTM Model successfully trained!')
    model.save(trained_model)
        
   

In [None]:
def forecast(station, X_train_path: InputPath(str), model_path: InputPath('TFModel'),
             dataset_path: InputPath(str), scaler_path: InputPath(str),
             df_path: InputPath(str), final_df: OutputPath(str), look_back: int,
             num_prediction: int = 3) -> float:
    """Forecast the next ``num_prediction`` months with the trained LSTM model.

    Starting from the last ``look_back`` values of the scaled dataset, the
    model predicts one step ahead; each prediction is appended and fed back
    as input for the next step. The forecasts are inverse-scaled and written
    as CSV rows (Date, Prediction, Model, MET_LOAD_TIME, MET_CRT_BY_PROCESS,
    Station) to ``final_df``. This step only writes that CSV; it does not
    touch BigQuery.

    Args:
        station: Value stored in the ``Station`` column of every row.
        X_train_path: Training windows. Not used for prediction (the seed
            window comes from the full dataset); kept so the component
            interface is unchanged.
        model_path: Saved Keras model from the training step.
        dataset_path: Full scaled series in ``np.savetxt`` format.
        scaler_path: joblib-dumped scaler used to undo the scaling.
        df_path: CSV of the monthly data; its last ``Date`` anchors the
            forecast months.
        final_df: Output CSV path for the forecast rows.
        look_back: Length of the input window the model was trained on.
        num_prediction: Number of months to forecast.

    Returns:
        The last actual (inverse-scaled) value, to help debugging.

    Raises:
        Exception: Any error while predicting is logged and re-raised so the
            step fails instead of continuing with no forecast.
    """
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'protobuf==3.12.0'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'joblib==0.16.0'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'grpcio==1.24.3'])
    import datetime
    import joblib
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from dateutil.relativedelta import relativedelta

    print('Libraries imported!')

    model = tf.keras.models.load_model(model_path)
    dataset = np.loadtxt(dataset_path)
    scaler = joblib.load(scaler_path)
    df1 = pd.read_csv(df_path)
    print(df1.tail())
    df1['Date'] = pd.to_datetime(df1['Date'])

    def predict(series):
        """Return the last actual value of ``series`` followed by
        ``num_prediction`` recursive one-step forecasts (still scaled)."""
        prediction_list = series[-look_back:]
        for _ in range(num_prediction):
            window = prediction_list[-look_back:].reshape((1, 1, look_back))
            out = model.predict(window)[0][0]
            prediction_list = np.append(prediction_list, out)
        # Keep only [t, t+1, ..., t+num_prediction]: last actual + forecasts.
        return prediction_list[look_back - 1:]

    try:
        print('Making the predictions')
        scaled = predict(dataset.reshape(-1))
    except Exception as e:
        print("Exception in prediction" + str(e))
        raise

    # The scaler was fitted on a single column, so invert as a column vector.
    values = scaler.inverse_transform(scaled.reshape(-1, 1))[:, 0]
    print('Forecasting successful!')

    # Forecast months start on the first day of the month after the last
    # month present in the data.
    lastmonth = df1['Date'].iloc[-1].date()
    nextmonth = lastmonth.replace(day=1) + relativedelta(months=1)
    print("lastmonth",lastmonth)
    print("nextmonth:",nextmonth)

    load_time = datetime.datetime.now()
    rows = [
        {
            'Date': nextmonth + relativedelta(months=i),
            'Prediction': values[i + 1],  # values[0] is the last actual value
            'Model': 'LSTM',
            'MET_LOAD_TIME': load_time,
            'MET_CRT_BY_PROCESS': 'LSTM Process',
            'Station': station,
        }
        for i in range(num_prediction)
    ]
    final = pd.DataFrame(rows, columns=['Date', 'Prediction', 'Model', 'MET_LOAD_TIME',
                                        'MET_CRT_BY_PROCESS', 'Station'])
    final.to_csv(final_df, index=False)
    return float(values[0])

In [None]:
def write_data(result_path: InputPath(str)):
    """Upload the forecast rows in ``result_path`` to BigQuery.

    The CSV is converted to newline-delimited JSON and appended to the
    hard-coded table via a load job; the call blocks until the job finishes.

    Args:
        result_path: CSV file of forecast rows; its columns are expected to
            match the load schema below (Date, Prediction, Model,
            MET_LOAD_TIME, MET_CRT_BY_PROCESS, Station).
    """
    import sys, subprocess
    import io
    import pandas as pd
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas_gbq==0.13.2'])
    # Valid PEP 440 range: at least 1.18.0 and below 2.0.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'google-auth>=1.18.0,<2.0dev'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pyarrow==0.17.1'])
    import pandas_gbq
    import pyarrow
    from google.cloud import bigquery

    final = pd.read_csv(result_path)
    print("Result table read, starting upload to BQ.")
    client = bigquery.Client()
    table_id = 'r-instance.CL_Demand_Forecast.CL_demand_predictions'

    # The CSV round-trip leaves every column as text/float, so give BigQuery
    # an explicit schema for the target data types.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.schema = [
        bigquery.SchemaField('Date', bigquery.enums.SqlTypeNames.DATE),
        bigquery.SchemaField('Prediction', bigquery.enums.SqlTypeNames.FLOAT),
        bigquery.SchemaField('Model', bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField('MET_LOAD_TIME', bigquery.enums.SqlTypeNames.DATETIME),
        bigquery.SchemaField('MET_CRT_BY_PROCESS', bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField('Station', bigquery.enums.SqlTypeNames.STRING),
    ]

    # Serialize once; log it, then upload the same payload.
    payload = final.to_json(orient="records", lines=True)
    print(payload)
    # load_table_from_file expects a binary-mode file object.
    with io.BytesIO(payload.encode('utf-8')) as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)

    # Wait for the load job to complete.
    job.result()
    print("Table uploaded to bigQuery!")
    print("Table uploaded to bigQuery!")

In [None]:
def query_data(data_path: OutputPath(str), extdata_path: OutputPath(str), datas1_path: OutputPath(str), datas2_path: OutputPath(str), datas3_path: OutputPath(str), datas4_path: OutputPath(str)):
    """Query the forecasting inputs from BigQuery and write them out as CSV.

    Outputs (all written with ``index=False``):
        data_path: Main data set, with ``Quantity`` renamed to ``Amount``.
        extdata_path: Historical demand data.
        datas1_path .. datas4_path: One data set per station query.

    Raises:
        NotFound: A queried table does not exist. The error is logged and
            re-raised, so the step fails instead of finishing without
            writing its outputs.
    """
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas_gbq==0.13.2'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'grpcio==1.24.3'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'google-auth==1.18.0'])
    import pandas as pd
    import pandas_gbq
    from google.cloud import bigquery
    from google.api_core.exceptions import NotFound
    try:
        # Expects that Transaction_Date is stored as datetime and Quantity as float in BigQuery.
        print('Loading the data...')
        client = bigquery.Client()
        sql1 = """
   HIDDEN
        """
        df1 = client.query(sql1).to_dataframe()
        df1 = df1.rename(columns={"Quantity": "Amount"})

        # Historical demand data.
        sql2 = """
HIDDEN
        """
        df2 = client.query(sql2).to_dataframe()

        # Per-station data; the same client serves every query.
        sql_station_1 = """
HIDDEN
        """
        sql_station_2 = """
HIDDEN
        """
        sql_station_3 = """
HIDDEN
        """
        sql_station_4 = """
HIDDEN
        """
        df_station1 = client.query(sql_station_1).to_dataframe()
        df_station2 = client.query(sql_station_2).to_dataframe()
        df_station3 = client.query(sql_station_3).to_dataframe()
        df_station4 = client.query(sql_station_4).to_dataframe()
        print('Data loaded')
    except NotFound:
        print("An error occured in loading the tables from BigQuery: TABLE NOT FOUND")
        raise

    df1.to_csv(data_path, index=False)
    df2.to_csv(extdata_path, index=False)
    df_station1.to_csv(datas1_path, index=False)
    df_station2.to_csv(datas2_path, index=False)
    df_station3.to_csv(datas3_path, index=False)
    df_station4.to_csv(datas4_path, index=False)

In [None]:
 def process_data(station, look_back, data_path: InputPath(str), datas1_path: InputPath(str), datas2_path: InputPath(str), datas3_path: InputPath(str), datas4_path: InputPath(str), edata_path: InputPath(str), xtrain_path: OutputPath(str), ytrain_path: OutputPath(str),
                  dset_path: OutputPath(str), scaler_path: OutputPath(str), df_path: OutputPath(str)):
    """Preprocess the data to be ready for applying the LSTM model

    Args:
        df1: A pandas dataframe of the Cardlock company data
        df2: A pandas dataframe of the historical US data
        look_back: How many months of data the model needs for the predictions

    Returns:
        X_train: Training set X, which has been reshaped
        Y_train: Training set Y, no modifications
        dataset: The full dataset
        scaler: The scaler used, which can be used later on to invert the scaling

    Raises:
        IndexError: Sequence subscript is out of range.
    """
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'joblib==0.16.0'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'grpcio==1.24.3'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pyarrow==0.17.1'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas_gbq==0.13.2'])
    import pandas as pd
    import numpy as np
    #import datetime
    import joblib
    #from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
    from sklearn.preprocessing import MinMaxScaler
    import time
    import pyarrow
    from google.cloud import bigquery
    from google.api_core.exceptions import AlreadyExists, NotFound
    

    def check_values(df):  
        if any(df['Amount'] < 0):
            print('WARNING: There are negative values in the data!')

        if any(df['Amount'] > 1000000):   #values over 1 million
            print('WARNING: The data contains values over a million.')
        else:
            print('No outlier values found!')
    
    try:
        print('Preprocessing the data for LSTM modeling')
        df1 = pd.read_csv(data_path)
        df2 = pd.read_csv(edata_path)
        df_station1 = pd.read_csv(datas1_path)
        df_station2 = pd.read_csv(datas2_path)
        df_station3 = pd.read_csv(datas3_path)
        df_station4 = pd.read_csv(datas4_path)
        
        stations = ["HIDDEN"]
        
        if(station=="HIDDEN"):
            df1 = df_station1
        if(station=="HIDDEN"):
            df1 = df_station2
        if(station=="HIDDEN"):
            df1 = df_station3
        if(station=="HIDDEN"):
            df1 = df_station4
        
        print("Dataframes ready.")
        
        # The next part is for checking if the last month has full data or not.


        # Sets the latest month as being full
        latest_month_full = True

        # If the latest month is not full, set latest_month_full to False.
        def check_full_month(df):
            import datetime
            import calendar
            df['Date'] = pd.to_datetime(df['Date'])
            df = df.set_index('Date')
            df = df.asfreq(df.index.freq)
            
            df['day'] = df.index.day
            last_date = df['day'].iloc[-1]
            
            # Marks the month as being full if the date is 29 30 or 31
            # TODO: Check the last date properly
            if last_date < 28:
                return False
     
        latest_month_full_station1 = check_full_month(df_station1)
        latest_month_full_station2 = check_full_month(df_station2)
        latest_month_full_station3 = check_full_month(df_station3)
        latest_month_full_station4 = check_full_month(df_station4)
   
        check_values(df1)

        def create_dataset(dataset, look_back):
            X, Y = [], []
            for i in range(len(dataset)-int(look_back)-1):
                a = dataset[i:(i+int(look_back)), 0]
                X.append(a)
                Y.append(dataset[i + int(look_back), 0])
            return np.array(X), np.array(Y)
        
        if station in stations:
            latest_month_full = check_full_month(df1)
        else:
            # Drops the last row if last month is not full of data
            if latest_month_full_station1 == False or latest_month_full_station2 == False or latest_month_full_station3 == False or latest_month_full_station4 == False:
                latest_month_full = False

        # Modifies the data to Monthly data through resampling
        df1['Date'] = pd.to_datetime(df1['Date'])
        df1 = df1.set_index('Date')
        df1 = df1.asfreq(df1.index.freq)
        df1 = df1.resample('M').sum()
        

        if latest_month_full == False:
            df1 = df1[:-1]
        
        # Dropping unneeded dates
        df2 = df2.set_index('Date')
        df2 = df2[-168:-27]
        df2 = df2.sort_index()

        # TODO: This row drops the last 5 rows due to missing data. Remove in final version
        #df1 = df1[:-5]
        # Dropping the first 11 rows in order to get a more accurate representation of the real situation
        df1 = df1[11:]
        print(df1.tail(5))
        # Placeholders for debugging
        # df2.to_csv('LSTM_debug/df2.csv', index = True, header=True)
        # df1.to_csv('LSTM_debug/df1.csv', index = True, header=True)
        
        #testchanges
        dfmodified = df1.reset_index()
        dfmodified.to_csv(df_path, index=False)
        # Scaling both of the datasets with the MinMaxScaler
        historical_data = df2.values  # numpy.ndarray
        historical_data = historical_data.astype('float32')
        historical_data = np.reshape(historical_data, (-1, 1))
        scaler2 = MinMaxScaler(feature_range=(0, 1))
        historical_data = scaler2.fit_transform(historical_data)

        data = df1.values  # numpy.ndarray
        data = data.astype('float32')
        data = np.reshape(data, (-1, 1))
        scaler = MinMaxScaler(feature_range=(0, 1))
        data = scaler.fit_transform(data)

        # Combining the scaled datasets
        dataset = np.concatenate([historical_data, data])
        # pd.DataFrame(dataset).to_csv("test.csv")

        X_train, Y_train = create_dataset(dataset, look_back)
        # Reshaping input to be [samples, time steps, features]
        #X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
        

    except IndexError:
        print("Out of index: look back time is too long")
    else:
        print('Data successfully preprocessed')
        np.savetxt(xtrain_path, X_train)
        np.savetxt(ytrain_path, Y_train)
        np.savetxt(dset_path, dataset)
        joblib.dump(scaler, scaler_path)


In [None]:
def evaluate_model(dataset_path: InputPath(str), scaler_path: InputPath(str), look_back: int, epoch_amount:int = 122,batches:int = 64) -> float:
    """Estimate forecast accuracy with a chronological 80/20 train/test split.

    A fresh LSTM with the same layers as the training step is fitted on the
    first 80 % of the scaled series. It is then scored on windows drawn from
    the last 20 %.

    Args:
        dataset_path: Full scaled series in ``np.savetxt`` format (one value
            per line, so it loads as a 1-D array).
        scaler_path: joblib-dumped scaler used to undo the scaling before
            computing errors.
        look_back: Number of values per input window.
        epoch_amount: Number of training epochs.
        batches: Training batch size.

    Returns:
        Mean absolute error on the test windows, in unscaled units.

    Raises:
        ValueError: If the train or test part is too short to form a single
            window of ``look_back`` values plus a target.
    """
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'joblib==0.16.0'])
    import joblib
    import numpy as np
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, LSTM
    from sklearn.metrics import mean_absolute_error, mean_squared_error

    print("Libraries imported!")

    scaler = joblib.load(scaler_path)
    dataset = np.loadtxt(dataset_path)
    print(dataset.shape)
    print(len(dataset))

    # Chronological split: the test part is the most recent 20 %.
    train_size = int(len(dataset) * 0.80)
    train, test = dataset[:train_size], dataset[train_size:]

    def create_dataset(series, look_back=18):
        """Windows of ``look_back`` values paired with the next value."""
        X, Y = [], []
        for i in range(len(series) - look_back):
            X.append(series[i:i + look_back])
            Y.append(series[i + look_back])
        return np.array(X), np.array(Y)

    print("Creating dataset!")
    X_train, Y_train = create_dataset(train, look_back)
    X_test, Y_test = create_dataset(test, look_back)
    if len(X_train) == 0 or len(X_test) == 0:
        raise ValueError(
            f"look_back={look_back} is too long for a train/test split of "
            f"{len(train)}/{len(test)} values")

    # Reshape input to be [samples, time steps, features].
    X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
    print("Reshaping train.")
    X_test = np.reshape(X_test, (X_test.shape[0], 1, X_test.shape[1]))
    print("Reshaping test.")

    model = Sequential()
    model.add(LSTM(100, input_shape=(X_train.shape[1], X_train.shape[2]),
                   recurrent_dropout=0.2))
    model.add(Dense(60))
    model.add(Dropout(0.2))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    # Verbose 0 for cleanliness; no shuffling keeps the time order.
    model.fit(X_train, Y_train, epochs=epoch_amount,
              batch_size=batches, verbose=0, shuffle=False)
    print("Model fitted.")

    # The scaler was fitted on a single column, so invert column vectors.
    predicted = scaler.inverse_transform(model.predict(X_test))[:, 0]
    actual = scaler.inverse_transform(Y_test.reshape(-1, 1))[:, 0]
    mae = mean_absolute_error(actual, predicted)
    rmse = np.sqrt(mean_squared_error(actual, predicted))
    print('Test Mean Absolute Error:', mae)
    print('Test Root Mean Squared Error:', rmse)
    return float(mae)
    

## Building the pipeline


In [None]:
# Convert the functions to pipeline operations. Every step runs in the same
# pinned image, so the tag is defined once.
_BASE_IMAGE = 'jupyter/tensorflow-notebook:54462805efcb'


def _to_container_op(func):
    """Wrap ``func`` as a KFP lightweight component running in ``_BASE_IMAGE``."""
    return components.func_to_container_op(func, base_image=_BASE_IMAGE)


query_op = _to_container_op(query_data)
process_data_op = _to_container_op(process_data)
build_model_op = _to_container_op(build_model)
predict_op = _to_container_op(forecast)
write_op = _to_container_op(write_data)
evaluate_op = _to_container_op(evaluate_model)

In [None]:
# Create pipeline.
@dsl.pipeline(
   name='LSTM pipeline',
   description='A pipeline that creates a forward forecasting prediction with LSTM modelling.'
)
def lstm_pipeline(
    epoch_amount: int =122,
    batches: int =64,
    num_prediction: int =3,
    look_back: int =18,
    station: str="all"
):
    """Query -> preprocess -> evaluate / train -> forecast -> upload to BigQuery.

    Args:
        epoch_amount: Training epochs for the evaluation and final models.
        batches: Training batch size.
        num_prediction: Number of months to forecast.
        look_back: Months per input window.
        station: Station whose data is modelled.
    """
    data = query_op()
    processed = process_data_op(station, look_back, data.outputs['data'], data.outputs['datas1'],
                                data.outputs['datas2'], data.outputs['datas3'],
                                data.outputs['datas4'], data.outputs['extdata'])
    mae = evaluate_op(processed.outputs['dset'], processed.outputs['scaler'],
                      look_back, epoch_amount, batches)
    # Hyper-parameters by keyword: this component has no look_back input, so
    # positional arguments would bind to the wrong parameters.
    model = build_model_op(processed.outputs['xtrain'], processed.outputs['ytrain'],
                           epoch_amount=epoch_amount, batches=batches,
                           num_prediction=num_prediction)
    preds = predict_op(station, processed.outputs['xtrain'], model.outputs['trained_model'],
                       processed.outputs['dset'], processed.outputs['scaler'],
                       processed.outputs['df'], look_back=look_back,
                       num_prediction=num_prediction)

    # Reuse cached results for up to one day.
    for task in (data, processed, mae, model, preds):
        task.execution_options.caching_strategy.max_cache_staleness = "P1D"

    write_op(preds.outputs['final_df'])
    
    


## Compile and run the pipeline

In [None]:
# Compile the pipeline into a package file, then submit one run of it.
PIPELINE_PACKAGE = 'lstm_pipeline.tar.gz'
compiler.Compiler().compile(lstm_pipeline, PIPELINE_PACKAGE)

# Pipeline argument values, passed as strings.
# NOTE(review): 'Address_here' looks like a placeholder; set a real station before running.
arguments = dict(
    epoch_amount='122',
    batches='64',
    num_prediction='3',
    look_back='18',
    station='Address_here',
)

kfpclient = kfp.Client(host='HIDDEN')
exp = kfpclient.create_experiment(name='LSTM model experiment')
run = kfpclient.run_pipeline(exp.id, 'LSTM run test', PIPELINE_PACKAGE, params=arguments)

# The generated link below leads to the pipeline run information page.