### CLARENCE
#### Notebook primarily for testing components 

### 18 Jun 
Pipeline not running as expected

In [None]:
import json
import pathlib

from google_cloud_pipeline_components.experimental.custom_job.utils import (
    create_custom_training_job_op_from_component,
)

from kfp.v2 import compiler, dsl
from pipelines.kfp.helpers import generate_query

from pipelines.kfp_components.ingest import bq_extract_data, bq_query_to_table
from pipelines.kfp_components.training import train_xgboost_model
from pipelines.kfp_components.dependencies import PROJECT_ID, ROOT_DIR, sql_query

# Debug aid: show ROOT_DIR (imported from pipelines.kfp_components.dependencies),
# which is the base of the SQL file path built inside xgboost_pipeline below.
print(ROOT_DIR)

@dsl.pipeline(name="xgboost-train-pipeline")
def xgboost_pipeline(
    # Candidate pipeline parameters, not wired in yet:
    # project_id: str,
    # project_location: str,
    # pipeline_files_gcs_path: str,
    # ingestion_project_id: str,
    # tfdv_schema_filename: str,
    # tfdv_train_stats_path: str,
    # model_name: str,
    # model_label: str,
    # dataset_id: str,
    # dataset_location: str,
    # ingestion_dataset_id: str,
    # timestamp: str,
):
    """Define the XGBoost training pipeline; only the ingestion step is active.

    Builds a query from ``query_bq.sql`` with ``generate_query`` and passes it
    to the ``bq_query_to_table`` component, targeting the
    ``dwh_pacific_torus.credit_card_default`` table in ``PROJECT_ID``.
    The export of that table to GCS is currently commented out.
    """
    # Location of the SQL file, relative to the project root.
    query_file = (
        ROOT_DIR / "pipelines" / "kfp" / "ingest" / "queries" / "query_bq.sql"
    )
    # A distinct local name avoids shadowing the module-level ``sql_query`` import.
    query_text = generate_query(input_file=query_file)

    ingest = bq_query_to_table(
        query=query_text,
        # Destination table for the query result.
        destination_project_id=PROJECT_ID,
        dataset_id="dwh_pacific_torus",
        table_id="credit_card_default",
        dataset_location="US",
        # No explicit client project or job configuration is supplied.
        bq_client_project_id=None,
        query_job_config=None,
    )
    # .set_display_name("Ingest data")

    # ingest_to_gcs = (
    #     bq_extract_data(
    #         source_project_id="pacific-torus-347809",
    #         source_dataset_id="dwh_pacific_torus",
    #         source_table_id="credit_card_default",
    #         destination_project_id="pacific-torus-347809",
    #         destination_bucket="mle-dwh-torus",
    #         destination_file="raw/credit_cards.csv",
    #         dataset_location="US",
    #     )
    #     # .after(ingest)
    #     # .set_display_name("Export to GCS")
    # )


def compile():
    """Compile ``xgboost_pipeline`` into a pipeline job spec on disk.

    The spec is written to ``training.json`` in the current working
    directory, with KFP type checking enabled.

    NOTE: this function shadows the built-in ``compile``; the name is kept so
    the existing call in the ``__main__`` block keeps working.
    """
    compiler.Compiler().compile(
        pipeline_func=xgboost_pipeline,
        pipeline_name="xgboost-train-pipeline",
        # The kfp.v2 compiler shipped with the KFP 1.8 SDK only emits JSON and
        # rejects any other suffix, so ".yaml" fails here. The later cells in
        # this notebook already compile to ".json".
        package_path="training.json",
        type_check=True,
    )


if __name__ == "__main__":

    # Build a custom-training-job op from the XGBoost training component,
    # requesting a single n1-standard-4 replica.
    # NOTE(review): `custom_train_job` is not referenced again in the visible
    # code, so the pipeline compiled below does not include a training step.
    custom_train_job = create_custom_training_job_op_from_component(
        component_spec=train_xgboost_model,
        replica_count=1,
        machine_type="n1-standard-4",
    )

    # Compile the pipeline definition via the compile() helper defined above.
    compile()

