In [1]:
%load_ext autoreload
%autoreload 2

import datetime
import importlib
import joblib
import json
import keras
import logging
import numpy as np
import operator
import os
import pandas as pd
from es_pandas import es_pandas
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import TopicPartition
from keras.preprocessing.sequence import TimeseriesGenerator
from mods_models.models import config as cfg

In [2]:
# logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.INFO, datefmt='%I:%M:%S')
logger = logging.getLogger(__name__)

mods_model = {
    'name': '20210316-6m1m-conn-ssh-robust-delta_False',
    'file': 'data_train-seq-12.h5',
    'scaler': 'data_train-seq-12.h5.scaler',
    'sequence_length': 12,
    'prediction_steps': 10,
    'transformation': 'none',
    'differential': False,
    'kafka': {
        'bootstrap_servers': {
            'in': ['127.0.0.1:9092'],
            'out': ['127.0.0.1:9092']
        },
        'topics': {
            'in': 'mods-agg-10m',
            'out': 'models-10m-data_train-seq-12'
        }
    },
    'data': {
        'in': {
            'index':['ts'],
            'columns':[
                'conn_count_uid_in',
                'ssh_count_uid_in'
            ]
        },
        'out': [
            'conn_count_uid_in',
            'ssh_count_uid_in'
        ]
    }
}

logger.debug('mods_model:\n%s' % mods_model)

STREAM_OFFSET = 0
KAFKA_CFG = mods_model['kafka']

