# DoctorRx: Automated Handwritten Prescription Recognition Kubeflow Pipeline

## Prerequisites

In [1]:
!pip install --user --upgrade pip
!pip install kfp --upgrade --user --quiet
!pip show kfp

Name: kfp
Version: 1.8.22
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 
Location: /home/fathur-linux/miniconda3/envs/kubeflow_sdk/lib/python3.9/site-packages
Requires: absl-py, click, cloudpickle, Deprecated, docstring-parser, fire, google-api-core, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, uritemplate, urllib3
Required-by: 


## Import Kubeflow Pipelines libraries

In [2]:
import kfp 
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
from typing import NamedTuple

Component 1: Download the labelled images from GCS, convert them to NumPy arrays and save them as a CSV file

In [3]:
def get_data_csv_builder(folder_name:list, data_csv: OutputPath()):
    """Download labelled prescription images from GCS and write them to one CSV.

    Every object under ``gs://doctorrx_pipeline_bucket/data_labelled/<folder>/``
    goes through the same steps. It is decoded as grayscale and resized to
    200 rows x 100 columns. It is then Gaussian-blurred, Otsu-thresholded and
    repeated into 3 identical channels. Finally it is flattened into one CSV
    row: ``label, pixels_1 .. pixels_60000``.

    All imports live inside the function because the component is built from
    this function's source alone (``create_component_from_func``).

    Args:
        folder_name: sub-folders of ``data_labelled/`` to read. The folder name
            is the class name and must be a key of ``lab_names`` below.
        data_csv: output file path (supplied by Kubeflow) for the CSV.

    Raises:
        KeyError: if a folder name is not one of the known class names.
    """
    from google.cloud import storage
    import os
    import cv2
    import pandas as pd
    import numpy as np

    # Downstream components reshape each row with reshape(-1, 200, 100, 3),
    # i.e. 200 rows x 100 columns x 3 channels, so the image array must really
    # have that layout. cv2.resize takes dsize as (width, height); the old
    # (200, 100) produced a (100, 200) array whose pixels were scrambled by
    # the later reshape.
    img_height, img_width = 200, 100
    n_pixels = img_height * img_width * 3  # 60000

    # Class name -> integer label (constant, so built once outside the loop).
    lab_names = {'Paracetamol': 0, 'Amoxilin': 1, 'CTM': 2, 'Amlodipin': 3, 'Metformin': 4}

    # gcp creds: keep credentials already configured in the environment and
    # only fall back to the key file bundled in the image.
    print("<=== initiate os env gcp creds ====>")
    os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', 'doctorrx-387716-cdaefd627b4a.json')

    #bucket
    print('<=== get bucket ===>')
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('doctorrx_pipeline_bucket')

    pixel_columns = ['label'] + [f'pixels_{i+1}' for i in range(n_pixels)]

    print('<=== iterate data ===>')
    # Collect rows in a list and build the DataFrame once. Appending with
    # df.loc[len(df)] to a 60001-column frame is quadratic and very slow.
    rows = []
    for folder_img in folder_name:
        blobs = bucket.list_blobs(prefix=f'data_labelled/{folder_img}/')
        for blob in blobs:
            if blob.name.endswith('/'):
                continue  # "directory" placeholder objects
            # get label: object names look like data_labelled/<label>/<file>
            label = blob.name.split("/")[1]

            # get image byte -> array
            contents = blob.download_as_string()
            decoded = cv2.imdecode(np.frombuffer(contents, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
            if decoded is None:
                # cv2.imdecode returns None for data it cannot decode.
                print(f'!{blob.name} skipped: not a decodable image')
                continue
            img_resize = cv2.resize(decoded, (img_width, img_height))
            blur = cv2.GaussianBlur(img_resize, (5, 5), 0)
            _, th3 = cv2.threshold(blur.astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            # (H, W) grayscale -> (H, W, 3) with identical channels, flattened
            img_ravel = np.repeat(th3[..., np.newaxis], 3, -1).ravel()

            rows.append(np.concatenate([[lab_names[str(label)]], img_ravel]))
            print(f'!{blob.name} - {len(rows)} berhasil! ')

    df = pd.DataFrame(rows, columns=pixel_columns)

    # to_csv
    with open(data_csv, 'w') as f:
        df.to_csv(f, index=False)

    print('Done!')
    return None

Component 2: Preprocessing

In [4]:
def preprocessing(data_csv: InputPath(), load_data_path: OutputPath(str)):
    """Turn the pixel CSV into scaled image tensors and pickle them with labels.

    Reads the CSV at ``data_csv``: a ``label`` column followed by the pixel
    columns. The pixels are reshaped to ``(n_images, 200, 100, 3)`` and divided
    by 255. The tuple ``(images, labels)`` is pickled to
    ``<load_data_path>/all_data``, and the directory is created if needed.
    """
    import os
    import pickle

    import numpy as np
    import pandas as pd

    with open(data_csv) as csv_file:
        frame = pd.read_csv(csv_file)

    # everything after the first column is pixel data
    pixel_values = frame.iloc[:, 1:].values
    images = pixel_values.reshape(-1, 200, 100, 3) / 255
    labels = frame['label']

    os.makedirs(load_data_path, exist_ok=True)

    target_file = f"{load_data_path}/all_data"
    with open(target_file, 'wb') as pkl_file:
        pickle.dump((images, labels), pkl_file)

    print('Done!')
    return None
    

Component 3: Feature extraction (VGG16) and train/test split

In [5]:
def feautre_extraction(load_data_path: InputPath(str), feature_extraction_path: OutputPath(str)):
    """Extract pretrained VGG16 convolutional features for every image.

    Loads ``(X, Y)`` from ``<load_data_path>/all_data``, as written by
    ``preprocessing``: X is scaled to [0, 1] with shape (n, 200, 100, 3). It
    runs the images through ImageNet VGG16 truncated at ``layers[-4]``
    (``block5_conv1`` in the Keras VGG16 layer list). It then pickles
    ``(features, Y)`` to ``<feature_extraction_path>/features_extract``.

    For 200x100 inputs the feature maps are (12, 6, 512), which is the
    ``Input`` shape expected by ``tf_vgg16``.

    The misspelled function name is kept because the pipeline references it.
    """
    import os
    import pickle

    with open(f"{load_data_path}/all_data", 'rb') as f:
        X, Y = pickle.load(f)
    print(X.shape, Y.shape)

    def transform_feature(arr):
        """Return VGG16 block5_conv1 activations for a batch of [0, 1] images."""
        from keras.applications.vgg16 import VGG16, preprocess_input
        from keras.models import Model

        base = VGG16(weights='imagenet', include_top=False)
        extractor = Model(inputs=base.inputs, outputs=base.layers[-4].output)

        # The ImageNet VGG16 weights were trained on 0-255 pixels passed
        # through preprocess_input (RGB->BGR + per-channel mean subtraction).
        # Feeding [0, 1] values puts the inputs far off that distribution.
        # arr * 255.0 is a fresh array, so the input array is not modified.
        return extractor.predict(preprocess_input(arr * 255.0))

    features = transform_feature(arr=X)

    os.makedirs(feature_extraction_path, exist_ok=True)

    with open(f"{feature_extraction_path}/features_extract", 'wb') as f:
        pickle.dump((features, Y), f)
        

In [6]:
def train_test_splits(feature_extraction_path:InputPath(str), train_test_split_path: OutputPath(str)):
    """Split extracted features into stratified 80/20 train and test sets.

    Reads the ``(features, labels)`` pickle at
    ``<feature_extraction_path>/features_extract``. It writes the ``(X, y)``
    tuples to ``<train_test_split_path>/train`` and
    ``<train_test_split_path>/test``. The split is stratified on the labels
    and seeded with ``random_state=42``.
    """
    import os
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split

    source_file = f"{feature_extraction_path}/features_extract"
    with open(source_file, 'rb') as fh:
        loaded = pickle.load(fh)

    # the pickle holds (features, labels), not (train, test)
    features, labels = loaded

    X_train, X_test, y_train, y_test = train_test_split(
        features,
        labels,
        test_size=0.2,
        random_state=42,
        stratify=labels,
    )

    os.makedirs(train_test_split_path, exist_ok=True)

    splits = (
        ('train', (X_train, y_train)),
        ('test', (X_test, y_test)),
    )
    for split_name, payload in splits:
        with open(f"{train_test_split_path}/{split_name}", 'wb') as fh:
            pickle.dump(payload, fh)

    

Component 4: ML Modelling

Component 4.1: CNN (not implemented yet)

Component 4.2: TensorFlow classifier on VGG16 features

In [None]:
def tf_vgg16(batch_size:int,
             epochs:int,
             train_test_split_path: InputPath(str),
             model_path: OutputPath(str)):
    """Train a small classifier head on pre-extracted VGG16 feature maps.

    Reads ``(X, y)`` from ``<train_test_split_path>/train``: feature maps of
    shape (12, 6, 512) with integer labels. It trains a
    Conv2D -> MaxPool -> Dense head. The checkpoint with the best
    validation loss is saved to ``<model_path>/model.h5``, and that checkpoint
    is evaluated on ``<train_test_split_path>/test``.

    Args:
        batch_size: mini-batch size for ``fit``.
        epochs: number of training epochs.
        train_test_split_path: directory written by ``train_test_splits``.
        model_path: output directory (created if missing) for ``model.h5``.
    """
    import os
    import numpy as np
    import pandas as pd
    # Use tf.keras everywhere so layers, models and callbacks come from the
    # same Keras package.
    from tensorflow.keras.callbacks import ModelCheckpoint
    from tensorflow.keras.layers import Conv2D, Dense, Flatten, Input, MaxPooling2D
    from tensorflow.keras.models import Model, load_model

    # Must match the label map in get_data_csv_builder (labels 0..4). The old
    # Dense(6) had an output unit that no training label ever targets.
    num_classes = 5

    # loading the train data
    with open(f'{train_test_split_path}/train', 'rb') as f:
        X_train, y_train = pd.read_pickle(f)
    print(X_train.shape, y_train.shape)

    # model building (inputs are already VGG16 features, so no VGG16 backbone
    # needs to be loaded here)
    layer_input = Input(shape=(12, 6, 512))
    x = Conv2D(64, (3, 3), activation='relu')(layer_input)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Flatten()(x)
    x = Dense(100, activation='relu')(x)
    x = Dense(num_classes, activation='softmax')(x)

    model_vgg = Model(layer_input, x)
    model_vgg.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    model_vgg.summary()

    os.makedirs(model_path, exist_ok=True)
    checkpoint_file = f"{model_path}/model.h5"
    # `period` is deprecated (removed in newer Keras). Saving once per epoch
    # is already the default (save_freq='epoch').
    checkpoint = ModelCheckpoint(checkpoint_file, monitor='val_loss', verbose=1,
                                 save_best_only=True, mode='auto')

    # fit the data to model
    history = model_vgg.fit(
        np.array(X_train),
        np.array(y_train),
        batch_size=batch_size,
        epochs=epochs,
        validation_split=0.2,
        callbacks=[checkpoint],
        shuffle=True
    )
    print(history.history['accuracy'])

    # loading the X_test and y_test
    with open(f'{train_test_split_path}/test', 'rb') as f:
        X_test, y_test = pd.read_pickle(f)

    # Evaluate the saved best-val_loss checkpoint that downstream components
    # load, not the in-memory weights from the final epoch.
    best_model = load_model(checkpoint_file)
    test_loss, test_acc = best_model.evaluate(np.array(X_test), np.array(y_test), verbose=0)
    print("Test_loss: {}, Test_accuracy: {} ".format(test_loss,test_acc))

    


Component 5: Confusion matrix

In [None]:
def confusion_matrix(model_path: InputPath(str),
                     train_test_split_path: InputPath(str),
                     mlpipeline_ui_metadata_path: OutputPath()):
    """Compute the test-set confusion matrix and emit it as Kubeflow UI metadata.

    Loads ``<model_path>/model.h5`` and predicts ``argmax`` classes for
    ``<train_test_split_path>/test``. It writes a ``confusion_matrix`` output
    with inline CSV rows of (target, predicted, count) to
    ``mlpipeline_ui_metadata_path``.

    Raises:
        KeyError: if a true or predicted class id is not in the label map.
    """
    import json

    import numpy as np
    import pandas as pd
    # Inside this function the sklearn import shadows the function's own name.
    from sklearn.metrics import confusion_matrix
    from tensorflow.keras.models import load_model

    # loading the X_test and y_test
    with open(f'{train_test_split_path}/test', 'rb') as f:
        X_test, y_test = pd.read_pickle(f)

    labels = {
        0: 'Paracetamol',
        1: 'Amoxilin',
        2: 'CTM',
        3: 'Amlodipin',
        4: 'Metformin'
    }
    # One fixed, alphabetically sorted class order for the matrix axes and the
    # UI "labels" list. Previously vocab came from y_test only, while sklearn
    # orders the matrix by the union of y_test and y_pred. A class that was
    # predicted but absent from y_test therefore shifted or broke the mapping.
    vocab = sorted(labels.values())

    # loading the model + prediction
    model = load_model(f'{model_path}/model.h5')
    y_pred = np.argmax(model.predict(X_test), axis=1)

    # int to label
    y_test_label = [labels[int(x)] for x in y_test]
    y_pred_label = [labels[int(x)] for x in y_pred]

    cm = confusion_matrix(y_test_label, y_pred_label, labels=vocab)
    print(cm)

    # (target, predicted, count) triples in vocab order
    data = [
        (vocab[target_index], vocab[predicted_index], int(count))
        for target_index, target_row in enumerate(cm)
        for predicted_index, count in enumerate(target_row)
    ]
    df = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    print(df)

    # create kubeflow metric metadata for UI
    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {
                        "name": "target",
                        "type": "CATEGORY"
                    },
                    {
                        "name": "predicted",
                        "type": "CATEGORY"
                    },
                    {
                        "name": "count",
                        "type": "NUMBER"
                    }
                ],
                "source": df.to_csv(header=False, index=False),
                "storage": "inline",
                "labels": vocab,
            }
        ]
    }

    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)
        


