<a href="https://colab.research.google.com/github/RachitrajeshParihar/Convo__summarization_extraction/blob/main/Convo__summarization_extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Conversation Summarization & Structured Extraction with Groq API

## This notebook demonstrates how to:

1.   Summarize user ↔ assistant conversations.
2.   Extract structured JSON data (intent, topics, sentiment, named entities, action items, confidence score) from conversations.


We will use the Groq API, which is compatible with the OpenAI SDK.
That means you can call it either with OpenAI-style code (**client.chat.completions.create**) or with raw HTTP requests through the **requests** library.
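
To make the raw-HTTP route concrete, here is a minimal sketch of the request that an OpenAI-compatible chat completions endpoint expects. The helper name `build_chat_request` is hypothetical; the base URL and model name are the defaults used later in this notebook. The helper only assembles the request and does not send it.

```python
from typing import Any, Dict, List, Tuple

GROQ_BASE_URL = "https://api.groq.com/openai/v1"  # same default as the wrapper class in this notebook


def build_chat_request(api_key: str, messages: List[Dict[str, str]],
                       model: str = "openai/gpt-oss-120b",
                       ) -> Tuple[str, Dict[str, str], Dict[str, Any]]:
    """Assemble (url, headers, payload) for a chat completion call.

    Hypothetical helper for illustration; it performs no network I/O.
    """
    url = f"{GROQ_BASE_URL}/chat/completions"
    headers = {"Authorization": f"Bearer {api_key}"}
    payload = {"model": model, "messages": messages, "temperature": 0.0}
    return url, headers, payload


url, headers, payload = build_chat_request(
    "YOUR_KEY", [{"role": "user", "content": "Hello"}]
)
# The request could then be sent with:
# requests.post(url, headers=headers, json=payload, timeout=60)
```

With the OpenAI SDK, the same payload fields are passed as keyword arguments to `client.chat.completions.create`.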

## How to use this notebook:
1. Run dependency installation.
2. Enter your Groq API key (either via Colab prompt or environment variable).
3. Test summarization and structured extraction on sample conversations.
4. Batch process and export results to .jsonl for downstream analysis.
5. Save outputs (README.md, LICENSE, requirements file) for publishing to GitHub.

Note:
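* Use the Groq API, which provides access to supported OpenAI models (for example **openai/gpt-oss-120b**, the default in this notebook). This notebook does not provide API access; you need your own Groq API key.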


In [None]:
# Install required dependencies
!pip install -q openai pandas requests python-dotenv tqdm

import os
import json
import time
import re
import logging
import requests
import pandas as pd
from tqdm import tqdm
from typing import List, Dict, Any, Tuple, Optional

try:
    from dotenv import load_dotenv
except ImportError:
    pass

## Authentication
You can authenticate in two ways:

Option A: Enter API key interactively (not saved).

Option B: Store API key in **.env** or environment variables (**GROQ_API_KEY**).

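In Google Colab, you can set the variable for the current session with:

**%env GROQ_API_KEY = your_key_here**

The key stays visible in that cell, so clear the cell before sharing the notebook.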
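
As an alternative to `%env`, Colab also offers a Secrets panel that keeps the key out of the notebook's cells. The sketch below uses a hypothetical helper named `resolve_api_key` that checks the environment first and then tries Colab's `userdata` API. It assumes you have added a secret named `GROQ_API_KEY` and granted this notebook access to it.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(name: str = "GROQ_API_KEY",
                    environ: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the key from the environment or Colab Secrets, else None."""
    environ = os.environ if environ is None else environ
    key = environ.get(name, "").strip()
    if key:
        return key
    try:
        # Only importable inside Google Colab
        from google.colab import userdata
        return userdata.get(name) or None
    except Exception:
        # Not in Colab, secret missing, or notebook access not granted
        return None


print(resolve_api_key(environ={"GROQ_API_KEY": "  demo-key  "}))  # prints: demo-key
```

When the helper returns `None`, the interactive prompt remains a reasonable last resort.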



In [None]:
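# Load environment variables from .env, if python-dotenv was imported above
try:
    load_dotenv()
except NameError:
    # python-dotenv is not installed; rely on the process environment instead
    pass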

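# Option B: read the key from the environment (or a loaded .env file)
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# Option A: fall back to an interactive prompt; getpass keeps the key out of the cell output
if not GROQ_API_KEY:
    from getpass import getpass
    GROQ_API_KEY = getpass("Enter GROQ API key (won't be saved): ").strip()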

assert GROQ_API_KEY, "Groq API key must be provided"

Note:
* The model can be changed as needed. To do so, edit the default value of the **model** parameter (marked with a comment) in the following cell.

In [None]:
class GroqClientWrapper:
    """
    Wrapper to interact with Groq API, compatible with OpenAI SDK style.
    Provides fallback to raw HTTP requests if needed.
    """
    def __init__(self, api_key: str, base_url: Optional[str] = None, use_openai_client: bool = True):
        self.api_key = api_key
        self.base_url = base_url or "https://api.groq.com/openai/v1"
        self.use_openai_client = use_openai_client

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger("GroqClientWrapper")

        if use_openai_client:
            try:
                import openai
                self.client = openai.OpenAI(api_key=api_key, base_url=self.base_url)
            except Exception as e:
                # Include the reason so a failed import or bad configuration is visible
                self.logger.warning("OpenAI client not available (%s), falling back to requests.", e)
                self.use_openai_client = False
                self.client = None
        else:
            self.client = None

    def chat_completion(self, messages: List[Dict[str, str]],
                        model: str = "openai/gpt-oss-120b",  # Change the default model here
                        temperature: float = 0.0, max_tokens: int = 512, **kwargs) -> str:
        """Perform chat completion with retries and exponential backoff."""
        retries = 5
        backoff = 1
        for attempt in range(retries):
            try:
                if self.use_openai_client and self.client:
                    resp = self.client.chat.completions.create(
                        model=model,
                        messages=messages,
                        temperature=temperature,
                        max_tokens=max_tokens,
                        **kwargs
                    )
                    return resp.choices[0].message.content.strip()
                else:
                    headers = {"Authorization": f"Bearer {self.api_key}"}
                    url = f"{self.base_url}/chat/completions"
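                    payload = {
                        "model": model,
                        "messages": messages,
                        "temperature": temperature,
                        "max_tokens": max_tokens,
                        **kwargs,  # forward extra options, as the SDK branch does
                    }
                    # A timeout keeps a stalled connection from blocking forever
                    r = requests.post(url, headers=headers, json=payload, timeout=60)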
                    if r.status_code == 200:
                        js = r.json()
                        return js["choices"][0]["message"]["content"].strip()
                    elif r.status_code == 429:
                        self.logger.warning("Rate limit hit, retrying...")
                        time.sleep(backoff)
                        backoff *= 2
                        continue
                    else:
                        r.raise_for_status()
            except Exception as e:
                self.logger.warning(f"Error: {e}, retrying in {backoff}s...")
                time.sleep(backoff)
                backoff *= 2

        raise RuntimeError("Max retries exceeded for chat_completion")

In [None]:
# Prompt templates

SUMMARIZATION_SYSTEM_PROMPT = {
    "role": "system",
    "content": "You are a helpful assistant that summarizes conversations in 1-3 concise sentences."
}

STRUCTURED_EXTRACTION_SYSTEM_PROMPT = {
    "role": "system",
    "content": """You are an information extraction assistant.
Extract structured data as a strict JSON object with the following schema:
{
  "conversation_id": "<string>",
  "summary": "<string>",
  "intent": "<string>",
  "topics": ["<string>", ...],
  "sentiment": "positive|neutral|negative",
  "named_entities": ["<string>", ...],
  "action_items": ["<string>", ...],
  "confidence": <float 0.0-1.0>,
  "schema_version": "1.0"
}
Output ONLY a single valid JSON object (no markdown, no commentary).
If you cannot extract a value leave empty string or empty array.
Confidence should be numeric 0-1.
"""
}
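
The extraction prompt describes a schema, but nothing guarantees that a model's reply follows it. The sketch below shows one way to check a parsed record against that schema. The helper `validate_record` and the constant names are hypothetical and are not part of this notebook's pipeline.

```python
from typing import Any, Dict, List

ALLOWED_SENTIMENTS = {"positive", "neutral", "negative"}
LIST_FIELDS = ("topics", "named_entities", "action_items")
STRING_FIELDS = ("conversation_id", "summary", "intent", "schema_version")


def validate_record(record: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable schema problems (empty if none)."""
    problems = []
    for field in STRING_FIELDS:
        if not isinstance(record.get(field), str):
            problems.append(f"{field}: expected a string")
    for field in LIST_FIELDS:
        value = record.get(field)
        if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
            problems.append(f"{field}: expected a list of strings")
    # The prompt allows an empty string when sentiment cannot be extracted
    if record.get("sentiment") not in ALLOWED_SENTIMENTS | {""}:
        problems.append("sentiment: expected positive, neutral, or negative")
    confidence = record.get("confidence")
    if (isinstance(confidence, bool) or not isinstance(confidence, (int, float))
            or not 0.0 <= confidence <= 1.0):
        problems.append("confidence: expected a number between 0 and 1")
    return problems


good = {
    "conversation_id": "conv1", "summary": "User asks for Python resources.",
    "intent": "learn_resources", "topics": ["python"], "sentiment": "neutral",
    "named_entities": ["Codecademy"], "action_items": [], "confidence": 0.9,
    "schema_version": "1.0",
}
print(validate_record(good))  # prints: []
```

Returning a list of problems rather than raising makes it easy to log every issue for a record in one pass.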

In [None]:
def load_sample_conversations() -> pd.DataFrame:
    """Return small dataset of conversations for demonstration."""
    sample_data = [
        {
            "conversation_id": "conv1",
            "messages": [
                {"role": "user", "content": "Can you suggest resources to learn Python?"},
                {"role": "assistant", "content": "Sure! You can try Codecademy, Real Python, or the official docs."}
            ],
            "annotated_intent": "learn_resources"
        },
        {
            "conversation_id": "conv2",
            "messages": [
                {"role": "user", "content": "Your app keeps crashing when I click 'Export'."},
                {"role": "assistant", "content": "Sorry to hear! Could you provide the error message?"}
            ],
            "annotated_intent": "report_bug"
        },
        {
            "conversation_id": "conv3",
            "messages": [
                {"role": "user", "content": "Can I get a copy of my invoice for last month?"},
                {"role": "assistant", "content": "Yes, you can download it in your billing dashboard."}
            ],
            "annotated_intent": "billing_query"
        },
        {
            "conversation_id": "conv4",
            "messages": [
                {"role": "user", "content": "Please cancel my subscription effective immediately."},
                {"role": "assistant", "content": "Understood. I will process your cancellation."}
            ],
            "annotated_intent": "cancel_subscription"
        },
    ]
    return pd.DataFrame(sample_data)

def format_messages_for_api(messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
    return messages

def safe_parse_json(text: str) -> Tuple[Optional[Dict[str, Any]], bool]:
    """Attempt to safely parse JSON from text. Handle common formatting issues."""
    try:
        return json.loads(text), True
    except Exception:
        match = re.search(r"\{.*\}", text, re.DOTALL)
        if match:
            candidate = match.group(0)
            candidate = re.sub(r",\s*}", "}", candidate)
            candidate = re.sub(r",\s*]", "]", candidate)
            try:
                return json.loads(candidate), True
            except Exception:
                return None, False
    return None, False
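
One limitation of the regex fallback above is that the greedy pattern `\{.*\}` spans from the first `{` to the last `}`, so a reply with a second pair of braces after the JSON object fails to parse. A possible alternative, sketched here with a hypothetical helper `first_json_object`, uses the standard library's `json.JSONDecoder.raw_decode` to decode the first object and ignore whatever follows. Unlike `safe_parse_json`, it does not repair trailing commas.

```python
import json
from typing import Any, Dict, Optional


def first_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first decodable JSON object embedded in text, or None."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and ignores the rest
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        start = text.find("{", start + 1)
    return None


reply = 'Sure! {"intent": "report_bug"} Let me know if you need {more}.'
print(first_json_object(reply))  # prints: {'intent': 'report_bug'}
```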

In [None]:
def summarize_conversation(conversation_id: str, messages: List[Dict[str, str]], client: GroqClientWrapper, max_tokens: int = 150):
    prompt = [SUMMARIZATION_SYSTEM_PROMPT] + format_messages_for_api(messages)
    start = time.time()
    summary = client.chat_completion(prompt, max_tokens=max_tokens)
    end = time.time()
    return {
        "conversation_id": conversation_id,
        "summary": summary,
        "metadata": {"runtime_sec": round(end - start, 2)}
    }

# Test
df = load_sample_conversations()
client = GroqClientWrapper(GROQ_API_KEY)
example = summarize_conversation(df.iloc[0].conversation_id, df.iloc[0].messages, client)
example

In [None]:
def extract_structured(conversation_id: str, messages: List[Dict[str, str]], client: GroqClientWrapper, max_tokens: int = 512):
    prompt = [STRUCTURED_EXTRACTION_SYSTEM_PROMPT] + format_messages_for_api(messages)
    response = client.chat_completion(prompt, max_tokens=max_tokens)

    data, valid_json = safe_parse_json(response)
    if not valid_json or not isinstance(data, dict):
        return {"conversation_id": conversation_id, "valid_json": False}, False

    # Ensure required keys exist
    required_keys = ["conversation_id","summary","intent","topics","sentiment","named_entities","action_items","confidence","schema_version"]
    for key in required_keys:
        if key not in data:
            data[key] = "" if key not in ["topics","named_entities","action_items"] else []
    # The prompt never shows the model the ID, so always use the caller's value
    data["conversation_id"] = conversation_id
    # Coerce confidence to a float clamped to [0.0, 1.0]
    try:
        data["confidence"] = min(max(float(data.get("confidence", 0.0)), 0.0), 1.0)
    except (TypeError, ValueError):
        data["confidence"] = 0.0

    return data, True

# Quick test
res, ok = extract_structured(df.iloc[0].conversation_id, df.iloc[0].messages, client)
res, ok
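
Because the extraction call passes the conversation as live chat turns, the last message the model sees is an assistant turn, and the prompt never includes the conversation ID. One possible alternative, sketched here with a hypothetical helper `build_extraction_messages`, is to flatten the conversation into a single user message that also carries the ID. This is an assumption about what may work better, not something this notebook has measured.

```python
from typing import Dict, List


def build_extraction_messages(conversation_id: str,
                              messages: List[Dict[str, str]],
                              system_prompt: Dict[str, str]) -> List[Dict[str, str]]:
    """Wrap the whole conversation in one user message (hypothetical helper)."""
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
    user_content = (
        f"conversation_id: {conversation_id}\n"
        f"Conversation transcript:\n{transcript}"
    )
    return [system_prompt, {"role": "user", "content": user_content}]


demo = build_extraction_messages(
    "conv2",
    [
        {"role": "user", "content": "Your app keeps crashing when I click 'Export'."},
        {"role": "assistant", "content": "Sorry to hear! Could you provide the error message?"},
    ],
    {"role": "system", "content": "You are an information extraction assistant."},
)
print(demo[1]["content"])
```

The resulting list has the same shape as the `prompt` built inside `extract_structured`, so it could be passed to `chat_completion` unchanged.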

In [None]:
def process_all(df: pd.DataFrame, client: GroqClientWrapper, out_filename="results.jsonl"):
    results = []
    with open(out_filename, "w", encoding="utf-8") as f:  # explicit encoding for portability
        for _, row in tqdm(df.iterrows(), total=len(df)):
            summary_info = summarize_conversation(row.conversation_id, row.messages, client)
            structured_info, valid = extract_structured(row.conversation_id, row.messages, client)
            out_record = {"conversation_id": row.conversation_id,
                          "summary": summary_info["summary"],
                          "structured": structured_info,
                          "valid_json": valid}
            results.append(out_record)
            f.write(json.dumps(out_record)+"\n")
    return results

# Process on sample data
all_results = process_all(df, client)
all_results[:2]
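
For downstream analysis, a JSONL file of this shape can be read back one line at a time and flattened into a table. The sketch below writes two stand-in records (made-up values, not real model output) to a temporary file so that it runs without an API key, then loads them with `pandas.json_normalize`.

```python
import json
import os
import tempfile

import pandas as pd

# Stand-in records with the same shape that process_all writes
records = [
    {"conversation_id": "conv1", "summary": "s1",
     "structured": {"intent": "learn_resources", "confidence": 0.9}, "valid_json": True},
    {"conversation_id": "conv2", "summary": "s2",
     "structured": {"conversation_id": "conv2", "valid_json": False}, "valid_json": False},
]
path = os.path.join(tempfile.mkdtemp(), "results.jsonl")
with open(path, "w", encoding="utf-8") as f:
    for rec in records:
        f.write(json.dumps(rec) + "\n")

# One JSON object per line: parse line by line, then flatten nested fields
with open(path, encoding="utf-8") as f:
    loaded = [json.loads(line) for line in f if line.strip()]
flat = pd.json_normalize(loaded)  # nested keys become "structured.intent", etc.
valid_rate = flat["valid_json"].mean()  # share of records with parseable JSON
print(flat[["conversation_id", "structured.intent", "valid_json"]])
```

Records whose extraction failed simply have missing values in the `structured.*` columns, which makes them easy to filter out.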

In [None]:
# Evaluation & Simple Tests

# Test safe_parse_json
assert safe_parse_json('{"a":1}')[1] is True
assert isinstance(safe_parse_json('text {"a":1} text')[0], dict)

# Validate extraction has required keys
structured, ok = extract_structured(df.iloc[1].conversation_id, df.iloc[1].messages, client)
required = ["conversation_id","summary","intent","topics","sentiment","named_entities","action_items","confidence","schema_version"]
assert all(k in structured for k in required)

print("All tests passed.")
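
The sample data also carries an `annotated_intent` for each conversation, which the tests above do not use. A rough way to compare it with the extracted `intent` is sketched below. The helpers `normalize_intent` and `intent_accuracy` are hypothetical, and exact matching after normalization is a strict criterion for free-form labels, so treat the score as a lower bound.

```python
import re
from typing import Dict


def normalize_intent(label: str) -> str:
    """Lowercase the label and collapse non-alphanumeric runs to underscores."""
    return re.sub(r"[^a-z0-9]+", "_", label.lower()).strip("_")


def intent_accuracy(predicted: Dict[str, str], annotated: Dict[str, str]) -> float:
    """Fraction of annotated conversations whose predicted intent matches."""
    if not annotated:
        return 0.0
    hits = sum(
        normalize_intent(predicted.get(cid, "")) == normalize_intent(gold)
        for cid, gold in annotated.items()
    )
    return hits / len(annotated)


annotated = {"conv1": "learn_resources", "conv2": "report_bug"}
predicted = {"conv1": "Learn resources", "conv2": "bug report"}  # made-up model outputs
print(intent_accuracy(predicted, annotated))  # prints: 0.5
```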

## Error Handling and Rate Limits
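The **GroqClientWrapper** implements exponential backoff:

* Retries up to 5 times.

* On **429 Too Many Requests**, or on any other error, waits before the next attempt and doubles the delay each time (1s, 2s, 4s, …).

* Logs a warning on each retry. The wrapper never logs the API key itself.

You can tune this behavior by changing the **retries** and **backoff** variables in **chat_completion**.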
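
The backoff schedule described above can be sketched as a standalone helper. The function `call_with_backoff` is hypothetical, and it differs from the wrapper in two ways: it re-raises the last error instead of a generic `RuntimeError`, and it does not sleep after the final attempt. The `sleep` parameter is injectable so that the demo runs without waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def call_with_backoff(fn: Callable[[], T], retries: int = 5, base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn until it succeeds, doubling the wait after each failure."""
    delay = base_delay
    for attempt in range(retries):
        try:
            return fn()
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay *= 2
    raise RuntimeError("retries must be at least 1")


# Demo with a fake sleep that records the delays instead of waiting
waits: List[float] = []
attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 4:
        raise ConnectionError("simulated 429")
    return "ok"


result = call_with_backoff(flaky, sleep=waits.append)
print(result, waits)  # prints: ok [1.0, 2.0, 4.0]
```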