# anomaly_score_unsupervised

Send arbitrary time-series data to the component to train an unsupervised LSTM-Autoencoder model. As soon as previously unseen patterns occur, the anomaly score rises.

Future work:

- reset / roll back the model (for regular flushing, or after a real anomaly (true positive) has occurred)
- add checkpointing for service persistence and rollback

In [1]:
import os

# Build/deployment switches. Uncomment the three lines below (and set your own
# repository/version) to build, tag and push the container image.
# NOTE: os.environ values are always strings -- 'False' below is the string
# 'False', not the boolean False, so whatever reads it must parse it as text.
#os.environ['create_image']='True'
#os.environ['repository']='romeokienzler'
#os.environ['version']='0.1'
#
os.environ['install_requirements']='False'

In [2]:
if bool(os.environ.get('create_image',False)):
    docker_file="""
    FROM registry.access.redhat.com/ubi8/python-39
    RUN pip install ipython nbformat numpy ibm-cos-sdk-core ibm-cos-sdk ibm-watson-machine-learning ibm-watson-studio-pipelines ibmcloudsql pyyaml
    ADD ibm-sql-query-cpd.py .
    ADD start.sh .

    """
    with open("Dockerfile", "w") as text_file:
        text_file.write(docker_file)

    start_file="""
    #!/bin/bash
    echo "Parameter 1: $1"
    echo "Parameter 2: $2"
    echo "Parameter 3: $3"
    echo "Parameter 4: $4"
    echo "Parameter 5: $5"
    echo "Parameter 6: $6"
    echo "Parameter 7: $7"
    echo "Parameter 8: $8"
    echo "Parameter 9: $9"
    echo "Parameter 10: ${10}"
    echo "Parameter 11: ${11}"
    echo "Parameter 12: ${12}"
    echo "Parameter 13: ${13}"
    echo "Parameter 14: ${14}"
    echo "Parameter 15: ${15}"
    echo "Parameter 16: ${16}"
    echo "Parameter 17: ${17}"
    echo "Parameter 18: ${18}"
    echo "Parameter 19: ${19}"
    echo "Parameter 20: ${20}"
    python /opt/app-root/src/ibm-sql-query-cpd.py "$1$2" "$3$4" "$5$6" "$7$8" "$9${10}" "${11}${12}" "${13}${14}" "${15}${16}" "${17}${18}" "${19}${20}"
    """
    with open("start.sh", "w") as text_file:
        text_file.write(start_file)

    !chmod 755 start.sh
    !jupyter nbconvert --to script ibm-sql-query-cpd.ipynb    
    !docker build -t ibm_sql_query_cpd:`echo $version` .
    !docker tag ibm_sql_query_cpd:`echo $version` `echo $repository`/ibm_sql_query_cpd:`echo $version`
    !docker push `echo $repository`/ibm_sql_query_cpd:`echo $version`
    !rm Dockerfile
    !rm ibm-sql-query-cpd.py
    !rm start.sh
elif bool(os.environ.get('install_requirements',False)):
    !pip install tensorflow==2.9.1 numpy==1.23.2 scikit-learn==1.1.2  pandas==1.4.3 flask==2.2.2



In [3]:
import numpy as np
from numpy import concatenate
from matplotlib import pyplot
from pandas import read_csv
from pandas import DataFrame
from pandas import concat
import sklearn
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Activation

from flask import request
from flask import Flask
import time
import json

import numpy

import pickle
import logging
import sys

2022-08-22 14:07:06.682681: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-08-22 14:07:06.682726: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [4]:
# Component parameters read from environment variables (with defaults).
# Command-line `name=value` arguments handled later in this notebook are
# assigned as module globals and may overwrite any of these.
# NOTE(review): none of these names is used anywhere else in this notebook, and
# they describe an SQL-query job (sql, data_engine_crn, target_dir_path, ...)
# rather than anomaly scoring -- they look copied from another component;
# confirm whether they are needed.

# COS URL where the results of the SQL job are to be stored
target_dir_path = os.environ.get('target_dir_path')

# Asset name to register for the results written by the SQL job
target_asset_name = os.environ.get('target_asset_name')

# sql statement to execute
sql = os.environ.get('sql')

# (unique) Custom Resource Name (CRN) of IBM SQL Query Service
data_engine_crn = os.environ.get('data_engine_crn')

# default: CSV - (will be generated into according STORED AS … clause in the INTO clause)
# NOTE(review): `format` shadows the Python builtin of the same name.
format = os.environ.get('format' , 'CSV')

# optional, list of columns to use for partitioning the results of the SQL job, will be generated into according PARTITIONED BY (<columns>) clause in the INTO clause)
partition_columns = os.environ.get('partition_columns','')

