In [3]:
PROJECT = "livescore-web"
BUCKET = "cost_anomaly_detection"
REGION = "europe-west1"
# ROOT_DIR = "cost_anomaly_detection"

In [4]:
import os

os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
# os.environ['ROOT_DIR'] = ROOT_DIR
os.environ['TFVERSION'] = "2.1"

In [None]:
%%bash
gsutil mb -l $REGION gs://$BUCKET # Create a bucket if there doesn't exist

In [5]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


In [6]:
from google.cloud import storage


def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # bucket_name = "your-bucket-name"
    # source_blob_name = "storage-object-name"
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Blob {} downloaded to {}.".format(
            source_blob_name, destination_file_name
        )
    )


In [10]:
download_blob("cost_anomaly_detection", "preproc/cost_null_project.csv", "./cost.csv")



Blob preproc/cost_null_project.csv downloaded to ./cost.csv.


In [11]:
def read_data_csv(filepath, header_included = True):
    
    import csv
    import numpy as np
    
    data_dict = {}
    with open(filepath, newline = "") as csvfile:
        data = list(csv.reader(csvfile))
    header = data[0]
    data = np.array(data[1:], dtype = np.float32) if header_included else np.array(data, dtype = np.float32)
    for index, name in enumerate(header):
        data_dict[name] = data[:, index]

    return data_dict

read_data_csv("cost.csv")

