In [1]:
import json
import os
import logging
import requests
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
# Account identifiers, credentials and endpoints, read from the process
# environment (load_dotenv() was called in the first cell). A variable that
# is not set resolves to None.
CUSTOMER_ID = os.getenv('CUSTOMER_ID')
CORPUS_ID = os.getenv('CORPUS_ID')
API_KEY = os.getenv('API_KEY')
AUTH_URL = os.getenv('AUTH_URL')
APP_CLIENT_ID = os.getenv('APP_CLIENT_ID')
APP_CLIENT_SECRET = os.getenv('APP_CLIENT_SECRET')
IDX_ADDRESS = os.getenv('IDX_ADDRESS')

## Create Corpus

In [3]:
def _get_create_corpus_json():
    """ Returns a create corpus json. """
    corpus = {
        "name": "Vectara Test Corpus(Python)",
        "description": "An example corpus generated via REST API from Python code.",
    }
    return json.dumps({"corpus":corpus})

def create_corpus(customer_id: int, admin_address: str, jwt_token: str):
    """Create a new corpus through the admin REST endpoint.

    Args:
        customer_id: Unique customer ID in vectara platform.
        admin_address: Address of the admin server. e.g., api.vectara.io
        jwt_token: A valid Auth token; sent as a Bearer token.

    Returns:
        (message, True) with the decoded JSON body on success.
        (response, False) when the HTTP status code is not 200.
        (status, False) when the body carries a non-empty status whose code
        is not "OK".
    """
    auth_headers = {
        "customer-id": f"{customer_id}",
        "Authorization": f"Bearer {jwt_token}"
    }
    endpoint = f"https://{admin_address}/v1/create-corpus"

    response = requests.post(
        endpoint,
        data=_get_create_corpus_json(),
        verify=True,
        headers=auth_headers)

    http_ok = response.status_code == 200
    if not http_ok:
        logging.error("Create Corpus failed with code %d, reason %s, text %s",
                       response.status_code,
                       response.reason,
                       response.text)
        return response, False

    message = response.json()
    status = message["status"]
    # An empty status, or one whose code is "OK", counts as success.
    if not status or status["code"] == "OK":
        return message, True

    logging.error("Create Corpus failed with status: %s", status)
    return status, False

## Delete Corpus

In [4]:
def _get_delete_corpus_json(customer_id: int, corpus_id: int):
    """Returns a delete corpus JSON."""
    corpus = {
        "customer_id": customer_id,
        "corpus_id": corpus_id,
    }

    return json.dumps(corpus)

def delete_corpus(customer_id: int, corpus_id: int, admin_address: str, jwt_token: str):
    """Deletes a corpus.

    Args:
        customer_id: Unique customer ID in vectara platform.
        corpus_id: Corpus ID in vectara platform.
        admin_address: Address of the admin server. e.g., api.vectara.io
        jwt_token: A valid Auth token.

    Returns:
        (message, True) with the decoded JSON body on success.
        (response, False) when the HTTP status code is not 200.
        (status, False) when the body carries a non-empty status whose code
        is not "OK".
    """
    post_headers = {
        "customer-id": f"{customer_id}",
        "Authorization": f"Bearer {jwt_token}"
    }
    response = requests.post(
        f"https://{admin_address}/v1/delete-corpus",
        data=_get_delete_corpus_json(customer_id, corpus_id),
        verify=True,
        headers=post_headers)

    if response.status_code != 200:
        logging.error("Delete Corpus failed with code %d, reason %s, text %s",
                       response.status_code,
                       response.reason,
                       response.text)
        return response, False

    message = response.json()
    # `message` is a plain dict, so the status must be read by key;
    # attribute access (message.status) raises AttributeError.
    status = message["status"]
    if status and status["code"] != "OK":
        logging.error("Delete Corpus failed with status: %s", status)
        return status, False

    return message, True

## Reset Corpus