# optional, number of objects to store the results of the SQL job in, will be generated into according PARTITIONED INTO <num> OBJECTS clause in INTO clause
number_of_objects = int(os.environ.get('number_of_objects', 0))

# optional, number of rows to be stored in each result object of the SQL job, will be generated into according PARTITIONED EVERY <num> ROWS clause in INTO clause
rows_per_object = int(os.environ.get('rows_per_object', 0))

# default: False, only valid when none of the above partitioning options is specified, produces exactly one object with name specified in target_dir_path, will be generated into sqlClient.rename_exact_result(jobid) after SQL has run.
# Kept as a string ('False' by default); it is not converted to bool here.
exact_name = os.environ.get('exact_name', 'False')

# default: False - will be generated into JOBPREFIX NONE in the INTO clause. Will cause results of previous runs with same output_uri to be overwritten, because no unique sub folder will be created for the result)
# Kept as a string ('False' by default); it is not converted to bool here.
no_jobid_folder = os.environ.get('no_jobid_folder', 'False')

# default: output.txt - output file name containing the CPD path of the resulting asset
data_asset = os.environ.get('data_asset','output.txt')


In [5]:
import keyword


def parse_cli_parameters(argv):
    """Extract ``name=value`` assignments from a command-line argument list.

    An argument is accepted when the text before its first ``=`` is a valid,
    non-keyword Python identifier. Everything after that first ``=`` is the value,
    kept verbatim as a string (so values may themselves contain ``=`` or quotes).
    Other arguments (``--flag=x``, positional arguments, ...) are ignored.

    Returns:
        A list of ``(name, value)`` string tuples, in argv order.
    """
    pairs = []
    for arg in argv:
        name, sep, value = arg.partition('=')
        if sep and name.isidentifier() and not keyword.iskeyword(name):
            pairs.append((name, value))
    return pairs


for element in sys.argv:
    logging.warning('argv raw ' +  element)

# Each parameter becomes a module-level global (overriding the env defaults).
# This replaces the former exec() of a string built from argv, which relied on a
# never-imported `re`, broke on values containing '=' or '"', and let a crafted
# argument execute arbitrary code.
_parsed_parameters = parse_cli_parameters(sys.argv)
# Kept for compatibility: the same 'name="value"' strings the old code built.
parameters = ['{}="{}"'.format(name, value) for name, value in _parsed_parameters]

for (name, value), parameter in zip(_parsed_parameters, parameters):
    globals()[name] = value
    logging.warning('Parameter: ' + parameter)

# Report the value each name ended up with (the last occurrence wins).
for name, _ in _parsed_parameters:
    logging.warning('final parameter: ' + str(globals()[name]))
    logging.warning('final parameter type: ' + str(type(globals()[name])))



In [6]:
def _load_pickle(path):
    """Return the object unpickled from the file at *path*.

    encoding='latin1' is kept from the original loader; presumably these files
    were pickled under Python 2 (the pickle docs recommend latin1 for NumPy
    arrays pickled by Python 2) -- TODO confirm.
    NOTE: unpickling can execute arbitrary code; only load trusted files.
    """
    with open(path, 'rb') as file_object:
        return pickle.load(file_object, encoding='latin1')


data_healthy = _load_pickle('./anomaly-score-unsupervised/watsoniotp.healthy.phase_aligned.pickle')
data_broken = _load_pickle('./anomaly-score-unsupervised/watsoniotp.broken.phase_aligned.pickle')

In [7]:
# Bring both loaded recordings into a 2-D (3000, 3) layout.
data_healthy, data_broken = (
    recording.reshape(3000, 3) for recording in (data_healthy, data_broken)
)


In [8]:
def scaleData(data):
    """Min-max scale every column of *data* into the range [0, 1].

    A fresh MinMaxScaler is fitted on *data* itself on every call, so the result
    depends only on this input's own per-column minima and maxima.
    """
    return MinMaxScaler(feature_range=(0, 1)).fit_transform(data)


In [9]:
# Scale each recording independently (scaleData fits a new scaler per call).
data_healthy_scaled, data_broken_scaled = map(scaleData, (data_healthy, data_broken))


In [10]:
# Scratch 3 x 10 example matrix: rows hold 0-9, 10-19 and 20-29.
x, y, z = (np.arange(start, start + 10) for start in (0, 10, 20))
data = np.vstack((x, y, z))

data

