In [None]:
from haystack import component, Document
from typing import Any, Dict, List, Optional, Union
from haystack.dataclasses import ByteStream

import json
from dotenv import load_dotenv
import os

import re
from bs4 import BeautifulSoup
from pathlib import Path

import logging

from haystack.components.preprocessors import DocumentCleaner
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack import Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore


from haystack.utils import Secret
from dotenv import load_dotenv



# Pull variables from the local .env file into os.environ, then read the OpenAI key.
load_dotenv(".env")
open_ai_key = os.getenv("OPENAI_API_KEY")

# Notebook-wide logging: show INFO-level messages and above.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def safe_deserialize(data):
    """
    Safely deserialize one JSON record into a news-event dict.

    Two record shapes are accepted:

    * a JSON object, e.g. ``{"url": ..., "content": ...}``;
    * a two-element JSON array ``[key, event]`` whose first element is
      ``null`` or a string and whose second element is the event object.

    A ``link`` key is renamed to ``url`` (overwriting any existing ``url``).
    Only events that are JSON objects and carry a ``url`` are returned.

    :param data: JSON text to deserialize (e.g. one line of a .jsonl file).
    :return: The event dict, or None if the input is blank, is not valid
        JSON, or is not a recognised event shape.
    """
    # A blank line (e.g. a trailing newline in a .jsonl file) is not an error.
    if isinstance(data, (str, bytes, bytearray)) and not data.strip():
        logger.debug("Skipping blank input")
        return None

    try:
        parsed_data = json.loads(data)
        if isinstance(parsed_data, list):
            if len(parsed_data) == 2 and (parsed_data[0] is None or isinstance(parsed_data[0], str)):
                event = parsed_data[1]
            else:
                logger.info("Skipping unexpected list format: %s", data)
                return None
        elif isinstance(parsed_data, dict):
            event = parsed_data
        else:
            logger.info("Skipping unexpected data type: %s", data)
            return None

        # The wrapped payload must itself be an object. Without this check a
        # string payload such as [null, "see url"] passes the substring test
        # `"url" in event` and is returned as if it were an event.
        if not isinstance(event, dict):
            logger.info("Skipping non-object event payload: %s", data)
            return None

        if "link" in event:
            event["url"] = event.pop("link")

        if "url" not in event:
            logger.info("Missing 'url' key in data: %s", data)
            return None
        return event

    except json.JSONDecodeError as e:
        logger.error("JSON decode error (%s) for data: %s", e, data)
        return None
    except Exception as e:
        # Best-effort: one bad record must not abort loading the whole file.
        logger.error("Error processing data (%s): %s", e, data)
        return None
        
@component
class BenzingaNews:
    """
    Haystack component that converts Benzinga news-event dicts into Documents.

    Each string field of an event is stripped of HTML and has its whitespace
    collapsed. The cleaned ``content`` field becomes the Document text and the
    remaining cleaned fields become the Document ``meta``.
    """

    @component.output_types(documents=List[Document])
    def run(self, sources: List[Optional[Dict[str, Any]]]) -> Dict[str, List[Document]]:
        """
        Convert raw news events into Documents.

        :param sources: Event dicts, e.g. as returned by ``safe_deserialize``.
            ``None`` and other non-dict entries are skipped, as are events whose
            cleaned ``content`` is missing, non-string, or empty.
        :return: ``{"documents": [...]}``, one Document per kept event.
        """
        documents = []
        for source in sources or []:
            if not isinstance(source, dict):
                # safe_deserialize yields None for records it rejects.
                continue

            # Work on a cleaned copy: mutating the caller's dicts meant a
            # second run re-cleaned already-cleaned text.
            meta = {
                key: self.clean_text(value) if isinstance(value, str) else value
                for key, value in source.items()
            }

            # Drop content from meta so the article text is not duplicated:
            # DocumentSplitter copies meta onto every chunk, and a meta key named
            # "content" collides with the content field when Document.to_dict()
            # flattens meta.
            content = meta.pop("content", None)
            if not isinstance(content, str) or not content:
                continue

            documents.append(Document(content=content, meta=meta))

        return {"documents": documents}

    def clean_text(self, text):
        """
        Strip HTML tags from ``text`` and collapse runs of whitespace.

        :param text: Raw string, possibly containing HTML markup or entities.
        :return: Plain text with single spaces and no leading/trailing whitespace.
        """
        # NOTE(review): get_text() joins the text of adjacent tags without a
        # separator ("<p>a</p><p>b</p>" -> "ab"); consider get_text(separator=" ").
        plain = BeautifulSoup(text, "html.parser").get_text()
        return re.sub(r"\s+", " ", plain).strip()
    

            
                

In [None]:
# Load the raw news feed: one JSON record per line. Records that
# safe_deserialize rejects (it returns None) are dropped here, so every
# entry in `results` is an event dict.
file_path = "./data/news_out.jsonl"

results = []
with open(file_path, "r", encoding="utf-8") as file:
    for line in file:
        news_event = safe_deserialize(line.strip())
        if not news_event:
            continue
        results.append(news_event)


In [None]:
#BenzingaNews().run(sources=results)

In [None]:
# Indexing pipeline: news events -> Documents -> cleaned text -> word chunks
# -> OpenAI embeddings -> in-memory document store.
SPLIT_BY = "word"
SPLIT_LENGTH = 5  # words per chunk; very short chunks carry little context — tune as needed

get_news = BenzingaNews()
document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")
document_cleaner = DocumentCleaner(
    remove_empty_lines=True,
    remove_extra_whitespaces=True,
    remove_repeated_substrings=False,
)
document_splitter = DocumentSplitter(split_by=SPLIT_BY, split_length=SPLIT_LENGTH)
# OVERWRITE: re-running the pipeline replaces documents with the same id
# instead of failing on duplicates.
document_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.OVERWRITE)
# Resolve the key from OPENAI_API_KEY (populated by load_dotenv) at run time.
# Unlike Secret.from_token, an env-var secret never embeds the raw key and
# can be serialized together with the pipeline.
embedding = OpenAIDocumentEmbedder(api_key=Secret.from_env_var("OPENAI_API_KEY"))

pipeline = Pipeline()
pipeline.add_component("get_news", get_news)
pipeline.add_component("document_cleaner", document_cleaner)
pipeline.add_component("document_splitter", document_splitter)
pipeline.add_component("embedding", embedding)
pipeline.add_component("document_writer", document_writer)

# Connect the `documents` sockets explicitly so each hop is unambiguous.
pipeline.connect("get_news.documents", "document_cleaner.documents")
pipeline.connect("document_cleaner.documents", "document_splitter.documents")
pipeline.connect("document_splitter.documents", "embedding.documents")
pipeline.connect("embedding.documents", "document_writer.documents")

In [None]:
# Feed the loaded events into the first component and run the whole pipeline.
pipeline_inputs = {"get_news": {"sources": results}}
pipeline.run(data=pipeline_inputs)