## RL Classification


In [26]:

import time
import gym
import random
import numpy as np

from tensorflow import keras
from tensorflow.keras import layers

#from baselines.ppo2 import ppo2
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv

from baselines import deepq
from baselines import bench
from baselines import logger

import tensorflow as tf

from baselines.common.tf_util import make_session

ModuleNotFoundError: No module named 'tensorflow.contrib'

In [2]:

from keras.layers import *
from keras.models import *
from matplotlib import pyplot as plt
from sklearn.utils import shuffle
from itertools import combinations, product
from keras.datasets import cifar10


In [3]:

from multiprocessing import Pool
from functools import partial

In [4]:
# One way to load data
cifar_class_names = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
    
num_classes = 10
# Convert class vectors to binary class matrices.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
    
X_train = X_train / 255.0
X_test = X_test / 255.0
input_shape = [32,32,3]
lamda = .1

### Load Data

In [5]:
#Another way to load data

#Model / data parameters
num_classes = 10
input_shape = (32, 32, 3)

#the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

#Scale images to the [0, 1] range
X_train = X_train.astype("float32") / 255
X_test = X_test.astype("float32") / 255
#Make sure images have shape (32, 32, 3)
# X_train = np.expand_dims(X_train, -1)
# X_test = np.expand_dims(X_test, -1)
print("X_train shape:", X_train.shape)
print(X_train.shape[0], "train samples")
print(X_test.shape[0], "test samples")


# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

X_train shape: (50000, 32, 32, 3)
50000 train samples
10000 test samples


## CNN model with CIFAR dataset

In [31]:
import sys
from matplotlib import pyplot
from keras.datasets import cifar10
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from tensorflow.keras.optimizers import SGD

In [36]:
# load train and test dataset
def load_dataset():
	# load dataset
	(trainX, trainY), (testX, testY) = cifar10.load_data()
	# one hot encode target values
	trainY = keras.utils.to_categorical(trainY)
	testY = keras.utils.to_categorical(testY)
	return trainX, trainY, testX, testY
 
# scale pixels
def prep_pixels(train, test):
	# convert from integers to floats
	train_norm = train.astype('float32')
	test_norm = test.astype('float32')
	# normalize to range 0-1
	train_norm = train_norm / 255.0
	test_norm = test_norm / 255.0
	# return normalized images
	return train_norm, test_norm

