## Installing necessary libraries

In [1]:
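# Optional one-time setup: uncomment the lines below to install the pinned
# dependencies into the notebook environment.
#!python -m pip install --user --upgrade pip

#!pip3 install pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1 scikit-learn==0.22 tensorflow==2.0 keras==1.2.2 --user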

In [2]:
# install kubeflow pipeline sdk
#!pip3 install kfp --upgrade --user

In [3]:
# Check that the Kubeflow Pipelines SDK is installed: the `dsl-compile` command
# should resolve to a path (no output means it is not on PATH).
!which dsl-compile

In [4]:
# import libraries for pipeline
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [5]:
# Directory passed to the step functions when they are run locally in this notebook.
# NOTE: this only sets the path; nothing in this cell creates the directory.
#output_dir = "/home/jovyan/winetasting-groupf/data/"
output_dir = "/home/jovyan/data"

In [6]:
# Create the data selection function (downloads, rebalances and pickles the corpus)

def selection(data_path):
    """Download the wine reviews, rebalance the classes and pickle the corpus.

    The top 20 varieties are kept. Varieties with fewer than 5000 reviews are
    over-sampled by turning every sentence of a description into its own
    document. The resulting ``(corpus, labels)`` tuple is pickled to
    ``{data_path}/selected_data``.

    Args:
        data_path: Directory the pickle is written to (created if missing).

    Returns:
        None. Prints ``Done!`` once the file has been written.

    Raises:
        requests.HTTPError: If the zipped dataset cannot be downloaded.
    """
    # All imports stay inside the function so the step is self-contained when
    # it is wrapped with ``func_to_container_op``.
    import io
    import os
    import pickle
    import subprocess
    import sys
    import zipfile  # standard library: the former `pip install zipfile` could only fail

    for requirement in ('pandas==0.23.4', 'scikit-learn==0.22', 'nltk==3.2.5', 'requests'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', requirement])

    import nltk
    import pandas as pd
    import requests
    from nltk.tokenize import sent_tokenize

    nltk.download('punkt')  # sentence tokenizer model used by sent_tokenize

    # downloading the dataset
    url1 = 'https://raw.github.com/HamoyeHQ/stage-f-06-wine-tasting/master/data/top_40_varieties.zip'
    url2 = 'https://raw.github.com/HamoyeHQ/stage-f-06-wine-tasting/master/data/top_varieties_count.csv'

    # unzipping and reading data from url1
    resp = requests.get(url1)
    # Fail with the HTTP error instead of an opaque BadZipFile on an error page.
    resp.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(resp.content), 'r') as zf:
        with zf.open('top_40_varieties.csv') as f:
            top_40_varieties = pd.read_csv(f)

    # reading data from url2
    top_varieties_count = pd.read_csv(url2)

    # replacing every occurrence of 'US' with 'United States of America':
    # whole-value match in `country`, substring match in `not_vintage`
    top_40_varieties['country'] = top_40_varieties['country'].replace('US', 'United States of America')
    top_40_varieties['not_vintage'] = top_40_varieties['not_vintage'].apply(
        lambda text: text.replace('US', 'United States of America'))

    # turning the counts table into a Series indexed by variety
    top_varieties_count = top_varieties_count.rename(columns={'variety': 'count', 'Unnamed: 0': 'variety'})
    top_varieties_count = top_varieties_count.set_index('variety')['count']

    top = 20  # number of varieties kept as working classes. note 1 < n <= 40
    top_df = top_40_varieties[top_40_varieties['variety'].isin(top_varieties_count.iloc[:top].index)]

    # varieties with fewer reviews than this are over-sampled sentence by sentence
    minority_threshold = 5000
    minority_varieties = top_varieties_count[top_varieties_count < minority_threshold].index
    majority_varieties = top_varieties_count[top_varieties_count >= minority_threshold].index
    minority_df = top_df[top_df['variety'].isin(minority_varieties)]
    majority_df = top_df[top_df['variety'].isin(majority_varieties)]

    # One output row per sentence of each minority description. Rows are built
    # from plain tuples so the frame being iterated is never mutated.
    columns = list(minority_df.columns)
    description_pos = columns.index('description')
    oversampled_rows = []
    for values in minority_df.itertuples(index=False, name=None):
        for sentence in sent_tokenize(values[description_pos]):
            sentence_row = list(values)
            sentence_row[description_pos] = sentence
            oversampled_rows.append(sentence_row)
    oversampled_minority_df = pd.DataFrame(oversampled_rows, columns=columns)

    balanced_df = pd.concat([majority_df, oversampled_minority_df]).reset_index(drop=True)

    # each document is the description followed by the `not_vintage` text
    sent_oversample_corpus = [
        description + ' ' + not_vintage
        for description, not_vintage in zip(balanced_df['description'], balanced_df['not_vintage'])
    ]
    labels = balanced_df['variety'].tolist()

    # Save the corpus and labels as a pickle file to be used by the preprocess component.
    os.makedirs(data_path, exist_ok=True)
    with open(f'{data_path}/selected_data', 'wb') as f:
        pickle.dump((sent_oversample_corpus, labels), f)

    print('Done!')

In [7]:
# Run the selection step directly in the notebook kernel; it writes its pickle under output_dir.
selection(output_dir)

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
Done!


In [8]:
def preprocess(data_path):
    """Build the text-preparation pipeline and save it, unfitted, with dill.

    The pipeline has three steps: spaCy lemma tokenisation with stop-word
    removal, conversion of tokens to integer sequences, and post-padding.
    It is written to ``{data_path}/data_pipeline``; fitting happens in the
    training step.

    Args:
        data_path: Directory containing ``selected_data`` and receiving
            ``data_pipeline``.

    Returns:
        None. Prints ``Done!`` once the pipeline has been saved.

    Raises:
        requests.HTTPError: If the Yoast stop-word file cannot be downloaded.
        ValueError: If no stop-word array is found in the downloaded file.
    """
    # All imports stay inside the function so the step is self-contained when
    # it is wrapped with ``func_to_container_op``.
    import pickle
    import re
    import subprocess
    import sys

    # Standalone `keras==1.2.2` is no longer installed here: padding now comes
    # from tensorflow.keras, the same package that provides the Tokenizer.
    for requirement in (
        'spacy==2.2.0',
        'pandas==0.23.4',
        'scikit-learn==0.22',
        'dill',
        'requests',
        'https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.0/en_core_web_sm-2.2.0.tar.gz',
    ):
        subprocess.run([sys.executable, '-m', 'pip', 'install', requirement])

    import dill
    import requests
    import spacy
    from sklearn.base import BaseEstimator, TransformerMixin
    from sklearn.pipeline import Pipeline
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    from tensorflow.keras.preprocessing.text import Tokenizer

    # Load and unpack the selected_data. The corpus is not fitted here (see the
    # commented-out call at the end); loading it makes the step fail early when
    # the selection output is missing.
    # NOTE(review): pickle.load executes arbitrary code - only read files
    # produced by the selection step.
    with open(f'{data_path}/selected_data', 'rb') as f:
        selected_data = pickle.load(f)

    # Separate the independent data (X) from the dependent data (y).
    X_select, y_select = selected_data

    # creating a spacy pipeline and disabling tagger, parser and ner to speed up tokenizer
    nlp = spacy.load('en_core_web_sm', disable=['tagger', 'parser', 'ner'])

    spacy_stop_words = spacy.lang.en.STOP_WORDS  # getting spacy's stop-words

    # downloading yoast_stop_words to be included to spacy's stop words to improve performance
    response = requests.get('https://raw.github.com/Yoast/YoastSEO.js/develop/src/config/stopwords.js')
    response.raise_for_status()

    # the stop words are the contents of the first [...] array literal in the file
    match = re.search(r'\[.+\]', response.content.decode())
    if match is None:
        raise ValueError('No stop-word array found in the downloaded Yoast stopwords.js file')
    yoast_stop_words = set(match.group()[1:-1].replace('"', '').replace(',', '').split())

    stop_words_lemma = {word.lemma_.lower() for word in nlp(' '.join(spacy_stop_words | yoast_stop_words))} | \
            {'-pron-', '10', '12', 'aah', 'aa', 'ab', 'aaa', 'aand', '16', '2', '20', '30', '4', '40', '5', '6', '7', \
             '8', '9'}

    # creating custom transformers to encapsulate our preprocessing
    class GetTokens(BaseEstimator, TransformerMixin):
        """Turn each document into a list of lower-cased, alphabetic, non-stop-word lemmas."""

        def __init__(self, stop_words=stop_words_lemma):
            self.stop_words = stop_words

        def tokenize(self, text):
            """Return the filtered lemmas of one document."""
            lemmas = (word.lemma_.lower() for word in nlp(text) if word.is_alpha)
            return [lemma for lemma in lemmas if lemma not in self.stop_words]

        def fit(self, X, y=None):
            return self

        def transform(self, X):
            # kept on the instance: the training step reads `.tokens` to train Word2Vec
            self.tokens = [self.tokenize(doc) for doc in X]
            return self.tokens

    # GetTokens object
    tokens = GetTokens()

    class Text2Sequence(BaseEstimator, TransformerMixin):
        """Map token lists to integer sequences with a Keras ``Tokenizer``."""

        def __init__(self):
            self.sequence_tokenizer = Tokenizer(oov_token=-99)

        def fit(self, X, y=None):
            self.sequence_tokenizer.fit_on_texts(X)
            # exposed for the training step, which builds the embedding matrix from it
            self.words_indices = self.sequence_tokenizer.word_index
            return self

        def transform(self, X):
            self.get_sequences = self.sequence_tokenizer.texts_to_sequences(X)
            return self.get_sequences

    # text2sequence object
    text_2_seq = Text2Sequence()

    class Padding(BaseEstimator, TransformerMixin):
        """Pad the integer sequences of a batch to a common length."""

        def __init__(self, pad='post'):
            self.pad = pad

        def fit(self, X, y=None):
            return self

        def transform(self, X):
            self.get_paddings = pad_sequences(X, padding=self.pad)
            return self.get_paddings

    # padding object
    pad = Padding()

    # building a pipeline for the transformers steps
    data_prep_pipe = Pipeline([('get_tokens', tokens), ('text_2_sequence', text_2_seq), ('padding', pad)], verbose=1)

    # Save the data prep pipeline to be used by the train component.
    with open(f'{data_path}/data_pipeline', 'wb') as f:
        dill.dump(data_prep_pipe, f)

    print('Done!')

    #return data_prep_pipe.fit_transform(X_select, y_select)

In [9]:
# Run the preprocess step directly in the notebook kernel; it reads and writes files under output_dir.
preprocess(output_dir)

Using TensorFlow backend.


Done!


In [38]:
# create training function

def trainPredictCNN(data_path):
    """Train the CNN variety classifier and save a sample prediction.

    Loads ``selected_data`` (corpus and labels) and the dill-serialised
    ``data_pipeline`` from ``data_path``, fits the preparation steps together
    with a Word2Vec-initialised 1-D CNN on the whole corpus, predicts on the
    same corpus, and writes ``embedding_matrix.dill.gz`` and ``CNNresults.txt``
    into ``data_path``.

    Args:
        data_path: Directory holding the inputs and receiving the outputs.

    Returns:
        None. Prints a confirmation once the results file has been written.
    """
    # All imports stay inside the function so the step is self-contained when
    # it is wrapped with ``func_to_container_op``.
    import gzip
    import pickle
    import subprocess
    import sys
    import time

    for requirement in (
        'scikit-learn==0.22',  # same pin as the selection and preprocess steps
        'pandas==0.23.4',
        'gensim==3.8.3',       # `size=`, `iter=` and `wv.vocab` below are gensim 3.x API
        'dill',
        'requests',
        'keras==1.2.2',        # kept: a pipeline saved with standalone keras needs it to unpickle
        'spacy==2.2.0',
        'https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.0/en_core_web_sm-2.2.0.tar.gz',
    ):
        subprocess.run([sys.executable, '-m', 'pip', 'install', requirement])

    import dill
    import numpy as np
    import spacy
    import tensorflow as tf
    from gensim.models import Word2Vec
    from sklearn.base import BaseEstimator, TransformerMixin
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import LabelEncoder, OneHotEncoder
    from sklearn.utils import class_weight
    from tensorflow.keras.layers import Embedding, Dense, GlobalMaxPool1D, Conv1D, Dropout
    from tensorflow.keras.models import Sequential

    # NOTE(review): the loaded model is not referenced below; presumably loading it
    # keeps the spaCy model data available when the saved pipeline is restored -
    # TODO confirm whether this call can be dropped.
    nlp = spacy.load('en_core_web_sm', disable=['tagger', 'parser', 'ner'])

    # NOTE(review): pickle/dill execute arbitrary code on load - only read files
    # produced by the upstream steps.
    # Load and unpack the selected_data
    with open(f'{data_path}/selected_data', 'rb') as f:
        selected_data = pickle.load(f)

    # Separate the independent data (X) from the dependent data (y).
    X_select, y_select = selected_data

    # Load and unpack the data pipeline
    with open(f'{data_path}/data_pipeline', 'rb') as f:
        data_pipeline = dill.load(f)

    le = LabelEncoder()
    one_hot = OneHotEncoder(sparse=False)

    encoded_labels = le.fit_transform(y_select)
    one_hot_labels = one_hot.fit_transform(encoded_labels.reshape(-1, 1))
    # derived from the data instead of a hard-coded 20
    n_classes = len(le.classes_)

    def get_embedding_matrix(model, word_index):
        """Return a (vocab_size, dim) matrix whose row i is the vector of the word indexed i."""
        vocab_size = len(word_index) + 1  # index 0 is left as an all-zero row
        embedding_dim = model.wv.vector_size
        embedding_matrix = np.zeros((vocab_size, embedding_dim))

        for word in model.wv.vocab:
            # vectors are read through `.wv`; indexing the model itself is deprecated
            embedding_matrix[word_index[word]] = model.wv[word]

        return embedding_matrix

    def multi_class_fbeta(ytrue, ypred, beta=1, weighted=True, raw=False, epsilon=1e-7):
        """F-beta over one-hot ``ytrue`` and per-class probabilities ``ypred``.

        Returns the per-class scores when ``raw`` is true, otherwise their
        support-weighted sum (``weighted``) or plain mean.
        """
        beta_squared = beta**2

        ytrue = tf.cast(ytrue, tf.float32)
        ypred = tf.cast(ypred, tf.float32)

        # hard predictions: 1 where a class has the row maximum, 0 elsewhere
        max_prob = tf.reduce_max(ypred, axis=-1, keepdims=True)
        ypred = tf.cast(tf.equal(ypred, max_prob), tf.float32)

        tp = tf.reduce_sum(ytrue*ypred, axis=0)
        predicted_positive = tf.reduce_sum(ypred, axis=0)
        actual_positive = tf.reduce_sum(ytrue, axis=0)

        precision = tp/(predicted_positive+epsilon)
        recall = tp/(actual_positive+epsilon)

        fb = (1+beta_squared)*precision*recall / (beta_squared*precision + recall + epsilon)

        if raw:
            return fb

        if weighted:
            supports = tf.reduce_sum(ytrue, axis=0)
            return tf.reduce_sum(fb*supports / tf.reduce_sum(supports))

        return tf.reduce_mean(fb)

    def build_cnn_model(embedding_matrix, input_length):
        """Build and compile the CNN on top of a frozen embedding layer."""
        model = Sequential()
        model.add(Embedding(embedding_matrix.shape[0], embedding_matrix.shape[1],
                            weights=[embedding_matrix],
                            input_length=input_length,
                            trainable=False))

        model.add(Conv1D(128, 3, activation='relu'))
        model.add(Conv1D(128, 3, activation='relu'))

        model.add(GlobalMaxPool1D())

        model.add(Dropout(0.2))

        model.add(Dense(n_classes, activation='softmax'))

        # The targets are one-hot, which is what `multi_class_fbeta` multiplies
        # against the predictions. With sparse integer targets the metric
        # broadcast a (batch, 1) column against (batch, n_classes) and was wrong.
        model.compile(optimizer='adam',
                      loss='categorical_crossentropy',
                      metrics=['accuracy', multi_class_fbeta])

        return model

    # setting class weights due to class imbalance
    class_weights = class_weight.compute_class_weight('balanced', np.arange(n_classes), encoded_labels)
    class_weights = dict(enumerate(class_weights))

    # general transformer wrapping Word2Vec training and a Keras model
    class NLPModel(BaseEstimator, TransformerMixin):
        """Final pipeline step: trains Word2Vec plus the network in ``fit``, predicts in ``transform``."""

        def __init__(self, build_fn, name, epochs=7, batch_size=128, verbose=0):
            self.build_fn = build_fn
            self.name = name
            self.epochs = epochs
            self.batch_size = batch_size
            self.verbose = verbose

        def fit(self, X, y):
            # token lists cached by the `get_tokens` step during this pipeline fit
            self.corpus = data_pipeline.named_steps['get_tokens'].tokens

            t1 = time.time()
            self.w2v_model = Word2Vec(self.corpus, size=300, min_count=1, iter=10)

            print('Done training Word2Vec for {}                    total: {}mins'.format(self.name, \
                                                                              round((time.time()-t1)/60, 1)))

            self.embedding_matrix = get_embedding_matrix(self.w2v_model, \
                                                         data_pipeline.named_steps['text_2_sequence'].words_indices)

            self.model = self.build_fn(self.embedding_matrix, X.shape[1])

            t2 = time.time()

            self.history = self.model.fit(X, y, epochs=self.epochs, batch_size=self.batch_size, \
                                          class_weight=class_weights, verbose=self.verbose)

            print('Done training {} model                           total: {}mins'.format(self.name, \
                                                                              round((time.time()-t2)/60, 1)))

            return self

        def transform(self, X):
            self.pred = self.model.predict(X)
            return self.pred

    # object of CNN
    cnn_model = NLPModel(build_cnn_model, 'cnn_model', epochs=7)  # instantiating cnn model object

    cnn_model_pipe = Pipeline([('data_prep', data_pipeline), ('model', cnn_model)])

    # training on the one-hot targets (see build_cnn_model)
    cnn_model_pipe.fit(X_select, one_hot_labels)

    # predicting the data: one row of class probabilities per document
    y_pred = cnn_model_pipe.transform(X_select)

    # inverse_transform needs 1-D integer class ids, not the probability matrix
    predicted_labels = le.inverse_transform(np.argmax(y_pred, axis=1))

    # saving embedding matrix
    with gzip.open(f'{data_path}/embedding_matrix.dill.gz', 'wb') as emb:
        dill.dump(cnn_model.embedding_matrix, emb)

    # write the first document's prediction and its actual label to CNNresults.txt
    # (previously the entire label list was written next to a single prediction)
    with open(f'{data_path}/CNNresults.txt', 'w') as result:
        result.write(f'Prediciton: {predicted_labels[0]} | Actual {y_select[0]}')

    print('CNN Prediction has be saved successfully!')

In [39]:
# Run the train/predict step directly in the notebook kernel; it reads and writes files under output_dir.
trainPredictCNN(output_dir)

[Pipeline] ........ (step 1 of 3) Processing get_tokens, total= 1.3min
[Pipeline] ... (step 2 of 3) Processing text_2_sequence, total=   5.6s
[Pipeline] ........... (step 3 of 3) Processing padding, total=   1.1s
Done training Word2Vec for cnn_model                    total: 1.2mins




Done training cnn_model model                           total: 26.8mins
CNN Prediction has be saved successfully!


In [40]:
# Wrap each step function as a lightweight component; all three share one base image.
_BASE_IMAGE = "tensorflow/tensorflow:latest-gpu-py3"
selection_op, preprocess_op, trainPredictCNN_op = (
    comp.func_to_container_op(step_fn, base_image=_BASE_IMAGE)
    for step_fn in (selection, preprocess, trainPredictCNN)
)

## Build kubeflow pipeline

In [41]:
# Create a client to enable communication with the Pipelines API server.
# Called without arguments, so the connection settings come from the SDK defaults.
client = kfp.Client()

In [46]:
# Define the pipeline
# Pipeline definition: selection -> preprocess -> CNN train/predict -> print result.
# Every step mounts the same volume at `data_path`; chaining each step's `.pvolume`
# into the next one is what orders the steps.
# (Explanations are kept in comments rather than a docstring so the function
# object handed to the compiler is unchanged.)
@dsl.pipeline(
    name='A wine tasting pipeline',
    description='An ML pipeline that performs wine tasting model training and prediction.',
)
def wine_tasting_pipeline(data_path: str):

    # Volume used to share files between the components.
    shared_volume_op = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Selection step, mounted on the freshly created volume.
    selection_step = selection_op(data_path).add_pvolumes(
        {data_path: shared_volume_op.volume}
    )

    # Preprocess step, mounted on the volume as left by the selection step.
    preprocess_step = preprocess_op(data_path).add_pvolumes(
        {data_path: selection_step.pvolume}
    )

    # CNN train/predict step, mounted on the volume as left by the preprocess step.
    train_predict_step = trainPredictCNN_op(data_path).add_pvolumes(
        {data_path: preprocess_step.pvolume}
    )

    # Final step: print the results file written by the train/predict step.
    print_result_step = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: train_predict_step.pvolume},
        arguments=['cat', f'{data_path}/CNNresults.txt'],
    )

## Run pipeline

In [47]:
# Value supplied for the pipeline's `data_path` parameter when the run is submitted.
DATA_PATH ="/home/jovyan/data"
#MODEL_PATH_CNN='cnn_model.hdf5'
#MODEL_PATH_LSTM='lstm_model.hdf5'
#MODEL_PATH_BLENDER='blender.hdf5'

In [48]:
# Alias for the pipeline function that is compiled and submitted below.
pipeline_func = wine_tasting_pipeline

In [49]:
experiment_name = 'wine_tasting_kubeflow'
run_name = f'{pipeline_func.__name__} run'

# Values for the pipeline function's parameters.
arguments = {"data_path": DATA_PATH}

# Compile the pipeline into a zipped package named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# Submit a run straight from the pipeline function.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)

