# Generation of synthetic promoters for *C. cinerea* from upstream sequences
## Purpose
This notebook extracts upstream sequences from the *C. cinerea* genome and trains a neural generator. It then generates synthetic promoters based on this generator.
## Extraction of upstream sequences

In [4]:
#load required libraries
import pandas as pd
from Bio import SeqIO, AlignIO, BiopythonParserWarning
import shutil
import subprocess
import os
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
import os
import sys
import warnings
import torch
import sys, pathlib, importlib

os.chdir("..") #change directory to root for relative improrts
proj_root = pathlib.Path(r"c:\Users\alexa\OneDrive - Danmarks Tekniske Universitet\27460 - Syntesebiologi\Exam\27460_synthetic_promoters").resolve()
sys.path.insert(0, str(proj_root))

The custom function to extract upstream sequences from and annotated genome is imported from `utils` module

In [5]:
#import from utils module
from src.utils import extract_upstream_sequences, one_hot_encode_df_to_tensor

In [3]:
warnings.filterwarnings("ignore", category=BiopythonParserWarning) #ignore

upstream_seq_df = extract_upstream_sequences("genome/amutbmut.gbk", 1000, ["CDS"])

In [4]:
print("Length of upstream sequences dataframe:", len(upstream_seq_df))
print("first 5 rows:\n", upstream_seq_df.head(5))

Length of upstream sequences dataframe: 16862
first 5 rows:
             id                                           sequence
0  CC2G_000001  CTCCCAAAAAGCGTAAGTCCTATTCTCTTTCTACTACTATCTTTTG...
1  CC2G_000002  CTTGGCCTTAGTGACAACACCTTGTTCGCCACTGCTCTATCTAACT...
2  CC2G_000003  CAAGCTCTACCGGCGAAGTGATTTGCCAATTCTTCTGTTGCCGGCG...
3  CC2G_000004  TTCAAGTGTTTTATCACAGTATCTAACATCATGACTTCACCGATGG...
4  CC2G_000005  AGACTAGCAAAGACTATAAAACAAACGGAATACATGCGCATGATAC...


## Encoding to tensor with one-hot encoding
We now encode the extracted sequences to a tensor in preparation for machine learning

In [5]:
seq_tensor:torch.Tensor = one_hot_encode_df_to_tensor(upstream_seq_df, 1000, "sequence")
print(seq_tensor.shape)

torch.Size([16862, 4, 1000])


The encoded tensor has the expected size 

## Preparation

### GPU initialization and status

In [6]:
if torch.cuda.is_available():
    dev = torch.device('cuda')
    print("CUDA available. Device:", torch.cuda.current_device(), "-", torch.cuda.get_device_name(0))
else:
    dev = torch.device('cpu')
    print("CUDA not available")

if dev.type == 'cuda':
    torch.cuda.synchronize()
    print("GPU forward/backward synchronized")

CUDA available. Device: 0 - NVIDIA GeForce RTX 3060 Laptop GPU
GPU forward/backward synchronized


