In [6]:
import json
import pandas as pd
import os
import shutil
from glob import glob
import zipfile
import numpy as np
from tqdm import tqdm
from shapely import wkb
from shapely.geometry import shape
import geopandas as gpd
from shapely.geometry import Point

def create_dir(path):
    """Create directory ``path`` (and any missing parents) if it is absent.

    ``exist_ok=True`` replaces the separate existence check, so the call stays
    safe if the directory appears between the check and the creation.
    Note: if ``path`` exists but is not a directory, ``FileExistsError`` is
    raised instead of being silently ignored.
    """
    os.makedirs(path, exist_ok=True)
# Quality-value cutoff: the processing loop keeps only transcripts whose 'qv'
# column is strictly greater than this value.
QV_THRESHOLD = 20 

In [7]:
# Integer class id -> cell-type name. Ids are assigned from the position of
# each name in the list below (0-based).
class_list = dict(enumerate([
    "epithelial",
    "Basal/Myoepithelial",
    "Smooth muscle",
    "Fibroblast",
    "Endothelial",
    "Lymphocyte",               # T and B cells merged
    "Plasma cell",
    "Macrophage/Histiocyte",    # merged
    "Neutrophil",
    "Adipocyte",
    "Other/Unknown",
]))
# Cell-type name -> marker gene names used for scoring. Key order matters:
# the scorer in this notebook resolves ties in favour of the earlier key.
#
# NOTE(review): every "Smooth muscle" marker is also listed under
# "Basal/Myoepithelial", which comes first, so "Smooth muscle" can never win
# the scoring as written. Confirm the intended marker sets.
# NOTE(review): "PDGFR", "LT" and "CD79" do not look like complete gene symbols
# (possibly PDGFRB / LTB / CD79B) - verify against the panel's feature names.
marker_genes = {
    "epithelial": [
        "EPCAM", "KRT8", "KRT18", "KRT19", "ERBB2",
        "MKI67", "GATA3", "CDH1", "CLDN4",
    ],
    "Basal/Myoepithelial": [
        "MYH11", "ACTA2", "MYLK", "TAGLN", "CNN1", "MYL9",
    ],
    "Smooth muscle": ["MYH11", "ACTA2", "MYLK"],
    "Fibroblast": [
        "PDGFRA", "PDGFR", "DPT", "LUM", "SFRP1",
        "FBLN1", "SFRP4", "POSTN", "COL1A1", "COL1A2",
        "COL3A1", "DCN", "THY1",
    ],
    "Endothelial": [
        "PECAM1", "KDR", "CD93", "EGFL7", "VWF",
        "CLEC14A", "MMRN2", "ESM1", "CD34", "CDH5",
    ],
    # T and B lymphocytes merged into a single class.
    "Lymphocyte": [
        "CD3E", "CD3G", "GZMA", "GZMK", "NKG7",
        "CCL5", "TRAC", "TCF7", "LT", "IL2RG",
        "CD4", "CD8A", "CD79A", "CD79", "MS4A1",
        "CD19", "CD69", "CXCR4", "CCR7", "SELL",
    ],
    "Plasma cell": [
        "MZB1", "PRDM1", "TNFRSF17", "SLAMF7", "XBP1",
        "SDC1", "JCHAIN", "IRF4",
    ],
    "Macrophage/Histiocyte": [
        "CD68", "CD163", "MRC1", "C1QA", "AIF1",
        "CD14", "FCGR3A", "CX3CR1", "LST1", "CSF1R",
        "TYROBP",
    ],
    "Neutrophil": [
        "S100A8", "S100A9", "LYZ", "CEACAM8", "MPO", "ELANE",
    ],
    "Adipocyte": [
        "ADIPOQ", "LPL", "PPARG", "FABP4", "PLIN1",
        "CEBPA", "LEP",
    ],
    # Fallback class: has no markers of its own.
    "Other/Unknown": [],
}

# Display colour per cell type as '#RRGGBB' (used for legend patches).
class_colors_hex = {
    "epithelial": "#FF0000",             # red (original note: tumor epithelium)
    "Basal/Myoepithelial": "#FFA500",    # orange
    "Smooth muscle": "#8B4513",          # brown
    "Fibroblast": "#00FF00",             # green
    "Endothelial": "#0000FF",            # blue
    "Lymphocyte": "#FFFF00",             # yellow (T/B lymphocytes merged)
    "Plasma cell": "#9400D3",            # purple
    "Macrophage/Histiocyte": "#00FFFF",  # cyan
    "Neutrophil": "#1E90FF",             # DodgerBlue (light blue)
    "Adipocyte": "#FFC0CB",              # pink
    "Other/Unknown": "#808080",          # gray
}

