<h1>Dependencies

In [None]:
!pip install stable-baselines3[extra]

In [None]:
!pip install gym

In [None]:
!pip show gym

In [1]:
import sys
sys.path.append('c:\\users\\hp\\desktop\\projectpeptide\\summer_internship_2023\\projectpeptide\\lib\\site-packages')

In [217]:
import gym 
from gym import Env
from gym.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete 

import numpy as np
import pandas as pd
import random

import os

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env import DummyVecEnv


from rdkit import Chem
from rdkit.Chem import Crippen

<h1> Types of Spaces

In [None]:
Discrete(3).sample()

In [None]:
Box(0,1,shape=(3,3)).sample()

In [None]:
Tuple((Discrete(2), Box(0,100, shape=(1,)))).sample()

In [None]:
Dict({'height':Discrete(2), "speed":Box(0,100, shape=(1,))}).sample()

In [None]:
MultiBinary(4).sample()

In [None]:
MultiDiscrete([5,2,2]).sample()

<h1> Building an Environment

In [3]:
len_peptide = 5

In [28]:
#Reward as logP only

# Function to convert amino acid indices to a tripeptide sequence
def indices_to_peptide(indices):
    amino_acids = ['ALA', 'ARG', 'ASN', 'ASP', 'CYS', 'GLN', 'GLU', 'GLY', 'HIS', 'ILE', 'LEU', 'LYS', 'MET', 'PHE', 'PRO', 'SER', 'THR', 'TRP', 'TYR', 'VAL']
    peptide_sequence = '-'.join([amino_acids[i] for i in indices])
    return peptide_sequence

# Function to calculate logP for a tripeptide
def calculate_logP(peptide_sequence):
    # Create an RDKit molecule object
    mol = Chem.MolFromSequence(peptide_sequence)

    # Check if the molecule is valid
    if mol is not None:
        # Calculate the logP value using Crippen's method
        logP = Crippen.MolLogP(mol)
        return logP
    else:
        # Return a placeholder value (you may want to handle invalid sequences differently)
        return -1000000000.0

In [29]:
peptide = MultiDiscrete([20] * len_peptide).sample()

# Convert indices to a tripeptide sequence
sequence = indices_to_peptide(peptide)
hydrophobicity = calculate_logP(sequence)

print('Protein chain as interger: {}\nProtein chain as string: {}\nHydrophobicity value for protein chain: {}'.format(peptide, sequence, hydrophobicity))

Protein chain as interger: [12  0 12 18 14]
Protein chain as string: MET-ALA-MET-TYR-PRO
Hydrophobicity value for protein chain: -1000000000.0


In [139]:
class peptideEnv(Env):
    
    def __init__(self):
        
        # Actions taken are placing the randomized amino acid at the randomized position of the peptide 
        self.action_space = MultiDiscrete([20, len_peptide])
        
        # all the peptides sequence possible
        self.observation_space = MultiDiscrete([20] * len_peptide)
        
        # Set start as a randomized peptide
        self.state = MultiDiscrete([20] * len_peptide).sample()
        self.critical_logP = -1000000000.0
        
        self.numAction = 100
                
        
        
    def step(self, action):
        
        # Extract amino acid and position from the action
        amino_acid = action // len_peptide
        position = action % len_peptide

        # Update the state based on the action
        self.state[position] = amino_acid
        self.numAction -=1
        
        
        
        # Convert indices to a peptide sequence and calculate logP- hydrophobicity 
        # for the peptide
        sequence = indices_to_peptide(self.state)
        logP_value = calculate_logP(sequence)
        
        
        # Calculate reward
        if logP_value > self.critical_logP: 
            reward = 1 
            self.critical_logP = logP_value
        elif logP_value == self.critical_logP:
            reward = 0
        else:
            reward = -1 
        
        # Check if shower is done
        if self.numAction <= 0: 
            done = True
        else:
            done = False
            
        info = {"Peptide after step" : self.state, "LogP_value": logP_value }
        globals()["chain"] = info

        return self.state, reward, done, info

    
    
    def render(self):
        # Implement viz
        pass
    
    
    
    def reset(self):
        # Reset the peptide to a new randomized state
        self.state = MultiDiscrete([20] * len_peptide).sample()

        # Reset critical_logP at the beginning of each episode
        self.critical_logP = -1000000000.0
        self.numAction = 100

        return self.state

