In [1]:
!pip install -q \
  torch>=2.1.0 \
  transformers>=4.39.0 \
  appdirs \
  jsonpickle \
  filelock \
  h5py \
  nltk \
  dotmap \
  pytest


In [2]:
!pip install radgraph

Collecting radgraph
  Downloading radgraph-0.1.18.tar.gz (587 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/588.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m225.3/588.0 kB[0m [31m6.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m588.0/588.0 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: radgraph
  Building wheel for radgraph (setup.py) ... [?25l[?25hdone
  Created wheel for radgraph: filename=radgraph-0.1.18-py3-none-any.whl size=812635 sha256=7cc1dd0a4503017e36560ff55e540a6e3844612633c74ec78039d3743420096e
  Stored in directory: /root/.cache/pip/wheels/fb/3c/fb/214f5d5cdab2a0f9f0904fd81d7fd1134404100b4444554df8
Successfully built radgraph
Installing collected packages: radgraph
Successfully installed radgraph-0.1.18


In [3]:
import json
from radgraph import get_radgraph_processed_annotations, RadGraph

  return datetime.utcnow().replace(tzinfo=utc)


In [4]:
import pandas as pd
import ast
import re


In [5]:
def clean_text(x):
    if not x:
        return ""
    if isinstance(x, list):
        return ", ".join(x)
    return str(x)


def annotation_to_sentence(annotation):
    obs = clean_text(annotation.get("observation"))
    loc = clean_text(annotation.get("located_at"))
    sug = clean_text(annotation.get("suggestive_of"))
    tag = annotation.get("tags", [""])[0]

    # Normalize tag
    tag = re.sub("_", " ", tag)

    parts = []

    # Handle ABSENT case first
    if tag == "definitely absent":
        if obs:
            sentence = f"No {obs}"
            if loc:
                sentence += f" in the {loc}"
            sentence += "."
            return sentence

    # PRESENT (or unknown) case
    if obs:
        parts.append(obs.capitalize())

    if loc:
        parts.append(f"in the {loc}")

    if sug:
        parts.append(f"suggestive of {sug}")

    sentence = " ".join(parts).strip()

    if sentence and not sentence.endswith("."):
        sentence += "."

    return sentence


In [6]:

df = pd.read_csv("/content/final_gt.csv")

clean = (
    df.iloc[:, 0]
      .astype(str)
      .str.replace(r"\s+", " ", regex=True)
      .str.strip()
      .reset_index(drop=True)
)

concepts = df.iloc[:, 1].reset_index(drop=True)

eval_df = pd.DataFrame({"report": clean, "concepts": concepts})
model_type = "modern-radgraph-xl"
radgraph = RadGraph(model_type=model_type)
reports = eval_df["report"].astype(str).tolist()

pred_concepts = []

for report in reports:
    annotations = radgraph([report])  # one report batch
    processed = get_radgraph_processed_annotations(annotations)  # dict with "processed_annotations"

    sents = []
    for ann in processed["processed_annotations"]:
        s = annotation_to_sentence(ann)
        if s:
            sents.append(s)

    pred_concepts.append(list(dict.fromkeys(sents)))

reports = eval_df["report"].astype(str).tolist()

gt_concepts = [
    ast.literal_eval(x) if pd.notna(x) else []
    for x in eval_df["concepts"].tolist()
]


  return datetime.utcnow().replace(tzinfo=utc)


Using device: cpu


modern-radgraph-xl.tar.gz:   0%|          | 0.00/579M [00:00<?, ?B/s]

  tar.extractall(path=model_dir)
  return datetime.utcnow().replace(tzinfo=utc)


tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/694 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

  return datetime.utcnow().replace(tzinfo=utc)


In [7]:
for i in range(len(pred_concepts)):
  print(i)
  print(reports[i])
  print(gt_concepts[i])
  print(pred_concepts[i])

0
The cardiac, mediastinal and hilar contours are normal. The pulmonary vascularity is normal. Lungs are clear. No pleural effusion or pneumothorax is present. No acute osseous abnormality is seen.
['normal cardiac contours', 'normal mediastinal contours', 'normal hilar contours', 'normal pulmonary vascularity', 'clear lungs', 'no pleural effusion', 'no pneumothorax', 'no acute osseous abnormality']
['Normal in the cardiac contours, mediastinal contours, hilar contours.', 'Normal in the vascularity.', 'Clear in the lungs.', 'No effusion in the pleural.', 'No pneumothorax.', 'No acute abnormality in the osseous.']
1
PA and lateral views of the chest provided demonstrate no focal consolidation, effusion or pneumothorax. The cardiomediastinal silhouette is normal. Bony structures are intact. No free air is seen below the right hemidiaphragm.
['no focal consolidation', 'no pleural effusion', 'no pneumothorax', 'normal cardiomediastinal silhouette', 'normal bony structures', 'no free air un

In [8]:
def canonicalize_concept(s: str,is_pred: bool = False) -> list[str]:
    s = str(s).strip().lower()
    s = re.sub(r"\s+", " ", s)
    s = s.rstrip(".")

    # Split concepts by comma for adjective propagation
    split_concepts = [c.strip() for c in s.split(',')] if is_pred else [s]
    processed_concepts = []

    for concept_str in split_concepts:
        # Rule 1: Propagate leading adjectives (e.g., 'normal') across comma-separated concepts
        # This logic needs to be careful not to apply to single concepts, only to comma-separated ones
        if is_pred and len(split_concepts) > 1:
            # Get the leading adjective from the *first* part of the comma-separated string
            first_part_raw = split_concepts[0]
            match_leading_adj = re.match(r"^(\w+)", first_part_raw)
            leading_adjective = match_leading_adj.group(1) if match_leading_adj else ""

            # Apply adjective to current concept_str if it doesn't already start with an adjective or "no"
            current_concept_first_word = concept_str.split(' ')[0]
            if leading_adjective and \
               current_concept_first_word not in ["normal", "mild", "moderate", "severe", "slight", "minimal", "increased", "decreased", "diffuse", "bilateral", "patchy", "clear", "enlarged", "stable", "unchanged", "intact", "no"] and \
               concept_str != first_part_raw:
                concept_str = f"{leading_adjective} {concept_str}"

        # Canonicalization rules from original function
        concept_str = concept_str.replace("top - normal", "normal")
        concept_str = concept_str.replace("within normal limits", "normal")
        concept_str = concept_str.replace("unremarkable", "normal")

        # reorder templates to match GT phrase style
        # "no effusion in the pleural" -> "no pleural effusion"
        m = re.match(r"^no (.+?) in the (.+)$", concept_str)
        if m:
            obs, loc = m.group(1), m.group(2)
            concept_str = f"no {loc} {obs}"

        # "calcified in the aorta" -> "calcified aorta"
        m = re.match(r"^(.+?) in the (.+)$", concept_str)
        if m:
            obs, loc = m.group(1), m.group(2)
            concept_str = f"{obs} {loc}"

        concept_str = re.sub(r"\s+", " ", concept_str).strip()

        # Rule 2: Remove redundant 'lungs' qualifier if it directly follows 'no'
        if is_pred and concept_str.startswith("no lungs "):
            concept_str = concept_str.replace("no lungs ", "no ", 1)

        # drop single-word leftovers (these usually come from missing location)
        if len(concept_str.split()) == 1:
            concept_str = ""

        if concept_str: # Only add non-empty concepts
            processed_concepts.append(concept_str)

    return processed_concepts

gt_norm= []
for concept_list in gt_concepts:
    report_concepts = []
    for concept in concept_list:
        report_concepts.extend(canonicalize_concept(concept, is_pred=False))
    gt_norm.append([x for x in report_concepts if x])

pred_norm= []
for concept_list in pred_concepts:
    report_concepts = []
    for concept_str_raw in concept_list:
        report_concepts.extend(canonicalize_concept(concept_str_raw, is_pred=True))
    pred_norm.append([x for x in report_concepts if x])




In [9]:
for i in (range(len(pred_concepts))):
  print(reports[i])
  print(i," :")
  print(gt_norm[i],'\n',pred_norm[i],'\n')


The cardiac, mediastinal and hilar contours are normal. The pulmonary vascularity is normal. Lungs are clear. No pleural effusion or pneumothorax is present. No acute osseous abnormality is seen.
0  :
['normal cardiac contours', 'normal mediastinal contours', 'normal hilar contours', 'normal pulmonary vascularity', 'clear lungs', 'no pleural effusion', 'no pneumothorax', 'no acute osseous abnormality'] 
 ['normal cardiac contours', 'normal mediastinal contours', 'normal hilar contours', 'normal vascularity', 'clear lungs', 'no pleural effusion', 'no pneumothorax', 'no osseous acute abnormality'] 

PA and lateral views of the chest provided demonstrate no focal consolidation, effusion or pneumothorax. The cardiomediastinal silhouette is normal. Bony structures are intact. No free air is seen below the right hemidiaphragm.
1  :
['no focal consolidation', 'no pleural effusion', 'no pneumothorax', 'normal cardiomediastinal silhouette', 'normal bony structures', 'no free air under diagphrag

In [10]:
def similarity1(a: str, b: str) -> float:
    a_tokens = set(a.lower().split())
    b_tokens = set(b.lower().split())

    if not a_tokens and not b_tokens:
        return 1.0
    if not a_tokens or not b_tokens:
        return 0.0

    intersection = a_tokens & b_tokens
    union = a_tokens | b_tokens

    return len(intersection) / len(union)


In [11]:
def similarity2(a: str, b: str) -> float:
    a_tokens = set(a.lower().split())
    b_tokens = set(b.lower().split())

    if not a_tokens and not b_tokens:
        return 1.0
    if not a_tokens or not b_tokens:
        return 0.0

    intersection = a_tokens & b_tokens
    union = a_tokens | b_tokens

    return (2*len(intersection)) / (len(a_tokens)+len(b_tokens))


In [12]:
def fuzzy_counts_one(gt_list, pred_list, threshold):
    gt = [g.strip().lower() for g in gt_list if g.strip()]
    pr = [p.strip().lower() for p in pred_list if p.strip()]

    used_gt = set()
    TP = 0

    for p in pr:
        best_j = None
        best_score = 0.0

        for j, g in enumerate(gt):
            if j in used_gt:
                continue

            score = similarity2(p, g)
            if score > best_score:
                best_score = score
                best_j = j

        if best_j is not None and best_score >= threshold:
            TP += 1
            used_gt.add(best_j)

    FP = len(pr) - TP
    FN = len(gt) - TP

    return TP, FP, FN


In [13]:
def fuzzy_prf(gt_norm, pred_norm, threshold):
    TP = FP = FN = 0

    for gt, pr in zip(gt_norm, pred_norm):
        t, f, n = fuzzy_counts_one(gt, pr, threshold)
        TP += t
        FP += f
        FN += n

    precision = TP / (TP + FP) if TP + FP else 0
    recall    = TP / (TP + FN) if TP + FN else 0
    f1        = (2 * precision * recall / (precision + recall)) if precision + recall else 0

    return precision, recall, f1, (TP, FP, FN)


In [14]:

for th in [0.7,0.75, 0.8, 0.85,0.9]:
    P, R, F1, counts = fuzzy_prf(gt_norm, pred_norm, threshold=th)
    print(th, P, R, F1)

0.7 0.6970128022759602 0.6940509915014165 0.6955287437899219
0.75 0.6842105263157895 0.6813031161473088 0.6827537260468417
0.8 0.647226173541963 0.6444759206798867 0.6458481192334989
0.85 0.5860597439544808 0.5835694050991501 0.5848119233498936
0.9 0.55049786628734 0.5481586402266289 0.5493257629524485


In [15]:
import pandas as pd

# Create a DataFrame from pred_norm_revised
# Each row will be a list of predicted concepts for a report
predicted_concepts_df = pd.DataFrame({'concepts': pred_norm})

# Save the DataFrame to a CSV file
output_filename = 'pre_concepts.csv'
predicted_concepts_df.to_csv(output_filename, index=False)

print(f"Predicted concepts saved to '{output_filename}'.")
print("You can now download this file from the Colab file browser.")

Predicted concepts saved to 'pre_concepts.csv'.
You can now download this file from the Colab file browser.


no fuzzy

In [None]:
TP = FP = FN = 0

for g_list, p_list in zip(gt_norm, pred_norm):
    g = set(g_list)
    p = set(p_list)

    TP += len(g & p)
    FP += len(p - g)
    FN += len(g - p)

precision = TP / (TP + FP) if (TP + FP) else 0
recall    = TP / (TP + FN) if (TP + FN) else 0
f1        = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0

print("TP FP FN:", TP, FP, FN)
print("Precision:", precision)
print("Recall:", recall)
print("F1:", f1)


TP FP FN: 1125 1705 1711
Precision: 0.39752650176678445
Recall: 0.3966854724964739
F1: 0.3971055418284504


In [None]:
subset_correct = 0
for g_list, p_list in zip(gt_norm, pred_norm):
    if set(g_list) == set(p_list):
        subset_correct += 1

subset_accuracy = subset_correct / len(gt_norm) if len(gt_norm) else 0
print("Subset accuracy:", subset_accuracy)



Subset accuracy: 0.00997506234413965


In [None]:
label_set = sorted(set(x for xs in gt_norm for x in xs) | set(x for xs in pred_norm for x in xs))
L = len(label_set)
N = len(gt_norm)

mismatches = 0
for g_list, p_list in zip(gt_norm, pred_norm):
    g = set(g_list)
    p = set(p_list)
    for lbl in label_set:
        mismatches += int((lbl in g) ^ (lbl in p))

hamming_loss = mismatches / (N * L) if (N * L) else 0
print("Hamming loss:", hamming_loss)
print("N (reports):", N, "L (labels):", L)


Hamming loss: 0.004052665671691371
N (reports): 401 L (labels): 2102
