# Political Chatbot Analysis: Canadian Bill Importance Evaluation

This notebook analyzes Canadian bills using a **text-based** approach. Each bill's full text is split into smaller chunks, and **LLMs** score how important each chunk is to the decision of whether to pass the bill.

In [131]:
%pip install -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [132]:
import json
import csv
from datetime import datetime
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
import openai
from dotenv import load_dotenv
import os
import faiss
import numpy as np
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from langchain.output_parsers import EnumOutputParser
from enum import Enum
from typing import Optional, Union, Sequence, Any, List, Dict

# Load environment variables; OPENAI_API_KEY is read from a local .env file or the process environment.
load_dotenv()
openai_api_key = os.getenv('OPENAI_API_KEY')
openai.api_key = openai_api_key

# Configuration: which bills to analyze, where the data lives, and how to chunk it.
PASSED_BILL_ID = 'C-242'
REJECTED_BILL_ID = 'C-230'
BILLS_JSON_PATH = "detailed_bills_with_full_text.json"
CHUNK_SIZE = 3000      # maximum characters per chunk (length_function is len)
CHUNK_OVERLAP = 200    # characters shared between consecutive chunks


def get_bill_full_text(bills: list, bill_id: str) -> str:
    """
    Return the ``full_text`` of the first bill whose ``id`` equals ``bill_id``.

    Args:
        bills: List of bill dicts, each with at least ``id`` and ``full_text`` keys.
        bill_id: Identifier of the bill to look up (e.g. ``'C-242'``).

    Returns:
        The ``full_text`` value of the matching bill.

    Raises:
        LookupError: If no bill in ``bills`` has the requested id.
    """
    for bill in bills:
        if bill['id'] == bill_id:
            return bill['full_text']
    raise LookupError(f"No bill with id {bill_id!r} found in {BILLS_JSON_PATH}")


# load the bill data
with open(BILLS_JSON_PATH, "r", encoding="utf-8") as file:
    bills_data = json.load(file)

passed_bill_text = get_bill_full_text(bills_data, PASSED_BILL_ID)
rejected_bill_text = get_bill_full_text(bills_data, REJECTED_BILL_ID)

# split bill into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    length_function=len,
    is_separator_regex=False,
)

passed_bill_text_chunks = text_splitter.split_text(passed_bill_text)
rejected_bill_text_chunks = text_splitter.split_text(rejected_bill_text)

## TSV Logger

