# DO IMPORTS

In [None]:
# Import dependencies
import random
from confluent_kafka import TopicPartition,Producer,Consumer
import uuid
import certifi
import codecs
import os

import mlflow
import sklearn
import sklearn.datasets
from sklearn.linear_model import LogisticRegression
import pandas
import pickle

# Kafka connection settings come from the environment; a missing variable
# raises KeyError here instead of failing later inside a Kafka client.
CLUSTER_API_KEY = os.environ['KAFKA_USER_NAME']
CLUSTER_API_SECRET = os.environ['KAFKA_PASSWORD']
# The Kafka clients in this notebook read os.environ directly, so the
# configured broker address must not be replaced by a hard-coded host here.
KAFKA_BOOTSTRAP_SERVERS = os.environ['KAFKA_BOOTSTRAP_SERVERS']

# The Domino project name is used as the registered model name and as the
# prefix of the Kafka topic names below.
model_name = os.environ['DOMINO_PROJECT_NAME']
model_name_prefix = model_name
# NOTE(review): this rebinds `uuid` from the module to a generated UUID value;
# code that needs the module afterwards has to import it again.
uuid = uuid.uuid1()

FEATURES_TOPIC = f'{model_name_prefix}-features'
PREDICTION_TOPIC = f'{model_name_prefix}-predictions'
MODEL_UPDATE_TOPIC = f'{model_name_prefix}-updates'

# Create a test model and publish to MLflow Model Registry

![Alt text](./images/publish_model.png)

## What is a Domino Model API Endpoint in the Streaming Context

A Domino API Endpoint is a multi-replica REST Endpoint which also acts as a Kafka Consumer and Producer. It consumes features and model version updates 
from separate Kafka topics, generates predictions, and writes them to the prediction topic. This mode of prediction operates on a push mechanism 
where the incoming features are passed to the latest model version, which generates new predictions.

It also provides an API Endpoint to invoke interactively. By default it uses the latest model, but it can also be used to generate predictions from any previous model version.

A Model Endpoint is configured with-
1. A Python file containing the model code
2. A function call to be invoked when invoking the model via REST call
3. Environment variables which contain all the connectivity details for Kafka (these can also be injected in a more secure manner, such as Vault, IRSA, or Workload Identities)
4. A Kafka Consumer or Producer that is continuously listening.


The Kafka Consumers for features across all model api instances will use the same consumer group id. However the Kafka Consumer for the Model Updates topic will use
a unique group id per instance.


![Alt text](./images/make_rest_calls.png)

In [None]:
# Reuse the MLflow experiment named after the project, creating it on first use.
EXPERIMENT_NAME = model_name_prefix
exp = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
EXPERIMENT_ID = exp.experiment_id if exp else mlflow.create_experiment(EXPERIMENT_NAME)


In [None]:

# Train a demo classifier on synthetic data and register it in MLflow.
idx = 0
RUN_NAME = f"run_{idx}"
# Settings of the synthetic dataset; the same values are logged as run parameters.
dataset_params = {
    "n_classes": 2,
    "n_clusters_per_class": 1,
    "n_features": 5,
    "n_informative": 2,
    "n_redundant": 0,
    "n_repeated": 0,
}
with mlflow.start_run(experiment_id=EXPERIMENT_ID, run_name=RUN_NAME) as run:
    # Keep the run id available to later cells.
    RUN_ID = run.info.run_id

    data = sklearn.datasets.make_classification(n_samples=1000, **dataset_params)
    X, y = data[0], data[1]
    model = LogisticRegression()
    model.fit(X, y)

    # Record the dataset settings on the run.
    for param_name, param_value in dataset_params.items():
        mlflow.log_param(param_name, param_value)

    # The accuracy is a placeholder value for this demo, not a measured score.
    accuracy = 90
    mlflow.log_metric("accuracy", accuracy)

    # Log the fitted model and register it under the project's model name.
    output = mlflow.sklearn.log_model(model, "regression", registered_model_name=model_name)

