In [1]:
import pandas as pd
import os

INPUT_PARQUET = "./cell_metadata.parquet"

# Output directory
OUTPUT_DIR = "./field_uniques"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Fields to process
FIELDS = ["cell_type", "tissue_general", "tissue", "disease", "development_stage", "sex"]


print(f"Loading from Parquet: {INPUT_PARQUET}")
df = pd.read_parquet(INPUT_PARQUET)

# Generate one CSV per field
for field in FIELDS:
    if field in df.columns:
        unique_values = df[field].dropna().unique()
        unique_values.sort()
        output_path = os.path.join(OUTPUT_DIR, f"{field}_unique_values.csv")
        pd.Series(unique_values, name=field).to_csv(output_path, index=False)
        print(f"Saved: {output_path}")
    else:
        print(f"Field not found in data: {field}")


Loading from Parquet: ./cell_metadata.parquet
Saved: ./field_uniques\cell_type_unique_values.csv
Saved: ./field_uniques\tissue_general_unique_values.csv
Saved: ./field_uniques\tissue_unique_values.csv
Saved: ./field_uniques\disease_unique_values.csv
Saved: ./field_uniques\development_stage_unique_values.csv
Saved: ./field_uniques\sex_unique_values.csv


## Disease

In [3]:
import pandas as pd
import os
import json
from rapidfuzz import process, fuzz

# Paths
UNIQUE_DISEASE_PATH = "./field_uniques/disease_unique_values.csv"
BMG_DISEASE_PATH = "../BMG/BioMedGraphica_Disease.csv"
OUTPUT_PATH = "./mapping_table/mapped_diseases.json"

# Custom match anchors
MATCH_ANCHORS = {
    "Mild cognitive impairment": "cognitive impairment with or without cerebellar ataxia",
    "Mixed gliomas": "glioma",
}

# Terms to skip matching (non-disease labels)
SKIP_MATCHING_TERMS = {
    "healthy", "normal", "unclassified", "control", "none", "no disease",
    "cell stress", "b-cell non-hodgkin lymphoma"
}

# Specific normalization for skip-matched terms
NORMALIZED_SKIP_MATCHES = {
    "healthy": "normal",
    "control": "normal",
    "none": "normal",
    "no disease": "normal"
}

# Load input data
unique_diseases = pd.read_csv(UNIQUE_DISEASE_PATH).dropna()[["disease"]].squeeze().tolist()
bmg_df = pd.read_csv(BMG_DISEASE_PATH, dtype=str).fillna("")

# Build candidate map from BMG
candidate_map = []
for _, row in bmg_df.iterrows():
    bmg_id = row["BioMedGraphica_ID"]
    for col in row.index:
        if col == "BioMedGraphica_ID":
            continue
        values = str(row[col]).split(";") if ";" in str(row[col]) else [row[col]]
        for val in values:
            val_clean = val.strip()
            if val_clean:
                candidate_map.append((val_clean, bmg_id, col))

all_terms = [entry[0] for entry in candidate_map]

# Match loop
results = []
for disease in unique_diseases:
    disease_lower = disease.lower().strip()

    # Rule-based normalization
    if disease_lower in NORMALIZED_SKIP_MATCHES:
        standardized_term = NORMALIZED_SKIP_MATCHES[disease_lower]
        results.append({
            "original_disease": disease,
            "query_used": disease,
            "matched_term": standardized_term,
            "matched_field": "standardized",
            "BioMedGraphica_ID": None,
            "match_score": None
        })
        continue

    # Skip matching but use original term
    if disease_lower in SKIP_MATCHING_TERMS:
        results.append({
            "original_disease": disease,
            "query_used": disease,
            "matched_term": disease,
            "matched_field": "NA",
            "BioMedGraphica_ID": None,
            "match_score": None
        })
        continue

    query_term = MATCH_ANCHORS.get(disease, disease)

    # Try exact match first
    matched_exact = None
    for i, term in enumerate(all_terms):
        if query_term.lower() == term.lower():
            matched_exact = candidate_map[i]
            score = 100
            break

    if matched_exact:
        matched_term, matched_bmg_id, matched_field = matched_exact
    else:
        match, score, idx = process.extractOne(
            query=query_term,
            choices=all_terms,
            scorer=fuzz.token_sort_ratio
        )
        matched_term, matched_bmg_id, matched_field = candidate_map[idx]

    results.append({
        "original_disease": disease,
        "query_used": query_term,
        "matched_term": matched_term,
        "matched_field": matched_field,
        "BioMedGraphica_ID": matched_bmg_id,
        "match_score": score
    })

