# Apache Kafka Integration + Preprocessing / Interactive Analysis with KSQL

This notebook combines Python, Apache Kafka and KSQL to build machine learning infrastructure. It is part of [DLAKE-1022](https://jira.knab.nl/browse/DLAKE-1022), an initiative to implement a streaming data framework for Knab.


We are looking into three use cases:
* A - Data Integration Layer (SOA/EDA/Microservices & Data Stream Processing Framework)
* B - Stream Processing Framework (Online transformations and business rules & Online machine learning)
* C - Next Generation Data Storage Platform (Efficient state persistence & Change data capture)

Author: Max van Rooijen (max.van.rooijen@knab.nl), based on a notebook by [Kai Waehner](https://github.com/kaiwaehner/python-jupyter-apache-kafka-ksql-tensorflow-keras).


## Data Integration and Preprocessing with Python and KSQL

In [1]:
import json

from time import sleep
from json import dumps
from kafka import KafkaProducer

In [2]:
from ksql import KSQLAPI
client = KSQLAPI('http://ksqldb-server:8088')

In [3]:
producer = KafkaProducer(bootstrap_servers=['broker:29092'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))

In [4]:
from faker import Faker
fake = Faker()

In [5]:
producer.send('transactions', value={
    'transaction_id': "RF" + str(fake.pyint(5)),
    'transaction_type': "transaction_" + str(fake.pyint(1)),
    'from_account': fake.iban(),
    'to_account': fake.iban(),
    'amount_cents': fake.pyint(),
    'created_at': fake.date_time().strftime("%Y/%m/%d, %H:%M:%S")
})

<kafka.producer.future.FutureRecordMetadata at 0x7f38610edfd0>
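The cell above sends a single event, and `producer.send()` only queues it asynchronously (it returns the future shown in the output). The training part further down needs many more rows, so here is a minimal sketch for sending a batch and then calling `flush()`. `make_transaction` and `send_transactions` are hypothetical helpers: the former is a stdlib-only stand-in for the Faker-based dict above, its account numbers are IBAN-shaped placeholders (not valid IBANs), and its value ranges are assumptions.

```python
import random
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List

# Keys of the TRANSACTIONS stream schema used in this notebook
FIELDS = ["transaction_id", "transaction_type", "from_account",
          "to_account", "amount_cents", "created_at"]


def make_transaction(rng: random.Random) -> Dict[str, Any]:
    """Hypothetical stdlib-only stand-in for the Faker-based event above."""
    def account() -> str:
        # IBAN-shaped placeholder: "GB" + 20 digits (22 chars), no valid checksum
        return "GB" + "".join(str(rng.randint(0, 9)) for _ in range(20))

    created = datetime(1970, 1, 1) + timedelta(seconds=rng.randint(0, 1_600_000_000))
    return {
        "transaction_id": "RF" + str(rng.randint(0, 99999)),
        "transaction_type": "transaction_" + str(rng.randint(0, 9)),
        "from_account": account(),
        "to_account": account(),
        "amount_cents": rng.randint(0, 9999),
        "created_at": created.strftime("%Y/%m/%d, %H:%M:%S"),
    }


def send_transactions(producer: Any, topic: str,
                      make_value: Callable[[], Dict[str, Any]], n: int) -> List[Any]:
    """Queue n events on the producer, then block until they are delivered."""
    futures = [producer.send(topic, value=make_value()) for _ in range(n)]
    producer.flush()  # send() is asynchronous; flush() waits for buffered records
    return futures
```

In the notebook this could be called as `send_transactions(producer, 'transactions', lambda: make_transaction(rng), 1000)` with `rng = random.Random(42)`. Limiting `transaction_type` to ten values keeps the number of keys in the aggregated table small, so pull queries on a key such as `'transaction_6'` are likely to find a row.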

Register a KSQL stream on top of the Kafka topic "transactions":

In [6]:
client.create_stream(table_name='TRANSACTIONS',
                     columns_type=['transaction_id string',
                                   'transaction_type string',
                                   'from_account string',
                                   'to_account string',
                                   'amount_cents integer',
                                   'created_at string'],
                     topic='transactions',
                     value_format='JSON')

True

Create a materialized table with a plain KSQL statement (ksql-python offers no high-level method for this):

In [7]:
qr = """CREATE TABLE agg AS
  SELECT transaction_type, COUNT(*) AS num_transactions, SUM(amount_cents) AS total_value
  FROM TRANSACTIONS GROUP BY transaction_type EMIT CHANGES
  """

client.ksql(qr)


[{'@type': 'currentStatus',
  'statementText': "CREATE TABLE AGG WITH (KAFKA_TOPIC='AGG', PARTITIONS=1, REPLICAS=1) AS SELECT\n  TRANSACTIONS.TRANSACTION_TYPE TRANSACTION_TYPE,\n  COUNT(*) NUM_TRANSACTIONS,\n  SUM(TRANSACTIONS.AMOUNT_CENTS) TOTAL_VALUE\nFROM TRANSACTIONS TRANSACTIONS\nGROUP BY TRANSACTIONS.TRANSACTION_TYPE\nEMIT CHANGES;",
  'commandId': 'table/`AGG`/create',
  'commandStatus': {'status': 'SUCCESS',
   'message': 'Created query with ID CTAS_AGG_0'},
  'commandSequenceNumber': 2,
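To see what the `CREATE TABLE agg AS ... GROUP BY ... EMIT CHANGES` statement maintains, here is a simplified in-memory model in plain Python: per `transaction_type` it keeps a running count and sum, and every incoming event emits the updated row for its key. This only illustrates the semantics; it is not how ksqlDB is implemented (it ignores partitioning, state stores, late data and null handling), and `agg_changes` is a hypothetical name.

```python
from typing import Dict, Iterable, Iterator, Tuple


def agg_changes(events: Iterable[dict]) -> Iterator[Tuple[str, int, int]]:
    """Yield (transaction_type, num_transactions, total_value) after each event."""
    state: Dict[str, Tuple[int, int]] = {}
    for event in events:
        key = event["transaction_type"]
        count, total = state.get(key, (0, 0))
        state[key] = (count + 1, total + event["amount_cents"])
        # Like EMIT CHANGES: each input event produces an updated row for its key
        yield (key, *state[key])
```

The last row emitted for a key corresponds to what a pull query such as `select * from AGG WHERE ROWKEY = 'transaction_6'` returns for that key.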

Preprocessing:

- Select the columns needed downstream
- Keep only transactions where amount_cents > 5000
- Write the result as JSON to a new Kafka topic, transactions_high


In [8]:
client.create_stream_as(table_name='transactions_high',
                        select_columns=['transaction_id', 'transaction_type', 'from_account', 'to_account', 'amount_cents', 'created_at'],
                        src_table='transactions',
                        conditions='amount_cents > 5000',
                        kafka_topic='transactions_high',
                        value_format='JSON')

True

Take a look at the created KSQL streams:

In [54]:
client.ksql('show streams')

[{'@type': 'streams',
  'statementText': 'show streams;',
  'streams': [{'type': 'STREAM',
    'name': 'TRANSACTIONS',
    'topic': 'transactions',
    'format': 'JSON'},
   {'type': 'STREAM',
    'name': 'TRANSACTIONS_HIGH',
    'topic': 'transactions_high',
    'format': 'JSON'}],

Take a look at the metadata of the KSQL Stream:

In [55]:
client.ksql('describe TRANSACTIONS_HIGH')

[{'@type': 'sourceDescription',
  'statementText': 'describe TRANSACTIONS_HIGH;',
  'sourceDescription': {'name': 'TRANSACTIONS_HIGH',
   'windowType': None,
   'readQueries': [],
   'writeQueries': [{'queryString': "CREATE STREAM TRANSACTIONS_HIGH WITH (KAFKA_TOPIC='transactions_high', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT\n  TRANSACTIONS.TRANSACTION_ID TRANSACTION_ID,\n  TRANSACTIONS.TRANSACTION_TYPE TRANSACTION_TYPE,\n  TRANSACTIONS.FROM_ACCOUNT FROM_ACCOUNT,\n  TRANSACTIONS.TO_ACCOUNT TO_ACCOUNT,\n  TRANSACTIONS.AMOUNT_CENTS AMOUNT_CENTS,\n  TRANSACTIONS.CREATED_AT CREATED_AT\nFROM TRANSACTIONS TRANSACTIONS\nWHERE (TRANSACTIONS.AMOUNT_CENTS > 5000)\nEMIT CHANGES;",
     'sinks': ['TRANSACTIONS_HIGH'],
     'sinkKafkaTopics': ['transactions_high'],
     'id': 'CSAS_TRANSACTIONS_HIGH_3',
     'state': 'RUNNING'}],
   'fields': [{'name': 'ROWTIME',
     'schema': {'type': 'BIGINT', 'fields': None, 'memberSchema': None}},
    {'name': 'ROWKEY',
     'schema': {'type'

### Materialized Views ###

Interactive query statement:


In [56]:
 
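# Sample chunk as returned by the streaming query: a JSON object followed by ",\n"
res = """{"row":{"columns":[1610726138178,null,"RF12111","GB85XZQL75965818871538","GB80IUYD16603233079843",7385,"1983/04/18, 07:44:25"]}},
"""

jres = json.loads(res[:-2])  # strip the trailing ",\n" before parsing

print(jres["row"]["columns"][5])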


7385


In [64]:
query = client.query('SELECT * FROM TRANSACTIONS_HIGH EMIT CHANGES')

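for item in query:
    print(item)
    try:
        jres = json.loads(item[:-2])  # strip the trailing ",\n" of each chunk
        print(jres)
        print(jres["row"]["columns"][5])  # index 5 is TO_ACCOUNT here; AMOUNT_CENTS is index 6 (see header schema)
    except (json.JSONDecodeError, KeyError):
        # The header chunk starts with "[" and cannot be parsed this way
        print("Wrong input")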

[{"header":{"queryId":"none","schema":"`ROWTIME` BIGINT, `ROWKEY` STRING, `TRANSACTION_ID` STRING, `TRANSACTION_TYPE` STRING, `FROM_ACCOUNT` STRING, `TO_ACCOUNT` STRING, `AMOUNT_CENTS` INTEGER, `CREATED_AT` STRING"}},

Wrong input
{"row":{"columns":[1610728033298,null,"RF333","transaction_6356","GB08OVGC18106202405257","GB40PVNR48124202324734",8771,"1972/08/06, 12:54:51"]}},

{'row': {'columns': [1610728033298, None, 'RF333', 'transaction_6356', 'GB08OVGC18106202405257', 'GB40PVNR48124202324734', 8771, '1972/08/06, 12:54:51']}}
GB40PVNR48124202324734
{"row":{"columns":[1610728043308,null,"RF5139","transaction_4381","GB92RUPJ87312363637369","GB25CQOX86842005592242",9960,"1974/09/26, 14:35:08"]}},

{'row': {'columns': [1610728043308, None, 'RF5139', 'transaction_4381', 'GB92RUPJ87312363637369', 'GB25CQOX86842005592242', 9960, '1974/09/26, 14:35:08']}}
GB25CQOX86842005592242
{"row":{"columns":[1610728083348,null,"RF1913","transaction_9665","GB57JBNI00763933938639","GB73ILED61547761202238"

KeyboardInterrupt: 
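Index-based access is fragile: the sample row in the earlier cell has no `transaction_type`, so index 5 is the amount there, while in the live stream index 5 is `TO_ACCOUNT`. A minimal sketch that reads the column names from the header chunk and turns each row into a dict instead. It assumes the chunk format visible in the output above (header chunk starting with `[`, rows ending with `,`); `column_names`, `parse_chunk` and `to_record` are hypothetical helpers, not part of ksql-python.

```python
import json
import re
from typing import Any, Dict, List, Optional


def column_names(schema: str) -> List[str]:
    """Extract names from a KSQL schema string like "`ROWTIME` BIGINT, `ROWKEY` STRING"."""
    return re.findall(r"`([^`]+)`", schema)


def parse_chunk(chunk: str) -> Optional[Dict[str, Any]]:
    """Parse one chunk of query output; returns None for blank chunks."""
    # Header chunk starts with "[", rows end with ",", the last chunk may end with "]"
    text = chunk.strip().lstrip("[").rstrip(",").rstrip("]")
    return json.loads(text) if text else None


def to_record(parsed: Dict[str, Any], names: List[str]) -> Dict[str, Any]:
    """Map a parsed row onto the column names taken from the header."""
    return dict(zip(names, parsed["row"]["columns"]))
```

In the streaming loop, call `column_names()` once on the header chunk's `["header"]["schema"]`, then use `to_record(parsed, names)["AMOUNT_CENTS"]` for every row chunk.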

### Additional (optional) analysis and preprocessing examples

Some more examples for possible data wrangling and preprocessing with KSQL:

- Anonymization
- Augmentation
- Merge / Join data frames

In [103]:
query = client.query('SELECT transaction_id, MASK_LEFT(from_account, 2) FROM TRANSACTIONS EMIT CHANGES')

for item in query: 
    print(item)

[{"header":{"queryId":"none","schema":"`TRANSACTION_ID` STRING, `KSQL_COL_1` STRING"}},

{"row":{"columns":["RF12111","XX94RQMU23438827339470"]}},

{"row":{"columns":["RF12111","XX83WIKR43447254751484"]}},



KeyboardInterrupt: 
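`MASK_LEFT(from_account, 2)` replaced the country code `GB` with `XX`. A plain-Python sketch of that behaviour, handy for checking anonymization rules offline. It assumes ksqlDB's default mask characters (uppercase to `X`, lowercase to `x`, digits to `n`, anything else to `-`); `mask_char` and `mask_left` are hypothetical helpers, not the ksqlDB implementation.

```python
def mask_char(ch: str, upper: str = "X", lower: str = "x",
              digit: str = "n", other: str = "-") -> str:
    """Mask a single character by its class, using the (assumed) KSQL default mask characters."""
    if ch.isupper():
        return upper
    if ch.islower():
        return lower
    if ch.isdigit():
        return digit
    return other


def mask_left(value: str, num_chars: int) -> str:
    """Mask the first num_chars characters and keep the rest unchanged."""
    head, tail = value[:num_chars], value[num_chars:]
    return "".join(mask_char(ch) for ch in head) + tail
```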

# Mapping from KSQL to NumPy / pandas for Machine Learning tasks

In [25]:
import numpy as np
import pandas as pd
import json

The query below returns a Python generator. Its values can be read, e.g., via next(query) or a for loop.

Due to a current [bug in the ksql-python library](https://github.com/bryanyang0528/ksql-python/issues/57), we need additional Python code to strip out unnecessary information and convert the result to a 2D array.

In [63]:
query = client.query('select * from AGG WHERE ROWKEY = \'transaction_6\'') # Returns a Python generator object

# ksql-python signals the end of the pull query as RuntimeError ("generator raised StopIteration")
try:
    for item in query:
        print(item)
except RuntimeError:
    print("^^ final result ^^")


[{"header":{"queryId":"query_1610727957173","schema":"`ROWKEY` STRING KEY, `ROWTIME` BIGINT, `TRANSACTION_TYPE` STRING, `NUM_TRANSACTIONS` BIGINT, `TOTAL_VALUE` INTEGER"}}]
^^ final result ^^


In [100]:
# AGG is keyed by transaction_type, so the pull query has to filter on a transaction type
query = client.query("select * from AGG WHERE ROWKEY='transaction_6';")  # Returns a Python generator object

# Drain the generator; ksql-python raises RuntimeError instead of ending cleanly (see the cell above)
chunks = []
try:
    for chunk in query:
        chunks.append(chunk)
except RuntimeError:
    pass

# Keep only the row objects: drop the header, the surrounding "[...]" and the trailing commas
rows = []
for line in ''.join(chunks).strip().strip('[]').split('\n'):
    line = line.strip().rstrip(',')
    if line.startswith('{"row"'):
        rows.append(json.loads(line)['row']['columns'][2:])  # skip ROWKEY and ROWTIME

df = pd.DataFrame(data=rows, columns=['transaction_type', 'num_transactions', 'total_value'])
df

In [None]:
! cat example_data | redis-cli --pipe

### Generate some test data 

As discussed in the step-by-step guide, you have various options. Here we, ironically, read messages from a CSV file. This is for simple demo purposes, so that you don't have to set up a real continuous Kafka stream.

In real-world or more advanced examples, you should connect to a real Kafka data stream (for instance using the Kafka data generator or Kafka Connect).

Here we just consume a few messages for demo purposes so that they get mapped into a pandas dataframe:

                cat /Users/kai.waehner/git-projects/python-jupyter-apache-kafka-ksql-tensorflow-keras/data/creditcard_extended.csv | kafka-console-producer --broker-list localhost:9092 --topic creditcardfraud_source
                
You need to run this from the command line because Jupyter cannot execute it in parallel with the KSQL query above.
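If you prefer to stay in Python rather than piping the file through `kafka-console-producer`, the CSV rows can be read with the standard `csv` module and sent with the `producer` defined at the top. Note the difference: the console producer sends each raw CSV line as a string, whereas this sketch produces one JSON object per row via the producer's JSON `value_serializer`. It assumes every column is numeric, as in the Kaggle `creditcard.csv`; `csv_records` is a hypothetical helper.

```python
import csv
from typing import Dict, Iterator, TextIO


def csv_records(handle: TextIO) -> Iterator[Dict[str, float]]:
    """Yield each CSV row as a {column: float} dict (assumes all columns are numeric)."""
    for row in csv.DictReader(handle):
        yield {column: float(value) for column, value in row.items()}
```

For example, open `data/creditcard.csv`, call `producer.send('creditcardfraud_source', value=record)` for each record from `csv_records(f)`, and finish with `producer.flush()`.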

# Preprocessing with Pandas + Model Training with TensorFlow / Keras

#### BE AWARE: You need enough messages in the pandas data frame to train the model in the cells below. If you just play around with ksql-python and add only a few Kafka events, there are not enough rows to continue. In that case, simply switch to df = pd.read_csv("data/creditcard.csv") as shown below to get a bigger data set. The training cells expect the columns of the Kaggle credit card data set (Time, V1 to V28, Amount, Class).


This part only includes the steps required to train the autoencoder with Keras and TensorFlow.

If you want to get a better understanding of the model, take a look at the other notebook [Python Tensorflow Keras Fraud Detection Autoencoder.ipynb](http://localhost:8888/notebooks/Python%20Tensorflow%20Keras%20Fraud%20Detection%20Autoencoder.ipynb) which includes many more details, plots and explanations.

[Kudos to David Ellison](https://www.datascience.com/blog/fraud-detection-with-tensorflow).

[The credit card fraud data set is available at Kaggle](https://www.kaggle.com/mlg-ulb/creditcardfraud/data).

In [None]:
# import packages
# matplotlib inline
#import pandas as pd
#import numpy as np
from scipy import stats
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_recall_curve
from sklearn.metrics import recall_score, classification_report, auc, roc_curve
from sklearn.metrics import precision_recall_fscore_support, f1_score
from sklearn.preprocessing import StandardScaler
from pylab import rcParams
from keras.models import Model, load_model
from keras.layers import Input, Dense
from keras.callbacks import ModelCheckpoint, TensorBoard
from keras import regularizers

In [None]:
# Use the dataframe from above (imported and preprocessed with KSQL)

# As alternative directly import from a CSV file ("the normal approach without Kafka and streaming data")

# "data/creditcard_small.csv" is a very small data set (just for quick demo purpose to get a model binary)
# => replace with "data/creditcard.csv" to use a real data set to train a model with good accuracy
#df = pd.read_csv("data/creditcard.csv") 


df.head(n=5) #just to check you imported the dataset properly

In [None]:
#set random seed and percentage of test data
RANDOM_SEED = 314 #used to help randomly select the data points
TEST_PCT = 0.2 # 20% of the data

# set up the graphic style (the commented-out palette below would use the color scheme from xkcd.com)
rcParams['figure.figsize'] = 14, 8.7 # Golden Mean
LABELS = ["Normal","Fraud"]
#col_list = ["cerulean","scarlet"]# https://xkcd.com/color/rgb/
#sns.set(style='white', font_scale=1.75, palette=sns.xkcd_palette(col_list))

In [None]:
normal_df = df[df.Class == 0]  # save the normal observations into a separate df
fraud_df = df[df.Class == 1]  # do the same for the frauds

In [None]:
#data = df.drop(['Time'], axis=1) # if you think the variable is unimportant
df_norm = df.copy()  # copy, so that scaling does not modify the original df in place
df_norm['Time'] = StandardScaler().fit_transform(df_norm['Time'].values.reshape(-1, 1))
df_norm['Amount'] = StandardScaler().fit_transform(df_norm['Amount'].values.reshape(-1, 1))
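`StandardScaler` rescales each column to zero mean and unit variance, `z = (x - mean) / std`, using the population standard deviation. The `.reshape(-1, 1)` is needed because scikit-learn expects a 2D array of shape (n_samples, n_features). A NumPy sketch of the same computation for a single column, useful to sanity-check the scaled `Time` and `Amount` values; `standardize` is a hypothetical helper.

```python
import numpy as np


def standardize(column) -> np.ndarray:
    """Return (x - mean) / std for one column, with the population std (ddof=0)."""
    x = np.asarray(column, dtype=float)
    std = x.std()  # NumPy's default ddof=0 matches StandardScaler
    # StandardScaler uses a scale of 1 for constant columns instead of dividing by zero
    return (x - x.mean()) / (std if std > 0 else 1.0)
```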

In [None]:
train_x, test_x = train_test_split(df_norm, test_size=TEST_PCT, random_state=RANDOM_SEED)
train_x = train_x[train_x.Class == 0] #where normal transactions
train_x = train_x.drop(['Class'], axis=1) #drop the class column

test_y = test_x['Class'] #save the class column for the test set
test_x = test_x.drop(['Class'], axis=1) #drop the class column

train_x = train_x.values #transform to ndarray
test_x = test_x.values

### My Jupyter notebook sometimes crashed in the next step, 'model training' (probably due to memory issues):

In [None]:
# Reduce number of epochs and batch_size if your Jupyter crashes (due to memory issues)
# nb_epoch = 100
# batch_size = 128
nb_epoch = 5
batch_size = 32

input_dim = train_x.shape[1] #num of columns, 30
encoding_dim = 14
hidden_dim = int(encoding_dim / 2) #i.e. 7
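learning_rate = 1e-7  # despite the name, used below as the L1 activity-regularization factor, not the optimizer's learning rate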

input_layer = Input(shape=(input_dim, ))
encoder = Dense(encoding_dim, activation="tanh", activity_regularizer=regularizers.l1(learning_rate))(input_layer)
encoder = Dense(hidden_dim, activation="relu")(encoder)
decoder = Dense(hidden_dim, activation='tanh')(encoder)
decoder = Dense(input_dim, activation='relu')(decoder)
autoencoder = Model(inputs=input_layer, outputs=decoder)
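For orientation, the network above is small: with `input_dim = 30` (as noted in the comment) the layers are 30 -> 14 -> 7 -> 7 -> 30. Each `Dense` layer has `inputs * units` weights plus `units` biases, which this short sketch adds up. It is plain arithmetic under that 30-column assumption; `autoencoder.summary()` should report the same total when the input really has 30 columns. `dense_params` is a hypothetical helper.

```python
from typing import List


def dense_params(layer_sizes: List[int]) -> List[int]:
    """Parameters of consecutive Dense layers: inputs * units weights + units biases."""
    return [n_in * n_out + n_out for n_in, n_out in zip(layer_sizes, layer_sizes[1:])]


# input_dim, encoding_dim, hidden_dim, hidden_dim, input_dim (assumes 30 input columns)
layer_sizes = [30, 14, 7, 7, 30]
per_layer = dense_params(layer_sizes)  # one entry per Dense layer
total_params = sum(per_layer)
```

With the notebook's own variables, `dense_params([input_dim, encoding_dim, hidden_dim, hidden_dim, input_dim])` gives the counts for whatever data set was actually loaded.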

In [None]:
autoencoder.compile(metrics=['accuracy'],
                    loss='mean_squared_error',
                    optimizer='adam')

cp = ModelCheckpoint(filepath="models/autoencoder_fraud.h5",
                     save_best_only=True,
                     verbose=0)

tb = TensorBoard(log_dir='./logs',
                 histogram_freq=0,
                 write_graph=True,
                 write_images=True)

history = autoencoder.fit(train_x, train_x,
                          epochs=nb_epoch,
                          batch_size=batch_size,
                          shuffle=True,
                          validation_data=(test_x, test_x),
                          verbose=1,
                          callbacks=[cp, tb]).history

In [None]:
autoencoder = load_model('models/autoencoder_fraud.h5')


In [None]:
test_x_predictions = autoencoder.predict(test_x)
mse = np.mean(np.power(test_x - test_x_predictions, 2), axis=1)
error_df = pd.DataFrame({'Reconstruction_error': mse,
                        'True_class': test_y})
error_df.describe()

The file 'models/autoencoder_fraud.h5' contains the trained model, which can then be deployed anywhere to make predictions on new incoming events in real time.
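To use the model for predictions, the reconstruction error still has to be turned into a label: a transaction is flagged as fraud when its error exceeds a threshold. A minimal NumPy sketch of that decision rule plus the confusion-matrix counts to evaluate it. The threshold value is an assumption you would tune (for example with `precision_recall_curve`, imported above), and the helper names are hypothetical.

```python
from typing import Dict

import numpy as np


def reconstruction_error(x: np.ndarray, x_hat: np.ndarray) -> np.ndarray:
    """Per-row mean squared error, as computed for error_df above."""
    return np.mean(np.power(x - x_hat, 2), axis=1)


def flag_fraud(errors, threshold: float) -> np.ndarray:
    """1 (fraud) where the reconstruction error exceeds the threshold, else 0 (normal)."""
    return (np.asarray(errors) > threshold).astype(int)


def confusion_counts(y_true, y_pred) -> Dict[str, int]:
    """Counts of true/false negatives and positives for binary labels 0/1."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    return {
        "tn": int(np.sum((y_true == 0) & (y_pred == 0))),
        "fp": int(np.sum((y_true == 0) & (y_pred == 1))),
        "fn": int(np.sum((y_true == 1) & (y_pred == 0))),
        "tp": int(np.sum((y_true == 1) & (y_pred == 1))),
    }
```

Applied to the notebook, this would be `error_df['Predicted'] = flag_fraud(error_df.Reconstruction_error, threshold)` followed by `confusion_counts(error_df.True_class, error_df.Predicted)`.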

# Model Deployment

This demo focuses on the combination of Python and KSQL for data preprocessing and model training. If you want to understand the relation between Apache Kafka, KSQL and Python-related machine learning tools like TensorFlow for model deployment and monitoring, check out Kai Waehner's GitHub projects, which include these examples of model deployment in Kafka environments:

- [Analytic models (TensorFlow, Keras, H2O and Deeplearning4j) embedded in Kafka Streams microservices](https://github.com/kaiwaehner/kafka-streams-machine-learning-examples)
- [Anomaly detection of IoT sensor data with a model embedded into a KSQL UDF](https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot)
- [RPC communication between Kafka Streams application and model server (TensorFlow Serving)](https://github.com/kaiwaehner/tensorflow-serving-java-grpc-kafka-streams)

# Combine with Redis Data

Load the example data into the Redis instance:

In [58]:
! cat example_data | redis-cli --pipe -h redis

All data transferred. Waiting for the last reply...
Last reply received from server.
errors: 0, replies: 10


In [59]:
! pip install redis

Collecting redis
  Downloading redis-3.5.3-py2.py3-none-any.whl (72 kB)
Installing collected packages: redis
Successfully installed redis-3.5.3


In [60]:
import redis

In [61]:
r = redis.Redis(host="redis")

In [62]:
r.get("transaction_6")

b'cat_6'
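`r.get()` returns raw bytes (`b'cat_6'`) or `None` for a missing key. A minimal sketch for combining the Redis data with the transactions from Kafka: it looks up each event's `transaction_type` and adds the decoded value as a `category` field. That the Redis keys correspond to `transaction_type` values is an assumption based on the `transaction_6` key above; `enrich` is a hypothetical helper, and any callable with the same contract as `r.get` can be passed in.

```python
from typing import Callable, Dict, Iterable, Iterator, Optional


def enrich(events: Iterable[Dict], lookup: Callable[[str], Optional[bytes]],
           key_field: str = "transaction_type") -> Iterator[Dict]:
    """Yield copies of the events with a decoded 'category' from the lookup (None if missing)."""
    for event in events:
        raw = lookup(event[key_field])
        # redis-py returns bytes unless the client was created with decode_responses=True
        category = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        yield {**event, "category": category}
```

In the notebook this would be `list(enrich(records, r.get))`, where `records` are transaction dicts parsed from the KSQL query output.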

# Oracle Test

In [1]:
! pip install cx_Oracle

Collecting cx_Oracle
  Downloading cx_Oracle-8.1.0-cp38-cp38-manylinux1_x86_64.whl (825 kB)
Installing collected packages: cx-Oracle
Successfully installed cx-Oracle-8.1.0


In [17]:
import os
os.environ["ORACLE_HOME"] = "/opt/oracle/instantclient_19_8"


In [18]:
os.getenv("ORACLE_HOME")

'/opt/oracle/instantclient_19_8'

In [16]:
import cx_Oracle

# Connection details of the demo container; prefix a string with r (e.g. user=r'DOMAIN\user') if it contains backslashes
dsn_tns = cx_Oracle.makedsn('oracledb', '1521', service_name='xe')
query = """select * from ALL_USERS"""  # triple quotes allow spreading the query over multiple lines

# The with-blocks close the cursor and the connection automatically
with cx_Oracle.connect(user='system', password='oracle', dsn=dsn_tns) as conn:
    with conn.cursor() as c:
        c.execute(query)
        for row in c:
            print(row[0], '-', row[1])  # first two columns only; append , '-', row[2] etc. for more
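Printing `row[0]` and `row[1]` only shows positional values. A minimal sketch that uses the DB-API 2.0 `cursor.description` attribute (the first item of each entry is the column name) to return rows as dicts, which can be passed straight to `pd.DataFrame`. `fetch_dicts` is a hypothetical helper; since it relies only on the DB-API interface, it should work with cx_Oracle cursors as well as other drivers such as `sqlite3`.

```python
from typing import Any, Dict, List


def fetch_dicts(cursor: Any, query: str) -> List[Dict[str, Any]]:
    """Execute a query on a DB-API 2.0 cursor and return the rows as {column: value} dicts."""
    cursor.execute(query)
    names = [description[0] for description in cursor.description]
    return [dict(zip(names, row)) for row in cursor.fetchall()]
```

For the Oracle example, `pd.DataFrame(fetch_dicts(conn.cursor(), "select * from ALL_USERS"))` inside the connection block gives a data frame with Oracle's (uppercase) column names.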


# Appendix: Pandas analysis of the Fraud Detection Data above

In [None]:
df = pd.read_csv("data/creditcard.csv")

In [None]:
df.head()

In [None]:
df.shape

In [None]:
df.index

In [None]:
df.columns

In [None]:
df.values

In [None]:
df.describe()

In [None]:
df['Amount']

In [None]:
df[0:3]

In [None]:
df.iloc[1,1]

In [None]:
# Takes a minute or two (big CSV file)...
#df.plot()