<a href="https://colab.research.google.com/github/Pavun-KumarCH/Agentic-RAG-Systems/blob/main/BQProcessor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title requirements
%pip install --q langchain_groq langchain_core

In [None]:
#@title Libraries
import os, time, queue, threading
from typing import List, Dict
from datetime import datetime
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq

# Environment
from google.colab import userdata
os.environ['GROQ_API_KEY'] = userdata.get('GROQ_API_KEY')

In [None]:
#@title Initializing Gemma2 9B Model (served via Groq)

# Shared Groq chat client used by the batch processors defined in this notebook.
llm = ChatGroq(
    model="gemma2-9b-it",
    temperature=0.4,   # moderate sampling randomness
    max_tokens=None,   # no explicit completion-length cap is set here
    timeout=None,      # no explicit request timeout is set here
    max_retries=2,     # retry budget handed to the client
)

In [None]:
#@title Prompt
# Customer-service prompt: the conversation text is substituted into the
# {conversation} placeholder, followed by the assistant's instructions.
prompt_template = (
    "Customer Support Conversation:\n"
    "{conversation}\n"
    "AI: You are a chatbot simulating a customer support assistant. Provide helpful, concise answers to customer inquiries without unnecessary details.\n"
)

# Reusable PromptTemplate whose only input variable is "conversation".
chat_prompt = PromptTemplate(
    input_variables=["conversation"],
    template=prompt_template,
)

In [None]:
#@title Class Batch Processor
class BatchProcessor:
  """Groups queued requests into batches and sends them to the LLM on a worker thread.

  Requests are pushed with `add_request`. A background thread started by
  `start_batching` collects up to `batch_size` requests, waiting at most
  `batch_interval` seconds per cycle, and hands each batch to
  `process_batch_with_langchain`. `stop_batching` stops the worker after it
  has flushed every request that is still queued.

  The module-level names `llm` and `chat_prompt` are used to format and run
  the prompts.
  """

  # Upper bound (seconds) on a single blocking queue wait, so that the worker
  # notices a stop request quickly instead of sleeping a whole batch_interval.
  _STOP_POLL_SECONDS = 0.2

  def __init__(self, processor_id: int, batch_size: int, batch_interval: float):
    """Create an idle processor.

    Args:
      processor_id: Identifier used in log messages.
      batch_size: Maximum number of requests per batch (must be >= 1).
      batch_interval: Maximum seconds to wait while filling one batch (must be > 0).

    Raises:
      ValueError: If `batch_size` < 1 or `batch_interval` <= 0; either value
        would make the worker spin without ever collecting a request.
    """
    if batch_size < 1:
      raise ValueError("batch_size must be at least 1")
    if batch_interval <= 0:
      raise ValueError("batch_interval must be greater than 0")
    self.processor_id = processor_id
    self.batch_size = batch_size
    self.batch_interval = batch_interval
    self.request_queue = queue.Queue()
    self.lock = threading.Lock()
    self._stop_event = threading.Event()
    self.worker_thread = None

  def start_batching(self):
    """Start the worker thread unless one is already running."""
    if self.worker_thread is None or not self.worker_thread.is_alive():
      self._stop_event.clear()
      self.worker_thread = threading.Thread(target = self.batch_worker)
      self.worker_thread.start()
      print(f"Batch processor {self.processor_id} started batching.")

  def add_request(self, request_data: Dict[str, str]):
    """Queue one request (a dict with at least 'role' and 'content') for batching."""
    self.request_queue.put(request_data)
    print(f"{request_data}Request added to processor {self.processor_id}.")

  def process_batch_with_langchain(self, batch: List[Dict[str, str]]):
    """Format each request as a prompt, run them through `llm.batch`, and print the responses.

    Args:
      batch: Requests to process; each must provide 'role' and 'content'.
    """
    if not batch:
      return

    # Each request is a single conversation turn rendered as "Role: content".
    formatted_prompts = [
        chat_prompt.format(conversation = f"{request['role'].capitalize()}: {request['content']}")
        for request in batch
    ]

    responses = llm.batch(formatted_prompts)
    for response in responses:
      print(f"Response from processor {self.processor_id}: {response}")
      print(response, "\n")

  def _collect_batch(self, wait: bool = True) -> List[Dict[str, str]]:
    """Pull up to `batch_size` requests; when `wait` is true, block up to `batch_interval`."""
    batch = []
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + self.batch_interval
    while len(batch) < self.batch_size:
      if not wait:
        try:
          batch.append(self.request_queue.get_nowait())
        except queue.Empty:
          break
        continue

      remaining = deadline - time.monotonic()
      if remaining <= 0:
        break
      try:
        # The timeout is always positive here; Queue.get rejects negative values.
        batch.append(self.request_queue.get(timeout = min(remaining, self._STOP_POLL_SECONDS)))
      except queue.Empty:
        if self._stop_event.is_set():
          break
    return batch

  def _run_batch(self, batch: List[Dict[str, str]]):
    """Process one batch, keep the worker alive on failure, and mark its requests done."""
    print(f"Processor {self.processor_id} - Processing batch of size {len(batch)}")
    try:
      with self.lock:
        self.process_batch_with_langchain(batch)
    except Exception as error:
      # Thread boundary: report the failure instead of letting the worker die
      # silently while requests keep piling up in the queue.
      print(f"Processor {self.processor_id} - Batch failed: {error!r}")
    else:
      print(f"Processor {self.processor_id} - Batch processor complete.\n")
    finally:
      # Only now are the requests finished, so queue.join() reflects reality.
      for _ in batch:
        self.request_queue.task_done()

  def batch_worker(self):
    """Worker loop: collect and process batches until stopped, then flush the queue."""
    while not self._stop_event.is_set():
      batch = self._collect_batch()
      if batch:
        self._run_batch(batch)
      else:
        print(f"Processor {self.processor_id} - No requests  to process in the cycle.")

    # Stop was requested: process whatever is still queued so nothing is dropped.
    while True:
      batch = self._collect_batch(wait = False)
      if not batch:
        break
      self._run_batch(batch)

  def stop_batching(self):
    """Signal the worker to stop and wait for it to finish flushing the queue."""
    self._stop_event.set()
    if self.worker_thread:
      self.worker_thread.join()
      self.worker_thread = None
      print(f"Batch processor {self.processor_id} stopped batching.")

