<a href="https://colab.research.google.com/github/Danny2173/RAGproject/blob/main/4_Term_Normalization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Imports

In [None]:
# Install dependencies
%pip install -q pandas nltk

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Imports
import csv
import json, re, pickle
from collections import defaultdict

import nltk
import pandas as pd
from nltk.corpus import stopwords


## Loading Corpus

In [None]:
load_path = '/content/drive/MyDrive/corpus.json'

# Load the corpus. The encoding is pinned to UTF-8 so decoding does not depend
# on the platform's locale default (JSON text is UTF-8 by specification).
with open(load_path, "r", encoding="utf-8") as f:
    corpus = json.load(f)

print("Corpus loaded")

Mounted at /content/drive
Corpus loaded


## Building term normalization

In [None]:
import pandas as pd
from collections import defaultdict

# Loading MRCONSO.RRF (pipe-delimited, no header row, so column names are
# supplied explicitly)
mrconso_path = '/content/drive/MyDrive/MRCONSO.RRF'
col_names = [
    "CUI", "LAT", "TS", "LUI", "STT", "SUI", "ISPREF", "AUI",
    "SAUI", "SCUI", "SDUI", "SAB", "TTY", "CODE", "STR",
    "SRL", "SUPPRESS", "CVF"
]

df = pd.read_csv(
    mrconso_path,
    sep='|',
    names=col_names,
    dtype=str,
    # Keep the first field (CUI) as a regular column instead of letting pandas
    # promote it to the index when a line carries an extra trailing delimiter.
    index_col=False,
    # The file is not CSV-quoted: a '"' inside a term is literal text. Without
    # QUOTE_NONE pandas treats it as a quote character, which can strip it or
    # swallow following fields/lines.
    quoting=csv.QUOTE_NONE,
)

# Keep English strings from the selected source vocabularies only, and drop
# rows that have no term string.
# NOTE(review): values passed to isin() that do not occur in SAB are silently
# ignored — confirm each abbreviation exists in the UMLS release being used.
df = df[
    (df["LAT"] == "ENG")
    & df["SAB"].isin(["MSH", "SNOMEDCT_US", "LCH", "MEDLINE", "NCI"])
]
df = df.dropna(subset=["STR"])

# Loading MRSTY.RRF for merging label type: only fields 0, 1 and 3 are read
# and exposed as CUI, TUI and STY.
sty_df = pd.read_csv(
    '/content/drive/MyDrive/MRSTY.RRF',
    sep='|',
    header=None,
    usecols=[0, 1, 3],
    names=['CUI', 'TUI', 'STY'],
    dtype=str
)


# Term types tried in priority order when picking a concept's main term
# (MH, the Main Heading, is preferred)
tty_order = ["MH", "PM", "SY", "ENTRY"]

def best_label(group):
    """Return the lower-cased main term for one concept's rows.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows for a single concept; must have ``TTY`` and ``STR`` columns.

    Returns
    -------
    str
        The ``STR`` of the first row whose ``TTY`` equals the earliest entry
        of ``tty_order`` present in the group, lower-cased. When no row has a
        listed ``TTY``, the first row's ``STR`` (lower-cased) is used.
    """
    for wanted_tty in tty_order:
        has_wanted_tty = group["TTY"] == wanted_tty
        if has_wanted_tty.any():
            return group.loc[has_wanted_tty, "STR"].iloc[0].lower()
    # No preferred term type present: fall back to whatever row comes first
    return group["STR"].iloc[0].lower()

# One main term per CUI. Only the columns best_label reads are selected, so
# apply() no longer operates on the grouping column (the source of the pandas
# DeprecationWarning) and the result is unchanged.
cui_to_main_term = (
    df.groupby("CUI", group_keys=False)[["TTY", "STR"]]
    .apply(best_label)
    .to_dict()
)

# Every lower-cased string recorded for each CUI. Iterating two columns with
# zip() avoids building a Series per row as iterrows() does.
cui_to_synonyms = defaultdict(set)
for cui, term in zip(df["CUI"], df["STR"].str.lower()):
    cui_to_synonyms[cui].add(term)

