In [1]:
%pip install tensorflow-transform

You should consider upgrading via the '/root/apache-beam-custom/bin/python -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import apache_beam as beam
from apache_beam import window
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.transforms import trigger
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from tensorflow_transform.beam import impl as beam_impl

import google.auth
import json
import pandas as pd
import base64
import io
import os
import tensorflow as tf
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
import gzip

# The Google Cloud PubSub topic that we are reading from for this example.
topic = "projects/print-nanny/topics/bounding-boxes-dev"

# Cap the display width of Pandas DataFrame columns at 20 characters;
# longer cell values are truncated when a DataFrame is rendered.
pd.set_option('display.max_colwidth', 20)

In [3]:
# def run_transformation_pipeline(args):
#     options = beam.pipeline.PipelineOptions(flags=[], **args)
#     # Sets the pipeline mode to streaming, so we can stream the data from PubSub.
#     options.view_as(pipeline_options.StandardOptions).streaming = True
    

In [4]:
# Build the Beam pipeline options used by every pipeline in this notebook.
options = pipeline_options.PipelineOptions()

# Streaming mode is required so we can stream the data from PubSub.
standard_options = options.view_as(pipeline_options.StandardOptions)
standard_options.streaming = True

# Root GCS path; the staging and temp directories are created beneath it.
dataflow_gcs_location = 'gs://print-nanny-dev/dataflow/bounding-box-events'

# All Google Cloud settings go through a single view of the shared options.
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.region = 'us-central1'
gcloud_options.staging_location = f'{dataflow_gcs_location}/staging'
gcloud_options.temp_location = f'{dataflow_gcs_location}/temp'

# Use the locally built SDK tarball whose version matches the imported Beam.
setup_options = options.view_as(pipeline_options.SetupOptions)
setup_options.sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s.tar.gz' %
    beam.version.__version__)

# google.auth.default() yields a (credentials, project) pair; only the project
# of the current Google Cloud environment is kept. The project will be used for
# creating a subscription to the PubSub topic.
_, gcloud_options.project = google.auth.default()

In [5]:
# Set Interactive Beam's recording duration to one minute ('1m').
ib.options.recording_duration = '1m'

In [6]:
# Create the pipeline on the InteractiveRunner, using the `options` object
# built earlier in the notebook.
p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)

In [7]:
# Schema of one preprocessed event (the dict returned by preprocess_features).
# It is used to build the ExampleProtoCoder that serializes events to TFRecords.
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'ts': tf.io.FixedLenFeature([], tf.int64),
        'device_id': tf.io.FixedLenFeature([], tf.int64),
        'device_cloudiot_name': tf.io.FixedLenFeature([], tf.string),
        'user_id': tf.io.FixedLenFeature([], tf.int64),
        # Calibration corners are single values taken from calibration[0..3].
        'calibration_x0': tf.io.FixedLenFeature([], tf.float32),
        'calibration_y0': tf.io.FixedLenFeature([], tf.float32),
        'calibration_x1': tf.io.FixedLenFeature([], tf.float32),
        'calibration_y1': tf.io.FixedLenFeature([], tf.float32),
        'num_detections': tf.io.FixedLenFeature([], tf.int64),
        # NOTE(review): presumably these also carry one value per detection and
        # should be variable-length like the box coordinates below -- confirm
        # against the publisher's payload before changing them.
        'detection_classes': tf.io.FixedLenFeature([], tf.int64),
        'detection_scores': tf.io.FixedLenFeature([], tf.float32),
        'original_image': tf.io.FixedLenFeature([], tf.string),
        # preprocess_features() emits one coordinate per detected box, so each
        # of these features is a list whose length varies from event to event.
        # A scalar FixedLenFeature([]) cannot represent that, hence VarLenFeature.
        "detection_boxes_x0": tf.io.VarLenFeature(tf.float32),
        "detection_boxes_y0": tf.io.VarLenFeature(tf.float32),
        "detection_boxes_x1": tf.io.VarLenFeature(tf.float32),
        "detection_boxes_y1": tf.io.VarLenFeature(tf.float32),
    }))



class WindowedWritesFn(beam.DoFn):
    """Write the grouped elements of each window to a dedicated TFRecord file.

    Each input element is expected to be a ``(window, elements)`` pair. The
    output file name is derived from the window's start and end timestamps
    (RFC 3339) and placed under ``outdir``.

    NOTE(review): ``process`` applies a ``WriteToTFRecord`` transform to the
    grouped elements from inside a DoFn, and no pipeline stage visible in this
    notebook uses this class -- confirm whether it is still needed.
    """

    def __init__(self, outdir):
        # Directory (local or GCS path) under which the per-window files go.
        self.outdir = outdir

    def process(self, element):
        interval, records = element
        start_label = str(interval.start.to_rfc3339())
        end_label = str(interval.end.to_rfc3339())
        path_prefix = os.path.join(self.outdir, f'{start_label}-{end_label}')

        # A single shard with an empty shard template yields exactly one file
        # named "<start>-<end>.tfrecords.gz".
        writer = beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=path_prefix,
            num_shards=1,
            shard_name_template='',
            file_name_suffix=".tfrecords.gz",
            coder=example_proto_coder.ExampleProtoCoder(raw_data_metadata.schema),
        )
        return records | writer

