# Pipeline 1
This pipeline gets data from the first data bucket, processes it through five components: loading the data, cleaning the data, splitting the data, undersampling the train data and uploading it to the cloud. 

### Install (???)

In [None]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [None]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

In [None]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.types import artifact_types

In [None]:
#The Google Cloud project that this pipeline runs in.
PROJECT_ID = "assignment-1-399115"
# The region that this pipeline runs in
REGION = "us-central1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = "gs://temp_de2023_2056332"
# Specify your assignment
ASSIGNMENT = "Assignment_1"

### Component: Loading data

In [None]:
@dsl.component(
    packages_to_install=["pandas","google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_data(project_id: str, bucket: str, feature_dataset_name: str, label_dataset_name: str, merged_dataset: Output[Dataset]):
    '''Download data'''
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Get the client and bucket
    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket)

     # Download the feature dataset
    blob1 = bucket.blob(feature_dataset_name)
    blob1.download_to_filename(feature_dataset_name.path + ".csv")
    df_feature = pd.read_csv(feature_dataset_name.path + ".csv")
    
    
    # Download the label dataset
    blob2 = bucket.blob(label_dataset_name)
    blob2.download_to_filename(label_dataset_name.path + ".csv")
    df_label = pd.read_csv(feature_dataset_name.path + ".csv")
    
    
    # Merge the datasets
    merged_df = pd.merge(df_feature, df_label, on='Ind_ID')
    merged_df.to_csv(merged_dataset.path + ".csv", index=False, encoding='utf-8-sig')
    

    logging.info('Download & Merge all Data complete!')

### Component: cleaning data
Irrelevant columns are delted.

In [None]:
@dsl.component(
    packages_to_install=["pandas", "numpy"],
    base_image="python:3.10.7-slim"
)
def clean_data(merged_dataset: Input[Dataset], cleaned_dataset: Output[Dataset]):
    '''Deletes irrelevant columns and removes NA's'''
    import pandas as pd
    import logging
    import sys
    import numpy

    # Sets the logging config
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 

    #Loads the merged dataset in
    df = pd.read_csv(merged_dataset.path, index_col=None)

    # Drops the columns: Birthday_count, Employed_days, Mobile_phone, Work_Phone, Phone, EMAIL_ID, Type_Occupation and Family_Members.
    cleandf = df.drop(columns= ["Birthday_count", "Employed_days", "Mobile_phone", "Work_Phone", "Phone", "EMAIL_ID", "Type_Occupation", "Family_Members"])

    # Save the cleaned dataset
    cleandf.to_csv(cleaned_dataset.path + ".csv", index=False, encoding='utf-8-sig')

### Component: Creating Train/Test split

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.10.7-slim"
)
def train_test_split(cleaned_dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''train_test_split'''
    import pandas as pd
    import logging 
    import sys
    from sklearn.model_selection import train_test_split as tts
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    # get the cleaned data from previous component
    alldata = pd.read_csv(cleaned_dataset.path, index_col=None) 
    
    #create a train and test dataset
    train, test = tts(alldata, test_size=0.3)

    #create a train.csv file and a test.csv file
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')


### Component: Undersampling

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.10.7-slim"
)
def undersample_train(dataset_train: Input[Dataset], resampled_dataset_train: Output[Dataset]):
    '''Apply undersampling to training set'''
    import pandas as pd
    import logging 
    import sys
    from imblearn.under_sampling import RandomUnderSampler
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    # get the train dataset from the previous component
    train_data = pd.read_csv(dataset_train.path, index_col=None)

    # random sampling
    rus = RandomUnderSampler(random_state=42)
    resampled_train_data = rus.fit_resample(train_data)

    # create a file with the randomly undersampled data
    resampled_train_data.to_csv(resampled_train_data.path + ".csv" , index=False, encoding='utf-8-sig')

### Component: upload the dataset to Google Cloud!

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_dataset_to_gcs(project_id: str, resampled_train_data: Input[Dataset], dataset_test: Input[Dataset], assignment: str):
    '''upload dataset to gsc'''
    from google.cloud import storage   
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)    
  
    # upload the model to GCS
    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket)
    blob1 = bucket.blob(dataset_test)
    blob2 = bucket.blob(resampled_train_data)
   
    blob1.upload_from_filename(dataset_test.path + ".csv")   
    blob2.upload_from_filename(resampled_train_data.path + ".csv")
    logging.info("Upload to Google Cloud complete.")

### Pipeline: Call all components with the output from the previous component

In [None]:
# pipeline component
@kfp.dsl.pipeline(
    name="fraud-predictor-data-cleaning-pipeline")
def pipeline(project_id: str, data_bucket: str, model_repo: str, dataset_feature_name: str, dataset_label_name: str):    
    
    di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        feature_dataset_name= dataset_feature_name,
        label_dataset_name = dataset_label_name
    )
     
    clean_op = clean_data(merged_dataset =  di_op.outputs["merged_dataset"]) 
                                            # cleaned_dataset: Output[Dataset]) 
                                            
    train_test_split_op = train_test_split(cleaned_dataset = clean_op.outputs["cleaned_dataset"])
                                           # dataset_train: Output[Dataset], dataset_test: Output[Dataset])
    
    resampled_train_op = undersample_train(dataset_train = train_test_split_op.outputs["dataset_train"])
                                            # resampled_dataset_train: Output[Dataset]

    upload_dataset_to_gcs(resampled_train_data = resampled_train_op.outputs["resampled_train_op"], dataset_test = train_test_split_op.outputs["dataset_test"])
 

In [None]:
from kfp import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='assignment1_pipeline.yaml')

In [None]:
import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="assignment_1",
    enable_caching=False,
    template_path="assignment1_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID, # makesure to use your project id 
        'data_bucket': 'data_de2023_2056332',  # the data_bucket name
        'model_repo':'models_de2023_2056332', # GCP model repo bucket name
        'dataset_feature_name' : 'Credit_card', # the feature dataset name
        'dataset_label_name': 'Credit_card_label' # the label dataset name
    }
)

job.run()