# Step 1: Prepare Unified Dataset from Swagger JSON

In [21]:
import json
import sys
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Set

INPUT_FILE = "swagger.json"
OUTPUT_FILE = "api_unified_data_full.json"

# -----------------------------
# Load OpenAPI (3.x) document
# -----------------------------
with open(INPUT_FILE, "r", encoding="utf-8") as f:
    doc: Dict[str, Any] = json.load(f)

# Basic handles
components: Dict[str, Any] = doc.get("components", {}) or {}
schemas: Dict[str, Any]    = components.get("schemas", {}) or {}
parameters_comp: Dict[str, Any] = components.get("parameters", {}) or {}
responses_comp: Dict[str, Any]  = components.get("responses", {}) or {}
paths: Dict[str, Any]      = doc.get("paths", {}) or {}

# -----------------------------
# JSON Pointer ($ref) resolver
# -----------------------------
def resolve_pointer(doc_obj: Dict[str, Any], ref: str) -> Optional[Dict[str, Any]]:
    """
    Resolve JSON pointer of the form '#/a/b/c' anywhere in the OpenAPI doc.
    Returns a dict or None if not found.
    """
    if not isinstance(ref, str) or not ref.startswith("#/"):
        return None
    parts = ref.lstrip("#/").split("/")
    cur: Any = doc_obj
    for p in parts:
        if isinstance(cur, dict) and p in cur:
            cur = cur[p]
        else:
            return None
    if isinstance(cur, dict):
        return cur
    return None

def deep_copy(obj: Any) -> Any:
    return json.loads(json.dumps(obj))

# ---------------------------------------
# Schema flattening (handles allOf/oneOf)
# ---------------------------------------
def merge_dict(dst: Dict[str, Any], src: Dict[str, Any]) -> Dict[str, Any]:
    """
    Shallow merge with priority to existing dst keys; merges nested 'properties' and 'required'
    safely for OpenAPI objects.
    """
    out = deep_copy(dst)
    for k, v in (src or {}).items():
        if k == "properties" and isinstance(v, dict):
            out.setdefault("properties", {})
            out["properties"].update(v)
        elif k == "required" and isinstance(v, list):
            out.setdefault("required", [])
            # keep order, avoid duplicates
            for item in v:
                if item not in out["required"]:
                    out["required"].append(item)
        else:
            # don't overwrite existing non-empty values unless empty
            if k not in out or out[k] in (None, "", [], {}):
                out[k] = v
    return out

def flatten_schema(schema: Dict[str, Any], visited: Set[str]) -> Dict[str, Any]:
    """
    Recursively resolve $ref and flatten allOf/oneOf/anyOf blocks.
    Returns a single merged schema dict (best-effort).
    """
    if not isinstance(schema, dict):
        return {}

    # Resolve $ref first
    if "$ref" in schema:
        target = resolve_pointer(doc, schema["$ref"])
        if target is None:
            return {"$ref_unresolved": schema["$ref"]}
        # guard against cycles
        ref_key = schema["$ref"]
        if ref_key in visited:
            return {"$ref_cycle": ref_key}
        visited.add(ref_key)
        resolved = flatten_schema(target, visited)
        # allow local overrides (e.g., description, nullable on the $ref)
        return merge_dict(resolved, {k: v for k, v in schema.items() if k != "$ref"})

    out = deep_copy(schema)

    # Handle composition
    for key in ("allOf", "oneOf", "anyOf"):
        if key in out and isinstance(out[key], list):
            merged = {}
            for part in out[key]:
                merged = merge_dict(merged, flatten_schema(part, visited))
            # prefer merged over list
            out.pop(key, None)
            out = merge_dict(out, merged)

    # Flatten items for arrays
    if out.get("type") == "array" and "items" in out:
        out["items"] = flatten_schema(out["items"], visited)

    # Flatten properties for objects
    if out.get("type") == "object" and "properties" in out:
        props = out.get("properties", {})
        flat_props = {}
        for pname, pschema in props.items():
            flat_props[pname] = flatten_schema(pschema, visited)
        out["properties"] = flat_props

    return out

# -------------------------------------------------------
# Pretty print of schema into a multiline documentation
# -------------------------------------------------------
def schema_to_text(schema: Dict[str, Any], depth: int = 0) -> str:
    """
    Turn a schema dict into a human-readable multiline string.
    """
    indent = "  " * depth
    if not schema:
        return f"{indent}- (empty)\n"

    out = ""
    if "$ref_unresolved" in schema:
        out += f"{indent}- $ref unresolved: {schema['$ref_unresolved']}\n"
        return out
    if "$ref_cycle" in schema:
        out += f"{indent}- $ref cycle detected: {schema['$ref_cycle']}\n"
        return out

    stype = schema.get("type", "object")
    fmt   = schema.get("format")
    desc  = schema.get("description")
    enum  = schema.get("enum")
    nullable = schema.get("nullable", False)

    out += f"{indent}- Type: {stype}{f' ({fmt})' if fmt else ''}{' (nullable)' if nullable else ''}\n"
    if desc:
        out += f"{indent}  Description: {desc}\n"
    if enum and isinstance(enum, list) and len(enum) > 0:
        out += f"{indent}  Enum: {', '.join(map(str, enum))}\n"

    # Object properties
    if stype == "object":
        required = schema.get("required", []) or []
        props = schema.get("properties", {}) or {}
        if props:
            for pname, pschema in props.items():
                req_flag = " (required)" if pname in required else ""
                out += f"{indent}  Field: {pname}{req_flag}\n"
                out += schema_to_text(pschema, depth + 2)

    # Arrays
    if stype == "array":
        items = schema.get("items", {})
        out += f"{indent}  Items:\n"
        out += schema_to_text(items, depth + 2)

    # Example/Default
    if "example" in schema:
        out += f"{indent}  Example: {json.dumps(schema['example'], ensure_ascii=False)}\n"
    if "default" in schema:
        out += f"{indent}  Default: {schema['default']}\n"

    return out

