In [1]:
# Example of simple model load and evaluate

# ===============LICENSE_START=======================================================
# Apache-2.0
# ===================================================================================
# Copyright (C) 2019 AT&T Intellectual Property  All rights reserved.
# ===================================================================================
# This software file is distributed by AT&T
# under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# This file is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============LICENSE_END=========================================================


In [2]:
import numpy as np
import pandas as pd
import os,sys,shutil  # file checks
import dill as pickle   # serialize functions and data as compressed binary 
import gzip  # compression 
import yaml   # configuration file
import time  # time tracking

import threading  # threaded process evals

from acumos.wrapped import load_model
from acumos.modeling import Model, List, Dict, create_namedtuple, create_dataframe
from acumos.session import AcumosSession, Requirements

import util_call
import util_review

# load our configuration (sections used below include "path", "publish" and "sentiment")
config_path = 'config.yaml'
if not os.path.isfile(config_path):
    print("Sorry, can't find the configuration file {}, aborting.".format(config_path))
    sys.exit(-1)
# parse inside a context manager so the file handle is closed right away
with open(config_path) as f:
    config = yaml.safe_load(f)

# Load Raw Data
Load the raw data produced by the ETL step (train and test splits) and double-check the schema with a random test sample.

In [3]:
## Load the ETL output and sanity-check the test split's schema
# DataFrame conventions for acumos models:
#   https://pypi.org/project/acumos/#using-dataframes-with-scikit-learn
# NOTE(review): dill/pickle runs code embedded in the file while loading;
# only open ETL outputs produced by a trusted upstream notebook.
with gzip.open(config["path"]["etl"], mode='rb') as f:
    df = pickle.load(f)   # appears to map split name (e.g. "X_test") -> DataFrame

# column names first, then one random test row laid out vertically
test_split = df["X_test"]
print(test_split.columns)
print(test_split.sample(1).transpose())

Index(['helpful', 'reviewText', 'summary', 'unixReviewTime', 'categories',
       'description'],
      dtype='object')
                                                            21380
