<a href="https://colab.research.google.com/github/huangch/FastAutoGPT/blob/master/Untitled1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [31]:
import numpy as np
import pandas as pd
from scipy.spatial import Delaunay
from scipy.sparse import csr_matrix

# ---------- pseudo data ----------
def make_cells(n=3000, width=2048, height=2048, seed=42):
    rng = np.random.default_rng(seed)
    x = rng.uniform(0, width, n); y = rng.uniform(0, height, n)
    labels = np.array(["lymphocyte", "tumor", "others"])
    ann = rng.choice(labels, size=n, p=[0.33, 0.34, 0.33])
    return pd.DataFrame({"x": x, "y": y, "cell_annotation": pd.Categorical(ann, categories=labels)})

# ---------- fast Delaunay + CSR neighbors ----------
def delaunay_triangulation_fast(points: np.ndarray,
                                qhull_options: str = "Qbb Qc QJ",
                                need_triangles: bool = False,
                                need_edges: bool = False):
    P = np.ascontiguousarray(points, dtype=np.float64)
    uniq_pts, uniq_to_orig, orig_to_uniq = np.unique(P, axis=0, return_index=True, return_inverse=True)
    if uniq_pts.shape[0] < 3:
        raise ValueError("Need at least 3 unique points for 2D Delaunay.")
    tri = Delaunay(uniq_pts, qhull_options=qhull_options)  # no Qz
    indptr_u, indices_u = tri.vertex_neighbor_vertices
    indices_orig = uniq_to_orig[indices_u]
    simplices = edges = None
    if need_triangles or need_edges:
        simplices = uniq_to_orig[tri.simplices]
    if need_edges:
        a, b, c = simplices.T
        edges_raw = np.vstack([np.c_[a, b], np.c_[b, c], np.c_[c, a]])
        edges = np.unique(np.sort(edges_raw, axis=1), axis=0)
    return (tri, indptr_u, indices_u, indices_orig, orig_to_uniq, uniq_to_orig, simplices, edges)

# ---------- ALL nodes k-hop neighborhoods (batch, sparse) ----------
def k_hop_all_nodes(indptr_u: np.ndarray,
                    indices_u: np.ndarray,
                    orig_to_uniq: np.ndarray,
                    uniq_to_orig: np.ndarray,
                    k: int,
                    include_start: bool = False):
    """
    Returns a list of np.ndarrays: for each ORIGINAL node i, all ORIGINAL ids within <= k hops.
    """
    n = indptr_u.size - 1
    if k < 0:
        raise ValueError("k must be >= 0")

    # Build symmetric boolean adjacency A (unique index space)
    rows = np.repeat(np.arange(n), np.diff(indptr_u))
    cols = indices_u
    data = np.ones_like(cols, dtype=bool)
    A = csr_matrix((data, (rows, cols)), shape=(n, n), dtype=bool)
    A = A.maximum(A.T)           # symmetrize
    A.setdiag(False); A.eliminate_zeros()

    if k == 0:
        R = csr_matrix((n, n), dtype=bool)  # empty reachability
    else:
        R = A.copy()
        frontier = A.copy()
        for _ in range(2, k + 1):
            frontier = (frontier @ A).astype(bool)
            frontier.eliminate_zeros()
            R = R.maximum(frontier)         # boolean OR

    if include_start:
        R.setdiag(True)

    # Collect neighbors per UNIQUE node, then map to ORIGINAL ids.
    neigh_u = [uniq_to_orig[R.indices[R.indptr[u]:R.indptr[u+1]]] for u in range(n)]
    return [neigh_u[orig_to_uniq[i]] for i in range(orig_to_uniq.size)]

