In [1]:
import os
from dotenv import load_dotenv
# Read key/value pairs from a local .env file into the process environment;
# the API keys fetched with os.getenv() further down are expected to come from there.
load_dotenv()

True

## Initializing model

In [2]:
from langchain.chat_models import init_chat_model

# Chat model shared by the whole notebook; temperature 0 keeps sampling as
# repeatable as the provider allows.
llm = init_chat_model(
    "openai:gpt-4.1",
    temperature=0,
)

## Test Query

In [14]:
# Natural-language question that is sent to the agent.
query = "Is there any remarkable event regarding judicial independence in Belgium?"

# Hand-written tool payload for invoking the retriever tool directly.
# NOTE(review): the payload carries no "country" filter although the question
# above names Belgium - confirm whether that is intended.
query_data_json = (
    "\n"
    '{"query": "judicial independence", "impact_score": {"$gte": 4}, '
    '"pillar_1": 1, "pillar_7": 1, "pillar_8": 1}'
    "\n"
)

## Init Clients

In [4]:
from typing import Any, Dict, List, Optional

import boto3
import voyageai
from botocore.exceptions import ClientError
from pinecone import Pinecone

class VoyageClient():
    """Thin wrapper around a ``voyageai.Client`` bound to one embedding model."""

    def __init__(self, api_key: str, model: str):
        """
        Create the underlying VoyageAI client.

        Args:
            api_key (str): VoyageAI API key.
            model (str): Name of the embedding model used for every request.
        """
        self.engine = voyageai.Client(api_key)
        self.model = model

    def embbed_text(self, text: str, input_type: str = "document") -> List[float]:
        """
        Embeds the given text using the VoyageAI engine.

        Args:
            text (str): The text to embed.
            input_type (str): Value forwarded as ``input_type`` to the VoyageAI
                ``embed`` call. Defaults to "document", the value this method
                used to hard-code; pass "query" when embedding a search query
                rather than a stored document.

        Returns:
            list: The embedded text as a list of floats.
        """
        response = self.engine.embed(
            [text],  # the API takes a list of texts; send exactly one
            model=self.model,
            input_type=input_type,
        )
        # One text in, one embedding out.
        return response.embeddings[0]
    

class PineconeClient():
    """Wrapper around a single Pinecone index and namespace."""

    def __init__(self, api_key: str, index: str, namespace: str):
        """
        Connect to a Pinecone index.

        Args:
            api_key (str): Pinecone API key.
            index (str): Name of the index to query.
            namespace (str): Namespace inside the index that queries are sent to.
        """
        self.index_name = index
        self.namespace = namespace
        # ``self.index`` is the index handle (not its name); ``query`` calls it.
        self.index = Pinecone(api_key=api_key, pool_threads=30).Index(index)

    def query(
        self,
        embedded_query: List[float],
        filters: Optional[Dict[Any, Any]] = None,
        top_k: int = 250,
    ) -> List[Any]:
        """
        Query the Pinecone index with the given embedded query and filters.

        Args:
            embedded_query (list): The embedded query to search for.
            filters (dict, optional): Metadata filter to apply to the query.
                ``None`` or an empty dict means no metadata constraint.
            top_k (int): Maximum number of matches to request. Defaults to 250.

        Returns:
            list: The ``matches`` of the Pinecone query response, metadata included.
        """
        response = self.index.query(
            vector=embedded_query,
            top_k=top_k,
            namespace=self.namespace,
            include_metadata=True,
            # Normalise an empty filter to None so the request is a plain
            # unfiltered search instead of carrying an empty filter object.
            filter=filters or None,
        )
        return response.matches
    

class DynamoDBClient():
    """Encapsulates an Amazon DynamoDB table of chunked news articles."""

    def __init__(self, dyn_resource, table_name: str):
        """
        Initializes the DynamoDB class with a DynamoDB resource.

        :param dyn_resource: A boto3 DynamoDB resource.
        :param table_name: Name of the table that holds the chunks.
        """
        self.dyn_resource = dyn_resource
        self.table = dyn_resource.Table(table_name)

    def get_chunk(self, chunk_id: str) -> Optional[str]:
        """
        Gets the text of a chunk from the table.

        :param chunk_id: Chunk id (value of the ``chunk_id`` key attribute).
        :return: The chunk's ``text`` attribute, or None when the request
                 fails, the chunk does not exist, or it has no ``text``.
        """
        try:
            response = self.table.get_item(Key={"chunk_id": chunk_id})
        except ClientError as e:
            # Single quotes inside the f-string keep this valid before Python 3.12.
            print(f"Chunk {chunk_id} not found ❌. Here's why: {e.response['Error']['Message']}")
            return None

        # A missing item is signalled by the absence of "Item" in the response,
        # not by a ClientError, so it has to be checked explicitly.
        item = response.get("Item")
        if item is None:
            print(f"Chunk {chunk_id} not found ❌.")
            return None

        return item.get("text")

