From e7fb6a0f26c18bf1c6b07688cd89fcbbe0daff84 Mon Sep 17 00:00:00 2001 From: danjoyboy Date: Sun, 4 Feb 2024 15:52:43 +0700 Subject: [PATCH 1/3] feat: add async client --- oborpc/builder/__init__.py | 2 - oborpc/builder/_client.py | 130 ++++++++++++++++++++++++++++++++++--- oborpc/builder/_fastapi.py | 2 +- oborpc/builder/_server.py | 3 +- oborpc/security.py | 21 ++++++ setup.py | 2 +- 6 files changed, 146 insertions(+), 14 deletions(-) create mode 100644 oborpc/security.py diff --git a/oborpc/builder/__init__.py b/oborpc/builder/__init__.py index c7831e1..9bb8d46 100644 --- a/oborpc/builder/__init__.py +++ b/oborpc/builder/__init__.py @@ -1,8 +1,6 @@ """ OBORPC Builder """ - -from ._base import OBORBuilder from ._client import ClientBuilder from ._server import ServerBuilder from ._fastapi import FastAPIServerBuilder diff --git a/oborpc/builder/_client.py b/oborpc/builder/_client.py index 58b625d..73bcec8 100644 --- a/oborpc/builder/_client.py +++ b/oborpc/builder/_client.py @@ -5,15 +5,65 @@ import json import logging import time -import requests -from ._base import OBORBuilder -from ..exception import RPCCallException +import httpx +from ..security import BASIC_AUTH_TOKEN +from ..exception import OBORPCBuildException, RPCCallException -class ClientBuilder(OBORBuilder): +class ClientBuilder: """ Client Builder """ + __registered_base = set() + + def __init__(self, host, port=None, timeout=1, retry=0) -> None: + self.master_instances = [] + self.host = host + self.port = port + self.timeout = timeout + self.retry = retry + + protocol = "http://" + if self.check_has_protocol(host): + protocol = "" + + self.base_url = f"{protocol}{host}" + if port: + self.base_url += f":{port}" + + # request client + headers = { + "Authorization": f"Basic {BASIC_AUTH_TOKEN}", + "Content-Type": "application/json" + } + self.request_client = httpx.Client( + base_url=self.base_url, + headers=headers + ) + self.async_request_client = httpx.AsyncClient( + base_url=self.base_url, + headers=headers + ) + + def check_has_protocol(self, host: str): + """ + Check whether the given host already defined with protocol or not + """ + if host.startswith("http://"): + return True + if host.startswith("https://"): + return True + return False + + def check_registered_base(self, base: str): + """ + Check whether the base RPC class is already built + """ + if base in ClientBuilder.__registered_base: + msg = f"Failed to build client RPC {base} : base class can only built once" + raise OBORPCBuildException(msg) + ClientBuilder.__registered_base.add(base) + def create_remote_caller( self, class_name: str, @@ -35,10 +85,9 @@ def remote_call(*args, **kwargs): "args": args[1:], "kwargs": kwargs } - url = f"{self.base_url}{url_prefix}/{class_name}/{method_name}" - response = requests.post( - url, - headers={"Content-Type": "application/json"}, + url = f"{url_prefix}/{class_name}/{method_name}" + response = self.request_client.post( + url=url, json=json.dumps(data), timeout=timeout if timeout is not None else self.timeout ) @@ -65,6 +114,57 @@ def remote_call(*args, **kwargs): return remote_call + def create_async_remote_caller( + self, + class_name: str, + method_name: str, + url_prefix: str, + timeout: float = None, + retry: int = None + ): # pylint: disable=too-many-arguments + """ + create async remote caller + """ + async def async_remote_call(*args, **kwargs): + """ + async remote call wrapper + """ + start_time = time.time() + try: + data = { + "args": args[1:], + "kwargs": kwargs + } + url = f"{url_prefix}/{class_name}/{method_name}" + response = await self.async_request_client.post( + url=url, + json=json.dumps(data), + timeout=timeout if timeout is not None else self.timeout + ) + + if not response: + msg = f"rpc call failed method={method_name}" + raise RPCCallException(msg) + + return response.json().get("data") + + except Exception as e: + _retry = retry if retry is not None else self.retry + if _retry: + return await async_remote_call(*args, **kwargs, retry=_retry-1) + + if isinstance(e, RPCCallException): + raise e + msg = f"rpc call failed method={method_name} : {e}" + raise RPCCallException(msg) from e + + finally: + elapsed = f"{(time.time() - start_time) * 1000}:.2f" + logging.debug("[RPC-Clientt] remote call take %s ms", elapsed) + + return async_remote_call + + def build_client_rpc(self, instance: object, url_prefix: str = ""): """ Setup client rpc @@ -78,3 +178,17 @@ def build_client_rpc(self, instance: object, url_prefix: str = ""): if name not in iterator_class.__oborprocedures__: continue setattr(_class, name, self.create_remote_caller(_class.__name__, name, url_prefix)) + + def build_async_client_rpc(self, instance: object, url_prefix: str = ""): + """ + Setup async client rpc + """ + _class = instance.__class__ + iterator_class = _class + + self.check_registered_base(_class) + + for (name, _) in inspect.getmembers(iterator_class, predicate=inspect.isfunction): + if name not in iterator_class.__oborprocedures__: + continue + setattr(_class, name, self.create_async_remote_caller(_class.__name__, name, url_prefix)) diff --git a/oborpc/builder/_fastapi.py b/oborpc/builder/_fastapi.py index b8fe159..5be86dc 100644 --- a/oborpc/builder/_fastapi.py +++ b/oborpc/builder/_fastapi.py @@ -42,7 +42,7 @@ def create_remote_responder_async( async def final_func(request: Request): request_body = await request.body() body = json.loads(json.loads(request_body.decode())) - return await self.dispatch_rpc_request(instance, method, body) + return await self.dispatch_rpc_request_async(instance, method, body) def build_router_from_instance( self, diff --git a/oborpc/builder/_server.py b/oborpc/builder/_server.py index ea8f164..647c33f 100644 --- a/oborpc/builder/_server.py +++ b/oborpc/builder/_server.py @@ -2,10 +2,9 @@ Server Builder Base """ import inspect -from ._base import OBORBuilder -class ServerBuilder(OBORBuilder): +class ServerBuilder: """ Server Builder """ diff --git a/oborpc/security.py b/oborpc/security.py new file mode 100644 index 0000000..bef1f87 --- /dev/null +++ b/oborpc/security.py @@ -0,0 +1,21 @@ +""" +Security Related Utility +""" +import httpx + +BASIC_AUTH_TOKEN = "44a3f281214d481b8408e7b2355968d1" + + +def validate_rpc_response(response: httpx.Response): + """ + Validate rpc response with defined security basic token + """ + if response.status_code != 200: + return False + + try: + valid_content_type = response.headers.get("content-type", "").lower() == "application/json" + valid_token = response.headers.get("authorization", "").lower() == f"basic {BASIC_AUTH_TOKEN}" + return valid_content_type and valid_token + except: + return False diff --git a/setup.py b/setup.py index 6f72a05..cca4e9a 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ long_description_content_type="text/markdown", long_description=long_description, packages=find_packages(), - install_requires=[], + install_requires=["httpx"], keywords=["fastapi", "flask", "rpc", "OOP"], classifiers=[ "Development Status :: 3 - Alpha", From dfb87aae96e798462a73e0e97505f86ccccfd0a9 Mon Sep 17 00:00:00 2001 From: danjoyboy Date: Sun, 4 Feb 2024 15:56:21 +0700 Subject: [PATCH 2/3] build: v0.1.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index cca4e9a..f3d1c96 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() -VERSION = "0.0.7" +VERSION = "0.1.0" DESCRIPTION = "An easy setup object oriented RPC. Built-in setup for FastAPI and Flask" # Setting up From 984b36c517e40568803b94524f01445f5c4f23ec Mon Sep 17 00:00:00 2001 From: danjoyboy Date: Sun, 4 Feb 2024 15:59:31 +0700 Subject: [PATCH 3/3] build: v0.1.2 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f3d1c96..f6420c2 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() -VERSION = "0.1.0" +VERSION = "0.1.2" DESCRIPTION = "An easy setup object oriented RPC. Built-in setup for FastAPI and Flask" # Setting up