Component 6: ROC curve

In [None]:
def roc_curve(model_path: InputPath(str),
              train_test_split_path: InputPath(str),
              mlpipeline_ui_metadata_path: OutputPath()):
    """Emit a micro-averaged one-vs-rest ROC curve as Kubeflow UI metadata.

    Loads ``<model_path>/model.h5`` and predicts class probabilities for
    ``<train_test_split_path>/test``. Every (sample, class) pair becomes one
    binary decision, scored by its predicted probability. The resulting
    (fpr, tpr, thresholds) curve is written as inline CSV to
    ``mlpipeline_ui_metadata_path``.
    """
    import json

    import numpy as np
    import pandas as pd
    # Inside this function the sklearn import shadows the function's own name.
    from sklearn.metrics import roc_curve
    from tensorflow.keras.models import load_model

    # loading the X_test and y_test
    with open(f'{train_test_split_path}/test', 'rb') as f:
        X_test, y_test = pd.read_pickle(f)

    # loading the model; predictions are (n_samples, n_classes) probabilities
    model = load_model(f'{model_path}/model.h5')
    y_proba = model.predict(X_test)
    n_classes = y_proba.shape[1]

    # roc_curve is a binary metric. The previous call passed multiclass ids as
    # y_true, argmax class indices as y_score and pos_label=True. That scored
    # "class 1 vs rest" by predicted class index, which is not a ROC curve.
    # Instead, one-hot the true labels and pool all (sample, class) pairs with
    # their predicted probabilities (micro-average).
    y_true_onehot = np.eye(n_classes, dtype=int)[np.asarray(y_test, dtype=int)]
    fpr, tpr, thresholds = roc_curve(y_true=y_true_onehot.ravel(), y_score=y_proba.ravel())

    df = pd.DataFrame({
        'fpr': fpr,
        'tpr': tpr,
        'thresholds': thresholds
    })
    print(df)

    metadata = {
        'outputs': [{
            'type': 'roc',
            'format': 'csv',
            'schema': [
                {'name': 'fpr', 'type': 'NUMBER'},
                {'name': 'tpr', 'type': 'NUMBER'},
                {'name': 'thresholds', 'type': 'NUMBER'},
            ],
            "source": df.to_csv(header=False, index=False),
            "storage": "inline",
        }]
    }

    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

