## REQUIREMENTS
    * Deploy Kubeflow
    * This notebook is to be run from Kubeflow's Jupyter Notebook Server

## Install Libraries


In [2]:
!python -m pip install --user --upgrade pip

!pip3 install pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1 scikit-learn==0.22 tensorflow==2.0 tensorflow_hub --user

Requirement already up-to-date: pip in c:\users\tabitha\appdata\roaming\python\python36\site-packages (20.2.3)
^C


In [None]:
import os
import numpy as np
import pandas as pd
#%tensorflow_version 1.x
import tensorflow as tf
import tensorflow_hub as hub
import json
import pickle
import urllib

from sklearn.preprocessing import LabelBinarizer

# Show which TensorFlow version this kernel actually imported.
print(tf.__version__)

## Get Data
    * The data is stored in a Google Cloud Storage (GCS) bucket
    * Reading `gs://` paths with pandas might require installing additional dependencies (e.g. `gcsfs`)
    * bucket = "tabbie_financial"

In [None]:
# Name of the Google Cloud Storage bucket used to build the gs:// paths below.
bucket = "tabbie_financial"

In [None]:
# Read the raw complaints CSV directly from the bucket.
raw_csv_uri = f"gs://{bucket}/data/consumer_complaints.csv"
data = pd.read_csv(raw_csv_uri)

In [None]:
# Keep only the label column and the free-text column.
columns_to_keep = ['product', 'consumer_complaint_narrative']
data = data[columns_to_keep]

In [None]:
# Preview the first rows of the selected columns.
data.head()

In [None]:
def preprocess(data_path, out_path="data/preprocessed"):
    """Clean the consumer-complaints CSV and write the result as a new CSV.

    Keeps only the ``product`` and ``consumer_complaint_narrative`` columns,
    drops rows whose narrative is missing, lower-cases the narrative, strips
    every ``x``, ``{``, ``}`` and ``/`` character from it, and finally drops
    any row that still contains a missing value.

    Args:
        data_path: Path or URL of the raw CSV; passed to ``pandas.read_csv``.
        out_path: Where the cleaned CSV is written. Defaults to
            ``"data/preprocessed"``, relative to the working directory.

    Returns:
        None. The cleaned frame is written to ``out_path`` without the index.
    """
    import os
    import pandas as pd

    text_col = 'consumer_complaint_narrative'
    raw = pd.read_csv(data_path)

    # Work on an explicit copy so the column assignment below never writes
    # into a slice of `raw` (which raises SettingWithCopyWarning).
    cleaned = raw.loc[raw[text_col].notnull(), ['product', text_col]].copy()

    # Kept from the original implementation: widens pandas' column display.
    pd.set_option('display.max_colwidth', 1000)

    # One pass with an explicit regex character class instead of four
    # separate replace calls whose regex/literal default differs by version.
    # NOTE(review): removing every 'x' presumably targets the "XXXX" redaction
    # masks, but it also deletes the letter from ordinary words - confirm.
    cleaned[text_col] = (
        cleaned[text_col].str.lower().str.replace(r'[x{}/]', '', regex=True)
    )

    cleaned = cleaned.dropna()

    # to_csv does not create missing directories; make the local parent first.
    parent = os.path.dirname(out_path)
    if parent and '://' not in out_path:
        os.makedirs(parent, exist_ok=True)

    # Save preprocessed data
    cleaned.to_csv(out_path, index=False)
    

In [None]:
# Location of the raw complaints CSV inside the bucket.
data_path = f"gs://{bucket}/data/consumer_complaints.csv"

In [None]:
# Clean the raw CSV; preprocess() writes its result to a local CSV file.
preprocess(data_path)

### Save the preprocessed data to the Google Cloud Storage bucket

In [None]:
!gsutil cp data/preprocessed gs://${bucket}/data/preprocessed

## Install Kubeflow pipelines SDK

In [None]:
!pip3 install kfp --upgrade --user

## Build Pipeline Components

In [None]:
# Import Kubeflow SDK
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [None]:
# where the outputs are stored
# NOTE(review): absolute path under /home/jovyan - presumably the notebook
# server's home directory; adjust when running anywhere else.
out_dir = "/home/jovyan/11-financial-products/data/out/"

In [None]:
# GCS location of the preprocessed CSV.
in_dir = f"gs://{bucket}/data/preprocessed"

