In [1]:
import json
from typing import Any, Dict, List, Optional

from openai import OpenAI
from openai.types.chat import ChatCompletion, ChatCompletionMessage
from pydantic import BaseModel, Field, ValidationError
from haystack.dataclasses import ChatMessage, StreamingChunk

from haystack import  Document, Pipeline, component
from haystack.utils import Secret
from prompts import ANTHROPIC_DEFAULT_PROMPT
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

  from .autonotebook import tqdm as notebook_tqdm
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


In [2]:

from openai import BadRequestError


def _convert_message_to_openai_format(message: ChatMessage) -> Dict[str, Any]:
    """Converts a ChatMessage to the format expected by OpenAI's API."""
    return {"role": message.role, "content": message.content}


@component
class BaseOpenAIGenerator(object):
    """
    A component that uses OpenAI's models to generate text.

    If you want to use a model that supports chat-like interactions, use the `OpenAIChatGenerator` instead.

    """

    def __init__(
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o",
        generation_kwargs: Optional[Dict[str, Any]] = None,
        system_prompt: Optional[str] = None,
    ):
        """
        :param api_key: OpenAI API key.
        :param model: The name of the model to use.
        :param generation_kwargs: Additional kwargs for the OpenAI completion endpoint.
             See the OpenAI API reference for a full list of available parameters:
             https://platform.openai.com/docs/api-reference/completions/create
        :param system_prompt: The system prompt to be used by the model.
            When used in a conversational scenario, this prompt is sent to the model before the user query to
            guide it towards a desired behavior. If not set, the model will only receive user messages.
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.system_prompt = system_prompt
        self.client = OpenAI(api_key=self.api_key.resolve_value())

    def _check_finish_reason(self, response: ChatMessage):
        if response.meta.get("finish_reason") == "length":
            raise ValueError(
                f"""The completion for the current document ended due to the 'length' of the response.
                      Consider increasing the max_tokens parameter in the generation_kwargs or the size of the document.
                   """
            )

    @component.output_types(replies=List[str], meta=List[Dict[str, Any]])
    def run(
        self,
        prompt: str,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        messages: Optional[List[ChatMessage]] = None,
    ):
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        logging.debug(f"OpenAIGenerator - Running with kwargs: {generation_kwargs}, messages: {messages}")
        if messages:
           openai_formatted_messages = [_convert_message_to_openai_format(msg) for msg in messages]
        else:
            message = ChatMessage.from_user(prompt)
            if self.system_prompt:
                messages = [ChatMessage.from_system(self.system_prompt), message]
            else:
                messages = [message]
            openai_formatted_messages = [_convert_message_to_openai_format(msg) for msg in messages]
        completion: ChatCompletion = self.client.chat.completions.create(
            model=self.model,
            messages=openai_formatted_messages,
            **generation_kwargs
        )
        logging.debug(f"OpenAIGenerator - OpenAI API response: {completion}")
        completions = [self._build_structured_message(completion, choice) for choice in completion.choices]
        for response in completions:
            self._check_finish_reason(response)

        return {
            "replies": [message.content for message in completions],
            "meta": [message.meta for message in completions],
        }

    def _build_structured_message(self, completion: Any, choice: Any) -> ChatMessage:
        chat_message = ChatMessage.from_assistant(choice.message.content or "")
        chat_message.meta.update(
            {
                "model": completion.model,
                "index": choice.index,
                "finish_reason": choice.finish_reason,
                "usage": dict(completion.usage),
            }
        )
        return chat_message


class Metadata(BaseModel):
    PrimaryQuestion: str = Field(description="De belangrijkste vraag die dit item beantwoordt.")
    PrimaryTheme: str = Field(description="Het hoofdthema waar dit item bij hoort.")
    SecondaryThemes: List[str] = Field(description="Specifieke subthema's die het hoofdthema verfijnen.")
    Entities: List[str] = Field(description="Belangrijke personen, plaatsen of objecten die aan het item zijn gekoppeld.")
    EntityRelationships: List[str] = Field(description="Beschrijvingen van relaties tussen entiteiten.")
    TimePeriod: str = Field(description="De historische periode of datum die met het item wordt geassocieerd.")
    Location: str = Field(description="De geografische context die bij het item hoort.")
    AssetType: str = Field(description="Het type item (bijv. Brief, Foto, Kaart).")
    StorylineDimension: str = Field(description="Dominante narratieve structuur (Chronologisch, Entiteitgericht, Emotiegedreven).")
    NarrativeFocus: str = Field(description="Hoe het item bijdraagt aan het verhaal of narratief.")
    Keywords: List[str] = Field(description="Extra zoekwoorden of tags voor betere zoekresultaten.")
    ExplorationTags: List[str] = Field(description="Gerelateerde thema's of onderwerpen voor verdere verkenning.")
    FollowUpQuestionTags: List[str] = Field(description="Tags voor het genereren van vervolgvragen.")
    Summary: Optional[str] = Field(description="Een korte samenvatting van de volledige tekst.", default=None)
    Sender: Optional[str] = Field(description="Naam van de afzender.", default=None)
    Recipient: Optional[str] = Field(description="Naam van de ontvanger.", default=None)
    DateSent: Optional[str] = Field(description="Datum waarop de brief is verstuurd.", default=None)
    LetterType: Optional[str] = Field(description="Type brief (bijv. Persoonlijk, Officieel).", default=None)
    ContentSummary: Optional[str] = Field(description="Korte samenvatting van de inhoud van de brief.", default=None)
    Scale: Optional[str] = Field(description="Schaal van de kaart (bijv. 1:5000).", default=None)
    MapFeatures: Optional[str] = Field(description="Opvallende kenmerken, zoals gebouwen, grenzen, rivieren.", default=None)
    DateCreated: Optional[str] = Field(description="Datum waarop de kaart is gemaakt.", default=None)
    LocationCovered: Optional[str] = Field(description="Gebieden of plaatsen die op de kaart worden weergegeven.", default=None)
    Photographer: Optional[str] = Field(description="Naam van de fotograaf.", default=None)
    DateTaken: Optional[str] = Field(description="Datum waarop de foto is genomen.", default=None)
    Event: Optional[str] = Field(description="Gebeurtenis die op de foto is vastgelegd.", default=None)
    PeopleInPhoto: Optional[str] = Field(description="Namen van personen op de foto.", default=None)
    ArticleTopic: Optional[str] = Field(description="Onderwerp of thema van het artikel.", default=None)
    Author: Optional[str] = Field(description="Naam van de auteur.", default=None)
    PublicationDate: Optional[str] = Field(description="Datum van publicatie.", default=None)
    Source: Optional[str] = Field(description="Naam van het tijdschrift of de bron.", default=None)


@component
class OpenAIGenerator(BaseOpenAIGenerator):
    @component.output_types(replies=List[str], meta=List[Dict[str, Any]], structured_reply=BaseModel)
    def run(
        self,
        prompt: str,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        messages: Optional[List[ChatMessage]] = None,
    ):
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        if "image" in generation_kwargs.keys():
            raise ValueError("The 'image' parameter is not supported by the OpenAIGenerator component")
        if "response_format" in generation_kwargs.keys():
            system_prompt = ChatMessage.from_system(ANTHROPIC_DEFAULT_PROMPT)
            message = ChatMessage.from_user(prompt)
            messages = [system_prompt, message]
            openai_formatted_messages = [_convert_message_to_openai_format(msg) for msg in messages]
            try:
                completion: ChatCompletion = self.client.chat.completions.create(
                    model=self.model,
                    messages=openai_formatted_messages,
                    **generation_kwargs
                )
            except BadRequestError as e:
                logging.error(f"OpenAIGenerator - BadRequestError: {e} \n\n for prompt {prompt}")
                return {"replies": [], "meta": [], "structured_reply": {}}           
            completions = [self._build_structured_message(completion, choice) for choice in completion.choices]
            for response in completions:
                self._check_finish_reason(response)
            return {
                "replies": [message.content for message in completions],
                "meta": [message.meta for message in completions],
                "structured_reply": completions[0].content
            }
        else:
            return super().run(prompt, generation_kwargs, messages)

    def _build_structured_message(self, completion: Any, choice: Any) -> ChatMessage:
        chat_message = ChatMessage.from_assistant(choice.message.content or "")
        chat_message.meta.update(
            {
                "model": completion.model,
                "index": choice.index,
                "finish_reason": choice.finish_reason,
                "usage": dict(completion.usage),
            }
        )
        return chat_message


@component
class MetadataEnricher:
    def __init__(self, metadata_model: BaseModel, prompt: str = ANTHROPIC_DEFAULT_PROMPT):
        self.metadata_model = metadata_model
        self.metadata_prompt = prompt
        self.pipeline = Pipeline()
        

    def _create_openai_message(self, document: Document, master_prompt: str):
        """Constructs the correct message for OpenAI based on document type."""
        meta = document.meta
        url = meta.get("link", "")
        beschrijving = meta.get("beschrijving", "")
        title = meta.get("Title", "")
        extra_info = meta.get("Context en beschrijving (aan elkaar geplakt)", "")
        extra_info += meta.get("Extra info (informatie\n uit velden)", "")
        metadata_for_prompt = f"URL: {url}\nTitel: {title}\nBeschrijving: {beschrijving}, andere data {extra_info}\n"
        logging.debug(f"MetadataEnricher - Metadata for prompt: {metadata_for_prompt}")
        if meta.get("VorT") == "visueel":
            image_url = meta.get("representatieve\nafbeelding")
            if not image_url:
                logging.warning(f"MetadataEnricher - No image URL found for document: {document.meta.get('ID')}")
                return None, None
            
            messages = [
                ChatMessage.from_system(master_prompt),
                ChatMessage.from_user(metadata_for_prompt),
                 ChatMessage.from_user(
                 [
                    {"type": "text", "text": "What's in this image?"},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": image_url,
                        }
                    },
                 ]
                )
            ]
            logging.debug(f"MetadataEnricher - Created image messages for document: {document.meta.get('ID')}")
            return messages, "image"

        else:
            content = document.content
            messages = [
                ChatMessage.from_system(master_prompt),
                ChatMessage.from_user(metadata_for_prompt),
                ChatMessage.from_user(f"Inhoud van het item: {content}")
            ]
            logging.debug(f"MetadataEnricher - Created text messages for document: {document.meta.get('ID')}")
            return messages, "text"


    def _process_document(self, document: Document):
        """Processes a single document and enriches its metadata."""
        logging.debug(f"MetadataEnricher - Processing document: {document.meta.get('ID')}")
        messages, doc_type = self._create_openai_message(document, self.metadata_prompt)
        if messages is None:
            logging.warning(f"MetadataEnricher - No messages created for document: {document.meta.get('ID')}")
            return document
        llm_kwargs = {"response_format": {"type": "json_object"}}
        if doc_type == "image":
            llm_kwargs["max_tokens"] = 1500
            logging.debug(f"MetadataEnricher - Processing as image, llm_kwargs: {llm_kwargs}")
            
            openai_formatted_messages = [_convert_message_to_openai_format(msg) for msg in messages]
            try:
                completion: ChatCompletion = OpenAI(api_key=Secret.from_env_var("OPENAI_API_KEY").resolve_value()).chat.completions.create(
                    model="gpt-4o",
                    messages=openai_formatted_messages,
                    **llm_kwargs
                )
            except BadRequestError as e:
                logging.error(f"MetadataEnricher - BadRequestError: {e} \n\n for document {document.meta.get('ID')}")
                return document
            if completion.choices:
                reply = completion.choices[0].message.content
                logging.debug(f"MetadataEnricher - Raw LLM reply for image document: {reply}")
                try:
                    metadata = json.loads(reply)
                    logger.info("-------------------------------------------------------")
                    logger.info(metadata)
                    document.meta.update(metadata)
                    logging.debug(f"MetadataEnricher - Successfully updated metadata for image document: {document.meta.get('ID')}")
                except json.JSONDecodeError as e:
                    logging.error(f"MetadataEnricher - JSONDecodeError: {e} \n\n for reply \n\n {reply} for document {document.meta.get('ID')}")
                except ValidationError as e:
                    logging.error(f"MetadataEnricher - Pydantic validation error: {e} \n\n for reply \n\n {reply} for document {document.meta.get('ID')}")
                except Exception as e:
                    logging.error(f"MetadataEnricher - Unexpected error: {e} \n\n for reply \n\n {reply} for document {document.meta.get('ID')}")
                return document
            logging.warning(f"MetadataEnricher - No completion choices for image document: {document.meta.get('ID')}")
            return document
        
        else:
            logging.debug(f"MetadataEnricher - Processing as text, llm_kwargs: {llm_kwargs}")
            llm = OpenAIGenerator(generation_kwargs=llm_kwargs)
            self.pipeline.add_component(name="llm", instance=llm)
            result = self.pipeline.run(data={"llm": {"prompt": messages[-1].content, "messages":messages[:-1]}})
            metadata_str = result['llm']['replies'][0]
            logging.debug(f"MetadataEnricher - Raw LLM reply for text document: {metadata_str}")
            try:
                metadata = json.loads(metadata_str)
                document.meta.update(metadata)
                logging.debug(f"MetadataEnricher - Successfully updated metadata for text document: {document.meta.get('ID')}")
            except json.JSONDecodeError as e:
                logging.error(f"MetadataEnricher - JSONDecodeError: {e} \n\n for reply \n\n {metadata_str} for document {document.meta.get('ID')}")
            except ValidationError as e:
                logging.error(f"MetadataEnricher - Pydantic validation error: {e} \n\n for reply \n\n {metadata_str} for document {document.meta.get('ID')}")
            except Exception as e:
                logging.error(f"MetadataEnricher - Unexpected error: {e} \n\n for reply \n\n {metadata_str} for document {document.meta.get('ID')}")
            self.pipeline.remove_component("llm")
            return document


    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        logging.debug(f"MetadataEnricher - Running with {len(documents)} documents")
        documents_with_meta = []
        for document in documents:
            documents_with_meta.append(self._process_document(document))

        return {"documents": documents_with_meta}

DEBUG:haystack.core.component.component:Registering <class '__main__.BaseOpenAIGenerator'> as a component
DEBUG:haystack.core.component.component:Registered Component <class '__main__.BaseOpenAIGenerator'>
DEBUG:haystack.core.component.component:Registering <class '__main__.OpenAIGenerator'> as a component
DEBUG:haystack.core.component.component:Registered Component <class '__main__.OpenAIGenerator'>
DEBUG:haystack.core.component.component:Registering <class '__main__.MetadataEnricher'> as a component
DEBUG:haystack.core.component.component:Registered Component <class '__main__.MetadataEnricher'>


In [3]:
with open('../data/prototyping/new.json', 'r') as file:
    data = json.load(file)
    

In [4]:
docs = []   
for item in data:
    if item.get('VorT') == 'visueel':
        item['Full text'] = item.get('Beschrijving', '')
    item_meta = {k: v for k, v in item.items() if k != 'Full text'}
    docs.append(Document(content=item['Full text'], meta=item_meta))

In [5]:
valid_docs = [doc for doc in docs if doc.content or doc.meta.get('VorT') == 'visueel']
exclusive_visual = [doc for doc in valid_docs if doc.meta.get('VorT') == 'visueel']

In [6]:
from haystack_integrations.document_stores.pinecone import PineconeDocumentStore
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.utils import Secret
from haystack.document_stores.types.policy import DuplicatePolicy
from haystack.components.writers import DocumentWriter
import os
from haystack import Pipeline
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter


def create_docstore() -> PineconeDocumentStore:
    return PineconeDocumentStore(
        api_key=Secret.from_env_var("PINECONE_API_KEY"),
        index="archiefutrecht", # is nu statisch, raad aan gewoon in .env te zetten
        dimension=1536, # text-embedding-3-small
    )

def create_document_embedder() -> OpenAIDocumentEmbedder:
    return OpenAIDocumentEmbedder(
        model="text-embedding-3-small",
        api_key=Secret.from_env_var("OPENAI_API_KEY"),
        meta_fields_to_embed=[
    "Title", "Description", "PrimaryQuestion", "PrimaryTheme", 
    "SecondaryThemes", "Entities", "EntityRelationships", "TimePeriod", 
    "Location", "AssetType", "StorylineDimension", "NarrativeFocus", 
    "Keywords", "ExplorationTags", "FollowUpQuestionTags", "FullText", 
    "Summary", "Sender", "Recipient", "DateSent", "LetterType", 
    "ContentSummary", "Scale", "MapFeatures", "DateCreated", 
    "LocationCovered", "Photographer", "DateTaken", "Event", 
    "PeopleInPhoto", "ArticleTopic", "Author", "PublicationDate", "Source"
] # Zorgt ervoor dat niet alleen tekst in embedding wordt meegenomen maar ook gespecificeerde metadata. Vet handig voor als je belangrijke metadata genereert.
    )
    
def create_document_writer(docstore) -> DocumentWriter:
    return DocumentWriter(document_store=docstore, policy=DuplicatePolicy.OVERWRITE) 


DEBUG:haystack.core.component.component:Registering <class 'haystack.components.embedders.azure_document_embedder.AzureOpenAIDocumentEmbedder'> as a component
DEBUG:haystack.core.component.component:Registered Component <class 'haystack.components.embedders.azure_document_embedder.AzureOpenAIDocumentEmbedder'>
DEBUG:haystack.core.component.component:Registering <class 'haystack.components.embedders.azure_text_embedder.AzureOpenAITextEmbedder'> as a component
DEBUG:haystack.core.component.component:Registered Component <class 'haystack.components.embedders.azure_text_embedder.AzureOpenAITextEmbedder'>
DEBUG:haystack.core.component.component:Registering <class 'haystack.components.embedders.hugging_face_api_document_embedder.HuggingFaceAPIDocumentEmbedder'> as a component
DEBUG:haystack.core.component.component:Registered Component <class 'haystack.components.embedders.hugging_face_api_document_embedder.HuggingFaceAPIDocumentEmbedder'>
DEBUG:haystack.core.component.component:Registering 

In [12]:
test = Pipeline()


test.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=15, split_overlap=3))
test.add_component("metadata", MetadataEnricher(metadata_model=Metadata))
test.add_component("embedder", create_document_embedder())
test.add_component("writer", create_document_writer(create_docstore()))

test.connect("splitter", "metadata")
test.connect("metadata", "embedder")
test.connect("embedder", "writer")

docs = test.run(data={"splitter": {"documents": valid_docs[:1]}}, include_outputs_from=["metadata"])

DEBUG:haystack.core.pipeline.base:Adding component 'splitter' (<haystack.components.preprocessors.document_splitter.DocumentSplitter object at 0x000001FF357C4E10>

Inputs:
  - documents: List[Document]
Outputs:
  - documents: List[Document])
DEBUG:haystack.core.pipeline.base:Adding component 'metadata' (<__main__.MetadataEnricher object at 0x000001FF10C26BD0>

Inputs:
  - documents: List[Document]
Outputs:
  - documents: List[Document])
DEBUG:haystack.core.pipeline.base:Adding component 'embedder' (<haystack.components.embedders.openai_document_embedder.OpenAIDocumentEmbedder object at 0x000001FF357C5A50>

Inputs:
  - documents: List[Document]
Outputs:
  - documents: List[Document]
  - meta: Dict[str, Any])
DEBUG:haystack.core.pipeline.base:Adding component 'writer' (<haystack.components.writers.document_writer.DocumentWriter object at 0x000001FF35881950>

Inputs:
  - documents: List[Document]
  - policy: Optional[DuplicatePolicy]
Outputs:
  - documents_written: int)
DEBUG:haystack.cor

In [9]:
docs

{'metadata': {'documents': [Document(id=d99855f8454d4c4c7355885931d1d6910c40b390def9feb2bc54a575634a0433, content: '', meta: {'Keuze': 'Ja', 'ID': '', 'AET_ID': '', 'num_scans': 1, 'invnr': 106653, 'GUID': '9E4432EBEDB9548088F205B4F2783FF0', 'beschrijving': 'Portret van Margaretha Turnor, geboren 1613, echtgenote van Godard Adriaan van Reede van Amerongen (huwelijk in 1643), overleden 1700. Ten voeten uit links, staande.', 'link': 'https://hetutrechtsarchief.nl/collectie/9E4432EBEDB9548088F205B4F2783FF0', 'representatieve\nafbeelding': 'https://proxy.archieven.nl/large/39/9E4432EBEDB9548088F205B4F2783FF0', 'Thumb': '', 'Soort / brontype': 'Schilderij (digitale reproductie)', 'Bron subtype': 'Portret', 'Personen': '', 'Locaties': '', 'Periodes': '', 'Onderwerpen': '', 'Extra info (informatie\n uit velden)': "Datering vroegst:\t\n01-01-1661\nDatering laatst:\t\n31-12-1661\nOpmerkingen:\t\nFotoreproductie van het Iconografisch Bureau, Den Haag, uit 1977 van een schilderij van Jurriaen Ove