In [None]:
import pandas as pd
import numpy as np
import re
import tensorflow as tf
import os
from sklearn.metrics import confusion_matrix
from keras.optimizers import Adam
import plotly.graph_objects as go
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from keras.layers import Flatten, Dropout,Reshape
from Bio import SeqIO
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')
# NumPy helpers and Keras layers/models used to build and train the GAN
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import vstack
from numpy.random import randn
from numpy.random import randint
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Conv1D, Conv2D,MaxPooling1D
from tensorflow.keras.layers import Convolution1D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.models import Model
from keras.callbacks import EarlyStopping
from keras.layers import BatchNormalization
from skimage.transform import resize
from tensorflow.keras.layers import Embedding
from matplotlib import pyplot
from tensorflow.keras.preprocessing.sequence import pad_sequences


# Read the FASTA file and extract the sequences

In [None]:
# Parse every record of the FASTA file into an upper-cased sequence string.
# Upper-casing keeps lowercase bases instead of silently dropping them in the
# integer-encoding step below.
with open('C:/data/omicron800.fasta') as fasta_file:
    identifiers = []  # holds the sequence strings (name kept for compatibility)
    lengths = []
    for seq_record in SeqIO.parse(fasta_file, 'fasta'):  # (generator)
        identifiers.append(str(seq_record.seq).upper())
        lengths.append(len(seq_record.seq))

if not identifiers:
    raise ValueError("No sequences were read from the FASTA file")

d = {'sequences': identifiers, 'len': lengths}
data = pd.DataFrame(d)
data['label'] = "0"

allSeq = data.sequences

nucleotides = ['A', 'G', 'C', 'T', 'N']
base_to_index = {base: idx for idx, base in enumerate(nucleotides)}

# Convert sequences from strings to lists of integers.
# NOTE(review): characters outside `nucleotides` (e.g. IUPAC ambiguity codes or
# gaps) are dropped, which shifts every later position — confirm this is intended.
allSeq_int = [
    [base_to_index[base] for base in seq if base in base_to_index]
    for seq in allSeq
]

# Perform padding to a fixed length
max_length = 29912  # Set the desired fixed length for sequences
# Pad with the index of 'N' rather than the default 0: index 0 is 'A', so the
# default would encode padding as real adenine bases.
padded_sequences = pad_sequences(
    allSeq_int,
    maxlen=max_length,
    padding='post',
    truncating='post',
    value=base_to_index['N'],
)

# One-hot encode: row i of the identity matrix is the encoding of nucleotides[i].
onehot_dict = {nucleotide: np.eye(len(nucleotides))[i] for i, nucleotide in enumerate(nucleotides)}
# A single fancy-indexing lookup replaces the per-base Python double loop and
# yields the same (n_sequences, max_length, len(nucleotides)) int8 array.
onehot_data = np.eye(len(nucleotides), dtype=np.int8)[padded_sequences]

# Save the one-hot encoded data and the labels into separate X and y files
X_file = 'X.npy'  # File name for the input data (one-hot encoded sequences)
y_file = 'y.npy'  # File name for the labels
np.save(X_file, onehot_data)
np.save(y_file, np.array(data['label']))

# np.savez_compressed returns None and appends ".npz" to the given name,
# so the archive is read back below as 'omicron800.dat.npz'.
np.savez_compressed('omicron800.dat', x=onehot_data , y=np.array(data['label']))

# NOTE(review): allow_pickle=True is presumably needed because the labels are
# Python strings (object dtype); only load archives from a trusted source.
totalTrainData =np.load('omicron800.dat.npz',allow_pickle=True)

x=totalTrainData['x']
y=totalTrainData['y']

print(x.shape)
print(y.shape)

print("Shape of one-hot encoded data:", onehot_data.shape)
print("Shape of labels:", np.array(data['label']).shape)
print("X file saved as:", X_file)
print("y file saved as:", y_file)


In [None]:
from tensorflow.keras.utils import to_categorical

# Hold out 20% of the sequences. A fixed random_state makes the split
# reproducible across kernel restarts.
xtrain, xtest, ytrain, ytest = train_test_split(x, y, test_size=0.2, random_state=42)

# Keep the raw labels under separate names before they are re-encoded.
ytrain1 = ytrain
ytest1 = ytest

# Encode the labels with a single class column (num_classes=1).
ytrain = to_categorical(ytrain1, 1)
ytest = to_categorical(ytest1, 1)
xtrain.shape

#define_discriminator

In [None]:
in_shape=(29912, 5)
n_classes = 1

def define_discriminator(in_shape=(29912, 5)):
    """Build and compile the discriminator network.

    Args:
        in_shape: Shape of one input sample, ``(sequence_length, channels)``.

    Returns:
        A compiled Keras ``Model`` that maps an input of ``in_shape`` to a
        single sigmoid output, trained with binary cross-entropy and
        reporting accuracy.
    """
    # input
    in_image = Input(shape=in_shape)

    # feature extraction: the shape comes from the Input tensor, so no
    # `input_shape` argument is passed to the convolution.
    fe = Conv1D(64, 5, padding='same')(in_image)
    fe = LeakyReLU(alpha=0.1)(fe)
    fe = Conv1D(64, 5, padding='same')(fe)
    fe = LeakyReLU(alpha=0.1)(fe)

    # flatten feature maps
    fe = Flatten()(fe)

    # dropout
    fe = Dropout(0.2)(fe)

    # output
    out_layer = Dense(1, activation='sigmoid')(fe)

    # define model
    model = Model(in_image, out_layer)

    # compile model; `learning_rate` replaces the deprecated `lr` keyword
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model


