# config

In [2]:
# Value passed as config['max_article_count']: how many Wikipedia search results to request per topic.
max_article_count = 50
# Value passed as config['max_sentences_count']: how many knowledge sentences to keep per sample.
max_sentences_count = 10

In [3]:
import pandas as pd
import json
import os

# data

In [4]:
# JSON file holding previously fetched Wikipedia article content.
cache_path = "./caches/wikipedia_article_content_cache.json"

try:
    with open(cache_path, "r", encoding="utf-8") as f:
        wiki_cache = json.load(f)
except FileNotFoundError:
    # No cache file yet (first run): start with an empty in-memory cache.
    wiki_cache = {}

In [4]:
# Entries collected during this session that have not been written to the wiki cache file yet.
new_cache_entries = {}

In [5]:
def load_search_cache(cache_path="./caches/search_result_cache.json"):
    """Load the search-result cache from JSON, turning string keys back into tuples.

    JSON object keys must be strings, so the cache file stores each tuple key
    in its ``str()`` form (for example ``"('topic', 25)"``). The keys are
    parsed with ``ast.literal_eval`` instead of ``eval``: the cache file is
    external data, and ``eval`` would execute any expression stored in it.

    Args:
        cache_path: Path of the JSON cache file.

    Returns:
        dict: Parsed cache. Empty when the file does not exist, is not valid
        JSON, or does not contain a JSON object. Keys that are not valid
        Python literals (or are unhashable once parsed) are skipped.
    """
    import ast  # local import so this cell does not depend on a later import cell

    if not os.path.exists(cache_path):
        return {}  # nothing cached yet

    with open(cache_path, "r", encoding="utf-8") as f:
        try:
            raw_cache = json.load(f)
        except json.JSONDecodeError:
            return {}  # corrupted file: behave as if there were no cache

    if not isinstance(raw_cache, dict):
        return {}

    search_cache = {}
    for key, value in raw_cache.items():
        try:
            search_cache[ast.literal_eval(key)] = value
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a plain literal, or not usable as a dict key: drop this entry only.
            continue
    return search_cache

# Load the on-disk search cache once, before any topic is searched.
search_cache = load_search_cache()

In [6]:
# Cached entities per input text. A missing or unreadable cache file is
# treated as an empty cache, matching how the other caches are loaded, so the
# notebook also runs on a fresh checkout.
try:
    with open("./caches/name_entities_cache.json", "r", encoding="utf-8") as f:
        text_name_entites_dict = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
    text_name_entites_dict = {}

# Debug collector; only referenced by commented-out code in knowledge_enrichment.
empty_sentence = []

Load the VAST dataset and keep its test split (all rows of the split are used; no sub-sampling is applied)

In [None]:
# Load the VAST dataset and keep the rows that list 'vast_test.csv' among their sources.
df = pd.read_json('./datasets/vast_new_dataset.json')
print("Total items in dataset:", len(df))

# Each 'source' cell is iterated as a list of dicts; a row is kept when any of them has file == 'vast_test.csv'.
df_vast_test = df[
    df['source']
      .apply(lambda src_list: any(src.get('file') == 'vast_test.csv' for src in src_list))
].reset_index(drop=True)

print("Filtered items (dataset == 'vast test'):", len(df_vast_test))

# sample_df is the frame the FLOW cells iterate over; each dataset cell overwrites it.
sample_df = df_vast_test

In [None]:
# Load the SemEval dataset and keep the rows that list 'SemEval-2016-Test.csv' among their sources.
df = pd.read_json('./datasets/semeval_dataset.json')
print("Total items in dataset:", len(df))

# NOTE: the variable is still called df_vast_test although it holds the SemEval test split here.
df_vast_test = df[
    df['source']
      .apply(lambda src_list: any(src.get('file') == 'SemEval-2016-Test.csv' for src in src_list))
].reset_index(drop=True)

print("Filtered items (dataset == 'semeval test'):", len(df_vast_test))

# sample_df is the frame the FLOW cells iterate over; each dataset cell overwrites it.
sample_df = df_vast_test

In [7]:
# Load the P-Stance dataset and keep the rows with a source file whose name contains 'test' (case-insensitive).
df = pd.read_json('./datasets/pstance.json')
print("Total items in dataset:", len(df))

# NOTE: the variable is still called df_vast_test although it holds the P-Stance test split here.
df_vast_test = df[
    df['source']
      .apply(lambda src_list: any('test' in src.get('file', '').lower() for src in src_list))
].reset_index(drop=True)

print("Filtered items (dataset == 'pstance test'):", len(df_vast_test))

# sample_df is the frame the FLOW cells iterate over; each dataset cell overwrites it.
sample_df = df_vast_test

Total items in dataset: 21572
Filtered items (dataset == 'pstance test'): 2157


# Knowledge Enrichment

## search topics in wikipedia

In [8]:
import wikipedia
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


In [9]:
import os

def store_in_cache(title, page=None):
    """Queue the text of ``page`` under ``title`` in the pending cache entries.

    Only ``new_cache_entries`` is touched; ``flush_cache`` later merges the
    pending entries into ``wiki_cache`` and writes them to disk.

    Args:
        title: Key to store the entry under.
        page: Object exposing ``content`` and ``summary`` attributes. When it
            is None, or an attribute is missing or falsy, an empty string is
            stored for that field.
    """
    content = getattr(page, "content", "")
    summary = getattr(page, "summary", "")
    new_cache_entries[title] = {
        "content": content if content else "",
        "summary": summary if summary else "",
    }

def flush_cache():
    """Merge pending entries into ``wiki_cache`` and rewrite the cache file.

    Does nothing (and does not touch the file) when there are no pending
    entries. After a successful write the pending entries are cleared.
    """
    if new_cache_entries:
        # Fold the session's new pages into the full cache, then persist all of it.
        wiki_cache.update(new_cache_entries)
        with open(cache_path, "w", encoding="utf-8") as out:
            json.dump(wiki_cache, out, ensure_ascii=False, indent=2)

        # Everything pending is on disk now.
        new_cache_entries.clear()


def flush_name_entity_cache(new_entries, cache_path="./caches/name_entities_cache.json"):
    """Merge ``new_entries`` into the entity cache file and write it back.

    Args:
        new_entries: Mapping of entries to add; on key clashes these values
            replace the ones already stored in the file.
        cache_path: JSON file to update. A missing file or one containing
            invalid JSON is treated as an empty cache.
    """
    merged = {}
    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as src:
            try:
                merged = json.load(src)
            except json.JSONDecodeError:
                merged = {}

    # Values from this session take precedence over what was on disk.
    merged.update(new_entries)

    with open(cache_path, "w", encoding="utf-8") as dst:
        json.dump(merged, dst, ensure_ascii=False, indent=2)


def update_search_cache(search_cache, cache_path="./caches/search_result_cache.json"):
    """Persist ``search_cache`` into the JSON cache file, then empty it.

    Keys are written as ``str(key)`` because JSON object keys must be
    strings. Entries already in the file are kept unless the same stringified
    key appears in ``search_cache``, in which case the new value wins.

    Args:
        search_cache: In-memory cache to persist. It is cleared in place after
            the file has been written.
        cache_path: JSON file to update. A missing file or one containing
            invalid JSON is treated as an empty cache.
    """
    on_disk = {}
    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as src:
            try:
                on_disk = json.load(src)
            except json.JSONDecodeError:
                on_disk = {}

    # Stringify the keys while merging the in-memory entries over the stored ones.
    on_disk.update((str(key), value) for key, value in search_cache.items())

    with open(cache_path, "w", encoding="utf-8") as dst:
        json.dump(on_disk, dst, ensure_ascii=False, indent=2)

    # The caller's dict is emptied once its content is on disk.
    search_cache.clear()

In [10]:
def safe_fetch_page(title, retries: int = 1):
    """
    Fetch ``wikipedia.page(title)`` without letting fetch errors propagate.

    - On DisambiguationError: records the first 10 options under ``title`` in
      ``new_cache_entries`` and returns the first option (other than the
      title itself, compared case-insensitively) that can be fetched.
    - On any other error: retries only while ``retries`` is positive and the
      error message contains "Expecting value"; otherwise gives up.

    Returns:
      the fetched page object on success,
      None when nothing could be fetched.
    """
    try:
        # Short pauses around each request, as in every fetch below.
        time.sleep(0.1)
        page = wikipedia.page(title, auto_suggest=False)
        time.sleep(0.1)
        return page
    except wikipedia.DisambiguationError as ambiguous:
        candidates = ambiguous.options[:10]
        new_cache_entries[title] = {'disambiguation': candidates}
        for candidate in candidates:
            if candidate.lower() == title.lower():
                continue  # same title again; it would only disambiguate once more
            try:
                time.sleep(0.1)
                candidate_page = wikipedia.page(candidate, auto_suggest=False)
                time.sleep(0.1)
                return candidate_page
            except Exception:
                continue
        return None
    except Exception as err:
        retryable = retries > 0 and "Expecting value" in str(err)
        if not retryable:
            return None
        time.sleep(1)
        return safe_fetch_page(title, retries - 1)

In [11]:
def fetch_page_content(wiki_title, search_results=None):
    """
    Return the raw page content for wiki_title, consulting the caches first.

    Lookup order:
      1. Cached entry with 'content' (wiki_cache first, then the pending
         new_cache_entries) -> that content.
      2. Cached entry with 'disambiguation' -> content of the first listed
         option that is not in search_results and already has cached content
         (in either cache); "" when no option qualifies. No live fetch here.
      3. No usable cached entry -> live fetch through safe_fetch_page. The
         result is queued with store_in_cache; a failed fetch is queued as an
         empty entry so the same title is not fetched again.

    Args:
        wiki_title: Title of the page to read.
        search_results: Optional collection of titles; disambiguation options
            contained in it are skipped (presumably because the caller
            processes those titles itself - confirm with caller).

    Returns:
        str: The page content, or "" when none is available.
    """
    entry = wiki_cache.get(wiki_title) or new_cache_entries.get(wiki_title)

    if isinstance(entry, dict):
        # 1) Direct content hit
        if "content" in entry:
            return entry["content"] or ""

        # 2) Disambiguation hit: reuse an already-cached alternative
        if "disambiguation" in entry:
            for opt in entry["disambiguation"]:
                if search_results and opt in search_results:
                    continue
                # Look in the pending entries as well, like the direct lookup above.
                fb_entry = wiki_cache.get(opt) or new_cache_entries.get(opt)
                if isinstance(fb_entry, dict) and "content" in fb_entry:
                    return fb_entry["content"] or ""
            return ""  # none found

    # 3) Not cached: fetch live and remember the outcome
    page = safe_fetch_page(wiki_title)
    if page:
        store_in_cache(wiki_title, page)
        return page.content or ""

    # Record an empty entry so we don't retry endlessly
    store_in_cache(wiki_title)
    return ""

In [12]:
def process_and_fetch_page_content(wiki_title, search_results):
    """Fetch a page's content and reduce it to its main body text.

    The content is cut at the first level-2 heading whose title is in the
    unwanted list, the remaining heading lines are removed, and runs of
    newlines are collapsed to a single blank line.

    Args:
        wiki_title: Title handed to ``fetch_page_content``.
        search_results: Passed through to ``fetch_page_content``.

    Returns:
        str: The cleaned text, or "" when there is no content or any error
        occurs (the error is printed, not raised).
    """
    try:
        raw_text = fetch_page_content(wiki_title, search_results)
        if raw_text == "":
            return ""

        # Headings after which nothing is kept.
        trailing_sections = (
            "External links",
            "Further reading",
            "Etymology",
            "Others",
            "See also",
            "Notes",
            "References",
            "Book references",
            "Journal references",
            "News references",
            "Web references",
        )

        # Escape each title so it is matched literally inside the alternation.
        alternatives = "|".join(map(re.escape, trailing_sections))
        heading_pattern = r"\n==\s*(" + alternatives + r")\s*=="

        # Everything before the first unwanted heading.
        body = re.split(heading_pattern, raw_text, flags=re.IGNORECASE)[0].strip()

        # Drop the remaining "== Title ==" style heading lines.
        body = re.sub(r'(?m)^\s*=+\s*.+?\s*=+\s*$', '', body)

        # Collapse runs of newlines to exactly one blank line.
        return re.sub(r'\n{2,}', '\n\n', body).strip()

    except Exception as e:
        print(f"Error fetching Wikipedia page: '{wiki_title}': {e}")
        return ""

In [13]:
def fetch_wikipedia_articles_knowledge(topic, max_result=25):
    """Search Wikipedia for ``topic`` and return the cleaned text of the hits.

    Search results are memoised in ``search_cache`` under
    ``(topic, max_result)``. Pages are processed concurrently with
    ``process_and_fetch_page_content``; texts are collected in completion
    order, so the order of the returned list is not fixed.

    Args:
        topic: Search query.
        max_result: Number of search results to request.

    Returns:
        list[str]: Non-empty page texts. Errors are printed instead of
        raised, and whatever was collected so far is returned.
    """
    collected = []
    try:
        key = (topic, max_result)
        if key not in search_cache:
            search_cache[key] = wikipedia.search(topic, results=max_result)
            print(f"ℹ️ search results for: {topic}")
        titles = search_cache[key]

        if not titles:
            print(f"❌ No results found for this topic. {topic}")
        else:
            with ThreadPoolExecutor(max_workers=10) as pool:
                pending = {
                    pool.submit(process_and_fetch_page_content, title, titles): title
                    for title in titles
                }
                for finished in as_completed(pending):
                    try:
                        text = finished.result()
                    except Exception as e:
                        print(f"⚠️  Error fetching '{pending[finished]}': {e}")
                        continue
                    if text:
                        collected.append(text)

    except Exception as e:
        print(f"❌ Error fetching Wikipedia content for '{topic}': {e}")

    return collected

## Find named entities in texts

In [None]:
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

# Checkpoint used for token classification (NER); model and tokenizer use the same identifier.
model_name = "dbmdz/bert-large-cased-finetuned-conll03-english"  # You can change this to other models
tokenizer_name = "dbmdz/bert-large-cased-finetuned-conll03-english"

# Load the NER model and tokenizer
model = AutoModelForTokenClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
# find_text_name_entities reads "word" and "entity_group" from each item this pipeline returns.
# NOTE(review): both aggregation_strategy and grouped_entities are passed - confirm both are still needed.
ner_pipeline = pipeline(task="ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple", grouped_entities=True)

In [None]:
def find_text_name_entities(text):
    """Run the NER pipeline on ``text`` and return its distinct entities.

    Args:
        text: Input passed to ``ner_pipeline``.

    Returns:
        list[tuple[str, str]]: ``(lower-cased word, entity group)`` pairs in
        order of first appearance. When a word occurs more than once, the
        group of its first occurrence is kept.
    """
    first_group_by_word = {}
    for entity in ner_pipeline(text):
        # setdefault keeps the group recorded for the first occurrence of a word.
        first_group_by_word.setdefault(entity["word"].lower(), entity["entity_group"])
    return list(first_group_by_word.items())


## Use GPT as the named-entity extractor

In [14]:
import openai
import time

In [None]:
# Read the API key from the environment so it is never committed with the notebook.
# Set OPENAI_API_KEY before starting the kernel.
client = openai.OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY")
)

In [16]:
import re
import time

def old_extract_entities(text):
    """Ask the chat model to tag entities in ``text`` and return them.

    The model is told to wrap each entity in numbered tags
    (``<e1>...</e1>``, ``<e2>...</e2>``, ...); the tag contents are extracted
    from the reply with a regular expression.

    Args:
        text: Text to extract entities from.

    Returns:
        list[str]: Lower-cased tag contents in order of appearance (duplicates
        kept); an empty list when the reply cannot be parsed.
    """
    prompt = (
        "Identify and tag named entities in the following text with sequential entity tags. \n"
        "Use the format <e1>Entity1</e1>, <e2>Entity2</e2>, <e3>Entity3</e3>, etc. \n"
        "Each entity should have a distinct tag (e1, e2, e3, etc.).\n"
        "\n"
        "Text:\n"
        f'"{text}"\n'
        "\n"
        "Output only the modified text with tagged entities."
    )
    messages = [
        {"role": "system", "content": "You are an expert name entity extractor."},
        {"role": "user", "content": prompt},
    ]

    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=messages,
        temperature=0
    )

    try:
        tagged = response.choices[0].message.content.strip()
        # Collect the text between each <eN> ... </eN> pair, lower-cased.
        entities = [
            match.group(1).lower()
            for match in re.finditer(r'<e\d+>(.*?)</e\d+>', tagged)
        ]
    except Exception as e:
        print(f"Error parsing response: {e}")
        entities = []

    time.sleep(0.02)
    return entities


In [17]:
import ast
def extract_entities(text):
    """Ask the chat model for the key entities/concepts of ``text``.

    The reply is expected to be a Python-style list literal of strings and is
    parsed with ``ast.literal_eval``.

    Args:
        text: Text to analyse.

    Returns:
        list[str]: Lower-cased entities when the reply parses; the string
        ``"error"`` when it does not (the failure is printed).
    """
    prompt = (
        "\n"
        "You are an expert in text understanding and entity extraction.\n"
        "\n"
        "From the following input text, identify **up to 10 of the most important conceptual or named entities** based on the **overall meaning and subject of the text**. This includes people, places, laws, institutions, technologies, or abstract concepts if they are key to the topic.\n"
        "\n"
        "- Do NOT list trivial or generic words.\n"
        "- Focus on what drives the message or argument of the text.\n"
        "- Return ONLY the list of entities as an array of strings.\n"
        "- Each string should contain just the entity (no tags, no explanations).\n"
        "- Keep the number of entities between 3 and 10 depending on relevance.\n"
        "\n"
        "Text:\n"
        f'"""{text}"""\n'
    )
    messages = [
        {"role": "system", "content": "You are an expert in concept-level entity extraction from texts."},
        {"role": "user", "content": prompt},
    ]

    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=messages,
        temperature=0
    )
    try:
        reply = response.choices[0].message.content.strip()
        entities = [item.lower() for item in ast.literal_eval(reply)]
    except Exception as e:
        print(f"Error parsing response: {e}")
        # Sentinel value (a string, not a list) signalling a failed extraction.
        entities = "error"
    time.sleep(0.02)
    return entities