Component 7: Save the model to the registry (upload to GCS)

In [None]:
def model_registry(model_path: InputPath(str),
                   model_version: float):
    """Upload ``<model_path>/model.h5`` to GCS as ``models/<model_version>/model.h5``.

    Args:
        model_path: directory containing ``model.h5`` (written by ``tf_vgg16``).
        model_version: version string used as the object-prefix folder.
    """
    from google.cloud import storage
    import os

    # Keep credentials already configured in the environment (e.g. a mounted
    # secret) and only fall back to the key file bundled in the image.
    os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', 'doctorrx-387716-cdaefd627b4a.json')

    storage_client = storage.Client()
    bucket = storage_client.get_bucket('doctorrx_pipeline_bucket')
    blob = bucket.blob(f'models/{model_version}/model.h5')

    blob.upload_from_filename(f'{model_path}/model.h5')
    print(f'Uploaded gs://doctorrx_pipeline_bucket/{blob.name}')
    

In [None]:

# create component: every step runs in the same project image
_doctorrx_base_image = 'gcr.io/doctorrx-387716/doctor-rx'


def _to_component(func):
    """Wrap a self-contained Python function as a KFP v1 component factory."""
    return comp.create_component_from_func(func, base_image=_doctorrx_base_image)


create_get_data_csv_builder = _to_component(get_data_csv_builder)
create_preprocessing = _to_component(preprocessing)
create_features_extraction = _to_component(feautre_extraction)
create_train_test_split = _to_component(train_test_splits)
create_ft_vgg16 = _to_component(tf_vgg16)
create_confusion_matrix = _to_component(confusion_matrix)
create_roc_curve = _to_component(roc_curve)
create_model_registry = _to_component(model_registry)

