In [None]:
from datetime import datetime
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
import json
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import (Artifact,
                        ClassificationMetrics, 
                        component,
                        Dataset,
                        Input,
                        Metrics,
                        Model,
                        Output
                       )
from typing import NamedTuple

In [None]:
# Pre-Configured Values
# Shared constants for the notebook: project, region, storage locations and
# the names used when compiling and submitting the pipeline.

# IPython shell capture: `Val` holds the output lines of the gcloud command,
# so the first line is taken as the active project id.
Val = !gcloud config list --format 'value(core.project)'
PROJECT_ID = Val[0]
print(f"PROJECT_ID - {PROJECT_ID}")
REGION = "asia-east2"
# Timestamp taken once at cell execution; appended to the pipeline name below.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
# NOTE(review): SERVICE_ACCOUNT is defined here but not referenced by any other
# cell visible in this notebook - confirm whether it should be passed to the job.
SERVICE_ACCOUNT = "terraform-sa@master-314712.iam.gserviceaccount.com"
BUCKET_NAME = "gs://master_asia_east_2/Vertex_AI/Sentiment_Analysis"
# Root folder under the bucket that is passed as the pipeline root.
PIPELINE_ROOT = f"{BUCKET_NAME}/Pipeline_Root/Starter"
print(f"PIPELINE_ROOT - {PIPELINE_ROOT}")
# Local file the compiled pipeline spec is written to and later submitted from.
PIPELINE_JSON_FILE = "Starter_Analysis.json"
# NOTE(review): this value is used as the pipeline `name`; some KFP v2 / Vertex
# versions only accept lowercase letters, digits and hyphens - TODO confirm.
PIPELINE_EXPERIMENT_NAME = "Starter_Scoring_Pipeline"+TIMESTAMP
MODEL_DISPLAY_NAME = "Sentiment_analysis"

In [None]:
# Point the Vertex AI SDK at the configured project, region and staging bucket.
aip.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
)

In [None]:
@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
    packages_to_install=["pandas"],
)
def bq_load(
    train_data: Output[Dataset]
) -> str:
    """Dump the BigQuery source table into the ``train_data`` artifact as CSV.

    Runs a ``SELECT *`` over ``hackteam-mythbusters1.covid_dataset.combined1``
    in the ``US`` location, loads the result into a DataFrame and writes it
    (without the index) to ``train_data.path``.

    Returns:
        ``train_data.path`` with the ``/gcs/`` prefix rewritten to ``gs://``.
    """
    import pandas
    from google.cloud import bigquery

    sql = """
    SELECT * FROM `hackteam-mythbusters1.covid_dataset.combined1`
    """

    bq = bigquery.Client(location="US", project='hackteam-mythbusters1')
    table_df = bq.query(sql, location="US").to_dataframe()
    table_df.to_csv(train_data.path, index=False)

    gcs_uri = train_data.path.replace("/gcs/", "gs://")
    return gcs_uri

In [None]:
@component(base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
           packages_to_install = ["pandas","regex"],
          )
def preprocess_tweet(
    dataset: Input[Dataset],
    final: Output[Dataset]):
    """Clean the translated tweet text and write it to the ``final`` artifact.

    Reads the CSV at ``dataset.path``, casts ``Tweet_ID`` to string, drops rows
    whose ``Translated_Tweet_Text`` is null, and adds a ``Preprocessed_Tweet``
    column holding the text with URLs removed, every run of non-letters
    replaced by a single space, and surrounding whitespace stripped. The
    result is written (without the index) to ``final.path``.
    """
    import re
    import pandas as pd

    # Raw strings throughout: "\s" in a plain literal is an invalid escape.
    # `https?` also strips plain-http links, which would otherwise survive as
    # "http foo com" once the punctuation is replaced by spaces.
    url_re = re.compile(r"https?\S+")
    non_letters_re = re.compile(r"[^a-zA-Z]+")
    multi_space_re = re.compile(r"\s{2,}")

    def preprocess(tweet):
        """Return ``tweet`` reduced to space-separated alphabetic words."""
        text = url_re.sub("", str(tweet))           # remove urls
        text = non_letters_re.sub(" ", text)        # remove punctuation, digits, symbols
        text = multi_space_re.sub(" ", text)        # merge two or more spaces into one
        return text.strip()

    data = pd.read_csv(dataset.path)
    data["Tweet_ID"] = data["Tweet_ID"].astype(str)

    # .copy() so the new column is added to an independent frame, not to a
    # slice of `data` (which raises SettingWithCopyWarning).
    df_tweet = data[data["Translated_Tweet_Text"].notna()].copy()
    df_tweet["Preprocessed_Tweet"] = df_tweet["Translated_Tweet_Text"].apply(preprocess)
    df_tweet.to_csv(final.path, index=False)

