# KFP V2

In [13]:
import kfp
from kfp import dsl


In [309]:
# Ingesta de archivo

@dsl.component(packages_to_install=['boto3', 'pandas'])
def cargar_csv(file: str, file2: str, bk: str, rn: str, aws_id: str, aws_secret: str) -> int:
    """Parse a CSV stored in S3 and write a re-serialized copy under a new key.

    KFP lightweight components are serialized from their source, so every
    import lives inside the function body.

    Args:
        file: S3 key of the source CSV.
        file2: S3 key the re-serialized CSV is written to.
        bk: S3 bucket holding both objects.
        rn: AWS region name for the S3 client.
        aws_id: AWS access key id.
        aws_secret: AWS secret access key.

    Returns:
        0 on success, 1 if the CSV parsed but has no rows, 2 if any exception
        was raised while downloading, parsing or uploading. The pipeline
        branches on this code.
    """
    import io

    import boto3
    import pandas as pd

    print("Region Name ", rn)
    print("Bucket Name ", bk)

    s3 = boto3.client('s3', aws_access_key_id=aws_id,
                      aws_secret_access_key=aws_secret,
                      region_name=rn)

    try:
        # Download and parse the source CSV from S3.
        print("bucket name:", bk)
        print("Key: ", file)
        response = s3.get_object(Bucket=bk, Key=file)
        print("Response: ", response['Body'])
        df = pd.read_csv(response['Body'])

        # pd.read_csv never returns None; an empty frame is the meaningful
        # "nothing was loaded" condition.
        if df.empty:
            print("Error al cargar el archivo CSV desde S3.")
            return 1
        print("Archivo CSV cargado exitosamente desde S3 !!!")

        # Write the parsed CSV back to the same bucket under the destination key.
        csv_buffer = io.StringIO()
        df.to_csv(csv_buffer, index=False)
        s3.put_object(Bucket=bk, Key=file2, Body=csv_buffer.getvalue())
        print("Archivo CSV copiado al Storage S3 !!!")
        return 0

    except Exception as e:
        # Deliberately broad: any failure is reported as code 2 so the pipeline
        # routes to its error branch instead of failing this step.
        print(f"Error Exception in Python File: {e}")
        return 2


In [355]:
# Pre-Processing

@dsl.component(base_image='python:3.11', packages_to_install=['boto3', 'pandas', 'imblearn'])
def preprocess(file: str, bk: str, rn: str, aws_id: str, aws_secret: str) -> int:
    """Scale, split and SMOTE-balance a CSV from S3, uploading the training arrays.

    Steps:
      1. Read ``file`` from bucket ``bk`` and drop its last row.
      2. Robust-scale ``Amount`` and ``Time`` into ``scaled_amount`` and
         ``scaled_time``. These become the first two columns; the original
         columns are removed.
      3. Split features/labels on ``Class`` and keep the training indices of
         the last fold of a 5-fold ``StratifiedKFold``.
      4. Oversample the minority class with SMOTE.
      5. Upload labels to ``training_labels.npy`` and samples to
         ``training_samples.npy``.

    NOTE: despite the ``.npy`` suffix, both objects are comma-separated text
    written with ``np.savetxt``; they must be read back with ``np.loadtxt``.

    Args:
        file: S3 key of the input CSV.
        bk: S3 bucket for input and outputs.
        rn: AWS region name for the S3 client.
        aws_id: AWS access key id.
        aws_secret: AWS secret access key.

    Returns:
        0 on success; exceptions propagate and fail the step.
    """
    import io

    import boto3
    import numpy as np
    import pandas as pd
    from imblearn.over_sampling import SMOTE
    from sklearn.model_selection import StratifiedKFold
    from sklearn.preprocessing import RobustScaler

    s3 = boto3.client('s3', aws_access_key_id=aws_id,
                      aws_secret_access_key=aws_secret,
                      region_name=rn)

    def _upload_array(key, values):
        """Serialize ``values`` as comma-separated text and store it at ``key``."""
        buffer = io.StringIO()
        np.savetxt(buffer, values, delimiter=",")
        s3.put_object(Bucket=bk, Key=key, Body=buffer.getvalue())

    print('Scaling data')
    response = s3.get_object(Bucket=bk, Key=file)
    df = pd.read_csv(response['Body'])
    print("File read OK !")

    # Drop the last row. The label is taken from the index rather than
    # hard-coded (182328), which raised KeyError for files of any other length.
    df = df.drop(df.index[-1], axis=0)

    # RobustScaler centres on the median and scales by the interquartile range.
    rob_scaler = RobustScaler()
    scaled_amount = rob_scaler.fit_transform(df['Amount'].values.reshape(-1, 1))
    scaled_time = rob_scaler.fit_transform(df['Time'].values.reshape(-1, 1))
    print("Scale amount/time OK")

    # Replace the raw columns with the scaled ones, placed first.
    df = df.drop(['Time', 'Amount'], axis=1)
    df.insert(0, 'scaled_amount', scaled_amount.ravel())
    df.insert(1, 'scaled_time', scaled_time.ravel())

    X = df.drop('Class', axis=1)
    y = df['Class']
    skf = StratifiedKFold(n_splits=5, random_state=None, shuffle=False)
    # Only the training split of the last fold is used.
    train_index, _ = list(skf.split(X, y))[-1]
    original_Xtrain = X.iloc[train_index]
    original_ytrain = y.iloc[train_index]

    # Oversample the minority class so the training set is balanced.
    sm = SMOTE(sampling_strategy='minority', random_state=42)
    Xsm_train, ysm_train = sm.fit_resample(original_Xtrain, original_ytrain)

    _upload_array('training_labels.npy', ysm_train)
    _upload_array('training_samples.npy', Xsm_train)

    print('Data processing done!')
    return 0
    

