In [3]:
import os
import gzip
from Bio import SeqIO
import pandas as pd
from urllib.parse import unquote

# === Paths ===
dna_dir = "dataset/dataset1/dna_chromosomes"
gff3_dir = "dataset/dataset1/gff3_files"

# === Collect all FASTA and GFF3 files ===
fasta_files = sorted([
    os.path.join(dna_dir, f) for f in os.listdir(dna_dir)
    if f.lower().endswith(".fa.gz")
])
gff3_files = sorted([
    os.path.join(gff3_dir, f) for f in os.listdir(gff3_dir)
    if f.lower().endswith(".gff3")
])

# === Parse GFF3 attributes ===
def parse_attributes(attr_str):
    attr_dict = {}
    for pair in attr_str.strip().split(";"):
        if "=" in pair:
            key, value = pair.split("=", 1)
            attr_dict[key.strip()] = unquote(value.strip())
    return attr_dict

# === Function to parse GFF3 and extract gene entries for a given chromosome ===
def parse_gff3(gff3_file, chrom_id):
    genes = []
    with open(gff3_file, encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            parts = line.strip().split("\t")
            if len(parts) < 9:
                continue
            if parts[2] != "gene":
                continue
            if parts[0] != chrom_id:
                continue  # skip if this line refers to a different chromosome

            start = int(parts[3]) - 1  # Convert to 0-based index
            end = int(parts[4])
            strand = parts[6]
            attrs = parse_attributes(parts[8])
            gene_id = attrs.get("ID", "NA")

            genes.append((start, end, strand, gene_id))
    return genes

# === Extract gene sequences ===
gene_sequences = []

for fasta_path, gff_path in zip(fasta_files, gff3_files):
    print(f"Processing: {os.path.basename(fasta_path)} with {os.path.basename(gff_path)}")

    # Read the chromosome sequence (support gzipped or uncompressed)
    if fasta_path.endswith(".gz"):
        with gzip.open(fasta_path, "rt", encoding="utf-8") as f:
            record = next(SeqIO.parse(f, "fasta"))
    else:
        with open(fasta_path, "r", encoding="utf-8") as f:
            record = next(SeqIO.parse(f, "fasta"))

    chrom_seq = record.seq
    chrom_id = record.id

    # Parse gene entries from the GFF3 file
    genes = parse_gff3(gff_path, chrom_id)

    for start, end, strand, gene_id in genes:
        # Boundary check to avoid errors
        if start < 0 or end > len(chrom_seq):
            continue

        gene_seq = chrom_seq[start:end]
        if strand == "-":
            gene_seq = gene_seq.reverse_complement()

        gene_sequences.append({
            "gene_id": gene_id,
            "chrom": chrom_id,
            "start": start,
            "end": end,
            "strand": strand,
            "sequence": str(gene_seq)
        })

# === Convert to DataFrame ===
df_genes = pd.DataFrame(gene_sequences)

# === Preview first few entries ===
print(df_genes.head())


Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.1.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.1.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.10.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.10.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.2.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.2.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.3.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.3.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.4.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.4.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.5.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.5.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.6.fa.gz with Zea_mays.Zm-B73-REFERENCE-NAM-5.0.60.chromosome.6.gff3
Processing: Zea_mays.Zm-B73-REFERENCE-NAM-5.0.dna.chromosome.7.fa.gz with Zea_mays.Zm-B73-REFER

In [4]:
df_genes.head()

Unnamed: 0,gene_id,chrom,start,end,strand,sequence
0,gene:Zm00001eb000010,1,34616,40204,+,TCTCACGCCAATATGCCATGGATAATGCACGCGGGAACGGAACAAA...
1,gene:Zm00001eb000020,1,41213,46762,-,TCTCAGGTTTGAAACAAGCCACAGCTTAATTTCCATACAGTCACTG...
2,gene:Zm00001eb000030,1,106147,106620,-,TATTTCCCCACACTGAATGCCTTTGTCTTTTACGTGGCTCGTATTT...
3,gene:Zm00001eb000040,1,107079,108196,-,AAATTTAAATTCCTAATTGTTATACCTACATGTCCCTACAATACAA...
4,gene:Zm00001eb000050,1,108553,114382,-,CTGCCGAGCAGTGGAGAAGGACCGGCGTCCGGAGGTGGCCGGCGGC...


In [5]:
geo_path="dataset/dataset1/geo_files/genes_to_alias_ids.tsv"
df = pd.read_csv(geo_path, sep='\t') 

In [6]:
alias_path="dataset/dataset1/expression level TPM/abundance.tsv"
df_alias = pd.read_csv(alias_path, sep='\t') 

In [7]:
df_alias.head()

Unnamed: 0,target_id,length,eff_length,est_counts,tpm
0,Zm00001d035916_T001,1121,965.632,38.0,2.46992
1,Zm00001d048284_T001,1078,922.632,5.54944,0.377513
2,Zm00001d048284_T002,815,659.632,42.7046,4.06335
3,Zm00001d048284_T003,866,710.632,22.746,2.00896
4,Zm00001d036601_T001,687,531.632,1.06506,0.12574


In [8]:
df.head()

Unnamed: 0,Zm00001eb000010,B73 Zm00001eb.1,Zm00001d027230,AGPv4_Zm00001d.2
0,Zm00001eb000020,B73 Zm00001eb.1,Zm00001d027231,AGPv4_Zm00001d.2
1,Zm00001eb000050,B73 Zm00001eb.1,Zm00001d027234,AGPv4_Zm00001d.2
2,Zm00001eb000060,B73 Zm00001eb.1,Zm00001d027239,AGPv4_Zm00001d.2
3,Zm00001eb000070,B73 Zm00001eb.1,Zm00001d027240,AGPv4_Zm00001d.2
4,Zm00001eb000080,B73 Zm00001eb.1,Zm00001d027242,AGPv4_Zm00001d.2


In [9]:
df.rename(columns={"Zm00001eb000010":"id1","B73 Zm00001eb.1":"id2","Zm00001d027230":"gene_alias_id"})

Unnamed: 0,id1,id2,gene_alias_id,AGPv4_Zm00001d.2
0,Zm00001eb000020,B73 Zm00001eb.1,Zm00001d027231,AGPv4_Zm00001d.2
1,Zm00001eb000050,B73 Zm00001eb.1,Zm00001d027234,AGPv4_Zm00001d.2
2,Zm00001eb000060,B73 Zm00001eb.1,Zm00001d027239,AGPv4_Zm00001d.2
3,Zm00001eb000070,B73 Zm00001eb.1,Zm00001d027240,AGPv4_Zm00001d.2
4,Zm00001eb000080,B73 Zm00001eb.1,Zm00001d027242,AGPv4_Zm00001d.2
...,...,...,...,...
32921,Zm00001eb434580,B73 Zm00001eb.1,Zm00001d026711,AGPv4_Zm00001d.2
32922,Zm00001eb434590,B73 Zm00001eb.1,Zm00001d026712,AGPv4_Zm00001d.2
32923,Zm00001eb438080,B73 Zm00001eb.1,Zm00001d042403,AGPv4_Zm00001d.2
32924,Zm00001eb438620,B73 Zm00001eb.1,Zm00001d036112,AGPv4_Zm00001d.2


In [10]:
import pandas as pd

# Fix column names in 'df'
df.columns = ['id1', 'id2', 'gene_alias_id', 'AGPv4_Zm00001d.2']

# Step 1: Clean the 'gene_id' column in df_genes
df_genes['gene_id_clean'] = df_genes['gene_id'].str.replace('gene:', '', regex=False)

# Step 2: Create a mapping from 'id1' to 'gene_alias_id'
id_to_alias = df.set_index('id1')['gene_alias_id'].to_dict()

# Step 3: Map gene_alias_id to df_genes
df_genes['alias_id'] = df_genes['gene_id_clean'].map(id_to_alias)

# Step 4: Clean up the temporary column
df_genes.drop(columns=['gene_id_clean'], inplace=True)

# Check the results
print(df_genes[['gene_id', 'alias_id']].head(50))


                 gene_id        alias_id
0   gene:Zm00001eb000010             NaN
1   gene:Zm00001eb000020  Zm00001d027231
2   gene:Zm00001eb000030             NaN
3   gene:Zm00001eb000040             NaN
4   gene:Zm00001eb000050  Zm00001d027234
5   gene:Zm00001eb000060  Zm00001d027239
6   gene:Zm00001eb000070  Zm00001d027240
7   gene:Zm00001eb000080  Zm00001d027242
8   gene:Zm00001eb000090             NaN
9   gene:Zm00001eb000100  Zm00001d027244
10  gene:Zm00001eb000110             NaN
11  gene:Zm00001eb000120             NaN
12  gene:Zm00001eb000130             NaN
13  gene:Zm00001eb000140  Zm00001d027249
14  gene:Zm00001eb000150  Zm00001d027250
15  gene:Zm00001eb000160             NaN
16  gene:Zm00001eb000170  Zm00001d027257
17  gene:Zm00001eb000180  Zm00001d027256
18  gene:Zm00001eb000200  Zm00001d027258
19  gene:Zm00001eb000190  Zm00001d027259
20  gene:Zm00001eb000210  Zm00001d027265
21  gene:Zm00001eb000220  Zm00001d027266
22  gene:Zm00001eb000230  Zm00001d027267
23  gene:Zm00001

In [11]:
# Drop rows where alias_id is NaN
df_genes = df_genes.dropna(subset=['alias_id'])

# Reset index if needed
df_genes = df_genes.reset_index(drop=True)

# Optional: check that it's gone
print(df_genes['alias_id'].isna().sum())


0


In [12]:
df_genes.head()

Unnamed: 0,gene_id,chrom,start,end,strand,sequence,alias_id
0,gene:Zm00001eb000020,1,41213,46762,-,TCTCAGGTTTGAAACAAGCCACAGCTTAATTTCCATACAGTCACTG...,Zm00001d027231
1,gene:Zm00001eb000050,1,108553,114382,-,CTGCCGAGCAGTGGAGAAGGACCGGCGTCCGGAGGTGGCCGGCGGC...,Zm00001d027234
2,gene:Zm00001eb000060,1,188558,189581,-,CCATTGCCCTAGCACACGTACGCGCAGATCCGTCCGCTCCAATCCG...,Zm00001d027239
3,gene:Zm00001eb000070,1,190191,198832,-,ATGAAGAGAAAGGAAAACTCCGCGCACTCGGCGTCGCCTCTCAACA...,Zm00001d027240
4,gene:Zm00001eb000080,1,200261,203393,-,TTGGACCATCTTTATGCTTTATGGGGCTAAGAAGCATAAGACCTGT...,Zm00001d027242


In [13]:
import pandas as pd

# Step 1: Clean up df_alias to remove transcript suffix
df_alias['clean_id'] = df_alias['target_id'].str.replace(r'_T\d+$', '', regex=True)

# Step 2: Group by clean_id and sum or average TPM if needed (in case multiple transcripts per gene)
# Here we'll sum TPMs for all isoforms of a gene
tpm_by_gene = df_alias.groupby('clean_id')['tpm'].sum().reset_index()

# Step 3: Merge df_genes with this TPM info using alias_id == clean_id
df_genes = df_genes.merge(tpm_by_gene, how='left', left_on='alias_id', right_on='clean_id')

# Step 4: Rename the column to tpm_value and drop clean_id
df_genes = df_genes.rename(columns={'tpm': 'tpm_value'}).drop(columns=['clean_id'])

# Done
print(df_genes.head())


                gene_id chrom   start     end strand  \
0  gene:Zm00001eb000020     1   41213   46762      -   
1  gene:Zm00001eb000050     1  108553  114382      -   
2  gene:Zm00001eb000060     1  188558  189581      -   
3  gene:Zm00001eb000070     1  190191  198832      -   
4  gene:Zm00001eb000080     1  200261  203393      -   

                                            sequence        alias_id  \
0  TCTCAGGTTTGAAACAAGCCACAGCTTAATTTCCATACAGTCACTG...  Zm00001d027231   
1  CTGCCGAGCAGTGGAGAAGGACCGGCGTCCGGAGGTGGCCGGCGGC...  Zm00001d027234   
2  CCATTGCCCTAGCACACGTACGCGCAGATCCGTCCGCTCCAATCCG...  Zm00001d027239   
3  ATGAAGAGAAAGGAAAACTCCGCGCACTCGGCGTCGCCTCTCAACA...  Zm00001d027240   
4  TTGGACCATCTTTATGCTTTATGGGGCTAAGAAGCATAAGACCTGT...  Zm00001d027242   

   tpm_value  
0  62.888229  
1   0.000000  
2  38.358540  
3   4.318454  
4  42.850347  


In [14]:
# Mapping dictionary
base_map = {'A': 0, 'T': 1, 'G': 2, 'C': 3}

# Function to encode a DNA sequence string
def encode_sequence(seq):
    return [base_map.get(base, -1) for base in seq.upper()]  # -1 for unknown bases like N

# Apply to each row in df_genes['sequence']
df_genes['encoded_sequence'] = df_genes['sequence'].apply(encode_sequence)

# Done
print(df_genes[['sequence', 'encoded_sequence']].head())


                                            sequence  \
0  TCTCAGGTTTGAAACAAGCCACAGCTTAATTTCCATACAGTCACTG...   
1  CTGCCGAGCAGTGGAGAAGGACCGGCGTCCGGAGGTGGCCGGCGGC...   
2  CCATTGCCCTAGCACACGTACGCGCAGATCCGTCCGCTCCAATCCG...   
3  ATGAAGAGAAAGGAAAACTCCGCGCACTCGGCGTCGCCTCTCAACA...   
4  TTGGACCATCTTTATGCTTTATGGGGCTAAGAAGCATAAGACCTGT...   

                                    encoded_sequence  
0  [1, 3, 1, 3, 0, 2, 2, 1, 1, 1, 2, 0, 0, 0, 3, ...  
1  [3, 1, 2, 3, 3, 2, 0, 2, 3, 0, 2, 1, 2, 2, 0, ...  
2  [3, 3, 0, 1, 1, 2, 3, 3, 3, 1, 0, 2, 3, 0, 3, ...  
3  [0, 1, 2, 0, 0, 2, 0, 2, 0, 0, 0, 2, 2, 0, 0, ...  
4  [1, 1, 2, 2, 0, 3, 3, 0, 1, 3, 1, 1, 1, 0, 1, ...  


In [15]:
df_genes.drop("sequence",axis=1,inplace=True)

In [16]:

max_len = df_genes['encoded_sequence'].apply(len).max()
print("Maximum encoded sequence length:", max_len)




Maximum encoded sequence length: 751401


In [17]:
# Step 1: Store lengths of all encoded sequences
sequence_lengths = df_genes['encoded_sequence'].apply(len)

# Step 2: Count how many are greater than 50,000
num_greater_than_50000 = (sequence_lengths > 40000).sum()

# Output results
print("Total sequences:", len(sequence_lengths))
print("Sequences > 50,000 bases:", num_greater_than_50000)


Total sequences: 32286
Sequences > 50,000 bases: 186


In [19]:
# Fixed length
FIXED_LEN = 40000
PAD_VALUE = 0  # A = 0

def pad_or_truncate(seq):
    if len(seq) > FIXED_LEN:
        return seq[:FIXED_LEN]  
    else:
        return seq + [PAD_VALUE] * (FIXED_LEN - len(seq))  # pad

# Apply to each sequence
df_genes['encoded_50k'] = df_genes['encoded_sequence'].apply(pad_or_truncate)

# Check shape of one example
print(len(df_genes['encoded_50k'].iloc[0]))  # should be 50000


40000


In [20]:
df_genes.head()

Unnamed: 0,gene_id,chrom,start,end,strand,alias_id,tpm_value,encoded_sequence,encoded_50k
0,gene:Zm00001eb000020,1,41213,46762,-,Zm00001d027231,62.888229,"[1, 3, 1, 3, 0, 2, 2, 1, 1, 1, 2, 0, 0, 0, 3, ...","[1, 3, 1, 3, 0, 2, 2, 1, 1, 1, 2, 0, 0, 0, 3, ..."
1,gene:Zm00001eb000050,1,108553,114382,-,Zm00001d027234,0.0,"[3, 1, 2, 3, 3, 2, 0, 2, 3, 0, 2, 1, 2, 2, 0, ...","[3, 1, 2, 3, 3, 2, 0, 2, 3, 0, 2, 1, 2, 2, 0, ..."
2,gene:Zm00001eb000060,1,188558,189581,-,Zm00001d027239,38.35854,"[3, 3, 0, 1, 1, 2, 3, 3, 3, 1, 0, 2, 3, 0, 3, ...","[3, 3, 0, 1, 1, 2, 3, 3, 3, 1, 0, 2, 3, 0, 3, ..."
3,gene:Zm00001eb000070,1,190191,198832,-,Zm00001d027240,4.318454,"[0, 1, 2, 0, 0, 2, 0, 2, 0, 0, 0, 2, 2, 0, 0, ...","[0, 1, 2, 0, 0, 2, 0, 2, 0, 0, 0, 2, 2, 0, 0, ..."
4,gene:Zm00001eb000080,1,200261,203393,-,Zm00001d027242,42.850347,"[1, 1, 2, 2, 0, 3, 3, 0, 1, 3, 1, 1, 1, 0, 1, ...","[1, 1, 2, 2, 0, 3, 3, 0, 1, 3, 1, 1, 1, 0, 1, ..."


In [21]:
df_genes['strand'] = df_genes['strand'].map({'+': 1, '-': 0})

In [22]:
df_genes.rename(columns={"encoded_50k":"sequence"},inplace=True)

In [23]:
df_genes["sequence"]

0        [1, 3, 1, 3, 0, 2, 2, 1, 1, 1, 2, 0, 0, 0, 3, ...
1        [3, 1, 2, 3, 3, 2, 0, 2, 3, 0, 2, 1, 2, 2, 0, ...
2        [3, 3, 0, 1, 1, 2, 3, 3, 3, 1, 0, 2, 3, 0, 3, ...
3        [0, 1, 2, 0, 0, 2, 0, 2, 0, 0, 0, 2, 2, 0, 0, ...
4        [1, 1, 2, 2, 0, 3, 3, 0, 1, 3, 1, 1, 1, 0, 1, ...
                               ...                        
32281    [1, 1, 3, 3, 0, 1, 1, 1, 3, 3, 0, 1, 3, 3, 3, ...
32282    [0, 1, 2, 3, 2, 2, 2, 3, 0, 0, 3, 3, 0, 0, 1, ...
32283    [3, 3, 3, 3, 0, 0, 0, 3, 3, 3, 3, 2, 2, 3, 3, ...
32284    [0, 3, 2, 0, 2, 1, 1, 1, 2, 2, 0, 2, 1, 2, 3, ...
32285    [3, 0, 0, 2, 0, 1, 1, 1, 3, 0, 2, 0, 1, 3, 3, ...
Name: sequence, Length: 32286, dtype: object

In [15]:
df['chrom'] = df['chrom'].astype(str)
df['start'] = df['start'].astype(int)
df['end'] = df['end'].astype(int)
df['strand'] = df['strand'].astype(str)
df['sequence'] = df['sequence'].astype(str)

In [25]:
import numpy as np
def fix_sequence(seq):
    return [4 if val == -1 else val for val in seq]

df_genes['sequence'] = df_genes['sequence'].apply(fix_sequence)
x = np.array(df_genes['sequence'].tolist(), dtype=np.uint8)


In [26]:
from sklearn.model_selection import train_test_split

y=df_genes["tpm_value"]


In [27]:
y.shape

(32286,)

In [28]:
import numpy as np
y=np.array(y)


In [29]:
from sklearn.model_selection import train_test_split
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.25,random_state=25)

In [30]:
print(x_train.shape)
print(y_train.shape)
print(x_test.shape)

(24214, 40000)
(24214,)
(8072, 40000)


In [31]:
import numpy as np

np.save("x.npy", x)         # shape: (24214, 50000)
np.save("y.npy", y)         # shape: (24214,)


In [32]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import Sequence

class DNADataGenerator(Sequence):
    def __init__(self, x_path, y_path, batch_size=32, start=0, end=None):
        self.x = np.load(x_path, mmap_mode='r')  # mmap_mode avoids loading whole file
        self.y = np.load(y_path, mmap_mode='r')
        self.batch_size = batch_size
        self.start = start
        self.end = end or len(self.x)
        self.indexes = np.arange(self.start, self.end)

    def __len__(self):
        return int(np.ceil((self.end - self.start) / self.batch_size))

    def __getitem__(self, index):
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = self.x[batch_indexes].astype(np.int32)
        batch_y = self.y[batch_indexes].astype(np.float32)
        return batch_x, batch_y


In [33]:
# Split using indices
split_idx = int(len(np.load("x.npy", mmap_mode='r')) * 0.75)

train_gen = DNADataGenerator("x.npy", "y.npy", batch_size=32, start=0, end=split_idx)
test_gen = DNADataGenerator("x.npy", "y.npy", batch_size=32, start=split_idx)




In [75]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import math

# Positional Encoding Function
def get_positional_encoding(seq_len, model_dim):
    angle_rads = np.arange(seq_len)[:, np.newaxis] / np.power(
        10000, (2 * (np.arange(model_dim)[np.newaxis, :] // 2)) / np.float32(model_dim)
    )
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    return tf.constant(angle_rads[np.newaxis, ...], dtype=tf.float32)  # (1, seq_len, model_dim)

# Sinusoidal Time Embedding (Optional)
class SinusoidalEmbedding(layers.Layer):
    def __init__(self, model_dim):
        super().__init__()
        self.model_dim = model_dim

    def call(self, x):
        half_dim = self.model_dim // 2
        emb_min_freq = 1.0
        emb_max_freq = 1000.0
        freqs = tf.exp(tf.linspace(tf.math.log(emb_min_freq), tf.math.log(emb_max_freq), half_dim))
        angles = 2.0 * math.pi * freqs * tf.expand_dims(x, -1)
        emb = tf.concat([tf.sin(angles), tf.cos(angles)], axis=-1)
        return emb

# Transformer Block
class TransformerBlock(layers.Layer):
    def __init__(self, model_dim, heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=heads, key_dim=model_dim)
        self.ffn = tf.keras.Sequential([ 
            layers.Dense(ff_dim, activation='gelu'),
            layers.Dense(model_dim),
        ])
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, x, training):
        attn_output = self.att(x, x)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.norm1(x + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.norm2(out1 + ffn_output)

# Main Model
class GeneExpressionTransformer(tf.keras.Model):
    def __init__(self, seq_len=50000, model_dim=128, num_heads=4, ff_dim=256, depth=4):
        super(GeneExpressionTransformer, self).__init__()
        self.seq_len = seq_len
        self.model_dim = model_dim
        self.embedding = layers.Embedding(input_dim=4, output_dim=model_dim)  # A,T,G,C = 0,1,2,3
        self.pos_encoding = get_positional_encoding(seq_len, model_dim)  # Fixed positional encoding

        self.time_emb = SinusoidalEmbedding(model_dim)  # Optional time/noise input

        self.transformer_blocks = [
            TransformerBlock(model_dim, num_heads, ff_dim) for _ in range(depth)
        ]

        self.spatial_dense = layers.Dense(1)  # Per-position output
        self.global_pool = layers.GlobalAveragePooling1D()
        self.global_dense = layers.Dense(1, activation='sigmoid')  # Global expression score

    def call(self, inputs, training=False):
        dna_seq, noise_var = inputs  # (batch, seq_len), (batch, 1)

        # DNA embedding
        x = self.embedding(dna_seq)  # (batch, seq_len, model_dim)
        x += self.pos_encoding[:, :tf.shape(x)[1], :]  # Add positional encoding

        # Optionally incorporate noise embedding (e.g., diffusion timestep)
        noise_emb = self.time_emb(noise_var)  # (batch, model_dim)
        noise_emb = tf.expand_dims(noise_emb, 1)  # (batch, 1, model_dim)

        # Broadcast noise embedding to match seq_len
        noise_emb = tf.broadcast_to(noise_emb, shape=(tf.shape(x)[0], tf.shape(x)[1], self.model_dim))
        x += noise_emb  # (batch, seq_len, model_dim)

        # Transformer blocks
        for block in self.transformer_blocks:
            x = block(x, training=training)

        # Global prediction
        global_output = self.global_pool(x)  # (batch, model_dim)
        global_output = self.global_dense(global_output)  # (batch, 1)

        # Per-position prediction
        spatial_output = self.spatial_dense(x)  # (batch, seq_len, 1)

        return spatial_output, global_output

# Test with dummy data
dummy_dna_seq = tf.random.uniform((1, 1000), maxval=4, dtype=tf.int32)
dummy_noise = tf.random.normal((1, 1))

model = GeneExpressionTransformer(seq_len=1000)
spatial_out, global_out = model((dummy_dna_seq, dummy_noise))

print("Spatial Output Shape:", spatial_out.shape)  # (1, 1000, 1)
print("Global Output Shape:", global_out.shape)    # (1, 1)


1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''Shape must be at most rank 3 but is rank 4 for '{{node BroadcastTo}} = BroadcastTo[T=DT_FLOAT, Tidx=DT_INT32](ExpandDims, BroadcastTo/shape)' with input shapes: [1,1,1,128], [3].''


InvalidArgumentError: Exception encountered when calling GeneExpressionTransformer.call().

[1m{{function_node __wrapped__BroadcastTo_device_/job:localhost/replica:0/task:0/device:CPU:0}} Rank of input (4) must be no greater than rank of output shape (3). [Op:BroadcastTo][0m

Arguments received by GeneExpressionTransformer.call():
  • inputs=('tf.Tensor(shape=(1, 1000), dtype=int32)', 'tf.Tensor(shape=(1, 1), dtype=float32)')
  • training=False

In [74]:
# Create a dummy input with reduced length for memory efficiency
dummy_dna_seq = tf.random.uniform((1, 1000), maxval=4, dtype=tf.int32)  # (batch_size=1, seq_len=1000)
dummy_noise = tf.random.normal((1, 1))  # Noise input

# Instantiate model with sequence length matching dummy input
model = GeneExpressionTransformer(seq_len=1000)

# Run a forward pass
spatial_out, global_out = model((dummy_dna_seq, dummy_noise))

# Print the actual output shapes
print("Spatial Output Shape:", spatial_out.shape)  # Expected: (1, 1000, 1)
print("Global Output Shape:", global_out.shape)    # Expected: (1, 1)


1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''Input 0 of layer "global_average_pooling1d_15" is incompatible with the layer: expected ndim=3, found ndim=4. Full shape received: (1, 1, 1000, 128)''


ValueError: Exception encountered when calling GeneExpressionTransformer.call().

[1mInput 0 of layer "global_average_pooling1d_15" is incompatible with the layer: expected ndim=3, found ndim=4. Full shape received: (1, 1, 1000, 128)[0m

Arguments received by GeneExpressionTransformer.call():
  • inputs=('tf.Tensor(shape=(1, 1000), dtype=int32)', 'tf.Tensor(shape=(1, 1), dtype=float32)')
  • training=False

In [61]:
import tensorflow as tf

class HGNNConv(tf.keras.layers.Layer):
    def __init__(self, input_dim, output_dim):
        super(HGNNConv, self).__init__()
        self.weight = self.add_weight(shape=(input_dim, output_dim),
                                      initializer='glorot_uniform',
                                      trainable=True)
        self.bias = self.add_weight(shape=(output_dim,),
                                    initializer='zeros',
                                    trainable=True)

    def call(self, x, G):
        x = tf.matmul(x, self.weight) + self.bias
        x = tf.matmul(G, x)
        return x


class HGNNEmbedding(tf.keras.Model):
    def __init__(self, input_dim, hidden_dim, dropout_rate=0.5):
        super(HGNNEmbedding, self).__init__()
        self.hgc1 = HGNNConv(input_dim, hidden_dim)
        self.hgc2 = HGNNConv(hidden_dim, hidden_dim)
        self.dropout = dropout_rate

    def call(self, x, G, training=False):
        x = tf.nn.relu(self.hgc1(x, G))
        if training:
            x = tf.nn.dropout(x, rate=self.dropout)
        x = tf.nn.relu(self.hgc2(x, G))
        return x


class HGNNClassifier(tf.keras.Model):
    def __init__(self, hidden_dim, num_classes):
        super(HGNNClassifier, self).__init__()
        self.fc = tf.keras.layers.Dense(num_classes)

    def call(self, x):
        return self.fc(x)


In [62]:
import tensorflow as tf
from tensorflow.keras import layers, models

class HybridAttentionFusion(tf.keras.Model):
    def __init__(self, vocab_size=5, embed_dim=64, conv_channels=64, kernel_sizes=[7, 5, 3], dropout_rate=0.2):
        super(HybridAttentionFusion, self).__init__()
        
        self.embedding = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim, mask_zero=True)

        # Convolutional layers for DNA
        self.conv1 = layers.Conv1D(conv_channels, kernel_sizes[0], activation='relu')
        self.conv2 = layers.Conv1D(conv_channels*2, kernel_sizes[1], activation='relu')
        self.conv3 = layers.Conv1D(conv_channels*4, kernel_sizes[2], activation='relu')
        self.global_pool = layers.GlobalMaxPooling1D()

        # Attention layers
        self.dna_attention = layers.Dense(conv_channels*4)
        self.chip_attention = layers.Dense(conv_channels*4)
        self.attention_fusion = layers.Dense(conv_channels*4)

        # Fully connected layers
        self.dropout = layers.Dropout(dropout_rate)
        self.fc1 = layers.Dense(1024, activation='relu')
        self.fc2 = layers.Dense(512, activation='relu')
        self.out = layers.Dense(1, activation='sigmoid')  # or use softmax for multi-class

    def call(self, dna_input, chip_input):
        # Embedding
        dna = self.embedding(dna_input)
        dna = self.conv1(dna)
        dna = self.conv2(dna)
        dna = self.conv3(dna)

        # Optional: apply same to chip_input if it's sequence-like
        chip = self.embedding(chip_input)
        chip = self.conv1(chip)
        chip = self.conv2(chip)
        chip = self.conv3(chip)

        # Attention mechanism
        dna_att = self.dna_attention(dna)
        chip_att = self.chip_attention(chip)

        # Cross attention
        dna_exp = tf.expand_dims(dna_att, 2)
        chip_exp = tf.expand_dims(chip_att, 1)
        fusion_matrix = self.attention_fusion(tf.nn.relu(dna_exp + chip_exp))

        dna_weight = tf.reduce_mean(fusion_matrix, axis=2)
        chip_weight = tf.reduce_mean(fusion_matrix, axis=1)

        dna = dna * tf.nn.sigmoid(dna_weight)
        chip = chip * tf.nn.sigmoid(chip_weight)

        # Global pooling and fusion
        dna_feat = self.global_pool(dna)
        chip_feat = self.global_pool(chip)
        fused = tf.concat([dna_feat, chip_feat], axis=1)

        # Dense layers
        x = self.dropout(fused)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return self.out(x)


In [1]:
import tensorflow as tf

class UnifiedGeneExpressionModel(tf.keras.Model):
    def __init__(self, diffusion_model, hypergraph_model, fusion_model):
        super(UnifiedGeneExpressionModel, self).__init__()
        self.diffusion = diffusion_model          # GeneExpressionTransformer
        self.hypergraph = hypergraph_model        # HGNNClassifier
        self.fusion = fusion_model                # HybridAttentionFusion

    def call(self, inputs, training=False):
        # Unpack inputs
        (dna_seq, noise_var), hyper_input = inputs
        
        # Get diffusion model outputs
        spatial_out, global_out = self.diffusion((dna_seq, noise_var), training=training)  # (B, 50000, 1), (B, 1)
        
        # Get hypergraph features
        hyper_feat = self.hypergraph(hyper_input, training=training)  # (B, hidden_dim)

        # Fuse both features using attention
        output = self.fusion(global_out, hyper_feat, training=training)  # (B, 1)
        
        return output


## Initialize sub-models
diffusion_model = GeneExpressionTransformer(seq_len=50000, model_dim=128, num_heads=4, ff_dim=256, depth=4)
hypergraph_model = HGNNClassifier(hidden_dim=512, num_classes=1)
fusion_model = HybridAttentionFusion(vocab_size=5, embed_dim=128, conv_channels=64, kernel_sizes=[7, 5, 3], dropout_rate=0.2)

# Create unified model
unified_model = UnifiedGeneExpressionModel(diffusion_model, hypergraph_model, fusion_model)

# Dummy input to build model
dummy_dna_seq = tf.zeros((1, 50000), dtype=tf.int32)
dummy_noise_var = tf.ones((1, 1), dtype=tf.float32)
dummy_hyper_input = tf.random.normal((1, 512))

# Build model by calling it once
_ = unified_model(((dummy_dna_seq, dummy_noise_var), dummy_hyper_input))

# Print the summary
unified_model.summary()



NameError: name 'GeneExpressionTransformer' is not defined

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

sequence_length = x_train.shape[1]
vocab_size = 5   # A, T, G, C, N → use actual size from your tokenizer
embedding_dim = 128

model = models.Sequential([
    layers.Input(shape=(sequence_length,)),
    layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim),
    layers.Conv1D(filters=64, kernel_size=7, activation='relu'),
    layers.MaxPooling1D(pool_size=3),
    layers.Conv1D(filters=128, kernel_size=5, activation='relu'),
    layers.GlobalMaxPooling1D(),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(1, activation='sigmoid')  # or softmax if multiclass
])

model.compile(optimizer='adam',
              loss='binary_crossentropy',  # or categorical_crossentropy
              metrics=['accuracy'])


In [78]:
model.fit(train_gen, validation_data=test_gen, epochs=10)


Epoch 1/10
[1m  1/757[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:17:39[0m 11s/step - accuracy: 0.0000e+00 - loss: 0.1984

: 