In [None]:
# # Param
# test = "False"

In [None]:
dataset = "data"

In [None]:
from utils.data_io import join_path

log_path = 'log.txt'

def reset_file(path):
    with open(path, "w", encoding="utf-8") as f:
        pass

def add_line(path, content):
    with open(path, "a", encoding="utf-8") as f:
        f.write(content + "\n")

add_line(log_path, "")
add_line(log_path, "2. concept generating")

In [34]:
import os
print(os.getcwd())

/Users/dauduchieu/Documents/iSE2025/CBM


In [35]:
is_test = True
if test == "False" or test == False:
    is_test = False

In [37]:
from utils.data_loader import DataLoader

In [38]:
data_loader = DataLoader(dataset)

In [39]:
train_df = data_loader.get_data_train()

In [40]:
data_desc = data_loader.get_data_desc()
label_column = data_desc['label_column']
text_column = data_desc['text_column']

In [41]:
labels = train_df[label_column].unique()
labels

array(['general pathological conditions', 'neoplasms',
       'digestive system diseases', 'nervous system diseases',
       'cardiovascular diseases'], dtype=object)

In [42]:
if is_test:
    train_df = train_df.groupby(label_column).sample(50)

In [43]:
from abc import ABC, abstractmethod
class LLMCaller(ABC):
    @abstractmethod
    def structed_output(self, prompt:str, output_struct):
        pass

In [44]:
from time import time, sleep
from typing import List, Callable, TypeVar

T = TypeVar('T')

class GeminiRateLimiter:
    def __init__(self, requests_per_minute: int = 15):
        self.rpm = requests_per_minute
        self.times: List[float] = []

    def wait(self):
        now = time()
        self.times = [t for t in self.times if now - t <= 60]
        if len(self.times) >= self.rpm:
            sleep(60 - (now - self.times[0]))
            now = time()
            self.times = [t for t in self.times if now - t <= 60]

    def record(self):
        self.times.append(time())

    def execute(self, f: Callable[..., T], *args, **kwargs) -> T:
        self.wait()
        try:
            r = f(*args, **kwargs)
            self.record()
            return r
        except:
            self.record()
            raise

    def __call__(self, f: Callable[..., T]) -> Callable[..., T]:
        def wrapper(*args, **kwargs):
            return self.execute(f, *args, **kwargs)
        return wrapper

    def __enter__(self):
        self.wait()
        return self

    def __exit__(self, *args):
        self.record()

In [45]:
from google import genai

class GeminiAPICaller(LLMCaller):
    def __init__(self, api_key:str, api_model:str, api_rpm:int):
        self.client = genai.Client(api_key=api_key)
        self.api_model = api_model
        self.rate_limiter = GeminiRateLimiter(requests_per_minute=api_rpm)

    def structed_output(self, prompt:str, output_struct):
        with self.rate_limiter:
            response = self.client.models.generate_content(
                model=self.api_model,
                contents=prompt,
                config={
                    'response_mime_type': 'application/json',
                    'response_schema': output_struct,
                },
            )

        res = response.parsed
        return res

In [46]:
llm_api_config = data_loader.get_llm_config()

llm_caller = GeminiAPICaller(
    api_key=llm_api_config['api_key'],
    api_model=llm_api_config['model'],
    api_rpm=llm_api_config['rate_per_minute']
)

In [47]:
import numpy as np
import spacy
from collections import defaultdict
import math
from tqdm import tqdm

In [48]:
nlp = spacy.load("en_core_web_sm")