In [None]:
# Look up the registered model and select its newest version.
client = mlflow.tracking.MlflowClient()
registered_model = client.get_registered_model(model_name)
versions = registered_model.latest_versions
# `latest_versions` lists the newest version of each stage, so its first
# entry is not necessarily the newest overall; pick the highest version number.
newest_version = max(versions, key=lambda mv: int(mv.version))
print(newest_version)
model_name = newest_version.name
model_version = newest_version.version


In [None]:
# Move the selected model version into the "Staging" stage.
client.transition_model_version_stage(name=model_name, version=model_version, stage='Staging')


In [None]:
# Check that the model survives a pickle + base64 text round trip.
import codecs
import pickle

model_bytes = pickle.dumps(model)
pickled = codecs.encode(model_bytes, "base64").decode()
restored_bytes = codecs.decode(pickled.encode(), "base64")
model_instance = pickle.loads(restored_bytes)
print(type(model_instance))
# Smoke-test the restored model on a single five-feature row.
model_instance.predict([[1, 1, 1, 1, 1]])[0]

# Publish Serialized Model To Kafka Topic

**All Domino Model API Instances will have the same Kafka Consumer Group Id to ensure that messages are processed mostly once, and idempotently**

![Alt text](./images/kafka_mlflow.png)


In [None]:
# Import dependencies
import codecs
import json
import os
import pickle
import random
import uuid

import certifi
from confluent_kafka import TopicPartition, Producer, Consumer

# Message key for this publication (rebinds `uuid` to the generated value).
uuid = uuid.uuid1()
version = model_version

# SASL_SSL producer settings; credentials and brokers come from the environment.
client_id = f'{model_name}-publish'
producer_conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
    'sasl.username': os.environ.get('KAFKA_USER_NAME'),
    'sasl.password': os.environ.get('KAFKA_PASSWORD'),
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': certifi.where(),
    'client.id': client_id,
}

MODEL_UPDATES_TOPIC = f'{model_name}-updates'
print(MODEL_UPDATES_TOPIC)

model_updates_producer = Producer(producer_conf)

# The message body is JSON carrying the base64 text of the pickled model
# together with its version.
pickled = codecs.encode(pickle.dumps(model), "base64").decode()
model_json = {'model': pickled, 'version': version}
model_updates_producer.produce(
    MODEL_UPDATES_TOPIC,
    key=str(uuid),
    value=json.dumps(model_json),
)
model_updates_producer.flush()

In [None]:
# Read Published Models


![Alt text](./images/kafka_mlflow.png)

In [None]:
import os
import time
import uuid

import certifi
from confluent_kafka import TopicPartition, Producer, Consumer

# Consumer group id built from a freshly generated UUID.
uuid = uuid.uuid1()
group_id = 'grp-' + str(uuid)