## Filter sentences that contain named entities

In [18]:
from nltk import download, sent_tokenize
from nltk.tokenize import word_tokenize

# Download the NLTK 'punkt' and 'punkt_tab' packages (the cell output reports them as already up to date).
download('punkt')
download('punkt_tab')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\alire\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\alire\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [19]:
import re

def filter_sentences_by_entities(
    sentences,
    named_entities,
    target,
    max_sentences=20,
    minimum_mached_entity_count=1,
    min_entity_coverage=0  # ratio of named entities required
):
    """Rank sentences by how densely they mention the given entities.

    Each sentence is lower-cased and every occurrence of the target is
    removed, so the target itself never counts as an entity hit. The score is
    ``sum(hits * len(entity)) / len(sentence without target)``.

    Matching is prefix-based, not whole-word: patterns are anchored with a
    leading ``\\b`` only, so "iowa" also matches "iowans".

    Args:
        sentences: Iterable of sentence strings.
        named_entities: Iterable of entity strings. Non-string items and
            entities of one character or less (after stripping) are ignored.
        target: Target phrase to strip before scoring. None, "" or a
            non-string value means nothing is stripped.
        max_sentences: Maximum number of sentences returned.
        minimum_mached_entity_count: Minimum number of distinct entities a
            sentence must mention.
        min_entity_coverage: Minimum fraction of the usable entities a
            sentence must mention.

    Returns:
        list[str]: Up to ``max_sentences`` original sentences, highest score
        first; ties keep their input order.
    """
    # A missing target is tolerated here just like in sentence_extraction.
    if isinstance(target, str) and target:
        target_pattern = re.compile(rf"\b{re.escape(target.lower())}")
    else:
        target_pattern = None

    # One compiled pattern per distinct lower-cased entity.
    entity_patterns = {
        entity.lower(): re.compile(rf"\b{re.escape(entity.lower())}")
        for entity in (named_entities or [])
        if isinstance(entity, str) and len(entity.strip()) > 1
    }
    total_entity_count = len(entity_patterns)
    if total_entity_count == 0:
        return []  # nothing to match, so no sentence can score

    scored_sentences = []
    for sent in sentences:
        sent_lower = sent.lower()

        # 1. Remove the target entirely
        if target_pattern is not None:
            sent_no_target = target_pattern.sub("", sent_lower)
        else:
            sent_no_target = sent_lower

        # 2. Score entities in the cleaned text; longer entities weigh more
        entity_score = 0
        matched_entity_count = 0
        for entity, pat in entity_patterns.items():
            hits = len(pat.findall(sent_no_target))
            if hits:
                entity_score += hits * len(entity)
                matched_entity_count += 1

        if entity_score == 0:
            continue  # skip sentences with no entity hits
        if matched_entity_count < minimum_mached_entity_count:
            continue
        if matched_entity_count / total_entity_count < min_entity_coverage:
            continue

        # 3. Normalize by sentence length (post-removal)
        length = len(sent_no_target) or 1
        scored_sentences.append((sent, entity_score / length))

    # 4. Return the top-scoring sentences
    scored_sentences.sort(key=lambda pair: pair[1], reverse=True)
    return [sent for sent, _ in scored_sentences[:max_sentences]]


In [20]:
import re
from nltk.tokenize import sent_tokenize

def sentence_extraction(target, content_array):
    """Collect the sentences from ``content_array`` that mention ``target``.

    Matching is case-insensitive and anchored at a leading word boundary
    only, so a target also matches when it is the prefix of a longer word.

    Args:
        target: Phrase to look for. A falsy or non-string target yields [].
        content_array: Iterable of texts; falsy or non-string items are
            skipped. Each text is split with ``sent_tokenize``.

    Returns:
        list[str]: Matching sentences, in input order.
    """
    if not target or not isinstance(target, str):
        return []

    target_re = re.compile(rf"\b{re.escape(target)}", flags=re.IGNORECASE)

    return [
        sentence
        for content in content_array
        if content and isinstance(content, str)
        for sentence in sent_tokenize(content)
        if target_re.search(sentence)
    ]


## flow function

In [21]:
import ast


def knowledge_enrichment(text, target, config):
    """Collect Wikipedia sentences about ``target`` that mention entities of ``text``.

    Steps:
      1. Fetch Wikipedia article texts for ``target``.
      2. Get the entities of ``text`` from ``text_name_entites_dict``; a
         cached JSON list string is decoded. If nothing usable is cached, the
         entities are extracted with ``old_extract_entities``.
      3. Store the de-duplicated entities back into ``text_name_entites_dict``.
      4. Keep sentences that mention ``target`` and rank them by entity hits.

    Args:
        text: Input text whose entities drive the sentence ranking.
        target: Stance target; used as the Wikipedia search topic.
        config: Dict read for 'max_article_count' (default 25),
            'max_sentences_count' (10), 'minimum_entity_count' (1) and
            'min_entity_coverage' (0).

    Returns:
        tuple[list[str], list[str]]: ``(knowledge sentences, entities)``.
    """
    related_page_contents = fetch_wikipedia_articles_knowledge(
        target, config.get('max_article_count', 25))

    # text_name_entities = find_text_name_entities(text)
    cached_entities = text_name_entites_dict.get(text, [])

    if isinstance(cached_entities, str) and cached_entities.startswith("["):
        try:
            text_name_entities = json.loads(cached_entities)
            print(f"json load {cached_entities}")
        except json.JSONDecodeError:
            print(f"⚠️ Error decoding JSON for: {cached_entities}")
            text_name_entities = []
    else:
        text_name_entities = cached_entities  # It's already a list

    # Anything that is not a list of strings (for example a cached "error"
    # marker) must not reach list(...): a plain string would be split into
    # single characters. Treat such values as "nothing cached".
    if isinstance(text_name_entities, (list, tuple)):
        text_name_entities = [e for e in text_name_entities if isinstance(e, str)]
    else:
        text_name_entities = []

    if not text_name_entities:
        print(f"extracted {cached_entities}")
        text_name_entities = old_extract_entities(text)

    # Remove duplicates while keeping first-seen order, so the cached value is stable between runs.
    text_name_entities = list(dict.fromkeys(text_name_entities))
    text_name_entites_dict[text] = text_name_entities

    content_sentences = sentence_extraction(target, related_page_contents)
    knowledge_sentences_array = filter_sentences_by_entities(
        content_sentences,
        text_name_entities,
        target,
        config.get('max_sentences_count', 10),
        config.get('minimum_entity_count', 1),
        config.get('min_entity_coverage', 0))

    # print(len(content_sentences), len(knowledge_sentences_array))
    # print(text_name_entities)
    # if len(knowledge_sentences_array) < 10:
    #     empty_sentence.append((content_sentences, text_name_entities))

    return knowledge_sentences_array, text_name_entities

In [22]:
import ast
import json


