In [None]:
# Copyright 2020 Google LLC. All Rights Reserved.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [None]:
# With the exception of matplotlib all other requirements are the same as 
!pip install matplotlib

# Timeseries sample notebook

This notebook can be used to explore the various stages of data engineering used within the time series library. It is divided into sections, each corresponding to a different part of the ML side of the solution.

The Java part of the library is not explored yet; it will be made available through Apache Beam xlang transforms at a later date.

The notebook makes use of the libraries made available via the timeseries Python samples. This notebook should be run from within a virtual env that has those samples installed.

## [Inference](#inference) 
* [Explore Inference Batch](#b_inference)
* [Explore Inference Stream](#s_inference)

In [None]:
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import pandas as pd 
from datetime import datetime

import matplotlib

import apache_beam as beam

# Import the tfx components we will make use of.
import tfx
from tfx.components.example_gen.import_example_gen.component import ImportExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform

from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor

from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.proto.evaluator_pb2 import SingleSlicingSpec
from tfx.utils.dsl_utils import external_input
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing

# Load the TFX interactive-notebook "skip" extension
# (presumably registers the `%%skip_for_export` cell magic — TODO confirm).
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

# Report the TensorFlow and TFX versions in use for this run.
print('TF version', tf.__version__, '\nTFX version', tfx.__version__)

# <a id = "inference"> Inference </a>

The two examples below use the same code except for the source: one is bounded (batch) and the other is unbounded (streaming).

* Batch
* Streaming


In [None]:
# Directory expected to hold the TF.Example files used as inference input.
INFERENCE_EXAMPLES = os.path.expanduser('~/demo/timeseries/data/inference/data')
# Directory of the SavedModel handed to RunInference further down.
SAVED_MODEL_LOCATION = os.path.expanduser('~/demo/timeseries/serving_model_dir/')

print(f'The path {INFERENCE_EXAMPLES} should contain the inference TF.Example files')

In [None]:
# Before we run the inference, we will plot the data to see what things look like with a few outliers

def convert_time_series_data_to_raw_values(dataset: tf.data.Dataset, num_records: int, num_timesteps : int):
    """Yield one flat dict per serialized TF.Example taken from `dataset`.

    Args:
        dataset: Dataset whose elements are serialized `tf.train.Example` protos.
        num_records: Number of records passed to `dataset.take`.
        num_timesteps: The value at index `num_timesteps - 1` of every selected
            float feature is extracted.

    Yields:
        A dict with `span_start_timestamp` and `span_end_timestamp` (datetimes
        built from the `METADATA_SPAN_START_TS` / `METADATA_SPAN_END_TS` int64
        features divided by 1000) plus one entry for each feature whose name
        ends in `-LAST` or `-FIRST`.
    """
    def _to_datetime(feature):
        # The stored int64 is divided by 1000 before conversion, i.e. it is
        # treated as epoch milliseconds (fromtimestamp expects seconds).
        return datetime.fromtimestamp(feature.int64_list.value[0] / 1000)

    last_step = num_timesteps - 1
    for record in dataset.take(num_records):
        parsed = tf.train.Example()
        parsed.ParseFromString(record.numpy())
        feature_map = parsed.features.feature

        row = {
            'span_start_timestamp': _to_datetime(feature_map['METADATA_SPAN_START_TS']),
            'span_end_timestamp': _to_datetime(feature_map['METADATA_SPAN_END_TS']),
        }
        for name in feature_map:
            if name.endswith(('-LAST', '-FIRST')):
                row[name] = feature_map[name].float_list.value[last_step]
        yield row

        # Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [
    os.path.join(INFERENCE_EXAMPLES, entry)
    for entry in os.listdir(INFERENCE_EXAMPLES)
    if entry.startswith('time')
]

# Read all matching files as a single stream of serialized records.
dataset = tf.data.TFRecordDataset(tfrecord_filenames)

# Flatten up to 2000 records into rows and load them into a DataFrame.
output = convert_time_series_data_to_raw_values(dataset, num_records=2000, num_timesteps=5)
df = pd.DataFrame.from_dict(output)

# Plot both extracted series against the end timestamp of each span.
df.plot(x='span_end_timestamp', y=['value-LAST', 'value-FIRST'], figsize=(18, 9))

In [None]:
# Dump the full frame as text, ordered by span end time (the frame itself is not modified).
print(df.sort_values('span_end_timestamp').to_string())

In [None]:
from typing import Dict, Text, Any

class OutputOutliers(beam.DoFn):
    """
    Naive outlier detector based entirely on an absolute-difference cutoff.

    For every `-LAST` / `-FIRST` entry of an element, the absolute difference
    between its `input_value` and `output_value` is compared with `threshold`;
    entries whose difference exceeds it are emitted as outlier records.
    """

    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    def __init__(self, threshold: int = 5, *unused_args, **unused_kwargs):
        """
        Args:
            threshold: Absolute difference between input and output value
                above which an entry is reported as an outlier.
        """
        beam.DoFn.__init__(self)
        self.threshold = threshold

    def process(self, element: Dict[Text, Any], *unused_args, **unused_kwargs):
        """
        Emit one record per `-LAST` / `-FIRST` entry that exceeds the threshold.

        Args:
            element: Dict carrying `span_start_timestamp`, `span_end_timestamp`
                and, under each `-LAST` / `-FIRST` key, a mapping with
                `input_value` and `output_value`.

        Yields:
            A dict with the span timestamps, `output_value`, `input_value` and
            a marker column (`last_value_anom` or `first_value_anom`).
        """
        for k in element.keys():
            if not k.endswith(('-LAST', '-FIRST')):
                continue

            span_start_timestamp = element['span_start_timestamp']
            span_end_timestamp = element['span_end_timestamp']
            input_value = element[k]['input_value']
            output_value = element[k]['output_value']

            diff = abs(input_value - output_value)
            if diff > self.threshold:
                output = {'span_start_timestamp': span_start_timestamp,
                          'span_end_timestamp': span_end_timestamp,
                          'output_value': output_value,
                          'input_value': input_value}
                # The marker column only records which series triggered the
                # outlier; its value is always 0.
                if k.endswith('-LAST'):
                    output['last_value_anom'] = 0
                else:
                    output['first_value_anom'] = 0
                yield output


In [None]:
import sys
import argparse
import apache_beam as beam
from google.protobuf.json_format import Parse
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2
from timeseries.transforms import process_inference_return

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

"""
Run inference pipeline using data generated from streaming pipeline.
"""

# Glob pattern matching the inference TFRecord files.
tfrecord_filenames = os.path.join(INFERENCE_EXAMPLES, 'tim*')

options = pipeline_options.PipelineOptions()
pipeline = beam.Pipeline(InteractiveRunner(), options=options)

# Describe the SavedModel and signature that RunInference should use.
saved_model_spec = model_spec_pb2.SavedModelSpec(
    signature_name=['serving_default'],
    model_path=SAVED_MODEL_LOCATION)
inference_spec = model_spec_pb2.InferenceSpecType(saved_model_spec=saved_model_spec)

_ = (
    pipeline
    | "ReadTFExample" >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern=tfrecord_filenames)
    | 'ParseExamples' >> beam.Map(tf.train.Example.FromString)
    | RunInference(inference_spec)
    | beam.ParDo(process_inference_return.ProcessReturn())
    | beam.ParDo(OutputOutliers())
)

