# Metadata
### Tracking and managing metadata of machine learning workflows in Kubeflow


The goal of the [Metadata](https://github.com/kubeflow/metadata) project is to help Kubeflow users understand and manage their machine learning workflows by tracking and managing the metadata of workflows.


Metadata comes with three components. From Kubeflow v0.6, Metadata is installed by default.

- UI
- Backend Store
- Python SDK


## Core Concepts

- _Run_ describes an execution of a machine learning workflow, which can be a pipeline or a notebook.
- _Artifact_ describes derived data used or produced in a run.
- _Execution_ describes an execution of a single step of a run with its running parameters.
- _Workspace_ groups a set of runs and related artifacts and executions.


### Install Python SDK

In [1]:
# To use the latest publish `kfmd` library, you can run:
!pip install kfmd --user

# Install other packages used in the turorial:
!pip install pandas --user

Collecting kfmd
  Downloading https://files.pythonhosted.org/packages/cf/72/048a49042dacd93925f6f4253cb765aeddef34da4cbec05066dc1ac555f5/kfmd-0.1.8.tar.gz
Building wheels for collected packages: kfmd
  Building wheel for kfmd (setup.py) ... [?25ldone
[?25h  Stored in directory: /root/.cache/pip/wheels/3d/ef/17/5f5099e588c582d66506547e0bd28bd7071959137a88b110ca
Successfully built kfmd
Installing collected packages: kfmd
Successfully installed kfmd-0.1.8
[33mYou are using pip version 19.0.1, however version 20.1b1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m
[33mYou are using pip version 19.0.1, however version 20.1b1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [None]:
# Restart the kernel to pick up pip installed libraries
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [1]:
# Verify Installation
from kfmd import metadata
import pandas
from datetime import datetime

## Basic Python SDK Usage

Please follow commands here to understand basic usage of metadata SDK

### Create a workspace

In [2]:
test_workspace = metadata.Workspace(
    # Connect to metadata-service in namesapce kubeflow in k8s cluster.
    backend_url_prefix="metadata-service.kubeflow:8080",
    name="test_workspace",
    description="a workspace for testing",
    labels={"foo": "bar"})

### Create a run in a workspace

In [3]:
test_run = metadata.Run(
    workspace=test_workspace,
    name="run-" + datetime.utcnow().isoformat("T") ,
    description="a run in workspace",
)

### Create an execution in a run

In [4]:
exec = metadata.Execution(
    name = "execution" + datetime.utcnow().isoformat("T") ,
    workspace=test_workspace,
    run=test_run,
    description="execution example",
)
print("An execution is create with id %s" % exec.id)

An execution is create with id 2


### Log a data set

In [5]:
data_set = exec.log_input(
        metadata.DataSet(
            description="Training datasets",
            name="imagenet",
            owner="someone@kubeflow.org",
            uri="s3://path/to/dataset",
            version="v1.0.0",
            query="SELECT * FROM mytable"))
assert data_set.id
print("data set id is %s" % data_set.id)

data set id is 3


### Log a model

In [6]:
model = exec.log_output(
    metadata.Model(
            name="MNIST",
            description="model to recognize handwritten digits",
            owner="someone@kubeflow.org",
            uri="s3://my-bucket/mnist",
            model_type="neural network",
            training_framework={
                "name": "tensorflow",
                "version": "v1.0"
            },
            hyperparameters={
                "learning_rate": 0.5,
                "layers": [10, 3, 1],
                "early_stop": True
            },
            version="v0.0.1",
            labels={"mylabel": "l1"}))
assert model.id
print("model id is %s" % model.id)

model id is 4


### Log an evaluation(metrics) of a model

In [7]:
metrics = exec.log_output(
    metadata.Metrics(
            name="MNIST-evaluation",
            description="validating the MNIST model to recognize handwritten digits",
            owner="someone@kubeflow.org",
            uri="s3://my-bucket/mnist-eval.csv",
            data_set_id=data_set.id,
            model_id=model.id,
            metrics_type=metadata.Metrics.VALIDATION,
            values={"accuracy": 0.95},
            labels={"mylabel": "l1"}))
assert metrics.id
print("metrics id is %s" % model.id)

metrics id is 4


### List all models in the workspace

In [8]:
pandas.DataFrame.from_dict(test_workspace.list(metadata.Model.ARTIFACT_TYPE_NAME))

Unnamed: 0,create_time,description,hyperparameters,id,labels,model_type,name,owner,run,training_framework,uri,version,workspace
0,2020-04-25T23:20:02.201646Z,model to recognize handwritten digits,"{'learning_rate': 0.5, 'layers': [10, 3, 1], '...",2,{'mylabel': 'l1'},neural network,MNIST,someone@kubeflow.org,run-2020-04-25T23:20:02.118581,"{'name': 'tensorflow', 'version': 'v1.0'}",s3://my-bucket/mnist,v0.0.1,test_workspace
1,2020-04-25T23:20:32.264640Z,model to recognize handwritten digits,"{'learning_rate': 0.5, 'layers': [10, 3, 1], '...",4,{'mylabel': 'l1'},neural network,MNIST,someone@kubeflow.org,run-2020-04-25T23:20:32.218819,"{'name': 'tensorflow', 'version': 'v1.0'}",s3://my-bucket/mnist,v0.0.1,test_workspace


### Basic Lineage Tracking

In [9]:
print("model id is %s\n" % model.id)
    
# Find the execution that produces this model.
output_events = test_workspace.client.list_events2(model.id).events
assert len(output_events) == 1
execution_id = output_events[0].execution_id

# Find all events related to that execution.
all_events = test_workspace.client.list_events(execution_id).events
assert len(all_events) == 3

print("\nAll events related to this model:")
pandas.DataFrame.from_dict([e.to_dict() for e in all_events])

model id is 4


All events related to this model:


Unnamed: 0,artifact_id,execution_id,milliseconds_since_epoch,path,type
0,3,2,1587856832254,,INPUT
1,4,2,1587856832277,,OUTPUT
2,5,2,1587856832302,,OUTPUT


## Real world example

In [10]:
import tensorflow as tf
from tensorflow import keras

# Helper libraries
import numpy as np
import os
import subprocess
import argparse
import time

from kfmd import metadata


# Reduce spam logs from s3 client
os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

def preprocessing(mnist_execution):
  fashion_mnist = keras.datasets.fashion_mnist
  (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

  # scale the values to 0.0 to 1.0
  train_images = train_images / 255.0
  test_images = test_images / 255.0

  # reshape for feeding into the model
  train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
  test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)

  class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

  print('\ntrain_images.shape: {}, of {}'.format(train_images.shape, train_images.dtype))
  print('test_images.shape: {}, of {}'.format(test_images.shape, test_images.dtype))

  return train_images, train_labels, test_images, test_labels

def train(train_images, train_labels, epochs, model_summary_path=None, mnist_execution=None):
  if model_summary_path:
    logdir=model_summary_path # + datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)

  model = keras.Sequential([
    keras.layers.Conv2D(input_shape=(28,28,1), filters=8, kernel_size=3,
                        strides=2, activation='relu', name='Conv1'),
    keras.layers.Flatten(),
    keras.layers.Dense(10, activation=tf.nn.softmax, name='Softmax')
  ])
  model.summary()

  model.compile(optimizer=tf.train.AdamOptimizer(),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

def export_model(model, model_export_path):
  version = 1
  export_path = os.path.join(model_export_path, str(version))

  tf.saved_model.simple_save(
    keras.backend.get_session(),
    export_path,
    inputs={'input_image': model.input},
    outputs={t.name:t for t in model.outputs})

  print('\nSaved model: {}'.format(export_path))


def main(model_export_path=None, model_summary_path=None, epochs=5):
  """Fashion MNIST Tensorflow Example.
    Args:
      model_summary_path: Model export path.
      model_summary_path: Model summry files for Tensorboard visualization
      epochs: Training epochs. 
    """

  # Setting up metadata tracking
  mnist_workspace = metadata.Workspace(
    # Connect to metadata-service in namesapce kubeflow in k8s cluster.
    backend_url_prefix="metadata-service.kubeflow:8080",
    name="mnist",
    description="Mnist image classification",
    labels={"env": "develop"})

  mnist_run = metadata.Run(
    workspace=mnist_workspace,
    name="run-" + datetime.utcnow().isoformat("T") ,
    description="a run in mnist workspace",
  )

  mnist_execution = metadata.Execution(
    name = "execution" + datetime.utcnow().isoformat("T") ,
    workspace=mnist_workspace,
    run=mnist_run,
    description="execution example in mnist run",
  )

  start_time = time.time()
  train_images, train_labels, test_images, test_labels = preprocessing(mnist_execution)
  model = train(train_images, train_labels, epochs, model_summary_path, mnist_execution)

  dataset = mnist_execution.log_input(
      metadata.DataSet(
            description="MNIST Training datasets",
            name="mnist",
            owner="someone@kubeflow.org",
            uri="s3://path/to/dataset/mnist",
            version="v1.0.0",
            query="SELECT * FROM mytable"))
  print("data set id is %s" % dataset.id)

  if model_export_path:
    export_model(model, model_export_path)

  metadata_model = mnist_execution.log_output(
      metadata.Model(
        name="MNIST",
        description="model to recognize handwritten digits",
        owner="someone@kubeflow.org",
        uri=model_export_path,
        model_type="neural network",
        training_framework={
            "name": "tensorflow",
            "version": "v1.0"
        },
        hyperparameters={
            "learning_rate": 0.5,
            "layers": [10, 3, 1],
            "early_stop": True
        },
        version="v0.0.1",
        labels={"mylabel": "l1"}))
  print("model id is %s" % metadata_model.id)

  metrics = mnist_execution.log_output(
    metadata.Metrics(
            name="MNIST-evaluation",
            description="validating the MNIST model to recognize handwritten digits",
            owner="someone@kubeflow.org",
            uri="s3://my-bucket/mnist-eval.csv",
            data_set_id=dataset.id,
            model_id=metadata_model.id,
            metrics_type=metadata.Metrics.VALIDATION,
            values={"accuracy": 0.95},
            labels={"mylabel": "l1"}))

  # Measure running time
  duration_in_seconds = time.time() - start_time
  print("This model took", duration_in_seconds, "seconds to train and test.")
  mnist_execution.log_output(
      metadata.Metrics(
              name="MNIST-evaluation",
              description="validating the MNIST model to recognize handwritten digits",
              owner="someone@kubeflow.org",
              uri="s3://my-bucket/mnist-eval.csv",
              data_set_id=dataset.id,
              model_id=metadata_model.id,
              metrics_type=metadata.Metrics.VALIDATION,
              values={"time": duration_in_seconds},
              labels={"mylabel": "l1"}))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [11]:
main()


train_images.shape: (60000, 28, 28, 1), of float64
test_images.shape: (10000, 28, 28, 1), of float64
Instructions for updating:
Colocations handled automatically by placer.
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
Conv1 (Conv2D)               (None, 13, 13, 8)         80        
_________________________________________________________________
flatten (Flatten)            (None, 1352)              0         
_________________________________________________________________
Softmax (Dense)              (None, 10)                13530     
Total params: 13,610
Trainable params: 13,610
Non-trainable params: 0
_________________________________________________________________
data set id is 6
model id is 7
This model took 5.408567428588867 seconds to train and test.


# Navigate to the Kubeflow Artifact Store

You can go to central dashboard -> Artifact Store to check details.
![artifact-store](images/artifact_store.jpg)

You can click name and check details.
![artifact-mnist](images/artifacts_mnist.jpg)