In [None]:
@component(base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
           packages_to_install = ["pandas","regex","transformers","tensorflow","torch","flax"],
          )
def classify_tweet_train(
    dataset: Input[Dataset],
    final: Output[Dataset]):
    """Zero-shot classify tweets and keep only the health-related ones.

    Reads the CSV at ``dataset.path`` and runs the
    ``facebook/bart-large-mnli`` zero-shot pipeline on ``Preprocessed_Tweet``
    against six candidate topics. The raw result, the returned label and score
    lists, the first returned label and a ``Health``/``Others`` flag are added
    as columns. Rows flagged ``Health`` ('health' among the first three
    returned labels) are written to ``final.path``, restricted to the first
    20 columns.
    """
    from transformers import pipeline
    import pandas as pd

    candidate_labels = ['health', 'business','technology','entertainment','sports','science']
    top_k = 3  # 'health' must appear within this many leading labels

    df_tweet = pd.read_csv(dataset.path)
    classifier = pipeline("zero-shot-classification",model="facebook/bart-large-mnli")

    def classify_tweet(tweet):
        """Return the classifier output for ``tweet``, or None when there is no text."""
        # Empty strings written to CSV are read back as NaN (a float), so a
        # plain `is None` / `== ''` check would let them reach the classifier.
        if not isinstance(tweet, str) or not tweet.strip():
            return None
        return classifier(tweet, candidate_labels)

    def health_or_others(labels):
        """Map a label list to 'Health'/'Others'; None stays None."""
        if labels is None:
            return None
        # Explicit [:top_k]; the previous [:-3] equalled the first three
        # labels only because there are exactly six candidates.
        return "Health" if 'health' in labels[:top_k] else "Others"

    df_tweet["Class_General"] = df_tweet["Preprocessed_Tweet"].apply(classify_tweet)

    label_li = []
    scores_li = []
    for model_res in df_tweet["Class_General"]:
        if isinstance(model_res, dict):
            label_li.append(model_res["labels"])
            scores_li.append(model_res["scores"])
        else:
            label_li.append(None)
            scores_li.append(None)

    df_tweet["Class_Labels_Prioirity"] = label_li
    df_tweet["Class_Scores_Prioirity"] = scores_li
    df_tweet["Output_Class"] = [labels[0] if labels is not None else None for labels in label_li]
    df_tweet["Health_vs_Others"] = [health_or_others(labels) for labels in label_li]

    df_tweet_health = df_tweet[df_tweet["Health_vs_Others"] == "Health"]
    df_tweet_health = df_tweet_health.iloc[:,0:20]
    df_tweet_health.to_csv(final.path, index=False)

In [None]:
@component(base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
           packages_to_install = ["pandas","regex","pytz","numpy","nltk"],          
          )
