In [1]:
import os
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
mistral_api_key = os.environ["MISTRAL_API_KEY"]

In [None]:
# poetry add pypdf
# or pip install pypdf

In [None]:
# Data load from PDF

In [2]:
from langchain_community.document_loaders import PyPDFLoader

loader = PyPDFLoader('./data/kafka_definite_guide.pdf')

loaded_data = loader.load_and_split()

In [None]:
# Using Vector DB from Chroma and Embedding from langchain_huggingface

In [3]:
from langchain_chroma import Chroma

from langchain_huggingface import HuggingFaceEmbeddings

In [4]:
# Initialize Vector DB. Proving PDF content and Embedding details.

vectorstore = Chroma.from_documents(
    documents=loaded_data, 
    embedding=HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
)



  from tqdm.autonotebook import tqdm, trange


In [5]:
# Creating Vector retriever for LLM 

retriever = vectorstore.as_retriever(search_kwargs={"k": 5})



In [7]:
# Pass three things to LLM

# 1. Instruction or Prompt to LLM
# 2. Context or Content as PDF chunks in splited
# 3. Question or query

from langchain_core.prompts import ChatPromptTemplate
template = """Answer the question based only on the following context. If context is not available then you reply based on your information:

{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)


In [8]:
# Helping function for formating 

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)



In [13]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_mistralai import ChatMistralAI

In [14]:
# Create LLM

llm = ChatMistralAI(model_name="mistral-large-latest", temperature=0.7)



In [15]:
# Create a chain function. It will take PDF content as LLM context, format those input
# Question through invoke method
# call Prompt variable
# hit LLM
# Show output


rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)


In [16]:
query= "what is the use of Apache Kafka"
for chunk in rag_chain.stream(query):
    print(chunk, end="", flush=True)

Based on the provided context,

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


 Apache Kafka is used for the following purposes:

1. **Messaging System**: Kafka behaves as a message queue, providing ordering guarantees on the messages it stores. It allows clients to send messages to brokers, which then process and respond to these requests.

2. **Data Streaming**: Kafka is used for building real-time data pipelines and streaming apps. It is a publish-subscribe messaging system that allows for high-throughput and low-latency data streaming.

3. **Distributed Systems**: Kafka is used in distributed systems to facilitate communication between different components or microservices. It helps manage and route data between various parts of a distributed system.

4. **Data Integration**: Kafka is used for integrating data from various sources and making it available to different targets. It can connect to external systems (for data import/export) through Kafka Connect and provide real-time data integration.

5. **Log Aggregation**: Kafka can be used for log aggregation s

In [18]:
# Call direct LLM without RAG for same question

response = llm.invoke("what is the use of Apache Kafka")
response.content

'Apache Kafka is a distributed event streaming platform that is widely used for building real-time data pipelines and streaming applications. Its primary uses include:\n\n1. **Real-Time Data Pipeline**:\n   - **Data Ingestion**: Kafka can ingest large volumes of data from various sources such as databases, sensors, logs, and other applications.\n   - **Data Processing**: It allows for real-time data processing using stream processing frameworks like Apache Flink, Apache Spark, or Kafka Streams.\n   - **Data Storage**: Kafka can store data for a configurable amount of time, making it easy to replay and analyze historical data.\n\n2. **Event Streaming**:\n   - **Publish-Subscribe Messaging**: Kafka supports a publish-subscribe messaging model, where producers publish data to topics, and consumers subscribe to those topics to read the data.\n   - **Event-Driven Architectures**: It is often used to build event-driven applications where different components react to events in real-time.\n\n