### 22 Jun Test ingest component step

In [None]:
import os
import kfp
from kfp.v2 import compiler, dsl

In [None]:
# Load the ingest component from its YAML definition. The path is relative,
# so it depends on the directory the notebook is run from.
ingest_op = kfp.components.load_component_from_file("../pipelines/kfp_components/ingest/ingest_component.yaml")

In [None]:
## ingest_op.component_spec

In [None]:
# Define the pipeline
@dsl.pipeline(name='testing pipeline', description='NIL')
def xgboost_pipeline():
    """Single-step test pipeline that runs only the ingest component.

    Replaces the earlier ``xgboost_pipeline`` definition in this notebook.
    Asks ``ingest_op`` to export ``dwh_pacific_torus.credit_card_defaults``
    to ``raw/test.csv`` in the ``mle-dwh-torus`` bucket.
    """
    ingest = ingest_op(
        # Source table in BigQuery.
        source_project_id="pacific-torus-347809",
        source_dataset_id="dwh_pacific_torus",
        source_table_id="credit_card_defaults",
        dataset_location="US",
        # Destination object in Cloud Storage.
        destination_project_id="pacific-torus-347809",
        destination_bucket="mle-dwh-torus",
        destination_file="raw/test.csv",
        # NOTE(review): this is the string "None", not the None object or a
        # dict; confirm that the component's input type expects a string.
        extract_job_config="None",
    )  # .apply(gcp.use_gcp_secret('user-gcp-sa'))
    

In [None]:
# Compile the most recently defined `xgboost_pipeline` (the ingest-only test
# pipeline) to training.json in the working directory, with type checking on.
compiler.Compiler().compile(
    pipeline_func=xgboost_pipeline,
    pipeline_name="xgboost-train-pipeline",
    package_path="training.json",
    type_check=True,
)

### 23 Jun Simple pipeline trial

In [None]:
from typing import NamedTuple
from kfp.v2.dsl import Dataset, Output, component, OutputPath, Artifact, Input
import google.cloud.aiplatform as aip
from kfp.v2 import compiler, dsl

# Set the default project and staging bucket for the Vertex AI SDK calls
# made later in this notebook (e.g. aip.PipelineJob).
aip.init(project="pacific-torus-347809", staging_bucket="gs://mle-dwh-torus")

In [None]:
# Point the Google client libraries at a service-account key file. The path is
# relative, so it is resolved against the notebook's working directory.
# `os` is imported in an earlier cell.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "pacific-torus.json"


