In [45]:
import kfp
import kfp.components as components
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath

In [46]:
def train_export_model(trainingjobName: str, epochs: str, version: str):
    """Train an LSTM regressor on feature-store data and upload model and metrics.

    Features are fetched for the training job, scaled to [-1, 1], cut into
    sliding windows of 30 rows, and split 80/20 (in order) into train and test
    sets. After training, overall and per-cell MSE/RMSE are computed on the
    test set in the original target units, the model is saved to "./", and
    both the metrics and the model are uploaded through the model-metrics SDK.

    All imports live inside the function because it is wrapped into a
    container component with ``func_to_container_op`` and therefore has to be
    self-contained.

    Args:
        trainingjobName: Training job name; passed to the feature store to
            fetch features and used as the key for the metric/model upload.
        epochs: Number of training epochs, given as a string and converted
            with ``int()``.
        version: Version identifier forwarded to the metric/model upload.

    Raises:
        ValueError: If there are too few rows to build at least one training
            window and one test window.
    """
    import math

    import numpy as np
    import tensorflow as tf
    from sklearn.metrics import mean_squared_error
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.layers import LSTM, Dense
    from tensorflow.keras.models import Sequential

    print("numpy version")
    print(np.__version__)

    from featurestoresdk.feature_store_sdk import FeatureStoreSdk
    from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk

    fs_sdk = FeatureStoreSdk()
    mm_sdk = ModelMetricsSdk()

    df_new_analytics = fs_sdk.get_features(
        trainingjobName,
        ['DRB.UEThpDl', 'RRU.PrbUsedDl', 'PEE.AvgPowerg', 'Viavi.Cell.Name'],
    )
    print("Dataframe:")
    print(df_new_analytics)

    # NOTE(review): features are requested with '.' in their names but read back
    # with '_' -- presumably the feature store renames the columns; confirm.
    target = df_new_analytics['DRB_UEThpDl']
    features = df_new_analytics[['RRU_PrbUsedDl', 'PEE_AvgPowerg']]

    # Separate scalers so the target can be mapped back to its own units later.
    scaler_features = MinMaxScaler(feature_range=(-1, 1))
    scaler_target = MinMaxScaler(feature_range=(-1, 1))

    features_scaled = scaler_features.fit_transform(features)
    target_scaled = scaler_target.fit_transform(target.values.reshape(-1, 1))

    # Scaled target is the last column; create_sequences relies on that.
    df_scaled = np.hstack((features_scaled, target_scaled))

    def create_sequences(data, lookback):
        """Return windows of `lookback` rows and the target of the row after each."""
        X, y = [], []
        for i in range(len(data) - lookback):
            X.append(data[i:(i + lookback), :])
            y.append(data[i + lookback, -1])
        return np.array(X), np.array(y)

    sequence_length = 30

    # At least two windows are needed so that both the train and the test
    # split are non-empty; otherwise fail early with a clear message.
    if len(df_scaled) < sequence_length + 2:
        raise ValueError(
            f"Need at least {sequence_length + 2} rows to build training and test "
            f"sequences, got {len(df_scaled)}"
        )

    X, y = create_sequences(df_scaled, sequence_length)

    # Ordered (non-shuffled) split: the last 20% of the windows form the test set.
    split_index = int(0.8 * len(X))
    X_train, X_test = X[:split_index], X[split_index:]
    y_train, y_test = y[:split_index], y[split_index:]

    lr_scheduler = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.001,
        decay_steps=10000,
        decay_rate=0.96
    )

    model = Sequential()
    model.add(LSTM(256, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
    # The last recurrent layer must collapse the time axis so the network emits
    # one value per window, matching the scalar targets. With
    # return_sequences=True the prediction was (n, 30, 1), which does not match
    # y and cannot be passed to scaler_target.inverse_transform (2-D only).
    model.add(LSTM(128, return_sequences=False))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='linear'))

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr_scheduler), loss='mse', metrics=['mae'])

    model.fit(X_train, y_train, epochs=int(epochs), batch_size=128, verbose=1, validation_split=0.2)

    y_pred = model.predict(X_test)

    # Report errors in the original target units rather than the scaled range.
    y_test_rescaled = scaler_target.inverse_transform(y_test.reshape(-1, 1))
    y_pred_rescaled = scaler_target.inverse_transform(y_pred)

    mse = mean_squared_error(y_test_rescaled, y_pred_rescaled)
    rmse = math.sqrt(mse)

    print(f'Overall MSE: {mse}')
    print(f'Overall RMSE: {rmse}')

    # Window i predicts dataframe row i + sequence_length, so test sample j
    # belongs to row sequence_length + split_index + j. Slicing from that row
    # yields exactly len(y_test) cell labels, aligned with the predictions.
    cell_performance = {}
    test_cell_names = df_new_analytics['Viavi_Cell_Name'].iloc[sequence_length + split_index:]
    test_cell_labels = test_cell_names.values
    for cell in test_cell_names.unique():
        cell_mask = test_cell_labels == cell
        if not cell_mask.any():
            # e.g. a NaN label never compares equal; nothing to score.
            continue
        cell_y_test = y_test_rescaled[cell_mask]
        cell_y_pred = y_pred_rescaled[cell_mask]
        cell_mse = mean_squared_error(cell_y_test, cell_y_pred)
        cell_rmse = math.sqrt(cell_mse)
        cell_performance[cell] = {'MSE': cell_mse, 'RMSE': cell_rmse}
        print(f'Cell {cell} - MSE: {cell_mse}, RMSE: {cell_rmse}')

    model.save("./")

    # Metrics are uploaded as strings: one entry for the overall scores followed
    # by one entry per cell.
    data = {}
    data['metrics'] = []
    data['metrics'].append({'Overall_MSE': str(mse), 'Overall_RMSE': str(rmse)})
    for cell, metrics in cell_performance.items():
        data['metrics'].append({f'{cell}_MSE': str(metrics['MSE']), f'{cell}_RMSE': str(metrics['RMSE'])})

    mm_sdk.upload_metrics(data, trainingjobName, version)
    mm_sdk.upload_model("./", trainingjobName, version)