### Neural network setup
A model similar to the one used in [Synthetic-promoters-via-GAN repo](https://github.com/TokaMamdoh/Synthetic-promoters-via-GAN) is utilized. Our model is simpler and runs for around 100 epochs compared to 500 used in the original project.

In [6]:
from src.utils import Generator, Critic, SelfAttentionLayer, Initialize_weights, GradientPenalty

### Training

In [None]:
log_interval = 50 # for logging to check for convergence
loss_critic_history = []
loss_gen_history = []
loss_epoch_history = []

import torch
from torch.utils.data import TensorDataset, DataLoader

class WGANGPLossD:
    @staticmethod
    def Wasserstein(critic_real, critic_fake):
        # critic_real, critic_fake are 1D tensors (batch of scores)
        # WGAN critic loss = E[fake] - E[real]
        return critic_fake.mean() - critic_real.mean()

class WGANGPLossG:
    @staticmethod
    def Wasserstein(critic_fake):
        # generator loss = -E[critic(fake)]
        return -critic_fake.mean()

# instantiate used names
lossD = WGANGPLossD()
lossG = WGANGPLossG()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

assert 'seq_tensor' in globals(), "seq_tensor not found; run encoding cell"
seq_channels = int(seq_tensor.shape[1])
seq_len = int(seq_tensor.shape[2])

Batch_size = globals().get('Batch_size', 32)
noise_dim = globals().get('noise_dim', 100)
features_gen = globals().get('features_gen', 64)
features_critic = globals().get('features_cirtic', globals().get('features_critic', 64))
num_epochs = globals().get('num_epochs', 10)
Critic_iterations = globals().get('Critic_iterations', 5)
Learning_rate = globals().get('Learning_rate', 1e-4)
Lamda_GP = globals().get('Lamda_GP', 10.0)

assert 'lossD' in globals() and 'lossG' in globals(), "Define lossD and lossG before training"

dataloader = DataLoader(TensorDataset(seq_tensor), batch_size=Batch_size, shuffle=True, drop_last=True)

gen = Generator(noise_dim, features_gen, SelfAttentionLayer, out_channels=4, target_len=seq_len).to(device)
critic = Critic(seq_channels, features_critic, SelfAttentionLayer).to(device)

Initialize_weights(gen)
Initialize_weights(critic)

opt_gen = torch.optim.Adam(gen.parameters(), lr=Learning_rate, betas=(0.0, 0.9))
opt_critic = torch.optim.Adam(critic.parameters(), lr=Learning_rate, betas=(0.0, 0.9))

def ensure_length_match(fake, real):
    if fake.size(2) != real.size(2):
        fake = torch.nn.functional.interpolate(fake, size=real.size(2), mode='linear', align_corners=False)
    return fake

def GradientPenalty(critic, real, fake, device):
    B = real.size(0)
    alpha = torch.rand(B, 1, 1, device=device)
    alpha = alpha.expand_as(real)
    interpolated = (alpha * real + (1 - alpha) * fake).requires_grad_(True)
    mixed_scores = critic(interpolated)
    scalar_score = mixed_scores.sum()
    grad = torch.autograd.grad(outputs=scalar_score, inputs=interpolated,
                               create_graph=True, retain_graph=True, only_inputs=True)[0]
    grad = grad.view(B, -1)
    grad_norm = grad.norm(2, dim=1)
    return ((grad_norm - 1) ** 2).mean()

# quick forward sanity check
with torch.no_grad():
    test_noise = torch.randn(2, noise_dim, 1, device=device)
    test_fake = gen(test_noise)
print("gen output shape:", test_fake.shape, "critic(gen) shape:", critic(test_fake).shape)

for epoch in range(num_epochs):
    for batch_idx, (real_batch,) in enumerate(dataloader):
        real = real_batch.to(device).float()

        # Train critic multiple times
        for _ in range(Critic_iterations):
            noise = torch.randn(real.size(0), noise_dim, 1, device=device)
            fake = gen(noise)
            fake = ensure_length_match(fake, real)

            critic_real = critic(real).reshape(-1)
            critic_fake = critic(fake.detach()).reshape(-1)

            gp = GradientPenalty(critic, real, fake.detach(), device)
            loss_critic = lossD.Wasserstein(critic_real, critic_fake) + Lamda_GP * gp

            opt_critic.zero_grad()
            loss_critic.backward()
            opt_critic.step()

        # Train generator once
        noise = torch.randn(real.size(0), noise_dim, 1, device=device)
        fake = gen(noise)
        fake = ensure_length_match(fake, real)

        critic_fake_for_gen = critic(fake).reshape(-1)
        loss_gen = lossG.Wasserstein(critic_fake_for_gen)

        opt_gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()
        if batch_idx % log_interval == 0:
            loss_critic_history.append(loss_critic.item())
            loss_gen_history.append(loss_gen.item())
            loss_epoch_history.append(epoch)
        if batch_idx % 526 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}] Batch {batch_idx+1}/{len(dataloader)} "
                  f"Loss D: {loss_critic.item():.4f}, Loss G: {loss_gen.item():.4f}")

gen output shape: torch.Size([2, 4, 1000]) critic(gen) shape: torch.Size([2, 1])
Epoch [1/10] Batch 1/526 Loss D: 9.9993, Loss G: 0.0114
Epoch [1/10] Batch 16/526 Loss D: 5.7286, Loss G: 0.0609
Epoch [1/10] Batch 31/526 Loss D: -0.6563, Loss G: 0.1016
Epoch [1/10] Batch 46/526 Loss D: -2.1982, Loss G: 0.1298
Epoch [1/10] Batch 61/526 Loss D: -2.9431, Loss G: 0.2925
Epoch [1/10] Batch 76/526 Loss D: -4.3895, Loss G: 1.1785
Epoch [1/10] Batch 91/526 Loss D: -7.4009, Loss G: 3.8680
Epoch [1/10] Batch 106/526 Loss D: -9.5030, Loss G: 7.9252
Epoch [1/10] Batch 121/526 Loss D: -10.3180, Loss G: 11.4068
Epoch [1/10] Batch 136/526 Loss D: -11.8043, Loss G: 15.4467
Epoch [1/10] Batch 151/526 Loss D: -14.7317, Loss G: 17.9009
Epoch [1/10] Batch 166/526 Loss D: -14.7316, Loss G: 6.5726
Epoch [1/10] Batch 181/526 Loss D: -26.1685, Loss G: 32.0439
Epoch [1/10] Batch 196/526 Loss D: -27.8054, Loss G: 41.9845
Epoch [1/10] Batch 211/526 Loss D: -12.5567, Loss G: 51.0472
Epoch [1/10] Batch 226/526 Loss

In [None]:
#save the model and loss history for analysis
torch.save({
    "epoch": epoch,
    "gen_state_dict": gen.state_dict(),
    "critic_state_dict": critic.state_dict(),
    "opt_gen_state_dict": opt_gen.state_dict(),
    "opt_critic_state_dict": opt_critic.state_dict(),
    "loss_gen": loss_gen.item(),
    "loss_critic": loss_critic.item(),
    "loss_gen_history": loss_gen_history,
    "loss_critic_history": loss_critic_history,
    "loss_epoch_history": loss_epoch_history
}, "models/wgan_gp_100_epochs.pth")

df_losses = pd.DataFrame({
    "epoch": loss_epoch_history,
    "loss_critic": loss_critic_history,
    "loss_gen": loss_gen_history
})
print(df_losses.head())

## Generating sequences with the model