def new_knowledge_enrichment(text, target, config):
    """Like ``knowledge_enrichment``, but also searches Wikipedia for every entity.

    Steps:
      1. Fetch Wikipedia article texts for ``target``.
      2. Get the entities of ``text`` from ``text_name_entites_dict``; a
         cached JSON list string is decoded. If nothing usable is cached, the
         entities are extracted with ``old_extract_entities``.
      3. Fetch Wikipedia article texts for each entity and add them to the pool.
      4. Keep sentences that mention ``target`` and rank them by entity hits.

    Args:
        text: Input text whose entities drive the search and the ranking.
        target: Stance target; used as a search topic and sentence filter.
        config: Dict read for 'max_article_count' (default 25),
            'max_sentences_count' (10), 'minimum_entity_count' (1) and
            'min_entity_coverage' (0).

    Returns:
        tuple[list[str], list[str]]: ``(knowledge sentences, entities)``.
    """
    # Step 1: Wikipedia content for the target, as a list of article texts
    related_page_contents = fetch_wikipedia_articles_knowledge(
        target, config.get('max_article_count', 25)
    )

    # Try the entity cache first; decode the value when it is a JSON string
    cached_entities = text_name_entites_dict.get(text, [])

    if isinstance(cached_entities, str) and cached_entities.startswith("["):
        try:
            text_name_entities = json.loads(cached_entities)
            print(f"✅ JSON load success: {cached_entities}")
        except json.JSONDecodeError:
            print(f"⚠️ Error decoding JSON for: {cached_entities}")
            text_name_entities = []
    else:
        text_name_entities = cached_entities

    # Anything that is not a list of strings (for example a cached "error"
    # marker) must not reach list(...): a plain string would be split into
    # single characters. Treat such values as "nothing cached".
    if isinstance(text_name_entities, (list, tuple)):
        text_name_entities = [e for e in text_name_entities if isinstance(e, str)]
    else:
        text_name_entities = []

    # Nothing cached: extract entities directly from the text
    if not text_name_entities:
        print("🔍 Extracting entities from raw text.")
        text_name_entities = old_extract_entities(text)

    # Remove duplicates while keeping first-seen order, so the cached value is stable between runs.
    text_name_entities = list(dict.fromkeys(text_name_entities))
    text_name_entites_dict[text] = text_name_entities  # store in the cache

    # Step 2: fetch Wikipedia content for every entity and add it to the pool
    for entity in text_name_entities:
        try:
            entity_contents = fetch_wikipedia_articles_knowledge(
                entity, config.get('max_article_count', 25)
            )
            if isinstance(entity_contents, list):
                related_page_contents.extend(entity_contents)
            else:
                print(f"⚠️ Skipping entity '{entity}' – result is not a list")
        except Exception as e:
            print(f"⚠️ Failed to fetch content for entity '{entity}': {e}")

    # Step 3: extract and filter the knowledge sentences
    content_sentences = sentence_extraction(target, related_page_contents)
    knowledge_sentences_array = filter_sentences_by_entities(
        content_sentences,
        text_name_entities,
        target,
        config.get('max_sentences_count', 10),
        config.get('minimum_entity_count', 1),
        config.get('min_entity_coverage', 0)
    )

    return knowledge_sentences_array, text_name_entities

# prompt tuning ModernBERT-large

In [None]:
from transformers import pipeline
# NOTE: this reassigns model_name, which the NER cell also sets; AutoTokenizer comes from that cell's import.
model_name = "answerdotai/ModernBERT-large"
# Tokenizer is kept separately because query_maker reads its cls/sep/mask tokens.
model_tokenizer = AutoTokenizer.from_pretrained(model_name)
# Fill-mask pipeline used by predict_mask.
fill_pipeline =  pipeline("fill-mask", model=model_name)

In [None]:
def predict_mask(query):
    """Run the fill-mask pipeline on ``query`` and keep token and score only.

    Args:
        query: Text passed to ``fill_pipeline``.

    Returns:
        list[dict]: One ``{'token_str': ..., 'score': ...}`` dict per
        prediction, in the order the pipeline returned them.
    """
    return [
        {'token_str': prediction['token_str'], 'score': prediction['score']}
        for prediction in fill_pipeline(query)
    ]

In [None]:
def query_maker(text, target, knowledge):
    """Build the three-line masked query for the fill-mask model.

    The special tokens are read from ``model_tokenizer``. Line 1 carries the
    text, line 2 the knowledge, and line 3 the target followed by the mask
    token that the model has to fill in.

    Returns:
        str: The query, with the lines joined by "\\n".
    """
    tok = model_tokenizer
    lines = (
        f"{tok.cls_token} the stance of text: {text}",
        f"{tok.sep_token} according to knowledge: {knowledge}",
        f"{tok.sep_token} toward {target} is {tok.mask_token}.",
    )
    return "\n".join(lines)

In [None]:
def format_result_list(result_list):
    """Render predictions as a bullet list, one ``- token (score)`` line each.

    Args:
        result_list: Iterable of dicts with 'token_str' and 'score' keys.

    Returns:
        str: Lines joined by "\\n", with scores shown to 4 decimal places;
        "" for an empty input.
    """
    lines = []
    for item in result_list:
        lines.append(f"- {item['token_str']} ({item['score']:.4f})")
    return "\n".join(lines)

# use GPT as verbalizer

In [23]:
def prompt_llm(text, target):
    """Ask the chat model for the stance of ``text`` towards ``target``.

    No knowledge context is supplied. The system message asks for a reply of
    the form ``[stance label]-[reason of choosing]``.

    Returns:
        tuple[str, str]: ``(upper-cased reply, prompt sent as user message)``.
        When the reply cannot be read, the first element is "ERROR-ERROR".
    """
    prompt = (
        "The task is Stance Detection. Determine the stance of the input text towards the specified target. The stance can be one of: FAVOR, AGAINST, or NONE.\n"
        "\n"
        f"Input Text: {text}\n"
        f"Target: {target}"
    )
    system_message = {
        "role": "system",
        "content": "You are an expert in stance detection.output structure is [stance label]-[reason of choosing] stance label is one of: FAVOR, AGAINST, or NONE (uppercase)"
    }
    user_message = {"role": "user", "content": prompt}

    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[system_message, user_message],
        max_tokens=100,
        temperature=0
    )
    try:
        predicted_stance = response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error parsing response: {e}")
        # Same "label-reason" shape as a real reply.
        predicted_stance = "Error-Error"
    time.sleep(0.02)
    return predicted_stance.upper(), prompt

In [24]:
def prompt_llm_knowledge(text, target, knowledge):
    """Ask the chat model for the stance of ``text`` towards ``target``, given knowledge.

    ``knowledge`` is appended to the prompt as factual background. The system
    message asks for a reply of the form
    ``[stance label]-[reason of choosing]``.

    Returns:
        tuple[str, str]: ``(upper-cased reply, prompt sent as user message)``.
        When the reply cannot be read, the first element is "ERROR - ERROR".
    """
    prompt = (
        "Your task is Stance Detection. Consider the sequential concepts which appear in the unput text and Determine the stance of the author of input text towards the specified target in general. Use the provided knowledge context related to the target and name entities in the input text as factual background to enhance reasoning and support your decision. The stance can be one of: FAVOR, AGAINST, or NONE.\n"
        f"Input Text: {text}\n"
        f"Target: {target}\n"
        "Knowledge context: \n"
        f"{knowledge}"
    )
    system_message = {
        "role": "system",
        "content": "You are an expert in stance detection.output structure is [stance label]-[reason of choosing] stance label is one of: FAVOR, AGAINST, or NONE (uppercase)"
    }
    user_message = {"role": "user", "content": prompt}

    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[system_message, user_message],
        max_tokens=100,
        temperature=0
    )
    try:
        predicted_stance = response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error parsing response: {e}")
        predicted_stance = "Error - Error"
    time.sleep(0.1)
    return predicted_stance.upper(), prompt

In [25]:
import re

def prompt_sentiment_for_sentences(knowledge_sentences):
    """
    Ask the chat model for one sentiment label per knowledge sentence.

    Each sentence is squeezed onto a single line and wrapped in ``#!#``
    markers; the model is asked to answer with comma-separated labels
    (POSITIVE, NEGATIVE or NEUTRAL).

    Returns:
        tuple[list[str], str]: ``(upper-cased labels, prompt)``. For an empty
        input no request is made and ``([], "")`` is returned.
    """
    if not knowledge_sentences:
        return [], ""

    # One sentence per line, internal whitespace collapsed, wrapped in markers.
    kn_list = "\n".join(
        "#!#" + re.sub(r'\s+', ' ', sentence).strip() + "#!#"
        for sentence in knowledge_sentences
    )

    prompt = (
        "\n"
        "You are an expert at sentiment analysis.\n"
        "\n"
        "Below are knowledge sentences (factual context). Each sentence is between two #!#. For each one, assign exactly one sentiment label: POSITIVE, NEGATIVE, or NEUTRAL.\n"
        "\n"
        "Sentences:\n"
        f"{kn_list}\n"
        "\n"
        "Respond with exactly N labels separated by commas, where N is the number of sentences above.\n"
        "Example (for 3 sentences): POSITIVE,NEGATIVE,NEUTRAL\n"
    )

    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Output ONLY the comma‑separated labels, no JSON, no explanation."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=100,
        temperature=0.0
    )

    time.sleep(0.1)
    reply = resp.choices[0].message.content.strip()

    labels = []
    for piece in reply.split(","):
        label = piece.strip()
        if label:  # ignore empty pieces such as those left by a trailing comma
            labels.append(label.upper())
    return labels, prompt


