Skip to content

Commit

Permalink
Merge pull request #16 from Danangjoyoo/feature/async-rpc-client
Browse files Browse the repository at this point in the history
[FEATURE] add async client
  • Loading branch information
Danangjoyoo committed Feb 4, 2024
2 parents a14ddd1 + 2ae5abb commit 6b720bc
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 14 deletions.
2 changes: 0 additions & 2 deletions oborpc/builder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""
OBORPC Builder
"""

from ._base import OBORBuilder
from ._client import ClientBuilder
from ._server import ServerBuilder
from ._fastapi import FastAPIServerBuilder
Expand Down
130 changes: 122 additions & 8 deletions oborpc/builder/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,65 @@
import json
import logging
import time
import requests
from ._base import OBORBuilder
from ..exception import RPCCallException
import httpx
from ..security import BASIC_AUTH_TOKEN
from ..exception import OBORPCBuildException, RPCCallException


class ClientBuilder(OBORBuilder):
class ClientBuilder:
"""
Client Builder
"""
__registered_base = set()

def __init__(self, host, port=None, timeout=1, retry=0) -> None:
self.master_instances = []
self.host = host
self.port = port
self.timeout = timeout
self.retry = retry

protocol = "http://"
if self.check_has_protocol(host):
protocol = ""

self.base_url = f"{protocol}{host}"
if port:
self.base_url += f":{port}"

# request client
headers = {
"Authorization": f"Basic {BASIC_AUTH_TOKEN}",
"Content-Type": "application/json"
}
self.request_client = httpx.Client(
base_url=self.base_url,
headers=headers
)
self.async_request_client = httpx.AsyncClient(
base_url=self.base_url,
headers=headers
)

def check_has_protocol(self, host: str):
"""
Check whether the given host already defined with protocol or not
"""
if host.startswith("http://"):
return True
if host.startswith("https://"):
return True
return False

def check_registered_base(self, base: str):
"""
Check whether the base RPC class is already built
"""
if base in ClientBuilder.__registered_base:
msg = f"Failed to build client RPC {base} : base class can only built once"
raise OBORPCBuildException(msg)
ClientBuilder.__registered_base.add(base)

def create_remote_caller(
self,
class_name: str,
Expand All @@ -35,10 +85,9 @@ def remote_call(*args, **kwargs):
"args": args[1:],
"kwargs": kwargs
}
url = f"{self.base_url}{url_prefix}/{class_name}/{method_name}"
response = requests.post(
url,
headers={"Content-Type": "application/json"},
url = f"{url_prefix}/{class_name}/{method_name}"
response = self.request_client.post(
url=url,
json=json.dumps(data),
timeout=timeout if timeout is not None else self.timeout
)
Expand All @@ -65,6 +114,57 @@ def remote_call(*args, **kwargs):

return remote_call

def create_async_remote_caller(
self,
class_name: str,
method_name: str,
url_prefix: str,
timeout: float = None,
retry: int = None
): # pylint: disable=too-many-arguments
"""
create async remote caller
"""
async def async_remote_call(*args, **kwargs):
"""
async remote call wrapper
"""
start_time = time.time()
try:
data = {
"args": args[1:],
"kwargs": kwargs
}
url = f"{url_prefix}/{class_name}/{method_name}"
response = await self.async_request_client.post(
url=url,
json=json.dumps(data),
timeout=timeout if timeout is not None else self.timeout
)

if not response:
msg = f"rpc call failed method={method_name}"
raise RPCCallException(msg)

return response.json().get("data")

except Exception as e:
_retry = retry if retry is not None else self.retry
if _retry:
return await async_remote_call(*args, **kwargs, retry=_retry-1)

if isinstance(e, RPCCallException):
raise e
msg = f"rpc call failed method={method_name} : {e}"
raise RPCCallException(msg) from e

finally:
elapsed = f"{(time.time() - start_time) * 1000}:.2f"
logging.debug("[RPC-Clientt] remote call take %s ms", elapsed)

return async_remote_call


def build_client_rpc(self, instance: object, url_prefix: str = ""):
"""
Setup client rpc
Expand All @@ -78,3 +178,17 @@ def build_client_rpc(self, instance: object, url_prefix: str = ""):
if name not in iterator_class.__oborprocedures__:
continue
setattr(_class, name, self.create_remote_caller(_class.__name__, name, url_prefix))

def build_async_client_rpc(self, instance: object, url_prefix: str = ""):
"""
Setup async client rpc
"""
_class = instance.__class__
iterator_class = _class

self.check_registered_base(_class)

for (name, _) in inspect.getmembers(iterator_class, predicate=inspect.isfunction):
if name not in iterator_class.__oborprocedures__:
continue
setattr(_class, name, self.create_async_remote_caller(_class.__name__, name, url_prefix))
3 changes: 1 addition & 2 deletions oborpc/builder/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
Server Builder Base
"""
import inspect
from ._base import OBORBuilder


class ServerBuilder(OBORBuilder):
class ServerBuilder:
"""
Server Builder
"""
Expand Down
21 changes: 21 additions & 0 deletions oborpc/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
Security Related Utility
"""
import httpx

BASIC_AUTH_TOKEN = "44a3f281214d481b8408e7b2355968d1"


def validate_rpc_response(response: httpx.Response):
"""
Validate rpc response with defined security basic token
"""
if response.status_code != 200:
return False

try:
valid_content_type = response.headers.get("content-type", "").lower() == "application/json"
valid_token = response.headers.get("authorization", "").lower() == f"basic {BASIC_AUTH_TOKEN}"
return valid_content_type and valid_token
except:
return False
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
this_directory = Path(__file__).parent
long_description = (this_directory / "README.md").read_text()

VERSION = "0.1.1"
VERSION = "0.1.2"
DESCRIPTION = "An easy setup object oriented RPC. Built-in setup for FastAPI and Flask"

# Setting up
Expand All @@ -21,7 +21,7 @@
long_description_content_type="text/markdown",
long_description=long_description,
packages=find_packages(),
install_requires=[],
install_requires=["httpx"],
keywords=["fastapi", "flask", "rpc", "OOP"],
classifiers=[
"Development Status :: 3 - Alpha",
Expand Down

0 comments on commit 6b720bc

Please sign in to comment.