In [90]:
import os

import kfp
import kfp.components as comp
import kfp.dsl as dsl
import pandas as pd
from google.cloud import storage

In [91]:
# Kubeflow Pipelines endpoint. Set the KFP_HOST environment variable to target a
# different deployment without editing the notebook; the default is the endpoint
# this notebook was originally run against.
client = kfp.Client(host=os.environ.get(
    'KFP_HOST',
    'https://7213e46a0ed7fcf9-dot-us-central2.pipelines.googleusercontent.com/',
))

In [92]:
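# One-off step, kept for provenance and not executed: draw a reproducible 1,000-row
# sample (random_state=1) from the Sentiment140 training CSV and upload it to
# gs://sentiment_analysis_de2020/data/test_data.csv.
# NOTE: if re-enabled, rename `client` below — it would shadow the kfp.Client above.
# data = pd.read_csv('gs://sentiment_analysis_de2020/data/training.1600000.processed.noemoticon.csv', encoding='ISO-8859-1', header=0, names=["target", "ids", "date", "flag", "user", "text"])
# sample = data.sample(1000, random_state=1)
# filename = 'test_data.csv'
# sample.to_csv(filename)
# client = storage.Client()
# bucket = client.get_bucket('sentiment_analysis_de2020')
# blob = bucket.blob('data/' + filename)
# blob.upload_from_filename(filename)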


In [93]:
# Sanity check on the preprocessed test set: how many rows have a missing (NaN) `text`?
data = pd.read_csv(
    'gs://sentiment_analysis_de2020/data/preprocessed_test_data.csv'
)
data['text'].isna().sum()

43

In [94]:
def preprocess(data_path:str, encoder:str, bucket_name: str, stem: bool, test:bool) -> str:
    """Clean tweet text, upload the cleaned CSV to GCS and return its gs:// URI.

    Lightweight KFP component: only this function's source is shipped to the
    container, so every import and helper must live inside it.

    Args:
        data_path: CSV path pandas can read (e.g. a gs:// URI) with a ``text``
            column. Its first column is dropped — presumably the index written
            by an earlier ``DataFrame.to_csv``; confirm for new inputs.
        encoder: currently unused; kept so the component interface does not
            change (previously passed as ``encoding=`` when reading the raw
            Sentiment140 CSV).
        bucket_name: GCS bucket that receives ``data/<filename>``.
        stem: if True, apply the English Snowball stemmer to every kept token.
        test: write ``preprocessed_test_data.csv`` instead of
            ``preprocessed_data.csv``.

    Returns:
        ``gs://<bucket_name>/data/<filename>`` of the uploaded CSV.
    """
    import re

    import nltk
    import pandas as pd
    from google.cloud import storage
    from nltk.corpus import stopwords
    from nltk.stem import SnowballStemmer

    # Mentions, URLs and every run of non-alphanumeric characters become a space.
    # Raw string: a backslash-S in a plain literal is an invalid escape sequence.
    noise = re.compile(r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+")

    nltk.download('stopwords')
    stop_words = set(stopwords.words('english'))  # set: O(1) lookup per token
    stemmer = SnowballStemmer("english")

    def clean(text):
        """Lower-case ``text``, strip noise, drop stopwords and optionally stem."""
        kept = (token for token in noise.sub(' ', text.lower()).split()
                if token not in stop_words)
        return " ".join(stemmer.stem(token) if stem else token for token in kept)

    data = pd.read_csv(data_path)
    # Missing text becomes '' rather than the literal token 'nan', so the row is
    # removed by the empty-text filter below. Assigning the whole column avoids
    # the chained `data.text[i] = ...` assignment.
    data['text'] = [clean(str(text)) for text in data['text'].fillna('')]

    data = data.drop(columns=data.columns[:1])
    # Drop rows left with no text (written as empty fields and re-read downstream
    # as NaN) or with other missing values. The previous bare `data.dropna()`
    # discarded its result, so nothing was ever dropped.
    data = data[data['text'] != ''].dropna().reset_index(drop=True)

    filename = 'preprocessed_test_data.csv' if test else 'preprocessed_data.csv'
    blob_name = 'data/' + filename
    data.to_csv(filename)
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    bucket.blob(blob_name).upload_from_filename(filename)

    # Build the URI from bucket_name: that is where the file was uploaded.
    return 'gs://{}/{}'.format(bucket_name, blob_name)

In [95]:
# Package preprocess() as a KFP component; the component spec is also written to YAML.
preprocess_op = comp.create_component_from_func(
    func=preprocess,
    output_component_file='download_data.yaml',
    base_image='python:3.7',
    packages_to_install=[
        'pandas', 'fsspec', 'gcsfs', 'nltk', 'gensim',
        'h5py==2.10.0', 'numpy==1.16.0', 'keras', 'tensorflow',
        'google-cloud-storage',
    ],
)

In [96]:
def build_model(bucket_name:str) -> str:
    """Build and compile the untrained LSTM sentiment model and upload it to GCS.

    Lightweight KFP component: imports live inside the function.

    Downloads ``models/model.w2v`` (a pickled object exposing word vectors via
    ``.wv``) and ``models/tokenizer.pkl`` from ``bucket_name``. It copies each
    tokenizer word's vector into a frozen embedding matrix indexed by the
    tokenizer's word ids. It then compiles the model and uploads it as
    ``models/model.h5``.

    Args:
        bucket_name: GCS bucket holding the word2vec model and tokenizer.

    Returns:
        Blob name of the saved model, always ``'models/model.h5'``.
    """
    import pickle

    import numpy as np
    from google.cloud import storage
    from keras.layers import Dense, Dropout, Embedding, LSTM
    from keras.models import Sequential

    # Each word vector is copied into a row of this width, so it must equal the
    # word2vec vector size.
    embedding_dim = 300
    # Must equal the `maxlen` passed to pad_sequences wherever the model is fed.
    sequence_length = 300

    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    # NOTE: pickle.load can execute arbitrary code; only load from a trusted bucket.
    bucket.blob('models/model.w2v').download_to_filename('downloaded_model.w2v')
    with open('downloaded_model.w2v', 'rb') as w2v_file:
        model_w2v = pickle.load(w2v_file)

    bucket.blob('models/tokenizer.pkl').download_to_filename('downloaded_tokenizer.pkl')
    with open('downloaded_tokenizer.pkl', 'rb') as tokenizer_file:
        tokenizer = pickle.load(tokenizer_file)

    # Row 0 is reserved for padding; words without a vector keep all-zero rows.
    vocab_size = len(tokenizer.word_index) + 1
    embedding_matrix = np.zeros((vocab_size, embedding_dim))
    for word, i in tokenizer.word_index.items():
        if word in model_w2v.wv:
            embedding_matrix[i] = model_w2v.wv[word]

    model = Sequential()
    model.add(Embedding(vocab_size, embedding_dim, weights=[embedding_matrix],
                        input_length=sequence_length, trainable=False))
    model.add(Dropout(0.5))
    model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy',
                  optimizer="adam",
                  metrics=['accuracy'])

    model.save('model.h5')
    bucket.blob('models/model.h5').upload_from_filename('model.h5')
    return 'models/model.h5'

In [97]:
# Package build_model() as a KFP component; the component spec is also written to YAML.
build_model_op = comp.create_component_from_func(
    func=build_model,
    output_component_file='build_model.yaml',
    base_image='python:3.7',
    packages_to_install=[
        'pandas', 'fsspec', 'gcsfs', 'nltk', 'gensim',
        'h5py==2.10.0', 'numpy==1.16.0', 'keras', 'tensorflow',
        'google-cloud-storage',
    ],
)

In [98]:
def train(data_path:str, model_path:str, bucket_name:str, num_epochs:int) -> str:
    """Fit the saved model at ``model_path`` on ``data_path`` and upload the result.

    Lightweight KFP component: imports live inside the function.

    Args:
        data_path: CSV readable by pandas with ``text`` and ``target`` columns.
        model_path: blob name of a saved Keras model inside ``bucket_name``.
        bucket_name: GCS bucket holding the model and ``models/tokenizer.pkl``.
        num_epochs: number of training epochs.

    Returns:
        Blob name of the trained model, always ``'models/model.h5'``.
    """
    import pickle

    import pandas as pd
    from google.cloud import storage
    from keras.models import load_model
    from keras.preprocessing.sequence import pad_sequences

    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    bucket.blob(model_path).download_to_filename('downloaded_model.h5')
    model = load_model('downloaded_model.h5')

    # NOTE: pickle.load can execute arbitrary code; only load from a trusted bucket.
    bucket.blob('models/tokenizer.pkl').download_to_filename('downloaded_tokenizer.pkl')
    with open('downloaded_tokenizer.pkl', 'rb') as tokenizer_file:
        tokenizer = pickle.load(tokenizer_file)

    # Without this, missing text would be stringified into the token 'nan'.
    data = pd.read_csv(data_path).dropna(subset=['text', 'target'])
    # maxlen must match the model's Embedding input_length (300).
    X_train = pad_sequences(tokenizer.texts_to_sequences(data.text.astype(str)), maxlen=300)
    # Sentiment140 labels are 0 / 4 (evaluate() maps predictions back onto 0/4),
    # but a sigmoid output trained with binary cross-entropy needs 0/1 targets.
    # `> 0` maps 4 -> 1 and leaves already-binary 0/1 labels unchanged.
    y_train = (data.target > 0).astype('int32')

    model.fit(X_train, y_train,
              epochs=num_epochs,
              batch_size=32)

    model.save('model.h5')
    bucket.blob('models/model.h5').upload_from_filename('model.h5')
    return 'models/model.h5'

In [99]:
# Package train() as a KFP component; the component spec is also written to YAML.
train_op = comp.create_component_from_func(
    func=train,
    output_component_file='train_model.yaml',
    base_image='python:3.7',
    packages_to_install=[
        'pandas', 'fsspec', 'gcsfs', 'nltk', 'gensim',
        'h5py==2.10.0', 'numpy==1.16.0', 'keras', 'tensorflow',
        'google-cloud-storage',
    ],
)
# Alternative: reuse the saved spec instead of rebuilding it.
#train_op = comp.load_component_from_file('train_model.yaml')

In [100]:
def retrain(data_path:str, bucket_name:str, num_epochs: int) -> str:
    """Continue training the current ``models/model.h5`` on ``data_path`` and upload it.

    Lightweight KFP component: imports live inside the function.

    Args:
        data_path: CSV readable by pandas with ``text`` and ``target`` columns.
        bucket_name: GCS bucket holding ``models/model.h5`` and ``models/tokenizer.pkl``.
        num_epochs: number of training epochs.

    Returns:
        Blob name of the retrained model, always ``'models/model.h5'`` (overwritten in place).
    """
    import pickle

    import pandas as pd
    from google.cloud import storage
    from keras.models import load_model
    from keras.preprocessing.sequence import pad_sequences

    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    bucket.blob('models/model.h5').download_to_filename('downloaded_model.h5')
    model = load_model('downloaded_model.h5')

    # NOTE: pickle.load can execute arbitrary code; only load from a trusted bucket.
    bucket.blob('models/tokenizer.pkl').download_to_filename('downloaded_tokenizer.pkl')
    with open('downloaded_tokenizer.pkl', 'rb') as tokenizer_file:
        tokenizer = pickle.load(tokenizer_file)

    # Without this, missing text would be stringified into the token 'nan'.
    data = pd.read_csv(data_path).dropna(subset=['text', 'target'])
    # maxlen must match the model's Embedding input_length (300).
    X_train = pad_sequences(tokenizer.texts_to_sequences(data.text.astype(str)), maxlen=300)
    # Sentiment140 labels are 0 / 4 (evaluate() maps predictions back onto 0/4),
    # but a sigmoid output trained with binary cross-entropy needs 0/1 targets.
    # `> 0` maps 4 -> 1 and leaves already-binary 0/1 labels unchanged.
    y_train = (data.target > 0).astype('int32')

    model.fit(X_train, y_train,
              epochs=num_epochs,
              batch_size=100)

    model.save('model.h5')
    bucket.blob('models/model.h5').upload_from_filename('model.h5')
    return 'models/model.h5'
    

In [101]:
# Package retrain() as a KFP component; the component spec is also written to YAML.
# 'sklearn' was dropped: retrain() never imports it, and that PyPI name is a
# deprecated placeholder whose installation now fails (the real package is
# 'scikit-learn').
retrain_op = comp.create_component_from_func(
    func=retrain,
    base_image='python:3.7',
    output_component_file='retrain_model.yaml',
    packages_to_install=['pandas', 'fsspec', 'gcsfs', 'nltk', 'gensim', 'h5py==2.10.0', 'numpy==1.16.0', 'keras', 'tensorflow', 'google-cloud-storage'],
)
#retrain_op = comp.load_component_from_file('retrain_model.yaml')

In [102]:
def evaluate(model_path:str, test_data_path:str, bucket_name:str) -> float:
    """Score the model at ``model_path`` on ``test_data_path`` and print metrics.

    Lightweight KFP component: imports live inside the function. Prints the
    confusion matrix, accuracy, macro precision and macro recall. Predictions
    are mapped onto the dataset's 0 / 4 label encoding before scoring.

    Args:
        model_path: blob name of a saved Keras model inside ``bucket_name``.
        test_data_path: CSV readable by pandas with ``text`` and ``target`` columns.
        bucket_name: GCS bucket holding the model and ``models/tokenizer.pkl``.

    Returns:
        Mean predicted label on the 0/4 scale, i.e. 4 x the fraction of rows
        predicted positive. NOTE(review): this is not accuracy — confirm it is
        the intended pipeline output.
    """
    import pickle

    import numpy as np
    import pandas as pd
    from google.cloud import storage
    from keras.models import load_model
    from keras.preprocessing.sequence import pad_sequences
    from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score

    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    # NOTE: pickle.load can execute arbitrary code; only load from a trusted bucket.
    bucket.blob('models/tokenizer.pkl').download_to_filename('downloaded_tokenizer.pkl')
    with open('downloaded_tokenizer.pkl', 'rb') as tokenizer_file:
        tokenizer = pickle.load(tokenizer_file)

    # Without this, missing text would be stringified into the token 'nan'.
    data = pd.read_csv(test_data_path).dropna(subset=['text', 'target'])
    X_test = pad_sequences(tokenizer.texts_to_sequences(data.text.astype(str)), maxlen=300)
    y_test = data.target

    bucket.blob(model_path).download_to_filename('downloaded_model.h5')
    model = load_model('downloaded_model.h5')

    probabilities = model.predict(X_test, verbose=1).ravel()
    # Threshold the sigmoid output at 0.5 and map onto the 0 / 4 label encoding.
    predictions = np.where(probabilities >= 0.5, 4, 0)

    print("confusion matrix : \n")
    print(confusion_matrix(y_test, predictions))

    acc = accuracy_score(y_test, predictions)
    print('Accuracy: ' + str(acc))

    prec = precision_score(y_test, predictions, average='macro')
    print('Precision: ' + str(prec))

    recall = recall_score(y_test, predictions, average='macro')
    print('Recall: ' + str(recall))

    return float(np.mean(predictions))

In [103]:
# Package evaluate() as a KFP component; the component spec is also written to YAML.
# evaluate() imports sklearn.metrics, whose PyPI distribution is 'scikit-learn'.
# The old 'sklearn' name is a deprecated placeholder whose installation now fails.
evaluate_op = comp.create_component_from_func(
    func=evaluate,
    base_image='python:3.7',
    output_component_file='evaluate.yaml',
    packages_to_install=['pandas', 'fsspec', 'gcsfs', 'nltk', 'gensim', 'h5py==2.10.0', 'numpy==1.16.0', 'keras', 'tensorflow', 'google-cloud-storage', 'scikit-learn'],
)

In [104]:
@dsl.pipeline(
    name='twitter sentiment',
    description='sentiment anlysis on live twitter data'
)
def sentiment_pipeline(data_path, encoder, bucket_name, stem, num_epochs, retrain, test_data_path, disable_cache):
    """Preprocess train/test data, then build+train or retrain the model, then evaluate.

    Every parameter is a KFP PipelineParam while this function is compiled.
    Its value is only known at run time, so it can drive a ``dsl.Condition``
    but not a Python ``if``.

    disable_cache: accepted for interface compatibility only. The previous
        ``if disable_cache:`` tested the PipelineParam object, which is always
        truthy, so caching was disabled on every run whatever the argument.
        That effective behavior is now explicit.
    """
    preprocess_task = preprocess_op(data_path, encoder, bucket_name, stem, False)
    preprocess_test_task = preprocess_op(test_data_path, encoder, bucket_name, stem, True)

    # `==` on a PipelineParam builds a run-time condition, so it cannot be `is`/`not`.
    with dsl.Condition(retrain == False):  # noqa: E712
        build_model_task = build_model_op(bucket_name)
        train_task = train_op(preprocess_task.output, build_model_task.output, bucket_name, num_epochs)
        eval_task_train = evaluate_op(train_task.output, preprocess_test_task.output, bucket_name)

    with dsl.Condition(retrain == True):  # noqa: E712
        retrain_task = retrain_op(preprocess_task.output, bucket_name, num_epochs)
        eval_task_retrain = evaluate_op(retrain_task.output, preprocess_test_task.output, bucket_name)

    # "P0D" = zero maximum staleness: never reuse cached step results.
    for task in (preprocess_task, preprocess_test_task, build_model_task, train_task,
                 retrain_task, eval_task_retrain, eval_task_train):
        task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    
# Run-time arguments for sentiment_pipeline; keys must match its parameter names.
arguments = dict(
    data_path='gs://sentiment_analysis_de2020/data/test_data.csv',
    encoder='ISO-8859-1',
    bucket_name='sentiment_analysis_de2020',
    stem=False,
    num_epochs=4,
    retrain=True,
    test_data_path='gs://sentiment_analysis_de2020/data/sampled_data.csv',
    disable_cache=True,
)

client.create_run_from_pipeline_func(sentiment_pipeline, arguments=arguments)

RunPipelineResult(run_id=52c810e7-eef0-4ab4-bf34-1e9f1fc8d0ed)