In [5]:
import re
import os
import io
import sys
import requests
import tiktoken
import pandas as pd
from tqdm import tqdm
from openai import OpenAI
from dotenv import load_dotenv
from colorama import Fore, Style, init
from googleapiclient.discovery import build

load_dotenv()
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
GOOGLE_CSE_ID = os.getenv('GOOGLE_CSE_ID')
API_KEY = os.getenv('OPENAI_API_KEY')

client = OpenAI(api_key=API_KEY)

# Initialize colorama
init()

class FactChecker:
    def __init__(self):
        self.messages = []
        self.tools_used = False
        self.model = "gpt-3.5-turbo"
        self.available_functions = {
            "call_google": self.call_google,
        }

    def set_model(self, model_name):
        """更改模型"""
        self.model = model_name
    
    def call_google(self, query, **kwargs):
        try:
            service = build(serviceName="customsearch",
                          version="v1",
                          developerKey=GOOGLE_API_KEY,
                          static_discovery=False)
            res = service.cse().list(q=query, cx=GOOGLE_CSE_ID, **kwargs).execute()
            res_items = res["items"]
            res_snippets = [r['snippet'] for r in res_items]
            self.tools_used = True
            return str(res_snippets)
        except Exception as e:
            print(Fore.RED + f"Error: {str(e)}")
            print(Style.RESET_ALL)
            return f"Error in Google search: {str(e)}"

    def extract_sub_questions(self, article):
        """抽取子問題的prompt"""
        prompt = f"""
        Please analyze the following article and break it down into 2-3 key factual claims 
        that need to be verified. Format them as numbered questions:
        
        Article: {article}
        
        Please list the sub-questions in this format:
        1. [First question]
        2. [Second question]
        3. [Third question]
        """
        
        messages = [
            {"role": "system", "content": "You are an expert at breaking down articles into key verifiable claims."},
            {"role": "user", "content": prompt}
        ]
        
        response = client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        
        return response.choices[0].message.content
    
    def verify_sub_question(self, sub_question, article):
        """驗證單個子問題"""
        self.messages = []
        self.tools_used = False
        sys_prompt = """
        You are a fact-checking specialist. Verify the specific claim by:
        1. Thinking about what you already know and what you need to verify
        2. Use tool to find the information:
            - call_google: For general information and recent events
        3. Analyzing the search results
        4. Providing a conclusion
    
        You must use this format:
        Thought: [Your reasoning]
        Action: [call_google]: [search query]
    
        After receiving an observation:
        Thought: [Analysis combining your knowledge and the new information]
        Action: [Another search if needed]
        or
        Answer: [Your final conclusion]
    
        Available actions:
        call_google: [search term]
        """

        user_prompt = f"Sub-question: {sub_question}\nArticle context: {article}\n\nPlease verify this claim using both your knowledge and external sources when needed."
    
        self.messages = [
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": user_prompt}
        ]
    
        return self._run_verification_loop()

    def _run_verification_loop(self):
        action_re = re.compile('^Action: (\w+): (.*)$')
        answer_re = re.compile("Answer: ")
        count = 0
        last_answer = None  # 記錄最後一個有效的 Answer
        
        while True:
            # 修改 messages 以適應 token 限制
            self.messages = self.trim_messages_to_fit(self.messages, max_tokens=4096)

            response = client.chat.completions.create(
                model=self.model,
                messages=self.messages
            )
            
            response_msg = response.choices[0].message.content
            self.messages.append({"role": "assistant", "content": response_msg})

            # 嘗試提取 Answer
            extracted_answer = self._extract_answer(response_msg)
            if extracted_answer:
                last_answer = extracted_answer  # 更新最後提取到的有效 Answer
            
            # 檢查工具是否被使用並有 Answer
            if extracted_answer and self.tools_used:
                print(Fore.YELLOW + response_msg)
                print(Style.RESET_ALL)
                return extracted_answer

            # 如果有 Answer 但工具未被使用
            elif extracted_answer:
                print(Fore.YELLOW + "Answer found but tool has not been used. Searching tools now.")
                print(Style.RESET_ALL)

            print(Fore.GREEN + response_msg)
            print(Style.RESET_ALL)
            
            # 處理回應中的 Action
            actions = [action_re.match(a) for a in response_msg.split("\n") if action_re.match(a)]
            if actions:
                action, action_input = actions[0].groups()
                try:
                    print(Fore.CYAN + f" -- running {action} {action_input}")
                    print(Style.RESET_ALL)
                    
                    observation = self.available_functions[action](action_input)
                    self.tools_used = True
                    
                    if "No claims found" in observation:
                        self.tools_used = False

                    print(Fore.BLUE + f"Observation: {observation}")
                    print(Style.RESET_ALL)
                    self.messages.append({"role": "user", "content": "Observation: " + observation})
                
                except Exception as e:
                    print(Fore.RED + f"Error: {e}")
                    print(Style.RESET_ALL)
                    continue
            
            # 沒有動作被檢測到
            else:
                count += 1
                if count == 3:
                    print(Fore.RED + "No action is detected. The answer is Unknown.")
                    print(Style.RESET_ALL)
                    
                    # 如果有記錄到有效的 Answer
                    if last_answer:
                        print(Fore.GREEN + f"Returning last extracted Answer: {last_answer}")
                        print(Style.RESET_ALL)
                        return last_answer
                    return "Ignore this question"
                
                print(Fore.RED + "No action is detected. Continue...")
                self.tools_used = False
                continue

    def trim_messages_to_fit(self, messages, max_tokens=16385):
        """刪除前面的tokens以適應 token 限制"""
        encoding = tiktoken.encoding_for_model(self.model)

        # 如果第一條是 system prompt，暫時移除
        system_message = None
        if messages and messages[0]["role"] == "system":
            system_message = messages.pop(0)

        while self.count_tokens(messages) > max_tokens:
            # 刪除最早的tokens，直到符合限制
            messages.pop(0)

        # 如果有 system prompt，重新插入到最前面
        if system_message:
            messages.insert(0, system_message)

        return messages
    
    def count_tokens(self, messages, model=None):
        """計算 token 數"""
        model = self.model 
        encoding = tiktoken.encoding_for_model(model)
        total_tokens = 0
        for message in messages:
            for key, value in message.items():
                total_tokens += len(encoding.encode(value))
        return total_tokens
            
    def _extract_answer(self, response_msg):
        """從回應中提取答案"""
        lines = response_msg.split('\n')
        for line in lines:
            if 'Answer:' in line:
                try:
                    # 取 'Answer:' 後的內容
                    answer = line.split('Answer:')[1].strip()
                    print(f"Extracted Answer: {answer}")
                    return answer
                except IndexError as e:
                    print(f"Error extracting answer: {e}")
                    continue
        return None
    

    def _execute_action(self, action, action_input):
        """執行搜尋動作"""
        if action in self.available_functions:
            print(Fore.CYAN + f" -- running {action} {action_input}")
            print(Style.RESET_ALL)
            result = self.available_functions[action](action_input)
            print(Fore.BLUE + f"Observation: {result}")
            print(Style.RESET_ALL)
            return result
        else:
            print(Fore.RED + f"Unknown action: {action}")
            print(Style.RESET_ALL)
            return f"Unknown action: {action}"
                
    def combine_results(self, results, article):
        """綜合所有子問題的結果"""
        prompt = f"""
        Original article:
        {article}

        Based on the verification results of all sub-questions:
        {results}
    
        Please provide a final conclusion about whether the article contains misleading information.
        First provide your detailed reasoning, then end with either:
        Final Answer: 1 (if misleading)
        Final Answer: 0 (if accurate)
        """
    
        messages = [
            {"role": "system", "content": "You are a fact-checking expert providing final conclusions based on detailed analysis of multiple claims."},
            {"role": "user", "content": prompt}
        ]

        # 印出組好的 Prompt
        print("\n=== Debug: Prompt for LLM ===")
        print(prompt)
        print("=============================")
    
        response = client.chat.completions.create(
            model=self.model,
            messages=messages
        )

        return response.choices[0].message.content

    def verify_article(self, article):
        """主要驗證流程"""
        print(Fore.CYAN + "Extracting sub-questions..." + Style.RESET_ALL)
        sub_questions = self.extract_sub_questions(article)
        print(Fore.YELLOW + "Sub-questions extracted:\n" + sub_questions + Style.RESET_ALL)
        
        results = []
        for q in sub_questions.split("\n"):
            if q.strip() and q[0].isdigit():
                print(Fore.CYAN + f"\nVerifying: {q}" + Style.RESET_ALL)
                result = self.verify_sub_question(q, article)
                results.append(f"Question: {q}\nResult: {result}")
        
        print(Fore.CYAN + "\nCombining results..." + Style.RESET_ALL)
        final_conclusion = self.combine_results(results, article)
        print(Fore.YELLOW + "\nFinal conclusion:\n" + final_conclusion + Style.RESET_ALL)
        
        return final_conclusion
    
    def _clean_ansi_codes(self, text):
        """移除 ANSI 顏色代碼"""
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        return ansi_escape.sub('', text)
    
    def process_article_batch(self, input_file, output_file, num_files=None):
        """處理整個Excel中的文章"""
        log_file = "fact_checking_process.log"
        
        try:
            with open(log_file, 'w') as f:
                try:
                    df = pd.read_excel(input_file)
                except UnicodeDecodeError:
                    df = pd.read_excel(input_file)
        
                if num_files is not None:
                    df_to_process = df.head(num_files)
                else:
                    df_to_process = df
            
                df['fact_check_response'] = ''
                df['Simplified Answer'] = ''

                df.to_excel(output_file, index=False)
                print("開始處理文章...")
        
                for idx, row in tqdm(df_to_process.iterrows(), total=len(df_to_process), desc="Processing articles"):
                    
                    # article = row['synthetic_misinformation']    # LLMFake 
                    article = row['text']   # mmsoc_gossipcop_text_labels-100 

                    output_capture = io.StringIO()
                    original_stdout = sys.stdout
                    sys.stdout = output_capture

                    try:
                        result = self.verify_article(article)
                    finally:
                        sys.stdout = original_stdout

                    detailed_output = output_capture.getvalue()
                    cleaned_output = self._clean_ansi_codes(detailed_output)    
                    simplified_answer = self._extract_simplified_answer(result)

                    f.write(detailed_output)
                    f.flush()

                    current_df = pd.read_excel(output_file)

                    current_df.at[idx, 'fact_check_response'] = cleaned_output
                    current_df.at[idx, 'Simplified Answer'] = simplified_answer
            
                    current_df.to_excel(output_file, index=False)

                print(f"\nAll articles processed. Results saved to {output_file}", file=original_stdout)
        
        except Exception as e:
            print(f"Error processing batch: {str(e)}", file=original_stdout)
            raise e
  
    def _extract_simplified_answer(self, response):
        """從回應中提取簡化的答案 (1 或 0)"""
        try:
            match = re.search(r"Final Answer:\s*(\d)", response)
            if match:
                return int(match.group(1))
        
            digits = re.findall(r'[01]', response)
            if digits:
                return int(digits[-1])
            else:
                return "Unknown"
        except Exception:
            return "Error"

    