In [133]:
# Logging utilities
class StructuredLogger:
    """
    A logger that outputs to TSV files for structured logging of LLM interactions.

    Every row has three tab-separated columns: ``timestamp``, ``message_type``
    and ``message``. Rows are written with a backtick as the CSV quote
    character, so the file must be read back with ``quotechar='`'``.
    """

    # Define all possible message types for documentation
    MESSAGE_TYPES = [
        "vectorstore_add",           # Adding a case to vectorstore
        "vectorstore_retrieve",      # Retrieving similar cases
        "vectorstore_summarize",     # Summarizing cached decisions
        "fast_prompt",               # Input prompt for fast LLM
        "fast_response",             # Fast LLM response
        "slow_prompt",               # Input prompt for slow LLM
        "slow_response",             # Slow LLM response

        # Legislation-specific messages
        "game_init",
    ]

    def __init__(self, tsv_filepath: str):
        """
        Initialize the logger and create the TSV file (with a header row) if needed.

        Args:
            tsv_filepath: Filepath of the TSV log file. Missing parent
                directories are created; an existing file is appended to.
        """
        self.tsv_filepath = tsv_filepath

        # Create the parent directory if the path has one. A bare filename has
        # an empty dirname, and os.makedirs("") raises FileNotFoundError.
        log_dir = os.path.dirname(self.tsv_filepath)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # Initialize TSV file with headers if it doesn't exist
        if not os.path.exists(self.tsv_filepath):
            with open(self.tsv_filepath, "w", newline="", encoding="utf-8") as tsvfile:
                # Same dialect as log() so the whole file parses with one reader configuration.
                writer = csv.writer(tsvfile, delimiter="\t", quotechar='`', quoting=csv.QUOTE_MINIMAL)
                writer.writerow(["timestamp", "message_type", "message"])

    def log(self, message_type: str, message_data: Union[str, Dict[str, Any], list[Any], BaseModel]) -> None:
        """
        Append one row to the TSV file.

        Args:
            message_type: Type of message (one of MESSAGE_TYPES)
            message_data: Data to be logged. Pydantic models, dicts and lists
                are serialized to JSON; anything else is converted with ``str()``.

        Raises:
            ValueError: If ``message_type`` is not listed in MESSAGE_TYPES.
        """
        if message_type not in self.MESSAGE_TYPES:
            raise ValueError(
                f"Invalid message_type: {message_type}. Must be one of {self.MESSAGE_TYPES}"
            )

        # Convert message_data to string if it's not already
        if isinstance(message_data, BaseModel):
            message = json.dumps(message_data.model_dump(), ensure_ascii=False)
        elif isinstance(message_data, (dict, list)):
            message = json.dumps(message_data, ensure_ascii=False)
        else:
            message = str(message_data)

        timestamp = datetime.now().isoformat()

        # Append to TSV file; the file is reopened per call so each row is flushed on close.
        with open(self.tsv_filepath, "a", newline="", encoding="utf-8") as tsvfile:
            writer = csv.writer(tsvfile, delimiter="\t", quotechar='`', quoting=csv.QUOTE_MINIMAL)
            writer.writerow([timestamp, message_type, message])

## Vectorstore Implementations

In [134]:
class ReflectiveVectorstoreMemory:
    """
    Reflective Vectorstore-based memory using FAISS and OpenAI embeddings.

    Analyses are buffered in ``self.analyses`` and are NOT searchable on their
    own. ``summarize_cycle`` condenses the buffer into one summary with a chat
    model, embeds that summary, and adds it to the FAISS index. Retrieval
    therefore only ever returns summaries.
    """

    def __init__(self, embedding_model: str = "text-embedding-ada-002", embedding_dim: int = 1536):
        """
        Initialize the vectorstore with FAISS and OpenAI embeddings.

        Args:
            embedding_model (str): The OpenAI embedding model to use.
            embedding_dim (int): Dimensionality of the vectors produced by
                ``embedding_model``. The default (1536) matches
                'text-embedding-ada-002'; change it when using another model.
        """
        self.embedding_model = embedding_model
        self.embedding_dim = embedding_dim
        # Vector Store
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.analyses = []  # Analyses buffered since the last summary
        self.summaries = []  # Summaries, in the same order as the vectors in self.index

    def _get_embedding(self, text: str) -> np.ndarray:
        """
        Generate an embedding for the given text using OpenAI.

        Args:
            text (str): The input text to embed.

        Returns:
            np.ndarray: The embedding vector as a float32 NumPy array.
        """
        client = openai.OpenAI()

        response = client.embeddings.create(
            input=text,
            model=self.embedding_model
        )

        return np.array(response.data[0].embedding, dtype="float32")

    def add_analysis(self, analysis: str):
        """Buffer an analysis; it is only indexed (as part of a summary) by ``summarize_cycle``."""
        self.analyses.append(analysis)

    def summarize_cycle(self, slow_mind_model_name: str, logger: StructuredLogger):
        """
        Summarize the analyses buffered since the last summary and index the result.

        The summary is embedded, added to the FAISS index, appended to
        ``self.summaries`` and logged as a ``vectorstore_summarize`` entry.
        The buffer is cleared afterwards. Does nothing when the buffer is empty.

        Args:
            slow_mind_model_name (str): Name of the OpenAI chat model used to write the summary.
            logger (StructuredLogger): Logger that receives the input analyses and the summary.
        """
        if not self.analyses:
            # Nothing was buffered this cycle; skip the API calls instead of
            # asking the model to summarize an empty message.
            return

        client = openai.OpenAI()

        # Combine all analyses since the last summary
        past_analyses = "\n".join(self.analyses)

        system_prompt = "\n".join([
            "You are a strategic policymaker, evaluating in-depth analyses of bill sections.",
            "Summarize the decisions and reflect on the most important implications for future strategic decisions."
        ])

        response = client.chat.completions.create(
            model=slow_mind_model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": past_analyses},
            ],
            max_tokens=4096
        )

        summary = response.choices[0].message.content

        # Embed the summary and add it to the vector store
        embedding = self._get_embedding(summary)
        self.index.add(np.array([embedding]))
        self.summaries.append(summary)

        # add summarized vectorstore entry to tsv logger
        logger.log("vectorstore_summarize", {
            "input_decisions": past_analyses,
            "summary": summary
        })

        # Clear analyses for the next cycle
        self.analyses.clear()

    def get_relevant_summaries(self, query: str, top_k: int = 2) -> list:
        """Retrieve the top-k most relevant summaries.

        Args:
            query (str): query to find similar summaries to.
            top_k (int): number of relevant summaries to retrieve.

        Returns:
            list: Up to ``top_k`` summaries, nearest first; empty when no
            summary has been stored yet or ``top_k`` is not positive.
        """
        if not self.summaries or top_k <= 0:
            return []

        query_embedding = self._get_embedding(query)
        # Never ask for more neighbours than are stored: FAISS pads missing
        # results with index -1, which Python would resolve to summaries[-1].
        neighbour_count = min(top_k, len(self.summaries))
        _distances, indices = self.index.search(np.array([query_embedding]), neighbour_count)
        return [
            self.summaries[idx] for idx in indices[0] if 0 <= idx < len(self.summaries)
        ]