# Attach main term and sorted synonym list to each (CUI, semantic type) row
# whose CUI survived the MRCONSO filters
merged = sty_df[sty_df["CUI"].isin(cui_to_main_term)].copy()
merged["Preferred"] = merged["CUI"].map(cui_to_main_term)
merged["Synonyms"] = merged["CUI"].map(lambda cui: sorted(cui_to_synonyms[cui]))

# Filtering for Disease/Condition only
# (presumably T047 = Disease or Syndrome, T191 = Neoplastic Process — confirm
# against the UMLS Semantic Network)
allowed_tuis = {"T047", "T191"}
filtered = merged[merged["TUI"].isin(allowed_tuis)]


  .apply(get_best_label_from_group)


In [None]:
# CUIs that passed the semantic-type filter
allowed_cuis = set(filtered["CUI"])

# Restrict the concept rows once and reuse the result for both lookups
allowed_rows = df[df["CUI"].isin(allowed_cuis)]

# Rebuilding cui_to_synonyms and cui_to_main_term using allowed CUIs only
cui_to_synonyms = defaultdict(set)
for cui, term in zip(allowed_rows["CUI"], allowed_rows["STR"].str.lower()):
    cui_to_synonyms[cui].add(term)

# best_label is the label picker defined in the previous cell; the name used
# here before (get_best_label_from_group) is not defined in this notebook and
# raised NameError on a fresh kernel.
cui_to_main_term = (
    allowed_rows
    .groupby("CUI", group_keys=False)[["TTY", "STR"]]
    .apply(best_label)
    .to_dict()
)

# Reverse look-up synonym -> CUI. If the same string is listed under several
# CUIs, the CUI visited last overwrites the earlier ones.
term_to_CUI = {
    syn: cui for cui, syns in cui_to_synonyms.items() for syn in syns
}


  .apply(get_best_label_from_group)


## Saving Pickle Files

In [None]:
# Write both lookup tables to Drive, one pickle file per table
pickle_targets = (
    ("/content/drive/MyDrive/filtered_term_to_CUI.pkl", term_to_CUI),
    ("/content/drive/MyDrive/filtered_cui_to_main_term.pkl", cui_to_main_term),
)
for pickle_path, lookup_table in pickle_targets:
    with open(pickle_path, "wb") as f:
        pickle.dump(lookup_table, f)


In [None]:
# Loading pickle files
# Restores the two lookup tables used by cui_normalization below:
#   term_to_CUI      - lower-cased term string -> CUI
#   cui_to_main_term - CUI -> lower-cased main term
# SECURITY: pickle.load can execute arbitrary code embedded in the file; only
# load pickles that this notebook (or another trusted source) wrote.

with open("/content/drive/MyDrive/filtered_term_to_CUI.pkl", "rb") as f:
    term_to_CUI = pickle.load(f)

with open("/content/drive/MyDrive/filtered_cui_to_main_term.pkl", "rb") as f:
    cui_to_main_term = pickle.load(f)

## Normalizing Corpus

In [None]:
# Creating ngrams and tracking indices
def ngram_tokenize_tokens(tokens, max_len=5):
    """Enumerate every contiguous n-gram of up to ``max_len`` tokens.

    Parameters
    ----------
    tokens : list of str
        Tokens to combine.
    max_len : int, default 5
        Largest number of tokens in one n-gram.

    Returns
    -------
    list of tuple
        ``(ngram, start, end)`` triples where ``ngram`` is the tokens joined
        by single spaces and ``tokens[start:end]`` is the covered slice
        (``end`` exclusive). Ordered by ``start``, then by increasing length.
    """
    total = len(tokens)
    return [
        (' '.join(tokens[start:stop]), start, stop)
        for start in range(total)
        # stop is exclusive, capped at both the length limit and the end of input
        for stop in range(start + 1, min(start + max_len, total) + 1)
    ]

