In [1]:
import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade tfx[kfp]==0.30.0

Collecting tfx[kfp]==0.30.0
  Downloading tfx-0.30.0-py3-none-any.whl (2.4 MB)
[K     |████████████████████████████████| 2.4 MB 1.6 MB/s eta 0:00:01
[?25hCollecting ml-metadata<0.31,>=0.30
  Downloading ml_metadata-0.30.0-cp37-cp37m-manylinux2010_x86_64.whl (2.9 MB)
[K     |████████████████████████████████| 2.9 MB 15.2 MB/s eta 0:00:01
Collecting tfx-bsl<0.31,>=0.30
  Downloading tfx_bsl-0.30.0-cp37-cp37m-manylinux2010_x86_64.whl (2.2 MB)
[K     |████████████████████████████████| 2.2 MB 25.3 MB/s eta 0:00:01
[?25hCollecting kubernetes<12,>=10.0.1
  Downloading kubernetes-11.0.0-py3-none-any.whl (1.5 MB)
[K     |████████████████████████████████| 1.5 MB 31.1 MB/s eta 0:00:01
[?25hCollecting google-cloud-aiplatform<0.8,>=0.5.0
  Downloading google_cloud_aiplatform-0.7.1-py2.py3-none-any.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 43.3 MB/s eta 0:00:01
[?25hCollecting keras-tuner<1.0.2,>=1
  Downloading keras-tuner-1.0.1.tar.gz (54 kB)
[K     |██████████████████

In [3]:
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)

env: GOOGLE_CLOUD_PROJECT=astute-pride-317802
GCP project ID:astute-pride-317802


In [4]:
# Host of the Kubeflow Pipelines (KFP) cluster; passed as --endpoint to the
# `tfx` CLI commands in this notebook.
ENDPOINT = '314cd0b9ca67984e-dot-asia-east1.pipelines.googleusercontent.com'  # Enter your ENDPOINT here.

# An empty string means the value above was never filled in: log an error.
if ENDPOINT == '':
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')

In [5]:
# Name of the pipeline's Docker image: gcr.io/<GOOGLE_CLOUD_PROJECT>/tfx-pipeline.
CUSTOM_TFX_IMAGE = '/'.join(('gcr.io', GOOGLE_CLOUD_PROJECT, 'tfx-pipeline'))

In [9]:
import os

# Pipeline name passed as --pipeline-name to the `tfx` CLI commands below.
PIPELINE_NAME = "data_centric_pipeline"

# Local project directory: ~/data-centric/<PIPELINE_NAME>.
PROJECT_DIR = os.path.join(
    os.path.expanduser("~"),
    "data-centric",
    PIPELINE_NAME,
)

## Testing models

In [12]:
# Run the test modules with the same interpreter as this kernel (`sys` is
# imported in the first cell).
# NOTE(review): the recorded output of the second command is
# "No module named models.keras.model_test" — confirm the module path and the
# working directory this cell is expected to run from.
!{sys.executable} -m models.preprocessing_test
!{sys.executable} -m models.keras.model_test


2021-06-25 23:40:56.219453: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
Running tests under Python 3.7.10: /opt/conda/bin/python
[ RUN      ] PreprocessingTest.testPreprocessingFn
INFO:tensorflow:time(__main__.PreprocessingTest.testPreprocessingFn): 0.0s
I0625 23:40:58.301542 140712543123264 test_util.py:2076] time(__main__.PreprocessingTest.testPreprocessingFn): 0.0s
[       OK ] PreprocessingTest.testPreprocessingFn
[ RUN      ] PreprocessingTest.test_session
[  SKIPPED ] PreprocessingTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK (skipped=1)
/opt/conda/bin/python: No module named models.keras.model_test


## Create first TFX pipeline
Copy the data to Cloud Storage.
Run the following cells after converting the data into TFRecords (done in the interactive notebook).

In [34]:
# Recursively copy the local training TFRecords to the
# <GOOGLE_CLOUD_PROJECT>-kubeflowpipelines-default bucket.
# NOTE(review): the final object layout under .../data/train/ depends on gsutil's
# directory-naming rules — verify the resulting paths match what the pipeline reads.
!gsutil -m cp -r data/train/ gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/data-centric/data/train/

Copying file://data/train/i.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/iv.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/x.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/vi.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/ix.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/v.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/iii.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/viii.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/ii.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/train/vii.tfrecords [Content-Type=application/octet-stream]...
/ [10/10 files][814.2 KiB/814.2 KiB] 100% Done                                  
Operation completed over 10 objects/814.2 KiB.                                   


In [32]:
# Recursively copy the local validation TFRecords to the
# <GOOGLE_CLOUD_PROJECT>-kubeflowpipelines-default bucket.
# NOTE(review): the final object layout under .../data/val/ depends on gsutil's
# directory-naming rules — verify the resulting paths match what the pipeline reads.
!gsutil -m cp -r data/val/ gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/data-centric/data/val/

Copying file://data/val/x.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/val/vi.tfrecords [Content-Type=application/octet-stream]... 
Copying file://data/val/i.tfrecords [Content-Type=application/octet-stream]...  
Copying file://data/val/ix.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/val/iii.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/val/iv.tfrecords [Content-Type=application/octet-stream]... 
Copying file://data/val/ii.tfrecords [Content-Type=application/octet-stream]... 
Copying file://data/val/v.tfrecords [Content-Type=application/octet-stream]...  
Copying file://data/val/viii.tfrecords [Content-Type=application/octet-stream]...
Copying file://data/val/vii.tfrecords [Content-Type=application/octet-stream]...
/ [10/10 files][250.2 KiB/250.2 KiB] 100% Done                                  
Operation completed over 10 objects/250.2 KiB.                                   


## Build Docker image

In [51]:
# Create the pipeline described by kubeflow_runner.py on the KFP endpoint.
# --build-image also builds the pipeline's Docker image (see the [Docker] steps
# in the recorded output).
!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image

2021-06-26 03:50:05.301928: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
CLI
Creating pipeline
Detected Kubeflow.
Use --engine flag if you intend to use a different orchestrator.
  ' Defaults to "https".' % host)
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
[Docker] Step 1/4 : FROM tensorflow/tfx:0.30.0[Docker] 
[Docker]  ---> 68adb9229d27
[Docker] Step 2/4 : WORKDIR /pipeline[Docker] 
[Docker]  ---> Using cache
[Docker]  ---> 62adad88abd0
[Docker] Step 3/4 : COPY ./ ./[Docker] 
[Docker]  ---> 5a2d0f74d12b
[Docker] Step 4/4 : ENV PYTHONPATH="/pipeline:${PYTHONPATH}"[Docker] 
[Docker]  ---> Running in cbea82a96a54
[Docker] Removing intermediate container cbea82a96a54
[Docker]  ---> d0a66ec771bc
[Docker] Successfully built d0a66ec771bc
[Docker] Successfully tagged gcr.

## Create a pipeline run
Check the Kubeflow Pipelines dashboard to follow the run.

In [49]:
# Start a run of the pipeline named PIPELINE_NAME on the KFP endpoint; the
# command prints the run id and a link to the run details page.
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}

2021-06-26 02:13:34.277943: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
CLI
Creating a run for pipeline: data_centric_pipeline
Detected Kubeflow.
Use --engine flag if you intend to use a different orchestrator.
  ' Defaults to "https".' % host)
Run created for pipeline: data_centric_pipeline
| pipeline_name         | run_id                               | status | created_at                | link                                                                                                                       |
| data_centric_pipeline | 7c5313d4-bd94-43c8-885d-d9afc2db34f5 | None   | 2021-06-26T02:13:39+00:00 | http://314cd0b9ca67984e-dot-asia-east1.pipelines.googleusercontent.com/#/runs/details/7c5313d4-bd94-43c8-885d-d9afc2db34f5 |

[0m

In [53]:
# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}

2021-06-26 04:01:58.266042: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
CLI
Updating pipeline
Detected Kubeflow.
Use --engine flag if you intend to use a different orchestrator.
  ' Defaults to "https".' % host)
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generated pipeline:
 pipeline_info {
  id: "data_centric_pipeline"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.import_example_gen.component.ImportExampleGen"
      }
      id: "ImportExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "data_centric_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "p