In [36]:
import numpy as np
import torch

def estimate_num_blocks(mat: np.ndarray, eps=0.05, min_block_size=1):
    """
    Schätzt Anzahl rechteckiger Blöcke in der Matrix.

    Parameter
    ---------
    mat : ndarray (size, size)
        Symmetrische Matrix mit Blöcken und NaNs
    eps : float
        Schwellenwert für Ähnlichkeit (z. B. 0.1)
    min_block_size : int
        Minimale Fläche für einen Block (gegen Rauschen)

    Rückgabe
    --------
    count : int
        Geschätzte Blockanzahl
    """
    visited = np.zeros_like(mat, dtype=bool)
    size = mat.shape[0]
    count = 0

    def flood_fill(i, j, ref_val):
        """Füllt zusammenhängende Zellen mit ähnlichem Wert"""
        stack = [(i, j)]
        region = []

        while stack:
            x, y = stack.pop()
            if (
                0 <= x < size and 0 <= y < size
                and not visited[x, y]
                and not np.isnan(mat[x, y])
                and abs(mat[x, y] - ref_val) < eps
            ):
                visited[x, y] = True
                region.append((x, y))
                # Nachbarn hinzufügen (4-connectivity)
                stack.extend([
                    (x+1, y), (x-1, y),
                    (x, y+1), (x, y-1)
                ])
        return region

    for i in range(size):
        for j in range(i+1, size):  # Nur obere Hälfte
            if not visited[i, j] and not np.isnan(mat[i, j]):
                region = flood_fill(i, j, mat[i, j])
                if len(region) >= min_block_size:
                    count += 1
    return count


def generate_block_matrix_with_gaps(size=100,
                                    noise_level=0.0,
                                    p_drop=0.15,
                                    p_group=0.5,
                                    max_group_size=4,
                                    fill_value=np.nan,
                                    lam = 5,
                                    rho = 0,
                                    rho_expand = 4):
    """
    Erstellt eine symmetrische Block‑Matrix (‑1…1) und löscht hinterher
    zufällig Spalten + zugehörige Zeilen (= Informationsverlust).

    Parameter
    ----------
    size : int
        Seitenlänge der quadratischen Matrix.
    num_blocks : int
        Wie viele rechteckige Blöcke erzeugt werden.
    noise_level : float
        Standardabweichung des Rauschens in Block‑ bzw. Hintergrundfeldern.
    p_drop : float
        Wahrscheinlichkeit, dass eine gegebene Spalte entfernt wird (0–1).
    p_group : float
        Wahrscheinlichkeit, dass nach dem Entfernen einer Spalte
        *zusätzliche* benachbarte Spalten als zusammenhängende Gruppe
        entfernt werden.
    max_group_size : int
        Obergrenze für die Größe einer entfernten Spaltengruppe.
    fill_value : float
        Wert, mit dem gelöschte Zellen aufgefüllt werden
        (NaN, 0, −999 … je nach Downstream‑Modell).

    Rückgabe
    --------
    mat : ndarray, shape (size, size)
        Symmetrische Matrix mit evtl. fehlenden Spalten/Zeilen.
    num_blocks : int
        Tatsächlich verwendete Block‑Anzahl (unverändert).
    removed_idx : list[int]
        Sortierte Liste der entfernten Spalten/Zeilen‑Indizes.
    """
    # -------- 1) Grundmatrix mit Blöcken erzeugen -------- #
    rng = np.random.default_rng()

    # -------- 3) Matrix initialisieren -------- #
    mat = np.full((size, size), fill_value, dtype=float)
    used = np.full((size, size), False, dtype=float)
    num_blocks = 0

    rho = rng.exponential(rho)
    rho_expand = rng.exponential(rho_expand)
    

    # -------- 4) Blöcke füllen und symmetrisch setzen -------- #

    for i in range(size):
        for j in range(i+1, size):
            if used[i,j] == False:
                block_breite = min(j - i, rng.poisson(rho) + 1, size - i)
                block_tiefe = min(j-i, rng.poisson(rho) + 1, size - j)
                val = rng.uniform(-1, 1)
                block = rng.normal(val, noise_level, size=(block_breite, block_tiefe))
                num_blocks += 1
                # Grenzindizes berechnen
                r0, r1 = i, i + block_breite
                c0, c1 = j, j + block_tiefe
                
                # Obere Hälfte (originaler Block)
                mat[r0:r1, c0:c1] = block
                
                # Untere Hälfte (transponiert einschreiben)
                mat[c0:c1, r0:r1] = block.T      # hier wird auf echte Slice-Koordinaten geschrieben
                used[i:(i+block_breite), j:(j+block_tiefe)] = True

    for i in range(size):
        mat[i,i] = 0

    # -------- 2) Spalten-/Zeilenlöschung vorbereiten -------- #
    n_initial = min(size, max(0, rng.poisson(rho)))
    candidates = rng.choice(np.arange(size), size=n_initial, replace=False)
    
    deleted_rows = []

    for idx in candidates:
        deleted_rows.append(idx)

        # Links erweitern
        left_expand = rng.poisson(rho_expand)
        left_indices = [i for i in range(idx - left_expand, idx) if 0 <= i < size]
        deleted_rows.extend(left_indices)

        # Rechts erweitern
        right_expand = rng.poisson(rho_expand)
        right_indices = [i for i in range(idx + 1, idx + 1 + right_expand) if 0 <= i < size]
        deleted_rows.extend(right_indices)

    # -------- 3) Spalten und Zeilen auf fill_value setzen -------- #
    if deleted_rows:
        mat[:, deleted_rows] = fill_value
        mat[deleted_rows, :] = fill_value

    num_blocks = estimate_num_blocks(mat)
    return mat, torch.tensor(num_blocks, dtype=torch.float32)