def preprocess(
    dataset: Input[Dataset],
    final: Output[Dataset]):
    """Turn the raw tweet table into the model-ready feature table.

    Reads the CSV at ``dataset.path`` and:

    * drops ``Profile_Location`` and any row with a null ``Followers_Count``,
      ``Tweet_Text`` or ``Translated_Tweet_Text``;
    * encodes ``Profile_Verified`` as 0/1 and derives ``Profile_Age`` (seconds
      between now and ``Profile_Created_At``, both in UTC);
    * extracts URLs, hashtags and mentions from ``Translated_Tweet_Text``,
      removes them from the text and stores their counts;
    * lower-cases the remaining text, strips non-alphanumeric characters,
      tokenizes it, removes English stop words and lemmatizes the tokens into
      ``Word_Tokens``;
    * drops the identifier, raw-text and intermediate columns.

    The resulting frame must have exactly 12 columns and is written (without
    the index) to ``final.path``.
    """
    import re

    import nltk
    import pandas as pd
    from nltk import pos_tag, word_tokenize
    from nltk.corpus import stopwords, wordnet
    from nltk.stem import WordNetLemmatizer

    for resource in ('punkt', 'stopwords', 'averaged_perceptron_tagger', 'wordnet', 'omw-1.4'):
        nltk.download(resource)

    stop_words = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()

    # Each pattern is compiled once and shared by the extract and strip steps.
    mention_re = re.compile(r'\B(\@[a-zA-Z0-9_-]+\b)')
    hashtag_re = re.compile(r'\B(\#[a-zA-Z0-9_-]+\b)')
    url_re = re.compile(r'(https?:\/\/(?:www\.|(?!www))[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]\.[^\s]{2,}|www\.[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]\.[^\s]{2,}|https?:\/\/(?:www\.|(?!www))[a-zA-Z0-9]+\.[^\s]{2,}|www\.[a-zA-Z0-9]+\.[^\s]{2,})')
    non_alnum_re = re.compile(r'[^a-zA-Z0-9 ]')

    # First letter of the Penn Treebank tag -> WordNet part of speech.
    wordnet_pos_by_prefix = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'N': wordnet.NOUN,
        'R': wordnet.ADV,
    }

    def extract_then_strip(text, pattern):
        """Return (list of matches per row, text with those matches removed)."""
        found = text.apply(pattern.findall)
        stripped = text.apply(lambda t: pattern.sub('', t))
        return found, stripped

    def clean_text(t):
        """Newlines to spaces, lower-case, drop non-alphanumerics, trim."""
        return non_alnum_re.sub('', t.replace('\n', ' ').lower()).strip()

    def lemmatize(tokens):
        """Lemmatize ``tokens`` using their POS tag where WordNet has one."""
        lemmas = []
        for word, tag in pos_tag(tokens):
            pos = wordnet_pos_by_prefix.get(tag[:1])
            if pos:
                lemmas.append(lemmatizer.lemmatize(word, pos=pos))
            else:
                lemmas.append(lemmatizer.lemmatize(word))
        return lemmas

    def to_word_tokens(t):
        """Tokenize, drop stop words, then lemmatize what is left."""
        kept = [w for w in word_tokenize(t) if w not in stop_words]
        return lemmatize(kept)

    def preprocess_data(df):
        """Apply the full cleaning / feature derivation to ``df``."""
        # Dropping Profile Location column because of too many NULLs (> 25%)
        df = df.drop('Profile_Location', axis=1)

        # Drop rows with nulls in the columns used below. The translated text
        # is included because every regex step reads it and would raise
        # TypeError on NaN. .copy() detaches the result from the source frame
        # so the column assignments below do not hit a slice.
        usable = (
            df['Followers_Count'].notna()
            & df['Tweet_Text'].notna()
            & df['Translated_Tweet_Text'].notna()
        )
        df = df[usable].copy()

        # Encode the verified flag as 0/1; anything that is not equal to
        # False (including missing values) maps to 1.
        df['Profile_Verified'] = df['Profile_Verified'].apply(lambda x: 0 if x == False else 1)
        # utc=True yields tz-aware UTC values whether or not the stored
        # strings carry an offset.
        df['Profile_Created_At'] = pd.to_datetime(df['Profile_Created_At'], utc=True)

        text = df['Translated_Tweet_Text'].astype(str)

        df['URLs'], text = extract_then_strip(text, url_re)
        # A real UTC "now", taken once so every row shares the same reference.
        now_utc = pd.Timestamp.now(tz='UTC')
        df['Profile_Age'] = (now_utc - df['Profile_Created_At']).dt.total_seconds()
        df['Hashtags'], text = extract_then_strip(text, hashtag_re)
        df['Mentions'], text = extract_then_strip(text, mention_re)

        # Creating three derived variables based on count
        df['Mentions_Count'] = df['Mentions'].apply(len)
        df['Hashtags_Count'] = df['Hashtags'].apply(len)
        df['URLs_Count'] = df['URLs'].apply(len)

        # Cleaning the tweet text and turning it into lemmatized tokens
        df['Word_Tokens'] = text.apply(clean_text).apply(to_word_tokens)

        to_drop = ['Tweet_ID', 'Tweet_URL', 'Tweet_Text', 'Retweet_Count', 'Reply_Count', 'Quote_Count', 'Like_Count',
                   'Tweet_Created_At', 'Tweet_Handles', 'Profile_Created_At', 'Profile_Image_URL', 'Translated_Tweet_Text',
                   'Hashtags', 'Mentions', 'URLs']

        return df.drop(to_drop, axis=1)

    df = pd.read_csv(dataset.path)
    df_filt = preprocess_data(df)
    assert len(df_filt.columns) == 12, (
        f"expected 12 output columns, got {len(df_filt.columns)}: {list(df_filt.columns)}"
    )

    #Final Write
    df_filt.to_csv(final.path, index=False)

