In [None]:
import json
import pandas as pd
import os
import shutil
from glob import glob
import zipfile
import numpy as np
from tqdm import tqdm
from shapely import wkb
from shapely.geometry import shape
def create_dir(path):
    """Create directory ``path`` (and any missing parents) if it is not there yet.

    ``exist_ok=True`` replaces the former "check, then create" sequence, so the
    call cannot fail when the directory appears between the check and the
    creation (e.g. two notebooks preparing the same output tree).

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` already exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)
# Quality cut-off for transcripts: the labelling loop keeps only rows whose
# `qv` column is strictly greater than this value.
QV_THRESHOLD = 20

In [None]:
# Integer class index -> cell-type name; the position in the tuple is the index.
class_list = dict(enumerate((
    "epithelial",
    "Basal/Myoepithelial",
    "Smooth muscle",
    "Fibroblast",
    "Endothelial",
    "Lymphocyte",               # T and B cells merged
    "Plasma cell",
    "Macrophage/Histiocyte",    # merged
    "Neutrophil",
    "Adipocyte",
    "Other/Unknown",
)))
# Cell-type name -> marker genes used to score a cell from its transcripts.
# Key order matters: it is the iteration order seen by whatever scores the cells.
#
# Fixed symbols: "PDGFR" -> "PDGFRB", "LT" -> "LTB", "CD79" -> "CD79B". The old
# spellings are not gene symbols, so they could never match a transcript name.
#
# NOTE(review): "Basal/Myoepithelial" contains every "Smooth muscle" marker, so
# a cell whose only hits are MYH11/ACTA2/MYLK scores equally for both types and
# the result then depends on how the scorer breaks ties - confirm this is wanted.
marker_genes = {
    "epithelial": [
        "EPCAM", "KRT8", "KRT18", "KRT19",
        "ERBB2", "MKI67", "GATA3", "CDH1",
        "CLDN4",
    ],

    "Basal/Myoepithelial": [
        "MYH11", "ACTA2", "MYLK",
        "TAGLN", "CNN1", "MYL9",
    ],

    "Smooth muscle": [
        "MYH11", "ACTA2", "MYLK",
    ],

    "Fibroblast": [
        "PDGFRA", "PDGFRB", "DPT", "LUM",
        "SFRP1", "FBLN1", "SFRP4", "POSTN",
        "COL1A1", "COL1A2", "COL3A1", "DCN", "THY1",
    ],

    "Endothelial": [
        "PECAM1", "KDR", "CD93", "EGFL7",
        "VWF", "CLEC14A", "MMRN2", "ESM1",
        "CD34", "CDH5",
    ],

    "Lymphocyte": [  # T and B cells merged
        "CD3E", "CD3G",
        "GZMA", "GZMK", "NKG7", "CCL5",
        "TRAC", "TCF7", "LTB", "IL2RG",
        "CD4", "CD8A",
        "CD79A", "CD79B", "MS4A1", "CD19",
        "CD69", "CXCR4", "CCR7", "SELL",
    ],

    "Plasma cell": [
        "MZB1", "PRDM1", "TNFRSF17", "SLAMF7",
        "XBP1", "SDC1", "JCHAIN", "IRF4",
    ],

    "Macrophage/Histiocyte": [
        "CD68", "CD163", "MRC1", "C1QA",
        "AIF1", "CD14", "FCGR3A", "CX3CR1",
        "LST1", "CSF1R", "TYROBP",
    ],

    "Neutrophil": [
        "S100A8", "S100A9", "LYZ",
        "CEACAM8", "MPO", "ELANE",
    ],

    "Adipocyte": [
        "ADIPOQ", "LPL", "PPARG",
        "FABP4", "PLIN1", "CEBPA", "LEP",
    ],

    # Fallback class: deliberately has no markers.
    "Other/Unknown": [],
}

# Display colour of every cell type as a "#RRGGBB" string.
class_colors_hex = dict([
    ("epithelial", "#FF0000"),             # red (tumour epithelium)
    ("Basal/Myoepithelial", "#FFA500"),    # orange
    ("Smooth muscle", "#8B4513"),          # brown
    ("Fibroblast", "#00FF00"),             # green
    ("Endothelial", "#0000FF"),            # blue
    ("Lymphocyte", "#FFFF00"),             # yellow (T and B lymphocytes merged)
    ("Plasma cell", "#9400D3"),            # purple
    ("Macrophage/Histiocyte", "#00FFFF"),  # cyan
    ("Neutrophil", "#1E90FF"),             # DodgerBlue (light blue)
    ("Adipocyte", "#FFC0CB"),              # pink
    ("Other/Unknown", "#808080"),          # grey
])

# The same palette as [R, G, B] lists of 0-255 integers, decoded from the hex
# strings above (two hex digits per channel, skipping the leading '#').
class_colors = {
    cell_type: [int(hex_code[start:start + 2], 16) for start in (1, 3, 5)]
    for cell_type, hex_code in class_colors_hex.items()
}


In [None]:
# Move the slide image, metadata and patch file that belong to every nucleus
# segmentation found under xenium_seg/ into the preprocessed_xenium/ tree.
# The sibling paths are derived from the segmentation path by swapping the
# directory name and the file suffix.
xenium_annotation_list = glob('../../data/spatialTranscriptome/xenium_seg/*_xenium_nucleus_seg.parquet')
xenium_wsi_list = [f.replace("xenium_seg/", "wsis/") for f in xenium_annotation_list]
xenium_wsi_list = [f.replace("_xenium_nucleus_seg.parquet", ".tif") for f in xenium_wsi_list]
xenium_metadata_list = [f.replace("wsis/", "metadata/") for f in xenium_wsi_list]
xenium_metadata_list = [f.replace(".tif", ".json") for f in xenium_metadata_list]
xenium_patche_list = [f.replace("wsis/", "patches/") for f in xenium_wsi_list]
xenium_patche_list = [f.replace(".tif", ".h5") for f in xenium_patche_list]
save_path = '../../data/spatialTranscriptome/preprocessed_xenium/'

# The destination folders do not depend on the sample: create them once.
for sub_dir in ('wsis/', 'metadata/', 'patches/'):
    create_dir(save_path + sub_dir)

for i in tqdm(range(len(xenium_annotation_list))):
    per_sample_sources = (
        ('wsis/', xenium_wsi_list[i]),
        ('metadata/', xenium_metadata_list[i]),
        ('patches/', xenium_patche_list[i]),
    )
    for sub_dir, src_path in per_sample_sources:
        dst_path = f'{save_path}{sub_dir}{os.path.basename(src_path)}'
        # Re-running the cell after a previous (possibly partial) run must not
        # crash: a file that is gone from the source and present at the
        # destination has already been moved. A file missing from both places
        # still raises from shutil.move, as before.
        if not os.path.exists(src_path) and os.path.exists(dst_path):
            continue
        shutil.move(src_path, dst_path)

In [None]:

# Gather the sorted, de-duplicated feature names carried by transcripts that
# overlap a nucleus, accumulated over every sample's transcript table.
xenium_annotation_list = glob('../../data/spatialTranscriptome/xenium_seg/*_xenium_nucleus_seg.parquet')
xenium_transcripts_list = [
    seg_path.replace("xenium_seg/", "transcripts/").replace("_xenium_nucleus_seg.parquet", "_transcripts.parquet")
    for seg_path in xenium_annotation_list
]

RnA_unique_list = []
for i, transcripts_path in enumerate(tqdm(xenium_transcripts_list)):
    df_transcript = pd.read_parquet(transcripts_path)
    in_nucleus = df_transcript['overlaps_nucleus'] == 1
    df_transcript = df_transcript.loc[in_nucleus]
    sample_features = list(pd.unique(df_transcript['feature_name']))
    # Merge this sample's names into the running list and re-deduplicate.
    RnA_unique_list = list(np.unique(np.array(RnA_unique_list + sample_features)))

In [None]:

def classify_cell_by_genes(gene_list, marker_dict):
    """Pick a cell type by counting marker-gene hits.

    Every entry of ``gene_list`` (duplicates count separately) adds one point
    to each cell type whose marker collection contains that gene. The cell
    type with the most points wins; ties go to the cell type that comes first
    in ``marker_dict``.

    Args:
        gene_list: Iterable of gene names observed in one cell.
        marker_dict: Mapping of cell-type name -> collection of marker genes.

    Returns:
        tuple: ``(cell_type, score)`` where ``score`` is always the integer
        number of hits of the winning type. When no gene matches any marker,
        or ``marker_dict`` is empty, the result is ``('Other/Unknown', 0)``.
        (Previously the second element was the whole score dict on a match
        and ``0`` otherwise.)
    """
    # Sets make the membership test O(1) and ignore duplicated markers,
    # which the list-based `in` test ignored as well.
    marker_sets = {cell_type: set(markers) for cell_type, markers in marker_dict.items()}
    scores = dict.fromkeys(marker_sets, 0)

    for gene in gene_list:
        for cell_type, markers in marker_sets.items():
            if gene in markers:
                scores[cell_type] += 1

    # `default` keeps an empty marker_dict from raising ValueError in max().
    best_type = max(scores, key=scores.get, default='Other/Unknown')
    best_score = scores.get(best_type, 0)
    if best_score == 0:
        return 'Other/Unknown', 0
    return best_type, best_score
# For every sample: keep transcripts above the quality cut-off, drop control
# probes, group the remaining gene names per cell, classify each segmented
# nucleus from its genes and write one bounding-box CSV per sample to labels/.
LABEL_COLUMNS = ['x1', 'y1', 'x2', 'y2', 'class_name']
CONTROL_PROBE_PATTERN = 'BLANK|NegControl|antisense'

save_path = '../../data/spatialTranscriptome/preprocessed_xenium/'
xenium_transcripts_list = [f.replace("xenium_seg/", "transcripts/") for f in xenium_annotation_list]
xenium_transcripts_list = [f.replace("_xenium_nucleus_seg.parquet", "_transcripts.parquet") for f in xenium_transcripts_list]

# The output folder is the same for every sample: create it once.
create_dir(save_path + 'labels/')

for xenium_annotation_path, xenium_transcript_path in zip(xenium_annotation_list, xenium_transcripts_list):
    df_transcript = pd.read_parquet(xenium_transcript_path)
    df_filtered = df_transcript[df_transcript['qv'] > QV_THRESHOLD].copy()
    if df_filtered.empty:
        # Nothing survives the quality filter; `.iloc[0]` below would raise.
        print(f'{xenium_transcript_path}: no transcripts with qv > {QV_THRESHOLD}, skipped')
        continue

    # Feature names arrive either as bytes or as str depending on the file;
    # normalise to str before matching. isinstance() also accepts subclasses
    # such as numpy string scalars.
    first_name = df_filtered['feature_name'].iloc[0]
    if isinstance(first_name, bytes):
        df_filtered['feature_name'] = (
            df_filtered['feature_name']
            .str.decode('utf-8')
            .str.replace("'", "", regex=False)
        )
    elif not isinstance(first_name, str):
        print(xenium_transcript_path)
        continue
    is_control = df_filtered['feature_name'].str.contains(CONTROL_PROBE_PATTERN, case=False, na=False)
    df_filtered = df_filtered[~is_control]

    df_seg = pd.read_parquet(xenium_annotation_path)
    grouped_transcripts = df_filtered.groupby('cell_id')['feature_name'].apply(list).to_dict()

    # NOTE(review): cells are matched by comparing the segmentation index with
    # the transcripts' `cell_id`. If one side holds bytes and the other str,
    # nothing matches and every cell is skipped silently - confirm on real data.
    annotations = []
    for cell_id, geom_binary in tqdm(zip(df_seg.index, df_seg['geometry']), total=len(df_seg)):
        genes_in_cell = grouped_transcripts.get(cell_id)
        if genes_in_cell is None:
            # No retained transcript for this nucleus: it gets no label.
            continue
        polygon = wkb.loads(geom_binary)
        if polygon.is_empty:
            continue
        # `bounds` is (minx, miny, maxx, maxy) and, unlike `exterior`, is also
        # defined for multi-part geometries.
        min_x, min_y, max_x, max_y = polygon.bounds
        cell_type, _ = classify_cell_by_genes(genes_in_cell, marker_genes)
        annotations.append({
            'x1': int(min_x),
            'y1': int(min_y),
            'x2': int(max_x),
            'y2': int(max_y),
            'class_name': cell_type,
        })

    # Explicit columns keep the header even when no cell was labelled, so the
    # CSV can always be read back with pd.read_csv.
    df = pd.DataFrame(annotations, columns=LABEL_COLUMNS)
    df.to_csv(save_path+'labels/'+os.path.basename(xenium_annotation_path).replace('_xenium_nucleus_seg.parquet', '.csv'), index=False)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import openslide as ops

# Overlay the predicted cell types of one sample on a down-scaled thumbnail of
# its slide: each labelled cell becomes a 2x2-pixel dot in its class colour.
reduction_factor = 20
slide = ops.OpenSlide('../../data/spatialTranscriptome/preprocessed_xenium/wsis/NCBI785.tif')
full_width, full_height = slide.level_dimensions[0]
thumbnail = slide.get_thumbnail((full_width // reduction_factor, full_height // reduction_factor))
thumbnail_rgb = np.asarray(thumbnail, dtype=float) / 255.

# The mask must be float: with the thumbnail's uint8 dtype, colours scaled to
# [0, 1] were truncated to 0 or 1 on assignment and mixed colours came out wrong.
mask = np.zeros_like(thumbnail_rgb)
labels_df = pd.read_csv('../../data/spatialTranscriptome/preprocessed_xenium/labels/NCBI785.csv')

# Scale by the thumbnail's real size: get_thumbnail keeps the aspect ratio, so
# the result can differ slightly from the requested size.
# NOTE(review): assumes the label coordinates are level-0 slide pixels - confirm.
thumb_height, thumb_width = mask.shape[:2]
scale_x = thumb_width / full_width
scale_y = thumb_height / full_height
center_x = np.floor((labels_df['x1'].to_numpy() + labels_df['x2'].to_numpy()) / 2 * scale_x).astype(int)
center_y = np.floor((labels_df['y1'].to_numpy() + labels_df['y2'].to_numpy()) / 2 * scale_y).astype(int)

# Convert the palette once instead of once per cell.
palette = {name: np.array(rgb) / 255. for name, rgb in class_colors.items()}

fig, ax = plt.subplots(figsize=(22, 20))

# Plain array iteration keeps the original row order (later cells paint over
# earlier ones) without the per-row overhead of DataFrame.iterrows().
for x, y, class_name in zip(center_x, center_y, labels_df['class_name']):
    mask[y:y + 2, x:x + 2] = palette[class_name]

ax.imshow(mask * 0.5 + thumbnail_rgb * 0.5)
ax.axis('off')
ax.set_title('Cell Type Annotation', fontsize=16, fontweight='bold')

# Number of labelled cells per class
class_counts = labels_df['class_name'].value_counts()

# Legend with one entry per class, including its cell count
legend_patches = []
for class_name, hex_color in class_colors_hex.items():
    count = class_counts.get(class_name, 0)
    label = f"{class_name}: {count}"
    patch = mpatches.Patch(color=hex_color, label=label)
    legend_patches.append(patch)

ax.legend(handles=legend_patches,
          loc='upper right',
          fontsize=15,
          framealpha=0.95,
          bbox_to_anchor=(1.18, 1.0),
          title='Cell Type (Count)',
          title_fontsize=12)

plt.tight_layout()
plt.show()

# Print overall statistics
print("=== Cell Type Statistics ===")
print(f"Total cells: {len(labels_df)}")
print("\nCell type distribution:")
print(class_counts.sort_index())

In [None]:
# Debug lookup: show the filtered transcripts whose `cell_id` equals b'aaaeppaj-1'.
# Relies on `df_filtered` left in the kernel by the labelling loop, i.e. it
# reflects only the last sample that loop processed.
df_filtered[df_filtered['cell_id']==b'aaaeppaj-1']

In [None]:
# Preview a handful of entries only: the full mapping holds one gene list per
# cell and would flood the cell output. It reflects the last sample processed
# by the labelling loop.
dict(list(grouped_transcripts.items())[:5])