In [None]:
import os
from zenml.repo import Repository
from zenml.datasources import CSVDatasource
from zenml.pipelines import TrainingPipeline
from zenml.steps.evaluator import TFMAEvaluator
from zenml.steps.preprocesser import StandardPreprocesser
from zenml.steps.split import RandomSplit
from zenml.steps.trainer import TFFeedForwardTrainer
from zenml.repo import Repository, BaseArtifactStore
from zenml.utils.naming_utils import transformed_label_name
from zenml.steps.deployer import GCAIPDeployer
from zenml.steps.deployer import CortexDeployer
from examples.cortex.predictor.tf import TensorFlowPredictor
from zenml.backends.orchestrator import OrchestratorGCPBackend
from zenml.metadata import MySQLMetadataStore
from zenml.backends.processing import ProcessingDataFlowBackend
from zenml.backends.training import SingleGPUTrainingGCAIPBackend
from zenml.backends.processing import ProcessingDataFlowBackend

In this example we create a ZenML training pipeline and showcase the modularity of ZenML backends. At a high level, here is what a ZenML training pipeline looks like:

<img src="graphics/architecture.png" width="600" height="600" />

# Set up some variables

In [None]:
# All deployment-specific settings are read from environment variables, so no
# project identifiers or credentials are hard-coded in the notebook.
# os.getenv returns None for any variable that is not set.
GCP_BUCKET = os.getenv('GCP_BUCKET')
GCP_PROJECT = os.getenv('GCP_PROJECT')
GCP_REGION = os.getenv('GCP_REGION')
GCP_CLOUD_SQL_INSTANCE_NAME = os.getenv('GCP_CLOUD_SQL_INSTANCE_NAME')
MODEL_NAME = os.getenv('MODEL_NAME')
CORTEX_ENV = os.getenv('CORTEX_ENV')
MYSQL_DB = os.getenv('MYSQL_DB')
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PWD = os.getenv('MYSQL_PWD')
MYSQL_PORT = os.getenv('MYSQL_PORT')
MYSQL_HOST = os.getenv('MYSQL_HOST')

# GCP_BUCKET is joined into a path right below. Without this check an unset
# variable surfaces as an opaque TypeError from os.path.join(None, ...).
if GCP_BUCKET is None:
    raise RuntimeError(
        "The GCP_BUCKET environment variable must be set before running this "
        "notebook; it is the base path for the artifact store and staging "
        "directories.")

# Cloud SQL connection name in the form <project>:<region>:<instance>.
CONNECTION_NAME = f'{GCP_PROJECT}:{GCP_REGION}:{GCP_CLOUD_SQL_INSTANCE_NAME}'
# Job directory handed to the training backend further down.
TRAINING_JOB_DIR = os.path.join(GCP_BUCKET, 'gcp_gcaip_training/staging')

In [None]:
# Handle on the ZenML repository; used later to look up datasources and
# pipelines.
repo: Repository = Repository.get_instance()

# Define artifact store in the cloud
cloud_artifact_store = BaseArtifactStore(os.path.join(GCP_BUCKET, 'all_feature_demo'))

# MYSQL_PORT comes from os.getenv and is None when the variable is unset.
# Fail with a readable message instead of the TypeError int(None) would raise.
if MYSQL_PORT is None:
    raise RuntimeError(
        "The MYSQL_PORT environment variable must be set to create the "
        "MySQL metadata store.")

# Define metadata store in the cloud
cloud_metadata_store = MySQLMetadataStore(
    host=MYSQL_HOST,
    port=int(MYSQL_PORT),  # environment values are strings; the store gets an int
    database=MYSQL_DB,
    username=MYSQL_USER,
    password=MYSQL_PWD,
)

# Create first pipeline

In [None]:
# Instantiate the training pipeline named 'Experiment 1'; the following cells
# attach a datasource and steps to it.
training_pipeline = TrainingPipeline(name='Experiment 1')

#### Add a datasource. This will automatically track and version it.

In [None]:
# Create the datasource, or fall back to looking it up by name if creation
# fails. The fallback suggests the constructor raises when a datasource with
# this name is already registered (e.g. on a notebook re-run) — TODO confirm,
# and narrow the except clause to that specific ZenML exception.
try:
    ds = CSVDatasource(name='Pima Indians Diabetes', path='gs://zenml_quickstart/diabetes.csv')