# ---------------------------------------------
# Helper: gather schema names used by an entity
# ---------------------------------------------
def collect_ref_names(schema: Dict[str, Any]) -> Set[str]:
    """
    Collect component schema names referenced inside a schema dict
    (via $ref to #/components/schemas/NAME). Useful for dependency graph.
    """
    found: Set[str] = set()

    def walk(node: Any):
        if isinstance(node, dict):
            if "$ref" in node and isinstance(node["$ref"], str):
                ref = node["$ref"]
                if ref.startswith("#/components/schemas/"):
                    found.add(ref.split("/")[-1])
            # recurse
            for v in node.values():
                walk(v)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(schema)
    return found

# ---------------------------------------------
# Extract parameter (inline or $ref) + schema
# ---------------------------------------------
def extract_parameter(param_obj: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Returns (normalized_param, flattened_schema)
    """
    # Resolve $ref parameter
    if "$ref" in param_obj:
        p_res = resolve_pointer(doc, param_obj["$ref"]) or {}
        base = deep_copy(p_res)
        # allow local overrides
        for k, v in param_obj.items():
            if k != "$ref":
                base[k] = v
        param_obj = base

    schema = param_obj.get("schema", {}) or {}
    flat = flatten_schema(schema, visited=set())
    normalized = {
        "name": param_obj.get("name"),
        "in": param_obj.get("in"),
        "required": bool(param_obj.get("required", False)),
        "description": param_obj.get("description", ""),
    }
    return normalized, flat

# ---------------------------------------------
# Extract requestBody across content types
# ---------------------------------------------
def extract_request_body(operation: Dict[str, Any]) -> Dict[str, Any]:
    rb = operation.get("requestBody", {})
    if not isinstance(rb, dict) or not rb:
        return {"schema_text": "", "schema_flat": {}, "content_types": []}

    content = rb.get("content", {}) or {}
    content_types = list(content.keys())

    # Prefer application/json when available
    chosen_ct = "application/json" if "application/json" in content else (content_types[0] if content_types else None)

    schema_flat = {}
    schema_text = ""
    if chosen_ct:
        schema_obj = content.get(chosen_ct, {}).get("schema", {}) or {}
        flat = flatten_schema(schema_obj, visited=set())
        schema_flat = flat
        schema_text = f"{chosen_ct}:\n" + schema_to_text(flat, depth=1)
    else:
        schema_text = ""

    return {
        "schema_text": schema_text.strip(),
        "schema_flat": schema_flat,
        "content_types": content_types,
        "required": bool(rb.get("required", False)),
        "description": rb.get("description", "")
    }

# ---------------------------------------------
# Extract responses (iterate all status codes)
# ---------------------------------------------
def extract_responses(operation: Dict[str, Any]) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    responses = operation.get("responses", {}) or {}
    for status, resp_obj in responses.items():
        # can be $ref response
        if "$ref" in resp_obj:
            resolved = resolve_pointer(doc, resp_obj["$ref"]) or {}
            resp_obj = merge_dict(resolved, {k: v for k, v in resp_obj.items() if k != "$ref"})

        desc = resp_obj.get("description", "")
        content = resp_obj.get("content", {}) or {}
        cts = list(content.keys())
        # Prefer application/json
        chosen_ct = "application/json" if "application/json" in content else (cts[0] if cts else None)

        flat = {}
        text = ""
        if chosen_ct:
            schema_obj = content.get(chosen_ct, {}).get("schema", {}) or {}
            flat = flatten_schema(schema_obj, visited=set())
            text = f"{chosen_ct}:\n" + schema_to_text(flat, depth=1)

        out[status] = {
            "description": desc,
            "content_types": cts,
            "schema_text": text.strip(),
            "schema_flat": flat
        }
    return out

# ---------------------------------------------------
# Path-level parameters (apply to all operations)
# ---------------------------------------------------
def gather_path_parameters(path_item: Dict[str, Any]) -> List[Dict[str, Any]]:
    params = path_item.get("parameters", []) or []
    normalized = []
    for p in params:
        p_norm, p_flat = extract_parameter(p)
        p_norm["schema_text"] = schema_to_text(p_flat, depth=1).strip()
        p_norm["schema_flat"] = p_flat
        normalized.append(p_norm)
    return normalized

# ---------------------------------------------------
# Build unified dataset
# ---------------------------------------------------
api_unified_data: List[Dict[str, Any]] = []

for path, path_item in paths.items():
    if not isinstance(path_item, dict):
        continue

    # Path-scoped params
    path_params = gather_path_parameters(path_item)

    for method, operation in path_item.items():
        if method.lower() not in ("get", "post", "put", "patch", "delete", "options", "head"):
            continue
        if not isinstance(operation, dict):
            continue
        operation_id = operation.get("operationId", "")
        summary = (operation.get("summary") or "").strip()
        description = (operation.get("description") or "").strip()
        tags = operation.get("tags", []) or []

        # Parameters (operation + path-level)
        op_params = operation.get("parameters", []) or []
        param_details: List[Dict[str, Any]] = []

        # Include path-level params first (as inherited)
        for pp in path_params:
            param_details.append(pp)

        # Then operation-level params
        for p in op_params:
            p_norm, p_flat = extract_parameter(p)
            p_norm["schema_text"] = schema_to_text(p_flat, depth=1).strip()
            p_norm["schema_flat"] = p_flat
            param_details.append(p_norm)

        # Request body
        request_body_info = extract_request_body(operation)
        req_schema_text = request_body_info["schema_text"]
        req_schema_flat  = request_body_info["schema_flat"]

        # Responses
        responses_info = extract_responses(operation)
        # Build a consolidated text block for responses
        resp_schema_lines = []
        for status_code, info in responses_info.items():
            resp_schema_lines.append(f"Status {status_code}:\n{info['schema_text']}" if info["schema_text"] else f"Status {status_code}:")
        resp_schema_text = "\n".join([l for l in resp_schema_lines if l]).strip()

        entry = {
            "path": path,
            "method": method.upper(),
            "operation_id": operation_id,
            "tags": tags,
            "summary": summary,
            "description": description,
            "parameters": param_details,
            "request_body": {
                "description": request_body_info.get("description", ""),
                "required": request_body_info.get("required", False),
                "content_types": request_body_info.get("content_types", []),
                "schema_text": req_schema_text,
                "schema_flat": req_schema_flat,
            },
            "responses": responses_info,  # keep structured per status
            "responses_schema_text": resp_schema_text,
        }
        api_unified_data.append(entry)

# ---------------------------------------------------
# Dependency map (schema-name level intersection)
# ---------------------------------------------------
# For each API: collect response schema names; for each target: collect request schema names (params+body)
resp_schema_names_by_idx: List[Set[str]] = []
req_schema_names_by_idx: List[Set[str]]  = []

for entry in api_unified_data:
    # Responses set
    rset: Set[str] = set()
    for _status, info in (entry.get("responses") or {}).items():
        flat = info.get("schema_flat") or {}
        rset |= collect_ref_names(flat)
    resp_schema_names_by_idx.append(rset)

    # Requests set (body + params)
    qset: Set[str] = set()
    # body
    qset |= collect_ref_names(entry.get("request_body", {}).get("schema_flat") or {})
    # params
    for p in entry.get("parameters", []) or []:
        qset |= collect_ref_names(p.get("schema_flat") or {})
    req_schema_names_by_idx.append(qset)

dependency_map: Dict[int, List[int]] = defaultdict(list)
for i, rnames in enumerate(resp_schema_names_by_idx):
    if not rnames:
        continue
    for j, qnames in enumerate(req_schema_names_by_idx):
        if i == j:
            continue
        # If any response schema name of i appears in request schema names of j -> i feeds j
        if rnames & qnames:
            dependency_map[i].append(j)

# Attach "calls_next" by path
for i, entry in enumerate(api_unified_data):
    next_indices = dependency_map.get(i, [])
    next_paths = []
    for j in next_indices:
        target_path = api_unified_data[j]["path"]
        if target_path not in next_paths:
            next_paths.append(target_path)
    entry["calls_next"] = next_paths

# -----------------------------
# Save output
# -----------------------------
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
    json.dump(api_unified_data, f, indent=2, ensure_ascii=False)

print(f"âœ… Unified API dataset (with full schema documentation & dependencies) saved: {len(api_unified_data)} APIs")
print(f"ðŸ“„ File: {OUTPUT_FILE}")

âœ… Unified API dataset (with full schema documentation & dependencies) saved: 1024 APIs
ðŸ“„ File: api_unified_data_full.json


In [8]:

# import json
# import uuid
# from datetime import datetime
 
# # === CONFIG ===
# SWAGGER_FILE = "swagger.json"          # your Swagger/OpenAPI file
# OUTPUT_FILE = "api_dataset_cleaned.json"
 
# # === LOAD SWAGGER FILE ===
# with open(SWAGGER_FILE, "r", encoding="utf-8") as f:
#     swagger_data = json.load(f)
 
# print(f"Loaded Swagger file successfully")
 
# # === HELPER FUNCTION TO PARSE PARAMETERS ===
# def extract_parameters(api_data):
#     parameters = []
#     if "parameters" in api_data:
#         for param in api_data["parameters"]:
#             parameters.append({
#                 "name": param.get("name", ""),
#                 "in": param.get("in", "body"),
#                 "type": param.get("schema", {}).get("type", "string"),
#                 "required": param.get("required", False),
#                 "description": param.get("description", "")
#             })
#     # Handle requestBody schema (OpenAPI 3.x)
#     if "requestBody" in api_data:
#         try:
#             props = api_data["requestBody"]["content"]["application/json"]["schema"]["properties"]
#             for key, val in props.items():
#                 parameters.append({
#                     "name": key,
#                     "in": "body",
#                     "type": val.get("type", "string"),
#                     "required": True,
#                     "description": val.get("description", "")
#                 })
#         except Exception:
#             pass
#     return parameters
 
# # === HELPER FUNCTION TO PARSE RESPONSES ===
# def extract_responses(api_data):
#     responses = {}
#     for code, detail in api_data.get("responses", {}).items():
#         responses[code] = {"description": detail.get("description", "")}
#     return responses
 
# # === MAIN EXTRACTION ===
# api_entries = []
 
# for path, methods in swagger_data.get("paths", {}).items():
#     for method, api_data in methods.items():
#         api_id = api_data.get("operationId") or f"{method}_{path.strip('/').replace('/', '_')}"
#         summary = api_data.get("summary", "")
#         description = api_data.get("description", "")
 
#         entry = {
#             "api_id": api_id,
#             "path": path,
#             "method": method.upper(),
#             "summary": summary,
#             "description": description,
#             "parameters": extract_parameters(api_data),
#             "responses": extract_responses(api_data),
#             "category": (path.split("/")[1] if len(path.split("/")) > 1 else "general"),
#             "tags": api_data.get("tags", []),
#             "example_request": {},
#             "example_response": {},
#             "source_file": SWAGGER_FILE,
#             "last_updated": datetime.now().strftime("%Y-%m-%d")
          
#         }
 
#         api_entries.append(entry)
 
# print(f" Extracted {len(api_entries)} endpoints from Swagger")
 
# # === SAVE TO CLEANED DATASET ===
# with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
#     json.dump(api_entries, f, indent=2, ensure_ascii=False)
 
# print(f"\n Final dataset saved to: {OUTPUT_FILE}")
# print(f"Total APIs processed: {len(api_entries)}")

# Step 2: Generate Embeddings for APIs

In [22]:

import json
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
 
# === CONFIG ===
DATA_FILE = "api_unified_data_full.json"
INDEX_FILE = "updated_api_index.faiss"
EMBED_MODEL = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\models\all-MiniLM-L6-v2"
 
# === LOAD DATA ===
with open(DATA_FILE, "r", encoding="utf-8") as f:
    api_data = json.load(f)
 
print(f" Loaded dataset with {len(api_data)} API entries")
 
# === LOAD EMBEDDING MODEL ===
print(f" Loading model: {EMBED_MODEL}")
model = SentenceTransformer(EMBED_MODEL)
 
# === PREPARE TEXT CORPUS FOR EACH API ===
corpus = []
for api in api_data:
    text = f"{api['path']} | {api['method']} | {api['summary']} | {api['description']} | {' '.join(api.get('tags', []))}"
    corpus.append(text)
 
print(f" Creating embeddings for {len(corpus)} entries...")
 
# === GENERATE EMBEDDINGS ===
embeddings = model.encode(corpus, show_progress_bar=True)
embeddings = np.array(embeddings).astype("float32")
 
# === CREATE & SAVE FAISS INDEX ===
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)
 
faiss.write_index(index, INDEX_FILE)
print(f" FAISS index saved to: {INDEX_FILE}")
 
# === SAVE MAPPING (API Metadata) ===
with open("updated_api_mapping.json", "w", encoding="utf-8") as f:
    json.dump(api_data, f, indent=2, ensure_ascii=False)
 
print(" Saved API metadata to updated_api_mapping.json")
 
print("\n Embedding generation completed successfully!")

 Loaded dataset with 1024 API entries
 Loading model: C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\models\all-MiniLM-L6-v2
 Creating embeddings for 1024 entries...


Batches:   0%|          | 0/32 [00:00<?, ?it/s]

 FAISS index saved to: updated_api_index.faiss
 Saved API metadata to updated_api_mapping.json

 Embedding generation completed successfully!


# Step 3.1 : Load Models & Metadata ---

In [23]:

import json
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
 
# === CONFIG ===
API_MAPPING_FILE = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\updated_api_mapping.json"
FAISS_INDEX_FILE = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\updated_api_index.faiss"
EMBED_MODEL = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\models\all-MiniLM-L6-v2"
 
# === Load Metadata ===
print("Loading API metadata...")
with open(API_MAPPING_FILE, "r", encoding="utf-8") as f:
    api_metadata = json.load(f)
print(f"Loaded {len(api_metadata)} API entries")
 
# === Load Embedding Model ===
print("Loading embedding model...")
embed_model = SentenceTransformer(EMBED_MODEL)
 
# === Load FAISS Index ===
print("Loading FAISS index...")
index = faiss.read_index(FAISS_INDEX_FILE)
print(f"FAISS index loaded with {index.ntotal} vectors")
 
# === Build BM25 (optional) ===
print("Building BM25 index...")
bm25_corpus = []
for api in api_metadata:
    txt = f"{api.get('path','')} {api.get('method','')} {api.get('summary','')} {api.get('description','')}"
    bm25_corpus.append(txt.split())
 
bm25 = BM25Okapi(bm25_corpus)
print("BM25 index built successfully ")

Loading API metadata...
Loaded 1024 API entries
Loading embedding model...
Loading FAISS index...
FAISS index loaded with 1024 vectors
Building BM25 index...
BM25 index built successfully 


# Steps 3.2 : Hybrid Retrieval ---

In [25]:

import numpy as np
 
def embed_query(query: str):
    """Encode query into dense vector"""
    vec = embed_model.encode([query], convert_to_numpy=True)
    return vec.astype("float32")
 
def dense_search(query_vec, top_k=20):
    """FAISS dense vector search"""
    D, I = index.search(query_vec, top_k)
    results = []
    for idx, dist in zip(I[0], D[0]):
        sim = -float(dist)  # higher = better
        results.append((int(idx), sim))
    return results
 
def bm25_search(query, top_k=20):
    """BM25 sparse text search"""
    tokenized = query.lower().split()
    scores = bm25.get_scores(tokenized)
    ranked = np.argsort(scores)[::-1][:top_k]
    return [(int(i), float(scores[i])) for i in ranked]
 
def hybrid_search(query, top_k=20, bm25_weight=0.3):
    """Combine dense + BM25 scores"""
    qvec = embed_query(query)
    dense = dense_search(qvec, top_k)
    dense_scores = {i: s for i, s in dense}
 
    bm25_results = bm25_search(query, top_k)
    bm25_scores = {i: s for i, s in bm25_results}
 
    all_ids = set(list(dense_scores.keys()) + list(bm25_scores.keys()))
    hybrid = []
    for idx in all_ids:
        score = dense_scores.get(idx, 0) * (1 - bm25_weight) + bm25_scores.get(idx, 0) * bm25_weight
        hybrid.append((idx, score))
 
    hybrid = sorted(hybrid, key=lambda x: x[1], reverse=True)[:top_k]
    print(f"Retrieved {len(hybrid)} candidates for: {query}")
    return [{"id": i, "score": s, "meta": api_metadata[i]} for i, s in hybrid]
 # --- Example ---
query = "Which api responsible for get the booking dtails?"
results = hybrid_search(query, top_k=10)
for r in results[:5]:
    
    print(f"{r['meta']['path']}  [{r['meta']['method']}]  â†’ {r['score']:.3f}")

Retrieved 10 candidates for: Which api responsible for get the booking dtails?
/api/nsk/v1/vouchers/configuration  [GET]  â†’ 3.263
/api/nsk/v1/booking/payments/{paymentMethod}/dcc  [POST]  â†’ 2.357
/api/nsk/v6/booking/payments/dcc/{dccKey}  [POST]  â†’ 2.023
/api/nsk/v1/vouchers/configuration/{configurationCode}  [GET]  â†’ 1.954
/api/nsk/v2/booking/payments/available  [GET]  â†’ 1.735


# Step 3.3 : Rerank Top Results ---

In [27]:

from sentence_transformers import CrossEncoder  # pip install cross-encoder
 
# === Load Reranker ===
print("Loading cross-encoder reranker...")
reranker = CrossEncoder(r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\Reranking-Model\local-ms-marco")
 
def rerank_with_cross_encoder(query, candidates, top_k=5):
    pairs = []
    for c in candidates:
        text = f"{c['meta'].get('path','')} {c['meta'].get('summary','')} {c['meta'].get('description','')}"
        pairs.append((query, text))
 
    scores = reranker.predict(pairs)
    for i, s in enumerate(scores):
        candidates[i]['rerank_score'] = float(s)
 
    ranked = sorted(candidates, key=lambda x: x['rerank_score'], reverse=True)
    return ranked[:top_k]
 
# # --- Example ---
# query = "Which api responsible for get the booking dtails?"
# top_candidates = rerank_with_cross_encoder(query, results, top_k=5)
# for r in top_candidates:
#     print(f"{r['meta']['path']} â†’ {r['rerank_score']:.3f}")

Loading cross-encoder reranker...


# Step 3.4 : Generate Step-by-Step Plan ---

In [28]:

import json
 
def generate_plan(query, top_candidates):
    """Create a step-by-step plan template"""
    lines = [f"User Query: {query}", "\nSuggested API Workflow:\n"]
    for i, c in enumerate(top_candidates, 1):
        meta = c['meta']
        lines.append(f"Step {i}: {meta.get('summary','')}")
        lines.append(f"  Endpoint: {meta.get('path')}  |  Method: {meta.get('method')}")
        params = meta.get('parameters', [])
        if params:
            param_names = [p.get('name') for p in params]
            lines.append(f"  Parameters: {', '.join(param_names)}")
        lines.append("")  # blank line
    return "\n".join(lines)
 

In [29]:
# --- Example ---
query = "Which api responsible for check in flow"
plan = generate_plan(query, top_candidates)
print(plan)

User Query: Which api responsible for check in flow

Suggested API Workflow:

Step 1: Creates a passive segment and creates a booking if it doesn't already exist.
  Endpoint: /api/nsk/v1/trip/passiveSegments  |  Method: POST

Step 2: Retrieves the booking payment methods available for the booking in state.
  Endpoint: /api/nsk/v2/booking/payments/available  |  Method: GET
  Parameters: currencyCode

Step 3: Gets credit available by reference number and type.
  Endpoint: /api/nsk/v2/booking/payments/credit  |  Method: GET
  Parameters: ReferenceNumber, CurrencyCode, Type

Step 4: Retrieves the booking payment methods available for a refund on the booking in state.
  Endpoint: /api/nsk/v1/booking/payments/refunds  |  Method: GET

Step 5: Gets the list of seat maps for all the journeys for the booking in state.
  Endpoint: /api/nsk/v3/booking/seatmaps  |  Method: GET
  Parameters: FeePricingMode, CollectedCurrencyCode, IncludePropertyLookup, CultureCode



# final Step  Agentic API Assistant ---

In [30]:
# ---------- Public function: Answer a query ----------
def answer_query(query: str, top_k=5, bm25_weight=0.35, use_reranker=True, base_url=None, verbose=True):
    """
    End-to-end:
      - Hybrid retrieve (FAISS + BM25)
      - Optional rerank (cross-encoder if available) else RRF fallback
      - Produce deterministic plan with sample calls

    Returns a dict:
      {
        "query": ...,
        "candidates": [ {path, method, score, rerank_score}, ... ],
        "plan_text": "...",
        "raw": { "hybrid": [...], "reranked": [...] }
      }
    """
    # Step 1: Hybrid retrieve
    hybrid = hybrid_retrieve(query, top_k=max(top_k*2, 10), bm25_weight=bm25_weight)

    # Step 2: Rerank
    if use_reranker and reranker is not None:
        reranked = rerank_candidates(query, hybrid, top_k=top_k)
    else:
        # RRF fallback (non-LLM; robust)
        qvec = embed_query(normalize_query(query))
        dense = dense_search(qvec, top_k=max(top_k*2, 10))
        bm25_res = bm25_search(normalize_query(query), top_k=max(top_k*2, 10))
        reranked = rrf_fusion(dense, bm25_res, top_k=top_k)
        for r in reranked:
            r["rerank_score"] = None

    # Step 3: Build plan (deterministic, includes examples)
    plan_text = template_plan(query, reranked, base_url=base_url)

    if verbose:
        print("\nTop candidates:")
        for i, r in enumerate(reranked, 1):
            m = r["meta"]
            s = r.get("rerank_score", None)
            print(f"{i}. {m.get('path')}  [{m.get('method')}]  "
                  f"hybrid_score={r.get('score'):.3f}"
                  + (f"  rerank_score={s:.3f}" if s is not None else ""))

        print("\n" + "="*80)
        print(plan_text)
        print("="*80)

    candidates_view = [{
        "path": r["meta"].get("path"),
        "method": r["meta"].get("method"),
        "score": r.get("score"),
        "rerank_score": r.get("rerank_score")
    } for r in reranked]

    return {
        "query": query,
        "candidates": candidates_view,
        "plan_text": plan_text,
        "raw": {
            "hybrid": hybrid,
            "reranked": reranked
        }
    }

In [19]:
# ====== Initialization Cell (Run ONCE) ======
# Loads embedding model, FAISS index, metadata, BM25, (optional) reranker.
# After this cell, call: answer_query("your question") as many times as you want.

import os
import json
import faiss
import numpy as np
import re
from datetime import datetime
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
from sklearn.preprocessing import MinMaxScaler

# -------- CONFIG (update paths if needed) ----------
API_MAPPING_FILE = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\api_unified_data_full.json"    # output from Step 2
FAISS_INDEX_FILE = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\updated_api_index.faiss"     # output from Step 2
EMBED_MODEL_NAME = r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\models\all-MiniLM-L6-v2"

# Optional local reranker (cross-encoder).
# If this path is invalid or missing HF files, the code will safely fall back to RRF (non-LLM).
RERANKER_MODEL = CrossEncoder(r"C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\Reranking-Model\local-ms-marco")
# You can also use the hub id directly (requires internet):
# RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"

print("Loading API metadata...")
with open(API_MAPPING_FILE, "r", encoding="utf-8") as f:
    api_metadata = json.load(f)
print(f"Loaded {len(api_metadata)} API entries")

print("Loading FAISS index...")
index = faiss.read_index(FAISS_INDEX_FILE)
print("FAISS index loaded. vectors:", index.ntotal)

print(f"Loading embedding model: {EMBED_MODEL_NAME} ...")
embed_model = SentenceTransformer(EMBED_MODEL_NAME)
print("Embedding model loaded.")

# ---------- Text cleaning / tokenization ----------
def clean_text_tokens(text: str):
    if not isinstance(text, str):
        text = ""
    text = re.sub(r'[^a-zA-Z0-9 ]', ' ', text).lower()
    tokens = text.split()
    return tokens

# Add parameters & tags into corpus text to improve relevance
def metadata_to_text(m):
    params = m.get('parameters', []) or []
    ptext = " ".join([f"{p.get('in','')}:{p.get('name','')}:{p.get('type','')}" for p in params])
    rtext = " ".join([f"{code}:{resp.get('description','')}" for code, resp in (m.get('responses') or {}).items()])
    return " | ".join([
        str(m.get('path','')),
        str(m.get('method','')),
        str(m.get('summary','')),
        str(m.get('description','')),
        "tags:" + " ".join(m.get('tags',[])),
        "params:" + ptext,
        "responses:" + rtext
    ])

print("Building BM25 index...")
bm25_corpus = []
for m in api_metadata:
    txt = metadata_to_text(m)
    bm25_corpus.append(clean_text_tokens(txt))
bm25 = BM25Okapi(bm25_corpus)
print("BM25 ready.")

# --------- (Optional) Reranker loading (safe) ----------
reranker = None
reranker_device = "cpu"

def _has_hf_files(model_dir: str) -> bool:
    """Check if a local folder looks like a valid HF model (minimal check)."""
    if not os.path.isdir(model_dir):
        return False
    needed = ["config.json"]  # minimal; you can add more like tokenizer_config.json, pytorch_model.bin
    return all(os.path.isfile(os.path.join(model_dir, f)) for f in needed)

try:
    from sentence_transformers import CrossEncoder
    if isinstance(RERANKER_MODEL, str) and os.path.isdir(RERANKER_MODEL) and _has_hf_files(RERANKER_MODEL):
        print(f"Loading reranker from local path: {RERANKER_MODEL} ...")
        reranker = CrossEncoder(RERANKER_MODEL, device=reranker_device, local_files_only=True)
        print("Reranker loaded (local).")
    elif isinstance(RERANKER_MODEL, str) and "/" in RERANKER_MODEL:
        print(f"Loading reranker from HF Hub: {RERANKER_MODEL} ...")
        reranker = CrossEncoder(RERANKER_MODEL, device=reranker_device)
        print("Reranker loaded (hub).")
    else:
        print(f"Reranker not available at '{RERANKER_MODEL}'. Using RRF fallback.")
        reranker = None
except Exception as e:
    print("Failed to load reranker; continuing without it. Error:", str(e))
    reranker = None

# --------- Utility functions ----------
def normalize_query(q: str) -> str:
    """Lightweight typo fix & phrasing cleanup to improve retrieval without LLMs."""
    q_norm = (q or "").lower().strip()
    fixes = {
        "dtails": "details",
        "responsible for get": "get",
        "api responsible for": "api for",
        "booking dtail": "booking detail",
        "bokking": "booking",
        "resrvation": "reservation",
    }
    for k, v in fixes.items():
        q_norm = q_norm.replace(k, v)
    q_norm = re.sub(r"\s+", " ", q_norm).strip()
    return q_norm

def embed_query(query: str):
    vec = embed_model.encode([query], convert_to_numpy=True)
    if vec.ndim == 1:
        vec = vec.reshape(1, -1)
    return vec.astype("float32")

def dense_search(query_vec, top_k=50):
    D, I = index.search(query_vec, top_k)
    results = []
    for idx, dist in zip(I[0], D[0]):
        if idx < 0:
            continue
        # Convert L2 distance to a similarity-like score (bounded, higher=better)
        sim = 1.0 / (1.0 + float(dist))
        results.append((int(idx), sim))
    return results

def bm25_search(query, top_k=50):
    tokens = clean_text_tokens(query)
    scores = bm25.get_scores(tokens)
    ranked = np.argsort(scores)[::-1][:top_k]
    return [(int(i), float(scores[i])) for i in ranked]

def _minmax(arr):
    """Safe MinMax scaling for list/np array; returns zeros if degenerate."""
    arr = np.array(arr, dtype=float).reshape(-1,1)
    if arr.size == 0:
        return np.array([])
    if np.allclose(arr.min(), arr.max()):
        return np.zeros_like(arr).flatten()
    return MinMaxScaler().fit_transform(arr).flatten()

def hybrid_retrieve(query, top_k=20, bm25_weight=0.35):
    """
    Hybrid retrieval combining dense (FAISS) and BM25 with MinMax normalization.
    Returns list of dicts: {'id', 'score', 'meta'}
    """
    # Use normalized query for retrieval (keep original for display)
    query_norm = normalize_query(query)

    qvec = embed_query(query_norm)
    dense = dense_search(qvec, top_k*2)
    bm25_res = bm25_search(query_norm, top_k*2)

    d_norm = _minmax([s for _, s in dense]) if len(dense) > 0 else np.array([])
    b_norm = _minmax([s for _, s in bm25_res]) if len(bm25_res) > 0 else np.array([])

    dense_dict = {dense[i][0]: float(d_norm[i]) for i in range(len(dense))} if len(dense) > 0 else {}
    bm25_dict = {bm25_res[i][0]: float(b_norm[i]) for i in range(len(bm25_res))} if len(bm25_res) > 0 else {}

    all_ids = set(list(dense_dict.keys()) + list(bm25_dict.keys()))
    hybrid = []
    for idx in all_ids:
        d = dense_dict.get(idx, 0.0)
        b = bm25_dict.get(idx, 0.0)
        score = (1 - bm25_weight) * d + bm25_weight * b
        hybrid.append((idx, float(score)))
    hybrid = sorted(hybrid, key=lambda x: x[1], reverse=True)[:top_k]
    return [{"id": idx, "score": s, "meta": api_metadata[idx]} for idx, s in hybrid]

def rerank_candidates(query: str, candidates: list, top_k=5):
    """
    Use cross-encoder to rerank candidate list if available.
    Returns top_k list with 'rerank_score'.
    """
    if not candidates:
        return []
    if reranker is None:
        ranked = sorted(candidates, key=lambda x: x["score"], reverse=True)[:top_k]
        for r in ranked:
            r["rerank_score"] = None
        return ranked

    pair_texts = []
    for c in candidates:
        m = c["meta"]
        text = metadata_to_text(m)
        pair_texts.append((normalize_query(query), text))

    scores = reranker.predict(pair_texts)
    for i, s in enumerate(scores):
        candidates[i]["rerank_score"] = float(s)
    ranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
    return ranked

# ---------- Parameter helpers & Output formatting ----------
def _extract_params(m):
    """Normalized parameter records (optional; kept for future use)."""
    params = m.get("parameters", []) or []
    out = []
    for p in params:
        out.append({
            "name": p.get("name", ""),
            "in": p.get("in", "query"),
            "type": (p.get("type") or (p.get("schema", {}) or {}).get("type") or "string"),
            "required": bool(p.get("required", False)),
            "description": p.get("description", "")
        })
    return out

def _param_names_only(m):
    """Return a comma-separated list of parameter NAMES (no type/in/required)."""
    params = m.get("parameters", []) or []
    names = []
    for p in params:
        name = (p.get("name") or "").strip()
        if name and name != "(body)":
            names.append(name)
    # unique but keep order
    seen = set()
    uniq = [n for n in names if not (n in seen or seen.add(n))]
    return ", ".join(uniq)

def pretty_workflow_text(query: str, reranked: list) -> str:
    """
    Prints exactly in the requested format:
    User Query: ...
    
    Suggested API Workflow:
    
    Step i: <summary or path>.
      Endpoint: <path>  |  Method: <METHOD>
      Parameters: <CommaSeparatedParamNames>   # only if any
    """
    lines = []
    # Header
    lines.append(f"User Query: {query.strip()}\n")
    lines.append("Suggested API Workflow:\n")

    for i, c in enumerate(reranked, start=1):
        m = c["meta"]
        summary = (m.get("summary") or m.get("description") or m.get("path") or "").strip()
        if summary and not summary.endswith("."):
            summary += "."
        path = m.get("path", "").strip()
        method = (m.get("method", "GET") or "GET").upper()
        param_names = _param_names_only(m)

        lines.append(f"Step {i}: {summary}")
        lines.append(f"  Endpoint: {path}  |  Method: {method}")
        if param_names:
            lines.append(f"  Parameters: {param_names}")
        lines.append("")  # blank line after each step

    # Remove trailing blank line
    if lines and lines[-1] == "":
        lines.pop()
    return "\n".join(lines)

# ---------- Reciprocal Rank Fusion (fallback if no reranker) ----------
def rrf_fusion(dense_results, bm25_results, k=60, top_k=5):
    """
    dense_results: list[(idx, score)] sorted desc by dense
    bm25_results:  list[(idx, score)] sorted desc by bm25
    Returns list of dicts like hybrid_retrieve but using RRF.
    """
    def rank_map(res):
        return {i: r for r, (i, _) in enumerate(res, start=1)}

    d_sorted = sorted(dense_results, key=lambda x: x[1], reverse=True)
    b_sorted = sorted(bm25_results, key=lambda x: x[1], reverse=True)
    r_dense = rank_map(d_sorted)
    r_bm25 = rank_map(b_sorted)

    # Proper union of sets
    all_ids = set(r_dense.keys()) | set(r_bm25.keys())

    fused = []
    for i in all_ids:
        s = (1 / (k + r_dense.get(i, 9999))) + (1 / (k + r_bm25.get(i, 9999)))
        fused.append((i, s))
    fused = sorted(fused, key=lambda x: x[1], reverse=True)[:top_k]
    return [{"id": i, "score": s, "meta": api_metadata[i]} for i, s in fused]


print("\nInitialization complete âœ…  You can now call:")
print('  print(response)')

Loading API metadata...
Loaded 1024 API entries
Loading FAISS index...
FAISS index loaded. vectors: 1024
Loading embedding model: C:\Users\Arpit.x.Tripathi\Downloads\Rag_chatbot\models\all-MiniLM-L6-v2 ...
Embedding model loaded.
Building BM25 index...
BM25 ready.
Reranker not available at 'CrossEncoder(
  (model): BertForSequenceClassification(
    (bert): BertModel(
      (embeddings): BertEmbeddings(
        (word_embeddings): Embedding(30522, 384, padding_idx=0)
        (position_embeddings): Embedding(512, 384)
        (token_type_embeddings): Embedding(2, 384)
        (LayerNorm): LayerNorm((384,), eps=1e-12, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (encoder): BertEncoder(
        (layer): ModuleList(
          (0-5): 6 x BertLayer(
            (attention): BertAttention(
              (self): BertSdpaSelfAttention(
                (query): Linear(in_features=384, out_features=384, bias=True)
                (key): Linear(in_features

In [None]:
query = "Which api responsible for check in flow"
plan = generate_plan(query, top_candidates)
print(plan)

User Query: Which api responsible for check in flow

Suggested API Workflow:

Step 1: Creates a passive segment and creates a booking if it doesn't already exist.
  Endpoint: /api/nsk/v1/trip/passiveSegments  |  Method: POST

Step 2: Retrieves the booking payment methods available for the booking in state.
  Endpoint: /api/nsk/v2/booking/payments/available  |  Method: GET
  Parameters: currencyCode

Step 3: Gets credit available by reference number and type.
  Endpoint: /api/nsk/v2/booking/payments/credit  |  Method: GET
  Parameters: ReferenceNumber, CurrencyCode, Type

Step 4: Retrieves the booking payment methods available for a refund on the booking in state.
  Endpoint: /api/nsk/v1/booking/payments/refunds  |  Method: GET

Step 5: Gets the list of seat maps for all the journeys for the booking in state.
  Endpoint: /api/nsk/v3/booking/seatmaps  |  Method: GET
  Parameters: FeePricingMode, CollectedCurrencyCode, IncludePropertyLookup, CultureCode