## Kubeflow pipeline creation

In [None]:

@dsl.pipeline(name='doctor-rx-pipeline-v:1.0',
              description="Perfroms end-to-end MLOPS")
def doctorrx_pipeline(folder_name: list = ["Amlodipin", "Paracetamol", "Amoxilin", "CTM", "Metformin"],
                      load_data_path: str = '/mnt' ,
                      batch_size: int = 128,
                      epochs: int = 100,
                      train_test_split_path: str = 'testsplit',
                      feature_extraction_path:str = 'feature_extract',
                      model_path:str = 'model',
                      model_version:float = 1.0):
    """DoctorRx pipeline: data -> preprocessing -> VGG16 features -> split -> train -> eval -> registry.

    Args:
        folder_name: GCS label folders to read (see get_data_csv_builder).
        batch_size, epochs: training hyper-parameters for tf_vgg16.
        model_version: folder under ``models/`` in GCS the model is uploaded to.

    ``load_data_path``, ``train_test_split_path``, ``feature_extraction_path``
    and ``model_path`` are not passed to any component here; the components
    use Kubeflow-managed InputPath/OutputPath locations instead. They are kept
    so existing run arguments stay valid.
    """
    # get data -> pre processing -> features extraction -> train/test split
    get_data_csv_comp = create_get_data_csv_builder(folder_name=folder_name)
    preprocessing_comp = create_preprocessing(get_data_csv_comp.outputs['data_csv'])
    features_extraction_comp = create_features_extraction(preprocessing_comp.output)
    train_test_split_comp = create_train_test_split(features_extraction_comp.output)

    # TODO: CNN container (Component 4.1) is not implemented yet.

    # modelling container
    modelling_comp = create_ft_vgg16(batch_size, epochs, train_test_split_comp.output)

    # evaluation containers. To disable caching for a step, set
    # <op>.execution_options.caching_strategy.max_cache_staleness = "P0D".
    create_confusion_matrix(modelling_comp.output, train_test_split_comp.output)
    create_roc_curve(modelling_comp.output, train_test_split_comp.output)

    # Register under the pipeline's model_version parameter (it was ignored
    # before: the version was hard-coded to 1.0).
    create_model_registry(modelling_comp.output, model_version)