In [140]:
env = peptideEnv()
env = DummyVecEnv([lambda: env])

In [141]:
env.observation_space.sample()

array([13, 18,  8, 16, 11], dtype=int64)

In [119]:
env.reset()

array([[16, 17, 15,  6,  7]], dtype=int64)

In [None]:
# from stable_baselines3.common.env_checker import check_env
# check_env(env, warn=True)

<h1> Test Environment

In [174]:
def numbers2peptide(indices):
    amino_acids = ['ALA', 'ARG', 'ASN', 'ASP', 'CYS', 'GLN', 'GLU', 'GLY', 'HIS', 'ILE', 'LEU', 'LYS', 'MET', 'PHE', 'PRO', 'SER', 'THR', 'TRP', 'TYR', 'VAL']
    # indices = info['Peptide after step'] 
    chain_array = np.array([])
    for i in indices:
        chain_array = np.append(chain_array, amino_acids[i])
        
    return chain_array
    

In [49]:
episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score+=reward
        print(info)
        
    print('Episode:{} Score:{}'.format(episode, score))
    # print(numbers2peptide(info["Peptide after step"]))
    print()
    
env.close()

[{'Peptide after step': [2, 19, 13, 0, 8], 'LogP_value': -6.777999999999981, 'TimeLimit.truncated': False}]
[{'Peptide after step': [1, 19, 13, 0, 8], 'LogP_value': -5.751029999999987, 'TimeLimit.truncated': False}]
[{'Peptide after step': [1, 19, 13, 0, 0], 'LogP_value': -4.669329999999999, 'TimeLimit.truncated': False}]
[{'Peptide after step': [1, 19, 13, 0, 3], 'LogP_value': -6.236829999999993, 'TimeLimit.truncated': False}]
[{'Peptide after step': [1, 2, 13, 0, 3], 'LogP_value': -10.071229999999966, 'TimeLimit.truncated': False}]
[{'Peptide after step': [1, 2, 13, 0, 2], 'LogP_value': -11.702029999999949, 'TimeLimit.truncated': False}]
[{'Peptide after step': [3, 2, 13, 0, 2], 'LogP_value': -11.098199999999958, 'TimeLimit.truncated': False}]
[{'Peptide after step': [3, 2, 2, 0, 2], 'LogP_value': -13.547399999999932, 'TimeLimit.truncated': False}]
[{'Peptide after step': [3, 3, 2, 0, 2], 'LogP_value': -11.916599999999951, 'TimeLimit.truncated': False}]
[{'Peptide after step': [3, 3,

In [50]:
env.close()

<h1> Train Model

In [122]:
log_path = os.path.join('Training', 'Logs')

In [143]:
# CustomCallback class
class CustomCallback(BaseCallback):
    def __init__(self, verbose=0, log_interval=10):
        super().__init__(verbose)
        self.log_interval = log_interval
        self.info_values = []

    def _on_step(self):
        if self.num_timesteps % self.log_interval == 0:
            peptide_after_step = globals()['chain']['Peptide after step']
            logP_value = globals()['chain']["LogP_value"]

            values = {"Peptide after step": peptide_after_step, "LogP_value": logP_value}
            self.info_values.append(values)
        return True


In [154]:
del model

NameError: name 'model' is not defined

In [155]:
model = PPO("MlpPolicy", env, verbose=2, tensorboard_log=log_path)
callback = CustomCallback()

Using cpu device


In [157]:
model.learn(total_timesteps= 500000, callback=callback)

Logging to Training\Logs\PPO_3
-----------------------------
| time/              |      |
|    fps             | 324  |
|    iterations      | 1    |
|    time_elapsed    | 6    |
|    total_timesteps | 2048 |
-----------------------------
------------------------------------------
| time/                   |              |
|    fps                  | 281          |
|    iterations           | 2            |
|    time_elapsed         | 14           |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0105977785 |
|    clip_fraction        | 0.0734       |
|    clip_range           | 0.2          |
|    entropy_loss         | -4.6         |
|    explained_variance   | -0.0035      |
|    learning_rate        | 0.0003       |
|    loss                 | 6.87         |
|    n_updates            | 10           |
|    policy_gradient_loss | -0.0196      |
|    value_loss           | 61.4         |
----------------------------

<stable_baselines3.ppo.ppo.PPO at 0x28b72ec73a0>

In [221]:
# Access the info values stored in the callback
stored_info_values = np.array(callback.info_values)
final_result = stored_info_values[-1]
final_chain = np.array(final_result['Peptide after step'])
# final_lopP = final_result['LogP_value']
final_chain_string = numbers2peptide(final_chain)

# Extract 'LogP_value' from each stored info value
logP_values = [result.get('LogP_value', 'Not found!') for result in stored_info_values]
chain = [result.get('Peptide after step', 'Not found!') for result in stored_info_values]


# Convert the list to a NumPy array if needed
logP_values_array = np.array(logP_values)
chain_array = np.array(chain)

df = pd.DataFrame(stored_info_values)

csv_path = os.path.join('Training', 'Outputs','model_output.csv')
df.to_csv(csv_path, index=False)




<h1> Save Model

In [None]:
model.save('PPO_3')

<h1> Mean Reward

In [None]:
# Assuming 'model' and 'env' are already defined
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10, render=True)

print(f"Mean Reward: {mean_reward}, Std Reward: {std_reward}")

<h1> Graphs

In [None]:
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
import matplotlib.pyplot as plt

In [None]:
logPath = os.path.join('Training', 'Logs', 'PPO_2')

# Load logs
event_acc = EventAccumulator(logPath)
event_acc.Reload()

In [None]:
# List all scalar tags in the event file
scalar_tags = event_acc.Tags()['scalars']
print(scalar_tags)

In [None]:
# Load logs
event_acc = EventAccumulator(logPath)
event_acc.Reload()

In [None]:
# Extract relevant information
timesteps = np.arange(10,250, 10)

# Create a 2x3 grid of subplots
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(15, 8))

# Plot in the first subplot
rewards = [scalars.value for scalars in event_acc.Scalars('rollout/ep_rew_mean')]
axes[0, 0].plot(timesteps, rewards)
axes[0, 0].set_xlabel('Total Iterations completed')
axes[0, 0].set_ylabel('Average Episode Reward')
axes[0, 0].set_title('Training Progress')

# Plot in the first subplot
rewards = [scalars.value for scalars in event_acc.Scalars('train/entropy_loss')]
axes[0, 1].plot(timesteps, rewards)
axes[0, 1].set_xlabel('Total Iterations completed')
axes[0, 1].set_ylabel('Entropy Loss')
axes[0, 1].set_title('Training Progress')

# Plot in the first subplot
rewards = [scalars.value for scalars in event_acc.Scalars('train/explained_variance')]
axes[0, 2].plot(timesteps, rewards)
axes[0, 2].set_xlabel('Total Iterations completed')
axes[0, 2].set_ylabel('Variance')
axes[0, 2].set_title('Training Progress')

# Plot in the first subplot
rewards = [scalars.value for scalars in event_acc.Scalars('train/loss')]
axes[1, 0].plot(timesteps, rewards)
axes[1, 0].set_xlabel('Total Iterations completed')
axes[1, 0].set_ylabel('Loss')
axes[1, 0].set_title('Training Progress')

# Plot in the first subplot
rewards = [scalars.value for scalars in event_acc.Scalars('train/policy_gradient_loss')]
axes[1, 1].plot(timesteps, rewards)
axes[1, 1].set_xlabel('Total Iterations completed')
axes[1, 1].set_ylabel('Policy Gradient Loss')
axes[1, 1].set_title('Training Progress')

# Plot in the first subplot
rewards = [scalars.value for scalars in event_acc.Scalars('train/value_loss')]
axes[1, 2].plot(timesteps, rewards)
axes[1, 2].set_xlabel('Total Iterations completed')
axes[1, 2].set_ylabel('Value Loss')
axes[1, 2].set_title('Training Progress')

# Adjust layout
plt.tight_layout()
plt.savefig("Graph")
plt.show()

In [132]:
wh = globals()['info']['Peptide after step']

'Not found!'