# Census Income Classification on Hydrosphere Serving (HS)

In [1]:
import grpc
import hydro_serving_grpc as hs
import numpy as np

Connect to the cluster via gRPC

In [2]:
# Build a gRPC stub for either a local cluster (plaintext on localhost:9090)
# or the remote dev cluster (TLS channel).
IS_LOCAL_CLUSTER = True

if not IS_LOCAL_CLUSTER:
    creds = grpc.ssl_channel_credentials()
    channel = grpc.secure_channel(
        "dev.k8s.hydrosphere.io",
        credentials=creds,
    )
else:
    channel = grpc.insecure_channel("localhost:9090")

stub = hs.PredictionServiceStub(channel)


In [3]:
# Target model for every request built by pack_value_into_request.
model_spec = hs.ModelSpec(
    name="census_income_classification",
)

Test the connection with a single sample

In [4]:
# Model input names, in order: position i of a sample vector is sent as
# input CENSUS_FEATURES[i].
CENSUS_FEATURES = (
    "age",
    "workclass",
    "education",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "sex",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
    "country",
)


def pack_value_into_request(val, spec=None):
    """Pack one encoded census sample into a PredictRequest.

    Parameters
    ----------
    val : indexable sequence of int
        Encoded feature values. The first ``len(CENSUS_FEATURES)`` entries are
        used, in ``CENSUS_FEATURES`` order; any trailing entries are ignored.
    spec : hs.ModelSpec, optional
        Model to address. Defaults to the notebook-level ``model_spec``.

    Returns
    -------
    hs.PredictRequest
        Request with one single-element DT_INT64 tensor per feature.

    Raises
    ------
    IndexError
        If ``val`` holds fewer values than there are features.
    """
    n_features = len(CENSUS_FEATURES)
    if len(val) < n_features:
        raise IndexError(
            f"expected at least {n_features} feature values, got {len(val)}"
        )
    if spec is None:
        # Looked up at call time, exactly like the original global reference.
        spec = model_spec

    inputs = {
        name: hs.TensorProto(dtype=hs.DT_INT64, int64_val=[val[i]])
        for i, name in enumerate(CENSUS_FEATURES)
    }
    return hs.PredictRequest(model_spec=spec, inputs=inputs)

In [5]:
# Smoke test: send an all-ones feature vector to the
# "census_income_classification" model and print the predicted class.
smoke_sample = np.ones((12,), dtype="int64")
result = stub.Predict(pack_value_into_request(smoke_sample))
classes = result.outputs['classes'].int64_val[0]
print(f'Result class: {classes}')

Result class: 0


## Simulate a production data stream

In [6]:
import pandas as pd
import time 
from tqdm.auto import tqdm

# Directory holding the CSV files, relative to the notebook's working directory.
DATA_DIR = "../data"

# "Normal" traffic that is streamed first.
data = pd.read_csv(f"{DATA_DIR}/validation.csv")
# Traffic streamed afterwards; presumably contains corrupted/drifted rows — TODO confirm.
dirty_data = pd.read_csv(f"{DATA_DIR}/dirty_data.csv")

In [7]:
# Send the first N_NORMAL_SAMPLES rows of normal data, one request at a time,
# pausing NORMAL_DELAY_S seconds between requests.
N_NORMAL_SAMPLES = 500
NORMAL_DELAY_S = 1

normal_sample = data.iloc[:N_NORMAL_SAMPLES]
# itertuples keeps each column's dtype; iterrows upcasts every row to one common
# dtype (e.g. float64 if any column is float), which int64 tensor fields may reject.
for row in tqdm(normal_sample.itertuples(index=False, name=None), total=len(normal_sample)):
    request = pack_value_into_request(row)
    result = stub.Predict(request)
    time.sleep(NORMAL_DELAY_S)

HBox(children=(IntProgress(value=0, max=500), HTML(value='')))




In [17]:
# Draw a fixed-size random sample of dirty data (rows from position
# DIRTY_START_ROW onward) to stream to the model. Seeded for reproducibility.
N_DIRTY_SAMPLES = 1000
DIRTY_START_ROW = 2500  # NOTE(review): reason for this offset is not documented — confirm
SEED = 42

dirty_sample = dirty_data.iloc[DIRTY_START_ROW:].sample(N_DIRTY_SAMPLES, random_state=SEED)

In [18]:
# Stream the dirty sample to the model, pausing DIRTY_DELAY_S seconds
# between requests.
DIRTY_DELAY_S = 0.25

# itertuples keeps each column's dtype (iterrows upcasts whole rows).
for row in tqdm(dirty_sample.itertuples(index=False, name=None), total=len(dirty_sample)):
    request = pack_value_into_request(row)
    result = stub.Predict(request)
    time.sleep(DIRTY_DELAY_S)

HBox(children=(IntProgress(value=0, max=1000), HTML(value='')))