In [5]:
import boto3
from botocore.exceptions import ClientError

# Embedding client; API keys are read from the environment.
vc = VoyageClient(api_key=os.getenv("VOYAGE_API_KEY"), model="voyage-3.5")

# Vector index client, bound to one index and namespace.
pc = PineconeClient(
    os.getenv("PINECONE_API_KEY"),
    index="eurovoices-news-articles-vdb",
    namespace="ns-1",
)

# DynamoDB table from which the text of each chunk is read.
ddbc = DynamoDBClient(
    boto3.resource("dynamodb", region_name="us-east-1"),
    table_name="eurovoices-chunked-news",
)

## Checkpointer

In [None]:
from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointer used by the agent to keep conversation state.
memory = MemorySaver()

# Every run in this notebook uses the same conversation thread id.
config = dict(configurable=dict(thread_id="singlethread"))

## Custom Tool Creation

In [7]:
import json
from langchain.tools import BaseTool
from pydantic import PrivateAttr

class NewsSearchTool(BaseTool):
    """Tool for searching news events using vector similarity search"""

    name: str = "news_events_search"
    description: str = ""  # Will be set in __init__

    # Service clients, stored as pydantic private attributes.
    _vc: Any = PrivateAttr()
    _pc: Any = PrivateAttr()
    _ddbc: Any = PrivateAttr()

    def __init__(self, vc, pc, ddbc, description: str):
        """
        Args:
            vc: Client exposing ``embbed_text(text)`` (Voyage client).
            pc: Client exposing ``query(embedded_query=..., filters=...)`` (Pinecone client).
            ddbc: Client exposing ``get_chunk(chunk_id)`` (DynamoDB client).
            description (str): Tool description shown to the model.
        """
        super().__init__(description=description)
        self._vc = vc         # Voyage Client
        self._pc = pc         # Pinecone Client
        self._ddbc = ddbc     # DynamoDB Client

    def _prepare_doc(self, doc):
        """
        Formats a document's metadata and content chunk into a structured context string.
        Args:
            doc: An object containing document metadata and an identifier.
                 Expected to have a 'metadata' dictionary with 'title' and 'country' keys,
                 and an 'id' attribute for retrieving the document chunk.
        Returns:
            str: A formatted string including the document's title, country, and content chunk,
                 delimited by context markers.
        """

        context_input = f"""
        [START OF CONTEXT EVENT]
        Title: {doc.metadata["title"]}
        Country: {doc.metadata["country"]}

        Retrieved information:
        {self._ddbc.get_chunk(doc.id)}
        [END OF CONTEXT EVENT]
        """

        return context_input

    def _build_filter(self, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Translate the optional keys of the tool input into a metadata filter.

        Args:
            query_data (dict): Parsed tool input.
        Returns:
            dict: Filter with, in order, 'country', 'pillar_1'..'pillar_8' and
                  'impact_score', each present only when supplied in the input.
        """
        filter_dict = {}

        # Truthiness check: a missing or empty country means no country filter.
        if query_data.get("country"):
            filter_dict["country"] = query_data["country"]

        # Pillar flags are binary, so 0 is a legitimate value: compare to None.
        for pillar_number in range(1, 9):
            key = f"pillar_{pillar_number}"
            if query_data.get(key) is not None:
                filter_dict[key] = query_data[key]

        # Forwarded unchanged, whether a plain number or an operator dict
        # such as {"$gte": 4}.
        if query_data.get("impact_score"):
            filter_dict["impact_score"] = query_data["impact_score"]

        return filter_dict

    def _run(self, query_data_json: str) -> str:
        """
        Run a filtered similarity search and return the formatted matches.

        Args:
            query_data_json (str): JSON object with a 'query' string and any of
                the optional filter keys handled by ``_build_filter``.
        Returns:
            str: The matched events formatted by ``_prepare_doc`` and separated
                 by blank lines, or an error message if anything fails
                 (invalid JSON, client errors, ...).
        """
        try:
            query_data = json.loads(query_data_json)
            query = query_data.get("query", "")

            # Perform similarity search
            embedded_query = self._vc.embbed_text(query)
            docs = self._pc.query(
                embedded_query = embedded_query,
                filters = self._build_filter(query_data)
            )

            # Format results
            return "\n\n".join(self._prepare_doc(doc) for doc in docs)

        except Exception as e:
            # Hand the failure back to the caller as text: falling through here
            # would return None from a method declared to return str.
            error_message = f"Error searching news events: {str(e)}"
            print(error_message)
            return error_message

In [8]:
# Tool description handed to NewsSearchTool: it tells the model when to call the
# tool, which metadata filters exist, and the JSON input format (one example
# object per line at the end).
retriever_description = """
Use this tool when users ask about:
- Summaries of events related to a rule of law topic or pillar within the European Union
- News about specific subjects related to the rule of law within the European Union
- What happened regarding a topic related to the rule of law within the European Union
- Events related to the rule of law in a specific member state of the European Union
- Events with specific impact levels
- Events related to specific dimensions of the rule of law

Available metadata filters:
- country: Filter by specific country name. Available options: "Austria", "Belgium", "Bulgaria", "Croatia", "Cyprus", "Czechia", "Denmark", "Estonia", "Finland", "France", "Germany", "Greece", "Hungary", "Ireland", "Italy", "Latvia", "Lithuania", "Luxembourg", "Malta", "Netherlands", "Poland", "Portugal", "Romania", "Slovakia", "Slovenia", "Spain", "Sweden"
- impact_score: Filter by numerical impact score. Values can range from 1 to 5, where 1 and 2 mean that the news article has a strong or low negative impact on the rule of law and 5 and 4 mean that the news article has a strong or mild positive impact on the thematic pillar. Following the same reasoning, a value of 3 means that the events have a neutral impact on the thematic pillar. Use comparison operators that are compatible with pinecone vector databases: $eq, $ne, $gt, $gte, $lt, $lte, $in
- pillar_1: Binary filter (1 or 0) for "Constraints on Government Powers" related events
- pillar_2: Binary filter (1 or 0) for "Absence of Corruption" related events
- pillar_3: Binary filter (1 or 0) for "Open Government" related events
- pillar_4: Binary filter (1 or 0) for "Fundamental Freedoms" related events
- pillar_5: Binary filter (1 or 0) for "Order and Security" related events
- pillar_6: Binary filter (1 or 0) for "Regulatory Enforcement" related events
- pillar_7: Binary filter (1 or 0) for "Civil Justice" related events
- pillar_8: Binary filter (1 or 0) for "Criminal Justice" related events

IMPACT SCORES:
- Negative impact: {"$lt": 3} (values 1-2)
- Neutral impact: {"$eq": 3}
- Positive impact: {"$gt": 3} (values 4-5)

Input should be a JSON string with 'query' (required) and any combination of optional filters:
{"query": "corruption", "country": "France", "pillar_2": 1}
{"query": "judicial independence", "impact_score": {"$gte": 4}, "pillar_1": 1, "pillar_7": 1, "pillar_8": 1}
{"query": "elections", "country": "Germany", "pillar_1": 1}
{"query": "freedom of speech", "pillar_4": 1}
{"query": "discrimination or equality", "country": "Spain", "pillar_4": 1}
{"query": "judicial reform", "country": "Spain", "pillar_7": 1, "pillar_8": 1}
{"query": "crime, order, safety, violence", "pillar_5": 1}
{"query": "environmental regulation", "country": "Sweden", "pillar_6": 1}
{"query": "labor regulation", "country": "Finland", "pillar_6": 1}
{"query": "business or commercial laws", "pillar_6": 1}
{"query": "media freedom or freedom of press", "pillar_4": 1}
"""

In [9]:
# Build the search tool from the shared clients and the tool description
# defined in retriever_description.
retriever = NewsSearchTool(vc=vc, pc=pc, ddbc=ddbc, description=retriever_description)

In [10]:
# retriever.invoke(
#     query_data_json.strip()
# )

In [11]:
from langgraph.prebuilt import create_react_agent

# Agent that pairs the chat model with the news search tool; the checkpointer
# is passed in so conversation state is kept between calls.
agent_executor = create_react_agent(
    model=llm,
    tools=[retriever],
    checkpointer=memory,
)

In [None]:
from IPython.display import Image, display
# Render the agent graph as a PNG (Mermaid diagram) and show it inline.
graph_png = agent_executor.get_graph().draw_mermaid_png()
display(Image(graph_png))

In [None]:
# Send the test question to the agent and print the newest message of every
# streamed update.
user_message = {"role": "user", "content": query}

for event in agent_executor.stream(
    {"messages": [user_message]},
    config=config,
    stream_mode="values",
):
    latest_message = event["messages"][-1]
    latest_message.pretty_print()