# Same colours as [R, G, B] integer lists (0-255), decoded from the hex table
# so the two representations cannot drift apart.
class_colors = {
    cell_type: [int(hex_code[start:start + 2], 16) for start in (1, 3, 5)]
    for cell_type, hex_code in class_colors_hex.items()
}


In [None]:
def classify_cell_by_genes(gene_list, marker_dict):
    """Score a cell against per-type marker genes and pick the best match.

    Every entry of ``gene_list`` that appears among a cell type's markers adds
    one point to that type; a gene that occurs several times in ``gene_list``
    is counted each time.

    Args:
        gene_list: Iterable of (hashable) gene names observed in the cell.
        marker_dict: Mapping of cell-type name -> collection of marker genes.

    Returns:
        ``(best_type, scores)`` where ``scores`` maps every cell type to its
        point total. Ties go to the type that comes first in ``marker_dict``.
        When no gene matches any marker (including empty ``gene_list`` or
        empty ``marker_dict``) the result is ``('Other/Unknown', 0)`` - note
        that the second element is the integer 0 in that case, not a dict.
    """
    # Build the lookup sets once instead of scanning each marker list per gene.
    marker_sets = {cell_type: set(markers) for cell_type, markers in marker_dict.items()}
    scores = dict.fromkeys(marker_sets, 0)

    for gene in gene_list:
        for cell_type, markers in marker_sets.items():
            if gene in markers:
                scores[cell_type] += 1

    # default=0 keeps an empty marker_dict from raising ValueError in max().
    if max(scores.values(), default=0) == 0:
        return 'Other/Unknown', 0

    # max() over the dict returns the first key with the highest score.
    return max(scores, key=scores.get), scores

# Build per-cell label CSVs: for every segmentation parquet, assign the
# quality-filtered transcripts to the segmented cell polygons and classify
# each cell from the genes that fall inside it.
xenium_annotation_list = sorted(glob('../../data/spatialTranscriptome/cellvit_seg/*_cellvit_seg.parquet'))
save_path = '../../data/spatialTranscriptome/preprocessed_xenium/'
labels_dir = os.path.join(save_path, 'labels')

# The transcript file of a sample mirrors the segmentation path:
# cellvit_seg/<sample>_cellvit_seg.parquet -> transcripts/<sample>_transcripts.parquet
xenium_transcripts_list = [
    f.replace("cellvit_seg/", "transcripts/").replace("_cellvit_seg.parquet", "_transcripts.parquet")
    for f in xenium_annotation_list
]

for xenium_annotation_path, xenium_transcript_path in zip(xenium_annotation_list, xenium_transcripts_list):
    output_file = os.path.join(
        labels_dir,
        os.path.basename(xenium_annotation_path).replace('_cellvit_seg.parquet', '.csv'),
    )
    if os.path.exists(output_file):
        print(f"Skipping (already processed): {output_file}")
        continue

    # One missing transcript file should not abort the whole batch.
    if not os.path.exists(xenium_transcript_path):
        print(f"Skipping (transcripts not found): {xenium_transcript_path}")
        continue

    df_transcript = pd.read_parquet(xenium_transcript_path)
    df_filtered = df_transcript[df_transcript['qv'] > QV_THRESHOLD].copy()

    # Guard before peeking at the first row: .iloc[0] raises on an empty frame.
    if df_filtered.empty:
        print(f"Skipping (no transcripts above QV threshold): {xenium_transcript_path}")
        continue

    # feature_name handling: decode bytes to str, skip files with any other type.
    first_feature_name = df_filtered['feature_name'].iloc[0]
    if isinstance(first_feature_name, bytes):
        df_filtered['feature_name'] = df_filtered['feature_name'].str.decode('utf-8')
    elif not isinstance(first_feature_name, str):
        print(f"Skipping: {xenium_transcript_path}")
        continue
    # Drop rows whose feature name matches the BLANK / NegControl / antisense pattern.
    df_filtered = df_filtered[~df_filtered['feature_name'].str.contains('BLANK|NegControl|antisense', case=False, na=False)]

    # Load segmentation data. The index is reset so that the 'index_right'
    # labels produced by the spatial join are also valid positions into
    # `polygons` (the original mixed join labels with positional lookups).
    df_seg = pd.read_parquet(xenium_annotation_path).reset_index(drop=True)

    # Convert to GeoDataFrames: transcripts as points, cells as polygons.
    transcript_gdf = gpd.GeoDataFrame(
        df_filtered,
        geometry=gpd.points_from_xy(df_filtered['he_x'], df_filtered['he_y'])
    )
    polygons = [wkb.loads(geom) for geom in df_seg['geometry']]
    seg_gdf = gpd.GeoDataFrame(df_seg, geometry=polygons)

    # Spatial join: keep transcripts that lie within a cell polygon.
    joined = gpd.sjoin(transcript_gdf, seg_gdf, how='inner', predicate='within')

    annotations = []

    if len(joined) > 0:
        # Group by index_right to collect each cell's gene list in one pass.
        grouped = joined.groupby('index_right')['feature_name'].apply(list)

        # Only cells that received at least one transcript appear in `grouped`;
        # groupby sorts its keys, so rows come out in ascending cell order.
        for idx, genes_in_cell in tqdm(grouped.items(), total=len(grouped)):
            cell_type, _ = classify_cell_by_genes(genes_in_cell, marker_genes)

            bounds = polygons[idx].bounds  # (minx, miny, maxx, maxy)
            annotations.append({
                'x1': int(bounds[0]),
                'y1': int(bounds[1]),
                'x2': int(bounds[2]),
                'y2': int(bounds[3]),
                'class_name': cell_type,
                'gene_count': len(genes_in_cell)
            })

    # Save (a sample without any annotated cell produces no CSV).
    if annotations:
        create_dir(labels_dir)
        pd.DataFrame(annotations).to_csv(output_file, index=False)
    else:
        print(f"No annotated cells, nothing saved: {xenium_annotation_path}")

