In [8]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
from google.colab import drive
import os
import zipfile

# Step 1: Mount Google Drive
drive.mount('/content/drive')
from google.colab import drive
import os

# Path to your directory in Google Drive
data_path = '/content/drive/My Drive/'

# List the contents of the directory
print(os.listdir(data_path))

print("CSV files in the directory:")
for filename in os.listdir(data_path):
    if filename.endswith('.csv'):
        print(filename)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
['Screenshot 2024-07-18 at 5.25.47\u202fPM.png', 'Senior Year Transcript.pdf', 'His 072B', 'Paper 3-2.pdf', 'Paper 3-2.gdoc', 'SAS 002 Essay.gdoc', 'Successful Scheduling (2).png', 'Study Guide Wage Labor.gdoc', 'Colab Notebooks', 'Kidney_marker_gene.xlsx', 'History Paper 2.gdoc', 'History Paper 2 .docx', 'History Paper 2 with comments.docx', 'output_file.h5ad', 'SAS Essay 2.gdoc', 'bioinformaticscopilot_CosMx_v11.ipynb', 'HIS 072 .gdoc', 'ECS 17.gdoc', 'resume_v1.pdf', 'SAS 005 (1).gdoc', 'Untitled document (1).gdoc', 'ECS 17 Notes.gdoc', 'SAS 005_2.gdoc', 'SAS 005.gdoc', 'Copy of SAS 005.gdoc', 'DDSC Project proposal.gdoc', 'Untitled document.gdoc', '2023-08-21_Preprocess_Practice.ipynb', 'resume (2).pdf', 'resume (1).pdf', 'COM 4 Art Presentation.gdoc', 'COM 4.gdoc', 'SAS 5 essay 2.gdoc', 'resume.pdf', 'essay 2.gdoc', 'LOR.gdoc', 'Gantt chart.gsheet', 'Pro

In [7]:
!python3 -m pip install -U nupack --no-index -f "/content/drive/MyDrive/package"


Looking in links: /content/drive/MyDrive/package


In [12]:
pip install nuad gym stable-baselines3 torch numpy matplotlib

Collecting stable-baselines3
  Downloading stable_baselines3-2.6.0-py3-none-any.whl.metadata (4.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (

In [9]:
!apt-get update
!apt-get install vienna-rna


Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,926 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,245 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2

In [10]:
from dataclasses import dataclass
from typing import Optional
import argparse
import os
from typing import List, Tuple

import nuad.constraints as nc
import nuad.search as ns


# DNA sequence designer for a 2D canvas of single-stranded tiles (SSTs).
# See docstring for create_design for a detailed description of the design.

def main() -> None:
    args: CLArgs = parse_command_line_arguments()

    design = create_design(width=args.width, height=args.height)

    constraints = create_constraints(design)

    params = ns.SearchParameters(
        constraints=constraints,
        out_directory=args.directory,
        restart=args.restart,
        random_seed=args.seed,
        scrolling_output=False,
        save_report_for_all_updates=True,
        force_overwrite=args.force_overwrite,
        # log_time=True,
    )
    ns.search_for_sequences(design, params)


# command-line arguments
@dataclass
class CLArgs:
    directory: str
    """output directory for search"""

    restart: bool
    """whether to restart a stopped search"""

    width: int
    """width of SST canvas"""

    height: int
    """height of SST canvas"""

    seed: Optional[int] = None
    """seed for random number generator; set to fixed integer for reproducibility"""

    force_overwrite: bool = False
    """whether to overwrite output files without prompting the user"""


def parse_command_line_arguments() -> CLArgs:
    default_directory = os.path.join('output', ns.script_name_no_ext())

    parser = argparse.ArgumentParser(
        description='Designs DNA sequences for a canvas of single-stranded tiles (SSTs).',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('-o', '--output-dir', type=str, default=default_directory,
                        help='directory in which to place output files')

    parser.add_argument('-s', '--seed', type=int,
                        help='seed for random number generator')

    parser.add_argument('-w', '--width', type=int, required=True,
                        help='width of canvas (number of tiles)')

    parser.add_argument('-ht', '--height', type=int, required=True,
                        help='height of canvas (number of tiles)')

    parser.add_argument('-r', '--restart', action='store_true',
                        help='If true, then assumes output directory contains output of search that was '
                             'cancelled, to restart from. Will automatically find the most recent design '
                             '(assuming they are indexed with a number such as 84), and will start the '
                             'numbering from there (i.e., the next files to be written upon improving the '
                             'design will have index 85).')

    parser.add_argument('-f', '--force', action='store_true',
                        help='If true, then overwrites the output files without prompting the user.')

    args = parser.parse_args()

    return CLArgs(
        directory=args.output_dir,
        width=args.width,
        height=args.height,
        seed=args.seed,
        restart=args.restart,
        force_overwrite=args.force,
    )


def create_design(width: int, height: int) -> nc.Design:
    """
    Creates an SST canvas `width` tiles width and `height` tiles high.

    For instance a width=4 x height=3 canvas looks like below, with each tile named t_x_y.
    x goes from 0 up to `width`-1.
    y goes from 0 up to `height`-1.

    Domain lengths are listed at the top in the next figure.

    .. code-block:: none

        |     10    |     11      |     10     |     11      |     10     |     11      |     10     |

                                   +==========---===========>
                                   |         t_0_2
                                   +==========---===========]
                     +===========---==========> +===========---==========>
                     |          t_0_1           |          t_1_2
                     +===========---==========] +===========---==========]
        +==========---===========> +==========---===========> +==========---===========>
        |         t_0_0            |         t_1_1            |         t_2_2
        +==========---===========] +==========---===========] +==========---===========]
                     +===========---==========> +===========---==========> +===========---==========>
                     |          t_1_0           |          t_2_1           |          t_3_2
                     +===========---==========] +===========---==========] +===========---==========]
                                   +==========---===========> +==========---===========>
                                   |         t_2_0            |         t_3_1
                                   +==========---===========] +==========---===========]
                                                +===========---==========>
                                                |          t_3_0
                                                +===========---==========]

    :param width:
        number of tiles wide to make canvas
    :param height:
        number of tiles high to make canvas
    :return:
        design with `width` x `height` canvas of SSTs
    """
    numpy_filters = [
        nc.NearestNeighborEnergyFilter(-9.3, -9.0, 52.0),  # energies should all be "close"
        nc.RunsOfBasesFilter(['C', 'G'], 4),  # forbid substrings of form {C,G}^4
        nc.ForbiddenSubstringFilter(['AAAAA', 'TTTTT']),  # forbid 5 A's in a row or 5 T's in a row
    ]

    domain_pool_10 = nc.DomainPool(f'length-10_domains', 10, numpy_filters=numpy_filters)
    domain_pool_11 = nc.DomainPool(f'length-11_domains', 11, numpy_filters=numpy_filters)

    design = nc.Design()

    for x in range(width):
        for y in range(height):
            # domains are named after the strand for which they are on the bottom,
            # so the two domains on top are starred and named after the tiles to which they bind
            # If you tilt your head 45 degrees left, then glues are
            # "north" (n), "south" (s), "west" (w), "east" (e),
            # so start with either ns_ ("north-south") or we_ ("west-east")
            # From 5' ] to 3' >, they are in the order s, w, n, e.
            #
            # Parity of x+y determines whether first and last domain (s and e) are length 11 or 10.
            #
            # e.g. this is tile t_3_5:
            #
            #          10           11
            #     +==========--===========>
            #     |  ns_2_5*     we_3_6*
            #     |
            #     |  we_3_5      ns_3_5
            #     +==========--===========]
            #
            # and this is tile t_6_1:
            #
            #          11           10
            #     +===========--==========>
            #     |  ns_5_1*      we_6_2*
            #     |
            #     |  we_6_1       ns_6_1
            #     +===========--==========]

            s_domain_name = f'ns_{x}_{y}'
            w_domain_name = f'we_{x}_{y}'
            n_domain_name = f'ns_{x - 1}_{y}*'
            e_domain_name = f'we_{x}_{y + 1}*'
            tile = design.add_strand(
                domain_names=[s_domain_name, w_domain_name, n_domain_name, e_domain_name], name=f't_{x}_{y}')

            if (x + y) % 2 == 0:
                outer_pool = domain_pool_11
                inner_pool = domain_pool_10
            else:
                outer_pool = domain_pool_10
                inner_pool = domain_pool_11

            s_domain, w_domain, n_domain, e_domain = tile.domains
            if not s_domain.has_pool():
                s_domain.pool = outer_pool
            if not e_domain.has_pool():
                e_domain.pool = outer_pool
            if not n_domain.has_pool():
                n_domain.pool = inner_pool
            if not w_domain.has_pool():
                w_domain.pool = inner_pool

    return design


@dataclass
class Thresholds:
    temperature: float = 52.0
    """Temperature in Celsius"""

    tile_ss: float = -1.5
    """NUPACK complex free energy threshold for individual tiles."""

    tile_pair_0comp: float = -2.5
    """RNAduplex complex free energy threshold for pairs tiles with no complementary domains."""

    tile_pair_1comp: float = -6.5
    """RNAduplex complex free energy threshold for pairs tiles with 1 complementary domain."""


def create_constraints(design: nc.Design) -> List[nc.Constraint]:
    thresholds = Thresholds()

    strand_individual_ss_constraint = nc.nupack_strand_free_energy_constraint(
    threshold=thresholds.tile_ss, temperature=thresholds.temperature, short_description='StrandSS')

    strand_pairs_rna_duplex_constraint_0comp, strand_pairs_rna_duplex_constraint_1comp = \
    nc.rna_duplex_strand_pairs_constraints_by_number_matching_domains(
        thresholds={0: thresholds.tile_pair_0comp,
                    1: thresholds.tile_pair_1comp},
        temperature=thresholds.temperature,
        short_descriptions={0: 'StrandPairRNA0Comp',
                            1: 'StrandPairRNA1Comp'},
        strands=design.strands,
        ignore_missing_thresholds=True,   # ← allow missing k’s
    )

    no_gggg_constraint = create_tile_no_gggg_constraint(weight=100)

    return [
        strand_individual_ss_constraint,
        strand_pairs_rna_duplex_constraint_0comp,
        strand_pairs_rna_duplex_constraint_1comp,
        no_gggg_constraint,
    ]


def create_tile_no_gggg_constraint(weight: float) -> nc.StrandConstraint:
    # This shows how one might make a custom constraint, in case those in dsd.constraints are not
    # sufficient. See also source code of provided constraints in dsd/constraints.py for more examples,
    # particularly for examples that call NUPACK or ViennaRNA.

    # We already forbid GGGG in any domain, but let's also ensure we don't get GGGG in any strand
    # i.e., forbid GGGG that comes from concatenating domains, e.g.,
    #
    #               *  ***
    #      ACGATCGATG  GGGATGCATGA
    #     +==========--===========>
    #     |
    #     +==========--===========]

    def evaluate(seqs: Tuple[str, ...], strand: Optional[nc.Strand]) -> nc.Result:  # noqa
        sequence = seqs[0]
        if 'GGGG' in sequence:
            result = nc.Result(excess=1.0, summary=f'GGGG found in {sequence}', value=sequence.count('GGGG'))
        else:
            result = nc.Result(excess=0.0, summary=f'', value=0)
        return result

    description = "No GGGG allowed in strand's sequence"

    return nc.StrandConstraint(description=description, short_description='NoGGGG',
                               weight=weight, evaluate=evaluate)


#if __name__ == '__main__':
#    main()

In [None]:
!wget https://raw.githubusercontent.com/UC-Davis-molecular-computing/nuad/main/examples/sst_canvas.py -O sst_canvas.py


--2025-05-09 22:46:55--  https://raw.githubusercontent.com/UC-Davis-molecular-computing/nuad/main/examples/sst_canvas.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10674 (10K) [text/plain]
Saving to: ‘sst_canvas.py’


2025-05-09 22:46:55 (80.2 MB/s) - ‘sst_canvas.py’ saved [10674/10674]



In [None]:
!python sst_canvas.py \
  --width 4 \
  --height 3 \
  --output-dir ./demo_output \
  --seed 42


using random seed of 42; use this same seed to reproduce this run
number of processes in system: 1
|-----------|--------|-----------|-----------|----------|--------------------|--------------------|--------|
| iteration | update | opt score | new score | StrandSS | StrandPairRNA0Comp | StrandPairRNA1Comp | NoGGGG |
|      2793 |    133 |     0.212 |     0.786 |    0.209 |            0.00206 |           0.001000 |    0.0 |
We've generated all possible DNA sequences at Hamming distance 1 
from the previous sequence TCTCTAGGGA and not found one that passed your 
NumpyFilters and SequenceFilters. Trying another distance.
|      3219 |    137 |     0.203 |     0.203 |    0.200 |            0.00206 |            0.00100 |    0.0 |
We've generated all possible DNA sequences at Hamming distance 1 
from the previous sequence TTCGTTTTGTA and not found one that passed your 
NumpyFilters and SequenceFilters. Trying another distance.
|      3328 |    137 |     0.203 |     0.204 |    0.200 |         

In [None]:
#from sst_designer import create_design, create_constraints
import nuad.search as ns

# 1) build a tiny 1×1 design
design = create_design(1, 1)
constraints = create_constraints(design)   # no more ValueError!


# 2) point search_for_sequences at a temp folder
params = ns.SearchParameters(
    constraints=constraints,
    out_directory='tmp_1x1',
    random_seed=42,
    scrolling_output=True,         # live stdout of progress
    save_report_for_all_updates=True,
    force_overwrite=True
)
ns.search_for_sequences(design, params)


## Simulate a Mock Environment

In [13]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import torch as th
import torch.nn as nn
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy

# Mock NUAD Environment for development purposes
class MockNUADEnv(gym.Env):
    def __init__(self, sequence_length=30, num_constraints=5):
        super().__init__()

        self.sequence_length = sequence_length
        self.num_constraints = num_constraints

        # Define observation space: sequence (one-hot encoded) and constraint values
        # Nucleotides represented as integers 0-3 (A, C, G, T)
        self.observation_space = spaces.Dict({
            'sequence': spaces.Box(low=0, high=1, shape=(4, sequence_length), dtype=np.float32),
            'constraint_values': spaces.Box(low=-10, high=10, shape=(num_constraints,), dtype=np.float32)
        })

        # Action space: (position, new_nucleotide)
        self.action_space = spaces.MultiDiscrete([sequence_length, 4])

        # Internal state
        self.current_sequence = None
        self.current_constraints = None

    def reset(self, seed=None):
        super().reset(seed=seed)

        # Generate random sequence
        self.current_sequence = np.random.randint(0, 4, size=self.sequence_length)

        # Generate random constraint values
        self.current_constraints = np.random.uniform(-5, 5, size=self.num_constraints)

        # Return observation
        return self._get_observation(), {}

    def step(self, action):
        # Unpack action
        position, nucleotide = action

        # Apply action
        old_nucleotide = self.current_sequence[position]
        self.current_sequence[position] = nucleotide

        # Calculate simple reward based on position and nucleotide
        # In the real environment, this would come from NUAD's constraint evaluation
        reward_change = 0

        # Simulate constraints changing based on the nucleotide change
        for i in range(self.num_constraints):
            # Random but deterministic change to constraints
            old_value = self.current_constraints[i]
            # More realistic: different constraints affected differently by the change
            if i % 4 == nucleotide:
                change = -0.5  # Improvement
            elif i % 4 == old_nucleotide:
                change = 0.3   # Worsening
            else:
                change = 0.1 * np.sin(position * i)  # Small change

            self.current_constraints[i] += change
            reward_change -= change  # Negative because lower constraint value is better

        # Get new observation
        observation = self._get_observation()

        # Determine if episode is done (simplified)
        done = all(c < -4.0 for c in self.current_constraints)

        # Additional info
        info = {
            'constraint_values': self.current_constraints,
            'constraint_improvement': reward_change
        }

        return observation, reward_change, done, False, info

    def _get_observation(self):
        # Convert sequence to one-hot encoding
        one_hot = np.zeros((4, self.sequence_length), dtype=np.float32)
        for i, nucleotide in enumerate(self.current_sequence):
            one_hot[nucleotide, i] = 1

        return {
            'sequence': one_hot,
            'constraint_values': self.current_constraints.astype(np.float32)
        }

    def render(self):
        # Simple rendering of the current sequence
        nucleotides = ['A', 'C', 'G', 'T']
        sequence_str = ''.join(nucleotides[n] for n in self.current_sequence)
        print(f"Sequence: {sequence_str}")
        print(f"Constraints: {self.current_constraints}")

## A custom policy network

In [14]:
class NUADFeatureExtractor(nn.Module):
    """
    Custom feature extractor for NUAD sequences and constraints
    """
    def __init__(self, observation_space):
        super().__init__()

        # Extract dimensions from observation space
        self.sequence_length = observation_space['sequence'].shape[1]
        self.num_constraints = observation_space['constraint_values'].shape[0]

        # CNN for sequence processing
        self.sequence_features = nn.Sequential(
            nn.Conv1d(4, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(output_size=8),  # Reduce to fixed size
            nn.Flatten()
        )

        # MLP for constraint processing
        self.constraint_features = nn.Sequential(
            nn.Linear(self.num_constraints, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU()
        )

        # Combined features
        sequence_features_size = 64 * 8  # channels * adaptive pooling size
        self.combined_layer = nn.Sequential(
            nn.Linear(sequence_features_size + 32, 128),
            nn.ReLU()
        )

        # Feature dimension output for SB3
        self.features_dim = 128

    def forward(self, observations):
        # Process sequence features (CNN)
        seq_features = self.sequence_features(observations['sequence'])

        # Process constraint features (MLP)
        constraint_features = self.constraint_features(observations['constraint_values'])

        # Combine features
        combined = th.cat([seq_features, constraint_features], dim=1)
        return self.combined_layer(combined)

class NUADPolicy(ActorCriticPolicy):
    """
    Custom policy for NUAD environment
    """
    def __init__(self, *args, **kwargs):
        # Pass our custom feature extractor to the parent class
        kwargs["features_extractor_class"] = NUADFeatureExtractor
        super().__init__(*args, **kwargs)

## A simplified training script

In [15]:
def train_nuad_policy(total_timesteps=100000):
    # Create environment
    env = MockNUADEnv(sequence_length=30, num_constraints=5)

    # Initialize PPO model with custom policy
    model = PPO(
        NUADPolicy,
        env,
        learning_rate=3e-4,
        n_steps=1024,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        verbose=1,
        tensorboard_log="./nuad_ppo_tensorboard/"
    )

    # Train the model
    model.learn(total_timesteps=total_timesteps)

    # Save the model
    model.save("nuad_policy_model")

    return model

def evaluate_policy(model, env, num_episodes=10):
    """Evaluate the trained policy"""
    rewards = []

    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        steps = 0

        while not done and steps < 100:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, _, info = env.step(action)
            total_reward += reward
            steps += 1

            # Print progress
            if episode == 0:
                print(f"Step {steps}: Action={action}, Reward={reward:.4f}")
                env.render()

        rewards.append(total_reward)
        print(f"Episode {episode+1}: Total Reward={total_reward:.4f}, Steps={steps}")

    print(f"Average Reward over {num_episodes} episodes: {np.mean(rewards):.4f}")
    return rewards

if __name__ == "__main__":
    # Train policy
    model = train_nuad_policy(total_timesteps=10000)  # Small number for testing

    # Evaluate policy
    env = MockNUADEnv()
    evaluate_policy(model, env)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ./nuad_ppo_tensorboard/PPO_1
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 118      |
|    ep_rew_mean     | 39       |
| time/              |          |
|    fps             | 523      |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 1024     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 133         |
|    ep_rew_mean          | 45.5        |
| time/                   |             |
|    fps                  | 236         |
|    iterations           | 2           |
|    time_elapsed         | 8           |
|    total_timesteps      | 2048        |
| train/                  |             |
|    approx_kl            | 0.014598599 |
|    clip_fraction        | 0.193       |
|    clip_range       

## A realistic reward policy


In [16]:
def realistic_nuad_reward(old_sequence, new_sequence, position, new_nucleotide):
    """
    More realistic reward function that mimics NUAD constraints

    This is a placeholder for NUAD's actual constraint evaluation
    """
    reward = 0

    # Example constraint types to simulate:

    # 1. Base pairing stability (GC pairs are more stable than AT pairs)
    # Check if we've created or broken complementary base pairs
    for i in range(len(old_sequence)):
        if i != position and i % 10 == (position % 10):  # Simulate complementary positions
            # GC pair is worth more than AT pair
            old_pair_value = 2 if (old_sequence[position] + old_sequence[i]) % 3 == 0 else 1
            new_pair_value = 2 if (new_nucleotide + old_sequence[i]) % 3 == 0 else 1
            reward += new_pair_value - old_pair_value

    # 2. Sequence diversity (avoid repetitive sequences)
    old_window = old_sequence[max(0, position-2):min(len(old_sequence), position+3)]
    new_window = old_sequence.copy()
    new_window[position] = new_nucleotide
    new_window = new_window[max(0, position-2):min(len(old_sequence), position+3)]

    old_diversity = len(set(old_window))
    new_diversity = len(set(new_window))
    reward += 0.5 * (new_diversity - old_diversity)

    # 3. Target structure match (simplified)
    # Simulate a target structure as alternating nucleotides
    if position % 2 == 0 and new_nucleotide % 2 == 0:
        reward += 0.5
    elif position % 2 == 1 and new_nucleotide % 2 == 1:
        reward += 0.5

    return reward

# Update the step method in MockNUADEnv to use this reward function
def step(self, action):
    # Unpack action
    position, nucleotide = action

    # Save old sequence for reward calculation
    old_sequence = self.current_sequence.copy()

    # Apply action
    self.current_sequence[position] = nucleotide

    # Calculate reward using the more realistic function
    reward = realistic_nuad_reward(old_sequence, self.current_sequence, position, nucleotide)

    # Update constraints based on the reward (to keep tracking progress)
    for i in range(self.num_constraints):
        self.current_constraints[i] -= reward / self.num_constraints

    # Get new observation
    observation = self._get_observation()

    # Determine if done (when all constraints are satisfied)
    done = all(c < -4.0 for c in self.current_constraints)

    # Additional info
    info = {
        'constraint_values': self.current_constraints,
        'improvement': reward
    }

    return observation, reward, done, False, info

## Step 6: Prepare for Integration with the Real NUAD Environment


In [17]:
class RLPolicyManager:
    """
    Manager class for RL policies in NUAD

    This will provide a clean interface between your RL implementation
    and the actual NUAD environment when it's ready.
    """
    def __init__(self, model_path=None):
        self.model = None
        if model_path:
            self.load_model(model_path)

    def train(self, env, total_timesteps=100000, save_path="nuad_policy_model"):
        """Train a new policy on the provided environment"""
        self.model = PPO(
            NUADPolicy,
            env,
            learning_rate=3e-4,
            n_steps=1024,
            batch_size=64,
            n_epochs=10,
            gamma=0.99,
            verbose=1,
            tensorboard_log="./nuad_rl_logs/"
        )

        self.model.learn(total_timesteps=total_timesteps)

        if save_path:
            self.model.save(save_path)

        return self.model

    def load_model(self, model_path):
        """Load a trained policy"""
        self.model = PPO.load(model_path)
        return self.model

    def predict(self, observation):
        """Predict action based on observation"""
        if self.model is None:
            raise ValueError("No model loaded. Call train() or load_model() first.")

        action, _ = self.model.predict(observation, deterministic=True)
        return action

    def search_for_sequences(self, env, max_steps=100):
        """
        Run the policy to search for sequences

        This mimics the interface of NUAD's search_for_sequences function
        """
        if self.model is None:
            raise ValueError("No model loaded. Call train() or load_model() first.")

        # Initialize environment
        observation, _ = env.reset()

        # Track sequence history
        history = [env.current_sequence.copy()]
        best_reward = float('-inf')
        best_sequence = env.current_sequence.copy()

        # Run policy until done or max steps
        for step in range(max_steps):
            action, _ = self.model.predict(observation, deterministic=False)  # Use stochasticity for exploration
            observation, reward, done, _, info = env.step(action)

            # Save history
            history.append(env.current_sequence.copy())

            # Track best sequence
            if reward > best_reward:
                best_reward = reward
                best_sequence = env.current_sequence.copy()

            if done:
                break

        # Return results in a format similar to NUAD
        result = {
            'sequence': best_sequence,
            'history': history,
            'reward': best_reward,
            'steps': step + 1,
            'done': done
        }

        return result

## Step 7: Add Visualization for Training Progress

In [23]:
import matplotlib.pyplot as plt
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.results_plotter import load_results, ts2xy

class TrainingVisualizationCallback(BaseCallback):
    """
    Callback for saving and visualizing training progress
    """
    def __init__(self, log_dir, verbose=1):
        super().__init__(verbose)
        self.log_dir = log_dir
        self.rewards = []
        self.constraint_values = []

    def _on_step(self):
        # Record rewards and constraint values
        if len(self.model.ep_info_buffer) > 0 and len(self.model.ep_info_buffer[-1]) > 0:
            self.rewards.append(self.model.ep_info_buffer[-1]["r"])

        # Visualize every 1000 steps
        if self.n_calls % 1000 == 0:
            self.visualize_progress()

        return True

    def visualize_progress(self):
        """Visualize training progress"""
        # Create figure
        fig, ax = plt.subplots(figsize=(10, 5))

        # Plot rewards
        if len(self.rewards) > 0:
            ax.plot(self.rewards, label='Episode Rewards')
            ax.set_xlabel('Episodes')
            ax.set_ylabel('Reward')
            ax.set_title(f'Training Progress (Step {self.n_calls})')
            ax.legend()

        # Save figure
        plt.savefig(f"{self.log_dir}/progress_{self.n_calls}.png")
        plt.close(fig)