In [27]:
import os
import requests
import json
import pandas as pd

from dotenv import load_dotenv

load_dotenv()

# For weaviate does not accept OPENAI_API_KEY
os.environ["OPENAI_APIKEY"] = os.getenv("OPENAI_API_KEY")

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


In [18]:
def get_resource_docs(api_version: str):

    obp_host = os.getenv("OBP_BASE_URL")
    resource_doc_path = f"/obp/v5.1.0/resource-docs/{api_version}/obp"
    resource_doc_url = obp_host + resource_doc_path

    response = requests.get(resource_doc_url)

    return response.json()

In [54]:
resource_docs = get_resource_docs('v5.1.0')
with open('resource_docs.json', 'w') as f:
    json.dump(resource_docs, f, indent=3)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): test.openbankproject.com:443
DEBUG:urllib3.connectionpool:https://test.openbankproject.com:443 "GET /obp/v5.1.0/resource-docs/v5.1.0/obp HTTP/1.1" 200 3093439


In [56]:
resource_docs['resource_docs'][0]

{'operation_id': 'OBPv4.0.0-getDynamicResourceDoc',
 'implemented_by': {'version': 'OBPv4.0.0',
  'function': 'getDynamicResourceDoc'},
 'request_verb': 'GET',
 'request_url': '/obp/v4.0.0/management/dynamic-resource-docs/DYNAMIC-RESOURCE-DOC-ID',
 'summary': 'Get Dynamic Resource Doc by Id',
 'description': '<p>Get a Dynamic Resource Doc by DYNAMIC-RESOURCE-DOC-ID.</p>\n<p>User Authentication is Required. The User must be logged in. The Application must also be authenticated.</p>\n<p><strong>JSON response body fields:</strong></p>\n<p><a href="/glossary#description"><strong>description</strong></a>: Description of the object. Maximum length is 2000. It can be any characters here.</p>\n<p><a href="/glossary#name"><strong>name</strong></a>: ACCOUNT_MANAGEMENT_FEE</p>\n<p><a href="/glossary#roles"><strong>roles</strong></a>: CanCreateMyUser</p>\n<p><a href="/glossary#summary"><strong>summary</strong></a>:</p>\n<p><a href="/glossary#tags"><strong>tags</strong></a>: Create-My-User</p>\n',


In [20]:
resource_docs_df = pd.DataFrame(resource_docs['resource_docs'])
print(resource_docs_df.info())
for column in resource_docs_df.columns:
    print(f"{column} {type(resource_docs_df.iloc[100][column])}")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 546 entries, 0 to 545
Data columns (total 18 columns):
 #   Column                       Non-Null Count  Dtype 
---  ------                       --------------  ----- 
 0   operation_id                 546 non-null    object
 1   implemented_by               546 non-null    object
 2   request_verb                 546 non-null    object
 3   request_url                  546 non-null    object
 4   summary                      546 non-null    object
 5   description                  546 non-null    object
 6   description_markdown         546 non-null    object
 7   success_response_body        481 non-null    object
 8   error_response_bodies        546 non-null    object
 9   tags                         546 non-null    object
 10  typed_success_response_body  472 non-null    object
 11  roles                        306 non-null    object
 12  is_featured                  546 non-null    bool  
 13  special_instructions         546 no

In [36]:
from pydantic import BaseModel, Field, Json
from typing import List, Dict, Any, Optional

class ResourceDoc(BaseModel):
    operation_id: str = Field(
        description="Unique identifier for the API operation. Can be used to create UUIDs and hashes."
    )
    implemented_by: Dict[str, str] = Field(
        description="Dictionary describing which API version and function the endpoint is implemented by"
    )
    request_verb: str = Field(
        description="HTTP verb (e.g., GET, POST) for this request."
    )
    request_url: str = Field(
        description="URL endpoint for the request i.e. the path on the API",
        examples=['/obp/v5.1.0/banks']
    )
    summary: str = Field(
        description="Short summary of the API operation."
    )
    description: str = Field(
        description="Detailed description of the API operation."
    )
    description_markdown: str = Field(
        description="Markdown-formatted description."
    )
    success_response_body: Optional[Json[Any]] = Field(
        description="Dictionary describing the success response body, if any.",
        default={},
    )
    error_response_bodies: List[Any] = Field(
        description="List of potential error response bodies. These are OBP error response codes.",
        examples=[
            [
                'OBP-20001: User not logged in. Authentication is required!',
                'OBP-50000: Unknown Error.'
            ]
        ]
    )
    tags: List[str] = Field(
        description="List of API tags related to the operation. Used for categorization and filtering."
    )
    typed_success_response_body: Optional[Json[Any]] = Field(
        description="Typed structure for the success response body.",
        default={},
    )
    is_featured: bool = Field(
        description="Indicates whether this operation is featured."
    )
    special_instructions: str = Field(
        description="Any special instructions for using the operation."
    )
    specified_url: str = Field(
        description="Explicitly specified URL if different from request_url."
    )
    connector_methods: List[Any] = Field(
        description="List of connector methods involved."
    )
    example_request_body: Optional[Json[Any]] = Field(
        description="Example of the request body.",
        default={},
    )
    typed_request_body: Optional[Json[Any]] = Field(
        description="Typed schema for the request body.",
        default={},
    )
    roles: Optional[List[Any]] = Field(
        description="List of entitlements (or roles) needed to successfully complete this operation",
        default=[],
    )

In [34]:
# Test connection
import weaviate
from weaviate.classes.config import Configure
import weaviate.classes as wvc
from prettytable import PrettyTable

client = weaviate.connect_to_local(
    port=8085,
    headers = {
        "X-Openai-Api-Key": os.getenv("OPENAI_API_KEY")
    }  
)

try:
    if client.is_ready():
        print(f"Client Ready\n")
    else:
        print(f"Client not ready\n")
    print("Collections:\n")

    # Create a table showing the schema
    
    for collection_name, obj in client.collections.list_all(simple=True).items():
        schema_table = PrettyTable()
        table_rows = [[prop.name, prop.data_type] for prop in obj.properties] 

        schema_table.field_names = ['property', 'data type']
        schema_table.add_rows(table_rows)

        collection = client.collections.get('TestResourceDocs')

        response = collection.aggregate.over_all(
            total_count=True,
            return_metrics=wvc.query.Metrics("special_instructions").integer(
                count=True,
            ),
                
        )
        
        print(f"Collection: {collection_name}")
        print(schema_table)
        print(response)
    
finally:
    client.close()


def connect_to_weaviate_collection(client, collection_name: str):
    """
    Connects to a weaviate vector database of name 'collection_name'
    If the database does not exist, it is created.

    port can be specified, 
    """
    
    if not client.collections.exists(collection_name):
        collection = client.collections.create(
            collection_name,
            vectorizer_config=Configure.Vectorizer.text2vec_openai(),
        )
        
    else: 
        collection = client.collections.get(collection_name)

    return collection

DEBUG:httpx:load_ssl_context verify=True cert=None trust_env=False http2=False
DEBUG:httpx:load_verify_locations cafile='/home/nemo/anaconda3/envs/opey/lib/python3.13/site-packages/certifi/cacert.pem'
DEBUG:httpcore.connection:connect_tcp.started host='localhost' port=8085 local_address=None timeout=5.0 socket_options=None
DEBUG:httpcore.connection:connect_tcp.complete return_value=<httpcore._backends.anyio.AnyIOStream object at 0x782b0f062c60>
DEBUG:httpcore.http11:send_request_headers.started request=<Request [b'GET']>
DEBUG:httpcore.http11:send_request_headers.complete
DEBUG:httpcore.http11:send_request_body.started request=<Request [b'GET']>
DEBUG:httpcore.http11:send_request_body.complete
DEBUG:httpcore.http11:receive_response_headers.started request=<Request [b'GET']>
DEBUG:httpcore.http11:receive_response_headers.complete return_value=(b'HTTP/1.1', 404, b'Not Found', [(b'Access-Control-Allow-Headers', b'Content-Type, Authorization, Batch, X-Openai-Api-Key, X-Openai-Organization,

Client Ready

Collections:

Collection: TestResourceDocs
+-----------------------------+-----------------------+
|           property          |       data type       |
+-----------------------------+-----------------------+
|         operation_id        |     DataType.TEXT     |
|      connector_methods      |  DataType.TEXT_ARRAY  |
|        implemented_by       |    DataType.OBJECT    |
| typed_success_response_body |    DataType.OBJECT    |
|            roles            | DataType.OBJECT_ARRAY |
|     special_instructions    |     DataType.TEXT     |
|     description_markdown    |     DataType.TEXT     |
|         is_featured         |     DataType.BOOL     |
|    success_response_body    |    DataType.OBJECT    |
|         request_verb        |     DataType.TEXT     |
|             tags            |  DataType.TEXT_ARRAY  |
|     example_request_body    |    DataType.OBJECT    |
|           summary           |     DataType.TEXT     |
|         description         |     DataType.TE

In [59]:
import logging

logger = logging.basicConfig(level=logging.ERROR)

from pydantic import ValidationError
from weaviate.util import generate_uuid5

from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_weaviate.vectorstores import WeaviateVectorStore

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
)

print(resource_docs['resource_docs'][0])
langchain_docs = []

client.connect()

try:
    
    errors = []
    for resource_doc in resource_docs['resource_docs']:

        try:
            # Change dicts in python to strings for JSON validation by pydantic
            if resource_doc.get('example_request_body'):
                resource_doc['example_request_body'] = json.dumps(resource_doc['example_request_body'])
                
            if resource_doc.get('success_response_body'):
                resource_doc['success_response_body'] = json.dumps(resource_doc['success_response_body'])
            
            if resource_doc.get('typed_request_body'):
                resource_doc['typed_request_body'] = json.dumps(resource_doc['typed_request_body'])

            if resource_doc.get('typed_success_response_body'):
                resource_doc['typed_success_response_body'] = json.dumps(resource_doc['typed_success_response_body'])

            
            # Validate and ingest into pydantic model
            resource_doc_pydantic = ResourceDoc.model_validate(resource_doc)
            if resource_doc == resource_docs['resource_docs'][0]:
                print(resource_doc.get('success_response_body'))
            
        except ValidationError as e:
            print(f'\nerror validating: {e} saving error...')
            validation_error = {
                'operationId': resource_doc['operation_id'],
                'errors': e.errors()
            }
    
            errors.append(validation_error)
            #print(json.dumps(doc, indent=4))
            continue

        # Generate a unique UUID for the ResourceDoc based on operationId
        resource_doc_uuid = generate_uuid5(resource_doc_pydantic.operation_id)
        #if store.get_by_ids(resource_doc_uuid):
        #    print(f"UPDATE    {resource_doc_pydantic.request_url}")
        #else:
        #    print(f"ADD     {resource_doc_pydantic.request_url}")

        # Construct langchain document from ResourceDoc pydantic object and add to docs
        
        langchain_docs.append(
            Document(
                id=resource_doc_uuid,
                page_content=resource_doc_pydantic.model_dump_json(indent=2),
                metadata={
                    "tags": resource_doc_pydantic.tags,
                    "operation_id": resource_doc_pydantic.operation_id,
                    "implemented_by": resource_doc_pydantic.implemented_by,
                }
                
            )
        )

    print(len(langchain_docs))
    print(langchain_docs[0])
    
    store = WeaviateVectorStore.from_documents(langchain_docs, embeddings, client=client)
finally:
    client.close()

DEBUG:httpx:load_ssl_context verify=True cert=None trust_env=True http2=False
DEBUG:httpx:load_verify_locations cafile='/home/nemo/anaconda3/envs/opey/lib/python3.13/site-packages/certifi/cacert.pem'
DEBUG:httpx:load_ssl_context verify=True cert=None trust_env=True http2=False
DEBUG:httpx:load_verify_locations cafile='/home/nemo/anaconda3/envs/opey/lib/python3.13/site-packages/certifi/cacert.pem'
DEBUG:httpx:load_ssl_context verify=True cert=None trust_env=False http2=False
DEBUG:httpx:load_verify_locations cafile='/home/nemo/anaconda3/envs/opey/lib/python3.13/site-packages/certifi/cacert.pem'
DEBUG:httpcore.connection:connect_tcp.started host='localhost' port=8085 local_address=None timeout=5.0 socket_options=None
DEBUG:httpcore.connection:connect_tcp.complete return_value=<httpcore._backends.anyio.AnyIOStream object at 0x782af2cd2df0>
DEBUG:httpcore.http11:send_request_headers.started request=<Request [b'GET']>
DEBUG:httpcore.http11:send_request_headers.complete
DEBUG:httpcore.http11

{'operation_id': 'OBPv4.0.0-getDynamicResourceDoc', 'implemented_by': {'version': 'OBPv4.0.0', 'function': 'getDynamicResourceDoc'}, 'request_verb': 'GET', 'request_url': '/obp/v4.0.0/management/dynamic-resource-docs/DYNAMIC-RESOURCE-DOC-ID', 'summary': 'Get Dynamic Resource Doc by Id', 'description': '<p>Get a Dynamic Resource Doc by DYNAMIC-RESOURCE-DOC-ID.</p>\n<p>User Authentication is Required. The User must be logged in. The Application must also be authenticated.</p>\n<p><strong>JSON response body fields:</strong></p>\n<p><a href="/glossary#description"><strong>description</strong></a>: Description of the object. Maximum length is 2000. It can be any characters here.</p>\n<p><a href="/glossary#name"><strong>name</strong></a>: ACCOUNT_MANAGEMENT_FEE</p>\n<p><a href="/glossary#roles"><strong>roles</strong></a>: CanCreateMyUser</p>\n<p><a href="/glossary#summary"><strong>summary</strong></a>:</p>\n<p><a href="/glossary#tags"><strong>tags</strong></a>: Create-My-User</p>\n', 'descri

DEBUG:httpcore.http11:receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Access-Control-Allow-Headers', b'Content-Type, Authorization, Batch, X-Openai-Api-Key, X-Openai-Organization, X-Openai-Baseurl, X-Anyscale-Baseurl, X-Anyscale-Api-Key, X-Cohere-Api-Key, X-Cohere-Baseurl, X-Huggingface-Api-Key, X-Azure-Api-Key, X-Azure-Deployment-Id, X-Azure-Resource-Name, X-Azure-Concurrency, X-Azure-Block-Size, X-Google-Api-Key, X-Google-Vertex-Api-Key, X-Google-Studio-Api-Key, X-Goog-Api-Key, X-Goog-Vertex-Api-Key, X-Goog-Studio-Api-Key, X-Palm-Api-Key, X-Jinaai-Api-Key, X-Aws-Access-Key, X-Aws-Secret-Key, X-Voyageai-Baseurl, X-Voyageai-Api-Key, X-Mistral-Baseurl, X-Mistral-Api-Key, X-Anthropic-Baseurl, X-Anthropic-Api-Key, X-Databricks-Endpoint, X-Databricks-Token, X-Databricks-User-Agent, X-Friendli-Token, X-Friendli-Baseurl, X-Weaviate-Api-Key, X-Weaviate-Cluster-Url'), (b'Access-Control-Allow-Methods', b'*'), (b'Access-Control-Allow-Origin', b'*'), (b'Content-Type',

RateLimitError: Error code: 429 - {'error': {'message': 'Request too large for text-embedding-3-large in organization org-Lzj8ofx7HDzlLY9kxOyE2zhB on tokens per min (TPM): Limit 1000000, Requested 1089996. The input or output tokens must be reduced in order to run successfully. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}

In [24]:
# this would probably crash if it gets too long

print(failed_objs_b, failed_refs_b)
error_log = ""
for e in errors:
    err_string = f"[ValidationError]: {e['operationId']}\n\n" + '\n'.join(json.dumps(n, indent=3) for n in e['errors']) + "\n"+  "---"*40 + "\n\n\n"
    error_log += err_string

#with open("resource_docs_validation_errors.txt", "w") as error_file:
    #error_file.write(error_log)

print(error_log)

[] []
[ValidationError]: OBPv3.0.0-dataWarehouseSearch

{
   "type": "json_type",
   "loc": [
      "success_response_body"
   ],
   "msg": "JSON input should be string, bytes or bytearray",
   "input": {},
   "url": "https://errors.pydantic.dev/2.10/v/json_type"
}
------------------------------------------------------------------------------------------------------------------------


[ValidationError]: OBPv2.0.0-elasticSearchMetrics

{
   "type": "json_type",
   "loc": [
      "success_response_body"
   ],
   "msg": "JSON input should be string, bytes or bytearray",
   "input": {},
   "url": "https://errors.pydantic.dev/2.10/v/json_type"
}
------------------------------------------------------------------------------------------------------------------------


[ValidationError]: OBPv3.0.0-dataWarehouseStatistics

{
   "type": "json_type",
   "loc": [
      "success_response_body"
   ],
   "msg": "JSON input should be string, bytes or bytearray",
   "input": {},
   "url": "https://err

In [33]:
client.connect()

try:
    collection = connect_to_weaviate_collection(client, 'TestResourceDocs')
    response = collection.query.fetch_objects(limit=250)
    print(len(response.objects))

except Exception as e:
    print('exception:', e)

finally:
    client.close()

DEBUG:httpx:load_ssl_context verify=True cert=None trust_env=False http2=False
DEBUG:httpx:load_verify_locations cafile='/home/nemo/anaconda3/envs/opey/lib/python3.13/site-packages/certifi/cacert.pem'
DEBUG:httpcore.connection:connect_tcp.started host='localhost' port=8085 local_address=None timeout=5.0 socket_options=None
DEBUG:httpcore.connection:connect_tcp.complete return_value=<httpcore._backends.anyio.AnyIOStream object at 0x782b0fbf6e40>
DEBUG:httpcore.http11:send_request_headers.started request=<Request [b'GET']>
DEBUG:httpcore.http11:send_request_headers.complete
DEBUG:httpcore.http11:send_request_body.started request=<Request [b'GET']>
DEBUG:httpcore.http11:send_request_body.complete
DEBUG:httpcore.http11:receive_response_headers.started request=<Request [b'GET']>
DEBUG:httpcore.http11:receive_response_headers.complete return_value=(b'HTTP/1.1', 404, b'Not Found', [(b'Access-Control-Allow-Headers', b'Content-Type, Authorization, Batch, X-Openai-Api-Key, X-Openai-Organization,

65


In [35]:
from langchain_weaviate.vectorstores import WeaviateVectorStore

store = WeaviateVectorStore.

  return cast(Any, type_)._evaluate(globalns, localns, recursive_guard=set())
  return cast(Any, type_)._evaluate(globalns, localns, recursive_guard=set())
