In [1]:
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

import tensorflow as tf
print('TF version: {}'.format(tf.__version__))

import tensorflow_data_validation as tfdv
print('TFDV version: {}'.format(tfdv.version.__version__))

import urllib
import zipfile

TF version: 2.8.2
TFDV version: 1.8.0


In [2]:
# Download the zip file from GCP and unzip it
url = 'https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/chicago_data.zip'
zip, headers = urllib.request.urlretrieve(url)
zipfile.ZipFile(zip).extractall()
zipfile.ZipFile(zip).close()

print("Here's what we downloaded:")
!ls -R data

Here's what we downloaded:
[34meval[m[m    [34mserving[m[m [34mtrain[m[m

data/eval:
data.csv

data/serving:
data.csv

data/train:
data.csv


In [3]:
# Define ML Metadata's Storage Backend
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.fake_database.SetInParent()
store = metadata_store.MetadataStore(connection_config)

In [4]:
store

<ml_metadata.metadata_store.metadata_store.MetadataStore at 0x1607da460>

In [6]:
# Create ArtifacType for the input dataset
data_artifact_type = metadata_store_pb2.ArtifactType()
data_artifact_type.name = 'DataSet'
data_artifact_type.properties['name'] = metadata_store_pb2.STRING
data_artifact_type.properties['split'] =  metadata_store_pb2.STRING
data_artifact_type.properties['version'] = metadata_store_pb2.INT

data_artifact_type_id = store.put_artifact_type(data_artifact_type)

In [8]:
# Create ArtifactType for Schema
schema_artifact_type = metadata_store_pb2.ArtifactType()
schema_artifact_type.name = 'Schema'
schema_artifact_type.properties['name'] = metadata_store_pb2.STRING
schema_artifact_type.properties['version'] = metadata_store_pb2.INT

# Register artifact type to the Metadata Store
schema_artifact_type_id = store.put_artifact_type(schema_artifact_type)

In [14]:
print('Data artifact type:\n', data_artifact_type)
print('Schema artifact type:\n', schema_artifact_type)
print('Data artifact type ID:', data_artifact_type_id)
print('Schema artifact type ID:', schema_artifact_type_id)

Data artifact type:
 name: "DataSet"
properties {
  key: "name"
  value: STRING
}
properties {
  key: "split"
  value: STRING
}
properties {
  key: "version"
  value: INT
}

Schema artifact type:
 name: "Schema"
properties {
  key: "name"
  value: STRING
}
properties {
  key: "version"
  value: INT
}

Data artifact type ID: 10
Schema artifact type ID: 11


In [15]:
# Create ExecutionType for Data Validation component
dv_execution_type = metadata_store_pb2.ExecutionType()
dv_execution_type.name = 'Data Validation'
dv_execution_type.properties['state'] = metadata_store_pb2.STRING

# Register execution type to the Metadata Store
dv_execution_type_id = store.put_execution_type(dv_execution_type)

print('Data validation execution type:\n', dv_execution_type)
print('Data validation execution type ID:', dv_execution_type_id)

Data validation execution type:
 name: "Data Validation"
properties {
  key: "state"
  value: STRING
}

Data validation execution type ID: 12


In [17]:
# Generate input artifact unit: https://github.com/hsuyuming/ml-metadata/blob/master/ml_metadata/proto/metadata_store.proto
# Declare input artifact of type Dataset
data_artifact = metadata_store_pb2.Artifact()
data_artifact.uri = './data/train/data.csv'
data_artifact.type_id = data_artifact_type_id
data_artifact.properties['name'].string_value = 'Chicago Taxi dataset'
data_artifact.properties['split'].string_value = 'train'
data_artifact.properties['version'].int_value = 1

# Submit input artifact to the Metadata Store
data_artifact_id = store.put_artifacts([data_artifact])[0]
print('Data artifact:\n', data_artifact)
print('Data artifact ID:', data_artifact_id)

Data artifact:
 type_id: 10
uri: "./data/train/data.csv"
properties {
  key: "name"
  value {
    string_value: "Chicago Taxi dataset"
  }
}
properties {
  key: "split"
  value {
    string_value: "train"
  }
}
properties {
  key: "version"
  value {
    int_value: 1
  }
}

Data artifact ID: 1


In [19]:
# Generate execution unit: https://github.com/hsuyuming/ml-metadata/blob/master/ml_metadata/proto/metadata_store.proto
# Register the Execution of a Data Validation run
dv_execution = metadata_store_pb2.Execution()
dv_execution.type_id = dv_execution_type_id
dv_execution.properties['state'].string_value = 'RUNNING'

dv_execution_id = store.put_executions([dv_execution])[0]
print('Data validation execution:\n', dv_execution)
print('Data validation execution ID:', dv_execution_id)

Data validation execution:
 type_id: 12
properties {
  key: "state"
  value {
    string_value: "RUNNING"
  }
}

Data validation execution ID: 1


In [21]:
# Rigister input event
# Declare the input event
input_event = metadata_store_pb2.Event()
input_event.artifact_id = data_artifact_id
input_event.execution_id = dv_execution_id
input_event.type = metadata_store_pb2.Event.DECLARED_INPUT

store.put_events([input_event])

print('Input event:\n', input_event)

Input event:
 artifact_id: 1
execution_id: 1
type: DECLARED_INPUT



In [22]:
# Run the TFDV(tensorflow DataValidation) component
# Infer a schema by passing statistics to `infer_schema()`

train_data = './data/train/data.csv'
train_stats = tfdv.generate_statistics_from_csv(data_location=train_data)
schema = tfdv.infer_schema(statistics=train_stats)

schema_file = './schema.pbtxt'
tfdv.write_schema_text(schema, schema_file)

print("Dataset's Schema has been generated at:", schema_file)






Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Dataset's Schema has been generated at: ./schema.pbtxt


In [23]:
# Generate output artifact unit
# Declare output artifact of type Schema_artifact
schema_artifact = metadata_store_pb2.Artifact()
schema_artifact.uri = schema_file
schema_artifact.type_id = schema_artifact_type_id
schema_artifact.properties['version'].int_value = 1
schema_artifact.properties['name'].string_value = 'Chicago Taxi Schema'

# Submit output artifact to the Metadata Store
schema_artifact_id = store.put_artifacts([schema_artifact])[0]

print('Schema artifact:\n', schema_artifact)
print('Schema artifact ID:', schema_artifact_id)


Schema artifact:
 type_id: 11
uri: "./schema.pbtxt"
properties {
  key: "name"
  value {
    string_value: "Chicago Taxi Schema"
  }
}
properties {
  key: "version"
  value {
    int_value: 1
  }
}

Schema artifact ID: 2


In [24]:
# Register output event
output_event = metadata_store_pb2.Event()
output_event.artifact_id = schema_artifact_id
output_event.execution_id = dv_execution_id
output_event.type = metadata_store_pb2.Event.DECLARED_OUTPUT

# Submit output event to the Metadata Store
store.put_events([output_event])

print('Output event:\n', output_event)


Output event:
 artifact_id: 2
execution_id: 1
type: DECLARED_OUTPUT



In [25]:
# Update the execution unit
dv_execution.id = dv_execution_id
dv_execution.properties['state'].string_value = 'COMPLETED'

store.put_executions([dv_execution])

print('Data validation execution:\n', dv_execution)

Data validation execution:
 id: 1
type_id: 12
properties {
  key: "state"
  value {
    string_value: "COMPLETED"
  }
}



In [26]:
# Setting up Context Types and Generating a Context Unit
# Create a ContextType
expt_context_type = metadata_store_pb2.ContextType()
expt_context_type.name = 'Experiment'
expt_context_type.properties['note'] = metadata_store_pb2.STRING

# Register context type to the Metadata Store
expt_context_type_id = store.put_context_type(expt_context_type)

In [27]:
# Generate the context 
expt_context = metadata_store_pb2.Context()
expt_context.type_id = expt_context_type_id
expt_context.name = 'Demo'
expt_context.properties['note'].string_value = 'Walkthrough of metadata'

expt_context_id = store.put_contexts([expt_context])[0]
print('Experiment Context type:\n', expt_context_type)
print('Experiment Context type ID: ', expt_context_type_id)

print('Experiment Context:\n', expt_context)
print('Experiment Context ID: ', expt_context_id)


Experiment Context type:
 name: "Experiment"
properties {
  key: "note"
  value: STRING
}

Experiment Context type ID:  13
Experiment Context:
 type_id: 13
name: "Demo"
properties {
  key: "note"
  value {
    string_value: "Walkthrough of metadata"
  }
}

Experiment Context ID:  1


In [29]:
# Generate attribution and association relationships
# Generate the attribution
expt_attribution = metadata_store_pb2.Attribution()
expt_attribution.artifact_id = schema_artifact_id
expt_attribution.context_id = expt_context_id

# Generate the association
expt_association = metadata_store_pb2.Association()
expt_association.execution_id = dv_execution_id
expt_association.context_id = expt_context_id

# Submit attribution and association to the Metadata Store
expt_association.context_id = expt_context_id
store.put_attributions_and_associations([expt_attribution],[expt_association])
print('Experiment Attribution:\n', expt_attribution)
print('Experiment Association:\n', expt_association)

Experiment Attribution:
 artifact_id: 2
context_id: 1

Experiment Association:
 execution_id: 1
context_id: 1



In [32]:
# Retrieving information from the Metadata Store
# Get artifact types
store.get_artifact_types()

[id: 10
 name: "DataSet"
 properties {
   key: "name"
   value: STRING
 }
 properties {
   key: "split"
   value: STRING
 }
 properties {
   key: "version"
   value: INT
 },
 id: 11
 name: "Schema"
 properties {
   key: "name"
   value: STRING
 }
 properties {
   key: "version"
   value: INT
 }]

In [38]:
schema_to_inv = store.get_artifacts_by_type('Schema')[0]
# print output
print(schema_to_inv)

id: 2
type_id: 11
uri: "./schema.pbtxt"
properties {
  key: "name"
  value {
    string_value: "Chicago Taxi Schema"
  }
}
properties {
  key: "version"
  value {
    int_value: 1
  }
}
create_time_since_epoch: 1653864627769
last_update_time_since_epoch: 1653864627769



In [39]:
# Get events related to the schema id 
schema_events = store.get_events_by_artifact_ids([schema_to_inv.id])
print(schema_events)

[artifact_id: 2
execution_id: 1
type: DECLARED_OUTPUT
milliseconds_since_epoch: 1653864664112
]


In [40]:
# Get events related to the output above
execution_events = store.get_events_by_execution_ids([schema_events[0].execution_id])

print(execution_events)

[artifact_id: 1
execution_id: 1
type: DECLARED_INPUT
milliseconds_since_epoch: 1653864369495
, artifact_id: 2
execution_id: 1
type: DECLARED_OUTPUT
milliseconds_since_epoch: 1653864664112
]


In [41]:
# Look up the artifact that is a declared input
artifact_input = execution_events[0]

store.get_artifacts_by_id([artifact_input.artifact_id])

[id: 1
 type_id: 10
 uri: "./data/train/data.csv"
 properties {
   key: "name"
   value {
     string_value: "Chicago Taxi dataset"
   }
 }
 properties {
   key: "split"
   value {
     string_value: "train"
   }
 }
 properties {
   key: "version"
   value {
     int_value: 1
   }
 }
 create_time_since_epoch: 1653864032130
 last_update_time_since_epoch: 1653864032130]