In [49]:
def extract_custom_candidates(text, use_pos=True, pos_list=None, use_ner=True, use_chunks=True):
    doc = nlp(text)
    candidates = []

    if use_pos:
        if pos_list is None:
            pos_list = ["NOUN", "PROPN", "ADJ"]
        for token in doc:
            if token.pos_ in pos_list and not token.is_stop and token.is_alpha:
                candidates.append(token.lemma_.lower())

    if use_ner:
        for ent in doc.ents:
            lemmatized_ent_tokens = [token.lemma_.lower() for token in nlp(ent.text)]
            candidates.append(" ".join(lemmatized_ent_tokens))

    if use_chunks:
        for chunk in doc.noun_chunks:
            lemmatized_chunk_tokens = [token.lemma_.lower() for token in chunk if not token.is_stop and token.is_alpha]
            chunk_text = " ".join(lemmatized_chunk_tokens).strip()
            if chunk_text:
                candidates.append(chunk_text)

    return candidates

In [50]:
def extract_keywords_with_df_ilf(df, text_column, label_column, top_n=15,
    use_pos=True, pos_list=None, use_ner=True, use_chunks=True, smooth=True,
    threshold=0.02
):
    df = df.copy()
    labels = df[label_column].unique()
    num_labels = len(labels)

    doc_freq_by_label = defaultdict(lambda: defaultdict(int))  # DF
    label_counts = defaultdict(int)

    for label in labels:
        texts = df[df[label_column] == label][text_column].astype(str).tolist()
        label_counts[label] = len(texts)

        for text in tqdm(texts, total=len(texts), desc=f"Label: {label}"):
            candidates = extract_custom_candidates(
                text, use_pos=use_pos, pos_list=pos_list,
                use_ner=use_ner, use_chunks=use_chunks
            )
            unique_terms = set(candidates)
            for term in unique_terms:
                doc_freq_by_label[label][term] += 1

    all_terms = set(term for label_terms in doc_freq_by_label.values() for term in label_terms)
    ilf_scores = defaultdict(float)

    # ILF
    for term in all_terms:
        label_occurrences = sum(
            1 for label in labels
            if (doc_freq_by_label[label][term] / label_counts[label]) > threshold
        )
        ilf_scores[term] = np.log(num_labels / label_occurrences) if label_occurrences > 0 else 0

    ranked_keywords_per_label = defaultdict(list)

    for label in labels:
        keyword_scores = {}
        for term, df_val in doc_freq_by_label[label].items():
            normalized_df = df_val / label_counts[label]
            df_score = np.log(1 + normalized_df) if smooth else normalized_df
            keyword_scores[term] = df_score * ilf_scores[term]
        sorted_keywords = sorted(keyword_scores.items(), key=lambda x: x[1], reverse=True)
        ranked_keywords_per_label[label] = [kw for kw, score in sorted_keywords[:top_n]]

    return ranked_keywords_per_label

In [None]:
add_line(log_path, "keyword extracting")

In [51]:
keywords = extract_keywords_with_df_ilf(
    df=train_df,
    text_column=text_column,
    label_column=label_column,
    top_n=10,
    use_pos=True,
    pos_list=["NOUN", "PROPN", "ADJ"],
    use_ner=False,
    use_chunks=True,
    threshold=0.02
)

Label: general pathological conditions: 100%|██████████| 2268/2268 [01:12<00:00, 31.46it/s]
Label: neoplasms: 100%|██████████| 1890/1890 [00:57<00:00, 32.87it/s]
Label: digestive system diseases: 100%|██████████| 682/682 [00:22<00:00, 30.42it/s]
Label: nervous system diseases: 100%|██████████| 926/926 [00:26<00:00, 35.19it/s]
Label: cardiovascular diseases: 100%|██████████| 1723/1723 [01:01<00:00, 27.90it/s]


In [52]:
keywords = dict(keywords)
print(keywords)