print("All samples processed!")

100%|██████████| 29137/29137 [00:04<00:00, 6306.14it/s]


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import openslide as ops

# Overlay the per-cell class labels of one sample on a downscaled slide image.
reduction_factor = 20
sample_id = 'TENX140'
preprocessed_dir = '../../data/spatialTranscriptome/preprocessed_xenium/'

slide = ops.OpenSlide(preprocessed_dir + 'wsis/' + sample_id + '.tif')
thumbnail = slide.get_thumbnail((slide.level_dimensions[0][0] // reduction_factor, slide.level_dimensions[0][1] // reduction_factor))
thumbnail_rgb = np.array(thumbnail) / 255.

# The overlay must be a float array: an integer mask (as produced by
# ones_like on the image array) truncates the 0-1 colour values to 0 or 1,
# which collapses most class colours.
mask = np.zeros_like(thumbnail_rgb)
labels_df = pd.read_csv(preprocessed_dir + 'labels/' + sample_id + '.csv')

# Class colours scaled to 0-1, computed once instead of once per cell.
class_rgb = {name: np.array(rgb) / 255. for name, rgb in class_colors.items()}

# Bounding-box centres in thumbnail pixel coordinates.
center_x = ((labels_df['x1'] / reduction_factor + labels_df['x2'] / reduction_factor) // 2).astype(int)
center_y = ((labels_df['y1'] / reduction_factor + labels_df['y2'] / reduction_factor) // 2).astype(int)

fig, ax = plt.subplots(figsize=(22, 20))

# Paint a 2x2 pixel marker per cell; later rows overwrite earlier ones.
for cx, cy, class_name in zip(center_x, center_y, labels_df['class_name']):
    mask[cy:cy + 2, cx:cx + 2] = class_rgb[class_name]

# 50/50 blend of the class overlay and the slide thumbnail.
ax.imshow(mask * 0.5 + thumbnail_rgb * 0.5)
ax.axis('off')
ax.set_title('Cell Type Annotation', fontsize=16, fontweight='bold')

# Number of cells per class.
class_counts = labels_df['class_name'].value_counts()

# Legend with one entry per class, including its cell count.
legend_patches = [
    mpatches.Patch(color=hex_color, label=f"{class_name}: {class_counts.get(class_name, 0)}")
    for class_name, hex_color in class_colors_hex.items()
]

ax.legend(handles=legend_patches, 
         loc='upper right', 
         fontsize=15,
         framealpha=0.95,
         bbox_to_anchor=(1.18, 1.0),
         title='Cell Type (Count)',
         title_fontsize=12)

plt.tight_layout()
plt.show()

# Overall statistics.
print("=== Cell Type Statistics ===")
print(f"Total cells: {len(labels_df)}")
print("\nCell type distribution:")
print(class_counts.sort_index())

In [None]:
# Ad-hoc inspection of one cell's transcripts. Relies on `df_filtered` left
# over from the last iteration of the processing loop, so the result depends
# on which sample was processed last.
# NOTE(review): compares against a bytes literal - presumably 'cell_id' is
# stored as bytes in the transcript file; confirm.
df_filtered.loc[df_filtered['cell_id'] == b'aaaeppaj-1']

In [None]:
# NOTE(review): `grouped_transcripts` is not assigned in any cell visible in
# this notebook, so this raises NameError on a fresh kernel unless it is
# defined elsewhere - confirm, or remove this leftover inspection cell.
grouped_transcripts