In [1]:
# --- Imports ---
import pandas as pd
from typing import Tuple, Dict
import numpy as np

In [2]:
# Load *.network.tsv
df1 = pd.read_csv('K562/K562.network.tsv', sep = '\t')

df1 = df1[['chrom', 'chromStart', 'chromEnd']]
df1 = df1.drop_duplicates()

print(df1.shape)
df1.head(n = 3)

(48589, 3)


Unnamed: 0,chrom,chromStart,chromEnd
0,chr1,9863,10662
3,chr1,29099,29599
6,chr1,32447,32947


In [3]:
# Load *.variants.hg38.bed
df2 = pd.read_csv('variants/K562.variants.hg38.bed',
                  sep = '\t', names = ['chrom', 'chromStart', 'chromEnd', 'name'])

df2 = df2[['chrom', 'chromStart', 'chromEnd']]
df2 = df2.drop_duplicates()

print(df2.shape)
df2.head(n = 3)

(6803, 3)


Unnamed: 0,chrom,chromStart,chromEnd
0,chr1,2188393,2188394
1,chr1,2189185,2189186
2,chr1,2214725,2214726


In [4]:
# Load macs2_peaks.narrowPeak.sorted.candidateRegions.bed
df3 = pd.read_csv('../ENCODE_rE2G/results/K562/Peaks/macs2_peaks.narrowPeak.sorted.candidateRegions.bed',
                  sep = '\t', names = ['chrom', 'chromStart', 'chromEnd'])

print(df3.shape)
df3.head(n = 3)

(143534, 3)


Unnamed: 0,chrom,chromStart,chromEnd
0,chr1,9863,10662
1,chr1,11061,11561
2,chr1,13785,14753


In [5]:
# Permutation test
def permutation_test(df1: pd.DataFrame, df2: pd.DataFrame, df3: pd.DataFrame, permutations: int, random_state: int) -> Tuple[float, pd.DataFrame]:
    RNG = np.random.default_rng(random_state)
    
    vars_by_chrom = {}
    for chrom, sub in df2.groupby('chrom'):
        pos = np.sort(sub['chromStart'].to_numpy(dtype = np.int64))
        if pos.size > 0:
            vars_by_chrom[chrom] = pos

    if not vars_by_chrom:
        histogram_df = pd.DataFrame({'overlaps': [0], 'label': ['observed']})
        return 1.0, histogram_df

    def count_overlaps(starts_by_chrom: Dict[str, np.ndarray], ends_by_chrom: Dict[str, np.ndarray]) -> int:
        total = 0
        for chrom, var_pos in vars_by_chrom.items():
            if chrom not in starts_by_chrom:
                continue

            starts = starts_by_chrom[chrom]
            ends = ends_by_chrom[chrom]
            if starts.size == 0 or var_pos.size == 0:
                continue

            left_idx = np.searchsorted(var_pos, starts, side = 'left')
            right_idx = np.searchsorted(var_pos, ends, side = 'left')

            diff = np.zeros(var_pos.size + 1, dtype = np.int32)
            np.add.at(diff, left_idx, 1)
            np.add.at(diff, right_idx, -1)

            covered = np.cumsum(diff[:-1]) > 0
            total += int(covered.sum())

        return total

    obs_starts_by_chrom = {}
    obs_ends_by_chrom = {}

    for chrom, sub in df1.groupby('chrom'):
        obs_starts_by_chrom[chrom] = sub['chromStart'].to_numpy(dtype = np.int64)
        obs_ends_by_chrom[chrom] = sub['chromEnd'].to_numpy(dtype = np.int64)

    obs_overlap = count_overlaps(obs_starts_by_chrom, obs_ends_by_chrom)
    
    info_per_chrom = {}

    df3_grouped = {chrom: sub for chrom, sub in df3.groupby('chrom')}
    for chrom, sub1 in df1.groupby('chrom'):
        if chrom not in df3_grouped:
            raise ValueError(f'No candidate regions (df3) for chromosome {chrom} in df1.')

        starts1 = sub1['chromStart'].to_numpy(dtype = np.int64)
        ends1 = sub1['chromEnd'].to_numpy(dtype = np.int64)
        lengths1 = ends1 - starts1
        if np.any(lengths1 <= 0):
            raise ValueError(f'Non-positive interval length in df1 on {chrom}.')

        uniq_lengths, inv = np.unique(lengths1, return_inverse = True)

        sub3 = df3_grouped[chrom]
        peaks_start = sub3['chromStart'].to_numpy(dtype = np.int64)
        peaks_end = sub3['chromEnd'].to_numpy(dtype = np.int64)
        peaks_len = peaks_end - peaks_start
        if np.any(peaks_len <= 0):
            raise ValueError(f'Non-positive interval length in df3 on {chrom}.')

        candidates_by_len_index = []
        indices_by_len_index = []
        for k, L in enumerate(uniq_lengths):
            idxs = np.where(inv == k)[0]
            indices_by_len_index.append(idxs)

            candidate_peaks = np.where(peaks_len >= L)[0]
            if candidate_peaks.size == 0:
                raise ValueError(f'No candidate peaks in df3 on {chrom} are long enough for enhancer length {L}.')
            candidates_by_len_index.append(candidate_peaks)

        info_per_chrom[chrom] = {
            'uniq_lengths': uniq_lengths,
            'indices_by_len_index': indices_by_len_index,
            'candidates_by_len_index': candidates_by_len_index,
            'peaks_start': peaks_start,
            'peaks_end': peaks_end,
            'n_intervals': lengths1.size,
        }

    null_counts = np.zeros(permutations, dtype = np.int64)

    for b in range(permutations):
        perm_starts_by_chrom = {}
        perm_ends_by_chrom = {}

        for chrom, info in info_per_chrom.items():
            n = info['n_intervals']
            uniq_lengths = info['uniq_lengths']
            indices_by_len_index = info['indices_by_len_index']
            candidates_by_len_index = info['candidates_by_len_index']
            peaks_start = info['peaks_start']
            peaks_end = info['peaks_end']

            starts = np.empty(n, dtype = np.int64)
            ends = np.empty(n, dtype = np.int64)

            for k, L in enumerate(uniq_lengths):
                idxs = indices_by_len_index[k]
                if idxs.size == 0:
                    continue

                candidate_peaks = candidates_by_len_index[k]
                chosen_peak_idx = RNG.choice(candidate_peaks, size = idxs.size, replace = True)
                peak_s = peaks_start[chosen_peak_idx]
                peak_e = peaks_end[chosen_peak_idx]
                max_offset = peak_e - peak_s - L
                offsets = RNG.integers(0, max_offset + 1, size = idxs.size)
                starts[idxs] = peak_s + offsets
                ends[idxs] = starts[idxs] + L

            perm_starts_by_chrom[chrom] = starts
            perm_ends_by_chrom[chrom] = ends

        null_counts[b] = count_overlaps(perm_starts_by_chrom, perm_ends_by_chrom)

    p_val = (np.sum(null_counts >= obs_overlap) + 1) / (permutations + 1)

    histogram_df = pd.DataFrame({'Overlaps': np.concatenate([null_counts, np.array([obs_overlap])]), 'Label': ['Null'] * permutations + ['Observed']})

    return obs_overlap, p_val, histogram_df

In [None]:
# Results
obs_overlap, p_val, histogram_df = permutation_test(df1 = df1, df2 = df2, df3 = df3, permutations = 1000, random_state = 42)

print(f'Observed: {obs_overlap}')
print(f'p-value: {p_val}')
histogram_df.head(n = 5)