class VectorstoreMemory:
    """
    Vectorstore-based memory using FAISS and OpenAI embeddings.

    Each analysis is embedded and indexed as soon as it is added, so
    ``self.analyses[i]`` always corresponds to vector ``i`` in ``self.index``.
    """

    def __init__(self, embedding_model: str = "text-embedding-ada-002", embedding_dim: int = 1536):
        """
        Initialize the vectorstore with FAISS and OpenAI embeddings.

        Args:
            embedding_model (str): The OpenAI embedding model to use.
            embedding_dim (int): Dimensionality of the vectors produced by
                ``embedding_model``. The default (1536) matches
                'text-embedding-ada-002'; change it when using another model.
        """
        self.embedding_model = embedding_model
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.analyses = []  # Stored analyses, aligned by position with the index vectors

    def _get_embedding(self, text: str) -> np.ndarray:
        """
        Generate an embedding for the given text using OpenAI.

        Args:
            text (str): The input text to embed.

        Returns:
            np.ndarray: The embedding vector as a float32 NumPy array.
        """
        client = openai.OpenAI()
        response = client.embeddings.create(
            input=text,
            model=self.embedding_model
        )
        return np.array(response.data[0].embedding, dtype="float32")

    def add_analysis(self, analysis: str):
        """Embed an analysis, add the vector to the FAISS index, and store the text."""
        embeddings = [self._get_embedding(analysis)]
        self.index.add(np.array(embeddings))
        # append, not extend: extending a list with a str adds one entry per
        # character, which breaks the position-to-vector alignment.
        self.analyses.append(analysis)

    def get_relevant_analyses(self, query: str, top_k: int = 2) -> list:
        """
        Retrieve the top-k most relevant analyses.

        Args:
            query (str): Text to find similar analyses to.
            top_k (int): Maximum number of analyses to return.

        Returns:
            list: Up to ``top_k`` stored analyses, nearest first; empty when
            nothing has been stored yet or ``top_k`` is not positive.
        """
        if not self.analyses or top_k <= 0:
            return []
        query_embedding = self._get_embedding(query)
        # Never ask for more neighbours than are stored: FAISS pads missing
        # results with index -1, which Python would resolve to analyses[-1].
        neighbour_count = min(top_k, len(self.analyses))
        _distances, indices = self.index.search(np.array([query_embedding]), neighbour_count)

        return [
            self.analyses[idx] for idx in indices[0] if 0 <= idx < len(self.analyses)
        ]

