# Elastic KNN

## Load Dataset

In [1]:
import numpy as np
import os
import pandas as pd

In [3]:
# Read the train/test feature matrices from whitespace-delimited text files in "0004".
X_train_features, X_test_features = (
    np.loadtxt(os.path.join("0004", file_name))
    for file_name in ("X_train_features.txt", "X_test_features.txt")
)

In [1]:
# Read the train/test label vectors from the same "0004" directory.
y_train, y_test = (
    np.loadtxt(os.path.join("0004", file_name))
    for file_name in ("y_train.txt", "y_test.txt")
)

In [None]:
# Read the train/test image arrays (stored as text) from the "0004" directory.
train_images, test_images = (
    np.loadtxt(os.path.join("0004", file_name))
    for file_name in ("train_images.txt", "test_images.txt")
)

In [None]:
# Dimensions of the training feature matrix.
np.shape(X_train_features)

In [None]:
# Dimensions of the test feature matrix.
np.shape(X_test_features)

In [None]:
# Dimensions of the training label vector.
np.shape(y_train)

In [None]:
# Dimensions of the test label vector.
np.shape(y_test)

In [None]:
# Show the loaded training image array (the notebook renders the cell's last expression).
train_images

In [None]:
# Show the loaded test image array (the notebook renders the cell's last expression).
test_images

In [25]:
# Training frame: one column per feature plus an integer 'target' label column at the end.
train_df = pd.DataFrame(X_train_features).assign(target=y_train.astype(int))

In [26]:
# Test frame: one column per feature plus an integer 'target' label column at the end.
test_df = pd.DataFrame(X_test_features).assign(target=y_test.astype(int))

## Elastic KNN

### Install Packages

In [None]:
!pip install tensorflow-io
!pip install elasticsearch

### Import Libraries

In [None]:
import os
import time
from sklearn.model_selection import train_test_split
from elasticsearch import Elasticsearch
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_io as tfio

### Validate tf and tfio imports

In [None]:
# Report the installed library versions, one per line.
print(
    "tensorflow-io version: {}".format(tfio.__version__),
    "tensorflow version: {}".format(tf.__version__),
    sep="\n",
)

### Download and setup the Elasticsearch instance

In [None]:
%%bash

# Abort the cell as soon as any step fails (download error, checksum mismatch, ...).
set -euo pipefail

wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512

# Verify the archive BEFORE unpacking it, so a corrupted or tampered download is never extracted.
shasum -a 512 -c elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512

tar -xzf elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
# The server is started as the 'daemon' user, which therefore needs to own the install tree.
sudo chown -R daemon:daemon elasticsearch-7.9.2/

#### Run Instance

In [None]:
%%bash --bg

# Launch the Elasticsearch server as the unprivileged 'daemon' user, with HOME set to that user's home.
sudo --set-home --user=daemon elasticsearch-7.9.2/bin/elasticsearch

In [None]:
# Wait until the instance answers on its HTTP port instead of sleeping for a fixed
# 20 seconds: start-up time varies between machines, and a fixed delay is either
# too short (later cells fail) or needlessly long.
import urllib.request

ES_STARTUP_TIMEOUT_S = 120
startup_deadline = time.monotonic() + ES_STARTUP_TIMEOUT_S

while True:
    try:
        urllib.request.urlopen("http://localhost:9200/", timeout=2).close()
        break
    except OSError:
        # URLError/HTTPError and refused connections are all OSError subclasses.
        if time.monotonic() >= startup_deadline:
            raise TimeoutError(
                "Elasticsearch did not become reachable on localhost:9200 "
                "within {} seconds".format(ES_STARTUP_TIMEOUT_S)
            )
        time.sleep(1)

#### Check Availability

In [None]:
%%bash

# The bracket keeps grep from matching its own command line, so this prints (and
# exits successfully) only when an Elasticsearch process is actually running.
ps -ef | grep "[e]lasticsearch"

#### Check localhost

In [None]:
%%bash

# Query the root endpoint of the local node; it replies with cluster/version info as JSON.
curl --silent --request GET "localhost:9200/"

### Store Database in Elastic Cluster

In [None]:
# Address of the local Elasticsearch node used by every client in this notebook.
ES_NODES = "http://localhost:9200"


def prepare_es_data(index, doc_type, df):
    """Convert a DataFrame into a flat bulk-request body.

    Each row yields two consecutive entries: an ``{"index": {...}}`` action
    carrying the target index, document type and the row position as ``_id``,
    followed by the row itself as a ``{column: value}`` dict.

    Args:
        index: Name of the index the documents are destined for.
        doc_type: Document type written into each action's ``_type``.
        df: DataFrame whose rows become the documents.

    Returns:
        A list of dicts alternating action metadata and document.
    """
    bulk_body = []
    for doc_id, row in enumerate(df.to_dict(orient="records")):
        action = {"index": {"_index": index, "_type": doc_type, "_id": doc_id}}
        bulk_body.extend((action, row))
    return bulk_body