In [None]:
def bq_extract_data(
    source_project_id: str,
    source_table_url: str,
    destination_project_id: str,
    destination_bucket: str,
    destination_file: str,
    dataset_location: str,
    extract_job_config: dict = None,
) -> NamedTuple('outputs', [('dataset_gcs_uri', str),
    ('dataset_gcs_directory', str)]
):
    """Export a BigQuery table to a file in Cloud Storage.

    The destination bucket is created (STANDARD class, ASIA-SOUTHEAST1) when
    it does not exist yet. All imports are function-local because this
    function is turned into a container component from its own source.

    Args:
        source_project_id: Project that owns the source table.
        source_table_url: Table id as ``dataset.table``; it is prefixed with
            ``source_project_id`` to form the full table id.
        destination_project_id: Project used for the Storage and BigQuery
            clients and for bucket creation.
        destination_bucket: Name of the target bucket.
        destination_file: Object path inside the bucket. A ``.json`` suffix
            forces newline-delimited JSON output.
        dataset_location: Location passed to the extract job.
        extract_job_config: Optional keyword arguments for
            ``bigquery.ExtractJobConfig``. The caller's dict is not mutated.

    Returns:
        ``(dataset_gcs_uri, dataset_gcs_directory)``: the ``gs://`` URI of the
        exported file and the URI of its parent directory.

    Raises:
        GoogleCloudError: Re-raised after logging when the extract job fails.
    """
    import logging
    import os
    from google.cloud.exceptions import GoogleCloudError
    from google.cloud import bigquery, storage

    # NOTE(review): hard-coded key file path, relative to the working
    # directory; confirm the file exists in the image this component runs in.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "pacific-torus.json"

    # logging.config.fileConfig(LOGGING_CONF)
    logger = logging.getLogger("root")

    storage_client = storage.Client(project=destination_project_id)

    # Create the destination bucket on first use.
    if not storage.Bucket(storage_client, destination_bucket).exists():
        bucket = storage_client.bucket(destination_bucket)
        bucket.storage_class = "STANDARD"
        storage_client.create_bucket(
            bucket_or_name=bucket, location="ASIA-SOUTHEAST1", project=destination_project_id
        )
        logger.info(f"Bucket created {destination_bucket}")

    full_table_url = f"{source_project_id}.{source_table_url}"
    table = bigquery.table.Table(table_ref=full_table_url)

    # Work on a copy so the caller's options are kept and never mutated.
    # Previously a ".json" destination replaced the whole dict, silently
    # dropping every other option the caller had supplied.
    job_options = dict(extract_job_config or {})
    if destination_file.endswith(".json"):
        job_options["destination_format"] = "NEWLINE_DELIMITED_JSON"
    job_config = bigquery.ExtractJobConfig(**job_options)

    dataset_gcs_uri = f"gs://{destination_bucket}/{destination_file}"
    dataset_gcs_directory = os.path.dirname(dataset_gcs_uri)

    bq_client = bigquery.Client(project=destination_project_id)

    logger.info(f"Extract {source_table_url} to {dataset_gcs_uri}")
    extract_job = bq_client.extract_table(
        source=table,
        destination_uris=dataset_gcs_uri,
        job_config=job_config,
        location=dataset_location,
    )

    try:
        # Block until the extract job finishes.
        extract_job.result()
    except GoogleCloudError as e:
        logger.error(e)
        logger.error(extract_job.error_result)
        logger.error(extract_job.errors)
        # Bare raise keeps the original traceback intact.
        raise
    else:
        logger.info(f"Table extracted: {dataset_gcs_uri}")

    return (dataset_gcs_uri, dataset_gcs_directory)

In [None]:
def basic_preprocessing(
    input_file: str,
    output_bucket: str,
    output_file: str
) -> Artifact:
    """Clean the raw credit-card CSV and one-hot encode its categoricals.

    Steps: normalise column names (lower-case, stripped), drop rows with
    missing values, rename the label column to ``default``, one-hot encode
    ``sex``, ``education`` and ``marriage``, then write the result as CSV.

    Args:
        input_file: Path or URI of the CSV to read with pandas.
        output_bucket: Folder under ``gs://mle-dwh-torus`` to write into
            (despite the name, the bucket itself is fixed).
        output_file: File name of the CSV inside that folder.

    Returns:
        The processed DataFrame.
        NOTE(review): the annotation says ``Artifact`` but a DataFrame is
        returned; confirm how the component is expected to serialise it.
    """
    import pandas as pd

    df = pd.read_csv(input_file)

    df.columns = [col.lower().strip() for col in df.columns]

    df.dropna(inplace=True)

    # `columns=` is required: a bare mapping renames index labels, which made
    # the original call a no-op and let the label column be dropped below.
    df.rename(columns={'default_payment_next_month': "default"}, inplace=True)

    # NOTE(review): these comparisons only match when the columns were read as
    # strings; confirm the dtype of `education` / `marriage` in the export.
    df.loc[df['education'] == '0', 'education'] = 'Unknown'
    df.loc[df['marriage'] == '0', 'marriage'] = 'Other'

    sex = pd.get_dummies(df.sex, prefix='gender')
    education = pd.get_dummies(df.education, prefix='ed')
    marriage = pd.get_dummies(df.marriage, prefix='mstatus')

    # One column-wise concat is equivalent to folding pairwise concats.
    final = pd.concat([df, sex, education, marriage], axis=1)
    # Drop only the encoded source columns; the renamed label `default` stays.
    final.drop(['sex', 'education', 'marriage'], axis=1, inplace=True)

    output_path = f"gs://mle-dwh-torus/{output_bucket}/{output_file}"
    final.to_csv(output_path, index=False)

    return final