In [5]:
def _get_reset_corpus_json(customer_id: int, corpus_id: int):
    """ Returns a reset corpus json. """
    corpus = {
        "customer_id": customer_id,
        "corpus_id": corpus_id,
    }

    return json.dumps(corpus)

def reset_corpus(customer_id: int, corpus_id: int, admin_address: str, jwt_token: str):
    """Reset a corpus.

    Args:
        customer_id: Unique customer ID in vectara platform.
        corpus_id: Corpus ID in vectara platform.
        admin_address: Address of the admin server. e.g., api.vectara.io
        jwt_token: A valid Auth token.

    Returns:
        (message, True) with the decoded JSON body on success.
        (response, False) when the HTTP status code is not 200.
        (status, False) when the body carries a non-empty status whose code
        is not "OK".
    """
    post_headers = {
        "customer-id": f"{customer_id}",
        "Authorization": f"Bearer {jwt_token}"
    }
    response = requests.post(
        f"https://{admin_address}/v1/reset-corpus",
        data=_get_reset_corpus_json(customer_id, corpus_id),
        verify=True,
        headers=post_headers)

    if response.status_code != 200:
        logging.error("Reset Corpus failed with code %d, reason %s, text %s",
                       response.status_code,
                       response.reason,
                       response.text)
        return response, False

    message = response.json()
    # `message` is a plain dict, so the status must be read by key;
    # attribute access (message.status) raises AttributeError.
    status = message["status"]
    if status and status["code"] != "OK":
        logging.error("Reset Corpus failed with status: %s", status)
        return status, False

    return message, True

## File Upload

In [6]:
def get_jwt_token():
    """Request an access token from the authentication service.

    POSTs a client_credentials grant to AUTH_URL, identifying the caller with
    APP_CLIENT_ID and APP_CLIENT_SECRET (all three are module-level settings).

    Returns:
        The "access_token" field of the JSON response when the HTTP status
        code is 200 (None if the field is absent). For any other status the
        response text is printed and None is returned.
    """
    token_endpoint = AUTH_URL
    form_fields = {
        "grant_type": "client_credentials",
        "client_id": APP_CLIENT_ID,
        "client_secret": APP_CLIENT_SECRET
    }
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}

    response = requests.post(token_endpoint, headers=form_headers, data=form_fields)

    if response.status_code != 200:
        print("Error:", response.text)
        return None

    return response.json().get("access_token")

# #get jwt_token
# auth_url = AUTH_URL

# # Your client ID and client secret
# client_id = APP_CLIENT_ID
# client_secret = APP_CLIENT_SECRET

# # Data payload for the POST request
# data = {
#     "grant_type": "client_credentials",
#     "client_id": client_id,
#     "client_secret": client_secret
# }

# # Headers for the POST request
# headers = {
#     "Content-Type": "application/x-www-form-urlencoded"
# }

# # Send POST request to the authentication URL
# response = requests.post(auth_url, headers=headers, data=data)

# # Check if request was successful
# if response.status_code == 200:
#     # Parse response JSON
#     response_data = response.json()
#     # Extract access token
#     jwt_token = response_data.get("access_token")
# else:
#     print("Error:", response.text)



In [7]:
def upload_file(customer_id: int, corpus_id: int, idx_address: str, jwt_token: str, file_path: str):
    """Upload a single file to a corpus via the REST upload endpoint.

    Args:
        customer_id: Unique customer ID in vectara platform.
        corpus_id: ID of the corpus to which data needs to be indexed.
        idx_address: Address of the indexing server. e.g., api.vectara.io
        jwt_token: A valid Auth token.
        file_path: Path to the file to be uploaded.

    Returns:
        (message, True) on success, where message is the "response" object of
        the decoded JSON body.
        (response, False) when the HTTP status code is not 200.
        (status, False) when the status code is neither "OK" nor
        "ALREADY_EXISTS".
    """
    upload_url = f"https://{idx_address}/v1/upload?c={customer_id}&o={corpus_id}"
    auth_headers = {"Authorization": f"Bearer {jwt_token}"}

    # The request is sent while the file is still open so its content can be read.
    with open(file_path, 'rb') as handle:
        multipart = {"file": (handle.name, handle, "application/octet-stream")}
        response = requests.post(
            upload_url,
            files=multipart,
            verify=True,
            headers=auth_headers)

    if response.status_code != 200:
        logging.error("REST upload failed with code %d, reason %s, text %s",
                       response.status_code,
                       response.reason,
                       response.text)
        return response, False

    message = response.json()["response"]
    status = message["status"]
    accepted_codes = ("OK", "ALREADY_EXISTS")
    # An empty status indicates success.
    if not status or status["code"] in accepted_codes:
        return message, True

    logging.error("REST upload failed with status: %s", status)
    return status, False


