In [None]:

import tensorflow as tf

class F1Score(tf.keras.metrics.Metric):
    def __init__(self, name='f1_score', threshold=0.5, **kwargs):
        super().__init__(name=name, **kwargs)
        self.threshold = threshold
        self.true_positives = self.add_weight(name='tp', initializer='zeros')
        self.false_positives = self.add_weight(name='fp', initializer='zeros')
        self.false_negatives = self.add_weight(name='fn', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.cast(y_pred > self.threshold, tf.float32)
        y_true = tf.cast(y_true, tf.float32)
        
        tp = tf.reduce_sum(y_true * y_pred)
        fp = tf.reduce_sum(y_pred) - tp
        fn = tf.reduce_sum(y_true) - tp
        
        self.true_positives.assign_add(tp)
        self.false_positives.assign_add(fp)
        self.false_negatives.assign_add(fn)

    def result(self):
        precision = self.true_positives / (self.true_positives + self.false_positives + 1e-6)
        recall = self.true_positives / (self.true_positives + self.false_negatives + 1e-6)
        return 2 * (precision * recall) / (precision + recall + 1e-6)

    def reset_states(self):
        self.true_positives.assign(0)
        self.false_positives.assign(0)
        self.false_negatives.assign(0)





class AdaptiveFocalLoss(tf.keras.losses.Loss):
    def __init__(self, gamma, alpha, delta,
                 reduction=tf.keras.losses.Reduction.AUTO, name='AdaptiveFocalLoss'):
        super().__init__(reduction=reduction, name=name)
        self.gamma = gamma
        self.alpha = alpha
        self.delta = delta

    def call(self, y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        p_t = tf.where(tf.equal(y_true, 1), y_pred, 1 - y_pred)
        p_t = tf.clip_by_value(p_t, self.delta, 1.0 - self.delta)
        alpha_factor = tf.where(tf.equal(y_true, 1), self.alpha, 1 - self.alpha)
        focal_loss = -alpha_factor * tf.pow(1 - p_t, self.gamma) * tf.math.log(p_t)
        return tf.reduce_mean(focal_loss)

    def get_config(self):
        config = super().get_config()
        config.update({
            "gamma": self.gamma,
            "alpha": self.alpha,
            "delta": self.delta
        })
        return config


In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.regularizers import l2


class DynamicConv1D(layers.Layer):
    def __init__(self, filters, kernel_size, strides=1, padding='same', num_experts=4, **kwargs):
        super(DynamicConv1D, self).__init__(** kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.padding = padding
        self.num_experts = num_experts
        
        self.experts = [
            layers.Conv1D(filters, kernel_size, strides=strides, padding=padding,
                         kernel_regularizer=l2(1e-4), use_bias=False)
            for _ in range(num_experts)
        ]
        
        self.gate = tf.keras.Sequential([
            layers.GlobalAveragePooling1D(),
            layers.Dense(num_experts, activation='softmax', kernel_regularizer=l2(1e-4))
        ])
    
    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        gates = self.gate(inputs)  # [batch, num_experts]
        gates = tf.reshape(gates, [batch_size, 1, 1, self.num_experts])
        
        expert_outputs = [tf.expand_dims(expert(inputs), axis=-1) for expert in self.experts]
        expert_outputs = tf.concat(expert_outputs, axis=-1)
        return tf.reduce_sum(expert_outputs * gates, axis=-1)
    
    def get_config(self):
        config = super().get_config()
        config.update({
            'filters': self.filters, 'kernel_size': self.kernel_size,
            'strides': self.strides, 'padding': self.padding, 'num_experts': self.num_experts
        })
        return config


class Module(layers.Layer):
    """仅使用动态卷积的模块"""
    def __init__(self, filters, kernel_size=3, strides=1, padding='same',
                 num_experts=4, **kwargs):
        super(Module, self).__init__(** kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.padding = padding
        self.num_experts = num_experts
        

        self.dynamic_conv = tf.keras.Sequential([
            DynamicConv1D(filters, kernel_size, strides, padding, num_experts),
            layers.BatchNormalization(), layers.ReLU()
        ])
        
        self.fusion = layers.Conv1D(filters, 1, kernel_regularizer=l2(1e-4))
        self.bn_fusion = layers.BatchNormalization()
        self.residual = layers.Conv1D(filters, 1, strides=strides, kernel_regularizer=l2(1e-4)) \
                        if strides != 1 else None
        
    def call(self, inputs):
        conv_out = self.dynamic_conv(inputs)

        out = self.bn_fusion(self.fusion(conv_out)) 
        if self.residual:
            out = layers.add([out, self.residual(inputs)])
        return tf.nn.relu(out)
    
    def get_config(self):
        config = super().get_config()
        config.update({
            'filters': self.filters, 'kernel_size': self.kernel_size, 'strides': self.strides,
            'padding': self.padding, 'num_experts': self.num_experts
        })
        return config

In [None]:
import numpy as np
import os

def save_results(indices, chr):
    os.makedirs('results_25', exist_ok=True)
    np.savetxt(f'results_25/{chr}_boundary.txt', indices, fmt='%d')


def direct_predict_with_contiguous_processing(model, P_test, middle_row_indices_test, chr, threshold=0.5):
    """
   Parameters:
        model -- Trained machine learning model
        P_test -- Test feature matrix (n_samples, n_features)
        middle_row_indices_test -- Genomic position index array
        y_test -- Test set labels (n_samples,)
        chr -- Chromosome identifier
        threshold -- Probability threshold (default 0.5)
    """
  
    y_pred_proba = model.predict(P_test).flatten()
    prob_map = {idx: prob for idx, prob in zip(middle_row_indices_test, y_pred_proba)}
    
    selected = middle_row_indices_test[y_pred_proba > threshold]
    sorted_indices = np.sort(np.unique(selected))

    final_indices = []
    current_segment = []

    for idx in sorted_indices:
        if not current_segment:
            current_segment.append(idx)
        else:
            if idx == current_segment[-1] + 1:
                current_segment.append(idx)
            else:
                if len(current_segment) >= 1:
                    max_prob_idx = max(current_segment, key=lambda x: prob_map[x])
                    final_indices.append(max_prob_idx)
                current_segment = [idx]

    if current_segment:
        max_prob_idx = max(current_segment, key=lambda x: prob_map[x])
        final_indices.append(max_prob_idx)
    save_results(np.array(final_indices), chr)
    return None



In [None]:
def compute_node_degrees(H):
    return np.sum(H, axis=1)
def compute_hyperedge_degrees(H):
    return np.sum(H, axis=0)

def compute_first_order_transition_probabilities(H, node_degrees, hyperedge_degrees):
    num_nodes, num_hyperedges = H.shape
    P1 = np.zeros((num_nodes, num_nodes))

    for v in range(num_nodes):
        for u in range(num_nodes):
            if u == v:
                continue

            pi_uv = 0
            for e in range(num_hyperedges):
                if H[u, e] == 0 or H[v, e] == 0:  
                    continue

                h_ve = H[v, e] 
                h_ue = H[u, e] 
                d_v = node_degrees[v] 
                delta_e = hyperedge_degrees[e] 
                pi_uv += (h_ve * h_ue) / (d_v * delta_e)

            P1[v, u] = pi_uv

    return P1



In [None]:
from tensorflow.keras.models import load_model

custom_objects = {
    'Module': Module,
    'DynamicConv1D': DynamicConv1D,
    'F1Score': F1Score,
    'AdaptiveFocalLoss': AdaptiveFocalLoss
}

model = load_model('HyperDeepTAD/model/best_model_seed_4200_lr_0.0003.h5', custom_objects=custom_objects)


In [None]:
import numpy as np

def process_files_to_arrays(filenames):
    X_all_chr = []
    middle_row_indices_all_chr = []
    # y_all_chr = []
    sample_counts = []

    for filename in filenames:
        count = 0
        with open(filename, 'r') as file:
            lines = file.readlines()
            for line in lines:
                line = line.strip()
                if line:
                    data = eval(line, {"array": np.array})
                    X_all_chr.append(data[0])
                    middle_row_indices_all_chr.append(data[1])
                    # y_all_chr.append(data[2])
                    count += 1
        sample_counts.append(count)

    return (np.array(X_all_chr),
            np.array(middle_row_indices_all_chr),
            # np.array(y_all_chr),
            sample_counts)

In [None]:
import os
import numpy as np
from sklearn.utils import shuffle

dir1 = '/public_data/liukaihua/Article/model/model_data/'
dir2 = '25_sub_matrix.txt'

chr_list = ['chr20','chr21','chr22']


In [None]:
for chr in chr_list:
    list1 = []
    f1 = os.path.join(dir1, f"{chr}_{dir2}")
    list1.append(f1)
    print(chr)
    print(f1)
    print(list1)
    X_test, middle_row_indices_test,sample_counts_test = process_files_to_arrays(list1)
    P_test= []
    for H in X_test:
      
        num_nodes, num_hyperedges = H.shape
        
        node_degrees = compute_node_degrees(H)
        hyperedge_degrees = compute_hyperedge_degrees(H)
       
        P1 = compute_first_order_transition_probabilities(H, node_degrees, hyperedge_degrees)
       
        P_test.append(P1)
    P_test = np.array(P_test)  
    print("P_test shape:", P_test.shape)
    # direct_predict_simple(model,P_test,middle_row_indices_test,chr,threshold=0.5)
    direct_predict_with_contiguous_processing(model,P_test,middle_row_indices_test,chr,threshold=0.5)


chr20
/public_data/liukaihua/Article/model/model_data/chr20_100_sub_matrix.txt
['/public_data/liukaihua/Article/model/model_data/chr20_100_sub_matrix.txt']
P_test shape: (635, 11, 11)
chr21
/public_data/liukaihua/Article/model/model_data/chr21_100_sub_matrix.txt
['/public_data/liukaihua/Article/model/model_data/chr21_100_sub_matrix.txt']
P_test shape: (408, 11, 11)
chr22
/public_data/liukaihua/Article/model/model_data/chr22_100_sub_matrix.txt
['/public_data/liukaihua/Article/model/model_data/chr22_100_sub_matrix.txt']
P_test shape: (393, 11, 11)