def train_test_split(
    input_file: InputPath("CSV"),
    output_bucket: str,
) -> NamedTuple("outputs", [
    ("train_data", Artifact),
    ("test_data", Artifact)
]):
    """Split a CSV dataset 80/20 into train and test sets and store both.

    Args:
        input_file: Path of the CSV to split.
        output_bucket: Folder under ``gs://mle-dwh-torus`` that receives
            ``train.csv`` and ``test.csv``.

    Returns:
        ``(train, test)`` DataFrames.
        NOTE(review): the outputs are declared as ``Artifact`` but DataFrames
        are returned; confirm how the component serialises them.
    """
    import pandas as pd
    # Aliased so it does not read like a recursive call to this function.
    from sklearn.model_selection import train_test_split as split_frame

    df = pd.read_csv(input_file)
    # Fixed seed keeps the split reproducible between runs.
    train, test = split_frame(df, test_size=0.2, random_state=2022)

    output_train_path = f"gs://mle-dwh-torus/{output_bucket}/train.csv"
    output_test_path = f"gs://mle-dwh-torus/{output_bucket}/test.csv"

    # index=False matches basic_preprocessing and keeps the shuffled row index
    # from being written as an extra unnamed column.
    train.to_csv(output_train_path, index=False)
    test.to_csv(output_test_path, index=False)

    return (train, test)


def read_data(a: InputPath("CSV")):
    """Placeholder consumer component: accepts a CSV input and prints ``done``.

    Args:
        a: Path of the incoming CSV artifact. It is not opened or read.
    """
    print("done")

In [None]:
import kfp

# Turn each Python function into a container component and write its
# definition to a YAML file next to the notebook. The returned ops are not
# kept; the YAML files are loaded back in the next cell.

# BigQuery -> GCS export.
kfp.components.func_to_container_op(
    bq_extract_data,
    extra_code="from typing import NamedTuple",
    output_component_file='ingest_component.yaml',
    base_image='gcr.io/pacific-torus-347809/mle-fp/base:latest')

# Cleaning and one-hot encoding; fsspec/gcsfs let pandas write gs:// paths.
kfp.components.func_to_container_op(
    basic_preprocessing,
    extra_code="from kfp.v2.dsl import Dataset, InputPath, OutputPath",
    output_component_file='basic_preprocessing_component.yaml',
    base_image='gcr.io/pacific-torus-347809/mle-fp/base:latest',
    packages_to_install=["fsspec", "gcsfs"])

# Train/test split. The PyPI distribution is "scikit-learn"; "sklearn" is a
# deprecated placeholder package whose installation fails.
kfp.components.func_to_container_op(
    train_test_split,
    extra_code="from kfp.v2.dsl import Dataset, InputPath",
    output_component_file='tts_component.yaml',
    base_image='gcr.io/pacific-torus-347809/mle-fp/base:latest',
    packages_to_install=["fsspec", "gcsfs", "scikit-learn"])

# Placeholder consumer of the split output.
kfp.components.func_to_container_op(
    read_data,
    extra_code="from kfp.v2.dsl import Dataset, InputPath",
    output_component_file='data_component.yaml',
    base_image='gcr.io/pacific-torus-347809/mle-fp/base:latest',
    packages_to_install=["fsspec", "gcsfs", "scikit-learn"])

In [None]:
# Load the component definitions written by the previous cell.
# Note that `ingest_op` replaces the op loaded in the 22 Jun section.
ingest_op = kfp.components.load_component_from_file("./ingest_component.yaml")
remove_na_op = kfp.components.load_component_from_file("./basic_preprocessing_component.yaml")
tts_op = kfp.components.load_component_from_file("./tts_component.yaml")

data_op = kfp.components.load_component_from_file("./data_component.yaml")

# Pipeline root passed to the pipeline decorator and the PipelineJob below;
# evaluates to "gs://mle-dwh-torus/pipeline/".
PIPELINE_ROOT = "{}/pipeline/".format("gs://mle-dwh-torus")