In [387]:
# Training

@dsl.component(base_image='tensorflow/tensorflow:2.11.0',
               packages_to_install=['boto3', 'onnx', 'tf2onnx'])
def training(code: int, bk: str, rn: str, aws_id: str, aws_secret: str, model_name: str) -> int:
    """Train a dense 2-class classifier on the preprocessed arrays and upload it as ONNX.

    Reads ``training_labels.npy`` and ``training_samples.npy`` from bucket
    ``bk``. Despite the suffix, these are comma-separated text parsed with
    ``np.loadtxt``. The function trains a Keras ``Sequential`` model on CPU,
    converts it with tf2onnx and stores the ONNX bytes at key ``model_name``.

    Args:
        code: Output of the preprocessing step. Not read here; passing it
            makes this step depend on preprocessing.
        bk: S3 bucket for inputs and the model.
        rn: AWS region name for the S3 client.
        aws_id: AWS access key id.
        aws_secret: AWS secret access key.
        model_name: S3 key for the exported ONNX model.

    Returns:
        0 on success; exceptions propagate and fail the step.
    """
    import io
    from os import environ

    # Hide GPUs before TensorFlow/Keras are imported so the setting is in
    # place before any device initialization.
    environ['CUDA_VISIBLE_DEVICES'] = '-1'

    import boto3
    import numpy as np
    import onnx
    from keras.layers import Dense
    from keras.models import Sequential
    from keras.optimizers import Adam
    from tf2onnx import convert

    print('training model')

    epoch_count = 20
    learning_rate = 0.001

    s3 = boto3.client('s3', aws_access_key_id=aws_id,
                      aws_secret_access_key=aws_secret,
                      region_name=rn)

    def _download_array(key):
        """Fetch ``key`` from the bucket and parse it as comma-separated text."""
        print("bucket name:", bk)
        print("Key: ", key)
        response = s3.get_object(Bucket=bk, Key=key)
        text = response['Body'].read().decode('utf-8')
        return np.loadtxt(io.StringIO(text), delimiter=",")

    ysm_train = _download_array('training_labels.npy')
    Xsm_train = _download_array('training_samples.npy')

    n_inputs = Xsm_train.shape[1]
    model = Sequential([
        Dense(n_inputs, input_shape=(n_inputs,), activation='relu'),
        Dense(32, activation='relu'),
        # softmax (not sigmoid): sparse_categorical_crossentropy with the
        # default from_logits=False expects a probability distribution.
        Dense(2, activation='softmax'),
    ])
    model.compile(
        Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    model.fit(
        Xsm_train,
        ysm_train,
        validation_split=0.2,
        batch_size=300,
        epochs=epoch_count,
        shuffle=True,
        verbose=2,
    )

    # Convert to ONNX and upload the serialized bytes directly from memory.
    onnx_model, _ = convert.from_keras(model)
    onnx_buffer = io.BytesIO()
    onnx.save_model(onnx_model, onnx_buffer)
    s3.put_object(Bucket=bk, Key=model_name, Body=onnx_buffer.getvalue())

    return 0



In [401]:
# Upload del modelo

@dsl.component(packages_to_install=['boto3'])
def upload_model(code: int, key: str, key2: str, region_name: str, aws_id: str, aws_secret: str, bk: str) -> int:
    """Copy the ONNX model stored at ``key`` to ``key2`` within bucket ``bk``.

    Args:
        code: Output of the training step. Not read here; passing it makes
            this step depend on training.
        key: Source S3 key of the model.
        key2: Destination S3 key.
        region_name: AWS region name for the S3 client.
        aws_id: AWS access key id.
        aws_secret: AWS secret access key.
        bk: S3 bucket holding both keys.

    Returns:
        0 on success; exceptions propagate and fail the step.
    """
    import boto3

    print("Origen ", key)
    print("Destino ", key2)
    print("Bucket ", bk)
    print("Region ", region_name)

    s3 = boto3.client('s3', aws_access_key_id=aws_id,
                      aws_secret_access_key=aws_secret,
                      region_name=region_name)

    # Read the model bytes and write them unchanged to the destination key.
    response = s3.get_object(Bucket=bk, Key=key)
    onnx_model_bytes = response['Body'].read()
    s3.put_object(Bucket=bk, Key=key2, Body=onnx_model_bytes)

    print("El modelo ONNX se ha guardado correctamente en el bucket de S3 proporcionado.")
    return 0
    


In [402]:
# Error

@dsl.component
def error_component() -> int:
    """Log that the source data file could not be read and return 1.

    Used by the pipeline as the branch taken when ``cargar_csv`` does not
    return 0.
    """
    print("Error leyendo el archivo de datos Origen...")
    return 1


## Generamos el Pipeline

In [403]:


@dsl.pipeline(name='model-pipeline')
def model_pipeline():
    """Chain CSV ingestion, preprocessing, training and model publishing on S3.

    ``cargar_csv`` copies ``creditcard.csv`` to ``creditcard2.csv``. If it
    returns 0, the chain preprocess -> training -> upload_model runs;
    otherwise ``error_component`` runs.

    NOTE(security): this body runs when the pipeline is compiled, so the AWS
    credentials read from the environment below are passed as constant
    arguments and end up in the compiled pipeline file. Prefer injecting them
    at run time (e.g. from a Kubernetes secret), and do not share the
    compiled file.
    """
    from os import environ

    bucket = 'semper-pipelines'
    region = 'us-east-1'
    staged_csv = 'creditcard2.csv'
    model_key = 'model.onnx'

    aws_id = environ.get('AWS_ACCESS_KEY_ID')
    aws_secret = environ.get('AWS_SECRET_ACCESS_KEY')

    load_task = cargar_csv(file='creditcard.csv', file2=staged_csv, bk=bucket, rn=region,
                           aws_id=aws_id, aws_secret=aws_secret)
    with dsl.If(load_task.output == 0):
        preprocess_task = preprocess(file=staged_csv, bk=bucket, rn=region,
                                     aws_id=aws_id, aws_secret=aws_secret)
        training_task = training(code=preprocess_task.output, bk=bucket, rn=region,
                                 aws_id=aws_id, aws_secret=aws_secret, model_name=model_key)
        upload_model(code=training_task.output, key=model_key, key2='data/model-final.onnx',
                     region_name=region, aws_id=aws_id, aws_secret=aws_secret, bk=bucket)
    # Exact complement of the If above (the return code is non-zero).
    with dsl.Else():
        error_component()
    

In [404]:
from kfp import compiler

# Serialize the pipeline definition to a YAML package that can be uploaded to KFP.
compiler.Compiler().compile(pipeline_func=model_pipeline, package_path='pipeline.yaml')