# Example of a Deep Learning Pipeline Using CodeCarbon & CarbonTracker

## Import libraries

In [None]:
import logging
import os
from datetime import datetime
logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)
from kfp import dsl
from kfp.v2 import compiler
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2.dsl import Artifact, Input, Metrics, Model, Output, component, Dataset, ClassificationMetrics
import tensorflow as tf


## Define global variables 

In [None]:
# Notebook-wide configuration. This cell uses IPython syntax (%env, !cmd),
# so it only runs inside Jupyter/IPython, not as a plain Python script.

# Append the user-local bin directory to PATH (presumably so CLI tools
# installed with `pip --user` can be found).
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="europe-west4" # Netherlands region (the training component also uses country code NLD)

# Read the project ID from the active gcloud configuration (stderr discarded).
shell_output=!gcloud config get-value project 2> /dev/null
PROJECT_ID=shell_output[0]

# NOTE(review): "test" is a placeholder; set this to an existing GCS bucket.
BUCKET_NAME = "test"
BUCKET_URI = f"gs://{BUCKET_NAME}"

# Root GCS path for pipeline artifacts. Nothing in this cell creates the
# bucket; it must already exist.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root_mist/"

# File that the compiled pipeline job spec is written to and submitted from.
PIPELINE_JSON_NAME = 'pipeline-deep-learning-codecarbon.json'
# NOTE(review): not referenced in the visible cells.
USER_FLAG = "--user"

# Run timestamp, used below to make the job display name unique.
TIMESTAMP =datetime.now().strftime("%Y%m%d%H%M%S")
# NOTE(review): TASK and MODEL_TYPE are not referenced in the visible cells.
TASK = "classif"
MODEL_TYPE = "gpu-cnn"

# Display name of the pipeline job, suffixed with the run timestamp.
DISPLAY_NAME = 'pipeline-deep-learning-cnn-gpu-job{}'.format(TIMESTAMP)

# Service-account setup: point the Google client libraries at a key file and
# name the service account the pipeline job is submitted with.
# NOTE(review): both values are placeholders; replace the key path and
# YOUR_PROJECT_ID with real values.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="../../secrets/test.json"
SA="test@YOUR_PROJECT_ID.iam.gserviceaccount.com"

## Authenticate your Google Cloud account

In the Cloud Console, go to the **Create service account key** page.
* Click **Create service account**.
* In the **Service account name** field, enter a name, and click **Create**.
* In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI" into the filter box, and select **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.
* Click **Create**. A JSON file that contains your key downloads to your local environment.
* In the global-variables cell above, set `GOOGLE_APPLICATION_CREDENTIALS` to the path of your service account key and `SA` to the service account's email address, then run that cell.

## Create a custom deep learning component 

In [15]:
@component(
    packages_to_install=[
        "codecarbon",
        "carbontracker",
    ],
    base_image="gcr.io/deeplearning-platform-release/tf-gpu.2-11:latest",
)
def train_deep_learning_fashion_mist_job(
    learning_rate: float,
    epochs: int,
    batch_size: int,
    kpi_co2: Output[Metrics],
    model_trained: Output[Model],
):
    """Train a CNN on Fashion-MNIST and log the CO2 emissions of training.

    Emissions are measured with two independent tools: CodeCarbon's
    ``OfflineEmissionsTracker`` (country fixed to the Netherlands, "NLD") and
    CarbonTracker. Both figures are logged to ``kpi_co2`` and the trained
    Keras model is saved to ``model_trained.path``.

    KFP lightweight components ship only this function's source to the
    container, so every import must live inside the function body.

    Args:
        learning_rate: Learning rate for the Adam optimizer.
        epochs: Number of training epochs. CarbonTracker also uses this as the
            epoch count for its prediction.
        batch_size: Mini-batch size passed to ``model.fit``.
        kpi_co2: Output metrics artifact that receives the emission figures.
        model_trained: Output model artifact; the model is saved to its path.

    Raises:
        RuntimeError: If CarbonTracker produced no parsable log.
    """
    import os

    import tensorflow as tf
    from carbontracker import parser
    from carbontracker.tracker import CarbonTracker
    from codecarbon import OfflineEmissionsTracker

    NO_CLASSES = 10
    img_rows, img_cols = 28, 28
    input_shape = (img_rows, img_cols, 1)
    DIR_LOG = "log"  # directory for CarbonTracker log files
    carbon_log_dir = f"./{DIR_LOG}/"

    os.makedirs(carbon_log_dir, exist_ok=True)

    # Fashion-MNIST (Zalando article images). Only the training split is
    # needed here; the test split is loaded by the evaluation component.
    (x_train, y_train), _ = tf.keras.datasets.fashion_mnist.load_data()

    # Add the single channel axis expected by Conv2D and rescale pixel values
    # by 1/255.
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_train = x_train.astype("float32") / 255.0

    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape),
        tf.keras.layers.MaxPooling2D(2, 2),
        tf.keras.layers.Conv2D(64, (3, 3), activation="relu"),
        tf.keras.layers.MaxPooling2D(2, 2),
        # NOTE(review): this Dense comes before Flatten, so it is applied on
        # the last (channel) axis at every spatial position. Kept as-is to
        # preserve the model architecture.
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dense(NO_CLASSES, activation="softmax"),
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=["accuracy"],
    )

    # CarbonTracker expects one epoch_start()/epoch_end() pair per training
    # epoch and extrapolates its prediction to `epochs` epochs. Wrapping the
    # whole fit() in a single pair would count all of training as one epoch
    # and then multiply it by `epochs`. So hook the calls into Keras per epoch.
    carbon_tracker = CarbonTracker(epochs=epochs, log_dir=carbon_log_dir)
    carbon_tracker_callback = tf.keras.callbacks.LambdaCallback(
        on_epoch_begin=lambda epoch, logs: carbon_tracker.epoch_start(),
        on_epoch_end=lambda epoch, logs: carbon_tracker.epoch_end(),
    )

    # Start CodeCarbon, specifying the Netherlands by its ISO code (NLD).
    codecarbon = OfflineEmissionsTracker(country_iso_code="NLD")
    codecarbon.start()
    try:
        model.fit(
            x_train,
            y_train,
            epochs=epochs,
            shuffle=True,
            batch_size=batch_size,
            validation_split=0.2,
            callbacks=[carbon_tracker_callback],
        )
    finally:
        # Stop both trackers even if training fails, so neither is left running.
        emissions_codecarbon = codecarbon.stop()
        carbon_tracker.stop()

    # Parse only after stop() so CarbonTracker has finished writing its logs.
    logs = parser.parse_all_logs(log_dir=carbon_log_dir)
    if not logs:
        raise RuntimeError(f"No CarbonTracker logs found in {carbon_log_dir}")
    emissions_carbontracker: dict = logs[0]["pred"]

    # NOTE(review): both values are grams of CO2eq. CodeCarbon's stop() returns
    # kg (hence the x1000), and CarbonTracker's key is "co2eq (g)". The
    # "(g/kwh)" in the metric names is a mislabel, kept so the metric keys
    # stay comparable with earlier runs.
    kpi_co2.log_metric("emissions_codecarbon co2eq (g/kwh)", float(emissions_codecarbon) * 1000)
    kpi_co2.log_metric("emissions_carbontraker co2eq (g/kwh)", emissions_carbontracker['co2eq (g)'])

    model.save(model_trained.path)
    