# Run the pipeline and collect the outlier records for inspection below.
df_anom = ib.collect(_)

In [None]:
# Dump the detected outliers as text, ordered by span end time (the frame itself is not modified).
print(df_anom.sort_values('span_end_timestamp').to_string())

In [None]:
# Combine the raw values with the detected outliers. `merge` joins on the
# columns the two frames have in common, so no index has to be set first.
# (`DataFrame.set_index` returns a new frame, so calling it without keeping the
# result has no effect; 'span_end_timestamp' also has to remain a regular
# column because it is used as the x axis of the plot below.)
output = df.merge(df_anom, how='outer', suffixes=('', '_y'))
output.plot(
    'span_end_timestamp',
    y=['value-FIRST', 'value-LAST', 'first_value_anom', 'last_value_anom'],
    figsize=(18, 9),
    linestyle='none',
    marker='x')

In [None]:
# Dump the merged frame as text, ordered by span end time (the frame itself is not modified).
print(output.sort_values('span_end_timestamp').to_string())

In [None]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
from datetime import timedelta

# Set Interactive Beam's capture duration to 60 seconds (presumably this bounds how
# long data is captured from an unbounded source when collecting — TODO confirm).
ib.options.capture_duration = timedelta(seconds=60)

In [None]:
# Pub/Sub subscription expected to carry the inference TF.Example messages.
# `<project>` is a placeholder that has to be replaced before use.
INFERENCE_PUBSUB_SUBS = 'projects/<project>/subscriptions/outlier-detection'
print(f'The subscription {INFERENCE_PUBSUB_SUBS} should contain the inference TF.Example files')

In [None]:
def run(args, pipeline_args):
    """
    Run inference pipeline using data generated from streaming pipeline.

    Args:
        args: Parsed arguments; `pubsub_subscription` and
            `saved_model_location` are read from it.
        pipeline_args: Arguments forwarded to Beam's `PipelineOptions`.
    """
    # Imported locally: at notebook level only the `pipeline_options` module and
    # `GoogleCloudOptions` are imported, so the bare name `PipelineOptions`
    # would otherwise be undefined when this function is called.
    from apache_beam.options.pipeline_options import PipelineOptions

    # Named `options` so the notebook-level `pipeline_options` module is not shadowed.
    options = PipelineOptions(
            pipeline_args, save_main_session=True, streaming=True)

    with beam.Pipeline(options=options) as pipeline:
        _ = (
                pipeline
                # NOTE(review): ReadStringsFromPubSub is deprecated in newer Beam
                # releases in favour of ReadFromPubSub — confirm against the
                # installed Beam version.
                | 'ReadTFExample' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=args.pubsub_subscription)
                # Each Pub/Sub message is parsed as a JSON-encoded tf.train.Example.
                | 'ParseExamples' >> beam.Map(lambda x: Parse(x, tf.train.Example()))
                | RunInference(
                        model_spec_pb2.InferenceSpecType(
                                saved_model_spec=model_spec_pb2.SavedModelSpec(
                                        signature_name=['serving_default'],
                                        model_path=args.saved_model_location)))
                | beam.ParDo(process_inference_return.ProcessReturn())
                | beam.ParDo(process_inference_return.CheckAnomalous())
                | beam.ParDo(print))