In [None]:
def train(out_data_path, model_dir, data_uri="gs://tabbie_financial/data/preprocessed"):
    """Train a multi-label DNN classifier on complaint narratives and export it.

    The preprocessed CSV is split sequentially (no shuffling): the last 0.1 %
    of rows is held out as test data, and the remaining rows are split 80/20
    into training and validation sets. Product labels are one-hot encoded with
    ``LabelBinarizer`` and narratives are embedded through a TF-Hub text
    embedding column feeding a ``DNNEstimator``.

    Files written:
        * ``{out_data_path}/{model_dir}/`` - the exported SavedModel.
        * ``{out_data_path}/test_data`` - pickled ``(test_descriptions, test_prod)``.
        * ``{out_data_path}/label_classes`` - pickled list of class names in
          encoder order, so a separate process can map a predicted index back
          to a product name.

    Args:
        out_data_path: Directory that receives the files listed above.
        model_dir: Sub-directory of ``out_data_path`` used as the export base.
        data_uri: Location of the preprocessed CSV. Defaults to the bucket
            path this function previously had hard-coded.

    Returns:
        None.
    """
    import pickle
    import sys, subprocess
    # Best-effort installs so the function also works inside a bare container.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])

    import numpy as np
    import pandas as pd
    import tensorflow as tf
    import tensorflow_hub as hub

    from sklearn.preprocessing import LabelBinarizer

    # get data
    data = pd.read_csv(data_uri)

    data_comp = data[['consumer_complaint_narrative']]
    data_prod = data[['product']]

    # Sequential split: everything except the last 0.1 % is used for
    # training/validation, the tail is kept as held-out test data.
    train_size = int(len(data_comp) * .999)
    train_descriptions = data_comp[:train_size].astype('str')
    train_prod = data_prod[:train_size]
    test_descriptions = data_comp[train_size:].astype('str')
    test_prod = data_prod[train_size:]

    # 80/20 train/validation split of the non-test rows.
    train_size = int(len(train_descriptions) * .8)
    train_desc = train_descriptions[:train_size]
    train_pr = train_prod[:train_size]
    val_desc = train_descriptions[train_size:]
    val_pr = train_prod[train_size:]

    # Encoder Preprocessing (fit and encode the training labels in one call)
    encoder = LabelBinarizer()
    train_encoded = encoder.fit_transform(train_pr)
    val_encoded = encoder.transform(val_pr)
    num_classes = len(encoder.classes_)

    # NOTE(review): this is a TF1-format hub module used through the v1 text
    # embedding column - confirm it works with the installed TF / tensorflow_hub.
    description_embeddings = hub.text_embedding_column(
        "descriptions",
        module_spec="https://tfhub.dev/google/universal-sentence-encoder-large/3",
        trainable=False)

    if hasattr(tf, 'contrib'):
        # TensorFlow 1.x: the APIs this function was originally written against.
        multi_label_head = tf.contrib.estimator.multi_label_head(
            num_classes,
            loss_reduction=tf.losses.Reduction.SUM_OVER_BATCH_SIZE
        )
        numpy_input_fn = tf.estimator.inputs.numpy_input_fn
    else:
        # TensorFlow 2.x has no tf.contrib and no tf.estimator.inputs; use the
        # relocated equivalents. MultiLabelHead's default loss reduction is
        # already SUM_OVER_BATCH_SIZE.
        multi_label_head = tf.estimator.MultiLabelHead(num_classes)
        numpy_input_fn = tf.compat.v1.estimator.inputs.numpy_input_fn

    def as_features(frame):
        # `str` replaces the `np.str` alias, which newer NumPy releases removed.
        return {"descriptions": np.array(frame).astype(str)}

    labels = np.array(train_encoded).astype(np.int32)
    train_input_fn = numpy_input_fn(
        as_features(train_desc), labels, shuffle=True, batch_size=100, num_epochs=10)

    estimator = tf.estimator.DNNEstimator(
        head=multi_label_head,
        hidden_units=[64, 10],
        feature_columns=[description_embeddings])

    # Train
    estimator.train(input_fn=train_input_fn)

    # Evaluate on the training set, then on the validation set.
    train_eval_input_fn = numpy_input_fn(
        as_features(train_desc), train_encoded.astype(np.int32), shuffle=False)
    estimator.evaluate(input_fn=train_eval_input_fn)

    eval_input_fn = numpy_input_fn(
        as_features(val_desc), val_encoded.astype(np.int32), shuffle=False)
    estimator.evaluate(input_fn=eval_input_fn)

    # Export the model; it expects serialized tf.Example protos as input.
    serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec([description_embeddings]))
    estimator.export_saved_model(f'{out_data_path}/{model_dir}/', serving_input_fn)

    # Save the test_data as a pickle file to be used by the predict component.
    with open(f'{out_data_path}/test_data', 'wb') as f:
        pickle.dump((test_descriptions, test_prod), f)

    # Save the class names as plain strings, in the order used by the encoder.
    with open(f'{out_data_path}/label_classes', 'wb') as f:
        pickle.dump([str(name) for name in encoder.classes_], f)

In [None]:
# Run the training step once in the notebook, writing its outputs under out_dir.
# NOTE(review): train() has no return statement, so `estimator` is bound to None.
estimator = train(out_dir, "model")

## Export the saved model to the Google Cloud Storage bucket


In [None]:
!gsutil cp {out_dir}/model gs://${bucket}/{out_dir}/model