# Save results
display_name_path = "../BMG/BioMedGraphica_Disease_Display_Name.csv"
display_df = pd.read_csv(display_name_path, dtype=str).dropna(subset=["BioMedGraphica_ID", "BMG_Disease_Name"])
bmg_name_map = dict(zip(display_df["BioMedGraphica_ID"], display_df["BMG_Disease_Name"]))

# Post-process: replace matched_term using display name if BioMedGraphica_ID is available
for item in results:
    bmg_id = item.get("BioMedGraphica_ID")
    if bmg_id and bmg_id in bmg_name_map:
        item["matched_term"] = bmg_name_map[bmg_id]

# Save results
os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

print(f"Matched results saved to: {OUTPUT_PATH}")


Matched results saved to: ./mapping_table/mapped_diseases.json


## Cell Type Map

In [8]:
import pandas as pd
import os
import json
from rapidfuzz import process, fuzz

# Paths
UNIQUE_CELLTYPE_PATH = "./field_uniques/cell_type_unique_values.csv"
BMG_CELLTYPE_PATH = "../BMG/cell_type_mapping_table.csv"
OUTPUT_JSON = "./mapping_table/mapped_celltypes.json"
OUTPUT_REVIEW = "./mapping_table/review_celltypes_score_not_100.json"

# Custom match anchors (optional)
MATCH_ANCHORS = {
    "Epithelial (malignant)": "Epithelial cell",
    "Epithelial (non-malignant)": "Epithelial cell",
    "Cancer-associated fibroblast": "fibroblast",
    "Endocrine": "endocrine cell",
}

# Terms to skip matching
SKIP_MATCHING_TERMS = {"unclassified", "unknown", "unannoted"}

# Load inputs
unique_celltypes = pd.read_csv(UNIQUE_CELLTYPE_PATH).dropna()[["cell_type"]].squeeze().tolist()
bmg_df = pd.read_csv(BMG_CELLTYPE_PATH, dtype=str).fillna("")

# Build BMG match candidates
candidate_map = []
for _, row in bmg_df.iterrows():
    bmg_id = row["CMT_ID"]
    for col in row.index:
        if col == "CMT_ID":
            continue
        values = str(row[col]).split(";") if ";" in str(row[col]) else [row[col]]
        for val in values:
            val_clean = val.strip()
            if val_clean:
                candidate_map.append((val_clean, bmg_id, col))

all_terms = [entry[0] for entry in candidate_map]

# Matching loop
results = []
for celltype in unique_celltypes:
    celltype_lower = celltype.lower().strip()
    if celltype_lower in SKIP_MATCHING_TERMS:
        results.append({
            "original_celltype": celltype,
            "query_used": celltype,
            "matched_term": celltype,
            "matched_field": "NA",
            "CMT_ID": None,
            "match_score": None
        })
        continue

    query_term = MATCH_ANCHORS.get(celltype, celltype)

    # Exact match check
    matched_exact = None
    for i, term in enumerate(all_terms):
        if query_term.lower() == term.lower():
            matched_exact = candidate_map[i]
            score = 100
            break

    if matched_exact:
        matched_term, matched_bmg_id, matched_field = matched_exact
    else:
        match, score, idx = process.extractOne(
            query=query_term,
            choices=all_terms,
            scorer=fuzz.token_sort_ratio
        )
        matched_term, matched_bmg_id, matched_field = candidate_map[idx]

    results.append({
        "original_celltype": celltype,
        "query_used": query_term,
        "matched_term": matched_term,
        "matched_field": matched_field,
        "CMT_ID": matched_bmg_id,
        "match_score": score
    })