In [None]:
@component(base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
           packages_to_install = ["pandas","scikit-learn", "numpy"],
          )
def train_component(
    dataset: Input[Dataset],
    accuracy: Output[Metrics],
    f1score: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
    model : Output[Model],
    encoder: Output[Model],
    tfidfvec: Output[Model],
    standardscaler: Output[Model]
)-> NamedTuple(
    "Outputs",
    [
        ("model_path", str),
        ("encoder_path", str),
        ("standard_scaler_path", str),
        ("tfidf_vec_path", str),
    ],
):
    """Train a gradient-boosting classifier and persist it with its transformers.

    Reads the feature table at ``dataset.path``, one-hot encodes
    ``Tweet_Source``, standard-scales the numeric columns, TF-IDF vectorizes
    ``Word_Tokens`` and fits a ``GradientBoostingClassifier`` on a 70/30
    train/test split with ``label`` as the target. Accuracy and F1 (both as
    percentages) and the confusion matrix on the test split are logged to the
    ``accuracy``, ``f1score`` and ``metricsc`` artifacts.

    The model, encoder, scaler and vectorizer are each pickled as
    ``model.pkl`` and uploaded under their own output artifact directory.

    Returns:
        ``(model_path, encoder_path, standard_scaler_path, tfidf_vec_path)``,
        each the artifact path with ``/gcs/`` rewritten to ``gs://``.
    """
    import ast
    import os
    import pickle

    import pandas as pd
    from google.cloud import storage
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OneHotEncoder, StandardScaler

    numeric_cols = ['Followers_Count', 'Following_Count', 'Tweet_Count', 'Listed_Count',
                    'Profile_Age', 'Mentions_Count', 'Hashtags_Count', 'URLs_Count']

    def tokens_to_text(value):
        """Return the tokens in ``value`` as one space-separated string.

        After a CSV round trip the token list arrives as its string repr
        (e.g. "['a', 'b']"); it is parsed back when possible, and left as-is
        when it is not a Python literal.
        """
        if isinstance(value, str):
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                return value
        if isinstance(value, (list, tuple)):
            return ' '.join(str(tok) for tok in value)
        return str(value)

    def train_model(df_filt):
        """Fit the transformers and the classifier; log metrics; return them all."""
        # Randomizing the dataset
        df_filt = df_filt.sample(frac=1)

        # NOTE(review): the encoder, scaler and vectorizer below are fitted on
        # the full dataset before the train/test split, so the logged test
        # metrics are computed with statistics that include the test rows.

        # Performing one hot encoding on Tweet Source variable
        one_hot_enc = OneHotEncoder(handle_unknown='ignore')
        source_onehot = one_hot_enc.fit_transform(df_filt[['Tweet_Source']])
        df_filt[one_hot_enc.categories_[0]] = source_onehot.toarray().astype(int)

        # Fitting standardscaler on the numerical columns, this scaler needs to be saved for transforming during runtime
        scaler = StandardScaler()
        df_filt[numeric_cols] = scaler.fit_transform(df_filt[numeric_cols])

        # Fitting tf-idf vectorizer on the word tokens, this vectorizer needs to be saved for transforming during runtime
        v = TfidfVectorizer()
        df_filt['Word_Tokens'] = df_filt['Word_Tokens'].apply(tokens_to_text)
        text_vectors = v.fit_transform(df_filt['Word_Tokens']).toarray()
        rr = pd.DataFrame(text_vectors)
        # reset_index gives a 0..n-1 index that lines up with `rr` for the merge.
        df_filt = df_filt.reset_index()
        df_final = df_filt.merge(rr, left_index=True, right_index=True)

        to_drop_final = ['index', 'Word_Tokens', 'Tweet_Source']
        df_final = df_final.drop(to_drop_final, axis=1)

        # The TF-IDF columns are integers while the rest are strings; recent
        # scikit-learn releases refuse to fit on mixed-type feature names.
        df_final.columns = df_final.columns.astype(str)

        # Separating feature and target variables
        Y = df_final['label']
        X = df_final.drop(['label'], axis=1)

        # 70-30 split between train and test datasets
        X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3)

        # Defining model and training the same
        model_gbm = GradientBoostingClassifier()
        model_gbm.fit(X_train, y_train)
        predictions = model_gbm.predict(X_test)

        # accuracy on test set
        Classifier_Accuracy = accuracy_score(y_test, predictions)
        accuracy.log_metric("accuracy", (Classifier_Accuracy * 100.0))

        # F1
        F1_Score = f1_score(y_test, predictions)
        f1score.log_metric("F1 Score", (F1_Score*100.0))

        metricsc.log_confusion_matrix(
            ["Genuine", "Fake"],
            confusion_matrix(y_test, predictions).tolist(),  # np array -> nested list
        )

        return (model_gbm, one_hot_enc, scaler, v)

    def save_model_to_bucket(artifact_filename, output_artifact, obj):
        """Pickle ``obj`` locally and upload it under ``output_artifact``'s directory."""
        local_path = artifact_filename
        # The context manager flushes and closes the file before the upload.
        with open(local_path, "wb") as handle:
            pickle.dump(obj, handle)
        # Upload model artifact to Cloud Storage
        model_directory_gs = output_artifact.path.replace("/gcs/", "gs://")
        storage_path = os.path.join(model_directory_gs, artifact_filename)
        blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
        blob.upload_from_filename(local_path)

    df_filt = pd.read_csv(dataset.path)
    model_gbm, one_hot_enc, scaler, v = train_model(df_filt)

    # Every artifact is stored as 'model.pkl' inside its own output directory.
    save_model_to_bucket('model.pkl', model, model_gbm)
    save_model_to_bucket('model.pkl', encoder, one_hot_enc)
    save_model_to_bucket('model.pkl', standardscaler, scaler)
    save_model_to_bucket('model.pkl', tfidfvec, v)
    return(model.path.replace("/gcs/", "gs://"),
           encoder.path.replace("/gcs/", "gs://"),
           standardscaler.path.replace("/gcs/", "gs://"),
           tfidfvec.path.replace("/gcs/", "gs://"),
          )