In [26]:
def verbalize_with_llm(result):
    """Ask the chat model to map mask predictions to a single stance label.

    The prompt shows the query template (built by ``query_maker`` with
    placeholder arguments) and the predictions formatted by
    ``format_result_list``.

    Args:
        result: Predictions accepted by ``format_result_list``.

    Returns:
        tuple[str, str]: ``(upper-cased reply, prompt sent as user message)``.
        When the reply cannot be read, the first element is "ERROR-ERROR".
    """
    template = query_maker("[TEXT]", "[TARGET]", "[KNOWLEDGE]")
    ranked_predictions = format_result_list(result)
    prompt = (
        "\n"
        "You are given predictions from a masked-language model for stance detection.\n"
        "\n"
        "query template:\n"
        f"{template}\n"
        "\n"
        "Top 5 mask predictions (with scores):\n"
        f"{ranked_predictions}\n"
        "\n"
        "Your task:\n"
        "Choose exactly one of the following stance labels:\n"
        "- FAVOR: the text supports or agrees with the target\n"
        "- AGAINST: the text opposes or disagrees with the target\n"
        "- NONE: the text expresses no clear stance, or is neutral\n"
    )
    system_message = {
        "role": "system",
        "content": "You are an expert in mapping model predictions to a single stance label.output structure is [stance label]-[reason of choosing] stance label is one of: FAVOR, AGAINST, or NONE (uppercase)"
    }
    user_message = {"role": "user", "content": prompt}

    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[system_message, user_message],
        max_tokens=100,
        temperature=0
    )
    try:
        predicted_stance = response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error parsing response: {e}")
        predicted_stance = "Error-Error"
    time.sleep(0.02)
    return predicted_stance.upper(), prompt

# FLOW

In [None]:
# Pipeline v1: knowledge enrichment -> masked-LM prediction -> LLM verbalizer.
# Iterates over every row of sample_df and collects one log record per row.
verbalization_logs = []
i = 0
for idx, row in sample_df.iterrows():  # pandas.DataFrame.iterrows
    text = row['text']
    target = row['target']

    # knowledge enrichment
    config = {'max_article_count': max_article_count, 'max_sentences_count': max_sentences_count}
    # knowledge_enrichment returns (sentences, entities); joining the tuple
    # itself would raise a TypeError, so unpack it first.
    knowledge_array, name_entities = knowledge_enrichment(
        text, target, config)
    knowledge = " ".join(knowledge_array)

    # predict stance
    query = query_maker(text=text, target=target, knowledge=knowledge)
    predictions = predict_mask(query)
    formatted_predictions = format_result_list(predictions)

    # verbalizer
    final_result, prompt = verbalize_with_llm(predictions)
    # The reply has the form "LABEL-reason". Split at the first dash only, so
    # a reason containing hyphens stays intact and a reply without any dash
    # gives an empty reason instead of an IndexError.
    final_label, _dash, final_reason = final_result.partition("-")
    final_label = final_label.strip()
    final_reason = final_reason.strip()

    # logging
    status = calculate_per_class_metrics(final_label, row["stance"])
    verbalization_logs.append({
        "text": text,
        "target": target,
        "knowledge": knowledge,
        "query": query,
        "predictions":formatted_predictions,
        "prompt": prompt.strip(),
        "final_label": final_label,
        "ai reason":final_reason,
        "correct_label": row["stance"],
        "FAVOR status":status["FAVOR"],
        "AGAINST status":status["AGAINST"],
        "NONE status":status["NONE"]
    })

    # progress report every 10 rows
    if i % 10 == 0:
        print(i, "DONE")
    i = i + 1

In [None]:
# 2. VERSION 2
# In its current state this loop only runs new_knowledge_enrichment for every
# row; the sentiment / verbalizer / logging steps below are commented out, so
# verbalization_logs stays empty.
verbalization_logs = []
i = 0
start_time = time.perf_counter()
last_log_time = start_time 
for idx, row in sample_df.iterrows():  # pandas.DataFrame.iterrows
    text = row['text']
    target = row['target']

    # knowledge enrichment
    config = {'max_article_count': 50, 
              'max_sentences_count': 10, 
              'minimum_entity_count':1,
              'min_entity_coverage':0}
    knowledge_array, name_entites = new_knowledge_enrichment(
        text, target, config)
    # knowledge = f" ".join(knowledge_array)

    
    # sentiments, sent_prompt = prompt_sentiment_for_sentences(knowledge_array)
    # merged = []
    # try:
    #     for idx, sentiment in enumerate(sentiments):
    #         sent_text = knowledge_array[idx]
    #         merged.append(f"{idx+1}. {sent_text} – {sentiment}")
    #     merged_str = "; ".join(merged)
        
    #     sentiment_counts = {
    #     "positive_count": sentiments.count("POSITIVE"),
    #     "negative_count": sentiments.count("NEGATIVE"),
    #     "neutral_count": sentiments.count("NEUTRAL"),
    #     "total_sentences": len(knowledge_array)
    #     }

    # except Exception as e:
    #     print("exception: ", sentiments, knowledge_array)
    #     sentiment_counts = {
    #     "positive_count": 0,
    #     "negative_count": 0,
    #     "neutral_count": 0,
    #     "total_sentences": len(knowledge_array)
    #     }   
        

    # # verbalizer
    # final_result, prompt = prompt_llm_knowledge(text=text, target=target, knowledge=knowledge)
    # # final_result, prompt = prompt_llm(text=text, target=target)
    # final_label = final_result.split("-")[0]
    # final_reason = final_result.split("-")[1]
    
    # # loging
    # verbalization_logs.append({
    #     "text": text,
    #     "target": target,
    #     "name_entities":name_entites,
    #     "knowledge": knowledge,
    #     "sentence_sentiments": merged_str,
    #     **sentiment_counts,
    #     "prompt": prompt.strip(),
    #     "final_label": final_label.strip(),
    #     "ai reason":final_reason.strip(),
    #     "correct_label": row["stance"].strip(),
    # })

    # 3. Print neatly
    # Every 100 rows: report total elapsed time and time since the previous report.

    if i % 100 == 0:
        current_time = time.perf_counter()
        total_elapsed = current_time - start_time
        last_elapsed = current_time - last_log_time
        print(f"{i} DONE - Total time: {total_elapsed:.2f}s, Since last: {last_elapsed:.2f}s")
        last_log_time = current_time
    i = i + 1

ℹ️ search results for: Bernie Sanders
🔍 Extracting entities from raw text.
ℹ️ search results for: lafayette




  lis = BeautifulSoup(html).find_all('li')


ℹ️ search results for: bernies
ℹ️ search results for: indiana
0 DONE - Total time: 45.20s, Since last: 45.20s
🔍 Extracting entities from raw text.
ℹ️ search results for: progressivism
ℹ️ search results for: repub
ℹ️ search results for: elizabethwarren
ℹ️ search results for: jonathanchait
❌ No results found for this topic. jonathanchait
🔍 Extracting entities from raw text.
ℹ️ search results for: caucusgoers
ℹ️ search results for: iowa
ℹ️ search results for: berners
🔍 Extracting entities from raw text.
ℹ️ search results for: bernie sanders
ℹ️ search results for: demdebate2020
❌ No results found for this topic. demdebate2020
🔍 Extracting entities from raw text.
ℹ️ search results for: bern
ℹ️ search results for: has been trending
ℹ️ search results for: media bias fact checker
ℹ️ search results for: for hours
🔍 Extracting entities from raw text.
ℹ️ search results for: 11 y/o
ℹ️ search results for: integrity
🔍 Extracting entities from raw text.
ℹ️ search results for: sister
ℹ️ search results

# Save

In [None]:
import random
import json

In [None]:
# Persist everything collected in this session to the cache files.
flush_cache()  # pending Wikipedia page contents
flush_name_entity_cache(text_name_entites_dict)  # entities per input text
update_search_cache(search_cache)  # search results; note that this call also empties search_cache

In [None]:
# One row per logged sample.
verbalization_df = pd.DataFrame(verbalization_logs)

# Save to Excel
# A random float is appended to the file name, which makes overwriting an earlier export unlikely.
random_float = random.random()
try:
    name = f"verbalization_results_article_count_{config['max_article_count']}_sentences_count_{config['max_sentences_count']}_{random_float}.xlsx"
except NameError:
    # config is only defined once one of the FLOW loops has run; fall back to a name without the settings.
    name = f"verbalization_results_article_count_{random_float}.xlsx"
verbalization_df.to_excel(name, index=False)

print(f"Saved verbalization results to '{name}'")

# analyze

In [None]:
from sklearn.metrics import (
    confusion_matrix,
    ConfusionMatrixDisplay,
    accuracy_score,
    precision_recall_fscore_support
)
import matplotlib.pyplot as plt

In [None]:
# Evaluate the predictions of the current run (verbalization_df built in the "Save" section).
# Extract true & predicted
y_true = verbalization_df['correct_label']
y_pred = verbalization_df['final_label']

# Choose label order (also fixes the row/column order of the confusion matrix
# and the index order of the per-class arrays used below)
labels = ['FAVOR', 'AGAINST', 'NONE']

