Conversation Management & JSON Extraction (Groq/OpenAI-compatible)
Author: Ayush Rai
Date: 2025-09-13

In [4]:
!pip install --upgrade openai jsonschema

Collecting openai
  Using cached openai-1.107.2-py3-none-any.whl.metadata (29 kB)
Collecting jsonschema
  Using cached jsonschema-4.25.1-py3-none-any.whl.metadata (7.6 kB)
Using cached openai-1.107.2-py3-none-any.whl (946 kB)
Using cached jsonschema-4.25.1-py3-none-any.whl (90 kB)
Installing collected packages: openai, jsonschema

   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   ---------------------------------------- 0/2 [openai]
   -------------------------------------



In [None]:
import os
from dotenv import load_dotenv

# Merge variables from a local .env file (if one exists) into the process environment.
load_dotenv()

# Read the key once at start-up so a missing value is reported immediately
# instead of on the first API call.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if GROQ_API_KEY is None or GROQ_API_KEY == "":
    print("WARNING: GROQ_API_KEY not set.")

In [None]:
import openai

def make_groq_client():
    """Build an OpenAI-SDK client that talks to Groq's OpenAI-compatible endpoint.

    Returns:
        An ``openai.OpenAI`` instance configured with the Groq base URL.

    Raises:
        RuntimeError: if ``GROQ_API_KEY`` is missing or empty in the environment.
    """
    key = os.getenv("GROQ_API_KEY")
    if key:
        # Only the key and the base URL differ from a stock OpenAI client.
        return openai.OpenAI(api_key=key, base_url="https://api.groq.com/openai/v1")
    raise RuntimeError("Please set the GROQ_API_KEY environment variable before calling the API.")

Task 1: Conversation Management & Summarization

Implementation notes:
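Summaries are generated by the LLM itself, using a separate chat-completion call.
After every k-th run (one user message plus the assistant reply), the history is rebuilt as: the system prompt, a single `[Summary]: ...` assistant message, and the most recent turns (2 by default).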

In [10]:
import time
from typing import List, Dict, Any

class ConversationManager:
    """Keep a chat transcript and periodically compress it with an LLM summary.

    ``history`` is a list of message dicts with ``role`` and ``content`` keys;
    messages added through ``append_message`` also carry a ``meta`` dict. The
    history starts with the system prompt, and the truncation/summarization
    helpers keep a leading system message in place.

    Args:
        client: Optional default client exposing ``chat.completions.create``.
            Used whenever a method is called without an explicit client.
        system_prompt: System message text; falls back to
            ``"You are an assistant."`` when empty or ``None``.
        model: Model name sent with every request; defaults to ``DEFAULT_MODEL``.
    """

    # Model used for both chat replies and summaries unless overridden.
    DEFAULT_MODEL = "openai/gpt-oss-20b"

    def __init__(self, client=None, system_prompt=None, model=None):
        self.history: List[Dict[str, Any]] = []
        self.client = client
        self.model = model or self.DEFAULT_MODEL
        # The ``or`` default guarantees a non-empty prompt, so it is always recorded.
        self.system_prompt = system_prompt or "You are an assistant."
        self.history.append({"role": "system", "content": self.system_prompt})
        self.run_count = 0

    # ------------------------------------------------------------------ helpers
    def _split_system(self):
        """Return ``(leading_system_messages, remaining_messages)`` as new lists."""
        if self.history and self.history[0]['role'] == 'system':
            return [self.history[0]], self.history[1:]
        return [], self.history[:]

    def _resolve_client(self, client, purpose: str = ""):
        """Pick the explicit client, else the one given at construction time."""
        chosen = client or self.client
        if not chosen:
            suffix = f" for {purpose}" if purpose else ""
            raise RuntimeError(f"Client not provided{suffix}.")
        return chosen

    @staticmethod
    def _reply_text(resp) -> str:
        """Extract the first choice's text; a missing (``None``) content becomes ``""``."""
        content = resp.choices[0].message.content
        return (content or "").strip()

    # ------------------------------------------------------------- history edits
    def append_message(self, role: str, content: str, meta: dict=None):
        """Append one message to the history; ``meta`` defaults to a fresh dict."""
        if meta is None:
            meta = {}
        self.history.append({"role": role, "content": content, "meta": meta})

    def _last_n_turns(self, n:int) -> List[Dict[str,str]]:
        """Return the last ``2*n`` non-system messages (empty list when ``n <= 0``)."""
        _, msgs = self._split_system()
        return msgs[-(2*n):] if n>0 else []

    def truncate_by_turns(self, n:int):
        """Keep the system message plus the last ``n`` turns (``2*n`` messages)."""
        system_msgs, _ = self._split_system()
        self.history = system_msgs + self._last_n_turns(n)

    def truncate_by_chars(self, max_chars:int):
        """Keep the system message plus the newest messages within ``max_chars``.

        The newest message is always kept, even if it alone exceeds the budget.
        """
        system_msgs, msgs = self._split_system()
        total = 0
        kept = []
        # Walk newest-to-oldest so the most recent context survives.
        for m in reversed(msgs):
            size = len(m['content'])
            if total + size > max_chars and kept:
                break
            kept.append(m)
            total += size
        kept.reverse()
        self.history = system_msgs + kept

    def get_history_text(self):
        """Render the whole history as ``role: content`` lines."""
        return "\n".join([f"{m['role']}: {m['content']}" for m in self.history])

    # ------------------------------------------------------------ LLM operations
    def summarize_history(self, client, max_output_tokens=512):
        """Ask the LLM for a short summary of the current history.

        Args:
            client: Client to use; falls back to the constructor's client.
            max_output_tokens: ``max_tokens`` value for the summary request.

        Returns:
            The stripped summary text (``""`` if the model returned no content).

        Raises:
            RuntimeError: if no client is available.
        """
        client = self._resolve_client(client, "summarization")

        messages = [
            {"role":"system", "content": "You are an expert summarizer. Produce a concise summary (1-3 sentences) capturing key user intents and facts. Output only the summary."},
            {"role":"user", "content": "Summarize the conversation below:\n\n" + self.get_history_text()}
        ]
        resp = client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=max_output_tokens,
            temperature=0.1
        )
        return self._reply_text(resp)

    def periodic_summarize_and_replace(self, client, k:int, keep_recent_turns:int=2):
        """Summarize and compact the history when ``run_count`` is a multiple of ``k``.

        The history becomes: system message, one ``[Summary]: ...`` assistant
        message, then the last ``keep_recent_turns`` turns.

        Returns:
            The summary text when a summarization happened, otherwise ``None``
            (also when ``k <= 0`` or no run has completed yet).
        """
        if k <= 0:
            return None
        if self.run_count <= 0 or self.run_count % k != 0:
            return None

        summary = self.summarize_history(client)
        system_msgs, _ = self._split_system()
        recent = self._last_n_turns(keep_recent_turns)
        summary_msg = {"role":"assistant","content":f"[Summary]: {summary}"}
        self.history = system_msgs + [summary_msg] + recent
        return summary

    def run_one_step(self, user_input:str, client, do_summary_every_k:int=3):
        """Send one user message, record the reply, and maybe summarize.

        Args:
            user_input: The user's message text.
            client: Client to use; falls back to the constructor's client.
            do_summary_every_k: Summarize after every k-th completed run.

        Returns:
            ``(assistant_text, summary)`` where ``summary`` is ``None`` unless a
            periodic summarization was triggered by this run.

        Raises:
            RuntimeError: if no client is available.
        """
        client = self._resolve_client(client)

        # Send the history plus the new user turn exactly once; the history is
        # only updated after the request succeeds, so a failed call leaves no
        # dangling user message behind.
        messages = [{"role":m["role"], "content":m["content"]} for m in self.history]
        messages.append({"role":"user", "content":user_input})

        resp = client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=256,
            temperature=0.2
        )
        assistant_text = self._reply_text(resp)

        self.append_message("user", user_input)
        self.append_message("assistant", assistant_text)
        self.run_count += 1

        summary = self.periodic_summarize_and_replace(client, k=do_summary_every_k, keep_recent_turns=2)
        return assistant_text, summary


Demonstration

In [11]:

class DummyClient:
    """Offline stand-in for the chat client, used to demo without network access.

    Mirrors the ``client.chat.completions.create(...)`` call path and returns an
    object exposing ``choices[0].message.content``.
    """

    def __init__(self):
        pass

    class chat:
        class completions:
            @staticmethod
            def create(model, messages, max_tokens=256, temperature=0.2):
                """Return a canned reply derived from the last user message.

                A request whose last user message contains the summarization
                marker gets ``"Summary: <first 120 chars>..."``; anything else
                gets ``"Assistant reply to: <first 80 chars>"``.
                """
                marker = "Summarize the conversation below:"
                user_texts = [m['content'] for m in messages if m['role']=='user']
                # Tolerate requests without any user message instead of raising IndexError.
                last_user = user_texts[-1] if user_texts else ""

                # The marker lives in the *user* message of a summarization
                # request (the system message only describes the summarizer),
                # so that is where it has to be looked for.
                if marker in last_user:
                    text = last_user.split(marker + "\n\n")[-1]
                    content = ("Summary: " + text[:120].replace("\n", " ") + "...").strip()
                else:
                    content = f"Assistant reply to: {last_user[:80]}"

                # Minimal response shape: resp.choices[0].message.content
                message = type("M", (object,), {"content": content})
                choice = type("C", (object,), {"message": message})()
                return type("R", (object,), {"choices": [choice]})()

# Drive the manager with the offline client so the demo needs no API key.
dm = ConversationManager(system_prompt="System: You are a helpful assistant.")
dummy_client = DummyClient()

# Four user turns: with k=3 the third run triggers the periodic summary.
runs = [
    "Hi, I want to book a flight from Bangalore to Delhi next month.",
    "Actually date changed to Oct 25, preference evening flight.",
    "I also need a hotel near Connaught Place.",
    "Add dietary preference: vegetarian meals.",
]

print("=== Demo: periodic summarization after every 3 runs ===")
for run_no, user_msg in enumerate(runs, 1):
    reply, periodic_summary = dm.run_one_step(
        user_msg,
        client=dummy_client,
        do_summary_every_k=3,
    )
    print(f"\n--- Run {run_no} ---")
    print("User:", user_msg)
    print("Assistant:", reply)
    # A summary is only returned on runs where compaction happened.
    if periodic_summary:
        print("PERIODIC SUMMARY CREATED:\n", periodic_summary)
    print("History now:")
    print(dm.get_history_text())


=== Demo: periodic summarization after every 3 runs ===

--- Run 1 ---
User: Hi, I want to book a flight from Bangalore to Delhi next month.
Assistant: Assistant reply to: Hi, I want to book a flight from Bangalore to Delhi next month.
History now:
system: System: You are a helpful assistant.
user: Hi, I want to book a flight from Bangalore to Delhi next month.
assistant: Assistant reply to: Hi, I want to book a flight from Bangalore to Delhi next month.

--- Run 2 ---
User: Actually date changed to Oct 25, preference evening flight.
Assistant: Assistant reply to: Actually date changed to Oct 25, preference evening flight.
History now:
system: System: You are a helpful assistant.
user: Hi, I want to book a flight from Bangalore to Delhi next month.
assistant: Assistant reply to: Hi, I want to book a flight from Bangalore to Delhi next month.
user: Actually date changed to Oct 25, preference evening flight.
assistant: Assistant reply to: Actually date changed to Oct 25, preference eveni

Task 2: JSON Schema Classification & Function-calling style extraction

In [None]:
import json
from jsonschema import validate, ValidationError

# JSON Schema describing the record the extractor must produce.
# Only name and email are mandatory; unknown keys are rejected.
user_info_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "email": {"type": "string", "format": "email"},
        # 7-20 characters drawn from digits, "+", "-", parentheses and whitespace.
        "phone": {
            "type": "string",
            "pattern": "^[0-9\\+\\-\\(\\)\\s]{7,20}$",
        },
        "location": {"type": "string"},
        "age": {"type": "integer", "minimum": 0, "maximum": 150},
    },
    "required": ["name", "email"],
    "additionalProperties": False,
}

# Function-calling declaration; its parameters are the schema object itself.
functions = [
    {
        "name": "extract_user_info",
        "description": "Extract user contact and basic demographics from chat.",
        "parameters": user_info_schema,
    },
]

# Sample chats exercised by the extraction cell.
sample_chats = [
    "Hi, I'm Ayush Rai. You can reach me at ayushrai@gmail.com or +91-9876543210. I'm 22 and live in Lucknow.",
    "Hello - name: Aisha Khan, email: aisha.k@example.org; phone (555) 203-9988. Location: New Delhi. Age: 29.",
    "Hey, it's John. Email john_doe@nomail. I don't want to give phone. I'm 32 from Bangalore.",
]

In [14]:

def call_extract_function(client, chat_text, model="openai/gpt-oss-20b"):
    """Ask the model to extract user info from ``chat_text`` via function calling.

    The request forces a call to ``extract_user_info`` using the module-level
    ``functions`` declaration. The arguments are read from, in order:
    ``message.function_call``, the first entry of ``message.tool_calls``, and
    finally the plain message content parsed as JSON.

    Args:
        client: Object exposing ``chat.completions.create``.
        chat_text: Raw chat text to extract from.
        model: Model name for the request.

    Returns:
        The decoded JSON value (not validated against the schema here).

    Raises:
        ValueError: if the arguments or the fallback content are not valid JSON
            (including a response with no content at all).
    """
    messages = [
        {"role":"system", "content": "You are an extraction agent. Strictly return JSON matching the function schema."},
        {"role":"user", "content": chat_text}
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        functions=functions,
        function_call={"name":"extract_user_info"},
        max_tokens=300,
        temperature=0.0
    )

    message = resp.choices[0].message
    fc = getattr(message, "function_call", None)

    if fc is None:
        # Some responses carry the call under ``tool_calls`` instead of the
        # legacy ``function_call`` field; accept that shape as well.
        tool_calls = getattr(message, "tool_calls", None) or []
        if tool_calls:
            first = tool_calls[0]
            fc = first.get("function") if isinstance(first, dict) else getattr(first, "function", None)

    if fc is not None:
        args_text = fc.get("arguments") if isinstance(fc, dict) else fc.arguments
        try:
            return json.loads(args_text)
        except (TypeError, ValueError) as e:
            # TypeError covers missing (None) arguments; ValueError covers bad JSON.
            raise ValueError("Failed to parse function arguments JSON: " + str(e)) from e

    # No structured call at all: fall back to the text content, which may be None.
    raw = (getattr(message, "content", None) or "").strip()
    try:
        return json.loads(raw)
    except ValueError as e:
        raise ValueError("Model didn't return valid JSON for function args.") from e


In [18]:
from jsonschema import FormatChecker

client = make_groq_client()

# jsonschema ignores "format" keywords (such as "format": "email") unless a
# format checker is supplied, so pass one explicitly.
# NOTE(review): the built-in email check is shallow; add a "pattern" to the
# schema if stricter address validation is required.
format_checker = FormatChecker()

for idx, chat in enumerate(sample_chats, start=1):
    print(f"\n--- Sample {idx} ---")
    try:
        extracted = call_extract_function(client, chat, model="openai/gpt-oss-20b")
        print("Extracted:", extracted)

        # Validate the extracted record, including "format" assertions.
        validate(instance=extracted, schema=user_info_schema, format_checker=format_checker)
        print("✅ Schema validation passed")
    except ValidationError as ve:
        print("❌ Schema validation failed:", ve.message)
    except Exception as e:
        # Demo cell: report the failure and continue with the next sample.
        print("⚠️ Extraction error:", e)


--- Sample 1 ---
Extracted: {'age': 24, 'email': 'rahul.sharma@email.com', 'location': 'Pune', 'name': 'Rahul Sharma', 'phone': '+91-9876543210'}
✅ Schema validation passed

--- Sample 2 ---
Extracted: {'age': 29, 'email': 'aisha.k@example.org', 'location': 'New Delhi', 'name': 'Aisha Khan', 'phone': '(555) 203-9988'}
✅ Schema validation passed

--- Sample 3 ---
Extracted: {'age': 32, 'email': 'john_doe@nomail', 'location': 'Bangalore', 'name': 'John'}
✅ Schema validation passed