except Exception:
    # `except Exception` rather than a bare `except:` so that
    # KeyboardInterrupt and SystemExit still propagate.
    repo: Repository = Repository.get_instance()
    ds = repo.get_datasource_by_name('Pima Indians Diabetes')
training_pipeline.add_datasource(ds)

#### Add a split step to partition data into train and eval

In [None]:
# Attach a RandomSplit step whose split map assigns 0.7 / 0.2 / 0.1 to the
# 'train' / 'eval' / 'test' splits.
training_pipeline.add_split(RandomSplit(split_map={'train': 0.7, 'eval': 0.2, 'test':0.1}))

#### Add a preprocessing step to transform the data into an ML-ready form

In [None]:
# Feature names handed to the preprocesser.
diabetes_features = [
    'times_pregnant', 'pgc', 'dbp', 'tst',
    'insulin', 'bmi', 'pedigree', 'age',
]

# Per-column override: the label column gets only the 'no_transform' method.
label_overrides = {
    'has_diabetes': {
        'transform': [{'method': 'no_transform', 'parameters': {}}],
    },
}

diabetes_preprocesser = StandardPreprocesser(
    features=diabetes_features,
    labels=['has_diabetes'],
    overwrite=label_overrides,
)
training_pipeline.add_preprocesser(diabetes_preprocesser)

#### Add a trainer which defines model and training

In [None]:
# Keyword arguments for the feed-forward trainer of the first experiment.
baseline_trainer_params = {
    'loss': 'binary_crossentropy',
    'last_activation': 'sigmoid',
    'output_units': 1,
    'metrics': ['accuracy'],
    'epochs': 5,
}
training_pipeline.add_trainer(TFFeedForwardTrainer(**baseline_trainer_params))

#### Add an evaluator to calculate slicing metrics

In [None]:
# Metrics are keyed by the transformed name of the label column.
label_metric_key = transformed_label_name('has_diabetes')

diabetes_evaluator = TFMAEvaluator(
    slices=[['has_diabetes']],
    metrics={label_metric_key: ['binary_crossentropy', 'binary_accuracy']},
)
training_pipeline.add_evaluator(diabetes_evaluator)

#### Run and evaluate

In [None]:
# Run the pipeline. No backend, metadata store or artifact store is passed
# here, unlike the cloud runs further down.
training_pipeline.run()

In [None]:
# Show statistics for the pipeline.
# NOTE(review): magic=True presumably renders the view inline in the notebook — confirm.
training_pipeline.view_statistics(magic=True)

In [None]:
# Show the evaluation results for the pipeline.
# NOTE(review): magic=True presumably renders the view inline in the notebook — confirm.
training_pipeline.evaluate(magic=True)

#### Inspect datasource

In [None]:
# Fetch the datasources known to the repository and inspect the first one.
# NOTE(review): index 0 assumes the repository holds at least one datasource
# and that the first entry is the one of interest — confirm.
datasources = repo.get_datasources()
datasource = datasources[0]
print(datasource)

In [None]:
# Pull a sample of the datasource's data and preview its first rows.
# presumably sample_data() returns a pandas DataFrame, given how `df` is used — TODO confirm
df = datasource.sample_data()
df.head()

In [None]:
# Dimensions of the sampled data.
df.shape

In [None]:
# Column labels of the sampled data.
df.columns

## Skip preprocessing with your next (warm-starting) pipeline

#### Clone first experiment and only change one hyper-parameter

In [None]:
# Start from a copy of the first pipeline under the name 'Experiment 2'.
training_pipeline_2 = training_pipeline.copy('Experiment 2')

# Trainer for this experiment, configured with epochs=20.
longer_trainer = TFFeedForwardTrainer(
    loss='binary_crossentropy',
    last_activation='sigmoid',
    output_units=1,
    metrics=['accuracy'],
    epochs=20,
)
training_pipeline_2.add_trainer(longer_trainer)

In [None]:
# Run the second experiment, again without passing any explicit backend or
# store arguments.
training_pipeline_2.run()

In [None]:
# Show the evaluation results for the second experiment.
# NOTE(review): magic=True presumably renders the view inline in the notebook — confirm.
training_pipeline_2.evaluate(magic=True)

## Post-training