def upload_files_in_directory(customer_id: int, corpus_id: int, idx_address: str, directory_path: str):
    """Upload every regular file directly inside a directory to the corpus.

    A single JWT token is fetched up front and reused for all uploads.
    Entries that are not regular files (e.g. sub-directories) are skipped.

    Args:
        customer_id: Unique customer ID in Vectara platform.
        corpus_id: ID of the corpus to which data needs to be indexed.
        idx_address: Address of the indexing server. e.g., api.vectara.io
        directory_path: Path to the directory containing files to be uploaded.

    Returns:
        A list of (response, success) tuples, one per uploaded file, or an
        empty list when no token could be obtained.
    """
    token = get_jwt_token()
    if not token:
        return []

    candidate_paths = (
        os.path.join(directory_path, entry) for entry in os.listdir(directory_path)
    )

    results = []
    for path in candidate_paths:
        if not os.path.isfile(path):
            continue
        outcome, succeeded = upload_file(customer_id, corpus_id, idx_address, token, path)
        results.append((outcome, succeeded))
    return results


In [8]:
# upload_file(customer_id=CUSTOMER_ID, corpus_id=CORPUS_ID, idx_address='api.vectara.io', jwt_token=get_jwt_token(), file_path='data/paul_graham.txt')

In [9]:
# upload_files_in_directory(customer_id=CUSTOMER_ID, corpus_id=CORPUS_ID, idx_address='api.vectara.io', directory_path="data")

## Query Corpus

In [10]:
def _get_query_json(customer_id: int, corpus_id: int, query_value: str):
    """Returns a query JSON."""
    query = {
        "query": [
            {
                "query": query_value,
                "num_results": 3,
                "corpus_key": [{"customer_id": customer_id, "corpus_id": corpus_id}],
            },
        ],
    }
    return json.dumps(query)


def query_corpus(customer_id: int, corpus_id: int, query_address: str, jwt_token: str, query: str):
    """Run a text query against a corpus via the REST query endpoint.

    Args:
        customer_id: Unique customer ID in vectara platform.
        corpus_id: ID of the corpus to query.
        query_address: Address of the querying server. e.g., api.vectara.io
        jwt_token: A valid Auth token.
        query: The query text.

    Returns:
        (message, True) with the decoded JSON body on success.
        (response, False) when the HTTP status code is not 200.
        (statuses, False) when any top-level status has a code other than "OK".
        (status, False) for the first non-"OK" status found inside a
        response set.
    """
    auth_headers = {
        "customer-id": f"{customer_id}",
        "Authorization": f"Bearer {jwt_token}"
    }
    endpoint = f"https://{query_address}/v1/query"
    request_body = _get_query_json(customer_id, corpus_id, query)

    response = requests.post(
        endpoint,
        data=request_body,
        verify=True,
        headers=auth_headers)

    if response.status_code != 200:
        logging.error("Query failed with code %d, reason %s, text %s",
                       response.status_code,
                       response.reason,
                       response.text)
        return response, False

    message = response.json()

    # Top-level statuses: every entry must report "OK".
    overall = message["status"]
    if overall and not all(entry["code"] == "OK" for entry in overall):
        logging.error("Query failed with status: %s", overall)
        return overall, False

    # Per-response-set statuses: report the first failure, if any.
    nothing_wrong = object()
    first_failure = next(
        (entry
         for response_set in message["responseSet"]
         for entry in response_set["status"]
         if entry["code"] != "OK"),
        nothing_wrong)
    if first_failure is not nothing_wrong:
        return first_failure, False

    return message, True