In [None]:
#@title Class Load Balancer -> round robin distribution
class LoadBalancer:
  """Hands incoming requests to a list of processors in round-robin order."""

  def __init__(self, processors:List['BatchProcessor']):
    """Remember the processors and start the rotation at the first one."""
    self.current_processor_index = 0
    self.processors = processors

  def distribute_request(self, request_data: Dict[str, str]):
    """Give `request_data` to the current processor, then advance the rotation."""
    slot = self.current_processor_index
    self.processors[slot].add_request(request_data)
    # Wrap around to the first processor after the last one.
    self.current_processor_index = (slot + 1) % len(self.processors)


# Chat interactions used as the incoming requests for this demo
chat_interactions = [
  {"role": "user", "content": "Hello, I need help with my account login issues."},
  {"role": "user", "content": "Can you tell me about the latest offers on premium memberships?"},
  {"role": "user", "content": "My last payment didn't go through. Can you help me resolve this?"},
  {"role": "user", "content": "How can I update my contact details in my profile?"},
  {"role": "user", "content": "Can you suggest the best plan for a small business?"},
  {"role": "user", "content": "I forgot my password. Can you help me reset it?"},
  {"role": "user", "content": "I'm having trouble with the app crashing frequently."},
  {"role": "user", "content": "Can I cancel my subscription? If so, how?"},
  {"role": "user", "content": "I need help understanding the billing statement I received."},
  {"role": "user", "content": "How do I enable notifications for account updates?"}
]

# Two batch processors: batches of up to 2 requests, waiting up to 10 seconds per cycle
batch_processor1 = BatchProcessor(processor_id = 1, batch_size = 2, batch_interval = 10)
batch_processor2 = BatchProcessor(processor_id = 2, batch_size = 2, batch_interval = 10)

# Load balancer that alternates between the two processors
load_balancer = LoadBalancer(processors = [batch_processor1, batch_processor2])

try:
  # Start the worker threads of both processors
  batch_processor1.start_batching()
  batch_processor2.start_batching()

  # Send each message through the load balancer, tagged with a timestamp
  for interaction in chat_interactions:
    interaction_with_timestamp = interaction.copy()
    interaction_with_timestamp["timestamp"] = datetime.now().isoformat()
    load_balancer.distribute_request(interaction_with_timestamp)
    time.sleep(1)  # simulate requests arriving one second apart
finally:
  # Always stop the workers, even if dispatching fails; otherwise their
  # non-daemon threads would keep running after the cell errors out.
  batch_processor1.stop_batching()
  batch_processor2.stop_batching()