## Prompt Templates

In [135]:
# Prompt used to analyze a single bill chunk.
# Placeholders: {text} (the chunk), {relevant_analyses} (retrieved memory text),
# {format_instructions} (output-format instructions supplied by the output parser).
bill_chunk_prompt_template = """
You are a legislative assistant analyzing sections of a bill to determine its impact and importance in the overall decision to support or oppose the bill.

### **Task**
Analyze the following section of the bill and:
1. Summarize the key points in **neutral terms**.
2. Assign an **importance score (0-100%)** to reflect how crucial this section is in making a decision on the bill.
    - **0%**: No impact on decision-making.
    - **100%**: Crucial in determining the bill’s outcome.
3. Assign a score between 0-1 on how uncertain you are about your response, 0 if your completely certain, 1 if your completely uncertain.

### **Bill Section:**
{text}

Relevant past analyses:
{relevant_analyses}

Format Instructions:
{format_instructions}
"""

# Prompt for the final pass/reject decision over the collected analyses.
# Placeholder: {important_analyses}.
# NOTE(review): this template has no {format_instructions} placeholder and is not
# referenced by any cell visible in this notebook section — confirm where it is used.
final_decision_prompt_template = """
You are reviewing the key analyses from important sections of a bill. Your task is to make a final decision on whether the bill should be **approved or rejected**.

### **Input Data**
Here are the key analyses from important sections of the bill:
{important_analyses}

### **Task**
1. Summarize the major **arguments for** and **against** the bill.
2. Justify your decision to **pass** or **reject** the bill using **evidence from the provided analyses**.
3. Based on the analyses, provide a **final decision**: Should the bill be **passed** or **rejected**?
4. Assign a score between 0-1 on how uncertain you are about your response, 0 if your completely certain, 1 if your completely uncertain.
"""

## Output Parsers

In [136]:
# Structured result of analyzing one bill chunk. run_experiment passes this class
# as the pydantic_object of the PydanticOutputParser that parses the LLM reply.
# Validation bounds: importance_score in [0, 100], uncertainty_score in [0, 1].
class BillChunkAnalysis(BaseModel):
    summary: str = Field(..., description="Neutral summary of the bill section.")
    importance_score: float = Field(..., ge=0, le=100, description="Quantitative importance score (0-100%) to determine how crucial this section is for making a decision on the bill.")
    uncertainty_score: float = Field(..., ge=0, le=1, description="Score between 0-1 on how uncertain you are about your vote, 0 if your completely certain, 1 if your completely uncertain.")

# Structured result of the final decision on a whole bill. final_decision must
# match the regex ^(pass|reject)$ exactly; uncertainty_score is bounded to [0, 1].
# NOTE(review): not referenced by any cell visible in this notebook section.
class FinalBillDecision(BaseModel):
    arguments_for: str = Field(..., description="Summary of key arguments supporting the bill.")
    arguments_against: str = Field(..., description="Summary of key arguments against the bill.")
    justification: str = Field(..., description="Rationale for the final decision based on the provided analyses.")
    final_decision: str = Field(..., pattern="^(pass|reject)$", description="Final decision on whether to pass or reject the bill.")
    uncertainty_score: float = Field(..., ge=0, le=1, description="Score between 0-1 on how uncertain you are about your vote, 0 if your completely certain, 1 if your completely uncertain.")

## Custom Agent