In [None]:
def predict(data_path, model_dir):
    """Score the held-out complaints with the exported model.

    Reads ``{data_path}/test_data`` (a pickled ``(descriptions, products)``
    pair), loads the SavedModel found under ``{data_path}/{model_dir}``, runs
    its ``'predict'`` signature, and writes one line per complaint to
    ``{data_path}/result.txt`` in the form ``<label>: <probability>%`` for the
    most probable class.

    If ``{data_path}/label_classes`` exists (a pickled list of class names in
    model output order), it is used to turn the predicted index into a name;
    otherwise the numeric class index is written.

    Args:
        data_path: Directory holding ``test_data`` and the model directory.
        model_dir: Sub-directory of ``data_path`` containing the export.

    Returns:
        None.

    Raises:
        FileNotFoundError: If no SavedModel can be located under the model
            directory.
    """
    import os
    import pickle
    import tensorflow as tf
    import numpy as np

    # Load and unpack the test_data.
    # NOTE: pickle.load can execute arbitrary code - only read files that were
    # produced by the training step of this pipeline.
    with open(f'{data_path}/test_data', 'rb') as f:
        test_descriptions, _test_prod = pickle.load(f)

    # Optional mapping from class index to product name.
    class_names = None
    classes_file = f'{data_path}/label_classes'
    if os.path.exists(classes_file):
        with open(classes_file, 'rb') as f:
            class_names = pickle.load(f)

    # Estimator exports are written into a timestamped sub-directory of the
    # export base; pick the most recent one unless the SavedModel sits at the
    # top level.
    export_dir = f'{data_path}/{model_dir}'
    if not os.path.exists(os.path.join(export_dir, 'saved_model.pb')):
        versions = sorted(
            (d for d in os.listdir(export_dir) if d.isdigit()), key=int)
        if not versions:
            raise FileNotFoundError(f'No SavedModel found under {export_dir}')
        export_dir = os.path.join(export_dir, versions[-1])

    # Keep a reference to the loaded object for as long as the signature is used.
    model = tf.saved_model.load(export_dir)
    predict_fn = model.signatures['predict']

    # The model was exported with a parsing serving input receiver, so the
    # signature takes serialized tf.Example protos, not an input_fn.
    texts = np.array(test_descriptions).astype(str).ravel()
    serialized = [
        tf.train.Example(features=tf.train.Features(feature={
            'descriptions': tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[text.encode('utf-8')]))
        })).SerializeToString()
        for text in texts
    ]

    # Predict
    outputs = predict_fn(examples=tf.constant(serialized, dtype=tf.string))
    probabilities = outputs['probabilities'].numpy()

    with open(f'{data_path}/result.txt', 'w') as result:
        for row in probabilities:
            best = int(row.argsort()[-1])
            label = class_names[best] if class_names is not None else str(best)
            # One prediction per line.
            result.write(label + ': ' + str(round(float(row[best]) * 100, 2)) + '%\n')

In [None]:
# Run the prediction step once in the notebook against the files under out_dir.
predict(out_dir, "model")

In [None]:
# Wrap train() and predict() as lightweight components built on the same image.
_component_image = "tensorflow/tensorflow:latest-gpu-py3"
train_op, predict_op = (
    comp.func_to_container_op(step_fn, base_image=_component_image)
    for step_fn in (train, predict)
)

## Build Kubeflow Pipeline

In [None]:
# Create a client to enable communication with the Pipelines API server.
# No arguments are passed, so the client uses its default connection settings.
client = kfp.Client()

In [None]:
# Define the pipeline
@dsl.pipeline(
   name='Consumer Complaints Pipeline',
   description='An ML pipeline that performs Consumer Complaints Classification model training and prediction.'
)
def consumer_complaints_pipeline(
    data_path: str,
    model_dir: str
):
    """Train the classifier, run prediction, then print the prediction file.

    Args:
        data_path: Mount point of the shared volume inside every step; also
            passed to the train and predict components as their data directory.
        model_dir: Model directory name passed to the train and predict
            components.
    """
    # Define volume to share data between components.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO)

    # Create training component.
    training_container = train_op(data_path, model_dir) \
        .add_pvolumes({data_path: vop.volume})

    # Create prediction component. It mounts the volume as left by the
    # training step, which also orders it after training. (The original
    # referenced the undefined name `churn_training_container` here.)
    predict_container = predict_op(data_path, model_dir) \
        .add_pvolumes({data_path: training_container.pvolume})

    # Print the result of the prediction
    result_container = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: predict_container.pvolume},
        arguments=['cat', f'{data_path}/result.txt']
    )


## Run the Pipeline

In [None]:
# Value supplied for the pipeline's `data_path` parameter, which the pipeline
# uses as the mount point of the shared volume.
DATA_PATH = '/mnt'
# Model directory name handed to the pipeline run.
MODEL_PATH='customer_complaints'

In [None]:
# Alias for the pipeline function that is compiled and submitted below.
pipeline_func = consumer_complaints_pipeline

In [None]:
experiment_name = 'consumer_complaints_kubeflow'
run_name = pipeline_func.__name__ + ' run'

# Keys must match the pipeline function's parameter names (data_path,
# model_dir); the previous "model_file" key did not match any parameter.
arguments = {"data_path": DATA_PATH,
             "model_dir": MODEL_PATH}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,
  '{}.zip'.format(experiment_name))

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)