```bash
conda create --name ani python=3.8
conda install -c conda-forge jupyterlab
pip install open_clip_torch
pip install stable-baselines3[extra]
pip install gym[all]
pip install pyglet==1.5.27
pip install tensorboardX
conda install -c anaconda ipywidgets
conda install -c anaconda scipy 
```

## Experiments
 - 2 environments: CartPole and LunarLander
 - clip models: openclip https://github.com/mlfoundations/open_clip(different models), cloob https://github.com/ml-jku/cloob
 - different prompts
     - try several prompts at the same time which describe different states and then the rwd is formulated based on to which prompt the env image is most similar
     - ...


In [1]:
import gym
from stable_baselines3 import DQN
import open_clip
from PIL import Image
import torch
import numpy as np
from tensorboardX import SummaryWriter
import os
import scipy.stats as stats
import json
import matplotlib.pyplot as plt
import math

In [2]:
def disable_view_window():
        from gym.envs.classic_control import rendering
        org_constructor = rendering.Viewer.__init__

        def constructor(self, *args, **kwargs):
            org_constructor(self, *args, **kwargs)
            self.window.set_visible(visible=False)

        rendering.Viewer.__init__ = constructor

In [3]:
disable_view_window()

In [4]:
class CLIPEnv():
    def __init__(self, env, clip_model, clip_preprocess, tokenizer, prompt, writer):
        self.env = env
        self.model = clip_model
        self.preprocess = clip_preprocess
        self.tokenizer = tokenizer
        self.text_features = self.model.encode_text(self.tokenizer([prompt]))
        self.text_features /= self.text_features.norm(dim=-1, keepdim=True)
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space
        self.metadata = self.env.metadata
        self.clip_rewards_per_episode = []
        self.env_rewards_per_episode = []
        
        self.clip_rewards = []
        self.env_rewards = []
        
        self.writer = writer
        self.n_steps = 0
        self.n_episodes = 0

    
    def reset(self):
        return self.env.reset()
    
    def close(self):
        return self.env.close()
    
    def step(self, action):
        next_st, rwd, done, info = self.env.step(action)
        img = self.env.render(mode="rgb_array")
        clip_rwd = self.get_clip_reward(img)
        
        if len(self.clip_rewards_per_episode) == 0:
            self.clip_rewards_per_episode.append(clip_rwd)
            self.env_rewards_per_episode.append(rwd)
        else:
            self.clip_rewards_per_episode.append(self.clip_rewards_per_episode[-1] + clip_rwd)
            self.env_rewards_per_episode.append(self.env_rewards_per_episode[-1] + rwd)
        
     
        if done:
            #self.writer.add_scalar('episode_length',  len(self.env_rewards_per_episode), self.n_episodes)
            
            self.writer.add_scalar('episode_rewards/env_reward',  sum(self.env_rewards_per_episode), self.n_episodes)
            self.writer.add_scalar('episode_rewards/clip_reward', sum(self.clip_rewards_per_episode) , self.n_episodes)
            
            self.env_rewards.append(self.env_rewards_per_episode)
            self.clip_rewards.append(self.clip_rewards_per_episode)

            self.env_rewards_per_episode = []
            self.clip_rewards_per_episode = []
            
            self.n_episodes += 1
            
        
        self.n_steps += 1

        return next_st, rwd, done, info
    
    def get_clip_reward(self, state):
        image = self.preprocess(Image.fromarray(np.uint8(state))).unsqueeze(0)
        with torch.no_grad(), torch.cuda.amp.autocast():
            image_features = self.model.encode_image(image)
            image_features /= image_features.norm(dim=-1, keepdim=True)
            sim = (image_features @ self.text_features.T)
        return sim.cpu().detach().numpy()[0][0]

In [5]:
!mkdir experiments

A subdirectory or file experiments already exists.


In [6]:
def run_exp(agent, env, prompt, clip_model_name, env_name, exp_path, n_steps, notes=''):
    if not os.path.exists(exp_path):
        os.mkdir(exp_path)
    
    agent.learn(total_timesteps=n_steps, progress_bar=True)
    agent.save(f"{exp_path}/agent")
    
    corr = stats.pearsonr([sum(e) for e in env.env_rewards], [sum(e) for e in env.clip_rewards])[0]
    m_rwd = np.mean([sum(e) for e in env.env_rewards[-10:]])
    results = {
        'env_name': env_name,
        'prompt': prompt,
        'clip_model_name': clip_model_name,
        'correlation': corr,
        'mean_env_rwd_over_last_10_episodes': m_rwd,
        'n_episodes': env.n_episodes,
        'n_steps': env.n_steps,
         'notes': notes,
    }
    with open(f'{exp_path}/results.json', 'w') as f:
        json.dump(results, f)
    
    # compute correlation between env and clip rewards for each episode separately
    per_episode_corr = [stats.pearsonr(e, c)[0] for e, c in zip(env.env_rewards, env.clip_rewards)]
    # if nan return 0 correlation
    per_episode_corr = [0 if math.isnan(corr) else corr for corr in per_episode_corr]
    
    for i in range(env.n_episodes):
        env.writer.add_scalar('Per episode correlation', per_episode_corr[i], i)

In [7]:
open_clip.list_pretrained()