helpful                                                    [0, 1]
reviewText      Epson has a solid reputation for quality print...
summary                     Compact size, beautiful print quality
unixReviewTime                                         1373068800
categories      [office products, office electronics, printers...
description                                                      


# Create wrapped model prototype
The API does not yet give us a direct way to retrieve a remote model's call signature. Future versions are addressing this, but for now we need to mock up what the call structure looks like for a given model. See the `Model Prototype Definition` section of the previous notebook for additional discussion.

*NOTE*: The most natural way to get a model's signature and connection data is to find it on the marketplace and download the required files (e.g., the protobuf definition) from there directly.

Looking at a few example models for text-based sentiment processing, we see a few common call patterns as well.

* **text-to-float** pattern: a textual string is input, and a list of class probabilities is output
  ```python
  TextIn = create_namedtuple('TextIn', [("TextIn", str)])
  FloatOut = create_namedtuple('FloatOut', [("FloatOut", List[float])])
  ```




In [4]:
# go through and create just a few model templates
# text-to-float call pattern: one text string in, a list of floats (class probabilities) out
TextIn = create_namedtuple('TextIn', [("TextIn", str)])
FloatOut = create_namedtuple('FloatOut', [("FloatOut", List[float])])

# create function templates
def sent_predict(df: TextIn) -> FloatOut:
    '''Dummy function for prediction of a sentence (placeholder: returns an empty FloatOut)'''
    return FloatOut([])

# register the same template under both API names; each remote model's
# config["sentiment"][<model>]["api"] entry selects which name is called
model = Model(sent_predict=sent_predict, classify=sent_predict)

# dump the wrapped model locally so it can be reloaded with load_model()
session = AcumosSession()
model_dump = config["publish"]["name_model3"]+"_"+"text-to-float"
path_dump = os.path.join('data', model_dump)
if os.path.exists(path_dump):
    # clear a previous dump first -- NOTE(review): presumably session.dump will
    # not overwrite an existing model directory; confirm
    shutil.rmtree(path_dump)
session.dump(model, model_dump, 'data')  # creates data/<model_dump>



# Load & Evaluate a Sentiment Model
Now that we have the model prototype saved to disk, we evaluate each sentiment model as follows:

1. Iterate through the text models (the shared ones) that we want to analyze.

2. For the raw training data and test data (the places where we have raw textual reviews), conveniently wrapped in the helper function `call_sentiment_helper`:
    1. Load the right stubbed model template (the one we just saved to disk above)
    2. Sub-sample the raw input data if a max number of items was provided (this speeds up the local demo)
    3. Call our model at a remote URL with the input data
    4. Depending on the model template (the call pattern), pull out specific floating values to keep (flatten)
    5. Return results

3. Write the results to disk when the full dataset was processed (because that takes a while), and display a preview to verify that we're doing the right thing!

4. Finally, collate the results from each sentiment processor into a final data dictionary that other notebook scripts can utilize.

In [13]:
# first, we define a helper function that will load a model and call it against data
def call_sentiment_helper(model_name, df_eval, col_process, max_process_items, config,
                          wrapped_model=None, random_state=None):
    """Score one text column of ``df_eval`` with a remote sentiment model.

    Parameters
    ----------
    model_name : str
        Key into ``config["sentiment"]``; that entry provides ``style``
        (e.g. ``"text-to-float"``), ``api`` (remote function name) and ``port``.
    df_eval : pandas.DataFrame
        Rows to score; results are indexed with ``df_eval``'s own index labels.
    col_process : str
        Name of the text column sent to the model.
    max_process_items : int
        0 scores every row; otherwise a random subset of at most this many rows.
    config : dict
        Parsed notebook configuration (``sentiment`` and ``publish`` sections).
    wrapped_model : optional
        Pre-loaded wrapped model. When None it is loaded from
        ``data/<publish.name_model3>_<style>``.
    random_state : int or None
        Seed for the sub-sampling shuffle; None uses numpy's global RNG.

    Returns
    -------
    pandas.DataFrame
        One row per sample the remote call returned. For the ``text-to-float``
        style only the ``FloatOut`` column is kept, and it is present even
        when no sample came back.
    """
    model_remote_param = config["sentiment"][model_name]
    model_style = model_remote_param["style"]

    # we allow the model to be passed because (a) they're all the same, (b) threading breaks with sessions
    if wrapped_model is None:
        model_dump = config["publish"]["name_model3"] + "_" + model_style
        wrapped_model = load_model(os.path.join('data', model_dump))

    # positional row numbers to score; 0 is the special case for EVERYTHING
    idx_access = list(range(len(df_eval)))
    if max_process_items != 0:
        if random_state is None:
            np.random.shuffle(idx_access)
        else:
            np.random.RandomState(random_state).shuffle(idx_access)
        idx_access = idx_access[:max_process_items]   # slicing already caps at len()
    print("Started processing data... ({} of {} samples)".format(len(idx_access), len(df_eval)))

    # although there are a few text columns, we only send `col_process`, "wrapping"
    # each value for a standard calling structure: nd_sample = [ [text1], [text2], ... ]
    nd_sample = [[wrap_item] for wrap_item in df_eval.iloc[idx_access][col_process].values.tolist()]
    list_result, list_idx = util_call.score_model(wrapped_model, nd_sample, False,
                        name_function=model_remote_param["api"],
                        url_remote="{}:{}".format(
                            config["sentiment"]["deploy_host"], model_remote_param["port"]))
    # list_idx holds positions within nd_sample that were scored; remap them to
    # df_eval rows in case anything was missed
    index_df = [idx_access[i] for i in list_idx]
    df_result = pd.DataFrame(list_result, index=df_eval.index[index_df])

    # now pull out the interesting parts according to known style/output
    if model_style == "text-to-float":
        if len(df_result) == 0 and "FloatOut" not in df_result.columns:
            # nothing came back (e.g. every request failed): keep the expected
            # schema instead of raising KeyError on the missing column
            df_result = pd.DataFrame(columns=["FloatOut"], index=df_result.index)
        else:
            df_result = df_result[["FloatOut"]]
    # other styles....
    return df_result

def helper_thread(model_name, wrapped_model=None):
    """Score the "X_test" and "X_train_raw" splits with one sentiment model.

    Intended as a ``threading.Thread`` target. It reads the notebook globals
    ``config``, ``df``, ``sentiment`` and ``max_process_items``, so those must be
    defined before it runs.

    The work is skipped when ``max_process_items == 0`` and the output file
    ``config["sentiment"][model_name]["path"]`` already exists. Only full runs
    (``max_process_items == 0``) are written to that file; sub-sampled runs are
    only previewed.

    Returns the dict of result frames (``{}`` when the work was skipped).
    ``threading.Thread`` discards a target's return value, so this only
    matters for direct, non-threaded calls.
    """
    df_local = {}
    print("=== Started processing for model '{}'... === ".format(model_name))
    model_path = config["sentiment"][model_name]["path"]
    if not os.path.exists(model_path) or max_process_items != 0:
        col_sentiment = sentiment["col_sentiment"]
        df_local["X_test"] = call_sentiment_helper(model_name, df["X_test"],
            col_sentiment, max_process_items, config, wrapped_model=wrapped_model)

        df_local["X_train_raw"] = call_sentiment_helper(model_name, df["X_train_raw"],
            col_sentiment, max_process_items, config, wrapped_model=wrapped_model)

        # write out our larger datasets as binary files
        if max_process_items == 0:    # only write full datasets
            with gzip.open(model_path, 'wb') as f:
                pickle.dump(df_local, f)
        # show a preview of what was just done; cap the sample size because a
        # sub-sampled or partially failed run can have fewer than 3 rows, and
        # DataFrame.sample(n) raises ValueError when n exceeds the row count
        print("... sample for model '{}'".format(model_name))
        preview = df_local["X_train_raw"].join(df["X_train_raw"][col_sentiment])
        print(preview.sample(min(3, len(preview))))
    return df_local

# Shared state for the model threads: one empty, index-aligned frame per split
# (model columns are added to these when results are combined) plus the name
# of the raw text column that gets scored.
sentiment = {
    split: pd.DataFrame([], index=df[split].index)
    for split in ("X_test", "X_train_raw")
}
sentiment["col_sentiment"] = "reviewText"

# Rows to score per split: 0 means every row (warning: a full run takes a
# while); any other value scores a random subset of that size.
max_process_items = 0

# Run each model on its own thread; set to False to run them one after another.
thread_utilize = True
thread_list = []

# Load the wrapped model once and share it (`model_dump` comes from the model
# prototype cell). WARNING: if you needed another model style than what was
# created, you may need to rework this.
wrapped_model = load_model(os.path.join('data', model_dump))

# evaluate models that are activated/available
for model_name in config["sentiment"]["active_model"]:
    if not thread_utilize:
        helper_thread(model_name, wrapped_model)
        continue
    worker = threading.Thread(target=helper_thread, args=(model_name, wrapped_model))
    worker.start()
    thread_list.append(worker)

=== Started processing for model 'yelp'... === === Started processing for model 'care'... === 
Started processing data... (5075 of 5075 samples)

=== Started processing for model 'twitter'... === 
Sample 250...
Output error (http://acumos-gpu.research.att.com:8763), exception (500 Server Error: INTERNAL SERVER ERROR for url: http://acumos-gpu.research.att.com:8763/classify)
Sample 500...
Sample 750...
Sample 1000...
Sample 1250...
Sample 1500...
Sample 1750...
Sample 2000...
Sample 2250...
Sample 2500...
Sample 2750...
Sample 3000...
Sample 3250...
Sample 3500...
Sample 3750...
Sample 4000...
Sample 4250...
Sample 4500...
Sample 4750...
Sample 5000...
Evaluation time for 5075 items, 904.437 sec
Started processing data... (20299 of 20299 samples)
Sample 250...
Sample 500...
Output error (http://acumos-gpu.research.att.com:8763), exception (500 Server Error: INTERNAL SERVER ERROR for url: http://acumos-gpu.research.att.com:8763/classify)
Sample 750...
Sample 1000...
Sample 1250...
Sample

Exception in thread Thread-23:
Traceback (most recent call last):
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/site-packages/urllib3/connection.py", line 171, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw)
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/site-packages/urllib3/util/connection.py", line 79, in create_connection
    raise err
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/site-packages/urllib3/util/connection.py", line 69, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 60] Operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, 

In [6]:
# block until every model thread started by the evaluation cell has finished
for worker in thread_list:
    worker.join()

Sample 250...
Output error (http://acumos-gpu.research.att.com:8763), exception (500 Server Error: INTERNAL SERVER ERROR for url: http://acumos-gpu.research.att.com:8763/classify)
Sample 500...
Sample 750...
Sample 1000...
Sample 1250...
Sample 1500...
Sample 1750...
Sample 2000...
Sample 2250...
Sample 2500...
Sample 2750...
Sample 3000...
Sample 3250...
Sample 3500...
Sample 3750...
Sample 4000...
Sample 4250...
Sample 4500...
Sample 4750...
Sample 5000...
Evaluation time for 5075 items, 901.101 sec


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/quinone/anaconda/envs/cognita36/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-5-1bd5cb67c98d>", line 37, in helper_thread
    sentiment["col_sentiment"], max_process_items, config, wrapped_model=wrapped_model)
  File "<ipython-input-5-1bd5cb67c98d>", line 24, in call_sentiment_helper
    df_result = pd.DataFrame(list_result, index=df_eval.index[idx_access[list_idx]])
TypeError: list indices must be integers or slices, not list



In [7]:
print("Combining and writing combined features to output ETL file...")
# now read the processed samples into our main dataframe. Only full runs
# (max_process_items == 0) are saved by helper_thread, so a model without a
# results file is reported rather than silently left out of the output.
for model_name in config["sentiment"]["active_model"]:
    path_model = config["sentiment"][model_name]["path"]
    if not os.path.exists(path_model):
        print("... no saved results for model '{}' ({}), skipping".format(model_name, path_model))
        continue
    # NOTE(review): dill/pickle load executes code; only load files this notebook wrote
    with gzip.open(path_model, 'rb') as f:
        df_local = pickle.load(f)
    # add one column named after the model to each split's frame; pandas aligns
    # on the index, so rows that were not scored become NaN
    for k in df_local:
        sentiment[k][model_name] = df_local[k]
with gzip.open(config["path"]["sentiment"], 'wb') as f:
    pickle.dump(sentiment, f)


Combining and writing combined features to output ETL file...