# Separate review list (score != 100)
review_list = [r for r in results if r.get("match_score") != 100 and r.get("match_score") is not None]
review_list_sorted = sorted(review_list, key=lambda x: x["match_score"], reverse=True)

# Output directories
os.makedirs(os.path.dirname(OUTPUT_JSON), exist_ok=True)

# Save full result
with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

# Save review-needed subset
with open(OUTPUT_REVIEW, "w", encoding="utf-8") as f:
    json.dump(review_list_sorted, f, indent=2, ensure_ascii=False)

# Summary report
total = len([r for r in results if r.get("match_score") is not None])
exact = len([r for r in results if r.get("match_score") == 100])
print(f"Exact matches: {exact} / {total} ({round(exact/total * 100, 2)}%)")
print(f"Review-needed entries saved to: {OUTPUT_REVIEW}")


Exact matches: 756 / 809 (93.45%)
Review-needed entries saved to: ./mapping_table/review_celltypes_score_not_100.json


## Sex

In [5]:
import pandas as pd
import os
import json

# Paths
INPUT_PATH = "./field_uniques/sex_unique_values.csv"
OUTPUT_PATH = "./mapping_table/mapped_sex.json"

# Manual mapping rules
MAPPING = {
    "f": "female",
    "m": "male",
    "female": "female",
    "male": "male",
    "unknown": "unknown",
    "unclassified": "unknown",
    "nan": "unknown",
    "": "unknown"
}

# Load unique values
df = pd.read_csv(INPUT_PATH, dtype=str).fillna("nan")
values = df["sex"].tolist()

# Process mapping
results = []
for value in values:
    key = value.strip().lower()
    mapped = MAPPING.get(key, "unknown")
    results.append({
        "original_sex": value,
        "normalized_sex": mapped
    })

# Save JSON
os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

print(f"Mapped sex terms saved to: {OUTPUT_PATH}")


Mapped sex terms saved to: ./mapping_table/mapped_sex.json


## Development Stage

In [6]:
import pandas as pd
import os
import json
import re

# Paths
INPUT_PATH = "./field_uniques/development_stage_unique_values.csv"
OUTPUT_ALL = "./mapping_table/mapped_development_stages.json"
OUTPUT_REVIEW = "./mapping_table/review_development_stages.json"

# Carnegie stage: estimated days per stage from authoritative tables
CARNEGIE_STAGE_DAYS = {
    1: 1, 2: 2.5, 3: 4.5, 4: 5.5, 5: 9.5, 6: 14,
    7: 16, 8: 18, 9: 20, 10: 22.5, 11: 24, 12: 28,
    13: 31.5, 14: 33, 15: 36.5, 16: 39.5, 17: 43,
    18: 46, 19: 49.5, 20: 51.5, 21: 54, 22: 55, 23: 61.5
}

# Named stages directly mapped
NAMED_STAGE_MAPPING = {
    "adult stage": (30, "adult"),
    "young adult stage": (20, "young adult"),
    "prime adult stage": (35, "adult"),
    "middle aged stage": (50, "middle aged"),
    "late adult stage": (70, "aged"),
    "infant stage": (0.5, "infant"),
    "child stage (1-4 yo)": (2.5, "preschool child"),
    "juvenile stage (5-14 yo)": (9.5, "child"),
    "pediatric stage": (10, "child"),
    "postnatal stage": (0.1, "infant"),
    "blastula stage": (0.01, "embryonic"),
    "embryonic stage": (0.03, "embryonic"),
    "organogenesis stage": (0.05, "embryonic"),
    "unknown": (None, "unknown"),
}