{'cost': array([4.1900e+00, 1.0420e+01, 8.3900e+00, 1.1680e+01, 9.9400e+00,
        9.9400e+00, 1.0040e+01, 9.3300e+00, 9.9400e+00, 9.5300e+00,
        9.8400e+00, 9.8400e+00, 9.4300e+00, 1.0760e+01, 9.3300e+00,
        1.1270e+01, 8.0800e+00, 1.4210e+01, 1.3750e+01, 9.0000e+00,
        1.7150e+01, 1.6530e+01, 1.9330e+01, 1.0820e+01, 1.1880e+01,
        1.1550e+01, 3.0440e+01, 3.9590e+01, 2.3500e+01, 3.7640e+01,
        4.5870e+01, 4.2930e+01, 3.8640e+01, 1.4066e+02, 1.6766e+02,
        6.7270e+01, 3.1520e+01, 3.1820e+01, 3.0470e+01, 4.0460e+01,
        6.7840e+01, 7.1500e+01, 3.9850e+01, 3.8600e+01, 2.5290e+01,
        4.5340e+01, 2.1200e+01, 8.8790e+01, 1.2041e+02, 7.7400e+01,
        3.3150e+01, 6.0130e+01, 6.9050e+01, 4.8160e+01, 1.1256e+02,
        1.5001e+02, 6.1530e+01, 3.0570e+01, 4.9230e+01, 8.3990e+01,
        3.8290e+01, 1.1744e+02, 1.0641e+02, 6.9170e+01, 5.7880e+01,
        9.9730e+01, 9.5250e+01, 5.9990e+01, 6.0670e+01, 1.8634e+02,
        1.4864e+02, 7.1200e+01, 4.7870e+

In [155]:
%%writefile trainer/task.py

import argparse
import json
import os
import tensorflow as tf
import numpy as np

from . import model
from . import preprocess


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bucket",
        help = "GCS path to data. We assume that data is in gs://BUCKET/cost_anomaly_detection/preprocess/",
        required = True
    )
    parser.add_argument(
        "--output_dir",
        help = "GCS location to write checkpoints and export models",
        required = True
    )
    parser.add_argument(
        "--train_split",
        help = "Number of examples to train the model.",
        type = int,
        default = 595
    )
    parser.add_argument(
        "--batch_size",
        help = "Number of examples to compute gradient over.",
        type = int,
        default = 5
    )
    parser.add_argument(
        "--sequence_length",
        help = "Number of historical examples to predict.",
        type = int,
        default = 7
    )
    parser.add_argument(
        "--extract_mode",
        help = "Method to feature extraction from raw features -- provide raw or ensemble.",
        type = str,
        default = "raw"
    )
    parser.add_argument(
        "--epochs_encoder",
        help = "Number of epochs for training LSTM encoder.",
        type = int,
        default = 100
    )
    parser.add_argument(
        "--epochs_forecaster",
        help = "Number of epochs for training LSTM forecaster.",
        type = int,
        default = 50
    )
    parser.add_argument(
        "--job-dir",
        help = 'this model ignores this field, but it is required by gcloud',
        default = "junk"
    )
    parser.add_argument(
        "--encode_wide",
        help = "Hidden layer sizes to use for LSTM encoder feature columns -- provide space-separated layers",
        nargs = '+',
        type = int,
        default = [128]
    )
    parser.add_argument(
        "--decode_wide",
        help = "Hidden layer sizes to use for LSTM decoder feature columns -- provide space-separated layers",
        nargs = '+',
        type = int,
        default = [32]
    )
    parser.add_argument(
        "--forecaster_wide",
        help = "Hidden layer sizes in LSTM cell to use for LSTM forecaster feature columns -- \
        provide space-separated layers",
        nargs = '+',
        type = int,
        default = [400, 200]
    )
    parser.add_argument(
        "--dense_size",
        help = "Hidden layer sizes to use for LSTM forecaster feature columns",
        type = int,
        default = 200
    )
    parser.add_argument(
        "--pattern",
        help = "Specify a pattern that has to be in input files. For example 00001-of will process only one shard",
        default = "of"
    )
    
    ## parse all arguments
    args = parser.parse_args()
    arguments = args.__dict__

    # unused args provided by service
    arguments.pop("job_dir", None)

    ## assign the arguments to the model and preprocess variables
    output_dir = arguments.pop("output_dir")
    preprocess.bucket = arguments.pop("bucket")
    preprocess.train_split = arguments.pop("train_split")
    preprocess.sequence_length = arguments.pop("sequence_length")
    preprocess.batch_size = arguments.pop("batch_size")
    preprocess.pattern = arguments.pop("pattern")
    model.bucket = arguments.pop("bucket')
    model.batch_size = arguments.pop("batch_size")
    model.sequence_length = arguments.pop("sequence_length")
    model.extract_mode = arguments.pop("extract_mode")
    model.epochs_encoder = arguments.pop("epochs_encoder")
    model.epochs_forecaster = arguments.pop("epochs_forecaster")
    model.encode_wide = arguments.pop("encode_wide")
    model.decode_wide = arguments.pop("decode_wide")
    model.forecaster_wide = arguments.pop("forecaster_wide")
    model.dense_size = arguments.pop("dense_size")  
    model.pattern = arguments.pop("pattern")

    # This code can be removed if you are not using hyperparameter tuning
    output_dir = os.path.join(
        output_dir,
        json.loads(
            os.environ.get("TF_CONFIG", "{}")
        ).get("task", {}).get("trial", "")
    )

    # Run the training job
    dataset_train, dataset_test, tensor_to_extract, tensor_to_eval, label_train, label_test = preprocess.load_data()
    lstm_encoder = model.deploy_lstm_autoencoder(dataset_train, dataset_test, output_dir)
    model.deploy_lstm_forecaster(
        tensor_to_extract, tensor_to_eval, lstm_encoder, label_train, label_test, output_dir 
    )

Writing trainer/task.py


In [152]:
%%writefile trainer/preprocess.py

import shutil, os, datetime
import numpy as np
import tensorflow as tf

bucket = None  # set from task.py
pattern = "of" # gets all files

feature_columns = ["cost", "noCost"]
label_column = "cost"
defaults = [[0.0], [0.0]]

# Define some hyperparamters
sequence_length = 7
train_split = 595
num_features = len(feature_columns)
batch_size = 5
buffer_size = 1000        


# Create a read-csv-data function to read data from GCP Platform storage
def read_data_csv(filepath, header_included = True):
    
    import csv
    import numpy as np
    
    data_dict = {}
    with open(filepath, newline = "") as csvfile:
        data = list(csv.reader(csvfile))
    header = data[0]
    data = np.array(data[1:], dtype = np.float32) if header_included else np.array(data, dtype = np.float32)
    for index, name in enumerate(header):
        data_dict[name] = data[:, index]

    return data_dict

# Create a function to generate sequence and label prepared for LSTM model
def get_sequence(data, sequence_length):
    
    sequence = []
    num_elements = data.shape[0]

    for start, end in zip(range(0, num_elements-sequence_length), range(sequence_length, num_elements)):
        sequence.append(data[start:end])
    sequence = np.asarray(sequence).reshape((num_elements-sequence_length, sequence_length, 1))
    return sequence

def get_label(data, sequence_length = 7):

    num_elements = data.shape[0]
    label = np.asarray(data[sequence_length:num_elements])
    label = label.reshape((data[sequence_length:num_elements].shape[0], 1))

    return label

# Create a function to preprocess our data by using function above
def min_max_scaler(tensor):
    
    tensor_train_min = tf.math.reduce_min(tensor)
    tensor_train_max = tf.math.reduce_max(tensor)
    tensor = tf.math.divide(
        tf.math.subtract(tensor, tensor_train_min), 
        tf.math.subtract(tensor_train_max, tensor_train_min)
    )
    
    return tensor, tensor_train_min, tensor_train_max

def preprocess_data(data, feature_columns, label_column, train_split, sequence_length):
    
    num_elements = len(data[label_column])
    feature_train = []
    feature_test = []
    scaler_parameter = {}
    
    for key in feature_columns:
        tensor_train = tf.convert_to_tensor(data[key][:train_split])
        # tensor_train, min_value, max_value = min_max_scaler(tensor_train, train_split)
        tensor_train = get_sequence(tensor_train, sequence_length)
        feature_train.append(tensor_train)
        
        tensor_test = tf.convert_to_tensor(data[key][train_split:])
        #tensor_test = tf.math.divide(tf.math.subtract(tensor_test, min_value),tf.math.subtract(max_value, min_value))
        tensor_test = get_sequence(tensor_test, sequence_length)
        feature_test.append(tensor_test)
        
        # scaler_parameter[key] = [min_value, max_value]
    feature_train = tf.concat([feature_train[_] for _ in range(len(feature_train))], axis = 2)
    feature_test = tf.concat([feature_test[_] for _ in range(len(feature_test))], axis = 2)
    
    tensor_train = data[label_column][:train_split]
    # tensor_train, min_value, max_value = min_max_scaler(tensor_train, train_split)
    label_train = tf.convert_to_tensor(get_label(tensor_train, sequence_length))
    
    tensor_test = data[label_column][train_split:]
    # tensor_test = tf.math.divide(tf.math.subtract(tensor_test, min_value), tf.math.subtract(max_value, min_value))
    label_test = tf.convert_to_tensor(get_label(tensor_test, sequence_length))
    
    # scaler_parameter[label_column] = [min_value, max_value]
    # label = min_max_scaler(label_tensor)
    
    return feature_train, feature_test, label_train, label_test #, scaler_parameter

def tensorflow_transform_for_encoder(feature, label, buffer_size, batch_size):
    
    dataset = tf.data.Dataset.from_tensor_slices((feature, label))
    dataset = dataset.cache().shuffle(buffer_size).batch(batch_size).repeat()
    tensor_predict = feature
    
    return dataset, tensor_predict

def load_data():
    
    # Read data from csv.file and generate sequence ready for training
    data_file_path = "gs://{}/cost_anomaly_detection/preprocess/{}*{}*".format(bucket, "cost", pattern)
    data_dict = read_data_csv(data_file_path)
    feature_train, feature_test, label_train, label_test = preprocess_data(
        data_dict, feature_columns, label_column, train_split, sequence_length
    )
    # Convert sequence to tensorflow dataset
    dataset_train, tensor_to_extract = tensorflow_transform_for_encoder(
        feature_train, feature_train, buffer_size, batch_size
    )
    dataset_test, tensor_to_eval = tensorflow_transform_for_encoder(
        feature_test, feature_test, buffer_size, batch_size
    )
    
    return dataset_train, dataset_train, tensor_to_extract, tensor_to_eval, label_train, label_test

Writing trainer/preprocess.py


In [153]:
%%writefile trainer/model.py

import shutil, os, datetime
import numpy as np
import tensorflow as tf

from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.optimizers import *

bucket = None  # set from task.py
pattern = "of" # gets all files

# Define some hyperparamters
epochs_encoder = 100
epochs_forecaster = 50
validation_steps = 50
sequence_length = 7
extract_mode = "raw"
num_features = len(feature_columns)
batch_size = 5
buffer_size = 1000
dropout = 0.3
encode_wide = [128]
decode_wide = [32]
num_extracts = num_features + encode_wide[0] if extract_mode == "raw" else num_features + 1
forecaster_wide = [400, 200]
dense_size = 200
evaluation_interval = 200

# Create a function for build a LSTM Autoencoder
def build_lstm_autoencoder(sequence_length, num_features, dropout, encode_wide, decode_wide):
    
    # Neuron Architecture
    # Input Layer
    input_layer = Input(shape = (sequence_length, num_features))
    # Encoder Layer
    for deep, wide in enumerate(encode_wide):
        if deep == 0:
            encoded_layer = LSTM(wide, return_sequences = True)(input_layer, training = True)
            # encoded_layer = BatchNormalization()(encoded_layer, training = True)
            encoded_layer = Dropout(dropout)(encoded_layer, training = True)       
        else:
            encoded_layer = LSTM(wide, return_sequences = True)(encoded_layer, training = True)
            # encoded_layer = BatchNormalization()(encoded_layer, training = True)
            encoded_layer = Dropout(dropout)(encoded_layer, training = True)
    # Decoder Layer
    for deep, wide in enumerate(decode_wide):
        if deep == 0:
            decoded_layer = LSTM(wide, return_sequences = True)(encoded_layer, training = True)
            # decoded_layer = BatchNormalization()(decoded_layer, training = True)
            decoded_layer = Dropout(dropout)(decoded_layer, training = True)
        else:
            decoded_layer = LSTM(wide, return_sequences = True)(decoded_layer, training = True)
            # decoded_layer = BatchNormalization()(decoded_layer, training = True)
            decoded_layer = Dropout(dropout)(decoded_layer, training = True)
    # Output Layer
    output_layer = TimeDistributed(Dense(2))(decoded_layer)
    
    lstm_encoder = Model(input_layer, encoded_layer)
    lstm_autoencoder = Model(input_layer, output_layer)
    lstm_autoencoder.compile(optimizer = RMSprop(), loss = "mean_squared_error")
    
    return lstm_encoder, lstm_autoencoder

def train_lstm_autoencoder(dataset_train, dataset_test, output_dir):

    lstm_encoder, lstm_autoencoder = build_lstm_autoencoder(
        sequence_length, num_features, dropout, encode_wide, decode_wide
    )
    print("Here is our LSTM-Autoencoder architecture so far:\n")
    lstm_autoencoder.summary()
    print("\n")
    
    log_path = os.path.join(output_dir, "logs_autoencoder")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir = log_path)
    lstm_autoencoder.fit(
        dataset_train, epochs = epochs_encoder, steps_per_epoch = evaluation_interval, 
        validation_data = dataset_test, validation_steps = validation_steps, verbose = 2, 
        callbacks = [tensorboard_callback]
    )
    
    export_path = os.path.join(output_dir, "lstm_encoder", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tf.saved_model.save(lstm_encoder, export_path)
    print("Saved LSTM-Encoder to {}".format(export_path))

# Create a function to extract features from LSTM encoder and prepare them for LSTM forecaster
def ensemble_averaging(feature):
    
    feature_extract = np.array(
        [tf.math.reduce_mean(feature[_1, _2, :]) for _1 in range(feature.shape[0]) for _2 in range(feature.shape[1])]
    )
    feature_extract = tf.convert_to_tensor(feature_extract.reshape((feature.shape[0], feature.shape[1], 1)))
    
    return feature_extract
    
def sequence_extraction(encoder, feature, sequence_length, mode):
    
    feature_encoded = encoder.predict(feature)
    feature_ensemble = ensemble_averaging(feature)
    
    # Encoder will generate multiple sequences from original one.
    # We can directly use the raw sequences generated from encoder Or use the ensembled sequence (averaging).
    # We can also consider whether includes fixtures in the forecaster.
    if mode == "raw":
        feature_extract = tf.concat([feature_encoded, feature], axis = 2)     
    else:
        feature_extract = tf.concat([feature_ensemble, feature], axis = 2)       
    
    return feature_extract

def tensorflow_transform_for_forecaster(feature, label, encoder, buffer_size, batch_size):
    
    feature_extract = sequence_extraction(
        encoder, feature = feature, sequence_length = sequence_length, mode = extract_mode
    )
    dataset, tensor_predict = tensorflow_transform_for_encoder(feature_extract, label, buffer_size, batch_size)
    
    return dataset, tensor_predict

# Create a function to build LSTM forecaster
def rmse(y, y_pred):
    return tf.sqrt(tf.math.reduce_mean(tf.math.square(y_pred - y))) 

def build_lstm_forecaster(dropout, num_extracts, forecaster_wide, dense_size):
    
    ### Neuron Architecture
    # Input Layer
    input_lstm = Input(shape = (sequence_length, num_extracts))
    # LSTM Layer
    if len(forecaster_wide) == 1:
        lstm = LSTM(forecaster_wide[0], return_sequences = False)(input_lstm, training = True)
        # lstm = BatchNormalization()(lstm, training = True)
        lstm = Dropout(dropout)(lstm, training = True)
    else:
        for deep, wide in enumerate(forecaster_wide):
            if deep == 0:
                lstm = LSTM(wide, return_sequences = True)(input_lstm, training = True)
                # lstm = BatchNormalization()(lstm, training = True)
                lstm = Dropout(dropout)(lstm, training = True)
            elif deep == len(forecaster_wide) - 1:
                lstm = LSTM(wide, return_sequences = False)(lstm, training = True)
                # lstm = BatchNormalization()(lstm, training = True)
                lstm = Dropout(dropout)(lstm, training = True)
            else:
                lstm = LSTM(wide, return_sequences = True)(lstm, training = True)
                # lstm = BatchNormalization()(lstm, training = True)
                lstm = Dropout(dropout)(lstm, training = True)
    # Output Layer
    dense = Dense(dense_size)(lstm)
    output_lstm = Dense(1)(dense)
    
    forecaster = Model(input_lstm, output_lstm)
    forecaster.compile(optimizer = RMSprop(), loss = "mean_squared_error", metrics = [rmse, "mse"])
    
    return forecaster

def train_lstm_forecaster(dataset_train, dataset_test, output_dir):
    
    lstm_forecaster = build_lstm_forecaster(
        dropout, num_extracts, forecaster_wide, dense_size
    )
    
    print("Here is our LSTM-Forecaster architecture so far:\n")
    lstm_forecaster.summary()
    print("\n")
    
    log_path = os.path.join(output_dir, "logs_forecaster", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir = log_path)
    lstm_forecaster.fit(
        dataset_train, epochs = epochs_forecaster, steps_per_epoch = evaluation_interval, 
        validation_data = dataset_test, verbose = 2, validation_steps = validation_steps,
        callbacks = [tensorboard_callback]
    )
    export_path = os.path.join(output_dir, "lstm_forecaster", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tf.saved_model.save(lstm_forecaster, export_path)
    print("Saved LSTM-Forecaster to {}".format(export_path))


def deploy_lstm_autoencoder(dataset_train, dataset_test, output_dir):
    
    # Build LSTM-Autoencoder and train the encoder
    lstm_encoder, lstm_autoencoder = build_lstm_autoencoder(
        sequence_length, num_features, dropout, encode_wide, decode_wide
    )
    train_lstm_autoencoder(dataset_train, dataset_test, output_dir)
    
    return lstm_encoder
    
def deploy_lstm_forecaster(tensor_to_extract, tensor_to_eval, encoder, label_train, label_test, output_dir):
    
    # Generate extracted features from raw features and convert them to tensorflow dataset
    dataset_train, _ = tensorflow_transform_for_forecaster(
        tensor_to_extract, label_train, lstm_encoder, buffer_size, batch_size
    )
    dataset_test, _ = tensorflow_transform_for_forecaster(
        tensor_to_eval, label_test, lstm_encoder, buffer_size, batch_size
    )
    # Build LSTM-Forecaster and train the forecaster
    lstm_forecaster = build_lstm_forecaster(
        dropout, num_extracts, forecaster_wide, dense_size
    )
    train_lstm_forecaster(dataset_train, dataset_test, output_dir)

Writing trainer/model.py


Training the model locally

In [None]:
%%bash

gcloud ai-platform local train \
  --package-path trainer \
  --module-name trainer.task \
  --job-dir local-training-output

In [7]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

In [11]:
%%bash
gsutil ls gs://${BUCKET}/preproc/

gs://cost_anomaly_detection/preproc/cost_null_project.csv


Training the model on GCP AI platform

In [14]:
%%bash
JOB_NAME="anomaly_lstm_$(date +%Y%m%d_%H%M%S)"
BUCKET="cost_anomaly_detection"
OUTDIR="gs://$BUCKET/$JOB_NAME"
TFVERSION="2.1"
REGION="europe-west1"
gcloud ai-platform jobs submit training $JOB_NAME \
    --module-name=trainer.task \
    --package-path= trainer/ \
    --region=$REGION \
    --bucket=$BUCKET \
    --job-dir=$OUTDIR \
    --runtime-version=$TFVERSION \
    -- \
    --output_dir=$OUTDIR \
    --stream-logs

ERROR: (gcloud.ai-platform.jobs.submit.training) unrecognized arguments:
  trainer/
  --bucket=cost_anomaly_detection (did you mean '--staging-bucket'?)
  To search the help text of gcloud commands, run:
  gcloud help -- SEARCH_TERMS


CalledProcessError: Command 'b'JOB_NAME="anomaly_lstm_$(date +%Y%m%d_%H%M%S)"\nBUCKET="cost_anomaly_detection"\nOUTDIR="gs://$BUCKET/$JOB_NAME"\nTFVERSION="2.1"\nREGION="europe-west1"\ngcloud ai-platform jobs submit training $JOB_NAME \\\n    --module-name=trainer.task \\\n    --package-path= trainer/ \\\n    --region=$REGION \\\n    --bucket=$BUCKET \\\n    --job-dir=$OUTDIR \\\n    --runtime-version=$TFVERSION \\\n    -- \\\n    --output_dir=$OUTDIR \\\n    --stream-logs\n'' returned non-zero exit status 2.