In [37]:
# define cnn model
def define_model():
	model = Sequential()
	model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same', input_shape=(32, 32, 3)))
	model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
	model.add(MaxPooling2D((2, 2)))
	model.add(Dropout(0.2))
	model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
	model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
	model.add(MaxPooling2D((2, 2)))
	model.add(Dropout(0.2))
	model.add(Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
	model.add(Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_uniform', padding='same'))
	model.add(MaxPooling2D((2, 2)))
	model.add(Dropout(0.2))
	model.add(Flatten())
	model.add(Dense(128, activation='relu', kernel_initializer='he_uniform'))
	model.add(Dropout(0.2))
	model.add(Dense(10, activation='softmax'))
	# compile model
	opt = SGD(lr=0.001, momentum=0.9)
	model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
	return model

In [38]:
# plot diagnostic learning curves
def summarize_diagnostics(history):
	# plot loss
	pyplot.subplot(211)
	pyplot.title('Cross Entropy Loss')
	pyplot.plot(history.history['loss'], color='blue', label='train')
	pyplot.plot(history.history['val_loss'], color='orange', label='test')
	# plot accuracy
	pyplot.subplot(212)
	pyplot.title('Classification Accuracy')
	pyplot.plot(history.history['accuracy'], color='blue', label='train')
	pyplot.plot(history.history['val_accuracy'], color='orange', label='test')
	# save plot to file
	filename = sys.argv[0].split('/')[-1]
	pyplot.savefig(filename + '_plot.png')
	pyplot.close()

In [39]:
# run the test harness for evaluating a model
def run_test_harness():
	# load dataset
	trainX, trainY, testX, testY = load_dataset()
	# prepare pixel data
	trainX, testX = prep_pixels(trainX, testX)
	# define model
	model = define_model()
	# fit model
	history = model.fit(trainX, trainY, epochs=100, batch_size=64, validation_data=(testX, testY), verbose=0)
	# evaluate model
	_, acc = model.evaluate(testX, testY, verbose=0)
	print('> %.3f' % (acc * 100.0))
	# learning curves
	summarize_diagnostics(history)
 
# entry point, run the test harness
run_test_harness()



KeyboardInterrupt: 

### Keras Baseline

In [14]:
def keras_train(batch_size=32, epochs=10):
    model = keras.Sequential(
        [
            keras.Input(shape=input_shape),
            layers.Flatten(),
            layers.Dense(64, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(num_classes, activation="softmax")
        ]
    )

    model.summary()

    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    
    start_time = time.time()
    model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.1)
    end_time = time.time()

    score = model.evaluate(X_test, y_test, verbose=0)
    print("Test loss:", score[0])
    print("Test accuracy:", score[1])
    print("Training Time:", end_time - start_time)
    
keras_train()

Model: "sequential_8"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_8 (Flatten)          (None, 3072)              0         
_________________________________________________________________
dense_24 (Dense)             (None, 64)                196672    
_________________________________________________________________
dense_25 (Dense)             (None, 64)                4160      
_________________________________________________________________
dense_26 (Dense)             (None, 10)                650       
Total params: 201,482
Trainable params: 201,482
Non-trainable params: 0
_________________________________________________________________
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test loss: 1.6072862148284912
Test accuracy: 0.42719998955726624
Training Time: 35.0728178024292


### Create RL Interface (gym.Env)

In [24]:

class CifarEnv(gym.Env):
    def __init__(self, images_per_episode=1, dataset=(X_train, y_train), random=True):
        super().__init__()

        self.action_space = gym.spaces.Discrete(10)
        self.observation_space = gym.spaces.Box(low=0, high=1,
                                                shape=(32, 32, 3),
                                                dtype=np.float32)

        self.images_per_episode = images_per_episode
        self.step_count = 0

        self.x, self.y = dataset
        self.random = random
        self.dataset_idx = 0

    def step(self, action):
        done = False
        reward = int(action == self.expected_action)

        obs = self._next_obs()

        self.step_count += 1
        if self.step_count >= self.images_per_episode:
            done = True

        return obs, reward, done, {}

    def reset(self):
        self.step_count = 0

        obs = self._next_obs()
        return obs

    def _next_obs(self):
        if self.random:
            next_obs_idx = random.randint(0, len(self.x) - 1)
            self.expected_action = int(self.y[next_obs_idx])
            obs = self.x[next_obs_idx]

        else:
            obs = self.x[self.dataset_idx]
            self.expected_action = int(self.y[self.dataset_idx])

            self.dataset_idx += 1
            if self.dataset_idx >= len(self.x):
                raise StopIteration()

        return obs


### Train Classifier Using DQN

In [25]:


def cifar_dqn():
    logger.configure(dir='./logs/cifar_dqn', format_strs=['stdout', 'tensorboard'])
    env = CifarEnv(images_per_episode=1)
    env = bench.Monitor(env, logger.get_dir())

    model = deepq.learn(
        env,
        "mlp",
        num_layers=1,
        num_hidden=64,
        activation=tf.nn.relu,
        hiddens=[32],
        dueling=True,
        lr=1e-4,
        total_timesteps=int(1.2e5),
        buffer_size=10000,
        exploration_fraction=0.1,
        exploration_final_eps=0.01,
        train_freq=4,
        learning_starts=10000,
        target_network_update_freq=1000,
    )

    model.save('dqn_cifar.pkl')
    env.close()
    
    return model

start_time = time.time()
dqn_model = cifar_dqn()
print("DQN Training Time:", time.time() - start_time)

AttributeError: module 'tensorflow.python.pywrap_tensorflow' has no attribute 'EventsWriter'

In [None]:
def cifar_dqn_eval(dqn_model):
    attempts, correct = 0,0

    env = CifarEnv(images_per_episode=1, dataset=(x_test, y_test), random=False)
    
    try:
        while True:
            obs, done = env.reset(), False
            while not done:
                obs, rew, done, _ = env.step(dqn_model(obs[None])[0])

                attempts += 1
                if rew > 0:
                    correct += 1

    except StopIteration:
        print()
        print('validation done...')
        print('Accuracy: {0}%'.format((float(correct) / attempts) * 100))

cifar_dqn_eval(dqn_model)

### Train Classifier using PPO

In [None]:
def mnist_ppo():
    logger.configure(dir='./logs/mnist_ppo', format_strs=['stdout', 'tensorboard'])
    env = DummyVecEnv([lambda: bench.Monitor(MnistEnv(images_per_episode=1), logger.get_dir())])

    model = ppo2.learn(
        env=env,
        network='mlp',
        num_layers=2,
        num_hidden=64,
        nsteps=32,
        total_timesteps=int(1.2e5),
        seed=int(time.time()))
    
    return model

start_time = time.time()
ppo_model = mnist_ppo()
print("PPO Training Time:", time.time() - start_time)


In [None]:
def mnist_ppo_eval(ppo_model):
    attempts, correct = 0,0

    env = DummyVecEnv([lambda: MnistEnv(images_per_episode=1, dataset=(x_test, y_test), random=False)])
    
    try:
        while True:
            obs, done = env.reset(), [False]
            while not done[0]:
                obs, rew, done, _ = env.step(ppo_model.step(obs[None])[0])

                attempts += 1
                if rew[0] > 0:
                    correct += 1

    except StopIteration:
        print()
        print('validation done...')
        print('Accuracy: {0}%'.format((float(correct) / attempts) * 100))

mnist_ppo_eval(ppo_model)

### Conclusions?

**Facts**
- All models used roughly the **same number of parameters**
- DQN model splits final layer into two for value and advantages heads
- PPO model adds a value head after the final layer
- All models are run for equivalent of **2 epochs**

**Results**
- Standard Supervised Learning
    - Training Time: 11.93s
    - Test Accuracy: 96.27%
    
- DQN
    - Training Time: 461.5s
    - Test Accuracy: 93.48%
    
- PPO
    - Training Time: 638.7s
    - Test Accuracy: 95.20%
    
**Remarks**

RL can be used for classification, but it's clearly not the optimal method and doesn't present any advantages over using standard supervised learning.