# ---------- example: run with k=2 over all nodes ----------
if __name__ == "__main__":
    df = make_cells(n=3000, width=2048, height=2048, seed=7)
    points = df[["x", "y"]].to_numpy(copy=False)

    (tri, indptr_u, indices_u, indices_orig,
     orig2uniq, uniq2orig, _, _) = delaunay_triangulation_fast(points,
                                                               need_triangles=False,
                                                               need_edges=False)

    k = 2
    neighbors_k = k_hop_all_nodes(indptr_u, indices_u, orig2uniq, uniq2orig, k=k, include_start=False)

    # demo: inspect a few nodes
    for i in [0, 1, 2, 123]:
        ids = neighbors_k[i]
        print(f"node {i}: {len(ids)} nodes within <= {k} hops")
        print(df.loc[ids, "cell_annotation"].value_counts(), "\n")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Name: count, dtype: int64 

node 2286: 25 nodes within <= 2 hops
cell_annotation
others        11
tumor          8
lymphocyte     6
Name: count, dtype: int64 

node 2287: 21 nodes within <= 2 hops
cell_annotation
others        10
tumor          6
lymphocyte     5
Name: count, dtype: int64 

node 2288: 18 nodes within <= 2 hops
cell_annotation
others        9
lymphocyte    6
tumor         3
Name: count, dtype: int64 

node 2289: 14 nodes within <= 2 hops
cell_annotation
lymphocyte    6
others        5
tumor         3
Name: count, dtype: int64 

node 2290: 14 nodes within <= 2 hops
cell_annotation
lymphocyte    6
tumor         5
others        3
Name: count, dtype: int64 

node 2291: 15 nodes within <= 2 hops
cell_annotation
tumor         8
lymphocyte    4
others        3
Name: count, dtype: int64 

node 2292: 25 nodes within <= 2 hops
cell_annotation
others        10
tumor          9
lymphocyte     6
Name: count, dtype: int

The provided code defines functions to perform Delaunay triangulation and find k-hop neighbors on a set of 2D points, simulating cell locations with annotations.

Here's a breakdown:

1.  **`make_cells` function**: Creates a pandas DataFrame with random `x`, `y` coordinates and categorical `cell_annotation` labels ("lymphocyte", "tumor", "others").

2.  **`delaunay_triangulation_fast` function**:
    *   Takes an array of points.
    *   Performs Delaunay triangulation using `scipy.spatial.Delaunay`.
    *   Handles potential duplicate points by finding unique points and mapping back to original indices.
    *   Optionally returns the simplices (triangles) and edges of the triangulation.
    *   Returns various arrays and the `Delaunay` object for further use.

3.  **`k_hop_all_nodes` function**:
    *   Takes the `indptr_u` and `indices_u` from the Delaunay triangulation (representing unique point neighbors), original-to-unique and unique-to-original mappings, and the number of hops `k`.
    *   Builds a symmetric boolean adjacency matrix `A` in the unique index space using `csr_matrix`.
    *   Computes the k-hop reachability matrix `R` by repeatedly multiplying `A` with itself (boolean matrix multiplication).
    *   Optionally includes the starting node in the neighbors (`include_start`).
    *   Maps the unique node neighbors back to the original node indices.
    *   Returns a list where each element is an array of original node IDs within `k` hops of the corresponding original node.

4.  **Example Usage (`if __name__ == "__main__":`)**:
    *   Generates sample cell data using `make_cells`.
    *   Performs Delaunay triangulation on the cell coordinates.
    *   Calls `k_hop_all_nodes` to find 2-hop neighbors for all cells.
    *   Prints the number of neighbors and the counts of each cell annotation type within the 2-hop neighborhood for a few example nodes.

# Task
Modify the code to identify nodes with more than N neighbors (including k-hopping neighbors), and among these, find nodes where the ratio of k-hopping neighbors with annotations in `primary_annotations` to the total number of k-hopping neighbors is greater than R.

## Get k-hop neighbors

### Subtask:
Obtain the k-hop neighbors for all nodes, this time including the starting node.


**Reasoning**:
Obtain the k-hop neighbors for all nodes, including the starting node, by calling the `k_hop_all_nodes` function with `include_start=True`.



In [33]:
neighbors_k_inclusive = k_hop_all_nodes(indptr_u, indices_u, orig2uniq, uniq2orig, k=k, include_start=True)