#### Verify there's still only one datasource

In [None]:
# Re-fetch the datasource list from the repository and report how many
# entries it contains.
datasources = repo.get_datasources()
print(f"We have {len(datasources)} datasources")

#### Compare pipelines

In [None]:
# Compare training runs via the repository.
# NOTE(review): presumably this covers every pipeline run so far — confirm.
repo.compare_training_runs()

# Distribute splitting/preprocessing easily

In [None]:
# Look up 'Experiment 1' in the repository and copy it as 'Experiment 3'.
experiment_1 = repo.get_pipeline_by_name('Experiment 1')
training_pipeline_3 = experiment_1.copy('Experiment 3')

# Define the processing backend
dataflow_staging = os.path.join(GCP_BUCKET, 'dataflow_processing/staging')
processing_backend = ProcessingDataFlowBackend(
    project=GCP_PROJECT,
    staging_location=dataflow_staging,
)

# Run processing step with that backend
distributed_split = RandomSplit(
    split_map={'train': 0.7, 'eval': 0.2, 'test': 0.1},
).with_backend(processing_backend)
training_pipeline_3.add_split(distributed_split)

# Artifacts for this run go to the cloud artifact store.
training_pipeline_3.run(artifact_store=cloud_artifact_store)

# Easily train on the cloud

In [None]:
# Copy the first pipeline under the name 'Experiment 4'.
training_pipeline_4 = training_pipeline.copy('Experiment 4')

# Add a trainer with a GCAIP backend
training_backend = SingleGPUTrainingGCAIPBackend(
    project=GCP_PROJECT,
    job_dir=TRAINING_JOB_DIR,
)

gcaip_trainer = TFFeedForwardTrainer(
    loss='binary_crossentropy',
    last_activation='sigmoid',
    output_units=1,
    metrics=['accuracy'],
    epochs=20,
).with_backend(training_backend)
training_pipeline_4.add_trainer(gcaip_trainer)

# Artifacts for this run go to the cloud artifact store.
training_pipeline_4.run(artifact_store=cloud_artifact_store)

# Orchestrate the pipeline wherever you like

In [None]:
# Copy the first pipeline under the name 'Experiment 5'.
training_pipeline_5 = training_pipeline.copy('Experiment 5')

# Define the orchestrator backend
orchestrator_settings = {
    'cloudsql_connection_name': CONNECTION_NAME,
    'project': GCP_PROJECT,
    'preemptible': True,  # reduce costs by using preemptible instances
    'machine_type': 'n1-standard-4',
    'gpu': 'nvidia-tesla-k80',
    'gpu_count': 1,
}
cloud_orchestrator_backend = OrchestratorGCPBackend(**orchestrator_settings)

# Run the pipeline
training_pipeline_5.run(
    backend=cloud_orchestrator_backend,
    metadata_store=cloud_metadata_store,
    artifact_store=cloud_artifact_store,
)

# Add a deployer step with different integrations

## Option 1: Deploy to Google Cloud AI Platform

In [None]:
# Copy the first pipeline under the name 'Experiment 6'.
training_pipeline_6 = training_pipeline.copy('Experiment 6')

# Deployer configured with the GCP project and model name from the environment.
gcaip_deployer = GCAIPDeployer(project_id=GCP_PROJECT, model_name=MODEL_NAME)
training_pipeline_6.add_deployment(gcaip_deployer)

# Artifacts for this run go to the cloud artifact store.
training_pipeline_6.run(artifact_store=cloud_artifact_store)

## Option 2: Deploy to Kubernetes via Cortex

In [None]:
# Copy the first pipeline under the name 'Experiment 7'.
training_pipeline_7 = training_pipeline.copy('Experiment 7')

# Add cortex deployer
predictor_spec = {
    "type": "tensorflow",
    "models": {"signature_key": "serving_default"},
}
api_config = {
    "name": MODEL_NAME,
    "kind": "RealtimeAPI",
    "predictor": predictor_spec,
}

cortex_deployer = CortexDeployer(
    env=CORTEX_ENV,
    api_config=api_config,
    predictor=TensorFlowPredictor,
)
training_pipeline_7.add_deployment(cortex_deployer)

# Artifacts for this run go to the cloud artifact store.
training_pipeline_7.run(artifact_store=cloud_artifact_store)