# Compute metrics
# NOTE(review): the confusion matrix and the per-class scores are restricted to
# `labels`, while accuracy and the weighted F1 are computed without a `labels`
# argument — if a prediction falls outside the three labels the numbers will not
# describe the same set of classes; confirm this is intended.
cm = confusion_matrix(y_true, y_pred, labels=labels)
acc = accuracy_score(y_true, y_pred)
prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, labels=labels, zero_division=0)
f1_weighted = precision_recall_fscore_support(y_true, y_pred, average='weighted', zero_division=0)[2]

# Plot confusion matrix
fig, ax = plt.subplots(figsize=(6, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(cmap='Blues', ax=ax, colorbar=True)
ax.set_title('Confusion Matrix (Blue)')

# Annotate all metrics (indices 0/1/2 follow the order of `labels`)
metrics_text = (
    f"Accuracy: {acc:.2%}\n\n"
    f"Label --- Prec - Recall - F1\n"
    f"FAVOR --- {prec[0]:.2%} - {rec[0]:.2%} - {f1[0]:.2%}\n"
    f"AGAINST - {prec[1]:.2%} - {rec[1]:.2%} - {f1[1]:.2%}\n"
    f"NONE ---- {prec[2]:.2%} - {rec[2]:.2%} - {f1[2]:.2%}\n\n"
    f"Weighted F1: {f1_weighted:.2%}"
)

# Position metrics below the plot (negative y places the text under the axes area)
fig.text(0.01, -0.05, metrics_text, fontsize=11, va='top', family='monospace')

plt.tight_layout()
plt.show()


# word export

In [None]:
import pandas as pd
from docx import Document
from docx.shared import RGBColor
import json

In [None]:
import pandas as pd
from docx import Document
from docx.shared import RGBColor
import ast
import re

def _parse_entity_list(raw):
    """Return the unique, non-empty entity strings of one `name_entities` cell, longest first.

    `raw` is normally the string form of a Python list as stored in the Excel
    sheet. Anything that cannot be parsed into a list/tuple/set (NaN, free text,
    a bare string, a number) yields an empty list, and non-string items are
    skipped, so a malformed cell never highlights stray characters or crashes
    the export.
    """
    if isinstance(raw, (list, tuple, set)):
        candidates = raw
    else:
        try:
            candidates = ast.literal_eval(raw)
        except Exception:
            return []
        if not isinstance(candidates, (list, tuple, set)):
            return []

    seen_lower = set()
    unique = []
    for item in candidates:
        if not isinstance(item, str):
            continue
        text = item.strip()
        if text and text.lower() not in seen_lower:
            seen_lower.add(text.lower())
            unique.append(text)
    # Longest first so the regex alternation prefers the longest entity at a position.
    return sorted(unique, key=len, reverse=True)


def _split_on_entities(text, entities):
    """Split `text` so that every case-insensitive entity mention is its own piece.

    Because the pattern has exactly one capturing group, the pieces alternate:
    even positions are plain text, odd positions are entity matches.
    """
    if not entities:
        return [text]
    pattern = re.compile('(' + '|'.join(map(re.escape, entities)) + ')', flags=re.IGNORECASE)
    return pattern.split(text)


def _add_sentiment_runs(para, text, sentiment_colors):
    """Append `text` to `para`, colouring the words NEGATIVE / POSITIVE / NEUTRAL."""
    pieces = re.split(r'(NEGATIVE|POSITIVE|NEUTRAL)', text, flags=re.IGNORECASE)
    for position, piece in enumerate(pieces):
        if not piece:
            continue
        run = para.add_run(piece)
        # Odd positions are the captured sentiment words.
        color = sentiment_colors.get(piece.upper()) if position % 2 == 1 else None
        if color is not None:
            run.font.color.rgb = color


def export_random_examples_to_word(
    excel_path: str,
    sheet_name=0,
    columns: list = None,
    n: int = 10,
    output_docx: str = "sample_examples.docx",
    random_state: int = 42,
    condition=None,
    sort_by: str = None,
    ascending: bool = True,
    take_first: bool = False
):
    """
    Export n rows from an Excel sheet to a Word document,
    after optional filtering and sorting.

    Each exported row becomes a level-2 heading followed by one paragraph per
    column ("<column>: <value>"). In the `text` and `sentence_sentiments`
    columns every mention of an entity listed in the row's `name_entities`
    cell is coloured blue; in `sentence_sentiments` the words NEGATIVE,
    POSITIVE and NEUTRAL are additionally coloured red, green and orange.

    Parameters
    ----------
    excel_path : str
        Path of the Excel workbook to read.
    sheet_name : default 0
        Passed to `pandas.read_excel`; if a dict of sheets comes back, the
        first sheet is used.
    columns : list, optional
        Columns to keep (and export), in this order. All columns if None.
    n : int, default 10
        Number of rows to export.
    output_docx : str
        Path of the Word document to write.
    random_state : int, default 42
        Seed used when sampling rows.
    condition : str or callable, optional
        - If str: a pandas query string, e.g. "sentiment == 'NEGATIVE'".
        - If callable: function(df) -> boolean Series to filter rows.
        The condition is applied after the column subset, so it can only
        reference columns that were kept.
    sort_by : str, optional
        Column name to sort by before selecting rows.
    ascending : bool, default True
        Sort order: True for ascending, False for descending.
    take_first : bool, default False
        If True, take the first n rows after filtering/sorting instead of sampling.

    Raises
    ------
    ValueError
        If a requested column or `sort_by` column is missing, if `condition`
        has an unsupported type or returns a non-boolean mask, or if fewer
        than `n` rows remain after filtering.
    """
    # Load Excel
    df = pd.read_excel(excel_path, sheet_name=sheet_name)
    if isinstance(df, dict):
        df = next(iter(df.values()))

    # Subset columns
    if columns is not None:
        missing = set(columns) - set(df.columns)
        if missing:
            raise ValueError(f"Columns not found: {missing}")
        df = df[columns]

    # Apply filter condition
    if condition is not None:
        if isinstance(condition, str):
            df = df.query(condition)
        elif callable(condition):
            mask = condition(df)
            if not isinstance(mask, pd.Series) or mask.dtype != bool:
                raise ValueError("Callable condition must return boolean Series.")
            df = df.loc[mask]
        else:
            raise ValueError("`condition` must be a query string or callable.")

    # Sort if requested
    if sort_by is not None:
        if sort_by not in df.columns:
            raise ValueError(f"sort_by column '{sort_by}' not found in DataFrame.")
        df = df.sort_values(by=sort_by, ascending=ascending)

    # Ensure enough rows
    if len(df) < n:
        raise ValueError(
            f"Not enough rows after filtering/sorting: {len(df)} available, {n} requested."
        )

    # Select rows: sample or take first
    if take_first:
        sample_df = df.head(n)
    else:
        sample_df = df.sample(n=n, random_state=random_state)

    # Colours are built once here instead of once per text fragment.
    entity_color = RGBColor(0x00, 0x00, 0xFF)
    sentiment_colors = {
        'NEGATIVE': RGBColor(0xFF, 0x00, 0x00),
        'POSITIVE': RGBColor(0x00, 0xFF, 0x00),
        'NEUTRAL': RGBColor(0xFF, 0x80, 0x00),
    }

    # Build Word document
    doc = Document()
    title = f"{n} {'First' if take_first else 'Random'} Examples from {excel_path}"
    doc.add_heading(title, level=1)

    for idx, row in sample_df.iterrows():
        doc.add_heading(f"Example index {idx}", level=2)

        # Entities to highlight for this row (empty when the column is absent or unparsable)
        entities = _parse_entity_list(row['name_entities']) if 'name_entities' in row.index else []

        # Process each column
        for col in sample_df.columns:
            para = doc.add_paragraph()
            para.add_run(f"{col}: ").bold = True
            cell = str(row[col])

            is_sentiment_col = col == 'sentence_sentiments'
            if is_sentiment_col or (col == 'text' and entities):
                for position, piece in enumerate(_split_on_entities(cell, entities)):
                    if not piece:
                        continue
                    if position % 2 == 1:
                        # Odd positions are entity matches.
                        para.add_run(piece).font.color.rgb = entity_color
                    elif is_sentiment_col:
                        _add_sentiment_runs(para, piece, sentiment_colors)
                    else:
                        para.add_run(piece)
            else:
                para.add_run(cell)

        doc.add_paragraph()

    # Save
    doc.save(output_docx)
    print(f"Wrote {n} {'first' if take_first else 'random'} examples to {output_docx}")


In [None]:
# Export 50 randomly sampled rows of the results file written in the "Save" section
# (`name` is the Excel path set there). The export raises ValueError if the file
# holds fewer than 50 rows.
export_random_examples_to_word(
    excel_path=name,
    columns=["text", "target", "name_entities", "sentence_sentiments", "final_label", "correct_label", "ai reason"],
    n=50,
    output_docx="sample_stance_examples.docx"
)

# NOT

In [None]:
# Export the first 10 rows with at least one sentence from a previously saved results file.
# No output_docx is given, so the export function's default output path is used.
name = "vast_model_results_4_entity.xlsx"

export_random_examples_to_word(
    excel_path=name,
    sheet_name=0,
    columns=["text", "target", "sentence_sentiments", "knowledge", "name_entities", "final_label", "correct_label", "ai reason", "total_sentences"],
    n=10,
    take_first=True,  # was `{True}`: a one-element set that only worked because it is truthy
    # ascending=False,
    # sort_by="positive_count",
    condition="total_sentences != 0",
)

In [None]:
import json
import ast

def standardize_name_entities_cache(data):
    """
    Normalise a name-entity cache so that every value is a real Python list.

    Values that already are lists are kept as they are. String values are
    parsed with `ast.literal_eval`; if the result is a list it is used,
    otherwise (or when parsing fails) the entry becomes an empty list and a
    message is printed. Any other value type also becomes an empty list.
    Returns a new dict with the same keys in the same order.
    """
    cleaned = {}

    for entry_key, raw in data.items():
        # Proper lists need no work.
        if isinstance(raw, list):
            cleaned[entry_key] = raw
            continue

        # Neither list nor string: nothing sensible to recover.
        if not isinstance(raw, str):
            print(f"Unexpected data type for key '{entry_key}': {type(raw)}")
            cleaned[entry_key] = []
            continue

        # Strings are expected to hold the repr of a list.
        try:
            literal = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            print(f"Error parsing string for key '{entry_key}': {raw}")
            cleaned[entry_key] = []
            continue

        if isinstance(literal, list):
            cleaned[entry_key] = literal
        else:
            print(f"Warning: Value for key '{entry_key}' is a string but not a list after parsing.")
            cleaned[entry_key] = []

    return cleaned


# Example usage: read the name-entity cache, normalise every value to a list,
# and write the result to a separate file (the original cache is not modified).
if __name__ == "__main__":
    # Load your JSON file (replace 'name_entities_cache.json' with your file)
    with open("./caches/name_entities_cache.json", "r", encoding="utf-8") as f:
        data = json.load(f)

    standardized_data = standardize_name_entities_cache(data)

    # Save the standardized data back to a new JSON file
    # (written to the working directory, not to ./caches/; ensure_ascii=False keeps
    # non-ASCII characters readable in the file)
    with open("name_entities_cache_standardized.json", "w", encoding="utf-8") as f:
        json.dump(standardized_data, f, indent=2, ensure_ascii=False)

    print("Standardization complete, saved to 'name_entities_cache_standardized.json'")


In [None]:
import pandas as pd
import json
import os

# Results workbook that provides the 'text' and 'name_entities' columns.
EXCEL_PATH = "entity llm - knowledge wiki contain target & name entity 50 10 - stance llm.xlsx"
# JSON cache file, relative to the working directory.
# NOTE(review): the cells at the top of the notebook read
# "./caches/name_entities_cache.json" — confirm which location is intended.
JSON_CACHE_PATH = "name_entities_cache.json"

def save_or_load_name_entities():
    """
    Return the text -> name-entities mapping, building the JSON cache on first use.

    If JSON_CACHE_PATH exists its content is returned as-is. Otherwise the
    workbook at EXCEL_PATH is read, every row with a non-null 'text' and
    'name_entities' is collected (later duplicates of a text overwrite earlier
    ones), the mapping is written to JSON_CACHE_PATH and returned.

    Raises ValueError if the workbook lacks the 'text' or 'name_entities' column.
    """
    cache_exists = os.path.exists(JSON_CACHE_PATH)
    if cache_exists:
        print("🔁 Loading cached name entities...")
        with open(JSON_CACHE_PATH, "r", encoding="utf-8") as cache_file:
            return json.load(cache_file)

    print("📥 Reading Excel and extracting name entities...")
    frame = pd.read_excel(EXCEL_PATH)

    required_columns = ("text", "name_entities")
    if any(column not in frame.columns for column in required_columns):
        raise ValueError("Excel must have 'text' and 'name_entities' columns")

    # Keys are stringified texts; rows missing either field are skipped.
    text_to_entities = {}
    for _, record in frame.iterrows():
        if pd.isnull(record["text"]) or pd.isnull(record["name_entities"]):
            continue
        text_to_entities[str(record["text"])] = record["name_entities"]

    with open(JSON_CACHE_PATH, "w", encoding="utf-8") as cache_file:
        json.dump(text_to_entities, cache_file, ensure_ascii=False, indent=2)

    return text_to_entities

# Usage: loads the JSON cache if it exists, otherwise builds it from the Excel workbook.
entity_data = save_or_load_name_entities()


## fetch bad context record

In [None]:
import pandas as pd
from docx import Document

# Read the two result workbooks: the baseline run (suffix _1) and the
# knowledge-enriched run (suffix _2).
df1 = pd.read_excel("new_dataset baseline stance llm.xlsx")
df2 = pd.read_excel("entity llm - knowledge wiki contain target & name entity 50 10 - stance llm.xlsx")

# Pair the rows of both runs by (text, target); columns present in both files
# get the _1 / _2 suffix. Adjust the key if your data needs a different identifier.
merged = pd.merge(df1, df2, on=["text", "target"], suffixes=("_1", "_2"))

# Keep the examples the first run got right and the second run got wrong.
filtered = merged[
    merged["final_label_1"].eq(merged["correct_label_1"])
    & merged["final_label_2"].ne(merged["correct_label_2"])
]

# Show which columns are available after the merge
print(filtered.columns)

doc = Document()
doc.add_heading("Filtered Examples", level=1)

# One group of paragraphs per matching row, separated by a blank paragraph
for _, row in filtered.iterrows():
    paragraphs = [
        f"Text: {row['text']}",
        f"Target: {row['target']}",
        f"First File - Final Label: {row['final_label_1']} | Correct Label: {row['correct_label_1']}",
        f"Second File - Final Label: {row['final_label_2']} | Correct Label: {row['correct_label_2']}",
        f"Second File - sentiment: {row['sentence_sentiments']}",
        f"first File - ai: {row['ai reason_1']}",
        f"Second File - ai: {row['ai reason_2']}",
        "",  # Blank line for spacing
    ]
    for paragraph_text in paragraphs:
        doc.add_paragraph(paragraph_text)

# Write the document to disk
doc.save("filtered_examples.docx")

## analyse df

In [None]:
from sklearn.metrics import (
    confusion_matrix,
    ConfusionMatrixDisplay,
    accuracy_score,
    precision_recall_fscore_support
)
import matplotlib.pyplot as plt

In [None]:
import pandas as pd
from sklearn.metrics import confusion_matrix, accuracy_score, precision_recall_fscore_support, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Load data
verbalization_df = pd.read_excel("vast_model_results_60%_entity.xlsx")

# No row filter is applied in this cell: every row of the results file is evaluated.
# (The name `filtered_df` is kept so the rest of the cell reads the same.)
filtered_df = verbalization_df

# Get labels
y_true = filtered_df['correct_label']
y_pred = filtered_df['final_label']

# Choose label order (fixes the matrix layout and the index order of the per-class arrays)
labels = ['FAVOR', 'AGAINST', 'NONE']

# Compute metrics
cm = confusion_matrix(y_true, y_pred, labels=labels)
acc = accuracy_score(y_true, y_pred)
prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, labels=labels, zero_division=0)
f1_weighted = precision_recall_fscore_support(y_true, y_pred, average='weighted', zero_division=0)[2]

# Plot confusion matrix
fig, ax = plt.subplots(figsize=(6, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(cmap='Blues', ax=ax, colorbar=True)
# The title previously claimed "Filtered: total_sentences ≠ 0" although no filter is applied.
ax.set_title('Confusion Matrix (Blue)\n(All rows, no filtering)')

# Annotate all metrics (indices 0/1/2 follow the order of `labels`)
metrics_text = (
    f"Accuracy: {acc:.2%}\n\n"
    f"Label --- Prec - Recall - F1\n"
    f"FAVOR --- {prec[0]:.2%} - {rec[0]:.2%} - {f1[0]:.2%}\n"
    f"AGAINST - {prec[1]:.2%} - {rec[1]:.2%} - {f1[1]:.2%}\n"
    f"NONE ---- {prec[2]:.2%} - {rec[2]:.2%} - {f1[2]:.2%}\n\n"
    f"Weighted F1: {f1_weighted:.2%}"
)

# Position metrics below the plot
fig.text(0.01, -0.05, metrics_text, fontsize=11, va='top', family='monospace')

plt.tight_layout()
plt.show()


In [None]:
import pandas as pd
from sklearn.metrics import confusion_matrix, accuracy_score, precision_recall_fscore_support, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Compare the entity-enriched model with the baseline on the rows that have sentences.
# Load data
verbalization_df = pd.read_excel("vast_model_results_60%_entity.xlsx")
baseline_df = pd.read_excel("vast_baseline.xlsx")

# Filter only rows where total_sentences != 0
filtered_df = verbalization_df[verbalization_df['total_sentences'] != 0]
filtered_indices = filtered_df.index

# Get labels for model results
y_true = filtered_df['correct_label']
y_pred = filtered_df['final_label']

# Get baseline predictions for same indices
# NOTE(review): rows are matched purely by index label, so this assumes both
# workbooks list the same examples in the same order — confirm, or align on
# a key such as (text, target) instead.
baseline_pred = baseline_df.loc[filtered_indices, 'final_label']

# Choose label order
labels = ['FAVOR', 'AGAINST', 'NONE']

# Compute metrics for model
cm_model = confusion_matrix(y_true, y_pred, labels=labels)
acc_model = accuracy_score(y_true, y_pred)
prec_model, rec_model, f1_model, _ = precision_recall_fscore_support(y_true, y_pred, labels=labels, zero_division=0)
f1_weighted_model = precision_recall_fscore_support(y_true, y_pred, average='weighted', zero_division=0)[2]

# Compute metrics for baseline (scored against the model file's correct labels)
cm_baseline = confusion_matrix(y_true, baseline_pred, labels=labels)
acc_baseline = accuracy_score(y_true, baseline_pred)
prec_base, rec_base, f1_base, _ = precision_recall_fscore_support(y_true, baseline_pred, labels=labels, zero_division=0)
f1_weighted_base = precision_recall_fscore_support(y_true, baseline_pred, average='weighted', zero_division=0)[2]

# Plot both confusion matrices side by side: model on the left, baseline on the right
fig, ax = plt.subplots(1, 2, figsize=(12, 6))

disp_model = ConfusionMatrixDisplay(confusion_matrix=cm_model, display_labels=labels)
disp_model.plot(cmap='Blues', ax=ax[0], colorbar=False)
ax[0].set_title('Model Confusion Matrix\n(Filtered: total_sentences ≠ 0)')

disp_base = ConfusionMatrixDisplay(confusion_matrix=cm_baseline, display_labels=labels)
disp_base.plot(cmap='Oranges', ax=ax[1], colorbar=False)
ax[1].set_title('Baseline Confusion Matrix\n(Same Filtered Rows)')

# Number of rows that went into the evaluation
sample_count = len(y_true)

# Annotate all metrics (indices 0/1/2 follow the order of `labels`)
metrics_text = (
    f"Samples used for evaluation: {sample_count}\n\n"
    f"=== MODEL ===\n"
    f"Accuracy: {acc_model:.2%}\n"
    f"Weighted F1: {f1_weighted_model:.2%}\n"
    f"FAVOR   - P: {prec_model[0]:.2%}, R: {rec_model[0]:.2%}, F1: {f1_model[0]:.2%}\n"
    f"AGAINST - P: {prec_model[1]:.2%}, R: {rec_model[1]:.2%}, F1: {f1_model[1]:.2%}\n"
    f"NONE    - P: {prec_model[2]:.2%}, R: {rec_model[2]:.2%}, F1: {f1_model[2]:.2%}\n\n"
    f"=== BASELINE ===\n"
    f"Accuracy: {acc_baseline:.2%}\n"
    f"Weighted F1: {f1_weighted_base:.2%}\n"
    f"FAVOR   - P: {prec_base[0]:.2%}, R: {rec_base[0]:.2%}, F1: {f1_base[0]:.2%}\n"
    f"AGAINST - P: {prec_base[1]:.2%}, R: {rec_base[1]:.2%}, F1: {f1_base[1]:.2%}\n"
    f"NONE    - P: {prec_base[2]:.2%}, R: {rec_base[2]:.2%}, F1: {f1_base[2]:.2%}"
)

# Position metrics below the plot
fig.text(0.01, -0.1, metrics_text, fontsize=10, va='top', family='monospace')

plt.tight_layout()
plt.show()


# change

## save cache

In [None]:
import time
import wikipedia
from concurrent.futures import ThreadPoolExecutor, as_completed

In [None]:
# Shared state for the Wikipedia cache-building cells below.
# title -> {'content', 'summary'} for fetched pages, or {'disambiguation': [...]} for ambiguous titles
article_content_result = {}
# titles whose fetch failed; they are not requested again
failed_titles = set()

In [None]:
def store_page_content(title, page):
    """
    Record `page`'s content and summary under `title` in article_content_result.

    A missing or empty attribute is stored as an empty string.
    """
    record = {}
    for field in ('content', 'summary'):
        record[field] = getattr(page, field, '') or ''
    article_content_result[title] = record

In [None]:
def safe_fetch_page_NOT(title, retries: int = 1):
    """
    Fetch the WikipediaPage for `title` without auto-suggestion.

    A ValueError whose message contains "Expecting value" (an empty / non-JSON
    response) is retried after a one-second pause, at most `retries` times.
    DisambiguationError is passed on to the caller. Every other failure adds
    `title` to failed_titles and yields None.
    """
    attempts_left = retries
    while True:
        try:
            return wikipedia.page(title=title, auto_suggest=False)

        except wikipedia.DisambiguationError:
            # the caller decides what to do with ambiguous titles
            raise

        except ValueError as err:
            looks_transient = "Expecting value" in str(err)
            if attempts_left > 0 and looks_transient:
                time.sleep(1)
                attempts_left -= 1
                continue
            # out of retries, or a different kind of ValueError
            failed_titles.add(title)
            return None

        except Exception:
            # anything else counts as a permanent failure
            failed_titles.add(title)
            return None

In [None]:
def fetch_all_page_content(title, max_options: int = 5):
    """
    Fetch content & summary for one `title` and record it in article_content_result.

    - Does nothing if the title was already recorded or is known to fail.
    - A fetch that yields no page is recorded with empty content and summary.
    - On DisambiguationError, the first `max_options` options are recorded under
      the title and each option is fetched once (without following a second
      level of disambiguation; such options are recorded as empty).
    - Returns None; results are written to the module-level dict.
    """
    already_handled = title in article_content_result or title in failed_titles
    if already_handled:
        return

    def _fetch_into_result(name):
        # Store the page for `name`, or blanks when the fetch gave nothing back.
        fetched = safe_fetch_page_NOT(name)
        if not fetched:
            article_content_result[name] = {'content': '', 'summary': ''}
        else:
            store_page_content(name, fetched)

    try:
        _fetch_into_result(title)

    except wikipedia.DisambiguationError as ambiguous:
        # keep only a limited number of alternatives
        candidates = ambiguous.options[:max_options]
        article_content_result[title] = {'disambiguation': candidates}
        for candidate in candidates:
            if candidate in article_content_result or candidate in failed_titles:
                continue
            try:
                _fetch_into_result(candidate)
            except wikipedia.DisambiguationError:
                # one level of disambiguation only
                article_content_result[candidate] = {'content': '', 'summary': ''}


In [None]:
def fetch_wikipedia_articles_knowledge(topic, max_result=25):
    """
    Search Wikipedia for `topic` and fetch every hit into article_content_result.

    Up to `max_result` search results are requested and fetched concurrently
    with fetch_all_page_content. Nothing is returned: the pages end up in the
    module-level article_content_result dict. Search errors and per-page
    errors are printed, never raised.
    """
    try:
        search_results = wikipedia.search(topic, results=max_result)
        print(search_results)
        if not search_results:
            print(f"❌❌. {topic}")
            return

        with ThreadPoolExecutor(max_workers=25) as executor:
            # Only the title is passed: the second positional parameter of
            # fetch_all_page_content is `max_options` (an int). Passing the
            # result list there made the disambiguation slice raise TypeError
            # inside the worker.
            futures = {
                executor.submit(fetch_all_page_content, title): title
                for title in search_results
            }
            for future in as_completed(futures):
                # result() re-raises whatever the worker raised, so failures
                # are reported instead of being silently dropped.
                try:
                    future.result()
                except Exception as e:
                    print(f"⚠️  Error fetching '{futures[future]}': {e}")

    except Exception as e:
        print(f"search exception ❌ : {topic}", e)

In [None]:
# Warm the article cache: look up every sample's target on Wikipedia.
start_time = time.perf_counter()
i = 0
for idx, row in sample_df.iterrows():  # pandas.DataFrame.iterrows
    text, target = row['text'], row['target']

    # knowledge enrichment (fills article_content_result as a side effect)
    knowledge_array = fetch_wikipedia_articles_knowledge(target, 200)

    # progress marker every 50 samples
    if not i % 50:
        print(i, "DONE")
    i += 1

In [None]:
# To save
import json
# Write the fetched articles to a JSON file in the working directory, replacing any previous file.
# Opened with an explicit UTF-8 encoding, like the other cache reads/writes in this notebook.
# NOTE(review): the notebook's first data cell loads this cache from
# "./caches/wikipedia_article_content_cache.json" — confirm the file is moved there.
with open("wikipedia_article_content_cache.json", "w", encoding="utf-8") as f:
    json.dump(article_content_result, f)        # writes text file