In [11]:
# Ask a sample question; a fresh JWT token is fetched for this one call.
query_txt = "what did the author do growing up?"
query_result = query_corpus(
    customer_id=CUSTOMER_ID,
    corpus_id=CORPUS_ID,
    query_address=IDX_ADDRESS,
    jwt_token=get_jwt_token(),
    query=query_txt,
)

In [12]:
# Display the (result, success) pair returned by query_corpus.
query_result

({'responseSet': [{'response': [{'text': '[8]\r\n\r\nThere were three main parts to the software: the editor, which people used to build sites and which I wrote, the shopping cart, which Robert wrote, and the manager, which kept track of orders and statistics, and which Trevor wrote.',
      'score': 0.6061516,
      'metadata': [{'name': 'lang', 'value': 'eng'},
       {'name': 'section', 'value': '1'},
       {'name': 'offset', 'value': '31230'},
       {'name': 'len', 'value': '238'}],
      'documentIndex': 0,
      'corpusKey': {'customerId': 0,
       'corpusId': 4,
       'semantics': 'DEFAULT',
       'dim': [],
       'metadataFilter': '',
       'lexicalInterpolationConfig': None},
      'resultOffset': 0,
      'resultLength': 238},
     {'text': '[6]\r\n\r\nThere were plenty of earnest students too: kids who "could draw" in high school, and now had come to what was supposed to be the best art school in the country, to learn to draw even better.',
      'score': 0.6046405,
 

In [13]:
# Names of the summarizer prompts, e.g. for send_query's
# summarizer_prompt_name argument.
summarization_models = [
    "vectara-summary-ext-v1.3.0",
    "vectara-summary-abstr-v1.3.0",
]

def send_query(customer_id, api_key, corpus_id, query_text, num_results, summarizer_prompt_name, response_lang, max_summarized_results):
    """Send a query with a summary request to the Vectara query endpoint, authenticating with an API key.

    Args:
        customer_id: Customer ID; sent both as the "customer-id" header and
            inside the corpus key.
        api_key: API key sent in the "x-api-key" header.
        corpus_id: ID of the corpus to query.
        query_text: The query text.
        num_results: Number of results to request.
        summarizer_prompt_name: Name of the summarizer prompt to use.
        response_lang: Language requested for the summary.
        max_summarized_results: Maximum number of results to summarize.

    Returns:
        A list with the 'text' of every result in the first response set when
        the HTTP status code is 200 (the summary itself is not part of the
        return value). Otherwise the status code and response body are
        printed and None is returned.
    """
    api_key_header = {
        # requests only accepts str/bytes header values, so an integer
        # customer ID has to be converted before it is sent.
        "customer-id": str(customer_id),
        "x-api-key": api_key,
        "Content-Type": "application/json"
    }

    data_dict = {
        "query": [
            {
                "query": query_text,
                "num_results": num_results,
                "corpus_key": [{"customer_id": customer_id, "corpus_id": corpus_id}],
                'summary': [
                    {
                        'summarizerPromptName': summarizer_prompt_name,
                        'responseLang': response_lang,
                        'maxSummarizedResults': max_summarized_results
                    }
                ]
            }
        ]
    }

    payload = json.dumps(data_dict)

    response = requests.post(
        "https://api.vectara.io/v1/query",
        data=payload,
        verify=True,
        headers=api_key_header
    )

    if response.status_code != 200:
        print("Request failed with status code:", response.status_code)
        print("Response:", response.text)
        return None

    print("Request was successful!")
    data = response.json()
    return [
        item['text'] for item in data['responseSet'][0]['response']
        if 'text' in item
    ]