def index_es_data(index, es_data):
    """(Re)create ``index`` on the local node and bulk-load ``es_data`` into it.

    Args:
        index: Name of the index to create. If it already exists, it is
            deleted first, so every call starts from an empty index.
        es_data: Bulk request body, i.e. the alternating action/document
            list returned by ``prepare_es_data``.

    Returns:
        None. The server responses and a short summary are printed.
    """
    es_client = Elasticsearch(hosts=[ES_NODES])

    # Pass the index by keyword: newer client releases no longer accept it positionally.
    if es_client.indices.exists(index=index):
        print("deleting the '{}' index.".format(index))
        res = es_client.indices.delete(index=index)
        print("Response from server: {}".format(res))

    print("creating the '{}' index.".format(index))
    res = es_client.indices.create(index=index)
    print("Response from server: {}".format(res))

    print("bulk index the data")
    # refresh=True makes the documents searchable as soon as the call returns.
    res = es_client.bulk(index=index, body=es_data, refresh=True)
    print("Errors: {}, Num of records indexed: {}".format(res["errors"], len(res["items"])))

In [None]:
# Build the bulk payloads for both splits first ...
train_es_data = prepare_es_data(df=train_df, index="train", doc_type="images")
test_es_data = prepare_es_data(df=test_df, index="test", doc_type="images")

# ... then push them to the cluster, pausing briefly between the two uploads.
index_es_data(es_data=train_es_data, index="train")
time.sleep(3)
index_es_data(es_data=test_es_data, index="test")

### Prepare tfio Dataset

#### Train

In [None]:
BATCH_SIZE = 32
HEADERS = {"Content-Type": "application/json"}

# Stream the "train" index, split each document into (features, label) by
# popping the "target" field, and group the result into batches.
train_ds = (
    tfio.experimental.elasticsearch.ElasticsearchIODataset(
        nodes=[ES_NODES],
        index="train",
        doc_type="images",
        headers=HEADERS,
    )
    .map(lambda doc: (doc, doc.pop("target")))
    .batch(BATCH_SIZE)
)

#### Test

In [None]:
# Stream the "test" index, split each document into (features, label) by
# popping the "target" field, and group the result into batches.
test_ds = (
    tfio.experimental.elasticsearch.ElasticsearchIODataset(
        nodes=[ES_NODES],
        index="test",
        doc_type="images",
        headers=HEADERS,
    )
    .map(lambda doc: (doc, doc.pop("target")))
    .batch(BATCH_SIZE)
)

### Pre-Processing

In [None]:
def get_normalization_layer(name, dataset):
    """Return a ``Normalization`` layer adapted to one feature of ``dataset``.

    Args:
        name: Key of the feature inside each element's feature mapping.
        dataset: Dataset yielding ``(features, label)`` pairs.

    Returns:
        A ``preprocessing.Normalization`` layer whose statistics were
        computed by ``adapt`` over the selected feature's values.
    """
    # Keep only the requested feature; the label is irrelevant here.
    column_values = dataset.map(lambda features, _label: features[name])

    layer = preprocessing.Normalization()
    layer.adapt(column_values)
    return layer

In [None]:
all_inputs = []
encoded_features = []

# Every column except the last one ('target') is a numeric feature.
for header in train_df.columns[:-1]:
    # train_df was built from a bare array, so its feature labels are integers.
    # Keras layer names must be strings, and the documents went through JSON
    # (whose object keys are always strings), so use the string form as the
    # input name and as the lookup key.
    feature_name = str(header)

    numeric_col = tf.keras.Input(shape=(1,), name=feature_name)

    # One normalisation layer per feature, adapted on the training stream.
    normalization_layer = get_normalization_layer(feature_name, train_ds)
    encoded_numeric_col = normalization_layer(numeric_col)

    all_inputs.append(numeric_col)
    encoded_features.append(encoded_numeric_col)

### Build, Compile & Train Model

In [None]:
# Set the parameters
OPTIMIZER="adam"
LOSS=tf.keras.losses.CategoricalCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10

In [None]:
# Convert the feature columns into a tf.keras layer
all_features = tf.keras.layers.concatenate(encoded_features)

# design/build the model
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dropout(0.5)(x)

output = tf.keras.layers.Dense(5748, activation="softmax")(x)

model = tf.keras.Model(all_inputs, output)

tf.keras.utils.plot_model(model, rankdir='LR', show_shapes=True)

In [None]:
# Attach the optimizer, loss and metrics chosen in the parameters cell.
model.compile(
    optimizer=OPTIMIZER,
    loss=LOSS,
    metrics=METRICS,
)

In [None]:
# Train on the batched training stream for the configured number of epochs.
model.fit(
    x=train_ds,
    epochs=EPOCHS,
)

### Evaluate Model

In [None]:
# Score the trained model on the held-out stream; evaluate() returns the loss
# followed by the compiled metrics.
res = model.evaluate(x=test_ds)
print("test loss, test acc:", res)