# Metadata
### Tracking and managing metadata of machine learning workflows in Kubeflow


The goal of the [Metadata](https://github.com/kubeflow/metadata) project is to help Kubeflow users understand and manage their machine learning workflows by tracking and managing the metadata that those workflows produce.


From Kubeflow v0.6, Metadata is installed by default. It comes with three components:

- UI
- Backend Store
- Python SDK


## Core Concepts

- _Run_ describes an execution of a machine learning workflow, which can be a pipeline or a notebook.
- _Artifact_ describes derived data used or produced in a run.
- _Execution_ describes an execution of a single step of a run with its running parameters.
- _Workspace_ groups a set of runs and related artifacts and executions.
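
The relationships between these four concepts can be sketched with plain Python dataclasses. This is only an illustration of the containment described above, not the `kfmd` implementation: the class fields and the `artifacts()` helper are assumptions made for this example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Artifact:
    """Derived data used or produced in a run (hypothetical fields)."""
    name: str
    uri: str


@dataclass
class Execution:
    """A single step of a run, with the artifacts it reads and writes."""
    name: str
    inputs: List[Artifact] = field(default_factory=list)
    outputs: List[Artifact] = field(default_factory=list)


@dataclass
class Run:
    """One execution of a workflow, made up of step executions."""
    name: str
    executions: List[Execution] = field(default_factory=list)


@dataclass
class Workspace:
    """Groups runs together with their executions and artifacts."""
    name: str
    runs: List[Run] = field(default_factory=list)

    def artifacts(self) -> List[Artifact]:
        # Collect every artifact used or produced by any execution.
        found = []
        for run in self.runs:
            for execution in run.executions:
                found.extend(execution.inputs + execution.outputs)
        return found


dataset = Artifact(name="imagenet", uri="s3://path/to/dataset")
model = Artifact(name="MNIST", uri="s3://my-bucket/mnist")
training = Execution(name="train", inputs=[dataset], outputs=[model])
workspace = Workspace(
    name="test_workspace",
    runs=[Run(name="run-1", executions=[training])],
)

artifact_names = [a.name for a in workspace.artifacts()]
print(artifact_names)
```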


## Install the Python SDK

In [17]:
# To use the latest published `kfmd` library, you can run:
!pip install kfmd==0.1.8 --user

# Install other packages used in the tutorial:
!pip install pandas==1.0.1 --user



In [18]:
!pip list

Package                  Version   
------------------------ ----------
absl-py                  0.9.0     
argo-models              2.2.1a0   
asn1crypto               0.24.0    
astor                    0.8.1     
attrs                    19.3.0    
backcall                 0.1.0     
bleach                   3.1.0     
cachetools               4.0.0     
certifi                  2019.11.28
cffi                     1.14.0    
chardet                  3.0.4     
Click                    7.0       
cloudpickle              1.1.1     
cryptography             2.8       
cycler                   0.10.0    
decorator                4.4.1     
defusedxml               0.6.0     
Deprecated               1.2.7     
docker                   4.1.0     
entrypoints              0.3       
enum34                   1.1.6     
fairing                  0.5       
future                   0.18.2    
gast                     0.2.2     
google-api-core          1.16.0    
google-api-python-client 1.7

## Restart the kernel to pick up the pip-installed libraries

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
# Verify Installation
from kfmd import metadata
import pandas
from datetime import datetime

## Basic Python SDK Usage

Run the following cells in order to learn the basic usage of the Metadata SDK.

### Create a workspace

In [None]:
test_workspace = metadata.Workspace(
    # Connect to metadata-service in namespace kubeflow in the k8s cluster.
    backend_url_prefix="metadata-service.kubeflow:8080",
    name="test_workspace",
    description="a workspace for testing",
    labels={"foo": "bar"})
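
The value passed as `backend_url_prefix` above has the form `<service>.<namespace>:<port>`, which is the short DNS name of a Kubernetes Service as seen from inside the cluster. A minimal sketch of building that string follows; `service_address` is a hypothetical helper written for this example and is not part of `kfmd`.

```python
def service_address(service: str, namespace: str, port: int) -> str:
    """Build '<service>.<namespace>:<port>' for an in-cluster Service.

    Hypothetical helper for illustration only.
    """
    if not 0 < port < 65536:
        raise ValueError("port must be between 1 and 65535")
    return "{}.{}:{}".format(service, namespace, port)


# Reproduces the address used in the workspace cell above.
address = service_address("metadata-service", "kubeflow", 8080)
print(address)
```

If the metadata service runs in a different namespace or listens on a different port, only the arguments change.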

### Create a run in a workspace

In [None]:
test_run = metadata.Run(
    workspace=test_workspace,
    name="run-" + datetime.utcnow().isoformat("T") ,
    description="a run in workspace",
)

### Create an execution in a run

In [None]:
exec = metadata.Execution(
    name = "execution" + datetime.utcnow().isoformat("T") ,
    workspace=test_workspace,
    run=test_run,
    description="execution example",
)
print("An execution was created with id %s" % exec.id)
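
The run and execution names above are made unique by appending a UTC timestamp. A small sketch of that naming scheme follows. `timestamped_name` is a hypothetical helper, and it uses a timezone-aware `datetime.now(timezone.utc)` instead of `datetime.utcnow()`, so the resulting string also carries a `+00:00` offset.

```python
from datetime import datetime, timezone


def timestamped_name(prefix, now=None):
    """Return '<prefix>-<ISO 8601 UTC timestamp>' (hypothetical helper).

    `now` can be injected to make the result reproducible in tests.
    """
    now = now or datetime.now(timezone.utc)
    return "{}-{}".format(prefix, now.isoformat("T"))


# A fixed instant, used here only so the output is predictable.
fixed = datetime(2020, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
example = timestamped_name("run", fixed)
print(example)  # run-2020-01-02T03:04:05+00:00

# Without an explicit time, every call yields a fresh name.
print(timestamped_name("execution"))
```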

### Log a data set

In [None]:
data_set = exec.log_input(
        metadata.DataSet(
            description="Training datasets",
            name="imagenet",
            owner="someone@kubeflow.org",
            uri="s3://path/to/dataset",
            version="v1.0.0",
            query="SELECT * FROM mytable"))
assert data_set.id
print("data set id is %s" % data_set.id)

### Log a model

In [None]:
model = exec.log_output(
    metadata.Model(
            name="MNIST",
            description="model to recognize handwritten digits",
            owner="someone@kubeflow.org",
            uri="s3://my-bucket/mnist",
            model_type="neural network",
            training_framework={
                "name": "tensorflow",
                "version": "v1.0"
            },
            hyperparameters={
                "learning_rate": 0.5,
                "layers": [10, 3, 1],
                "early_stop": True
            },
            version="v0.0.1",
            labels={"mylabel": "l1"}))
assert model.id
print("model id is %s" % model.id)

### Log an evaluation (metrics) of a model

In [None]:
metrics = exec.log_output(
    metadata.Metrics(
            name="MNIST-evaluation",
            description="validating the MNIST model to recognize handwritten digits",
            owner="someone@kubeflow.org",
            uri="s3://my-bucket/mnist-eval.csv",
            data_set_id=data_set.id,
            model_id=model.id,
            metrics_type=metadata.Metrics.VALIDATION,
            values={"accuracy": 0.95},
            labels={"mylabel": "l1"}))
assert metrics.id
print("metrics id is %s" % metrics.id)

### List all models in the workspace

In [None]:
pandas.DataFrame.from_dict(test_workspace.list(metadata.Model.ARTIFACT_TYPE_NAME))

### Basic Lineage Tracking

In [None]:
print("model id is %s\n" % model.id)
    
# Find the execution that produces this model.
output_events = test_workspace.client.list_events2(model.id).events
assert len(output_events) == 1
execution_id = output_events[0].execution_id

# Find all events related to that execution.
all_events = test_workspace.client.list_events(execution_id).events
assert len(all_events) == 3

print("\nAll events related to this model:")
pandas.DataFrame.from_dict([e.to_dict() for e in all_events])
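
The two-step lookup above can be illustrated without a running metadata service. The sketch below keeps events in an in-memory list and mirrors the same idea: first find the execution that produced an artifact, then collect every event attached to that execution. The `Event` tuple, its fields, and the ids are assumptions made for this example and do not reflect the actual schema of the metadata service.

```python
from collections import namedtuple

# Hypothetical event record linking one artifact to one execution.
Event = namedtuple("Event", ["artifact_id", "execution_id", "type"])

events = [
    Event(artifact_id=1, execution_id=10, type="INPUT"),   # data set
    Event(artifact_id=2, execution_id=10, type="OUTPUT"),  # model
    Event(artifact_id=3, execution_id=10, type="OUTPUT"),  # metrics
    Event(artifact_id=4, execution_id=11, type="OUTPUT"),  # unrelated artifact
]


def events_for_artifact(artifact_id):
    """All events that mention the given artifact."""
    return [e for e in events if e.artifact_id == artifact_id]


def events_for_execution(execution_id):
    """All events that belong to the given execution."""
    return [e for e in events if e.execution_id == execution_id]


def lineage(artifact_id):
    """Events of the execution(s) that produced the given artifact."""
    producers = {
        e.execution_id
        for e in events_for_artifact(artifact_id)
        if e.type == "OUTPUT"
    }
    related = []
    for execution_id in sorted(producers):
        related.extend(events_for_execution(execution_id))
    return related


model_lineage = lineage(2)
related_artifact_ids = [e.artifact_id for e in model_lineage]
print(related_artifact_ids)
```

With one logged input and two logged outputs, the producing execution has three events, which is why the cell above asserts `len(all_events) == 3`.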

## Real-world example

In [None]:
import tensorflow as tf
from tensorflow import keras

# Helper libraries
import numpy as np
import os
import subprocess
import argparse
import time
from datetime import datetime

from kfmd import metadata


# Reduce spam logs from s3 client
os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

def preprocessing(mnist_execution):
  fashion_mnist = keras.datasets.fashion_mnist
  (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

  # scale the values to 0.0 to 1.0
  train_images = train_images / 255.0
  test_images = test_images / 255.0

  # reshape for feeding into the model
  train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
  test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)

  class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

  print('\ntrain_images.shape: {}, of {}'.format(train_images.shape, train_images.dtype))
  print('test_images.shape: {}, of {}'.format(test_images.shape, test_images.dtype))

  return train_images, train_labels, test_images, test_labels

def train(train_images, train_labels, epochs, model_summary_path=None, mnist_execution=None):
  if model_summary_path:
    logdir=model_summary_path # + datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)

  model = keras.Sequential([
    keras.layers.Conv2D(input_shape=(28,28,1), filters=8, kernel_size=3,
                        strides=2, activation='relu', name='Conv1'),
    keras.layers.Flatten(),
    keras.layers.Dense(10, activation=tf.nn.softmax, name='Softmax')
  ])
  model.summary()

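  model.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

  # Train the model, logging to TensorBoard when a summary path is given,
  # and return it so that main() can export it and log its metadata.
  callbacks = [tensorboard_callback] if model_summary_path else []
  model.fit(train_images, train_labels, epochs=epochs, callbacks=callbacks)
  return model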

def export_model(model, model_export_path):
  version = 1
  export_path = os.path.join(model_export_path, str(version))

  tf.saved_model.simple_save(
    keras.backend.get_session(),
    export_path,
    inputs={'input_image': model.input},
    outputs={t.name:t for t in model.outputs})

  print('\nSaved model: {}'.format(export_path))


def main(model_export_path=None, model_summary_path=None, epochs=5):
  """Fashion MNIST Tensorflow Example.
    Args:
      model_export_path: Model export path.
      model_summary_path: Model summary files for TensorBoard visualization.
      epochs: Training epochs.
    """

  # Setting up metadata tracking
  mnist_workspace = metadata.Workspace(
    # Connect to metadata-service in namespace kubeflow in the k8s cluster.
    backend_url_prefix="metadata-service.kubeflow:8080",
    name="mnist",
    description="Mnist image classification",
    labels={"env": "develop"})

  mnist_run = metadata.Run(
    workspace=mnist_workspace,
    name="run-" + datetime.utcnow().isoformat("T") ,
    description="a run in mnist workspace",
  )

  mnist_execution = metadata.Execution(
    name = "execution" + datetime.utcnow().isoformat("T") ,
    workspace=mnist_workspace,
    run=mnist_run,
    description="execution example in mnist run",
  )

  start_time = time.time()
  train_images, train_labels, test_images, test_labels = preprocessing(mnist_execution)
  model = train(train_images, train_labels, epochs, model_summary_path, mnist_execution)

  dataset = mnist_execution.log_input(
      metadata.DataSet(
            description="MNIST Training datasets",
            name="mnist",
            owner="someone@kubeflow.org",
            uri="s3://path/to/dataset/mnist",
            version="v1.0.0",
            query="SELECT * FROM mytable"))
  print("data set id is %s" % dataset.id)

  if model_export_path:
    export_model(model, model_export_path)

  metadata_model = mnist_execution.log_output(
      metadata.Model(
        name="MNIST",
        description="model to recognize handwritten digits",
        owner="someone@kubeflow.org",
        uri=model_export_path,
        model_type="neural network",
        training_framework={
            "name": "tensorflow",
            "version": "v2.0"
        },
        hyperparameters={
            "learning_rate": 0.5,
            "layers": [10, 3, 1],
            "early_stop": True
        },
        version="v0.0.1",
        labels={"mylabel": "l1"}))
  print("model id is %s" % metadata_model.id)

  metrics = mnist_execution.log_output(
    metadata.Metrics(
            name="MNIST-evaluation",
            description="validating the MNIST model to recognize handwritten digits",
            owner="someone@kubeflow.org",
            uri="s3://my-bucket/mnist-eval.csv",
            data_set_id=dataset.id,
            model_id=metadata_model.id,
            metrics_type=metadata.Metrics.VALIDATION,
            values={"accuracy": 0.95},
            labels={"mylabel": "l1"}))

  # Measure running time
  duration_in_seconds = time.time() - start_time
  print("This model took", duration_in_seconds, "seconds to train and test.")
  mnist_execution.log_output(
      metadata.Metrics(
              name="MNIST-evaluation",
              description="validating the MNIST model to recognize handwritten digits",
              owner="someone@kubeflow.org",
              uri="s3://my-bucket/mnist-eval.csv",
              data_set_id=dataset.id,
              model_id=metadata_model.id,
              metrics_type=metadata.Metrics.VALIDATION,
              values={"time": duration_in_seconds},
              labels={"mylabel": "l1"}))

In [None]:
main()

To check the logged details, open the central dashboard and go to Artifact Store.
![artifact-store](img/artifact_store.jpg)

Click the name of an artifact to see its details.
![artifact-mnist](img/artifacts_mnist.jpg)