In [None]:
import json
import os

import requests
from IPython.display import display, HTML

## Request setup

In [None]:
# Connection settings. Values are read from the environment when available so
# real credentials never have to be saved in the notebook; the placeholders
# below are used (and can still be edited in place) when the variables are unset.
# A trailing "/" is stripped from BASE so f"{BASE}/..." never yields "//".
BASE = os.environ.get("RAG_STUDIO_BASE_URL", "<your-cml-base-url-for-rag-studio>").rstrip("/")
API_KEY = os.environ.get("RAG_STUDIO_API_KEY", "<your-cml-workbench-api-key-with-application-access>")

In [None]:
# Headers shared by the requests in this notebook: JSON content type plus
# bearer-token authentication built from API_KEY.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
    # "remote-user": "alice",  # optional
}

## Health Check

In [None]:
# Health check: GET the root of the service.
resp = requests.get(f"{BASE}/", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response

In [None]:
# Display the body of the most recent response (`resp`) as text.
resp.text

## Projects

### Get all projects

In [None]:
# Fetch the project list; the parsed JSON is shown as the cell output.
resp = requests.get(f"{BASE}/api/v1/rag/projects", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
projects = resp.json()
projects

### Get Default Project

In [None]:
# Fetch the default project and keep its parsed JSON for later cells.
resp = requests.get(f"{BASE}/api/v1/rag/projects/default", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
default_project = resp.json()

In [None]:
# Display the parsed default-project response.
default_project

### Create a project

In [None]:
# Create a project named "new-project" and echo the id and name from the response.
resp = requests.post(
    f"{BASE}/api/v1/rag/projects",
    json=dict(name="new-project"),
    headers=headers,
)
resp.raise_for_status()  # raises on a 4xx/5xx response
project = resp.json()
print(*(project[field] for field in ("id", "name")))

In [None]:
# Display the full JSON of the project created above.
project

## List Models Available

In [None]:
# Fetch every model listing exposed under /llm-service/models. The four
# endpoints differ only in their last path segment, so they are requested in
# one loop instead of four copy-pasted request blocks.
model_listings = {}
for listing in ("model_source", "llm", "embeddings", "reranking"):
    resp = requests.get(f"{BASE}/llm-service/models/{listing}", headers=headers)
    resp.raise_for_status()
    model_listings[listing] = resp.json()

model_source = model_listings["model_source"]    # Model source
llm_models = model_listings["llm"]                # LLM (inference) models
embedding_models = model_listings["embeddings"]   # Embedding models
reranking_models = model_listings["reranking"]    # Reranking models

# Example: pick the first model of each kind (adjust selection as needed).
# A selection is None when the corresponding listing is empty.
llm_model_json = llm_models[0] if llm_models else None
embedding_model_json = embedding_models[0] if embedding_models else None
reranking_model_json = reranking_models[0] if reranking_models else None

print(
    f"Selected Models: \nLLM: {llm_model_json}\nEmbedding: {embedding_model_json}\nReranker: {reranking_model_json}"
)

## Data Source/Knowledge Base

### Get All Data Sources if any

In [None]:
# Fetch the existing data sources; the parsed JSON is shown as the cell output.
resp = requests.get(f"{BASE}/api/v1/rag/dataSources", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
data_sources = resp.json()
data_sources

### Create Data Source

In [None]:
# The model selections are None when the service listed no models of that kind
# (see the "List Models Available" cell). Fail with a clear message instead of
# an opaque TypeError from subscripting None below.
if embedding_model_json is None or llm_model_json is None:
    raise ValueError(
        "An embedding model and an LLM model must be available to create a data source."
    )

# Create a manually-populated data source that uses the selected models.
resp = requests.post(
    f"{BASE}/api/v1/rag/dataSources",
    headers=headers,
    json={
        "name": "my-kb",
        "embeddingModel": embedding_model_json['model_id'],
        "summarizationModel": llm_model_json['model_id'],
        "chunkSize": 512,
        "chunkOverlapPercent": 10,
        "connectionType": "MANUAL",
        "availableForDefaultProject": True, # default is false, can be configured to associate with any project below
    },
)
resp.raise_for_status()

In [None]:
# Parse the most recent response into `ds` and echo its id and name.
ds = resp.json()
print(*(ds[field] for field in ("id", "name")))

In [None]:
# Display the full data source JSON held in `ds`.
ds

### Associate Data Source with Project (Optional)

In [None]:
# Attach the data source to the project created earlier. The shared `headers`
# are used so this call carries the same identity as the rest of the notebook
# (enable "remote-user" once in `headers` if it is needed) instead of a
# hard-coded "alice".
ds_id = ds['id']
project_id = project.get("id", 1)  # falls back to 1 when the response has no "id"
resp = requests.post(
    f"{BASE}/api/v1/rag/projects/{project_id}/dataSources/{ds_id}",
    headers=headers,
)
resp.raise_for_status()

In [None]:
# Attach the same data source to the default project. The shared `headers`
# replace the hard-coded "remote-user": "alice" so the identity used here
# matches the other calls in the notebook.
project_id = default_project.get("id", 1)  # falls back to 1 when the response has no "id"
resp = requests.post(
    f"{BASE}/api/v1/rag/projects/{project_id}/dataSources/{ds_id}",
    headers=headers,
)
resp.raise_for_status()

### Get Data Source Metadata to confirm

In [None]:
# `project_id` is not used by the request below; it is pointed back at the
# newly created project because the previous cell set it to the default project.
project_id = project.get("id", 1)
# Re-read the data source by id, using the shared `headers` rather than a
# hard-coded "remote-user": "alice".
resp = requests.get(
    f"{BASE}/api/v1/rag/dataSources/{ds_id}",
    headers=headers,
)
resp.raise_for_status()
ds = resp.json()

In [None]:
# Display the refreshed data source JSON held in `ds`.
ds

### Check if the Data Source is in the Project's Data Sources

In [None]:
# List the data sources attached to the default project.
project_id = default_project.get("id", 1)  # falls back to 1 when "id" is absent
resp = requests.get(f"{BASE}/api/v1/rag/projects/{project_id}/dataSources", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
def_project_ds = resp.json()
print(f"Project ID: {project_id}")
def_project_ds

In [None]:
# List the data sources attached to the project created earlier.
project_id = project.get("id", 1)  # falls back to 1 when "id" is absent
resp = requests.get(f"{BASE}/api/v1/rag/projects/{project_id}/dataSources", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
project_ds = resp.json()
print(f"Project ID: {project_id}")
project_ds

### Add documents to the knowledge base

In [None]:
# One upload part per document: each entry maps the form field "file" to a
# handle opened in binary mode.
files = [{"file": open(path, "rb")} for path in ("doc1.pdf", "doc2.pdf", "doc3.pdf")]

In [None]:
# Multipart uploads need requests to generate the Content-Type header (with
# the boundary) itself, so reuse the shared headers minus the JSON content type
# instead of a hard-coded "remote-user": "alice".
upload_headers = {
    key: value for key, value in headers.items() if key.lower() != "content-type"
}

uploaded_files = []

try:
    # Each entry of `files` is a {"file": <open handle>} mapping passed as one upload.
    for file_part in files:
        resp = requests.post(
            f"{BASE}/api/v1/rag/dataSources/{ds_id}/files",
            headers=upload_headers,
            files=file_part,
        )
        resp.raise_for_status()
        uploaded_files.append(resp.json())
finally:
    # Close every handle, even if an upload failed, so no file is left open.
    # Re-run the cell that builds `files` before uploading again.
    for file_part in files:
        for handle in file_part.values():
            handle.close()

print(uploaded_files)

### Check if documents are indexed

In [None]:
# List the files registered under the data source.
resp = requests.get(f"{BASE}/api/v1/rag/dataSources/{ds_id}/files", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
docs = resp.json()

In [None]:
# Display the file listing fetched above.
docs

Once `indexingStatus` reports success for at least one document, we can create a session that uses the data source within a project. If no project is specified, the default project is used.

## Sessions

### Get all sessions

In [None]:
# Fetch the session list; the parsed JSON is shown as the cell output.
resp = requests.get(f"{BASE}/api/v1/rag/sessions", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
all_sessions = resp.json()
all_sessions

### Create a session

In [None]:
# Query options stored on the session.
session_query_configuration = dict(
    enableHyde=False,
    enableSummaryFilter=False,
    enableToolCalling=False,  # enable at your own risk
    disableStreaming=False,
    selectedTools=[],  # add in tools, from the available tools endpoint
)

# Session definition: uses `ds_id` and whatever `project_id` currently holds.
session_payload = dict(
    name="My Session",
    dataSourceIds=[ds_id],
    projectId=project_id,
    inferenceModel="gpt-4o-mini",
    rerankModel="bge-reranker",
    responseChunks=8,
    queryConfiguration=session_query_configuration,
)

resp = requests.post(f"{BASE}/api/v1/rag/sessions", headers=headers, json=session_payload)
resp.raise_for_status()  # raises on a 4xx/5xx response
session = resp.json()
session_id = session["id"]  # reused by every session-scoped call below

In [None]:
# Display the session JSON returned by the create call.
session

### Update a session

Update the inference model and reranker based on the list of models available to us

In [None]:
# Build the update payload without touching `session`: dict.copy() is shallow,
# so assigning into updated['queryConfiguration'] would also rewrite the nested
# dict held by `session` before the server has accepted the change.
updated = {
    **session,
    'inferenceModel': llm_model_json['model_id'],
    'rerankModel': reranking_model_json['model_id'],
    'queryConfiguration': {**session['queryConfiguration'], 'disableStreaming': False},
}

resp = requests.post(
    f"{BASE}/api/v1/rag/sessions/{session_id}",
    headers=headers,
    json=updated,
)
resp.raise_for_status()
# Only now replace the local session with the response body.
session = resp.json()

In [None]:
# Display the session JSON returned by the update call.
session

### Chat in the session

(Recommended) Streaming

In [None]:
query_configuration = {
    "exclude_knowledge_base": False,
    "use_question_condensing": True,
}
query_payload = {
    "query": "What benefits does cloudera offer?",
    "configuration": query_configuration,
}

response = requests.post(
    f"{BASE}/llm-service/sessions/{session_id}/stream-completion",
    headers=headers,
    json=query_payload,
    stream=True,
)
# Surface HTTP errors before trying to parse the body as a stream of events.
response.raise_for_status()

text = ""
resp_id = None
# iter_lines() yields complete lines. Iterating the response object directly
# yields fixed-size byte chunks, which can cut an event (or a multi-byte UTF-8
# character) in half and make decode()/json.loads() fail.
for raw_line in response.iter_lines():
    line = raw_line.decode('utf-8').strip()
    if not line:
        continue
    # Strip only the leading "data:" prefix; str.replace would also delete any
    # "data:" that occurs inside the generated text.
    if line.startswith("data:"):
        line = line[len("data:"):]
    event = json.loads(line)
    if 'text' in event:
        text += event['text']
        print(event['text'], end="")
    if 'response_id' in event:
        resp_id = event['response_id']

In [None]:
# Recap of the streamed answer: its response id, then the accumulated text.
print("Response ID:", resp_id)
print("Response:", text, sep="\n")

Non-streaming (Update session to disable streaming)

In [None]:
# Build the update payload without touching `session`: dict.copy() is shallow,
# so assigning into updated['queryConfiguration'] would also flip the flag on
# the nested dict held by `session` before the server has accepted the change.
updated = {
    **session,
    'queryConfiguration': {**session['queryConfiguration'], 'disableStreaming': True},
}
resp = requests.post(
    f"{BASE}/api/v1/rag/sessions/{session_id}",
    headers=headers,
    json=updated,
)
resp.raise_for_status()
# Only now replace the local session with the response body.
session = resp.json()

In [None]:
# Display the session JSON returned by the update call.
session

In [None]:
query_configuration = {
    "exclude_knowledge_base": False,
    "use_question_condensing": True,
}
query_payload = {
    "query": "What about upskilling for cloudera employees?",
    "configuration": query_configuration,
}

response = requests.post(
    f"{BASE}/llm-service/sessions/{session_id}/stream-completion",
    headers=headers,
    json=query_payload,
    stream=True,
)
# Surface HTTP errors before trying to parse the body.
response.raise_for_status()

resp_id_1 = None
last_chunk = None
# iter_lines() yields complete lines. Iterating the response object directly
# yields fixed-size byte chunks, which can cut an event (or a multi-byte UTF-8
# character) in half and make decode()/json.loads() fail.
for raw_line in response.iter_lines():
    line = raw_line.decode('utf-8').strip()
    if not line:
        continue
    print(line)
    # Remember only non-empty lines so `last_chunk` always holds parseable content.
    last_chunk = line
    # Strip only the leading "data:" prefix rather than every occurrence.
    payload = line[len("data:"):] if line.startswith("data:") else line
    event = json.loads(payload)
    if "response_id" in event:
        resp_id_1 = event["response_id"]

In [None]:
# Prefer the response id carried by the last chunk, but keep the value already
# captured while reading the response when that chunk is missing, empty, or has
# no "response_id" (the unconditional lookup would raise in those cases).
if last_chunk:
    last_payload = last_chunk[len("data:"):] if last_chunk.startswith("data:") else last_chunk
    resp_id_1 = json.loads(last_payload).get("response_id", resp_id_1)
resp_id_1

With streaming disabled, the endpoint does not return the answer itself; the response is stored in the chat history (which is where the UI reads it from). After the call above completes, fetch the response from the chat history using its response ID, as shown below.

### Fetch response information (e.g., source nodes)

#### From the streaming result

In [None]:
# Display the response id captured from the streaming call.
resp_id

In [None]:
# Single message by id
resp = requests.get(
    f"{BASE}/llm-service/sessions/{session_id}/chat-history/{resp_id}",
    headers=headers,
)
resp.raise_for_status()
message = resp.json()
message

#### From the non-streaming result

In [None]:
# Display the response id captured from the non-streaming call.
resp_id_1

In [None]:
# Single message by id
resp_1 = requests.get(
    f"{BASE}/llm-service/sessions/{session_id}/chat-history/{resp_id_1}",
    headers=headers,
)
resp_1.raise_for_status()
message_1 = resp_1.json()
message_1

### Get Node Information

In [None]:
nodes = message.get("source_nodes", [])  # [{node_id, doc_id, source_file_name, score, dataSourceId}]

# Defined up front so `chunk` exists (as None) even when the message has no
# source nodes; otherwise displaying it afterwards raises NameError.
chunk = None

# Fetch chunk contents for the first node
if nodes:
    first_node = nodes[0]
    # A dedicated name keeps the notebook-wide `ds_id` (the data source created
    # earlier) from being overwritten by the node's data source id.
    node_ds_id = first_node.get("dataSourceId")
    chunk_id = first_node.get("node_id")
    resp = requests.get(
        f"{BASE}/llm-service/data_sources/{node_ds_id}/chunks/{chunk_id}",
        headers=headers,  # shared headers, consistent with the other calls
    )
    resp.raise_for_status()
    chunk = resp.json()  # {"text": ..., "metadata": {...}}

In [None]:
# Display the chunk JSON fetched for the first source node.
chunk

### Retrieve Chat History

In [None]:
# Fetch the whole chat history of the session. The shared `headers` replace the
# hard-coded "remote-user": "alice" so the identity used here matches the calls
# that created the session and its messages.
resp = requests.get(
    f"{BASE}/llm-service/sessions/{session_id}/chat-history",
    headers=headers,
)
resp.raise_for_status()
history = resp.json()
history

### Rename the session with AI

(Needs at least 1 message in the chat history)

In [None]:
# Call the rename-session endpoint and print the response body with
# surrounding whitespace removed.
resp = requests.post(f"{BASE}/llm-service/sessions/{session_id}/rename-session", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
new_name = resp.text.strip()
print(new_name)

In [None]:
# Re-fetch the session to check whether its name changed.
resp = requests.get(f"{BASE}/api/v1/rag/sessions/{session_id}", headers=headers)
resp.raise_for_status()  # raises on a 4xx/5xx response
updated_session = resp.json()
updated_session