In [137]:
class LegislativeAgent:
    """
    Two-tier ("fast mind" / "slow mind") agent that scores bill chunks.

    Each chunk is first analyzed by the fast model. If that answer's
    uncertainty score reaches ``uncertainty_threshold``, the same prompt is
    re-run on the slow model and its answer replaces the fast one. Analyses
    whose importance score reaches ``importance_threshold`` are stored in memory.
    """

    def __init__(
        self,
        reflect_before_vectorstore: bool,
        fast_mind_model: ChatOpenAI,
        slow_mind_model: ChatOpenAI,
        parser: PydanticOutputParser,
        logger: StructuredLogger,
        uncertainty_threshold: float = 0.2,
        importance_threshold: float = 80,
    ):
        """
        Args:
            reflect_before_vectorstore: If True use ReflectiveVectorstoreMemory
                (summaries are indexed); otherwise VectorstoreMemory (each
                analysis is indexed directly).
            fast_mind_model: Chat model queried first for every chunk.
            slow_mind_model: Chat model queried when the fast answer is too uncertain.
            parser: Output parser providing ``get_format_instructions()`` and
                ``parse()``; parsed results must expose ``summary``,
                ``importance_score`` and ``uncertainty_score``.
            logger: Structured logger receiving prompts, responses and memory events.
            uncertainty_threshold: Escalate to the slow model when the fast
                answer's uncertainty_score is >= this value.
            importance_threshold: Store the summary in memory when
                importance_score is >= this value.
        """
        self.reflect_before_vectorstore = reflect_before_vectorstore
        self.fast_mind_model = fast_mind_model
        self.slow_mind_model = slow_mind_model
        self.parser = parser
        self.logger = logger
        self.uncertainty_threshold = uncertainty_threshold
        self.importance_threshold = importance_threshold
        if self.reflect_before_vectorstore:
            self.memory = ReflectiveVectorstoreMemory()
        else:
            self.memory = VectorstoreMemory()

    def _query_model(self, model, formatted_prompt: str):
        """Send the prompt to ``model`` and parse the reply text with ``self.parser``."""
        # invoke() returns a message object; its .content holds the reply text
        # (predict(), which returned the text directly, is deprecated).
        reply = model.invoke(formatted_prompt)
        return self.parser.parse(reply.content)

    def analyze_bill_chunk(self, bill_chunk):
        """
        Analyze one bill chunk and store the result in memory if it is important enough.

        Args:
            bill_chunk: Text of the bill section to analyze.

        Returns:
            The parsed analysis that was finally used (the slow model's when
            escalation happened, otherwise the fast model's).
        """
        prompt = PromptTemplate(
            # Both variables are filled by prompt.format() below; format_instructions is pre-bound.
            input_variables=["text", "relevant_analyses"],
            template=bill_chunk_prompt_template,
            partial_variables={"format_instructions": self.parser.get_format_instructions()},
        )

        if self.reflect_before_vectorstore:
            relevant_analyses = self.memory.get_relevant_summaries(bill_chunk)
        else:
            relevant_analyses = self.memory.get_relevant_analyses(bill_chunk)
        retrieved_context = '\n'.join(relevant_analyses)
        formatted_prompt = prompt.format(text=bill_chunk, relevant_analyses=retrieved_context)

        self.logger.log('vectorstore_retrieve', retrieved_context)
        self.logger.log('fast_prompt', formatted_prompt)

        analysis = self._query_model(self.fast_mind_model, formatted_prompt)
        self.logger.log('fast_response', analysis)

        if analysis.uncertainty_score >= self.uncertainty_threshold:
            # Fast answer is too uncertain: re-run the identical prompt on the slow model.
            self.logger.log('slow_prompt', formatted_prompt)
            analysis = self._query_model(self.slow_mind_model, formatted_prompt)
            self.logger.log('slow_response', analysis)

        if analysis.importance_score >= self.importance_threshold:
            self.memory.add_analysis(analysis.summary)
            self.logger.log('vectorstore_add', analysis.summary)

        return analysis

    def summarize_cycle(self, slow_mind_model_name: str, logger: StructuredLogger):
        """
        Delegate to the memory's ``summarize_cycle``.

        Only ReflectiveVectorstoreMemory defines that method, so this must only
        be called when the agent was built with ``reflect_before_vectorstore=True``.

        Args:
            slow_mind_model_name: Name of the chat model used to write the summary.
            logger: Logger that receives the summary entry.
        """
        self.memory.summarize_cycle(slow_mind_model_name, logger)