{'general pathological conditions': ['hemorrhage', 'cardiac', 'respiratory', 'inflammatory', 'post', 'graft', 'pregnancy', 'airway', 'unknown', 'growth'], 'neoplasms': ['carcinoma', 'metastasis', 'chemotherapy', 'cancer', 'breast', 'node', 'metastatic', 'malignant', 'malignancy', 'radiation'], 'digestive system diseases': ['bowel', 'gastrointestinal', 'biliary', 'cirrhosis', 'bile', 'endoscopic', 'esophageal', 'gallbladder', 'ulcer', 'intestinal'], 'nervous system diseases': ['seizure', 'spinal', 'neurologic', 'deficit', 'nerve', 'nervous', 'cord', 'headache', 'movement', 'cognitive'], 'cardiovascular diseases': ['coronary', 'ventricular', 'cardiac', 'myocardial', 'systolic', 'hypertensive', 'blood pressure', 'diastolic', 'hemodynamic', 'heart']}


In [53]:
def llm_filter_keyword_concepts(keyword_concepts, data_topic, n_concepts=5):
    """
    Filter keyword concepts for each label with additional context of keywords from other labels.
    
    Args:
        keyword_concepts (dict): Mapping from label name to a list of raw keywords.
        data_topic (str): The domain context (e.g., 'biomedical research abstracts').
        n_concepts (int): Number of keywords to keep per label.

    Returns:
        dict: Mapping from label name to list of filtered keywords.
    """
    filtered_keywords_by_label = {}
    all_labels = list(keyword_concepts.keys())

    for target_label in all_labels:
        target_keywords = keyword_concepts[target_label]
        keyword_list = ", ".join(target_keywords)

        # Add reference keywords from other labels
        competing_info = ""
        for other_label in all_labels:
            if other_label == target_label:
                continue
            other_keywords = keyword_concepts[other_label]
            other_kw_str = ", ".join(other_keywords)
            competing_info += f'- {other_label}: {other_kw_str}\n'

        # Construct prompt
        prompt = f"""
You are a domain expert working on the topic: "{data_topic}".

Your task is to select the top {n_concepts} most meaningful and representative terms for the category "{target_label}".

---

### CATEGORY:
"{target_label}"

### RAW EXTRACTED KEYWORDS:
{keyword_list}

---

### REFERENCE KEYWORDS FROM OTHER CATEGORIES:
(These are keywords from competing labels. Use them to ensure your selections are distinctive.)

{competing_info}

---

### INSTRUCTIONS:
- Select exactly {n_concepts} keywords from the provided list under RAW EXTRACTED KEYWORDS.
- Chosen terms should:
  - Be highly relevant and representative of the target category
  - Clearly distinguish the target category from others
  - Be specific, unambiguous, and non-generic

Avoid:
- Generic terms (e.g., "thing", "aspect", "problem")
- Redundant or overlapping words
- Keywords similar to those in other categories

Return your answer as a JSON object in the format:
{{ "{target_label}": ["keyword1", "keyword2", ...] }}
""".strip()

        output_struct = {
            "type": "object",
            "properties": {
                target_label: {
                    "type": "array",
                    "items": {
                        "type": "string"
                    }
                }
            },
            "required": [target_label]
        }

        result = llm_caller.structed_output(prompt=prompt, output_struct=output_struct)
        filtered_keywords_by_label[target_label] = result.get(target_label, [])

    return filtered_keywords_by_label

In [54]:
keyword_concepts = llm_filter_keyword_concepts(keywords, data_desc['data_topic'], n_concepts=5)

In [55]:
keyword_concepts

{'general pathological conditions': ['hemorrhage',
  'inflammatory',
  'growth',
  'pregnancy',
  'airway'],
 'neoplasms': ['carcinoma',
  'metastasis',
  'chemotherapy',
  'cancer',
  'malignancy'],
 'digestive system diseases': ['gastrointestinal',
  'biliary',
  'cirrhosis',
  'esophageal',
  'ulcer'],
 'nervous system diseases': ['seizure',
  'neurologic',
  'nerve',
  'spinal cord',
  'cognitive'],
 'cardiovascular diseases': ['coronary',
  'ventricular',
  'myocardial',
  'hypertensive',
  'hemodynamic']}