[('RN50', 'openai'),
 ('RN50', 'yfcc15m'),
 ('RN50', 'cc12m'),
 ('RN50-quickgelu', 'openai'),
 ('RN50-quickgelu', 'yfcc15m'),
 ('RN50-quickgelu', 'cc12m'),
 ('RN101', 'openai'),
 ('RN101', 'yfcc15m'),
 ('RN101-quickgelu', 'openai'),
 ('RN101-quickgelu', 'yfcc15m'),
 ('RN50x4', 'openai'),
 ('RN50x16', 'openai'),
 ('RN50x64', 'openai'),
 ('ViT-B-32', 'openai'),
 ('ViT-B-32', 'laion400m_e31'),
 ('ViT-B-32', 'laion400m_e32'),
 ('ViT-B-32', 'laion2b_e16'),
 ('ViT-B-32', 'laion2b_s34b_b79k'),
 ('ViT-B-32-quickgelu', 'openai'),
 ('ViT-B-32-quickgelu', 'laion400m_e31'),
 ('ViT-B-32-quickgelu', 'laion400m_e32'),
 ('ViT-B-16', 'openai'),
 ('ViT-B-16', 'laion400m_e31'),
 ('ViT-B-16', 'laion400m_e32'),
 ('ViT-B-16-plus-240', 'laion400m_e31'),
 ('ViT-B-16-plus-240', 'laion400m_e32'),
 ('ViT-L-14', 'openai'),
 ('ViT-L-14', 'laion400m_e31'),
 ('ViT-L-14', 'laion400m_e32'),
 ('ViT-L-14', 'laion2b_s32b_b82k'),
 ('ViT-L-14-336', 'openai'),
 ('ViT-H-14', 'laion2b_s32b_b79k'),
 ('ViT-g-14', 'laion2b_s12b_

In [8]:
ENV_NAME = 'CartPole-v1'
DQN_POLICY = 'MlpPolicy' #'LnCnnPolicy'

MODEL = 'RN50'#'ViT-B-32-quickgelu'
PRETRAINED = 'yfcc15m'#'laion400m_e32'

COMMENT = 'Correlation_test_1'

PROMPT = 'White background, brown vertical pole in the middle, on top of the black box, vertically aligned'
N_STEPS = 2000

EXP_NAME = f'{ENV_NAME}_{PROMPT}_{MODEL}_{PRETRAINED}_{N_STEPS}_{COMMENT}'

EXP_PATH = f'experiments/{EXP_NAME}/'

In [9]:
model, _, preprocess = open_clip.create_model_and_transforms(MODEL, pretrained=PRETRAINED)
tokenizer = open_clip.get_tokenizer(MODEL)

In [10]:
env = gym.make(ENV_NAME)
writer = SummaryWriter(EXP_PATH)

cl_env = CLIPEnv(env, model, preprocess, tokenizer, PROMPT, writer)

agent = DQN(DQN_POLICY, 
            cl_env, 
            verbose=0, 
            learning_starts=1000, 
            buffer_size=15000, 
            target_update_interval=500,
            tensorboard_log=f'{EXP_PATH}dqn/',
            exploration_fraction=0.5,
            exploration_initial_eps=0.5,
            exploration_final_eps=0.2)


run_exp(agent, cl_env, PROMPT, f'open_clip_{MODEL}', ENV_NAME, EXP_PATH, N_STEPS,
        'all additional info about experiment goes here')

Output()



### Evaluate prompts for good/bad situations in CartPole

In [32]:
good_example = Image.open("cartpole_examples/good.png") 
bad_example = Image.open("cartpole_examples/bad.png") 

def evaluate_prompt_cartpole(prompt):
    env = gym.make(ENV_NAME)
    writer = SummaryWriter(EXP_PATH)
    cl_env = CLIPEnv(env, model, preprocess, tokenizer, prompt, writer)

    good_reward = cl_env.get_clip_reward(good_example)
    bad_reward = cl_env.get_clip_reward(bad_example)
    
    print(f'Good state clip reward: {good_reward}')
    print(f'Bad state clip reward: {bad_reward}')
    print(f'Diff: {good_reward - bad_reward}')

In [51]:
evaluate_prompt_cartpole('White background, brown vertical pole in the middle, on top of the black box, vertically aligned') # 'RN50' 'yfcc15m'

Good state clip reward: 0.14994792640209198
Bad state clip reward: 0.10380738228559494
Diff: 0.04614054411649704


In [52]:
evaluate_prompt_cartpole('White background, brown vertical pole in the middle, on top of the black box, perpendicular to each other')

Good state clip reward: 0.12130261957645416
Bad state clip reward: 0.09950494766235352
Diff: 0.021797671914100647


# Evaluate the trained agent

In [None]:
def experiment_prompt(prompt, agent, env, rewards_storage, n_steps=100):
    obs = env.reset()
    for _ in range(n_steps):
        action, _states = agent.predict(obs)
        obs, rewards, dones, info = env.step(action)
    rewards_storage[prompt] = (env.env_rewards.copy(), env.clip_rewards.copy())

In [None]:
prompt_rewards = {}

In [None]:
env = gym.make(ENV_NAME)
cl_env = CLIPEnv(env, model, preprocess, tokenizer, prompt, writer)

agent = DQN.load(f"{EXP_PATH}/agent", env=cl_env)
experiment_prompt(prompt, agent, cl_env, prompt_rewards)

In [None]:
cl_env.close()

In [None]:
plt.plot(prompt_rewards[prompt][0][0])
plt.title('Env rewards (1 episode)')
plt.show()

In [None]:
plt.plot(prompt_rewards[prompt][1][0])
plt.title('CLIP rewards (1 episode)')
plt.show()