## Run Experiment Function

In [143]:
def run_experiment(
    reflect_before_vectorstore: bool,
    basefilepath: str,
    bill_name: str,
    bill_text_chunks: list,
    summarize_every: int = 5,
    max_chunks: Optional[int] = None,
    fast_model_name: str = 'gpt-4o-mini',
    slow_model_name: str = 'gpt-4o',
):
    """
    Run the fast/slow bill analysis over a bill's chunks and log everything to a TSV file.

    The log is written to
    ``{basefilepath}/{reflect|noreflect}/political_raw_{bill_name}.tsv``.

    Args:
        reflect_before_vectorstore: If True, buffered analyses are summarized
            every ``summarize_every`` chunks and only summaries are indexed.
        basefilepath: Base directory for the log files.
        bill_name: Bill identifier used in the log file name.
        bill_text_chunks: Text chunks of the bill, in document order.
        summarize_every: Number of analyzed chunks between summaries (reflective mode only).
        max_chunks: Optional cap on how many chunks are analyzed, e.g. to limit
            API cost during development. ``None`` analyzes every chunk.
        fast_model_name: OpenAI chat model queried first for every chunk.
        slow_model_name: OpenAI chat model used for escalation and for summaries.

    Raises:
        ValueError: If ``summarize_every`` is smaller than 1.
    """
    if summarize_every < 1:
        raise ValueError(f"summarize_every must be >= 1, got {summarize_every}")

    print("length of bill text chunks is " + str(len(bill_text_chunks)))

    # Computed outside the f-string: reusing the f-string's own quote character
    # inside a replacement field is a SyntaxError before Python 3.12.
    memory_mode = 'reflect' if reflect_before_vectorstore else 'noreflect'
    log_filepath = f'{basefilepath}/{memory_mode}/political_raw_{bill_name}.tsv'
    logger = StructuredLogger(log_filepath)

    fast_mind_model = ChatOpenAI(model=fast_model_name, openai_api_key=openai_api_key, temperature=1, max_tokens=4096)
    slow_mind_model = ChatOpenAI(model=slow_model_name, openai_api_key=openai_api_key, temperature=1, max_tokens=4096)

    bill_chunk_parser = PydanticOutputParser(pydantic_object=BillChunkAnalysis)
    agent = LegislativeAgent(reflect_before_vectorstore, fast_mind_model, slow_mind_model, bill_chunk_parser, logger)

    chunks_to_analyze = bill_text_chunks if max_chunks is None else bill_text_chunks[:max_chunks]

    # Count from 1 so a summary is produced after every `summarize_every`
    # analyzed chunks. The loop keeps going after a summary; use max_chunks to
    # stop early on purpose.
    for chunks_done, bill_text_chunk in enumerate(chunks_to_analyze, start=1):
        agent.analyze_bill_chunk(bill_text_chunk)

        if reflect_before_vectorstore and chunks_done % summarize_every == 0:
            agent.summarize_cycle(slow_model_name, logger)


## Run Experiment

In [144]:
# Reflective-memory run on the passed bill; logs go under logs/fastslow/reflect/.
run_experiment(
    reflect_before_vectorstore=True,
    basefilepath='logs/fastslow',
    bill_name=PASSED_BILL_ID,
    bill_text_chunks=passed_bill_text_chunks,
)

length of bill text chunks is 2


IndexError: list index out of range