#define_generator

In [None]:
def define_generator(latent_dim, seq_len=29912, n_channels=5):
    """Build the generator that maps latent vectors to sequence-shaped tensors.

    The model is returned uncompiled.

    Args:
        latent_dim: Length of each latent input vector.
        seq_len: Number of positions in a generated sample (default 29912).
        n_channels: Number of channels per position (default 5).

    Returns:
        A Keras ``Model`` taking ``(latent_dim,)`` inputs and producing
        ``(seq_len, n_channels)`` outputs passed through ``tanh``.
    """
    in_lat = Input(shape=(latent_dim,))

    # generator network: small hidden layer, then a dense projection that is
    # reshaped into one row per sequence position
    gen = Dense(320)(in_lat)
    gen = LeakyReLU(alpha=0.2)(gen)
    gen = Dense(seq_len * n_channels)(gen)
    gen = Reshape((seq_len, n_channels))(gen)

    # NOTE(review): tanh outputs lie in [-1, 1] while the one-hot training data
    # is 0/1 — confirm this activation is intended.
    out_layer = Conv1D(n_channels, 1, activation='tanh', padding='same')(gen)

    model = Model(in_lat, out_layer)
    return model


In [None]:
def define_gan(g_model, d_model):
    """Stack the generator and discriminator into one compiled model.

    Args:
        g_model: Generator model; its input becomes the combined model's input.
        d_model: Discriminator model; it is marked non-trainable here.

    Returns:
        A Keras ``Model`` mapping the generator's input to the discriminator's
        output, compiled with binary cross-entropy and a default ``Adam``.
    """
    # Mark the discriminator as non-trainable before wiring it in.
    d_model.trainable = False

    # Feed the generator's output tensor straight into the discriminator.
    validity = d_model(g_model.output)

    # Combined model: latent noise in, discriminator verdict out.
    combined = Model(g_model.input, validity)
    combined.compile(optimizer=Adam(), loss='binary_crossentropy')
    return combined


#Sample Generation Functions

In [None]:
def generate_real_samples(dataset, n_samples):
    """Draw a random batch of real samples, labelled 1.

    Args:
        dataset: Pair ``(samples, labels)``; only ``samples`` is used.
        n_samples: Number of rows to draw (with replacement).

    Returns:
        Tuple ``(X, y)`` where ``X`` holds the selected rows in ascending index
        order and ``y`` is an ``(n_samples, 1)`` array of ones.
    """
    samples, _unused_labels = dataset
    # Random row indices in [0, number of rows), sorted ascending.
    chosen = randint(0, samples.shape[0], n_samples)
    chosen = np.sort(chosen)
    real_labels = ones((n_samples, 1))
    return samples[chosen], real_labels

def generate_latent_points(latent_dim, n_samples):
    """Sample standard-normal latent vectors.

    Args:
        latent_dim: Length of each latent vector.
        n_samples: Number of vectors to draw.

    Returns:
        Array of shape ``(n_samples, latent_dim)`` drawn from N(0, 1).
    """
    # Drawing directly in 2-D consumes the same random stream, in the same
    # order, as drawing a flat vector and reshaping it.
    return randn(n_samples, latent_dim)

def generate_fake_samples(generator, latent_dim, n_samples):
    """Produce a batch of generator outputs, labelled 0.

    Args:
        generator: Object exposing ``predict``; called on the latent batch.
        latent_dim: Length of each latent vector.
        n_samples: Number of samples to generate.

    Returns:
        Tuple ``(images, y)`` where ``images`` is whatever ``generator.predict``
        returns and ``y`` is an ``(n_samples, 1)`` array of zeros.
    """
    noise = generate_latent_points(latent_dim, n_samples)
    fake_batch = generator.predict(noise)
    fake_labels = zeros((n_samples, 1))
    return fake_batch, fake_labels


#Training Function