In [None]:

# Run parameters. The names mirror doctorrx_pipeline's arguments;
# `version` corresponds to the pipeline parameter `model_version`.
folder_name = ["Amlodipin", "Paracetamol", "Amoxilin", "CTM", "Metformin"]
load_data_path = "/mnt"

# training hyper-parameters
batch_size, epochs = 128, 100

# artifact locations
train_test_split_path = "testsplit"
feature_extraction_path = "feature_extract"
model_path = "model"

version = 1.0

In [None]:
pipeline_func = doctorrx_pipeline
experiment_name = 'doctorrx_pipeline_v2'
run_name = pipeline_func.__name__ + 'run'

# Run arguments. Keys must match doctorrx_pipeline's parameter names. The
# version parameter is `model_version`; a "version" key matches no pipeline
# parameter, so model_version would not be set from it.
arguments = {
    "folder_name": folder_name,
    "load_data_path": load_data_path,
    "batch_size": batch_size,
    "epochs": epochs,
    "train_test_split_path": train_test_split_path,
    "feature_extraction_path": feature_extraction_path,
    "model_path": model_path,
    "model_version": version,
}

# compiler: write the KFP v1 workflow YAML named after the experiment
kfp.compiler.Compiler().compile(pipeline_func,
                                '{}.yaml'.format(experiment_name))