def get_latest_model(models: dict, group_id):
    """Consume the model-updates topic indefinitely and collect models.

    Each message value is parsed as JSON with a ``model`` field (base64 text
    of a pickled model) and a ``version`` field. Every decoded model is
    stored as ``models[int(version)]``.

    The loop has no exit condition, so the function only ends when an
    exception propagates; the consumer is closed in that case.

    Args:
        models: Mutable mapping updated in place, keyed by integer version.
        group_id: Kafka consumer group id used by this listener.
    """
    # Needed by the error branch; imported locally so the function does not
    # rely on another cell having imported them.
    import sys
    from confluent_kafka import KafkaError

    model_update_consumer_conf = {
                     'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
                     'sasl.username': os.environ.get('KAFKA_USER_NAME'),
                     'sasl.password': os.environ.get('KAFKA_PASSWORD'),
                     'sasl.mechanism': 'PLAIN',
                     'security.protocol': 'SASL_SSL',
                     'ssl.ca.location': certifi.where(),
                     'group.id': group_id,
                     'enable.auto.commit': False,
                     'auto.offset.reset': 'earliest'}

    model_update_consumer = Consumer(model_update_consumer_conf)
    # Read partition 0 of the updates topic directly instead of subscribing.
    model_update_consumer.assign([TopicPartition(MODEL_UPDATE_TOPIC, 0)])

    # Poll briefly while messages are flowing, and back off once the topic is idle.
    poll_timeout = 1.0
    try:
        while True:
            msg = model_update_consumer.poll(timeout=poll_timeout)
            # Compare against None explicitly: a message's truth value follows
            # the length of its payload, not whether a message was returned.
            if msg is None:
                print('Waiting')
                poll_timeout = 10.0
                continue
            poll_timeout = 1.0

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    sys.stderr.write(f'Error code{msg.error().code()} \n')
                continue

            model_json = json.loads(msg.value().decode("utf-8"))
            # NOTE(security): pickle.loads can execute arbitrary code; this is
            # only acceptable if every producer on this topic is trusted.
            model_instance = pickle.loads(codecs.decode(model_json['model'].encode(), "base64"))
            model_version = int(model_json['version'])
            print(f'Retrived{model_version}')
            models[model_version] = model_instance
            model_update_consumer.commit()
    finally:
        # Release the consumer when the loop is interrupted by an exception.
        model_update_consumer.close()
        print('Returning')
    

In [None]:
import threading
import uuid

# Start the model-update listener in a background thread with its own
# UUID-based consumer group id; it fills `models` as updates arrive.
uuid = uuid.uuid1()
group_id = 'grp-' + str(uuid)
models = {}
x = threading.Thread(target=get_latest_model, args=(models, group_id))
x.start()


In [None]:
# Keep the first five feature rows and labels, and show the features.
X, Y = data[0][:5], data[1][:5]
print(X)

# Generate Features

![Alt text](./images/generate_features.png)

In [None]:
# Import dependencies
import json
import os
import random

import certifi
from confluent_kafka import TopicPartition, Producer, Consumer

model_name = 'example_model'
client_id = 'client-features-1'
# SASL_SSL producer settings; credentials and brokers come from the environment.
producer_conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
    'sasl.username': os.environ.get('KAFKA_USER_NAME'),
    'sasl.password': os.environ.get('KAFKA_PASSWORD'),
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': certifi.where(),
    'client.id': client_id,
}

features_producer = Producer(producer_conf)
X, Y = data[0], data[1]
print(X)
# Publish the first five samples, one JSON message per sample keyed by its index.
for index, (feature_row, label) in enumerate(zip(X[:5], Y[:5])):
    x_test = feature_row.tolist()
    y_test = label.tolist()
    k_record = json.dumps({'X': x_test, 'Y': str(y_test)}).encode('utf-8')
    features_producer.produce(FEATURES_TOPIC, value=k_record, key=str(index))
    # Intermediate flush every 1000 records (never reached with five samples).
    if index > 0 and index % 1000 == 0:
        print('Flushing')
        features_producer.flush()
print('Flushing')
features_producer.flush()

# Make Predictions