# MeSH-based age category mapping
def age_category(age):
    if age is None:
        return "unknown"
    if age < 0.08:
        return "newborn"
    if age < 1:
        return "infant"
    if age < 6:
        return "preschool child"
    if age < 13:
        return "child"
    if age < 19:
        return "adolescent"
    if age < 25:
        return "young adult"
    if age < 45:
        return "adult"
    if age < 65:
        return "middle aged"
    if age < 80:
        return "aged"
    return "80 and over"

# Extract numeric age from free-text terms
def parse_numeric_age(term):
    term = term.lower().strip()

    match = re.search(r'(\d+\.?\d*)\s*yr', term)
    if match:
        return float(match.group(1))
    match = re.search(r'(\d+\.?\d*)\s*year', term)
    if match:
        return float(match.group(1))
    match = re.search(r'(\d+\.?\d*)\s*-year-old', term)
    if match:
        return float(match.group(1))

    match = re.search(r'(\d+\.?\d*)\s*(?:mo|month)', term)
    if match:
        return round(float(match.group(1)) / 12, 2)
    match = re.search(r'(\d+\.?\d*)\s*-month-old', term)
    if match:
        return round(float(match.group(1)) / 12, 2)

    match = re.search(r'(\d+\.?\d*)\s*(?:w|week)', term)
    if match:
        return round(float(match.group(1)) / 52, 2)
    match = re.search(r'(\d+)(?:st|nd|rd|th) week post-fertilization', term)
    if match:
        return round(float(match.group(1)) / 52, 2)

    match = re.search(r'(\d+\.?\d*)\s*d', term)
    if match:
        return round(float(match.group(1)) / 365, 3)

    match = re.search(r'carnegie stage\s*(\d+)', term)
    if match:
        stage = int(match.group(1))
        if stage in CARNEGIE_STAGE_DAYS:
            return round(CARNEGIE_STAGE_DAYS[stage] / 365, 3)

    return None

# Handle named stages and fallback parsing
def parse_named_stage(term):
    if term in NAMED_STAGE_MAPPING:
        return NAMED_STAGE_MAPPING[term]

    match = re.match(r'(\w+)\s+decade stage', term)
    if match:
        word_to_decade = {
            "third": 30, "fourth": 40, "fifth": 50,
            "sixth": 60, "seventh": 70, "eighth": 80, "ninth": 90
        }
        dec = match.group(1).lower()
        if dec in word_to_decade:
            age = word_to_decade[dec] + 5
            return (age, age_category(age))

    match = re.match(r'(\w+)\s+LMP month stage', term)
    if match:
        word_to_month = {
            "first": 1, "second": 2, "third": 3, "fourth": 4,
            "fifth": 5, "sixth": 6, "seventh": 7, "eighth": 8, "ninth": 9
        }
        month = match.group(1).lower()
        if month in word_to_month:
            gest_weeks = (word_to_month[month] - 0.5) * 4
            return (round(gest_weeks / 52, 2), "fetal")

    fallback_age = parse_numeric_age(term)
    if fallback_age is not None:
        return fallback_age, age_category(fallback_age)

    return None, "unknown"

# Process and categorize development stages
df = pd.read_csv(INPUT_PATH, dtype=str).fillna("")
terms = df["development_stage"].unique().tolist()

all_results = []
review_list = []

for term in terms:
    numeric_age = parse_numeric_age(term)
    if numeric_age is not None:
        category = age_category(numeric_age)
    else:
        numeric_age, category = parse_named_stage(term)

    if category in {"fetal", "embryonic"}:
        phase = "pre-birth"
    elif category == "unknown":
        phase = "unknown"
    else:
        phase = "post-birth"

    result = {
        "original_stage": term,
        "numeric_age": numeric_age,
        "age_category": category,
        "birth_phase": phase
    }
    all_results.append(result)

    if numeric_age is None:
        review_list.append(result)