class AddWindowingInfoFn(beam.DoFn):
    """Pair every element with the window it was assigned to.

    Emits ``(window, element)`` tuples, i.e. the window acts as the key and
    the element as the value.
    """

    def process(self, element, window=beam.DoFn.WindowParam):
        keyed_element = (window, element)
        yield keyed_element

# Size of each fixed window handed to window.FixedWindows (Beam measures it in seconds).
WINDOW_SIZE = 60

def preprocess_features(input_bytes):
    """Decode one gzip-compressed JSON event into a flat feature dict.

    Args:
        input_bytes: gzip-compressed bytes containing a JSON object with at
            least the keys ``original_image`` (base64 text),
            ``detection_boxes`` (a list of 4-item boxes) and ``calibration``
            (a 4-item sequence).

    Returns:
        The decoded dict with ``original_image`` base64-decoded to bytes,
        ``detection_boxes`` split into four per-coordinate lists
        (``detection_boxes_x0/y0/x1/y1``) and ``calibration`` split into four
        values (``calibration_x0/y0/x1/y1``). The original ``detection_boxes``
        and ``calibration`` keys are removed.

    Raises:
        KeyError: if one of the required keys is missing.
        IndexError: if a box or the calibration has fewer than four items.
    """
    with gzip.GzipFile(fileobj=io.BytesIO(input_bytes), mode='r') as stream:
        features = json.loads(stream.read())

    features["original_image"] = base64.b64decode(features["original_image"])

    # Position i of every box / of the calibration maps to the i-th suffix.
    corner_names = ("x0", "y0", "x1", "y1")

    boxes = features.pop("detection_boxes")
    for position, corner in enumerate(corner_names):
        features[f"detection_boxes_{corner}"] = [box[position] for box in boxes]

    calibration = features.pop("calibration")
    for position, corner in enumerate(corner_names):
        features[f"calibration_{corner}"] = calibration[position]

    return features

# Streaming pipeline: read PubSub messages, decode them into feature dicts,
# assign fixed windows, key each element by its window, group per window and
# hand the result to a TFRecord writer.
#
# NOTE(review): after 'Group By Window' each element is a (window, grouped
# elements) pair, while the writer's coder is built from a schema of flat
# per-event features -- confirm the writer is meant to receive grouped pairs.
#
# Disabled stage, kept for reference (would sit right after "Deserialize data"):
#   | "Add timestamps" >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['ts']))
tfrecord_writer = beam.io.tfrecordio.WriteToTFRecord(
    file_path_prefix=dataflow_gcs_location,
    file_name_suffix=".tfrecords.gz",
    coder=example_proto_coder.ExampleProtoCoder(raw_data_metadata.schema),
)

windowed_dataset = (
    p
    | "Read ObjectDetectEvents" >> beam.io.ReadFromPubSub(topic=topic)
    | "Deserialize data" >> beam.Map(preprocess_features)
    | "Window" >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE))
    | "Add Window Info" >> beam.ParDo(AddWindowingInfoFn())
    | 'Group By Window' >> beam.GroupByKey()
    | 'Write TFRecords' >> tfrecord_writer
)

# windows, windowed_elements = windowed_dataset
# window_start = str(window.start.to_rfc3339())
# window_end = str(window.end.to_rfc3339())                  

# tf_record_writer = windowed_dataset | 'Write TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
#             file_path_prefix=dataflow_gcs_location,
#             file_name_suffix=".tfrecords.gz",
#             coder=example_proto_coder.ExampleProtoCoder(raw_data_metadata.schema))
                 
        



In [8]:
# Display the graph of the pipeline `p` via Interactive Beam.
ib.show_graph(p)

In [13]:
# from apache_beam.runners import DataflowRunner

# runner = DataflowRunner()
# runner.run_pipeline(p, options=options)

# Run the pipeline inside a tf.Transform Beam context. '/tmp' is passed as the
# context's first argument (presumably its temp directory -- TODO confirm).
with beam_impl.Context('/tmp'):
    p.run()

OSError: No files found based on the file pattern /tmp/it-1uxh2ztz140611056256912/full/078add12c1-140611049974800-140611049552400-140611056256912-*-of-*

In [10]:
!gsutil ls gs://print-nanny-dev/dataflow/bounding-boxes-dev/

CommandException: One or more URLs matched no objects.
