assign temperature range assertions, like those found in local/flattened_n4l_temperature_components.tsv, against the minimum and maximum boundaries in metpo.owl

why are there N4L and NCBI taxa in there now?

need better review of conflicts, which should lead to modified parsing

check which taxa didn't get any assignments

N4L temperature predicates, after normalization

* <http://example.com/n4l/temperature>
    * at least partially handled by metpo/classify_temperature_values.ipynb, sparql/report_parsed_temperature_categories.rq
* <http://example.com/n4l/temperature_(grows)>
* <http://example.com/n4l/temperature_optimum>
* <http://example.com/n4l/temperature_range>
* <http://example.com/n4l/temperature_(does_not_grow)>

What can be inferred from another temperature assertion?

Optimum is generally measured, not inferred.

* Delta can always be calculated from the range.
* Categorical labels are assigned based on optimum (and sometimes range).
* Psychrotolerant/thermotolerant specifically require both optimum and range (and duration of tolerance?) for accurate assignment.
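
As a minimal sketch of the inference rules above, the snippet below derives a delta from a growth range and picks a categorical label from an optimum. The cutoffs in `ILLUSTRATIVE_CATEGORIES` are placeholders chosen for illustration, not the boundaries defined in metpo.owl, and `infer_delta` and `label_from_optimum` are hypothetical helper names.

```python
from typing import Optional, Tuple

# Placeholder cutoffs (°C) for illustration only; the real boundaries
# come from metpo.owl and may differ.
ILLUSTRATIVE_CATEGORIES = [
    ("psychrophile", float("-inf"), 15.0),
    ("mesophile", 15.0, 45.0),
    ("thermophile", 45.0, 80.0),
    ("hyperthermophile", 80.0, float("inf")),
]


def infer_delta(growth_range: Tuple[float, float]) -> float:
    """Delta is the width of the growth range."""
    low, high = growth_range
    return high - low


def label_from_optimum(optimum: float) -> Optional[str]:
    """Return the first category whose half-open interval [min, max) contains the optimum."""
    for label, cat_min, cat_max in ILLUSTRATIVE_CATEGORIES:
        if cat_min <= optimum < cat_max:
            return label
    return None


print(infer_delta((10.0, 42.0)))   # 32.0
print(label_from_optimum(37.0))    # mesophile (under the placeholder cutoffs)
```

The tolerant categories are left out on purpose: as noted above, they need both an optimum and a range, so a single-value lookup like this cannot assign them.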



In [None]:
import os
import re
from typing import List, Tuple, Dict, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from pathlib import Path

import xml.etree.ElementTree as ET
from xml.dom import minidom

In [None]:
notebook_dir = Path().resolve()
project_root = notebook_dir if (notebook_dir / "Makefile").exists() else notebook_dir.parent
# assets_dir = project_root / "assets"
# local_dir = project_root / "local"

In [None]:
PROTOLOG_DIR = project_root / "assets" / "N4L_phenotypic_ontology_2016" / "extracted_protologs_2013" / "protologs"

protologs_set = set(os.listdir(PROTOLOG_DIR))

In [None]:
classes_csv = project_root / "local" / "metpo_classes_temperature_limits.csv"
observations_tsv = project_root / "local" / "flattened_n4l_temperature_components.tsv"
output_tsv = project_root / "local" / "categorized_temperature_range_assignments.tsv"
summary_tsv = project_root / "local" / "categorized_temperature_range_summary.tsv"

In [None]:
intra_conflict_threshold = 8
inter_conflict_threshold = 1

In [None]:
overlap_threshold = 0.9

In [None]:
observations_df = pd.read_csv(observations_tsv, sep='\t')

In [None]:
# Load inputs
classes_df = pd.read_csv(classes_csv)


In [None]:
observations_df = observations_df[['?subject', '?predicate',
                                   '?minimum_value', '?maximum_value', '?spot_value',
                                   '?unit']]

In [None]:
observations_df = observations_df.drop_duplicates()


In [None]:
observations_df = observations_df[
    observations_df[['?minimum_value', '?maximum_value', '?spot_value']]
    .notna().any(axis=1)
]


In [None]:
# Clean class definitions
classes_df = classes_df.rename(columns={
    "s": "class_iri",
    "l": "class_label",
    "min": "class_min",
    "max": "class_max"
})

In [None]:
observations_df = observations_df.rename(columns=lambda c: c.lstrip('?'))

In [None]:
for col in ['minimum_value', 'maximum_value', 'spot_value']:
    observations_df[col] = pd.to_numeric(observations_df[col], errors='coerce')


In [None]:
# Remove angle brackets for subject matching
observations_df['subject'] = observations_df['subject'].str.strip('<>')
observations_df['predicate'] = observations_df['predicate'].str.strip('<>')


In [None]:
# Convert numeric values if not done already
for col in ['minimum_value', 'maximum_value', 'spot_value']:
    observations_df[col] = pd.to_numeric(observations_df[col], errors='coerce')


In [None]:
observations_df

may want to add facultative psychrophile to match KG-Microbe

In [None]:
# -----------------------------------------------
# Define class groupings
# -----------------------------------------------

# Group 1: growth category classes (e.g., thermophile)
growth_classes = classes_df[classes_df['class_label'].isin([
    "psychrophile", "psychrotolerant", "mesophile",
    "thermotolerant", "thermophile", "hyperthermophile",
    "extreme thermophile", "extreme hyperthermophile"
])].copy()

In [None]:
# Group 2: temperature optimum and range subclasses
subrange_classes = classes_df[classes_df['class_label'].str.startswith("temperature optimum") |
                              classes_df['class_label'].str.startswith("temperature range")].copy()


In [None]:
# Group 3: delta range classes
delta_classes = classes_df[classes_df['class_label'].str.startswith("temperature delta")].copy()


In [None]:
# Count non-null values across minimum, maximum, spot (melted)
observation_counts = (
    observations_df[['subject', 'minimum_value', 'maximum_value', 'spot_value']]
    .melt(id_vars='subject', value_name='value')
    .dropna(subset=['value'])
    .groupby('subject')
    .size()
    .reset_index(name='observation_count')
)

# Count unique predicates per subject
predicate_counts = (
    observations_df[['subject', 'predicate']]
    .drop_duplicates()
    .groupby('subject')
    .size()
    .reset_index(name='predicate_count')
)

# ✅ Merge them into one DataFrame
summary = pd.merge(observation_counts, predicate_counts, on='subject', how='outer').fillna(0)
summary['observation_count'] = summary['observation_count'].astype(int)
summary['predicate_count'] = summary['predicate_count'].astype(int)

summary['magnitude'] = np.sqrt(summary['observation_count'] ** 2 + summary['predicate_count'] ** 2)

In [None]:
PREDICATE_COLORS = {
    'temperature_optimum': 'green',
    'temperature_range': 'yellow',
    'temperature_(grows)': 'orange',
    'temperature_(does_not_grow)': 'red'
}


In [None]:
def get_predicate_color(predicate_iri):
    short = predicate_iri.split('/')[-1]
    return PREDICATE_COLORS.get(short, 'gray')

In [None]:

def plot_temperature_profile(subject_id=None, observations_df=None, classes_df=None):
    has_obs = observations_df is not None and subject_id is not None
    has_classes = classes_df is not None

    if not has_obs and not has_classes:
        raise ValueError("Must provide at least a subject + observations_df or a classes_df.")

    fig, ax = plt.subplots(figsize=(10, 6))
    y_labels = []
    y_positions = []
    y_pos = 0

    # === Plot subject observations ===
    if has_obs:
        sub = observations_df[observations_df['subject'] == subject_id].copy()
        if sub.empty:
            print(f"No observations found for subject: {subject_id}")
        else:
            spot_agg = (
                sub[pd.notnull(sub['spot_value'])]
                .groupby(['predicate', 'spot_value'])
                .size()
                .reset_index(name='count')
            )

            for pred_short in reversed(PREDICATE_COLORS):
                full_preds = [p for p in sub['predicate'].unique() if p.endswith(pred_short)]
                for pred in full_preds:
                    color = get_predicate_color(pred)

                    # Spot values
                    spots = spot_agg[spot_agg['predicate'] == pred]
                    if not spots.empty:
                        for _, row in spots.iterrows():
                            ax.scatter(row['spot_value'], y_pos, color=color,
                                       s=30 + 20 * row['count'], zorder=5)
                        y_labels.append(f"{pred_short} (spot)")
                        y_positions.append(y_pos)
                        y_pos += 1

                    # Range values
                    ranges = sub[
                        (sub['predicate'] == pred) &
                        pd.notnull(sub['minimum_value']) &
                        pd.notnull(sub['maximum_value'])
                        ]
                    if not ranges.empty:
                        range_group = (
                            ranges.groupby(['minimum_value', 'maximum_value'])
                            .size()
                            .reset_index(name='count')
                        )
                        for _, row in range_group.iterrows():
                            ax.plot([row['minimum_value'], row['maximum_value']], [y_pos, y_pos],
                                    color=color, lw=1 + row['count'], alpha=0.6)
                            ax.scatter([row['minimum_value'], row['maximum_value']], [y_pos, y_pos],
                                       color=color, s=20 + 10 * row['count'])
                        y_labels.append(f"{pred_short} (range)")
                        y_positions.append(y_pos)
                        y_pos += 1

    # === Plot target classes ===
    if has_classes:
        class_df = classes_df[
            ~classes_df['class_label'].str.startswith("temperature delta")
        ].copy()

        # Sorting keys
        class_df['sort_min'] = class_df['class_min'].fillna(-np.inf)
        class_df['sort_max'] = class_df['class_max'].fillna(np.inf)

        # Grouping logic
        is_range = class_df['class_label'].str.contains("temperature range", case=False)
        is_optimum = class_df['class_label'].str.contains("temperature optimum", case=False)

        df_range = class_df[is_range].sort_values(by=['sort_min', 'sort_max'])
        df_optimum = class_df[is_optimum & ~is_range].sort_values(by=['sort_min', 'sort_max'])
        df_other = class_df[~is_range & ~is_optimum].sort_values(by=['sort_min', 'sort_max'])

        sorted_class_df = pd.concat([df_range, df_optimum, df_other], ignore_index=True)

        for _, row in sorted_class_df.iterrows():
            label = row['class_label']
            min_val = row['class_min']
            max_val = row['class_max']
            cy = y_pos

            if pd.notnull(min_val) and pd.notnull(max_val):
                ax.plot([min_val, max_val], [cy, cy], color='black', lw=2)
                ax.scatter([min_val, max_val], [cy, cy], color='black', zorder=5)
            elif pd.notnull(min_val) and pd.isnull(max_val):
                ax.text(min_val, cy, '▶', fontsize=12, ha='left', va='center', color='black')
            elif pd.notnull(max_val) and pd.isnull(min_val):
                ax.text(max_val, cy, '◀', fontsize=12, ha='right', va='center', color='black')

            y_labels.append(label)
            y_positions.append(cy)
            y_pos += 1

    # === Final layout ===
    if y_labels:
        ax.set_yticks(y_positions)
        ax.set_yticklabels(y_labels)

    ax.set_xlabel("Temperature (°C)")
    title = "Temperature Profile"
    if subject_id:
        title += f" for {subject_id}"
    ax.set_title(title)
    ax.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:

def assign_temperature_classes_advanced(
        subject_id,
        observations_df,
        classes_df,
        overlap_threshold=0.5,
        return_all_matches=True
):
    def get_range(row):
        if pd.notnull(row.get("minimum_value")) and pd.notnull(row.get("maximum_value")):
            return row["minimum_value"], row["maximum_value"]
        elif pd.notnull(row.get("spot_value")):
            return row["spot_value"], row["spot_value"]
        else:
            return None

    obs = observations_df[observations_df["subject"] == subject_id]
    if obs.empty:
        return pd.DataFrame()

    predicate_intervals = {
        "temperature_optimum": [],
        "temperature_range": [],
        "temperature_(grows)": [],
        "temperature_(does_not_grow)": []
    }

    for _, row in obs.iterrows():
        predicate = row["predicate"].split("/")[-1]
        if predicate in predicate_intervals:
            r = get_range(row)
            if r:
                predicate_intervals[predicate].append(r)

    all_positive_ranges = predicate_intervals["temperature_range"] + predicate_intervals["temperature_(grows)"] + \
                          predicate_intervals["temperature_optimum"]
    if all_positive_ranges:
        min_temp = min(r[0] for r in all_positive_ranges)
        max_temp = max(r[1] for r in all_positive_ranges)
        delta = max_temp - min_temp
    else:
        delta = None

    def tag_class_group(label):
        if "delta" in label.lower():
            return "delta"
        elif "range" in label.lower():
            return "range"
        elif "optimum" in label.lower():
            return "optimum"
        else:
            return "categorical"

    results = []
    class_df = classes_df.copy()
    class_df["group"] = class_df["class_label"].apply(tag_class_group)
    class_df["span"] = class_df["class_max"].fillna(np.inf) - class_df["class_min"].fillna(-np.inf)

    for _, cls in class_df.iterrows():
        label = cls["class_label"]
        iri = cls.get("class_iri")
        group = cls["group"]
        cmin = cls["class_min"] if pd.notnull(cls["class_min"]) else -np.inf
        cmax = cls["class_max"] if pd.notnull(cls["class_max"]) else np.inf
        span = cls["span"]

        matched = False
        excluded = False
        matched_range = None
        overlap_fraction = 0

        # Reset per class so a delta class never reuses the previous class's candidates
        candidates = []
        if group == "optimum":
            candidates = predicate_intervals["temperature_optimum"]
        elif group in ["range", "categorical"]:
            candidates = predicate_intervals["temperature_range"] + predicate_intervals["temperature_(grows)"]
        elif group == "delta" and delta is not None:
            if cmin <= delta <= cmax:
                matched = True
                matched_range = (delta, delta)
                overlap_fraction = 1.0

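        for rmin, rmax in candidates:
            if rmin == rmax:
                # Spot value: a zero-width interval has no overlap length,
                # so test containment in the class range directly
                frac = 1.0 if cmin <= rmin <= cmax else 0.0
            else:
                overlap_min = max(cmin, rmin)
                overlap_max = min(cmax, rmax)
                overlap_len = max(0, overlap_max - overlap_min)
                rlen = max(1e-6, rmax - rmin)
                frac = overlap_len / rlen
            if frac >= overlap_threshold:
                matched = True
                matched_range = (rmin, rmax)
                overlap_fraction = frac
                break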

        if matched and group != "delta":
            for rmin, rmax in predicate_intervals["temperature_(does_not_grow)"]:
                if max(cmin, rmin) < min(cmax, rmax):
                    excluded = True
                    break

        if matched:
            results.append({
                "subject": subject_id,
                "class_label": label,
                "class_iri": iri,
                "class_group": group,
                "assignment": "match" if not excluded else "excluded",
                "source_range": matched_range,
                "class_range": (cmin, cmax),
                "overlap_fraction": overlap_fraction,
                "class_span": span
            })

    result_df = pd.DataFrame(results)
    # An empty result has no "class_group" column, so skip the narrowing step
    if not return_all_matches and not result_df.empty:
        final = []
        for group in result_df["class_group"].unique():
            subset = result_df[(result_df["class_group"] == group) & (result_df["assignment"] == "match")]
            if not subset.empty:
                final.append(subset.loc[subset["class_span"].idxmin()])
        return pd.DataFrame(final)

    return result_df
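
The interval matching inside `assign_temperature_classes_advanced` comes down to an overlap fraction: the share of the observed range that falls inside the class range. Below is a standalone sketch with made-up numbers. `overlap_fraction` is a hypothetical helper, not part of the notebook, and it assumes a spot value is a zero-width interval that either falls inside the class or does not.

```python
import math


def overlap_fraction(obs_min, obs_max, class_min=-math.inf, class_max=math.inf):
    """Fraction of the observed interval [obs_min, obs_max] inside the class interval.

    Assumes obs_min <= obs_max; open-ended classes use infinite bounds.
    """
    if obs_min == obs_max:
        # Zero-width observation (spot value): inside the class or not.
        return 1.0 if class_min <= obs_min <= class_max else 0.0
    overlap = max(0.0, min(class_max, obs_max) - max(class_min, obs_min))
    return overlap / (obs_max - obs_min)


# Observed range 20-40 °C against a hypothetical class spanning 25-45 °C:
# 15 of the 20 observed degrees fall inside the class.
print(overlap_fraction(20, 40, 25, 45))        # 0.75
print(overlap_fraction(37, 37, 25, 45))        # 1.0
print(overlap_fraction(20, 40, class_min=50))  # 0.0
```

With `overlap_threshold = 0.9`, as set earlier in this notebook, the first observation would not count as a match for that class.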


In [None]:
assignments = assign_temperature_classes_advanced(
    "http://example.com/n4l/rid.2547_nm.11491",
    observations_df,
    classes_df,
    overlap_threshold=overlap_threshold,
    return_all_matches=True
)

In [None]:
unique_subjects = observations_df['subject'].unique()

In [None]:
assignments = pd.concat([
    assign_temperature_classes_advanced(
        subject_id=subj,
        observations_df=observations_df,
        classes_df=classes_df,
        overlap_threshold=overlap_threshold,
        return_all_matches=True
    )
    for subj in unique_subjects
], ignore_index=True)

In [None]:
assignments

In [None]:
def detect_temperature_conflicts(observations_df: pd.DataFrame) -> pd.DataFrame:
    def extract_intervals(df: pd.DataFrame) -> List[Tuple[float, float]]:
        ranges = []
        for _, row in df.iterrows():
            if pd.notnull(row.get("minimum_value")) and pd.notnull(row.get("maximum_value")):
                ranges.append((row["minimum_value"], row["maximum_value"]))
            elif pd.notnull(row.get("spot_value")):
                v = row["spot_value"]
                ranges.append((v, v))
        return ranges

    def compute_spread(intervals: List[Tuple[float, float]]) -> float:
        if not intervals:
            return 0.0
        mins, maxs = zip(*intervals)
        return max(maxs) - min(mins)

    def count_disjoint_clusters(intervals: List[Tuple[float, float]], proximity=2.0) -> int:
        if not intervals:
            return 0
        sorted_intervals = sorted(intervals, key=lambda x: x[0])
        clusters = 1
        _, current_end = sorted_intervals[0]
        for start, end in sorted_intervals[1:]:
            if start > current_end + proximity:
                clusters += 1
                current_end = end
            else:
                current_end = max(current_end, end)
        return clusters

    def has_overlap(a: Tuple[float, float], b: Tuple[float, float], buffer=1.0) -> bool:
        return max(a[0], b[0]) - min(a[1], b[1]) <= buffer

    summaries = []
    for subject, group in observations_df.groupby("subject"):
        summary = {"subject": subject}
        notes = []

        # Per-predicate intra-conflict analysis (excluding does_not_grow)
        intra_scores = []
        for predicate in ["temperature_optimum", "temperature_range", "temperature_(grows)"]:
            pred_group = group[group["predicate"].str.endswith(predicate)]
            intervals = extract_intervals(pred_group)
            spread = compute_spread(intervals)
            clusters = count_disjoint_clusters(intervals)

            summary[f"{predicate}_spread"] = spread
            summary[f"{predicate}_clusters"] = clusters
            intra_scores.append(clusters + spread / 10)

            if clusters > 1:
                notes.append(f"{predicate} has {clusters} disjoint clusters")
            if spread > 30:
                notes.append(f"{predicate} spread is wide ({spread:.1f}°C)")

        summary["intra_conflict_score"] = sum(intra_scores)

        # Inter-predicate conflict: grows vs does_not_grow
        grows = extract_intervals(group[group["predicate"].str.endswith("temperature_(grows)")])
        not_grows = extract_intervals(group[group["predicate"].str.endswith("temperature_(does_not_grow)")])
        inter_score = 0
        for g in grows:
            for ng in not_grows:
                if has_overlap(g, ng, buffer=2.0):
                    inter_score += 1
                    notes.append(f"Grow {g} conflicts with NoGrow {ng}")

        summary["inter_conflict_score"] = inter_score
        summary["conflict_notes"] = "; ".join(notes) if notes else "None"
        summaries.append(summary)

    return pd.DataFrame(summaries)
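
The intra-predicate score rests on how many separate clusters a subject's intervals form. The sketch below runs that sweep over sorted intervals in isolation, using made-up intervals; `count_clusters` is a stand-in name for the nested `count_disjoint_clusters` helper.

```python
from typing import List, Tuple


def count_clusters(intervals: List[Tuple[float, float]], proximity: float = 2.0) -> int:
    """Count groups of intervals separated by gaps larger than `proximity`."""
    if not intervals:
        return 0
    ordered = sorted(intervals)
    clusters = 1
    current_end = ordered[0][1]
    for start, end in ordered[1:]:
        if start > current_end + proximity:
            # Gap is too wide: start a new cluster
            clusters += 1
            current_end = end
        else:
            # Close enough: extend the current cluster
            current_end = max(current_end, end)
    return clusters


# Made-up observations: two reports near 25-30 °C and one outlier at 55 °C.
print(count_clusters([(25, 30), (28, 28), (55, 55)]))  # 2
print(count_clusters([(25, 30), (31.5, 35)]))          # 1
```

For the first example the spread is 55 - 25 = 30 °C, so that predicate would contribute 2 + 30 / 10 = 5.0 to `intra_conflict_score`.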


In [None]:
temperature_conflicts = detect_temperature_conflicts(observations_df)

In [None]:
# Merge the counts and conflicts
full_summary = pd.merge(
    summary,
    temperature_conflicts,
    on='subject',
    how='left'  # so no subject gets dropped
)

In [None]:
# Compare intra- and inter-predicate conflict scores for every subject in full_summary
plt.figure(figsize=(6, 6))
plt.scatter(full_summary["intra_conflict_score"], full_summary["inter_conflict_score"], alpha=0.7)
plt.xlabel("Intra Conflict Score")
plt.ylabel("Inter Conflict Score")
plt.title("Scatter Plot of Conflict Scores")
plt.grid(True)
plt.show()

In [None]:
plt.figure(figsize=(6, 4))
plt.hist(full_summary["intra_conflict_score"], bins=30, color="steelblue", edgecolor="black", alpha=0.8)
plt.xlabel("Intra Conflict Score")
plt.ylabel("Frequency")
plt.title("Histogram of Intra Conflict Scores")
plt.grid(True)
plt.show()

In [None]:
high_conflict_df = full_summary.query(
    f"intra_conflict_score >= {intra_conflict_threshold} or inter_conflict_score >= {inter_conflict_threshold}")


In [None]:
def extract_ids(subject_iri: str) -> Tuple[str, str]:
    rid_match = re.search(r"(rid\.\d+)", subject_iri)
    nm_match = re.search(r"(nm\.\d+)", subject_iri)
    rid = rid_match.group(1) if rid_match else "?"
    nm = nm_match.group(1) if nm_match else "?"
    return rid, nm
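
For illustration, here are the same two regular expressions applied to the subject IRI used earlier in this notebook. The function is restated so the snippet runs on its own; the second IRI is a made-up example with no `nm` part.

```python
import re
from typing import Tuple


def extract_ids(subject_iri: str) -> Tuple[str, str]:
    """Pull the reference ID (rid.N) and name ID (nm.N) out of a subject IRI."""
    rid_match = re.search(r"(rid\.\d+)", subject_iri)
    nm_match = re.search(r"(nm\.\d+)", subject_iri)
    return (rid_match.group(1) if rid_match else "?",
            nm_match.group(1) if nm_match else "?")


print(extract_ids("http://example.com/n4l/rid.2547_nm.11491"))  # ('rid.2547', 'nm.11491')
print(extract_ids("http://example.com/n4l/rid.99"))             # ('rid.99', '?')
```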

In [None]:
from typing import Union


def find_protolog_files(rid: str, nm: str, filenames: set) -> Dict[str, Union[Optional[str], List[str]]]:
    exact = f"{rid}_{nm}.xml" if rid != "?" and nm != "?" else None
    exact_match = exact if exact in filenames else None

    partial_matches = []

    # Compile strict regex: match only if ID is surrounded by (_) or (.) or start/end of string
    rid_pattern = re.compile(rf"(^|[_\.]){re.escape(rid)}([_\.]|$)") if rid != "?" else None
    nm_pattern = re.compile(rf"(^|[_\.]){re.escape(nm)}([_\.]|$)") if nm != "?" else None

    for f in filenames:
        if exact_match and f == exact_match:
            continue
        if ((rid_pattern and rid_pattern.search(f)) or
                (nm_pattern and nm_pattern.search(f))):
            partial_matches.append(f)

    return {
        "exact": exact_match,
        "partial": sorted(partial_matches)
    }
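
The delimiter-anchored pattern is what keeps `rid.25` from matching a file that belongs to `rid.2547`. Here is a small sketch with a made-up filename; `matches_id` is a hypothetical helper, not part of the notebook.

```python
import re


def matches_id(identifier: str, filename: str) -> bool:
    """True if `identifier` appears in `filename` bounded by '_', '.', or a string end."""
    pattern = re.compile(rf"(^|[_.]){re.escape(identifier)}([_.]|$)")
    return bool(pattern.search(filename))


filename = "rid.2547_nm.11491.xml"  # made-up filename in the rid_nm.xml layout
print(matches_id("rid.2547", filename))  # True
print(matches_id("rid.25", filename))    # False: the ID is followed by a digit
print(matches_id("nm.11491", filename))  # True
```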


In [None]:
def load_protolog_content_pretty(filepath: str) -> Optional[str]:
    try:

        ns = {"n4l": "http://namesforlife.com/ns/protolog"}
        ET.register_namespace('', ns["n4l"])

        tree = ET.parse(filepath)
        root = tree.getroot()
        content = root.find(".//{http://namesforlife.com/ns/protolog}content")

        if content is not None:
            # Convert ElementTree element to string
            rough_string = ET.tostring(content, encoding="utf-8")
            # Parse with minidom for pretty print
            reparsed = minidom.parseString(rough_string)
            return reparsed.toprettyxml(indent="  ")
        else:
            return "(no <content> node found)"
    except Exception as e:
        print(f"(Failed to load {filepath}: {e})")
        return None


In [None]:
def display_subject_protolog(
        subject_iri: str,
        intra_score: float,
        inter_score: float,
        observations_df: pd.DataFrame,
        classes_df: pd.DataFrame
):
    rid, nm = extract_ids(subject_iri)
    print(f"## {subject_iri}")
    print(f"  - Reference ID: {rid}")
    print(f"  - Taxon Name ID: {nm}")
    print(f"  - Intra Conflict Score: {intra_score}")
    print(f"  - Inter Conflict Score: {inter_score}")

    plot_temperature_profile(subject_id=subject_iri, observations_df=observations_df, classes_df=classes_df)

    match_info = find_protolog_files(rid, nm, protologs_set)
    if match_info["exact"]:
        print(f"\n✅ Exact match: {match_info['exact']}")
        path = os.path.join(PROTOLOG_DIR, match_info["exact"])
        xml_text = load_protolog_content_pretty(path)
        if xml_text:
            print("\n--- Protolog XML ---")
            print(xml_text)
    elif match_info["partial"]:
        print(f"\n🟡 Partial matches for {rid or '[none]'}, {nm or '[none]'}:")
        for fname in match_info["partial"]:
            print(f"   - {fname}")
        for fname in match_info["partial"]:
            path = os.path.join(PROTOLOG_DIR, fname)
            xml_text = load_protolog_content_pretty(path)
            if xml_text:
                print("\n--- Protolog XML ---")
                print(xml_text)
                break
    else:
        print("\n(no protolog XML found)")

In [None]:
for _, row in high_conflict_df.iterrows():
    display_subject_protolog(
        subject_iri=row["subject"],
        intra_score=row["intra_conflict_score"],
        inter_score=row["inter_conflict_score"],
        observations_df=observations_df,
        classes_df=classes_df
    )


# 📊 Key DataFrames Created in `categorize_temperature_ranges.ipynb`

This notebook processes structured temperature phenotype data and creates several intermediate and final DataFrames. Below is a categorized summary of all major DataFrames that may be useful to save or reuse.

---

## ✅ Primary Inputs

### `observations_df`
- **Source**: `local/flattened_n4l_temperature_components.tsv`
- **Description**: Temperature-related annotations from N4L, deduplicated and restricted to rows with at least one numeric value.
- **Columns**:
  `subject`, `predicate`, `minimum_value`, `maximum_value`, `spot_value`, `unit`

### `classes_df`
- **Source**: `local/metpo_classes_temperature_limits.csv`
- **Description**: Definitions of temperature class boundaries.
- **Columns**:
  `class_iri`, `class_label`, `class_min`, `class_max`

---

## 🧮 Intermediate & Analytical Outputs

### `summary`
- **Description**: Observation and predicate counts per subject.
- **Columns**:
  `subject`, `observation_count`, `predicate_count`, `magnitude`

### `assignments`
- **Description**: Per-subject temperature class assignment results.
- **Columns**:
  `subject`, `class_label`, `class_iri`, `assignment` (`match` or `excluded`),
  `source_range`, `class_range`, `overlap_fraction`, `class_span`, `class_group`

### `temperature_conflicts`
- **Description**: Per-subject intra- and inter-predicate conflict diagnostics.
- **Columns**:
  - `[predicate]_spread`
  - `[predicate]_clusters`
  - `intra_conflict_score`
  - `inter_conflict_score`
  - `conflict_notes`

### `full_summary`
- **Description**: Merge of `summary` and `temperature_conflicts`.
- **Use**: Ideal for export, scoring, or dashboarding.
- **Columns**: All of `summary` + `temperature_conflicts`

### `high_conflict_df`
- **Description**: Subset of `full_summary` with high conflict scores.
- **Filter Logic**:
  `intra_conflict_score >= intra_conflict_threshold or inter_conflict_score >= inter_conflict_threshold` (currently 8 and 1)

---

## 📎 Optional Outputs

### Per-subject Plots
- **Generated by**: `plot_temperature_profile(subject_id, ...)`
- **Use**: Diagnostic visualizations (not saved automatically).

### Protolog Debug Info
- **Generated by**: `display_subject_protolog(...)`
- **Use**: XML snippets and alignment evidence for selected high-conflict subjects.


In [None]:
full_summary.to_csv(summary_tsv, sep="\t", index=False)

In [None]:
assignments.to_csv(output_tsv, sep="\t", index=False)