### Analysis datasets

In [6]:
def main():
    input_file = "gossipcop-120.xlsx"  # if LLMFake file，revise above to synthetic_misinformation
    output_file = "react_child_task_google_gossipcop-120.xlsx"
    num_files_to_process = 120
    
    checker = FactChecker()
    checker.set_model("gpt-3.5-turbo")  # 設定使用的模型
    
    try:
        checker.process_article_batch(input_file, output_file, num_files=num_files_to_process)
        
    except Exception as e:
        print(Fore.RED + f"Error in main execution: {str(e)}" + Style.RESET_ALL)
        return "Error"

if __name__ == "__main__":
    main()

開始處理文章...


Sub-questions extracted:
1. Was Cher honored with the Billboard Icon Award at the 2017 Billboard Music Awards for her significant achievements in the music industry?
2. Did Cher perform her hits "Believe" and "If I Could Turn Back Time" at the 2017 Billboard Music Awards, receiving an enthusiastic response from the audience?
3. Did Cher deliver a speech after receiving the Billboard Icon Award, where she shared personal anecdotes and expressed gratitude, notably excluding any mention of Donald Trump?

Verifying: 1. Was Cher honored with the Billboard Icon Award at the 2017 Billboard Music Awards for her significant achievements in the music industry?
Thought: To verify if Cher was indeed honored with the Billboard Icon Award at the 2017 Billboard Music Awards, I can check official sources related to the event or reputable news articles reporting on the award recipients from that year. 
Action: call_google: 2017 Billboard Music Awards winners list 

 -- running call_google 2017 Billboar


