In [6]:
%load_ext autoreload
%autoreload 2

import os, sys

sys.path.append(os.path.abspath(".."))

In [7]:
import logging

from fire import Fire

from streaming_pipeline import constants, initialize
from streaming_pipeline.embeddings import EmbeddingModelSingleton
from streaming_pipeline.qdrant import build_qdrant_client

logger = logging.getLogger(__name__)


def search(query_string: str):
    """
    Searches for the closest points to the given query string in the vector database.

    Args:
        query_string (str): The query string to search for.

    Returns:
        None
    """

    initialize()

    client = build_qdrant_client()
    model = EmbeddingModelSingleton()

    query_embedding = model(query_string, to_list=True)

    hits = client.search(
        collection_name=constants.VECTOR_DB_OUTPUT_COLLECTION_NAME,
        query_vector=query_embedding,
        limit=2,  # Return 5 closest points
    )
    
    for hit in hits:
        logger.info(hit)


  from .autonotebook import tqdm as notebook_tqdm


In [8]:
search("What did Sangamo announced today?")

No logging configuration file found at: logging.yaml. Setting logging level to INFO.
INFO:streaming_pipeline:Initializing env vars...
INFO:streaming_pipeline:Loading environment variables from: .env
INFO:dotenv.main:Python-dotenv could not find configuration file .env.
INFO:httpx:HTTP Request: POST https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news/points/search "HTTP/1.1 200 OK"
INFO:__main__:id='bf1b51d4-2187-6cbc-9217-e66e3555e459' version=6 score=0.8309769 payload={'headline': 'Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease', 'summary': 'Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly', 'url': 'https://www.benzinga.com/general/biotech/23/05/32505835/sangamo-therapeutics-r

In [None]:
import datetime
from pathlib import Path
from typing import List, Optional

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from pydantic import parse_obj_as
from qdrant_client import QdrantClient

from streaming_pipeline import mocked
# from streaming_pipeline.alpaca_batch import AlpacaNewsBatchInput
# from streaming_pipeline.alpaca_stream import AlpacaNewsStreamInput
from streaming_pipeline.embeddings import EmbeddingModelSingleton
from streaming_pipeline.models import NewsArticle, Document
from streaming_pipeline.qdrant import QdrantVectorOutput

from bytewax import operators as op

model = EmbeddingModelSingleton(cache_dir=None)

In [3]:
def build_input(is_input_mocked: bool = True,):
    if is_input_mocked:
        return TestingSource(mocked.financial_news)

In [4]:
def build_output(model: EmbeddingModelSingleton, in_memory: bool = False):
    if in_memory:
        return QdrantVectorOutput(
            vector_size=model.max_input_length,
            client=QdrantClient(":memory:"),
        )
    else:
        return QdrantVectorOutput(
            vector_size=model.max_input_length,
        )

In [5]:
from pydantic import TypeAdapter

article_adapter = TypeAdapter(List[NewsArticle])

flow = Dataflow("alpaca_news_input")

alpaca_news_input = op.input("input", flow, build_input())

article_to_class = op.flat_map("class_to_article", alpaca_news_input, lambda messages: article_adapter.validate_python(messages))
_ = op.inspect("articles", article_to_class)


In [6]:
document = op.map("document", article_to_class, lambda article: article.to_document())
_ = op.inspect("inspect_document", document)

In [7]:
compute_chunks = op.map("chunks", document, lambda document: document.compute_chunks(model))
_ = op.inspect("inspect_chunks", compute_chunks)

In [8]:
compute_embeddings = op.map("embeddings", compute_chunks, lambda document: document.compute_embeddings(model)) # flow.map(lambda document: document.compute_embeddings(model))
_ = op.inspect("inspect_embeddings", compute_embeddings)

In [9]:
output = op.output("output", compute_embeddings, build_output(model)) # flow.output("output", _build_output(model, in_memory=debug))

In [10]:
from bytewax.testing import run_main

run_main(flow)

Building QdrantVectorSink for worker: step_id alpaca_news_input.output worker_index 0 and worker_count 1
alpaca_news_input.articles: NewsArticle(id=32505835, headline='Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease', summary='Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly', author='Benzinga Newsdesk', created_at=datetime.datetime(2023, 5, 22, 12, 6, 20, tzinfo=TzInfo(UTC)), updated_at=datetime.datetime(2023, 5, 22, 12, 6, 21, tzinfo=TzInfo(UTC)), url='https://www.benzinga.com/general/biotech/23/05/32505835/sangamo-therapeutics-receives-u-s-fda-fast-track-designation-for-isaralgagene-civaparvovec-for-th', content='<p>Sangamo Therapeutics, Inc. (NASDAQ:<a class="ticker" href="https://www.benzinga.com/stock/SGMO#NASDAQ">SGMO</a>), a genomic

[32m2024-11-17 03:28:32.021[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_document[0m:[36m57[0m - [1mDocument ID: d1f408284595c367ea1bce5b365a7848[0m
[32m2024-11-17 03:28:32.902[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_document[0m:[36m62[0m - [1mNumber of article elements: 6[0m
[32m2024-11-17 03:28:32.902[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_document[0m:[36m67[0m - [1mCleaned content: Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly owned gene therapy product candidate for the treatment of Fabry disease. Fast Track designation aims to facilitate the development and expedite the review of new therapeutics that are intended to treat serious or life-threatening conditions and that demonstrate the potential to address unmet me

alpaca_news_input.inspect_document: Document(id='d1f408284595c367ea1bce5b365a7848', group_key=None, metadata={'headline': 'Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease', 'summary': 'Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly', 'url': 'https://www.benzinga.com/general/biotech/23/05/32505835/sangamo-therapeutics-receives-u-s-fda-fast-track-designation-for-isaralgagene-civaparvovec-for-th', 'symbols': ['SGMO'], 'author': 'Benzinga Newsdesk', 'created_at': datetime.datetime(2023, 5, 22, 12, 6, 20, tzinfo=TzInfo(UTC))}, text=['Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease', 'Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announc

[32m2024-11-17 03:28:33.733[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_payloads[0m:[36m132[0m - [1mTotal chunks: 4[0m
[32m2024-11-17 03:28:33.733[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_payloads[0m:[36m133[0m - [1mUnique chunk IDs: 4[0m
[32m2024-11-17 03:28:33.737[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_payloads[0m:[36m134[0m - [1mTotal payloads: 4[0m


Writing 1 embeddings to Qdrant...
The Payloads in Qdrant &&&######### : [{'headline': 'Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease', 'summary': 'Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly', 'url': 'https://www.benzinga.com/general/biotech/23/05/32505835/sangamo-therapeutics-receives-u-s-fda-fast-track-designation-for-isaralgagene-civaparvovec-for-th', 'symbols': ['SGMO'], 'author': 'Benzinga Newsdesk', 'created_at': datetime.datetime(2023, 5, 22, 12, 6, 20, tzinfo=TzInfo(UTC)), 'text': 'Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease'}, {'headline': 'Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Trea

[32m2024-11-17 03:28:36.382[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_document[0m:[36m57[0m - [1mDocument ID: ae376f968b4fe036a8209a9495301d81[0m
[32m2024-11-17 03:28:36.390[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_document[0m:[36m62[0m - [1mNumber of article elements: 6[0m
[32m2024-11-17 03:28:36.391[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_document[0m:[36m67[0m - [1mCleaned content: ContraFect Corporation (NASDAQ:CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move. Wednesday morning, ContraFect announced the dosing of the first patient in Phase 1b/2 of exebacase in the setting of an arthroscopic debridement, antibiotics, irrigation, and retention procedure in patients with chronic prosthetic joint infections of the knee due to Staphylococcus aureus or Coagulase-Negative Staphylococci. The study was initiated earlier this month. ContraFe

alpaca_news_input.inspect_document: Document(id='ae376f968b4fe036a8209a9495301d81', group_key=None, metadata={'headline': 'What&#39;s Going On With ContraFecta Stock Today', 'summary': 'ContraFect Corporation (NASDAQ: CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move.', 'url': 'https://www.benzinga.com/general/biotech/23/04/32052192/whats-going-on-with-contrafecta-stock-today', 'symbols': ['CFRX'], 'author': 'Vandana Singh', 'created_at': datetime.datetime(2023, 4, 27, 18, 24, 49, tzinfo=TzInfo(UTC))}, text=['What&#39;s Going On With ContraFecta Stock Today', 'ContraFect Corporation (NASDAQ: CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move.', 'ContraFect Corporation (NASDAQ:CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move. Wednesday morning, ContraFect announced the dosing of the first patient in Phase 1b/2 of exebacase i

[32m2024-11-17 03:28:36.553[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_payloads[0m:[36m132[0m - [1mTotal chunks: 3[0m
[32m2024-11-17 03:28:36.553[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_payloads[0m:[36m133[0m - [1mUnique chunk IDs: 3[0m
[32m2024-11-17 03:28:36.553[0m | [1mINFO    [0m | [36mstreaming_pipeline.models[0m:[36mto_payloads[0m:[36m134[0m - [1mTotal payloads: 3[0m


Writing 1 embeddings to Qdrant...
The Payloads in Qdrant &&&######### : [{'headline': 'What&#39;s Going On With ContraFecta Stock Today', 'summary': 'ContraFect Corporation (NASDAQ: CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move.', 'url': 'https://www.benzinga.com/general/biotech/23/04/32052192/whats-going-on-with-contrafecta-stock-today', 'symbols': ['CFRX'], 'author': 'Vandana Singh', 'created_at': datetime.datetime(2023, 4, 27, 18, 24, 49, tzinfo=TzInfo(UTC)), 'text': 'What&#39;s Going On With ContraFecta Stock Today'}, {'headline': 'What&#39;s Going On With ContraFecta Stock Today', 'summary': 'ContraFect Corporation (NASDAQ: CFRX) shares are trading higher Thursday morning. However, there is no specific news to justify the move.', 'url': 'https://www.benzinga.com/general/biotech/23/04/32052192/whats-going-on-with-contrafecta-stock-today', 'symbols': ['CFRX'], 'author': 'Vandana Singh', 'created_at': datetime.datetime(2023, 

In [12]:
from typing import List
from pathlib import Path
from langchain_core.embeddings import Embeddings

class CustomEmbeddings(Embeddings):
    """
    Wrapper for the custom embedding model to make it compatible with LangChain.
    """
    
    def __init__(self):
        """
        Initialize the custom embeddings wrapper.
        
        Args:
            model_id: The identifier of the pre-trained transformer model
            max_input_length: Maximum length of input text to tokenize
            device: Device to use for running the model
            cache_dir: Directory to cache the pre-trained model files
        """
        
        self.embedding_model = EmbeddingModelSingleton()

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of documents.
        
        Args:
            texts: List of text strings to embed
            
        Returns:
            List of embeddings, one per text
        """
        embeddings = []
        for text in texts:
            embedding = self.embedding_model(text, to_list=True)
            embeddings.append(embedding)
        return embeddings

    def embed_query(self, text: str) -> List[float]:
        """
        Generate embedding for a single query text.
        
        Args:
            text: Text string to embed
            
        Returns:
            Query embedding
        """
        return self.embedding_model(text, to_list=True)

In [44]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_qdrant import QdrantVectorStore, RetrievalMode
from typing import List, Dict
from langchain_core.documents import Document as LangChainDocument

class NewsRAGSystem:
    def __init__(
        self,
        qdrant_url: Optional[str] = None,
        qdrant_api_key: Optional[str] = None,
        openai_api_key: str = "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",
        retrieval_mode: RetrievalMode = RetrievalMode.DENSE,
        content_payload_key: str = "text",
        metadata_payload_key: str = "payload",
    ):
        """
        Initialize the RAG system with Qdrant and OpenAI credentials.
        
        Args:
            qdrant_url: Qdrant Cloud URL
            qdrant_api_key: Qdrant Cloud API key
            collection_name: Name of the collection in Qdrant
            openai_api_key: OpenAI API key
        """
        # Initialize Qdrant client
        self.client = build_qdrant_client(api_key=qdrant_api_key, url=qdrant_url)
        
        # Initialize embeddings
        self.embeddings = CustomEmbeddings()
        
        # Initialize vector store
        self.vectorstore = QdrantVectorStore(
            client=self.client,
            collection_name=constants.VECTOR_DB_OUTPUT_COLLECTION_NAME,
            embedding=self.embeddings,
            content_payload_key=content_payload_key,
            metadata_payload_key=metadata_payload_key,
            retrieval_mode=retrieval_mode
        )
        
        # Initialize retriever
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr",  # Using MMR for better diversity in results
            search_kwargs={"k": 3}
        )
        
        # Initialize LLM
        self.llm = ChatOpenAI(
            model_name="gpt-4o",
            openai_api_key=openai_api_key,
            base_url="https://openrouter.ai/api/v1"
        )
        
        # Create prompt template
        self.prompt = ChatPromptTemplate.from_template("""You are a helpful assistant that answers questions about financial news articles.
        Use the following pieces of context to answer the question at the end.
        If you don't know the answer, just say that you don't know. Don't try to make up an answer.
        
        Context:
        {context}
        
        Question: {question}
        
        Helpful Answer:""")
        
        # Initialize RAG chain
        self.chain = self._create_rag_chain()
    
    def _format_docs(self, docs: List[LangChainDocument]) -> str:
        """Format documents into a string."""
        print("########### The Documents ########### :: ", docs)
        return "\n\n".join(f'Content {i}:\n{doc.page_content}' for i, doc in enumerate(docs))
    
    def _create_rag_chain(self):
        """
        Create the RAG chain using the latest LangChain syntax.
        """
        # Define the RAG chain
        chain = (
            RunnableParallel(
                {"context": self.retriever | self._format_docs, "question": RunnablePassthrough()}
            )
            | self.prompt
            | self.llm
            | StrOutputParser()
        )
        
        return chain
    
    def query(self, question: str) -> Dict:
        """
        Query the RAG system with a question.
        
        Args:
            question: The question to ask
            
        Returns:
            Dict containing the answer and source documents
        """
        # Get the answer
        answer = self.chain.invoke(question)
        
        # Get source documents
        docs = self.retriever.invoke(question)
        
        # Format source documents
        sources = []
        for doc in docs:
            source = {
                "headline": doc.metadata.get("headline", "N/A"),
                "url": doc.metadata.get("url", "N/A"),
                "created_at": doc.metadata.get("created_at", "N/A"),
                "symbols": doc.metadata.get("symbols", []),
                "author": doc.metadata.get("author", "N/A")
            }
            sources.append(source)
        
        return {
            "answer": answer,
            "sources": sources
        }
    
    def query_by_filters(
        self,
        question: str,
        symbols: List[str] = None,
        date_from: str = None,
        date_to: str = None
    ) -> Dict:
        """
        Query with additional filters for symbols and date range.
        
        Args:
            question: The question to ask
            symbols: List of stock symbols to filter by
            date_from: Start date in ISO format
            date_to: End date in ISO format
            
        Returns:
            Dict containing the answer and filtered source documents
        """
        # Build filter conditions
        filter_conditions = {}
        
        if symbols:
            filter_conditions["symbols"] = {"$in": symbols}
            
        if date_from or date_to:
            filter_conditions["created_at"] = {}
            if date_from:
                filter_conditions["created_at"]["$gte"] = date_from
            if date_to:
                filter_conditions["created_at"]["$lte"] = date_to
        
        # Update retriever search parameters
        self.retriever.search_kwargs["filter"] = filter_conditions
        
        # Get results
        result = self.query(question)
        
        # Reset retriever search parameters
        self.retriever.search_kwargs.pop("filter", None)
        
        return result

In [45]:

# Initialize the RAG system
rag_system = NewsRAGSystem()


INFO:httpx:HTTP Request: GET https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news "HTTP/1.1 200 OK"


In [46]:

# Basic query
question = "What was the FDA designation given to Sangamo Therapeutics for their Fabry Disease treatment?"
result = rag_system.query(question)


INFO:httpx:HTTP Request: GET https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news/points/query "HTTP/1.1 200 OK"


########### The Documents ########### ::  [Document(metadata={'_id': '40f5b90c-e31b-0e9d-feb5-04fcc98c331f', '_collection_name': 'alpaca_financial_news'}, page_content='Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease'), Document(metadata={'_id': '77970f5e-c01e-e1e3-8427-d97c0958d405', '_collection_name': 'alpaca_financial_news'}, page_content='Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly owned gene therapy product candidate for the treatment of Fabry disease. Fast Track designation aims to facilitate the development and expedite the review of new therapeutics that are intended to treat serious or life-threatening conditions and that demonstrate the potential to address unmet medical needs. Companies granted this designation are given th

INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news/points/query "HTTP/1.1 200 OK"


In [47]:
result

{'answer': 'The FDA granted Fast Track Designation to Sangamo Therapeutics for their Fabry Disease treatment, isaralgagene civaparvovec (ST-920).',
 'sources': [{'headline': 'N/A',
   'url': 'N/A',
   'created_at': 'N/A',
   'symbols': [],
   'author': 'N/A'},
  {'headline': 'N/A',
   'url': 'N/A',
   'created_at': 'N/A',
   'symbols': [],
   'author': 'N/A'},
  {'headline': 'N/A',
   'url': 'N/A',
   'created_at': 'N/A',
   'symbols': [],
   'author': 'N/A'}]}

In [42]:
rag_system.vectorstore.similarity_search(
    "What was the FDA designation given to Sangamo Therapeutics for their Fabry Disease treatment?", k=2
)

INFO:httpx:HTTP Request: POST https://73bdd42b-86a7-49fc-bcf4-e6bf85cfca17.us-east4-0.gcp.cloud.qdrant.io:6333/collections/alpaca_financial_news/points/query "HTTP/1.1 200 OK"


[Document(metadata={'_id': '40f5b90c-e31b-0e9d-feb5-04fcc98c331f', '_collection_name': 'alpaca_financial_news'}, page_content='Sangamo Therapeutics Receives U.S. FDA Fast Track Designation For Isaralgagene Civaparvovec For The Treatment Of Fabry Disease'),
 Document(metadata={'_id': '77970f5e-c01e-e1e3-8427-d97c0958d405', '_collection_name': 'alpaca_financial_news'}, page_content='Sangamo Therapeutics, Inc. (NASDAQ:SGMO), a genomic medicine company, today announced that the U.S. Food and Drug Administration (FDA) has granted Fast Track Designation to isaralgagene civaparvovec, or ST-920, a wholly owned gene therapy product candidate for the treatment of Fabry disease. Fast Track designation aims to facilitate the development and expedite the review of new therapeutics that are intended to treat serious or life-threatening conditions and that demonstrate the potential to address unmet medical needs. Companies granted this designation are given the opportunity for more frequent interacti