In [56]:
def abstract_concept_prompt(
    data_topic,
    text_column,
    label_column,
    text_description,
    label_description,
    label_concepts,
    keyword_concepts,
    concepts_per_label=3
):
    label_concepts = [str(l) for l in label_concepts]
    label_concepts_str = "\n- " + "\n- ".join(label_concepts)

    keyword_list_str = ""
    for label, keywords in keyword_concepts.items():
        for kw in keywords:
            keyword_list_str += f"- Keyword: \"{kw}\", Label: \"{label}\"\n"

    total_labels = len(label_concepts)
    total_concepts = total_labels * concepts_per_label

    constraint_instruction = (
        f"- For each label, you must generate exactly {concepts_per_label} distinct and meaningful abstract concepts.\n"
        f"- The final list must contain exactly {total_concepts} abstract concept objects in total.\n"
        "- Each abstract concept object must include the \"label\" field indicating its corresponding label."
    )

    prompt = f"""
Data context:

- Data topic: {data_topic}
- Text column description: {text_description}
- Label column description: {label_description}

Input:

- List of label concepts in the dataset: {label_concepts_str}

- List of extracted keyword concepts, each associated with a label:
{keyword_list_str}

Important instructions:

- For each label, generate exactly {concepts_per_label} abstract concepts.
- That means the final JSON array must contain exactly {total_concepts} abstract concept objects.
- Do not exceed or go below the required number of concepts per label.
- A keyword can belong to multiple abstract concepts if semantically appropriate.
- Likewise, an abstract concept can contain multiple keywords.
- Abstract concepts must act as meaningful intermediate concepts between keywords and labels.
- Abstract concepts must be:
  - More specific than the label concept.
  - More general and meaningful than individual keywords.
  - They should never simply repeat or copy the label names.

{constraint_instruction}

Your task:

- Group the provided keywords into abstract, higher-level concepts.
- Each abstract concept object must:
  - Have a clear, concise, and meaningful "abstract_concept_name" relevant to the {data_topic} domain (cannot be identical to the label).
  - Include an optional short "description" (1-2 sentences).
  - Include a "keywords" list of related keywords.
  - Include the "label" field specifying the corresponding label.

Expected output:

- A valid JSON array.
- Each item in the array must be an object with:
  - "abstract_concept_name": string
  - "description": string (optional)
  - "keywords": array of strings
  - "label": one of the provided label concepts

Note: Only return valid JSON. Do not add explanations, comments, or extra text.
""".strip()

    return prompt

In [57]:
abstract_prompt = abstract_concept_prompt(
    data_topic=data_desc['data_topic'],
    text_column=data_desc['text_column'],
    label_column=data_desc['label_column'],
    text_description=data_desc['text_description'],
    label_description=data_desc['label_description'],
    label_concepts=labels,
    keyword_concepts=keyword_concepts,
    concepts_per_label=3
)

In [58]:
output_struct = {
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "abstract_concept_name": {"type": "string"},
            "description": {"type": "string"},
            "keywords": {
                "type": "array",
                "items": {"type": "string"}
            },
            "label": {"type": "string"}
        },
        "required": ["abstract_concept_name", "keywords", "label"]
    }
}

In [1]:
abstract_concepts = llm_caller.structed_output(
    prompt=abstract_prompt,
    output_struct=output_struct
)

abstract_concepts

NameError: name 'llm_caller' is not defined

In [None]:
len(abstract_concepts)

6

In [None]:
# from collections import Counter

# label_counts = Counter([item['label'] for item in abstract_concepts])
# print(label_counts)

# expected_count = 3
# for label in labels:
#     assert label_counts[label] == expected_count, f"Label {label} has wrong count!"

In [None]:
from utils.data_io import join_path, save_json

In [None]:
save_json(obj=keyword_concepts, dir=join_path(dataset, 'concepts'), file_name='keyword_concepts.json')

In [None]:
save_json(obj=abstract_concepts, dir=join_path(dataset, 'concepts'), file_name='abstract_concepts.json')

In [None]:
add_line(log_path, "concept gen done")