In [None]:
@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:latest",
    packages_to_install=['google-cloud-aiplatform'],
)
def import_model(
    project_id: str,
    display_name: str,
    artifact_gcs_bucket: str,
    model: Output[Model],
    location: str,
    serving_container_image_uri: str,
    description: str
) -> NamedTuple(
    'Outputs',
    [
        ('display_name', str),
        ('resource_name', str)
    ]
):
    """Upload the artifacts at ``artifact_gcs_bucket`` as a Vertex AI model.

    Initialises the SDK for ``project_id`` / ``location``, uploads the model
    with the given display name, serving image and description, and waits for
    the upload to finish. The model's resource name is written to the file at
    ``model.path``, after which ``model.path`` is reassigned to the
    ``aiplatform://v1/``-prefixed resource name.

    Returns:
        ``(display_name, resource_name)`` of the uploaded model.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    uploaded = aiplatform.Model.upload(
        display_name=display_name,
        artifact_uri=artifact_gcs_bucket,
        serving_container_image_uri=serving_container_image_uri,
        description=description,
    )
    uploaded.wait()

    resource_name = uploaded.resource_name
    with open(model.path, 'w') as handle:
        handle.write(resource_name)

    # Use the aiplatform://v1 prefix so off-the-shelf tasks can consume the output.
    model.path = f"aiplatform://v1/{resource_name}"
    return (uploaded.display_name, resource_name,)

In [None]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=PIPELINE_EXPERIMENT_NAME,
)
def pipeline(
    project: str = PROJECT_ID,
    region:str = REGION,
):
    """Load the data, build features, train the classifier and upload the model.

    Steps: ``bq_load`` -> ``preprocess`` -> ``train_component`` ->
    ``import_model``. ``preprocess_tweet`` also runs on the loaded data, but
    its output is only consumed by the currently disabled classification step.

    Args:
        project: Project the trained model is uploaded to.
        region: Location the trained model is uploaded to.
    """
    dataset_op = bq_load()
    dataset_filtered = preprocess_tweet(dataset_op.outputs['train_data'])
    #classify_tweets = classify_tweet_train(dataset_filtered.outputs['final'])
    processed_data = preprocess(dataset_op.outputs["train_data"])
    train_val = train_component(processed_data.outputs["final"])

    model_upload_op_1 = import_model(
        project_id=project,
        display_name='main-pipeline-scoring-2',
        serving_container_image_uri="europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
        artifact_gcs_bucket=train_val.outputs["model_path"] ,
        # Honour the pipeline's `region` parameter instead of the notebook
        # constant, so a value supplied at submission time takes effect.
        location=region,
        description="final model"
    )
    

    
#     create_endpoint_op = gcc_aip.EndpointCreateOp(
#         project=project,
#         display_name = "main-scoring-model",
#         location = region
#     )

#     model_deploy_op = gcc_aip.ModelDeployOp(
#         model=model_upload_op_1.outputs["model"],
#         endpoint=create_endpoint_op.outputs['endpoint'],
#         automatic_resources_min_replica_count=1,
#         automatic_resources_max_replica_count=1,
#     )
    
   

In [None]:
from kfp.v2 import compiler  # noqa: F811

# Compile the pipeline function into the JSON spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_JSON_FILE,
)

# Build a Vertex AI pipeline job from the compiled spec, passing the
# project/region pipeline parameters explicitly.
job = aip.PipelineJob(
    display_name=PIPELINE_EXPERIMENT_NAME,
    template_path=PIPELINE_JSON_FILE,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project": PROJECT_ID,
        "region": REGION,
    },
)

# Submit the job.
job.run()

In [None]:
from kfp.v2.google.client import AIPlatformClient  # noqa: F811

# Client used to register a recurring run of the compiled pipeline spec.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

# Adjust the cron schedule and time zone as necessary.
# pipeline_root is necessary if PIPELINE_ROOT was not specified as part of
# the pipeline definition.
response = api_client.create_schedule_from_job_spec(
    job_spec_path=PIPELINE_JSON_FILE,
    schedule="0 6 1 * *",
    time_zone="America/Los_Angeles",
    pipeline_root=PIPELINE_ROOT,
)