All articles processed. Results saved to react_child_task_google_gossipcop-120.xlsx


### Analysis single article

In [None]:
checker = FactChecker()
checker.set_model("gpt-3.5-turbo") # 設定使用的模型
article = """Chris Wattie / ReutersCanada warns immigrants in US about fleeing north Canada officials want to make it clear for would-be immigrants looking to cross the US-Canada border that they should do so legally."""
result = checker.verify_article(article)

### Caculate Accuracy

In [7]:
import pandas as pd

file_name = "react_child_task_google_gossipcop-120"

data = pd.read_excel(f'{file_name}.xlsx')
data['Simplified Answer'] = data['Simplified Answer'].astype(str)

# 計算總體準確率（僅考慮 0 和 1）
valid_predictions = data[data['Simplified Answer'].isin(['0', '1'])]
overall_accuracy = (valid_predictions['label'] == valid_predictions['Simplified Answer'].astype(int)).mean()

# 計算 label=1（假新聞）的準確率
label_1_data = valid_predictions[valid_predictions['label'] == 1]
accuracy_label_1 = (label_1_data['label'] == label_1_data['Simplified Answer'].astype(int)).mean()
mistakes_label_1 = len(label_1_data) - (label_1_data['label'] == label_1_data['Simplified Answer'].astype(int)).sum()