In [3]:
consumer = KafkaConsumer(
    KAFKA_CFG['topics']['in'],
    bootstrap_servers=KAFKA_CFG['bootstrap_servers']['in'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
logger.info('consuming \'%s\' messages from: %s' % (KAFKA_CFG['topics']['in'], KAFKA_CFG['bootstrap_servers']['in']))

# producer = KafkaProducer(
#     bootstrap_servers=KAFKA_CFG['bootstrap_servers']['out'],
#     value_serializer=lambda v: json.dumps(v).encode('utf-8')
# )
# logger.info('producing \'%s\' messages to: %s' % (KAFKA_CFG['topics']['out'], KAFKA_CFG['bootstrap_servers']['in']))

ep = es_pandas('127.0.0.1:9200')

In [4]:
transformations = {
    'none': None,
    'diff': {
        'f': operator.sub,
        'f-1': operator.add
    },
    'perc': {
        'f': operator.truediv,
        'f-1': operator.mul
    },
    'log': {
        'f': lambda x,y: np.log(x)-np.log(y),
        'f-1': lambda x,y: np.exp(y)*x,
    }
}


def transform(df, k, t=transformations['diff']):
    if t != None:
        if isinstance(df, pd.DataFrame):
            return t['f'](df[k:],df[:-k].values)
        else:
            return t['f'](df[k:],df[:-k])
    else:
        return df


def inverse_transform(pred, prev, t=transformations['diff']):
    if t != None:
        if isinstance(df, pd.DataFrame):
            return t['f-1'](prev.values, pred)
        else:
            return t['f-1'](prev, pred)
    else:
        return pred


def normalize(df, scaler, fit=False, features=None):
    fn = scaler.fit_transform if fit else scaler.transform
    if isinstance(df, pd.DataFrame):
        if features is not None:
            df[features] = fn(df[features])
            return df
        else:
            index = df.index
            columns = df.columns
            df = pd.DataFrame(fn(df), index=index)
            df = df.set_axis(columns, axis=1)
            return fn(df)
    else:
        return fn(df)


def inverse_normalize(df, scaler, features):
    fn = scaler.inverse_transform
    if isinstance(df, pd.DataFrame):
        if features is not None:
            df[features] = fn(df[features].values)
            return df
        else:
            index = df.index
            columns = df.columns
            df = pd.DataFrame(fn(df), index=index)
            df = df.set_axis(columns, axis=1)
            return fn(df)
    else:
        return fn(df)

def load_model(model_file, scaler_file):
    model = keras.models.load_model(model_file)
    scaler = joblib.load(scaler_file)
    return model, scaler

In [5]:
logger.info('loading model and scaler...')
MODEL_NAME = mods_model['name']
MODEL_FILE = os.path.join('mods_models', 'models', mods_model['name'], mods_model['file'])
SCALER_FILE = os.path.join('mods_models', 'models', mods_model['name'], mods_model['scaler'])
logger.debug('MODEL_FILE: %s' % MODEL_FILE)
logger.debug('SCALER_FILE: %s' % SCALER_FILE)
MODEL,SCALER = load_model(MODEL_FILE, SCALER_FILE)

In [11]:
features = mods_model['data']['in']['columns']
features_predicted = mods_model['data']['out']
context_length = mods_model['sequence_length'] + (mods_model['prediction_steps'] if mods_model['differential'] else 0)

# store incomming messages
buffer = pd.DataFrame([], columns=features)

if STREAM_OFFSET >= 0:
    buffer = pd.DataFrame([], columns=features)
    logger.info('seeking to beginning in the streams...')
    for x in consumer.assignment():
        consumer.seek(x, STREAM_OFFSET)
        logger.debug('stream %s at %d' % (x, STREAM_OFFSET))

logger.debug('features: %s' % features)
logger.debug('features_predicted: %s' % features_predicted)
logger.debug('context_length: %s' % context_length)

In [12]:
display(buffer)
count=0
stripped_beg = False
for message in consumer:
    protocol = message.key.decode('ascii')
    df = pd.read_json(message.value, orient='index')
    df.set_index(mods_model['data']['in']['index'], inplace=True)
    cols = [col for col in df if col in features]
    df = df[cols]
    if df.empty: continue
    buffer = buffer.combine_first(df)
    if not stripped_beg and len(buffer.index) > 1 and buffer.iloc[[0]].isnull().values.any():
        buffer = buffer[1:]
        stripped_beg = True
    if len(buffer.index) >= context_length and not (buffer[-1:].isnull().values.any()):
        df = buffer[-context_length:]
        df.index = pd.to_datetime(df.index, unit='ms')
#         print("now: %s" % datetime.datetime.now())
#         print("context")
#         display(df)
#         print("transform")
        t = transformations[mods_model['transformation']]
#         print(t)
        df_x = df[1:].copy(deep=True) if mods_model['differential'] else df.copy(deep=True)
        df_x[:] = transform(df.to_numpy(), mods_model['prediction_steps'], t=t)
#         display(df_x)
#         print("normalize")
        df_x[:] = normalize(df_x.to_numpy(), SCALER)
#         display(df_x)
#         print("pred")
        pred = MODEL.predict(df_x.to_numpy()[np.newaxis])
#         display(pred)
#         print("y-pred-reshape")
        pred = pred.reshape((mods_model['prediction_steps'], len(mods_model['data']['out'])))
#         display(pred)
#         print("y-pred")
        y = df[-mods_model['prediction_steps']:][features_predicted]
        y[:] = pred
#         display(y)
#         print("y-invnorm")        
        y[:] = inverse_normalize(y.to_numpy(), SCALER, features)
#         display(y)
#         print("y-invtran")
        y[:] = inverse_transform(y.to_numpy(), prev=df[-mods_model['prediction_steps']:][features_predicted], t=t)
#         display(y)
#         print("y")
        y['model'] = MODEL_NAME
        y.reset_index(level=0, inplace=True)
        y['ts'] = y['ts'] + pd.tseries.offsets.Minute(n=10*mods_model['prediction_steps'])
#         print('BUFFER:')
#         display(buffer)
#         print('X:')
#         display(df[-mods_model['prediction_steps']:])
#         print('Y:')
#         display(y)        
        data_written = ep.to_es(
                    y,
                    'mods-10m-pred',
                    use_pandas_json=True,
                    doc_type='pred',
                    use_index=False
                )
#         display(data_written)
        #display(pred)
#     count+=1
#     if count >= 100:
#         break

Unnamed: 0,conn_count_uid_in,ssh_count_uid_in


N/A% (0 of 10) |                         | Elapsed Time: 0:00:00 ETA:  --:--:--

KeyboardInterrupt: 

In [44]:
buffer.to_csv('buffer.tsv', sep='\t')

In [57]:
data = pd.read_csv('buffer.tsv', sep='\t', index_col=['ts'])
data.index = pd.to_datetime(data.index, unit='ms')
split_at = '2021-03-17 17:00'
data_a = data.loc[:split_at] # in kibana 18:00
data_b = data.loc[split_at:][1:] # in kibana 18:00
preds = pd.DataFrame()

step = 0
df = data_a[step:step+context_length]
while len(df.index) == context_length:
    step+=1
    t = transformations[MODEL_CFG['transformation']]
    df_x = df[1:].copy(deep=True)
    df_x[:] = transform(df.to_numpy(), MODEL_CFG['prediction_steps'], t=t)
#     display(df_x)
    df_x[:] = normalize(df_x.to_numpy(), SCALER)
#     display(df_x)
    pred = MODEL.predict(df_x.to_numpy()[np.newaxis])
#     display(pred)
    y = df[-1:][features_predicted]
    y[:] = pred
    y[:] = inverse_normalize(y.to_numpy(), SCALER, features)
    y[:] = inverse_transform(y.to_numpy(), prev=df[-1:][features_predicted], t=t)
    y['model'] = MODEL_NAME
    y.reset_index(level=0, inplace=True)
    y['ts'] = y['ts'] + pd.tseries.offsets.Minute(n=10)
    preds = preds.append(y)
#     display(y)
    df = data_a[step:step+context_length]


In [58]:
display(df)
display(y)
display(data_b[:-1])
display(preds)

Unnamed: 0_level_0,conn_count_uid_in,ssh_count_uid_in
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-03-17 15:10:00,14494.0,434.0
2021-03-17 15:20:00,15154.0,581.0
2021-03-17 15:30:00,15781.0,766.0
2021-03-17 15:40:00,15193.0,782.0
2021-03-17 15:50:00,20239.0,1394.0
2021-03-17 16:00:00,28962.0,3041.0
2021-03-17 16:10:00,23375.0,1710.0
2021-03-17 16:20:00,16481.0,769.0
2021-03-17 16:30:00,15434.0,828.0
2021-03-17 16:40:00,15720.0,727.0


Unnamed: 0,ts,conn_count_uid_in,ssh_count_uid_in,model
0,2021-03-17 17:10:00,17090.800781,1150.73645,20210315-6m1m-conn-ssh


Unnamed: 0_level_0,conn_count_uid_in,ssh_count_uid_in
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-03-17 17:10:00,15486.0,568.0
2021-03-17 17:20:00,15326.0,559.0
2021-03-17 17:30:00,17105.0,748.0
2021-03-17 17:40:00,26657.0,2344.0
2021-03-17 17:50:00,28884.0,2445.0
2021-03-17 18:00:00,23318.0,1787.0
2021-03-17 18:10:00,20599.0,1788.0


Unnamed: 0,ts,conn_count_uid_in,ssh_count_uid_in,model
0,2021-03-04 14:40:00,10377.725586,642.491211,20210315-6m1m-conn-ssh
0,2021-03-04 14:50:00,9797.469727,155.466949,20210315-6m1m-conn-ssh
0,2021-03-04 15:00:00,11508.770508,605.237305,20210315-6m1m-conn-ssh
0,2021-03-04 15:10:00,21873.572266,2105.972412,20210315-6m1m-conn-ssh
0,2021-03-04 15:20:00,22339.859375,2258.614014,20210315-6m1m-conn-ssh
...,...,...,...,...
0,2021-03-17 16:30:00,16516.628906,640.048096,20210315-6m1m-conn-ssh
0,2021-03-17 16:40:00,15256.444336,495.266357,20210315-6m1m-conn-ssh
0,2021-03-17 16:50:00,15858.875000,658.539062,20210315-6m1m-conn-ssh
0,2021-03-17 17:00:00,17383.917969,385.306000,20210315-6m1m-conn-ssh


In [48]:
df = buffer[-context_length:]
df.index = pd.to_datetime(df.index, unit='ms')
df_t = transform(df, MODEL_CFG['prediction_steps'])
df_n = normalize(df_t, SCALER)
pred = MODEL.predict(df_n[np.newaxis])
y = df[-1:][features_predicted]
pred = pd.DataFrame(pred, y.index)
pred.set_axis(y.columns, axis=1, inplace=True)
dummy = pd.DataFrame(np.zeros(shape=(1, df.shape[1])), index=y.index)
dummy.set_axis(df.columns, axis=1, inplace=True)
dummy[features_predicted] = pred
dummyt = inverse_normalize(dummy, SCALER, features)
dummyt = dummyt[features_predicted]
pred = inverse_transform(dummyt, prev=df[-1:][features_predicted].values)
pred.set_axis([pred.index + pd.tseries.offsets.Minute(n=10)], axis=0, inplace=True)
pred['model'] = MODEL_NAME
pred.reset_index(level=0, inplace=True)
display(df)
display(pred)

Unnamed: 0_level_0,conn_count_uid_in,ssh_count_uid_in
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-03-14 00:00:00,24821.0,2838.0
2021-03-14 00:10:00,18508.0,2029.0
2021-03-14 00:20:00,13974.0,906.0
2021-03-14 00:30:00,15491.0,736.0
2021-03-14 00:40:00,21066.0,1441.0
2021-03-14 00:50:00,22639.0,662.0
2021-03-14 01:00:00,15588.0,587.0
2021-03-14 01:10:00,13071.0,492.0
2021-03-14 01:20:00,16154.0,774.0
2021-03-14 01:30:00,14789.0,496.0


Unnamed: 0,ts,conn_count_uid_in,ssh_count_uid_in,model
0,2021-03-14 02:10:00,13976.21698,430.213631,data_train-seq-12


In [41]:
pd.to_datetime(df.index, unit='ms')

DatetimeIndex(['2021-03-14 00:00:00', '2021-03-14 00:10:00',
               '2021-03-14 00:20:00', '2021-03-14 00:30:00',
               '2021-03-14 00:40:00', '2021-03-14 00:50:00',
               '2021-03-14 01:00:00', '2021-03-14 01:10:00',
               '2021-03-14 01:20:00', '2021-03-14 01:30:00',
               '2021-03-14 01:40:00', '2021-03-14 01:50:00',
               '2021-03-14 02:00:00'],
              dtype='datetime64[ns]', name='ts', freq=None)

In [13]:
topics = consumer.subscription()
consumer.partitions_for_topic('mods-agg-10m')

{0}

In [7]:
max = 20

topics = consumer.subscription()
for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    topic_partitions = consumer.assignment()
    for topic_partition in topic_partitions:
        print('t:%s, tp:%s' % (topic, topic_partition))
        print(consumer.offsets_for_times({topic_partition:161550780000})
        consumer.seek_to_end(topic_partition)
        end = consumer.position(topic_partition)
        beg = end - max
        print(beg,end)
        consumer.seek(topic_partition, beg)
        for m in consumer:
            if m.key.decode('ascii') == 'conn':
                print(m)


t:mods-agg-10m, tp:TopicPartition(topic='mods-agg-10m', partition=0)
{TopicPartition(topic='mods-agg-10m', partition=0): OffsetAndTimestamp(offset=0, timestamp=1614850800750)}
6784 6804
ConsumerRecord(topic='mods-agg-10m', partition=0, offset=6786, timestamp=1615556400944, timestamp_type=0, key=b'conn', value='{"0":{"ts":1615559400000,"conn_count_uid_in":16222.0,"conn_count_uid_internal":2517.0,"conn_count_uid_out":2070.0,"conn_mean_duration_in":5.9272474136,"conn_mean_duration_internal":0.3427608462,"conn_mean_duration_out":3.7189914797,"conn_nunique_uid_in":16222.0,"conn_nunique_uid_internal":2517.0,"conn_nunique_uid_out":2070.0,"conn_sum_orig_bytes_in":34612357350.0,"conn_sum_orig_bytes_internal":560210.0,"conn_sum_orig_bytes_out":1493272020.0,"conn_sum_resp_bytes_in":3426974451.0,"conn_sum_resp_bytes_internal":1722535.0,"conn_sum_resp_bytes_out":16388479478.0}}', headers=[], checksum=None, serialized_key_size=4, serialized_value_size=601, serialized_header_size=-1)
ConsumerRecord(t

KeyboardInterrupt: 

In [55]:
consumer.seek_to_end(TopicPartition(topic='mods-agg-10m', partition=0))

In [33]:
consumer.assignment()

{TopicPartition(topic='mods-agg-10m', partition=0)}

In [37]:
consumer.seek(consumer.assignment().item(0),6240)

AttributeError: 'set' object has no attribute 'item'

In [56]:
counter=0
for message in consumer:
    counter += 1
    protocol = message.key.decode('ascii')
    print(message)

KeyboardInterrupt: 