In [47]:
# Container image passed as `base_image` when the training function is wrapped
# into a pipeline component.
# NOTE(review): the tag is `latest` while the training step sets the pull policy
# to "IfNotPresent"; a node that already holds an older image under this tag may
# keep using it -- confirm this is intended.
BASE_IMAGE = "traininghost/pipelineimage:latest"

In [48]:
def train_and_export(trainingjobName: str, epochs: str, version: str):
    """Add the training step to the pipeline currently being defined.

    Wraps ``train_export_model`` into a container component based on
    ``BASE_IMAGE``, instantiates it with the given arguments, and configures
    caching and image pull policy on the resulting step.
    """
    make_train_step = components.func_to_container_op(
        train_export_model,
        base_image=BASE_IMAGE,
    )
    train_step = make_train_step(trainingjobName, epochs, version)

    # A maximum cache staleness of "P0D" disables caching of this pipeline step.
    train_step.execution_options.caching_strategy.max_cache_staleness = "P0D"
    train_step.container.set_image_pull_policy("IfNotPresent")

In [49]:
# Pipeline definition: a single training-and-export step that receives the
# pipeline parameters unchanged.
@dsl.pipeline(
    name="es Pipeline",
    description="es",
)
def super_model_pipeline(trainingjob_name: str, epochs: str, version: str):
    train_and_export(
        trainingjobName=trainingjob_name,
        epochs=epochs,
        version=version,
    )

In [50]:
# Compile the pipeline function into "<file_name>.zip" (relative path).
file_name = "es_model_pipeline"
pipeline_func = super_model_pipeline

pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(pipeline_func, f"{file_name}.zip")

In [51]:
import requests

pipeline_name = "es Pipeline"
pipeline_file = file_name + '.zip'

# NOTE(review): the endpoint addresses the pipeline with the literal path
# segment "es". The previous `.format(pipeline_name)` call had no placeholder
# to fill, so `pipeline_name` never reached the URL; the endpoint is kept
# unchanged here -- confirm which name the service expects.
upload_url = "http://tm.traininghost:32002/pipelines/es/upload"

# Open the package in a `with` block so the file handle is closed as soon as
# the upload finishes instead of staying open for the lifetime of the kernel.
with open(pipeline_file, 'rb') as package:
    response = requests.post(upload_url, files={'file': package})

# Last expression of the cell: shows the HTTP response as the cell output.
response

<Response [200]>