In [45]:
import numpy as np

def count_blocks_sym(mat: np.ndarray, threshold: float = 0.05):
    """
    Schnelle Blocksuche in quadratischer symmetrischer Matrix.

    Parameter
    ---------
    mat : np.ndarray (N, N)
        Symmetrische Matrix mit beliebigen Werten.
    threshold : float
        Maximaler Wertunterschied für 'gleichartigen' Block.

    Returns
    -------
    num_blocks : int
        Anzahl gefundener Blöcke.
    labels : np.ndarray (N, N)
        Ganzzahlige Beschriftung je Zelle (0..num_blocks-1).
    """
    assert mat.ndim == 2 and mat.shape[0] == mat.shape[1], "Matrix muss quadratisch sein."
    n = mat.shape[0]

    # 1. Label-Puffer und Union-Find-Struktur vorbereiten
    labels = -np.ones_like(mat, dtype=int)
    parent = []           # parent[i] = root‑Index des Blocks i
    ranks  = []           # Union‑by‑rank

    def find(u):
        """Pfadkompression."""
        while parent[u] != u:
            parent[u] = parent[parent[u]]
            u = parent[u]
        return u

    def union(u, v):
        """Vereinigt zwei Blöcke (root‑IDs)."""
        ru, rv = find(u), find(v)
        if ru == rv:
            return
        # Union by rank
        if ranks[ru] < ranks[rv]:
            parent[ru] = rv
        elif ranks[ru] > ranks[rv]:
            parent[rv] = ru
        else:
            parent[rv] = ru
            ranks[ru] += 1

    # 2. Einmal über Matrix laufen (oben‑links‑Nachbarn reichen)
    next_label = 0
    for i in range(n):
        for j in range(n):
            val = mat[i, j]

            # Kandidaten‑Nachbarn: oben (i‑1,j) und links (i,j‑1)
            candidates = []
            if i > 0 and abs(val - mat[i-1, j]) <= threshold:
                candidates.append(labels[i-1, j])
            if j > 0 and abs(val - mat[i, j-1]) <= threshold:
                candidates.append(labels[i, j-1])

            # Keine passenden Nachbarn → neuer Block
            if not candidates:
                labels[i, j] = next_label
                parent.append(next_label)
                ranks.append(0)
                next_label += 1
            else:
                # Nutze erstes passendes Label
                lbl = candidates[0]
                labels[i, j] = lbl
                # Vereinige evtl. unterschiedliche Nachbarn
                for other in candidates[1:]:
                    if other != lbl:
                        union(lbl, other)

    # 3. Zweite Phase: Root‑Labels & Umnummerierung
    root_map = {}
    new_label = 0
    for idx in range(next_label):
        root = find(idx)
        if root not in root_map:
            root_map[root] = new_label
            new_label += 1

    # labels aktualisieren
    it = np.nditer(labels, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        it[0][...] = root_map[find(it[0].item())]
        it.iternext()

    return new_label, labels


In [84]:
import time

mat = generate_block_matrix_with_gaps(size = 100)[0]
np.set_printoptions(precision=2, suppress=True)

#print(matrix[0])

start_time = time.time()
thresholds = np.arange(0, 1.0001, 0.025)
results = count_blocks_sym(mat, thresholds)
for th, (lab, k) in zip(thresholds, results):
    print(f"th={th}: Blöcke={k}")
end_time = time.time()
print(end_time-start_time)

np.max(mat)

th=0.0: Blöcke=5050
th=0.025: Blöcke=4808
th=0.05: Blöcke=4581
th=0.07500000000000001: Blöcke=4339
th=0.1: Blöcke=4125
th=0.125: Blöcke=3925
th=0.15000000000000002: Blöcke=3720
th=0.17500000000000002: Blöcke=3529
th=0.2: Blöcke=3353
th=0.225: Blöcke=3180
th=0.25: Blöcke=3020
th=0.275: Blöcke=2859
th=0.30000000000000004: Blöcke=2719
th=0.325: Blöcke=2566
th=0.35000000000000003: Blöcke=2420
th=0.375: Blöcke=2298
th=0.4: Blöcke=2143
th=0.42500000000000004: Blöcke=2021
th=0.45: Blöcke=1912
th=0.47500000000000003: Blöcke=1785
th=0.5: Blöcke=1677
th=0.525: Blöcke=1581
th=0.55: Blöcke=1488
th=0.5750000000000001: Blöcke=1388
th=0.6000000000000001: Blöcke=1300
th=0.625: Blöcke=1218
th=0.65: Blöcke=1149
th=0.675: Blöcke=1074
th=0.7000000000000001: Blöcke=988
th=0.7250000000000001: Blöcke=912
th=0.75: Blöcke=862
th=0.775: Blöcke=804
th=0.8: Blöcke=743
th=0.8250000000000001: Blöcke=684
th=0.8500000000000001: Blöcke=648
th=0.875: Blöcke=607
th=0.9: Blöcke=549
th=0.925: Blöcke=503
th=0.9500000000000

0.9998343400533789

In [63]:
import scipy.ndimage as ndi

threshold = 0.01
n = 1000
mat = generate_block_matrix_with_gaps(size = n)[0]

start_time = time.time()
mask = ~np.isnan(mat)
# NaNs maskieren
valid = ~np.isnan(mat)

# Eine binäre Maske, wo Werte innerhalb des Schwellenwerts zu ihren Nachbarn ähnlich sind
# Wir initialisieren die Maske als False, dann setzen wir nur gültige Vergleichsergebnisse
similar = np.zeros_like(mat, dtype=bool)

# Vergleiche zu Nachbarn nach oben, links, unten, rechts
for dx, dy in [(-1,0), (0,-1), (1,0), (0,1)]:
    shifted = np.roll(mat, shift=(dx, dy), axis=(0,1))
    shifted_valid = np.roll(valid, shift=(dx, dy), axis=(0,1))
    cmp_mask = (np.abs(mat - shifted) < threshold) & valid & shifted_valid
    similar |= cmp_mask

# Verbundene Bereiche labeln
structure = np.array([[0,1,0],
                      [1,1,1],
                      [0,1,0]])

labels, num = ndi.label(similar, structure=structure)

print(f"Gefundene Blöcke: {num}")

end_time = time.time()
print(end_time-start_time)

Gefundene Blöcke: 17664
0.03799796104431152


In [73]:
import numpy as np
from numba import njit
from typing import Union, Sequence, List, Tuple

@njit                       # ---------- Kern für *einen* threshold ----------
def _count_blocks_single(mat, idx, threshold):
    n = idx.size
    label_matrix = -np.ones((n, n), dtype=np.int32)
    num_labels   = 0

    for ii in range(n):
        i = idx[ii]
        for jj in range(ii, n):
            j = idx[jj]
            if ii == 0 and jj == 0:
                label_matrix[ii, jj] = 0
                num_labels += 1
            elif ii == 0:
                if abs(mat[i, j] - mat[i, idx[jj-1]]) < threshold:
                    label_matrix[ii, jj] = label_matrix[ii, jj-1]
                else:
                    label_matrix[ii, jj] = num_labels
                    num_labels += 1
            else:
                if abs(mat[i, j] - mat[idx[ii-1], j]) < threshold:
                    label_matrix[ii, jj] = label_matrix[ii-1, jj]
                elif abs(mat[i, j] - mat[i, idx[jj-1]]) < threshold:
                    label_matrix[ii, jj] = label_matrix[ii, jj-1]
                else:
                    label_matrix[ii, jj] = num_labels
                    num_labels += 1
    return label_matrix, num_labels


# ----------------------- Benutzer‑Funktion -----------------------
def count_blocks_sym(mat: np.ndarray,
                     threshold: Union[float, Sequence[float]] = 0.05
                    ) -> List[Tuple[np.ndarray, int]]:
    """
    Führe die Blockzählung für einen oder mehrere threshold‑Werte durch.

    Parameters
    ----------
    mat : np.ndarray (N, N)
        Symmetrische Matrix mit NaNs.
    threshold : float | list[float]
        Einzelner Grenzwert oder Liste davon.

    Returns
    -------
    results : list[(label_matrix, num_labels)]
        Liste in derselben Reihenfolge wie threshold(s).
    """
    assert mat.ndim == 2 and mat.shape[0] == mat.shape[1], "Matrix muss quadratisch sein."
    thresholds = np.atleast_1d(threshold).astype(np.float64)

    # Vorarbeit außerhalb der Schleife
    idx = np.where(~np.isnan(mat[0]))[0]

    results = []
    for th in thresholds:
        labels, k = _count_blocks_single(mat, idx, th)
        results.append((labels, int(k)))
    return results


In [70]:
def count_blocks_sym(mat: np.ndarray, threshold: float = 0.05):
    """
    Schnelle Blocksuche in quadratischer symmetrischer Matrix.

    Parameter
    ---------
    mat : np.ndarray (N, N)
        Symmetrische Matrix mit beliebigen Werten.
    threshold : float
        Maximaler Wertunterschied für 'gleichartigen' Block.

    Returns
    -------
    num_blocks : int
        Anzahl gefundener Blöcke.
    labels : np.ndarray (N, N)
        Ganzzahlige Beschriftung je Zelle (0..num_blocks-1).
    """
    assert mat.ndim == 2 and mat.shape[0] == mat.shape[1], "Matrix muss quadratisch sein."
    n = mat.shape[0]

    gene_presence_vector = ~np.isnan(mat[0])
    matrix = mat[np.ix_(gene_presence_vector, gene_presence_vector)]
    n = sum(gene_presence_vector)

    label_matrix = np.full((n, n), np.nan)
    number_of_labels = 0
    
    for i in range(n):
        for j in range(i, n):
            if i == 0 and j == 0:
                label_matrix[i,j] = 0
                number_of_labels += 1
            elif i == 0: # j != 0
                if abs(matrix[i,j] - matrix[i,j-1]) < threshold:
                    label_matrix[i,j] = label_matrix[i,j-1]
                else:
                    label_matrix[i,j] = number_of_labels
                    number_of_labels += 1
            else:
                if abs(matrix[i,j] - matrix[i-1,j]) < threshold:
                    label_matrix[i,j] = label_matrix[i-1,j]
                elif abs(matrix[i,j] - matrix[i,j-1]) < threshold:
                    label_matrix[i,j] = label_matrix[i,j-1]
                else:
                    label_matrix[i,j] = number_of_labels
                    number_of_labels += 1
                    
    return label_matrix, number_of_labels
                        
            

In [32]:
gene_presence_vector = ~np.isnan(matrix[0])
#matrix = matrix[np.ix_(gene_presence_vector, gene_presence_vector)]
sum(gene_presence_vector)

10

In [221]:
import torch
from torch.utils.data import Dataset

class BlockMatrixDataset(Dataset):
    def __init__(self,
                 n_samples=10_000,
                 size=100,
                 noise_level=0.05,
                 lam=3,
                 rho=1,
                 rho_expand=1,
                 seed=None):
        self.size = size
        self.rng  = np.random.default_rng(seed)
        self.mats = []
        self.labels = []

        for _ in range(n_samples):
            mat, n_blocks = generate_block_matrix_with_gaps(
                size=size,
                noise_level=noise_level,
                lam=lam,
                rho=rho,
                rho_expand=rho_expand
            )
            # Maske für fehlende Werte
            mask = np.isnan(mat).astype(float)
            # fehlende Einträge 0 setzen (oder Mittelwert)
            mat[np.isnan(mat)] = 0.0

            # [C, H, W]  -> 2 Kanäle: Matrix + Maske
            sample = np.stack([mat, mask])
            self.mats.append(sample.astype(np.float32))
            self.labels.append(float(n_blocks))

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        x = torch.tensor(self.mats[idx])
        y = torch.tensor(self.labels[idx])
        return x, y

In [245]:
import torch.nn as nn
import torch.nn.functional as F

class BlockCounterCNN(nn.Module):
    def __init__(self, n_channels=1):
        super().__init__()
        self.conv1 = nn.Conv2d(n_channels, 16, 5, padding=2)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool  = nn.AdaptiveAvgPool2d(1)   # [64,1,1]
        self.fc1   = nn.Linear(64, 128)        # <‑‑ fix
        self.head  = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = self.pool(x)              # [batch, 64, 1, 1]
        x = x.view(x.size(0), -1)       # → [batch, 64]
        print(x.shape)   # sollte (batch_size, 64)
        x = F.relu(self.fc1(x)) 
        return self.head(x).squeeze(1)

def run_simulations(simulator, prior, num_simulations):
    thetas, xs = [], []

    for _ in range(num_simulations):
        sim = generate_block_matrix_with_gaps()
        sim[0][np.isnan(sim[0])] = 0.0
        thetas.append(sim[1])  # num_blocks
        xs.append(torch.tensor(sim[0], dtype=torch.float32))  # [size, size]

    theta = torch.tensor(thetas, dtype=torch.float32).unsqueeze(1)  # [N, 1]
    x = torch.stack(xs, dim=0)  # [N, H, W]
    x = x.unsqueeze(1)
    return theta, x


# Training:
from sbi.utils import BoxUniform

prior = BoxUniform(low=torch.tensor([1.0]), high=torch.tensor([4000.0]))
theta, x = run_simulations(simulator, prior, num_simulations=10)

print(x.shape)

torch.Size([10, 1, 100, 100])


In [246]:
from sbi.utils import BoxUniform

# Anzahl der Blöcke zwischen 1 und 30
import sbi
from sbi import utils
from sbi.neural_nets import posterior_nn
from sbi.inference import SNPE


embedding_net = BlockCounterCNN()
neural_posterior = posterior_nn(
    model="maf",
    embedding_net=embedding_net,
    hidden_features=64,
    num_transforms=4
)
inference = SNPE(prior=prior, density_estimator=neural_posterior)

# Simuliere 10.000 θ,x-Paare
density_estimator = inference.append_simulations(theta, x).train()
posterior = inference.build_posterior(density_estimator)



torch.Size([1, 64])
torch.Size([2, 64])


RuntimeError: Debug hint: The simulated data x has 3 dimensions.
            With default settings, sbi cannot deal with multidimensional simulations.
            Make sure to use an embedding net that reduces the dimensionality, e.g., a
            CNN in case of images, or change the simulator to return one-dimensional x.
            

In [237]:
mat, true_num_blocks = generate_block_matrix_with_gaps(num_blocks=7)
mat[np.isnan(mat)] = 0
x_obs = torch.tensor(mat, dtype=torch.float32).flatten()

# Posterior auswerten
posterior_samples = posterior.sample((1_000,), x=x_obs)

import matplotlib.pyplot as plt
plt.hist(posterior_samples.numpy(), bins=30, density=True)
plt.axvline(true_num_blocks, color='r', linestyle='--')
plt.title("Posterior über Blockanzahl")
plt.show()


TypeError: generate_block_matrix_with_gaps() got an unexpected keyword argument 'num_blocks'