# Space Biology Data Cleaning

This notebook prepares the local experiments CSV for the Space Biology Knowledge Engine by normalizing fields, deriving filters, and saving a cleaned dataset for ingestion into the app backend and analytics panels.

It follows OSDR/GeneLab data processing conventions for standardized metadata across studies, assays, organisms, missions, and tissues to enable reliable filtering and cross-study analysis.


In [None]:
# Imports and setup
from __future__ import annotations
import os
import json
from pathlib import Path
from typing import Dict, Any, List

import pandas as pd
import numpy as np
from dotenv import load_dotenv

import plotly.express as px
import plotly.io as pio

# Load environment configuration before any os.environ lookups below.
load_dotenv()

# Paths: when the kernel is started from a directory named "notebooks",
# the repository root is its parent; otherwise the working directory is the root.
_cwd = Path(os.getcwd())
if _cwd.name == "notebooks":
    REPO_ROOT = _cwd.resolve().parents[0]
else:
    REPO_ROOT = _cwd.resolve()
DATA_DIR = REPO_ROOT / "data"
RAW_CSV = DATA_DIR / "experiments.csv"
# Same path as RAW_CSV on purpose: the cleaned data overwrites the input in-place for the app.
CLEAN_CSV = DATA_DIR / "experiments.csv"

# Theming for plots
pio.templates.default = "plotly_dark"

# External integration (environment-configurable)
BACKEND_BASE = os.getenv("FRONTEND_BACKEND_URL", "http://localhost:5000")
# Leave empty to avoid hardcoding; set in .env to use.
OSDR_BASE = os.getenv("OSDR_BASE", "")

# Echo the resolved configuration so a misconfigured environment is visible immediately.
for _label, _value in (
    ("Repo root:", REPO_ROOT),
    ("Data dir:", DATA_DIR),
    ("Backend base:", BACKEND_BASE),
    ("OSDR base (optional):", OSDR_BASE or "<not set>"),
):
    print(_label, _value)

## Load data

The app expects a CSV with columns such as study_accession, publication_title, year, mission, organism, tissue, assay, platform, environment, doi, source_url, abstract, and metadata_json. A local copy of this file in the data/ directory is sufficient for quick local development.


In [None]:
# Fail fast with an actionable message when the input file is absent;
# otherwise read the whole CSV into the working dataframe.
if RAW_CSV.exists():
    df = pd.read_csv(RAW_CSV)
else:
    raise FileNotFoundError(f"Missing {RAW_CSV}. Place your experiments.csv in the data/ directory.")

print("Rows loaded:", df.shape[0])
# Last expression of the cell: preview the first rows.
df.head(3)

## Cleaning utilities

These helpers normalize text, map common organism/assay/mission synonyms to standard labels, and ensure JSON columns are loadable into Python dictionaries for downstream use.


In [None]:
import re

def _clean_text(s: Any) -> str:
    """Return *s* as a single-line, whitespace-normalized string.

    Missing values (anything ``pd.isna`` reports as null) become ``""``.
    Everything else is converted with ``str()``; non-breaking spaces are
    replaced by regular spaces, runs of whitespace collapse to one space,
    and leading/trailing whitespace is removed.
    """
    if not pd.isna(s):
        text = str(s).replace("\u00a0", " ")  # nbsp
        # Apply the substitutions in their original order: CR/TAB runs first,
        # then any remaining whitespace run.
        for pattern in (r"[\r\t]+", r"\s+"):
            text = re.sub(pattern, " ", text)
        return text.strip()
    return ""

# Synonym tables used by std_label(): keys are lowercase spellings as they may
# appear in the raw CSV, values are the canonical labels shown in filters.
# Spellings without an entry are passed through unchanged by std_label().
ORGANISM_MAP = {
    "mus musculus": "Mus musculus",
    "mouse": "Mus musculus",
    "mice": "Mus musculus",
    "arabidopsis": "Arabidopsis thaliana",
    "arabidopsis thaliana": "Arabidopsis thaliana",
    "drosophila": "Drosophila melanogaster",
    # Full binomial included like the other organisms so that differently-cased
    # inputs (e.g. "drosophila melanogaster") also map to the canonical label.
    "drosophila melanogaster": "Drosophila melanogaster",
}

ASSAY_MAP = {
    "rna-seq": "RNA-seq",
    "rnaseq": "RNA-seq",
    "microarray": "microarray",
    "amplicon": "amplicon",
    "methyl-seq": "methyl-seq",
}

MISSION_MAP = {
    "iss": "ISS",
    "international space station": "ISS",
}

# Identity entries: they lowercase known tissue names regardless of input casing.
TISSUE_MAP = {
    "bone": "bone",
    "femur": "femur",
    "leaf": "leaf",
}

def std_label(s: str, m: Dict[str, str]) -> str | None:
    if not s:
        return None
    low = s.strip().lower()
    return m.get(low, s.strip())

def parse_metadata_json(x: Any) -> Dict[str, Any]:
    """Parse a metadata cell into a dictionary, never raising.

    Args:
        x: A dict (returned unchanged), a JSON document as ``str``/``bytes``,
            or any other value (``None``, NaN, numbers, lists, ...).

    Returns:
        The decoded JSON object when *x* holds one; ``{}`` for missing or
        blank input, undecodable text, and JSON whose top-level value is not
        an object (arrays, numbers, strings), so callers can always treat the
        result as a dict.
    """
    if isinstance(x, dict):
        return x
    # Only text can hold a JSON document. Checking the type first also avoids
    # calling pd.isna on list-like values, whose array result has no single
    # truth value.
    if not isinstance(x, (str, bytes, bytearray)) or not x.strip():
        return {}
    try:
        parsed = json.loads(x)
    except (TypeError, ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError and undecodable bytes;
        # RecursionError covers pathologically nested documents.
        return {}
    return parsed if isinstance(parsed, dict) else {}

def coerce_year(v: Any) -> int | None:
    try:
        iv = int(v)
        if 1900 <= iv <= 2100:
            return iv
    except Exception:
        pass
    return None

## Apply cleaning and standardization

This step trims whitespace, harmonizes entity names, and creates standardized columns for filters used in the app UI.


In [None]:
# Snapshot of the columns present in the loaded data, taken before any
# schema columns are added below.
cols = df.columns.tolist()

# Make sure every column of the expected schema exists so the steps below
# can address them unconditionally; absent columns are created empty.
for c in [
    "study_accession","publication_title","year","mission","organism","tissue",
    "assay","platform","environment","doi","source_url","abstract","metadata_json"
]:
    if c not in df.columns:
        df[c] = None

# Normalize the accession as well: it is part of the deduplication key, so
# stray whitespace would otherwise make identical studies look distinct.
# Missing/blank accessions are kept as None (not "") so null checks still see them.
df["study_accession"] = df["study_accession"].map(lambda v: _clean_text(v) or None)

# Clean text-like fields (missing values become "")
for c in ["publication_title","mission","organism","tissue","assay","platform","environment","doi","source_url","abstract"]:
    df[c] = df[c].map(_clean_text)

# Year coercion: out-of-range or unparseable years become missing
df["year"] = df["year"].map(coerce_year)

# Standardized labels for filters (None when the source field is empty)
df["organism_std"] = df["organism"].map(lambda x: std_label(x, ORGANISM_MAP))
df["assay_std"] = df["assay"].map(lambda x: std_label(x, ASSAY_MAP))
df["mission_std"] = df["mission"].map(lambda x: std_label(x, MISSION_MAP))
df["tissue_std"] = df["tissue"].map(lambda x: std_label(x, TISSUE_MAP))

# Parse metadata_json into a normalized python dict for downstream logic
df["metadata_parsed"] = df["metadata_json"].map(parse_metadata_json)

df.head(5)

## Deduplicate

Drop duplicate rows based on a composite key of study_accession, assay, and tissue to prevent double counting in analytics and graph views.


In [None]:
# Row count prior to deduplication, for the report below.
before = len(df)

# Keep the first row for each (study, assay, tissue) combination and
# renumber the index so it is contiguous again.
dedup_key = ["study_accession", "assay", "tissue"]
df = df.drop_duplicates(subset=dedup_key)
df = df.reset_index(drop=True)

after = len(df)
print(f"Deduplicated: {before} -> {after}")
df.head(3)

## Derive filter columns and basic quality checks

Ensure year is present when possible and add fallback derivations if needed to support timeline and filter operations.


In [None]:
# If year missing, try very simple heuristic from DOI year fragment (optional)
def infer_year_from_doi(doi: str) -> int | None:
    if not doi:
        return None
    m = re.search(r"(20\d{2}|19\d{2})", doi)
    if m:
        return coerce_year(m.group(1))
    return None

# Backfill only the rows whose year is still missing, using the DOI heuristic.
mask_missing_year = df["year"].isna()
df.loc[mask_missing_year, "year"] = df.loc[mask_missing_year, "doi"].map(infer_year_from_doi)

# Show missing values for key fields.
# Text fields were normalized with _clean_text, which turns missing values
# into "" rather than NaN, so empty strings are counted as missing too;
# otherwise publication_title would always report zero.
key_cols = ["study_accession","publication_title","year","mission_std","organism_std","tissue_std","assay_std"]
(df[key_cols].isna() | df[key_cols].eq("")).sum()

## Save cleaned CSV

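Write back to data/experiments.csv so the app can read standardized fields for search, graph, and analytics. Note that this overwrites the raw input file in place (RAW_CSV and CLEAN_CSV point to the same path), so keep a backup of the original if you need it.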


In [None]:
# Make sure the target directory exists before writing.
CLEAN_CSV.parent.mkdir(parents=True, exist_ok=True)

# metadata_parsed holds Python dicts; written directly, each cell would be
# the dict's repr (single quotes, None/True/False), which is not loadable as
# JSON. Serialize it on a copy so the in-memory df keeps real dicts.
df_out = df
if "metadata_parsed" in df.columns:
    df_out = df.assign(metadata_parsed=df["metadata_parsed"].map(json.dumps))

df_out.to_csv(CLEAN_CSV, index=False)
print("Saved:", CLEAN_CSV, "rows:", len(df))

## Quick EDA

Basic counts by organism, assay, mission, tissue, and year help validate filter distributions for the dashboard.


In [None]:
def bar_from_counts(series: pd.Series, title: str):
    """Build a bar chart of value frequencies for *series*.

    Args:
        series: Column to summarize; missing values are ignored and the
            remaining values are counted by their string form.
        title: Chart title.

    Returns:
        A plotly figure. When there is nothing to count (``None``, an empty
        series, or a series containing only missing values) the figure is an
        empty chart with a centered "No data" annotation.
    """
    # Count first so that a column consisting only of missing values is
    # handled by the "No data" branch instead of yielding a blank chart.
    counts = None if series is None else series.dropna().astype(str).value_counts()
    if counts is None or counts.empty:
        fig = px.bar(title=title)
        # Paper coordinates place the label at the center of the plot area
        # regardless of the (empty) axis ranges.
        fig.add_annotation(
            text="No data", showarrow=False, x=0.5, y=0.5, xref="paper", yref="paper"
        )
        return fig
    fig = px.bar(x=counts.index, y=counts.values, title=title)
    fig.update_layout(xaxis_title=None, yaxis_title=None)
    return fig

# One frequency chart per standardized filter column.
fig_org = bar_from_counts(df["organism_std"], "By organism")
fig_assay = bar_from_counts(df["assay_std"], "By assay")
fig_mission = bar_from_counts(df["mission_std"], "By mission")
fig_tissue = bar_from_counts(df["tissue_std"], "Top tissues")

fig_org.show()
fig_assay.show()
fig_mission.show()
fig_tissue.show()

# Timeline of studies per year, in chronological order.
if "year" in df.columns:
    y = df["year"].dropna()
    try:
        y = y.astype(int)
        yc = y.value_counts().sort_index()
        fig_y = px.bar(x=yc.index.astype(str), y=yc.values, title="Studies over time")
        fig_y.update_layout(xaxis_title="Year", yaxis_title="Count")
        fig_y.show()
    except Exception as e:
        # The timeline is best-effort and must not stop the notebook, but
        # report why it was skipped instead of hiding the failure.
        print("Skipping 'Studies over time' chart:", e)

## Optional: Fetch from OSDR API (stub)

This stub shows how to wire up programmatic ingestion from the OSDR API once a base endpoint is configured in the environment, returning a normalized dataframe compatible with this project’s CSV schema.


In [None]:
import httpx

def fetch_from_osdr(query: str, limit: int = 25) -> pd.DataFrame:
    """
    Stub for ingesting studies from the OSDR API.

    The endpoint is taken from the module-level OSDR_BASE setting rather than
    being hardcoded. When it is blank, a notice is printed and an empty
    DataFrame is returned. Otherwise the function currently builds a
    DataFrame from an empty list of rows: the HTTP request and the mapping
    onto the experiments.csv schema are only sketched in the comments below,
    and `query` / `limit` are not used yet.
    """
    endpoint = OSDR_BASE.strip()
    if endpoint == "":
        print("OSDR_BASE not set; skipping remote fetch.")
        return pd.DataFrame()

    # Example pattern; adapt to the actual OSDR API once available
    # params = {"q": query, "size": limit}
    # url = f"{endpoint.rstrip('/')}/search"
    # with httpx.Client(timeout=20) as client:
    #     r = client.get(url, params=params)
    #     r.raise_for_status()
    #     payload = r.json()

    # Parse payload into rows
    # for item in payload.get("results", []):
    #     records.append({ ... map to schema ... })
    records: List[Dict[str, Any]] = []
    return pd.DataFrame(records)

# Example usage (disabled):
# df_remote = fetch_from_osdr("bone microgravity", limit=10)
# display(df_remote.head())

## Optional: Post cleaned data to backend /ingest

This helper can push the cleaned CSV to the running backend for indexing, entity extraction, and graph building from within the notebook during development.


In [None]:
def post_to_backend_ingest(csv_path: Path, mode: str = "append") -> dict:
    """POST an ingest request for *csv_path* to the backend's /ingest route.

    The request body is JSON with the path (as a string) under "sources" and
    *mode* under "mode". On success the decoded JSON response is returned.
    Any failure (connection problem, non-2xx status, undecodable response) is
    printed and reported as ``{"status": "error", "message": ...}`` instead
    of being raised.
    """
    endpoint = BACKEND_BASE.rstrip("/") + "/ingest"
    body = {"sources": [str(csv_path)], "mode": mode}
    try:
        response = httpx.post(endpoint, json=body, timeout=60)
        response.raise_for_status()
        result = response.json()
    except Exception as exc:
        print("Ingest failed:", exc)
        return {"status": "error", "message": str(exc)}
    return result

# Example usage (disabled):
# resp = post_to_backend_ingest(CLEAN_CSV, mode="rebuild")
# resp