# Normalizing medical terms using main condition name
def cui_normalization(sentence, max_ngram_len=5):
    """Replace known medical terms in ``sentence`` with their main term.

    The sentence is split into alternating runs of word characters and
    non-word characters. Lower-cased word n-grams (joined with single spaces
    by ``ngram_tokenize_tokens``) are looked up in the module-level
    ``term_to_CUI`` dict, and the resulting CUI in ``cui_to_main_term``.
    Matches are chosen left to right, preferring the longest match at each
    position, and never overlap.

    Parameters
    ----------
    sentence : str
        Text to normalize.
    max_ngram_len : int, default 5
        Maximum number of words in a candidate term.

    Returns
    -------
    str
        The sentence with each chosen span (its words and the separators
        between them) replaced by the main term. All other text, including
        original casing and punctuation, is kept unchanged.
    """
    is_word = re.compile(r'\w+').match

    tokens = re.findall(r'\w+|\W+', sentence)
    # Word tokens only, lower-cased for dictionary lookup
    words = [tok.lower() for tok in tokens if is_word(tok)]

    # Every n-gram that resolves to a main term: (start, end, main_term),
    # indices counted in words
    candidates = []
    for ngram, start, end in ngram_tokenize_tokens(words, max_ngram_len):
        if ngram in term_to_CUI:
            cui = term_to_CUI[ngram]
            if cui in cui_to_main_term:
                candidates.append((start, end, cui_to_main_term[cui]))

    # Order by start index, longest span first among equal starts
    candidates.sort(key=lambda c: (c[0], -(c[1] - c[0])))

    # Greedy non-overlapping selection. Candidates arrive in start order, so a
    # span is free of overlap exactly when it starts at or after the end of
    # the last accepted span.
    chosen = {}  # start word index -> (end word index, main term)
    next_free = 0
    for start, end, main_term in candidates:
        if start >= next_free:
            chosen[start] = (end, main_term)
            next_free = end

    # Reconstruct the sentence, walking tokens and word indices in parallel
    output = []
    word_idx = 0
    i = 0
    while i < len(tokens):
        if is_word(tokens[i]):
            hit = chosen.get(word_idx)  # O(1) instead of scanning all matches
            if hit is not None:
                end, main_term = hit
                output.append(main_term)
                # Consume the matched words together with the separators
                # between them; stop right after the last matched word.
                remaining = end - word_idx
                while remaining > 0 and i < len(tokens):
                    if is_word(tokens[i]):
                        remaining -= 1
                    i += 1
                word_idx = end
                continue
            word_idx += 1
        output.append(tokens[i])
        i += 1

    return ''.join(output)


## Example

In [None]:
# Example input sentence
# ("symptons" is misspelled in the input; words that are not dictionary terms
# pass through unchanged, as the recorded output shows)
example = "What are the symptons of type 2 diabetes?"

# Normalize it: recognised terms are swapped for their main term
normalized = cui_normalization(example)

# Output: show the sentence before and after for comparison
print("Original:", example)
print("Normalized:", normalized)


Original: What are the symptons of type 2 diabetes?
Normalized: What are the symptons of diabetes mellitus, type 2?


In [None]:
# Normalizing entire corpus
# Each entry is updated in place: the original "text" is kept and the
# normalized version is stored next to it under "normalized_text".
# Re-running the cell recomputes the value from "text".

for entry in corpus:
    entry["normalized_text"] = cui_normalization(entry["text"])


## Exporting Corpus

In [None]:
# Exporting Corpus

# NOTE: this is the same path the corpus was loaded from, so the input file is
# overwritten with the version that includes "normalized_text".
save_path = '/content/drive/MyDrive/corpus.json'

# ensure_ascii=False writes non-ASCII characters verbatim, so the file
# encoding is pinned to UTF-8 instead of relying on the locale default.
with open(save_path, "w", encoding="utf-8") as f:
    json.dump(corpus, f, ensure_ascii=False, indent=2)

print(f"Corpus saved to {save_path}")

Corpus saved to /content/drive/MyDrive/corpus.json