# Output results
os.makedirs(os.path.dirname(OUTPUT_ALL), exist_ok=True)
with open(OUTPUT_ALL, "w", encoding="utf-8") as f:
    json.dump(all_results, f, indent=2, ensure_ascii=False)

with open(OUTPUT_REVIEW, "w", encoding="utf-8") as f:
    json.dump(review_list, f, indent=2, ensure_ascii=False)

print(f"All mapped development stages saved to: {OUTPUT_ALL}")
print(f"{len(review_list)} items needing review saved to: {OUTPUT_REVIEW}")


All mapped development stages saved to: ./mapping_table/mapped_development_stages.json
2 items needing review saved to: ./mapping_table/review_development_stages.json


## DB file

In [9]:
import pandas as pd
import os
import json

# File paths
metadata_path = "./cell_metadata.parquet"
celltype_map_path = "./mapping_table/mapped_celltypes.json"
disease_map_path = "./mapping_table/mapped_diseases.json"
dev_stage_map_path = "./mapping_table/mapped_development_stages.json"
sex_map_path = "./mapping_table/mapped_sex.json"
output_path = "./cell_metadata_with_mappings.parquet"

# Load metadata
df = pd.read_parquet(metadata_path)

# Load mapping files
with open(celltype_map_path, encoding="utf-8") as f:
    celltype_map = {x["original_celltype"]: x for x in json.load(f)}

with open(disease_map_path, encoding="utf-8") as f:
    disease_map = {x["original_disease"]: x for x in json.load(f)}

with open(dev_stage_map_path, encoding="utf-8") as f:
    dev_map = {x["original_stage"]: x for x in json.load(f)}

with open(sex_map_path, encoding="utf-8") as f:
    sex_map = {x["original_sex"]: x for x in json.load(f)}

# Mapping helpers
def get_bmg_fields(map_dict, value):
    entry = map_dict.get(value, {})
    bmg_id = entry.get("BioMedGraphica_ID", "")
    bmg_name = entry.get("matched_term", "")
    if not bmg_id or bmg_id == "null":
        bmg_id = ""
    if not bmg_name or bmg_name == "null":
        bmg_name = ""
    return bmg_id, bmg_name

def get_cmt_fields(map_dict, value):
    entry = map_dict.get(value, {})
    bmg_id = entry.get("CMT_ID", "")
    bmg_name = entry.get("matched_term", "")
    if not bmg_id or bmg_id == "null":
        bmg_id = ""
    if not bmg_name or bmg_name == "null":
        bmg_name = ""
    return bmg_id, bmg_name

def get_dev_fields(map_dict, value):
    entry = map_dict.get(value, {})
    return (
        entry.get("numeric_age", None),
        entry.get("age_category", ""),
        entry.get("birth_phase", "")
    )

def get_sex_field(map_dict, value):
    entry = map_dict.get(value, {})
    return entry.get("normalized_sex", "")

# Add mapped columns
df["CMT_id"], df["CMT_name"] = zip(*df["cell_type"].map(lambda v: get_cmt_fields(celltype_map, v)))
df["disease_BMG_id"], df["disease_BMG_name"] = zip(*df["disease"].map(lambda v: get_bmg_fields(disease_map, v)))
df["development_stage_numeric_age"], df["development_stage_category"], df["birth_phase"] = zip(*df["development_stage"].map(lambda v: get_dev_fields(dev_map, v)))
df["sex_normalized"] = df["sex"].map(lambda v: get_sex_field(sex_map, v))

if "tissue_general" in df.columns:
    df["tissue_general"] = df["tissue_general"].astype(str).str.replace("_", " ")

# Save to file
df.to_parquet(output_path, index=False)
df.to_csv("./cell_metadata_with_mappings.csv", index=False)
print(f"Enhanced metadata saved to: {output_path}")


Enhanced metadata saved to: ./cell_metadata_with_mappings.parquet