array([[ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
       [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]])

In [11]:
def lstm_data_transform(x_data, y_data, num_steps=5):
    """Build sliding-window (input, target) pairs for sequence-model training.

    For every start index ``s`` whose window still has a following row
    (``s + num_steps < len(x_data)``), the input sample is
    ``x_data[s:s + num_steps]`` and the target is the single row immediately
    after the window, ``y_data[s + num_steps]``.

    Returns:
        ``(windows, targets)`` as NumPy arrays. Both are empty when ``x_data`` has
        no more than ``num_steps`` rows.
    """
    total = x_data.shape[0]
    # s + num_steps grows with s, so this filter keeps exactly the leading run of
    # start indices for which a target row exists.
    starts = [s for s in range(total) if s + num_steps < total]
    windows = np.array([x_data[s:s + num_steps] for s in starts])
    targets = np.array([y_data[s + num_steps] for s in starts])
    return windows, targets

In [12]:
# Scratch call on the toy matrix. With only 3 rows and the default 5-step
# window, no window fits, so both results are empty arrays.
d1, d2 = lstm_data_transform(data, data, num_steps=5)


In [13]:
# Re-view the same 30 values in place as 2 blocks of 3 x 5 (shape assignment
# never copies; numpy raises instead if a copy would be required).
data.shape = 2, 3, 5
data

array([[[ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9],
        [10, 11, 12, 13, 14]],

       [[15, 16, 17, 18, 19],
        [20, 21, 22, 23, 24],
        [25, 26, 27, 28, 29]]])

In [14]:
timesteps = 10
dim = 3
samples = 3000
# Windowed view for the LSTM: (samples // timesteps, timesteps, dim) == (300, 10, 3).
# reshape() returns a separate array object, so data_healthy_scaled keeps its own
# (3000, 3) shape. The previous `alias = data_healthy_scaled; alias.shape = ...`
# silently reshaped data_healthy_scaled as well.
data_healthy_scaled_reshaped = data_healthy_scaled.reshape(samples // timesteps, timesteps, dim)


In [15]:
# Per-batch training losses of the most recent fit() call. Filled by LossHistory
# and returned by the HTTP endpoint.
loss_history = []


class LossHistory(Callback):
    """Keras callback that records the training loss after every batch.

    Losses are appended to the module-level ``loss_history`` list, which is
    cleared at the start of each training run.
    """

    def on_train_begin(self, logs=None):
        # Reset the shared list in place. The former `loss_history = []` only bound
        # a new *local* name, so the global list was never cleared and kept growing
        # with every training request.
        loss_history.clear()

    def on_train_batch_end(self, batch, logs=None):
        loss = (logs or {}).get('loss')
        print('Loss on_train_batch_end '+str(loss))
        loss_history.append(loss)

In [16]:
# design network
def _build_model(timesteps, dim, n_layers=11, units=50):
    """Build and compile the sequence-to-sequence LSTM model.

    ``n_layers`` stacked LSTM layers of ``units`` cells, each returning the full
    sequence, followed by a Dense layer that maps every time step back to ``dim``
    features. Compiled with mean-absolute-error loss and the Adam optimizer.
    NOTE(review): no layer is narrower than the input, so there is no explicit
    bottleneck -- confirm this is the intended autoencoder architecture.
    """
    net = Sequential()
    # Only the first layer needs input_shape; Sequential infers the rest.
    net.add(LSTM(units, input_shape=(timesteps, dim), return_sequences=True))
    for _ in range(n_layers - 1):
        net.add(LSTM(units, return_sequences=True))
    net.add(Dense(dim))
    net.compile(loss='mae', optimizer='adam')
    return net


model = _build_model(timesteps, dim)


def train(data, epochs=3, batch_size=72):
    """Fit the global ``model`` to reproduce *data* (input == target).

    *data* is a C-contiguous NumPy array whose size is a multiple of
    ``timesteps * dim`` (e.g. shape (3000, 3)). It is reshaped IN PLACE into
    (-1, timesteps, dim) windows for fitting and is always left in 2-D
    (rows, dim) form afterwards, even if fitting raises. Per-batch losses are
    recorded by the LossHistory callback. The same data is also passed as
    validation_data, so validation loss measures fit on the training data itself.
    """
    data.shape = (-1, timesteps, dim)
    try:
        model.fit(data, data, epochs=epochs, batch_size=batch_size,
                  validation_data=(data, data), verbose=0, shuffle=False,
                  callbacks=[LossHistory()])
    finally:
        data.shape = (-1, dim)


def score(data):
    """Return the model's reconstruction of *data* as a 2-D (rows, dim) array.

    Like ``train``, *data* is viewed in place as (-1, timesteps, dim) windows for
    prediction; it is left in 2-D (rows, dim) form afterwards (previously it was
    left in the 3-D windowed shape).
    """
    data.shape = (-1, timesteps, dim)
    try:
        yhat = model.predict(data)
    finally:
        data.shape = (-1, dim)
    return yhat.reshape(-1, dim)


2022-08-22 14:07:09.458958: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-08-22 14:07:09.458993: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-08-22 14:07:09.459019: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (elyra): /proc/driver/nvidia/version does not exist
2022-08-22 14:07:09.459292: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [17]:
# Disabled experiment: the training/scoring code below sits inside a bare string
# literal, so it is never executed. The expression only evaluates to the string,
# which is what the cell displays as output.
# Note that `range(0)` would also make the loop body a no-op if re-enabled.
"""for i in range(0):

    print("----------------")
    train(data_healthy_scaled)
    yhat_healthy = score(data_healthy_scaled)
    yhat_broken = score(data_broken_scaled)
    data_healthy_scaled.shape = (3000, 3)
    data_broken_scaled.shape = (3000, 3)

print("----------------broken")
train(data_broken_scaled)
yhat_healthy = score(data_healthy_scaled)
yhat_broken = score(data_broken_scaled)
data_healthy_scaled.shape = (3000, 3)
data_broken_scaled.shape = (3000, 3)
"""

'for i in range(0):\n\n    print("----------------")\n    train(data_healthy_scaled)\n    yhat_healthy = score(data_healthy_scaled)\n    yhat_broken = score(data_broken_scaled)\n    data_healthy_scaled.shape = (3000, 3)\n    data_broken_scaled.shape = (3000, 3)\n\nprint("----------------broken")\ntrain(data_broken_scaled)\nyhat_healthy = score(data_healthy_scaled)\nyhat_broken = score(data_broken_scaled)\ndata_healthy_scaled.shape = (3000, 3)\ndata_broken_scaled.shape = (3000, 3)\n'

In [18]:
def doNN(data):
    """Scale *data*, train the global model on it, and return its reconstruction.

    Args:
        data: array of raw samples (the pipeline here uses shape (3000, 3)).
            It is min-max scaled by ``scaleData`` into a new array, so *data*
            itself is not modified.

    Returns:
        The value of ``score`` on the scaled data. Previously this prediction was
        computed and discarded; callers that ignore the return value are
        unaffected.
    """
    data_scaled = scaleData(data)
    train(data_scaled)
    # The former `data_scaled.shape = (3000, 3)` reset only touched this local,
    # which is discarded on return, so it was dead code.
    return score(data_scaled)


In [None]:
app = Flask(__name__)


@app.route('/', methods=['POST'])
def index():
    """Train on the posted time series and return the recorded training losses.

    The request body must be a JSON 2-D array of numbers with ``dim`` columns.
    The data is passed to ``doNN`` (scale, train, score), and ``loss_history`` is
    returned as a JSON list. A malformed body now yields HTTP 400 with a JSON error
    message instead of an unhandled exception (HTTP 500).
    NOTE(review): the row count must also fit the model's windowing; mismatches
    still raise inside training.
    """
    message = request.get_json(silent=True)
    if message is None:
        return json.dumps({'error': 'request body must be a JSON array'}), 400
    try:
        data = numpy.asarray(message, dtype=float)
    except (TypeError, ValueError):
        return json.dumps({'error': 'request body must be a numeric JSON array'}), 400
    if data.ndim != 2 or data.shape[1] != dim:
        return json.dumps(
            {'error': 'expected a 2-D array with {} columns, got shape {}'.format(dim, data.shape)}
        ), 400
    print(data)
    doNN(data)
    return json.dumps(loss_history)


# Start the blocking development server only when run as a script/notebook,
# not when this module is imported.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8080
 * Running on http://10.244.0.22:8080
INFO:werkzeug:[33mPress CTRL+C to quit[0m


[[ 2.08        3.37536     3.97083266]
 [ 2.1836288   3.76812337  3.95194702]
 [ 2.31038837  4.18246112  3.9449437 ]
 ...
 [ 4.67227426  8.55622075 11.7405177 ]
 [ 4.98298998  9.13593768 11.85424761]
 [ 5.31522579  9.74939673 12.01591895]]
Loss on_train_batch_end 0.3888910114765167
Loss on_train_batch_end 0.4272225499153137
Loss on_train_batch_end 0.44438934326171875
Loss on_train_batch_end 0.44490236043930054
Loss on_train_batch_end 0.447885125875473
Loss on_train_batch_end 0.34840255975723267
Loss on_train_batch_end 0.3837062120437622
Loss on_train_batch_end 0.3976879119873047
Loss on_train_batch_end 0.394283652305603
Loss on_train_batch_end 0.3962740898132324
Loss on_train_batch_end 0.26323315501213074
Loss on_train_batch_end 0.29454872012138367
Loss on_train_batch_end 0.30323049426078796
Loss on_train_batch_end 0.2998073101043701
Loss on_train_batch_end 0.30089956521987915


INFO:werkzeug:127.0.0.1 - - [22/Aug/2022 14:08:27] "POST / HTTP/1.1" 200 -