In [None]:
# Define the pipeline
@dsl.pipeline(name='testing pipeline', description='NIL', pipeline_root=PIPELINE_ROOT)
def xgboost_test_pipeline():
    """Four-step test pipeline: extract, preprocess, split, read.

    Chains ``ingest_op`` -> ``remove_na_op`` -> ``tts_op`` -> ``data_op``,
    feeding each step's output into the next one.
    """
    # 1. Export the BigQuery table to raw/new_test.csv in the bucket.
    extract_task = ingest_op(
        source_project_id="pacific-torus-347809",
        source_table_url="dwh_pacific_torus.credit_card_defaults",
        destination_project_id="pacific-torus-347809",
        destination_bucket="mle-dwh-torus",
        destination_file="raw/new_test.csv",
        dataset_location="US",
        extract_job_config={},
    )  # .apply(gcp.use_gcp_secret('user-gcp-sa'))

    # 2. Preprocess the exported CSV, addressed by its gs:// URI.
    preprocess_task = remove_na_op(
        input_file=extract_task.outputs["dataset_gcs_uri"],
        output_bucket="int",
        output_file="ccd2.csv",
    )

    # 3. Split into train and test sets. The keyword is `input` although the
    #    Python parameter is `input_file` (the call relies on KFP dropping the
    #    `_file` suffix for path-style inputs).
    split_task = tts_op(
        input=preprocess_task.output,
        output_bucket="fin",
    )

    # 4. Hand the training split to the placeholder reader.
    read_task = data_op(a=split_task.outputs["train_data"])

In [None]:
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../pacific-torus.json"
from datetime import datetime

# Day/hour/minute stamp used to make the job id differ between runs.
# Named `run_stamp` rather than `id` so the built-in `id()` is not shadowed.
run_stamp = datetime.now().strftime("%d%H%M")

# Compile the test pipeline to a local job spec.
compiler.Compiler().compile(
    pipeline_func=xgboost_test_pipeline,
    pipeline_name="xgboost-train-pipeline",
    package_path="./test.json",
    type_check=True,
)

# Create a pipeline job from the compiled spec.
job = aip.PipelineJob(
    display_name="testpipeline",
    template_path="./test.json",
    job_id=f'test-{run_stamp}',
    pipeline_root=PIPELINE_ROOT
)

# Start the job.
job.run()

### 24 Jun Testing on TFDV 

In [None]:
import tensorflow_data_validation as tfdv
import os


# Load two statistics protos to compare.
# NOTE(review): both loads read the same file ("evaltest.pb"), so the two
# sides of the comparison below are identical; confirm this is intended.
stats1 = tfdv.load_statistics(
    "evaltest.pb")
stats2 = tfdv.load_statistics(
	"evaltest.pb"
)
# Infer a schema from each set of statistics.
# `schema2` is not used again in the visible code.
schema1 = tfdv.infer_schema(statistics=stats1)
schema2 = tfdv.infer_schema(statistics=stats2)

# Configure a Jensen-Shannon divergence drift threshold of 0.01 on the
# 'default' feature of schema1.
tfdv.get_feature(schema1, 'default').drift_comparator.jensen_shannon_divergence.threshold = 0.01

# Validate stats2 against schema1, with stats1 as the previous statistics,
# then print the drift/skew section of the result.
drift_anomalies = tfdv.validate_statistics(
    statistics=stats2, schema=schema1, previous_statistics=stats1)
print(drift_anomalies.drift_skew_info)

# Convert the anomalies proto to a plain dict for easier indexing.
from google.protobuf.json_format import MessageToDict
d = MessageToDict(drift_anomalies)

# Read the measured value and configured threshold of the first drift
# measurement of the first entry. The indexing assumes at least one entry
# exists and raises KeyError/IndexError otherwise.
val = d['driftSkewInfo'][0]['driftMeasurements'][0]['value']

thresh = d['driftSkewInfo'][0]['driftMeasurements'][0]['threshold']
print(val, thresh)