In [None]:
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=100, n_batch=100):
    """Alternate discriminator and generator updates for ``n_epochs`` steps.

    Each step trains the discriminator on half a batch of real samples and half
    a batch of generated samples, then trains the combined model on a full
    batch of latent points labelled as real.

    Args:
        g_model: Generator model; saved to disk every 10th step.
        d_model: Discriminator model whose ``train_on_batch`` returns two values.
        gan_model: Combined model whose ``train_on_batch`` returns the loss.
        dataset: Pair passed through to ``generate_real_samples``.
        latent_dim: Length of each latent vector.
        n_epochs: Number of training steps (one batch per step).
        n_batch: Full batch size; the discriminator sees half of it per update.

    Returns:
        Tuple of three per-step loss lists:
        ``(generator_losses, real_losses, fake_losses)``.
    """
    half = int(n_batch / 2)
    gen_history, real_history, fake_history = [], [], []

    for step in range(1, n_epochs + 1):
        # Discriminator update on real samples.
        real_batch, real_targets = generate_real_samples(dataset, half)
        loss_real, _ = d_model.train_on_batch(real_batch, real_targets)

        # Discriminator update on generated samples.
        fake_batch, fake_targets = generate_fake_samples(g_model, latent_dim, half)
        loss_fake, _ = d_model.train_on_batch(fake_batch, fake_targets)

        # Generator update through the combined model: latent points are
        # labelled 1 for this update.
        noise = generate_latent_points(latent_dim, n_batch)
        gan_targets = ones((n_batch, 1))
        loss_gan = gan_model.train_on_batch(noise, gan_targets)

        gen_history.append(loss_gan)
        real_history.append(loss_real)
        fake_history.append(loss_fake)

        # Report progress and checkpoint the generator every 10 steps.
        if step % 10 == 0:
            print(f'>Epoch {step}, loss_real={loss_real:.3f}, loss_fake={loss_fake:.3f}, loss_gan={loss_gan:.3f}')
            g_model.save(f'cgan_model_{step:03d}.h5')

    return gen_history, real_history, fake_history


In [None]:
# Build the three models. `summary()` prints the table itself and returns
# None, so it is called directly instead of being wrapped in print().
d_model = define_discriminator((29912, 5))
d_model.summary()
latent_dim = 100
g_model = define_generator(latent_dim)
g_model.summary()
gan_model = define_gan(g_model, d_model)
gan_model.summary()


In [None]:
#train
# `train` has no `n_classes` parameter, so none is passed.
(g_loss_epoch,d_real_loss_epoch,d_fake_loss_epoch) = train(g_model, d_model, gan_model, (xtrain,ytrain), latent_dim,n_epochs=300, n_batch=200)

# Persist the per-step discriminator losses.
loss1 = pd.DataFrame({'fake' : d_fake_loss_epoch, 'real' : d_real_loss_epoch})

# NOTE(review): not used anywhere in the visible code.
headerList = ['fakeloss_x' ,'fakeloss_y' , 'realloss_x', 'realloss_y']

loss1.to_csv('Loss1.csv')

################ generate fake samples
latent_dim = 100
n_samples = 1000

# Load the trained generator model. The raw string keeps the Windows
# backslashes literal; `tf` is already imported, so no extra import is needed.
# NOTE(review): `train` saves checkpoints to the working directory — confirm
# the file has been placed at this path.
g_model = tf.keras.models.load_model(r'C:\data\cgan_model_100.h5')

# Generate `n_samples` fake samples
generated_images, _ = generate_fake_samples(g_model, latent_dim, n_samples)
threshold = 0.8  # Example threshold value

# Convert to binary values
X_binary = np.where(generated_images >= threshold, 1, 0)


# Binarised generator output, kept under the original name
X_train_onehot = X_binary

# Define mapping from one-hot encoding to AGCTN
agctn_mapping = {0: 'A', 1: 'G', 2: 'C', 3: 'T', 4: 'N'}
agctn_lookup = np.array([agctn_mapping[i] for i in range(len(agctn_mapping))])
unknown_index = {letter: idx for idx, letter in agctn_mapping.items()}['N']

# Convert generator output to AGCTN.
# Take the strongest channel at each position. Positions where no channel
# reaches the threshold are written as 'N'; argmax over an all-zero binary row
# would otherwise return 0 and silently decode them as 'A'.
best_channel = np.argmax(generated_images, axis=-1)
is_confident = X_binary.any(axis=-1)
X_train_agctn = agctn_lookup[np.where(is_confident, best_channel, unknown_index)]

print(X_train_agctn.shape)  # Shape of the converted AGCTN sequence
print(X_train_agctn[0])  # Example converted AGCTN sequence

####
def save_fasta(sequence, file_path, sequence_name="sequence"):
    """Write each row of ``sequence`` as one FASTA record.

    Args:
        sequence: Array-like with a ``shape`` attribute; each row is an
            iterable of single-character strings.
        file_path: Destination file; overwritten if it exists.
        sequence_name: Prefix of each record header, followed by ``_<row index>``.
    """
    with open(file_path, "w") as handle:
        for row_idx in range(sequence.shape[0]):
            # Header line: ">" + name + "_" + row index
            header = "".join((">", sequence_name, "_", str(row_idx), "\n"))
            # Body line: the row's characters concatenated into one string
            residues = "".join(sequence[row_idx])
            handle.write(header)
            handle.write(residues + "\n")


# Raw string: the backslashes are kept literally without relying on invalid
# escape sequences (`\d`, `\m`); the runtime path is unchanged.
file_path = r"C:\data\my_sequence_omicron.fasta"  # Replace with desired file path
save_fasta(X_train_agctn, file_path, sequence_name="X_train_agctn")
print("AGCTN sequence saved to FASTA file:", file_path)