In [16]:
@component(base_image="gcr.io/deeplearning-platform-release/tf-gpu.2-11:latest")
def evaluate_model(
    model_trained: Input[Model],
    algo_metrics: Output[Metrics],
    metric_confusion_matrix: Output[ClassificationMetrics]):
    """Score the trained Fashion-MNIST model on the held-out test split.

    Logs test accuracy (as a percentage) and test loss to ``algo_metrics``.
    Logs a confusion matrix with class labels "0".."9" to
    ``metric_confusion_matrix``.
    """
    import tensorflow as tf
    from sklearn.metrics import confusion_matrix

    height = width = 28

    # Only the test split is needed; preprocess it the same way as in training.
    _, (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
    test_images = test_images.reshape(test_images.shape[0], height, width, 1)
    test_images = test_images.astype('float32') / 255.0

    restored = tf.keras.models.load_model(model_trained.path)
    # Most likely class index for every test image.
    predicted_classes = restored.predict(test_images).argmax(1)
    test_loss, test_accuracy = restored.evaluate(test_images, test_labels, batch_size=64)

    algo_metrics.log_metric("accuracy", test_accuracy * 100)
    algo_metrics.log_metric("loss", test_loss)

    class_names = list(map(str, range(10)))
    matrix = confusion_matrix(test_labels, predicted_classes).tolist()
    metric_confusion_matrix.log_confusion_matrix(class_names, matrix)

## Create the Pipeline
### The pipeline compilation generates the **pipeline-deep-learning-codecarbon.json**  job spec file.


In [20]:
@dsl.pipeline(
    # Pipeline name; it determines the pipeline Context.
    name="pipeline-deep-learning",
    # Default artifact root; can be overridden when the job is submitted.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    learning_rate: float,
    epochs: int,
    batch_size: int,
    project: str = PROJECT_ID,
    region: str = REGION,
    display_name: str = DISPLAY_NAME,
    api_endpoint: str = REGION+"-aiplatform.googleapis.com",
    serving_container_image_uri: str = "europe-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-11:latest"
    ):
    """Train the Fashion-MNIST CNN on a GPU node, then evaluate it.

    ``project``, ``region``, ``display_name``, ``api_endpoint`` and
    ``serving_container_image_uri`` are exposed as pipeline parameters but
    are not consumed by any step.
    """
    training = train_deep_learning_fashion_mist_job(
        learning_rate=learning_rate,
        epochs=epochs,
        batch_size=batch_size,
    )
    # Schedule the training step on a node with one NVIDIA T4 GPU.
    training.add_node_selector_constraint(
        label_name="cloud.google.com/gke-accelerator",
        value="NVIDIA_TESLA_T4",
    )
    training.set_gpu_limit(1)

    # Feed the trained model artifact into the evaluation step.
    evaluate_model(training.outputs['model_trained'])

# Compile the pipeline definition into the JSON job-spec file named by
# PIPELINE_JSON_NAME; the submission cell reads it back as its template.
compiler.Compiler().compile(
    package_path=PIPELINE_JSON_NAME,
    pipeline_func=pipeline,
)

## Submit the pipeline job in an experiment

In [None]:
# Hyperparameters passed to the pipeline for this run.
run_conf_params = {"learning_rate": 0.001, "epochs": 10, "batch_size": 64}

# Pass the project explicitly. Otherwise the SDK falls back to whatever
# project the active credentials resolve to, which may differ from the
# PROJECT_ID read from gcloud.
job = aiplatform.PipelineJob(
    display_name=f"{DISPLAY_NAME}-pipeline-run",
    template_path=PIPELINE_JSON_NAME,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=dict(run_conf_params),
    # Caching disabled: every step re-executes (and is re-measured) on each run.
    enable_caching=False,
    project=PROJECT_ID,
    location=REGION,
)
job.submit(service_account=SA)