# 計算 label=0（真新聞）的準確率
label_0_data = valid_predictions[valid_predictions['label'] == 0]
accuracy_label_0 = (label_0_data['label'] == label_0_data['Simplified Answer'].astype(int)).mean()
mistakes_label_0 = len(label_0_data) - (label_0_data['label'] == label_0_data['Simplified Answer'].astype(int)).sum()

# 統計 "Uncertain" 和 "error" 的情況
uncertain_count = len(data[data['Simplified Answer'] == 'Unknown'])
error_count = len(data[data['Simplified Answer'] == 'ERROR'])

# 計算f1 score
f1_score = 2 * (accuracy_label_1 * accuracy_label_0) / (accuracy_label_1 + accuracy_label_0)

print(f"檔案名稱: {file_name}.xlsx")
print(f"Accuracy: {overall_accuracy:.4f}")
print(f"F1 score: {f1_score:.4f}")
print(f"label=1（假新聞）的準確率: {accuracy_label_1:.2f}, 錯誤數量: {mistakes_label_1}")
print(f"label=0（真新聞）的準確率: {accuracy_label_0:.2f}, 錯誤數量: {mistakes_label_0}")
print(f"Uncertain 的數量: {uncertain_count}")
print(f"ERROR 的數量: {error_count}")


檔案名稱: react_child_task_google_gossipcop-120.xlsx
Accuracy: 0.6417
F1 score: 0.6000
label=1（假新聞）的準確率: 0.55, 錯誤數量: 9
label=0（真新聞）的準確率: 0.66, 錯誤數量: 34
Uncertain 的數量: 0
ERROR 的數量: 0