## Identify nodes with more than n neighbors

### Subtask:
Filter the nodes to keep only those with more than `N` neighbors.


**Reasoning**:
Filter the nodes based on the number of neighbors being greater than N.



In [34]:
N = 25  # Define the threshold for the minimum number of neighbors

nodes_with_sufficient_neighbors = []
for i, neighbors in enumerate(neighbors_k_inclusive):
    if len(neighbors) > N:
        nodes_with_sufficient_neighbors.append(i)

print(f"Found {len(nodes_with_sufficient_neighbors)} nodes with more than {N} neighbors.")

Found 310 nodes with more than 25 neighbors.


## Count primary annotation neighbors

### Subtask:
For the filtered nodes, count the number of k-hop neighbors whose annotations are in `primary_annotations`.


**Reasoning**:
Iterate through the filtered nodes and count the number of primary neighbors for each.



In [35]:
primary_annotations = ["tumor", "lymphocyte"]  # Define primary annotation types
primary_neighbor_counts = []

for node_index in nodes_with_sufficient_neighbors:
    k_hop_neighbors = neighbors_k_inclusive[node_index]
    primary_neighbors = df.loc[k_hop_neighbors, "cell_annotation"].isin(primary_annotations)
    primary_count = primary_neighbors.sum()
    primary_neighbor_counts.append(primary_count)

print(f"Calculated primary neighbor counts for {len(primary_neighbor_counts)} filtered nodes.")

Calculated primary neighbor counts for 310 filtered nodes.


## Calculate the ratio

### Subtask:
For the filtered nodes, calculate the ratio of primary annotation neighbors to the total number of k-hop neighbors.


**Reasoning**:
Calculate the ratio of primary annotation neighbors to the total number of k-hop neighbors for each filtered node.



In [36]:
primary_ratios = []

for i, node_index in enumerate(nodes_with_sufficient_neighbors):
    total_neighbors = len(neighbors_k_inclusive[node_index])
    primary_count = primary_neighbor_counts[i]
    if total_neighbors > 0:
        ratio = primary_count / total_neighbors
    else:
        ratio = 0  # Should not happen based on previous filtering
    primary_ratios.append(ratio)

print(f"Calculated {len(primary_ratios)} primary ratios.")

Calculated 310 primary ratios.


## Identify nodes meeting both criteria

### Subtask:
Filter the nodes further to keep only those where the calculated ratio is greater than `R`.


**Reasoning**:
Filter the nodes based on the primary annotation ratio threshold and print the count of resulting nodes.



In [37]:
R = 0.5  # Define the threshold for the minimum primary annotation ratio

nodes_meeting_both_criteria = []
for i, node_index in enumerate(nodes_with_sufficient_neighbors):
    if primary_ratios[i] > R:
        nodes_meeting_both_criteria.append(node_index)

print(f"Found {len(nodes_meeting_both_criteria)} nodes meeting both criteria (more than {N} neighbors and primary ratio > {R}).")

Found 295 nodes meeting both criteria (more than 25 neighbors and primary ratio > 0.5).


## Present the results

### Subtask:
Display the identified nodes and their calculated ratios.


**Reasoning**:
Display the identified nodes and their calculated ratios by iterating through the list of nodes meeting both criteria and retrieving their corresponding ratios.



In [38]:
print("Nodes meeting both criteria and their primary ratios:")
for node_index in nodes_meeting_both_criteria:
    # Find the position of node_index in nodes_with_sufficient_neighbors
    # This assumes nodes_meeting_both_criteria is a subset of nodes_with_sufficient_neighbors
    # and maintains the relative order, which is true based on the filtering logic.
    try:
        index_in_sufficient_neighbors = nodes_with_sufficient_neighbors.index(node_index)
        ratio = primary_ratios[index_in_sufficient_neighbors]
        print(f"Node {node_index}: Ratio = {ratio:.4f}")
    except ValueError:
        # This should not happen if the logic is correct, but added for robustness
        print(f"Error: Node {node_index} not found in nodes_with_sufficient_neighbors.")


