# RISE Camp Capstone Exercise

In this exercise, you will see how many of the projects you've learned about in the last couple of days fit together. Those of you who attended last year's RISE Camp will remember the Pong integration exercise that trained an RL policy in Ray and deployed it in Clipper. Today's version of Pong is more feature-rich: in addition to leveraging a new Clipper deployment model and RLlib's policies, we take advantage of Flor's experiment tracking and WAVE's encryption capabilities.

We will train three ML models in this exercise: the first two models will use [imitation learning](https://blog.statsbot.co/introduction-to-imitation-learning-32334c3b1e7a) to learn how to play Pong, and the third will be a reinforcement learning policy trained with RLlib and Ray. Flor will track the training process for all three models. We will also encrypt each of these models with WAVE, then deploy and serve them in Clipper.

For those of you unfamiliar with imitation learning, the approach is simple. We previously recorded the actions of humans playing Pong. The model we are building is a logistic regression classifier that selects a paddle action (up, down, or stay) based on the position of its own paddle and the current location, velocity, and trajectory of the ball.

Finally, you'll play a game (or more!) against each of the three models. We'll aggregate the results to see which model performs the best.

In [None]:
# Python compatibility imports
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import gym
import pong_py
import cloudpickle

# ray imports
import ray
from ray.tune.registry import register_env
from ray.rllib.agents import ppo

import flor

In [None]:
# set Flor metadata for the notebook
flor.setNotebookName('integration.ipynb')

In a separate notebook, we have set up a WAVE client and defined some helper functions that we'll use below. Feel free to look at the `wave-setup.ipynb` file in this directory if you'd like to dig in.

In [None]:
%run wave-setup.ipynb

In [None]:
# call Wave helper function to create granting and receiving entities
orgEntity, recipientEntity = createWaveEntities()

# Imitation Learning

## Model Training

First, we're going to define three Flor functions, `preproc_imitation`, `train_imitation_model`, and `encrypt_model`, which clean the input data, train an imitation learning model, and encrypt that model using WAVE, respectively.

The preprocessing function reads an input CSV, drops the `user` column, and converts the `up`, `down`, and `stay` labels into numerical values (2, 1, and 0, respectively). It also scales all the numerical features (the location of the controlled paddle, the location and velocity of the ball, and the previous location of the ball) by dividing them by 500.

In [None]:
# PRE-DEFINED FLOR FUNCTION. PLEASE DO NOT CHANGE.

@flor.func
def preproc_imitation(imitation_data, procd_imitation_data, **kwargs):
    import pandas as pd
    df_data = pd.read_csv(imitation_data)
    
    # drop the user column because we don't want to train on it
    df_data = df_data.drop(labels="user", axis=1)

    # discretize the labels
    def convert_label(label):
        """Convert labels into numeric values"""
        if(label=="down"):
            return 1
        elif(label=="up"):
            return 2
        else:
            return 0

    df_data['label'] = df_data['label'].apply(convert_label)
    df_data.loc[:, "leftPaddle_y":"ball_y_prev"] = df_data.loc[:, "leftPaddle_y":"ball_y_prev"]/500.0
    df_data.to_json(procd_imitation_data)
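To make the two transformations concrete, here is a minimal, dependency-free sketch of what `preproc_imitation` does to a single record. Only `user`, `label`, `leftPaddle_y`, and `ball_y_prev` are taken from the code above; the intermediate column names (`ball_x`, `ball_vy`, and so on) are hypothetical placeholders for whatever columns sit between them in the real CSV.

```python
# Label encoding used by preproc_imitation: "down" -> 1, "up" -> 2, anything else ("stay") -> 0.
LABELS = {"down": 1, "up": 2}


def encode_label(label: str) -> int:
    return LABELS.get(label, 0)


# Hypothetical feature columns; only the first and last names appear in the real code.
FEATURES = ["leftPaddle_y", "ball_x", "ball_y", "ball_vx", "ball_vy", "ball_x_prev", "ball_y_prev"]


def preprocess_row(row: dict) -> dict:
    """Drop `user`, scale every numeric feature by 1/500, and encode `label`."""
    out = {name: float(row[name]) / 500.0 for name in FEATURES}
    out["label"] = encode_label(row["label"])
    return out


# A made-up raw record, as csv.DictReader would yield it (all values are strings).
row = {"user": "alice", "leftPaddle_y": "250", "ball_x": "100", "ball_y": "400",
       "ball_vx": "5", "ball_vy": "-5", "ball_x_prev": "95", "ball_y_prev": "405",
       "label": "up"}
print(preprocess_row(row))
```

Note that the velocities are scaled by the same constant as the positions, mirroring the single column-range division in the original function.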

The model training function reads the cleaned data from JSON and fits a scikit-learn logistic regression model that predicts which action to take from the input features. The fitted model is serialized with cloudpickle and written to a file.

In [None]:
# PRE-DEFINED FLOR FUNCTION. PLEASE DO NOT CHANGE.

@flor.func
def train_imitation_model(procd_imitation_data, model, **kwargs):
    import cloudpickle
    import pandas as pd
    from sklearn import linear_model
    df_data = pd.read_json(procd_imitation_data)
    
    labels = df_data['label']
    training_data= df_data.drop(['label'], axis=1)

    skmodel = linear_model.LogisticRegression()
    skmodel.fit(training_data, labels)
    with open(model, 'wb') as f:
        cloudpickle.dump(skmodel, f)
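Once fitted, a linear classifier such as scikit-learn's `LogisticRegression` predicts the class with the largest linear score `w·x + b`; this holds whether the model was fit one-vs-rest or multinomial (the default strategy depends on the scikit-learn version). The sketch below shows that decision rule in plain Python, with softmax probabilities for the multinomial view. The two features, weights, and biases are hypothetical and chosen only for illustration; they are not the trained model's parameters.

```python
import math

# Class indices follow the encoding in preproc_imitation.
ACTIONS = {0: "stay", 1: "down", 2: "up"}


def softmax(scores):
    """Numerically stable softmax over a list of scores."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def predict_action(weights, biases, features):
    """Return (action, probabilities): argmax of the per-class linear scores."""
    scores = [sum(w * x for w, x in zip(w_row, features)) + b
              for w_row, b in zip(weights, biases)]
    probs = softmax(scores)
    best = max(range(len(probs)), key=probs.__getitem__)
    return ACTIONS[best], probs


# Hypothetical 2-feature model (one weight row and one bias per class 0, 1, 2).
weights = [[0.0, 0.0], [-4.0, 1.0], [4.0, -1.0]]
biases = [0.5, 0.0, 0.0]

print(predict_action(weights, biases, [0.3, 0.0])[0])
print(predict_action(weights, biases, [-0.3, 0.0])[0])
print(predict_action(weights, biases, [0.0, 0.0])[0])
```

With these toy parameters, a large positive first feature favors "up", a large negative one favors "down", and the bias on class 0 makes "stay" win when both features are near zero.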

Finally, the `encrypt_model` function takes the model we trained above and a serialized WAVE entity that has access to all models. It encrypts the model under the resource `models/pong/<model_tag>` in that entity's namespace and writes the resulting ciphertext to a file.

In [None]:
# PRE-DEFINED FLOR FUNCTION. PLEASE DO NOT CHANGE.

@flor.func
def encrypt_model(granting_entity, model, model_tag, encrypted_model, **kwargs):
    import wave3 as wv
    granting_entity = deserializeEntity(granting_entity)
    
    # read the model binary, so we can encrypt it
    with open(model, 'rb') as f:
        model = f.read()
    
    # NOTE: We are relying on a global handle to WAVE here. 
    # In practice, we would have to recreate this handle explicitly.
    encrypt_response = wave.EncryptMessage(
        wv.EncryptMessageParams(
            # the namespace is the organization
            namespace=granting_entity.hash,
            resource="models/pong/" + model_tag,
            content=model))
    
    with open(encrypted_model, 'wb') as f:
        f.write(encrypt_response.ciphertext)

Next, we define a Flor experiment called `pong-imitation` and link together the input data and the functions defined above. 

In [None]:
# *** RERUN ***

# CHANGE ME LATER
DATA_FILE = 'imitation-small.csv'

# get the small or large tag from the DATA_FILE variable
model_tag = DATA_FILE.split('.')[0].split('-')[1]

# where to serialize and deserialize the Wave entity
ENTITY_FILE = 'org_entity.bin'

with flor.Experiment('pong-imitation') as ex:
    # load data into an artifact
    imitation_data = ex.artifact(DATA_FILE, 'imitation_data')
    
    # call preprocessing function
    do_preproc_imitation = ex.action(preproc_imitation, [imitation_data])
    procd_imitation_data = ex.artifact('imitation_data.json', 'procd_imitation_data', do_preproc_imitation)
    
    # train the model 
    do_train_imitation_model = ex.action(train_imitation_model, [procd_imitation_data])
    model = ex.artifact('model.pkl', 'model', do_train_imitation_model)
    
    model_tag = ex.literal(name='model_tag', v=model_tag)
    
    # serialize the wave entity, so we can track it as an artifact
    serializeEntity(orgEntity, ENTITY_FILE)
    granting_entity = ex.artifact(ENTITY_FILE, 'granting_entity')
    
    do_encrypt_model = ex.action(encrypt_model, [granting_entity, model, model_tag])
    encrypted_model = ex.artifact('encrypted_model.bin', 'encrypted_model', do_encrypt_model)

Lastly, we will call `pull` on the encrypted model. As we saw in the Flor tutorial, this runs the whole pipeline and generates the encrypted model. The `utag` argument is used to distinguish between the different versions of the model we train.

In [None]:
# *** RERUN ***

encrypted_model.pull(utag=model_tag)
model_location = encrypted_model.resolve_location()[0]
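Conceptually, `pull` works backwards from the requested artifact: each derived artifact knows the action that produces it, each action knows its input artifacts, and everything upstream is computed first. The sketch below models only that dependency resolution with memoization; the `Artifact`, `Action`, and `pull` names here are hypothetical stand-ins and not Flor's implementation, which additionally versions and records every run.

```python
from typing import Callable, Dict, List, Optional


class Artifact:
    """A named value that is either given (a source) or produced by an action."""

    def __init__(self, name: str, producer: Optional["Action"] = None, value=None):
        self.name = name
        self.producer = producer
        self.value = value


class Action:
    """A function applied to the values of its input artifacts."""

    def __init__(self, fn: Callable, inputs: List[Artifact]):
        self.fn = fn
        self.inputs = inputs


def pull(artifact: Artifact, log: List[str], cache: Dict[str, object]) -> object:
    """Compute an artifact by first pulling everything upstream of it (memoized)."""
    if artifact.name in cache:
        return cache[artifact.name]
    if artifact.producer is None:  # a source artifact, like the raw CSV
        value = artifact.value
    else:
        args = [pull(a, log, cache) for a in artifact.producer.inputs]
        log.append(artifact.producer.fn.__name__)  # record which action ran
        value = artifact.producer.fn(*args)
    cache[artifact.name] = value
    return value


# Toy stand-ins for the three pipeline stages.
def preproc(raw):
    return raw.lower()


def train(data):
    return f"model({data})"


def encrypt(model):
    return model[::-1]


raw = Artifact("imitation_data", value="RAW")
procd = Artifact("procd_imitation_data", Action(preproc, [raw]))
model = Artifact("model", Action(train, [procd]))
encrypted = Artifact("encrypted_model", Action(encrypt, [model]))

log: List[str] = []
result = pull(encrypted, log, {})
print(log)
```

Pulling only the final artifact is enough to run preprocessing, training, and encryption in dependency order, which is why a single `pull` call regenerates the whole pipeline.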

## Model Deployment

Now that we've trained a model, we are going to deploy it in Clipper. Run the cell below to start Clipper on your EC2 instance.

In [None]:
# Make logging work correctly in the Jupyter notebook and set up Clipper
# NOTE: YOU SHOULD ONLY RUN THIS CELL ONCE!

import logging
import sys
import subprocess

from clipper_admin import DockerContainerManager, ClipperConnection
from clipper_admin.deployers import python as py_deployer
from clipper_util.auth_deployer import auth_deploy_python_model

logger = logging.getLogger()
logger.setLevel(logging.INFO)

clipper_conn = ClipperConnection(DockerContainerManager())
clipper_conn.stop_all()
clipper_conn.start_clipper()

We will now read the encrypted model from the file where Flor stored it:

In [None]:
# *** RERUN ***

# load the encrypted model into memory
with open(model_location, 'rb') as f:
    ciphered_model = f.read()

Next, we will attempt to deploy that model. You'll notice that we've wrapped the standard Clipper API with a special authenticated deployer that only deploys the model if it can decrypt it with the given recipient entity.

Running the cell below **should initially fail**. This is because we haven't yet granted the `recipientEntity` permission to read the model that we've encrypted. Run the cell below to see for yourself.

Once the model is deployed, the same cell creates a new application called `pong-{model_tag}` (for example, `pong-small`) and links the model to that application.

In [None]:
# *** RERUN ***

model_name = "pong-policy-" + model_tag.v
app_name = "pong-" + model_tag.v

auth_deploy_python_model(
    clipper_conn,
    model_name,
    wave,
    recipientEntity,
    ciphered_model,    
    version=1,
    input_type="doubles"
)

clipper_conn.register_application(name=app_name, default_output="0", input_type="doubles", slo_micros=100000)
clipper_conn.link_model_to_app(app_name=app_name, model_name=model_name)

Now we'll use the `grantPermission` helper from `wave-setup.ipynb` to grant the recipient entity permission to decrypt the ciphered model. Run the cell below to grant decryption permission, then rerun the cell above to build and deploy the new model.

In [None]:
# *** RERUN ***

grantPermission(orgEntity, recipientEntity, 'models/pong/' + model_tag.v)
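The fail, grant, redeploy sequence above can be modeled in a few lines. The internals of `auth_deploy_python_model` are not shown in this notebook, so the sketch below only illustrates the gating idea: decrypt first, and hand the plaintext to the deployer only if decryption succeeds. `GrantStore`, `decrypt`, and `auth_deploy` are hypothetical stand-ins for WAVE and Clipper, and the "decryption" is an identity placeholder that models the permission check, not real cryptography.

```python
class GrantStore:
    """Hypothetical stand-in for WAVE permission state: a set of (entity, resource) grants."""

    def __init__(self):
        self._grants = set()

    def grant(self, entity, resource):
        self._grants.add((entity, resource))

    def allowed(self, entity, resource):
        return (entity, resource) in self._grants


def decrypt(store, entity, resource, ciphertext):
    # Placeholder: only the permission check is modeled; the bytes pass through unchanged.
    if not store.allowed(entity, resource):
        raise PermissionError(f"{entity} may not decrypt {resource}")
    return ciphertext


def auth_deploy(store, entity, resource, ciphertext, deploy):
    """Deploy only after decryption succeeds; a PermissionError stops before deploy runs."""
    plaintext = decrypt(store, entity, resource, ciphertext)
    return deploy(plaintext)


store = GrantStore()
deployed = []

# First attempt: no grant yet, so nothing is deployed.
try:
    auth_deploy(store, "recipient", "models/pong/small", b"model-bytes", deployed.append)
except PermissionError as err:
    print("deploy refused:", err)

# After granting permission, the same call succeeds.
store.grant("recipient", "models/pong/small")
auth_deploy(store, "recipient", "models/pong/small", b"model-bytes", deployed.append)
print(deployed)
```

Because the deployer is only ever called with successfully decrypted bytes, an entity without a grant can never get the model into Clipper.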

Now, for comparison's sake, we're going to train another imitation learning model on a different dataset, `imitation-large.csv`. This dataset incorporates training data from two different players, so ideally it should perform better than the previous version.

In the Flor experiment cell above, change the `DATA_FILE` variable to point to `imitation-large.csv`, and then rerun all the cells marked with `# *** RERUN ***`.

Once you've done that, run the cell below to start the Pong web server locally. It will print a URL that you can open in your browser to play against the two Pong models you've just trained; select a model as your opponent in the dropdown menu on the left-hand side. You'll notice there's a reinforcement learning option there as well; we'll come back to that model soon.

In [None]:
clipper_addr = clipper_conn.get_query_addr()

import subprocess
server_handle = subprocess.Popen(["./start_webserver.sh", clipper_addr], stdout=subprocess.PIPE)
print(str(server_handle.stdout.readline().strip(), 'utf-8'))
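Clipper serves each registered application through its REST query frontend: a `POST` to `http://<query address>/<app name>/predict` with a JSON body of the form `{"input": [...]}`. We have not checked exactly how the bundled web server issues its queries, so the sketch below only shows what such a request to one of the apps registered above could look like, using the standard library. The query address and the feature vector (including its length and order) are hypothetical, and the request is built but not sent.

```python
import json
import urllib.request


def build_predict_request(query_addr: str, app_name: str, features):
    """Build (but do not send) a Clipper REST prediction request for one input."""
    url = f"http://{query_addr}/{app_name}/predict"
    body = json.dumps({"input": [float(x) for x in features]}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST", headers={"Content-Type": "application/json"}
    )


# Hypothetical address (get_query_addr() returns "host:port") and scaled feature vector.
req = build_predict_request("localhost:1337", "pong-small",
                            [0.5, 0.2, 0.8, 0.01, -0.01, 0.19, 0.81])
print(req.get_method(), req.full_url)

# Against a running Clipper instance, the request could be sent with:
# response = json.loads(urllib.request.urlopen(req).read())
```

If the model cannot respond within the application's `slo_micros` budget, Clipper answers with the application's `default_output` ("0" in the cells above) instead of a model prediction.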

You'll notice that, surprisingly, the model trained on the large dataset performs significantly worse than the model trained on the small dataset. Let's look at the rows that appear only in the large dataset to see why:

In [None]:
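# read the two CSVs into dataframes
import pandas as pd
large = pd.read_csv('imitation-large.csv')
small = pd.read_csv('imitation-small.csv')

# left-join large against small on every column; rows marked 'left_only' appear only in the large dataset
merged = large.merge(small, on=list(large.columns), how='left', indicator=True)
merged[merged['_merge'] == 'left_only'].drop(columns='_merge')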

All of the rows that appear only in the large dataset were recorded by Ion, who apparently isn't very good at Pong. The noise from these entries worsens the model's performance in practice.
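The same anti-join can be sketched without pandas: treat each row as a hashable tuple, subtract the small dataset's rows from the large one's, and tally the `user` column of what remains. The CSV text below is hypothetical toy data, not the exercise's files. Unlike the pandas merge, this set-based version also collapses exact duplicate rows.

```python
import csv
import io
from collections import Counter


def read_rows(text):
    """Parse CSV text into hashable rows (sorted (column, value) tuples)."""
    reader = csv.DictReader(io.StringIO(text))
    return [tuple(sorted(row.items())) for row in reader]


# Toy data: the large file is the small file plus extra rows from one player.
small_csv = "user,leftPaddle_y,label\nalice,0.5,up\nbob,0.3,down\n"
large_csv = small_csv + "ion,0.9,stay\nion,0.1,up\n"

# Rows present in the large dataset but not in the small one.
only_in_large = set(read_rows(large_csv)) - set(read_rows(small_csv))

# Who recorded those extra rows?
users = Counter(dict(row)["user"] for row in only_in_large)
print(users)
```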

# Reinforcement Learning

In this section, we will use Proximal Policy Optimization (PPO) to train a reinforcement learning agent that plays Pong. As we'll see, the RL policy trained here performs significantly better than either of the imitation learning models we trained above. As in the imitation learning section, we'll first define a couple of Flor functions, and we'll reuse the `encrypt_model` function defined above.
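PPO's central idea is the clipped surrogate objective: the probability ratio between the new and old policy for a sampled action is clipped to `[1 - eps, 1 + eps]`, so one update cannot push the policy too far from the one that collected the data. The per-sample sketch below implements only that term; RLlib's PPO loss also includes value-function and entropy terms (and, depending on configuration, a KL penalty), which are omitted. `eps = 0.2` is a commonly used value, assumed here rather than taken from this notebook's configuration.

```python
import math


def ppo_clip_objective(logp_new: float, logp_old: float, advantage: float,
                       eps: float = 0.2) -> float:
    """Per-sample clipped surrogate min(r * A, clip(r, 1 - eps, 1 + eps) * A), to be maximized."""
    ratio = math.exp(logp_new - logp_old)  # r = pi_new(a|s) / pi_old(a|s)
    clipped = min(max(ratio, 1.0 - eps), 1.0 + eps)
    return min(ratio * advantage, clipped * advantage)


# Positive advantage: the gain is capped once the ratio exceeds 1 + eps (here 1.2 * 2.0).
print(ppo_clip_objective(math.log(1.5), 0.0, advantage=2.0))
# Negative advantage: the more pessimistic unclipped term (about -1.5) is kept.
print(ppo_clip_objective(math.log(1.5), 0.0, advantage=-1.0))
```

Taking the minimum of the clipped and unclipped terms makes the objective a pessimistic bound: the policy gets no extra credit for moving far in a direction that looks good, but it is still fully penalized for moving toward actions that look bad.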

First, we need to either start Ray or make sure it's already running on our machine:

In [None]:
@flor.func
def start_ray(**kwargs):
    try:
        # ray.get fails if Ray has not been initialized yet, in which case we start it
        ray.get([])
    except Exception:
        ray.init()
    return {'exit_code': 0}

Next, we define a function called `train_agent` that takes an environment configuration and trains a PPO agent on the Pong environment for a specified number of iterations. Both the environment config and the number of iterations will be Flor literals.

In [None]:
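@flor.func
def train_agent(env_config, num_iterations, model, **kwargs):
    # register the Pong environment under a name that RLlib can look up
    register_env("pong_env", lambda ec: pong_py.PongJSEnv())
    # pass the tracked env_config literal through instead of a hard-coded {}
    agent = ppo.PPOAgent(env="pong_env", config={"env_config": env_config})

    # run the requested number of PPO training iterations
    for _ in range(num_iterations):
        agent.train()

    # serialize the trained agent's checkpoint and write it to the model file
    with open(model, 'wb') as f:
        f.write(agent.save_to_object())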

Finally, we'll string all of these functions together into another Flor experiment called `rl-pong`.

In [None]:
model_tag = 'rl'

with flor.Experiment('rl-pong') as ex:
    # make sure that Ray is running before attempting to train a model
    do_start_ray = ex.action(start_ray, [])
    exit_code = ex.literal(name='exit_code', parent=do_start_ray)
    
    # define configuration variables relevant to training the RL model
    env_config = ex.literal({}, 'env_config') # TODO: Fill env_config
    num_iterations = ex.literal(50, 'num_iterations')

    # setup the training action and the save location of the checkpoint
    do_train_agent = ex.action(train_agent, [env_config, num_iterations, exit_code])
    model = ex.artifact('model.pkl', 'model', do_train_agent)
    
    model_tag = ex.literal(name='model_tag', v=model_tag)
    
    # serialize the wave entity, so we can track it as an artifact
    serializeEntity(orgEntity, ENTITY_FILE)
    granting_entity = ex.artifact(ENTITY_FILE, 'granting_entity')
    
    do_encrypt_model = ex.action(encrypt_model, [granting_entity, model, model_tag])
    encrypted_model = ex.artifact('encrypted_model.bin', 'encrypted_model', do_encrypt_model)

As before, we can plot the pipeline to inspect its structure and then run it with `pull`:

In [None]:
encrypted_model.plot()

In [None]:
encrypted_model.pull()
model_location = encrypted_model.resolve_location()[0]

Once again, we have to grant permission to the entity to decrypt the model, but this time, we'll use a wildcard, letting it decrypt anything in the `models/pong/` resource space.
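We have not checked how WAVE evaluates resource patterns internally, so treat the following only as a rough mental model: it approximates a grant on `models/pong/*` with shell-style glob matching via Python's `fnmatch`, so one wildcard grant covers `models/pong/small`, `models/pong/large`, and `models/pong/rl`. WAVE's real matching rules may differ; for example, `fnmatch`'s `*` also matches across `/`, so `models/pong/*` would match nested paths too.

```python
from fnmatch import fnmatchcase


def covered(grants, resource):
    """True if any granted pattern matches the resource (glob semantics are an assumption)."""
    return any(fnmatchcase(resource, pattern) for pattern in grants)


grants = ["models/pong/small"]
print(covered(grants, "models/pong/rl"))   # only the exact resource was granted so far

grants.append("models/pong/*")
print(covered(grants, "models/pong/rl"))   # now covered by the wildcard grant
print(covered(grants, "models/chess/rl"))  # outside the granted resource space
```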

In [None]:
grantPermission(orgEntity, recipientEntity, 'models/pong/*')

Finally, we deploy the model to Clipper under a new application name (`pong-rl`). This cell mirrors the imitation learning deployment cell above, except that it uses `auth_deploy_rllib_model` instead of `auth_deploy_python_model`.

In [None]:
from clipper_util.auth_deployer import auth_deploy_rllib_model

# load the encrypted model into memory
with open(model_location, 'rb') as f:
    ciphered_model = f.read()

model_name = "pong-policy-" + model_tag.v
app_name = "pong-" + model_tag.v

auth_deploy_rllib_model(
    clipper_conn,
    model_name,
    wave,
    recipientEntity,
    ciphered_model,    
    version=1,
    input_type="doubles"
)

clipper_conn.register_application(name=app_name, default_output="0", input_type="doubles", slo_micros=100000)
clipper_conn.link_model_to_app(app_name=app_name, model_name=model_name)