In [None]:
def consume_features(group_id: str):
    """Score incoming feature messages and publish the predictions, indefinitely.

    Reads JSON messages from partitions 0-3 of the features topic, predicts
    with the highest-numbered model currently in the module-level ``models``
    mapping, adds ``Y_HAT`` and ``MODEL_VERSION`` to the message and produces
    it to the predictions topic under the original message key.

    The loop has no exit condition, so the function only ends when an
    exception propagates; both Kafka clients are cleaned up in that case.

    Args:
        group_id: Kafka consumer group id for the features consumer.
    """
    import time

    # NOTE(review): the topic names are derived from a hard-coded model name
    # and shadow the module-level topic constants — confirm they match the
    # topics the producers actually write to.
    model_name = 'example_model'
    FEATURES_TOPIC = f'{model_name}-features'
    PREDICTION_TOPIC = f'{model_name}-predictions'
    print(FEATURES_TOPIC)

    features_tls = [TopicPartition(FEATURES_TOPIC, partition) for partition in range(4)]

    # Only one model instance receives the message (each has the SAME consumer group)
    features_consumer_conf = {
                     'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
                     'sasl.username': os.environ.get('KAFKA_USER_NAME'),
                     'sasl.password': os.environ.get('KAFKA_PASSWORD'),
                     'sasl.mechanism': 'PLAIN',
                     'security.protocol': 'SASL_SSL',
                     'ssl.ca.location': certifi.where(),
                     'group.id': group_id,
                     'enable.auto.commit': True,
                     'auto.offset.reset': 'latest'}
    features_consumer = Consumer(features_consumer_conf)
    features_consumer.assign(features_tls)

    client_id = 'client-1'
    producer_conf = {
                     'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
                     'sasl.username': os.environ.get('KAFKA_USER_NAME'),
                     'sasl.password': os.environ.get('KAFKA_PASSWORD'),
                     'sasl.mechanism': 'PLAIN',
                     'security.protocol': 'SASL_SSL',
                     'ssl.ca.location': certifi.where(),
                     'client.id': client_id}
    predictions_producer = Producer(producer_conf)

    try:
        # Nothing can be scored until at least one model version is available;
        # wait instead of failing on an empty mapping.
        while not models:
            print('waiting for a model version')
            time.sleep(1)

        # Poll briefly while messages are flowing, and back off once idle.
        poll_timeout = 1
        while True:
            msg = features_consumer.poll(poll_timeout)
            # Compare against None explicitly: a message's truth value follows
            # the length of its payload, not whether a message was returned.
            if msg is None:
                print('waiting for more features')
                poll_timeout = 10
                continue
            poll_timeout = 1

            if msg.error():
                # Error events carry no feature payload; report and skip them.
                print(f'Error code{msg.error().code()}')
                continue

            # Re-evaluate per message so newly received model versions are
            # used; the keys are copied first because `models` may be updated
            # by another thread while this loop runs.
            latest_version = max(list(models))
            features_json = json.loads(msg.value().decode("utf-8"))

            y_hat = models[latest_version].predict([features_json['X']])[0]
            features_json['Y_HAT'] = str(y_hat)
            features_json['MODEL_VERSION'] = str(latest_version)
            p_record = json.dumps(features_json).encode('utf-8')
            predictions_producer.produce(PREDICTION_TOPIC, value=p_record, key=msg.key())
            predictions_producer.flush()
            # Issue the explicit commit only once the prediction has been flushed.
            features_consumer.commit()
    finally:
        # Release both clients when the loop is interrupted by an exception.
        predictions_producer.flush()
        features_consumer.close()

In [None]:
# Run the feature consumer in a background thread under a fixed consumer group id.
inference_group_id = 'sameer-1'
cf = threading.Thread(target=consume_features, args=(inference_group_id,))
cf.start()

In [None]:
import os

# Export settings through the process environment.
# NOTE(review): presumably read by `kafka_model` — confirm against that module.
os.environ.update({
    'CLUSTER_API_KEY': CLUSTER_API_KEY,
    'CLUSTER_API_SECRET': CLUSTER_API_SECRET,
    'BOOTSTRAP_SERVERS': 'pkc-6ojv2.us-west4.gcp.confluent.cloud:9092',
    'MODEL_NAME': 'example_model',
    'INFERENCE_GROUP_ID': 'sameer-test-1',
})

## Make REST API Calls to Model API Endpoint
![Alt text](./images/make_rest_calls.png)


# Load the model-serving module (defined outside this notebook).
import kafka_model 

# Request a prediction for a single five-feature row from model version 2.
# NOTE(review): this call appears before `kafka_model.init()` in the notebook —
# confirm whether `predict` requires `init()` to have run first.
kafka_model.predict(x=[1,1,1,1,1],version=2)

In [None]:
# Call the module's init(). NOTE(review): presumably this starts its Kafka
# consumers/producers — confirm against kafka_model.
kafka_model.init()