Nodes meeting both criteria and their primary ratios:
Node 18: Ratio = 0.7586
Node 39: Ratio = 0.8214
Node 46: Ratio = 0.7143
Node 47: Ratio = 0.7097
Node 51: Ratio = 0.6923
Node 71: Ratio = 0.6154
Node 93: Ratio = 0.5714
Node 108: Ratio = 0.7857
Node 121: Ratio = 0.7586
Node 142: Ratio = 0.8333
Node 166: Ratio = 0.7500
Node 167: Ratio = 0.5926
Node 168: Ratio = 0.7143
Node 176: Ratio = 0.7308
Node 182: Ratio = 0.7037
Node 192: Ratio = 0.6207
Node 193: Ratio = 0.7429
Node 199: Ratio = 0.5667
Node 213: Ratio = 0.6296
Node 214: Ratio = 0.7407
Node 237: Ratio = 0.5769
Node 242: Ratio = 0.7174
Node 249: Ratio = 0.6154
Node 252: Ratio = 0.7188
Node 256: Ratio = 0.7667
Node 261: Ratio = 0.5556
Node 295: Ratio = 0.7037
Node 309: Ratio = 0.7000
Node 337: Ratio = 0.5294
Node 351: Ratio = 0.7778
Node 355: Ratio = 0.6538
Node 357: Ratio = 0.7692
Node 362: Ratio = 0.7692
Node 371: Ratio = 0.7692
Node 374: Ratio = 0.6667
Node 376: Ratio = 0.5484
Node 394: Ratio = 0.6154
Node 397: Ratio = 0.7500
Nod

## Summary:

### Data Analysis Key Findings

*   295 nodes were identified that have more than 25 neighbors (including k-hop neighbors) and a ratio of primary annotated k-hop neighbors (tumor or lymphocyte) to total k-hop neighbors greater than 0.5.
*   The specific primary annotation types considered were "tumor" and "lymphocyte".

### Insights or Next Steps

*   The identified nodes represent areas in the graph where there is a high concentration of primary annotated cells within their k-hop neighborhood. Further investigation into the characteristics and spatial location of these nodes could provide insights into biological patterns.
*   Consider visualizing these identified nodes on the original image to understand their spatial distribution relative to different tissue structures and cell types.


In [None]:
import matplotlib.pyplot as plt

def generate_binary_image(nodes_to_highlight, df, width, height, mpp):
    """
    Generates a binary image highlighting the specified nodes.

    Args:
        nodes_to_highlight (list): A list of node indices to highlight in the image.
        df (pd.DataFrame): The DataFrame containing node coordinates ('x', 'y').
        width (int): The width of the original image in pixels.
        height (int): The height of the original image in pixels.
        mpp (float): Microns per pixel, used to scale coordinates if necessary
                     (although in this case we are using pixel coordinates directly).

    Returns:
        np.ndarray: A 2D numpy array representing the binary image.
    """
    # Create a blank binary image
    binary_image = np.zeros((height, width), dtype=np.uint8)

    # Mark the specified nodes on the image
    for node_index in nodes_to_highlight:
        x, y = df.loc[node_index, ['x', 'y']].values
        # Ensure coordinates are within image bounds and cast to integers
        x_int, y_int = int(round(x)), int(round(y))
        if 0 <= x_int < width and 0 <= y_int < height:
            binary_image[y_int, x_int] = 1 # Mark the pixel with 1

    return binary_image

# Example usage (assuming 'nodes_meeting_both_criteria' is the list of identified nodes)
# You would call this function after running the previous steps to get nodes_meeting_both_criteria
# binary_map = generate_binary_image(nodes_meeting_both_criteria, df, 2048, 2048, 0.5) # Example mpp
# plt.imshow(binary_map, cmap='gray')
# plt.title("Binary Map of Identified Nodes")
# plt.show()