In [2]:
from pprint import pprint
import requests

class OpenAPIBase:
    def __init__(self, host: str = "127.0.0.1", port: str = "8000", path: str = "/api/openapi.json", url: str = ''):
        self.host: str = host
        self.port: str = port
        self.path= path
        self.url= url if url else f"http://{host}:{port}{path}"
        
        self.openapi_json: dict[str, dict[str, dict]] = self.get_openapi_json()
        if not self.openapi_json:
            raise ValueError("Failed to retrieve OpenAPI JSON data. Please check the URL or server status.")
        
        self.components: dict[str, dict] = self.get_components()
        self.schemas: dict[str, dict] = self.get_schemas()
    
    def get_openapi_json(self):
        try:
            response = requests.get(self.url)
            if response.status_code == 200:
                res = response.json()
                return res
            else:
                print("Failed to retrieve data. Status code:", response.status_code)
        except requests.RequestException as e:
            print("Error during request:\n", e)
        return {}
    
    def get_components(self) -> dict[str, dict]:
        if 'components' in self.openapi_json:
            return self.openapi_json['components']
        return {}

    def get_schemas(self) -> dict[str, dict]:
        if 'schemas' in self.components:
            return  self.components['schemas']
        return {}

In [3]:
temp_schemas = """
type %s = {
%s}
"""

class OpenApiType(OpenAPIBase):
    def __init__(self, host: str = "127.0.0.1", port:str = "8000", path:str = "/api/openapi.json", url: str = None):
        super().__init__(host, port, path, url)
        self.types= self.get_types()
            
    def get_types(self):
        result : dict[str, str] = {}
        if self.schemas:
            schemas_items = self.schemas.items()
            for schemas_name, schemas_item in schemas_items:
                if (schemas_name == 'UserList'):
                    print(schemas_item)
                    break
                type_text = ""
                properties = schemas_item['properties']
                for item_name, item_type in properties.items():
                    required = '' if item_name in schemas_item['required'] else '?'
                    if item_type.get("anyOf"):
                        anyOf = " | ".join([anyType["type"] for anyType in item_type.get("anyOf")])
                        type_text += f"  {item_name}{required}: {anyOf}\n"
                    else:
                        item_type = item_type.get("type")
                        type_text += f"  {item_name}{required}: {item_type}\n"
                type_str = temp_schemas % (schemas_name, type_text)
                type_str = type_str.replace("array", "[]").replace("integer", "number")
                
                result[schemas_name] = type_str
            return result

openapi_type = OpenApiType()
openapi_type.types

{'properties': {'data': {'items': {'$ref': '#/components/schemas/UserOut'}, 'title': 'Data', 'type': 'array'}}, 'required': ['data'], 'title': 'UserList', 'type': 'object'}


{'TokenObtainPairOutputSchema': '\ntype TokenObtainPairOutputSchema = {\n  username: string\n  refresh: string\n  access: string\n}\n',
 'TokenObtainPairInputSchema': '\ntype TokenObtainPairInputSchema = {\n  password: string\n  username: string\n}\n',
 'TokenRefreshOutputSchema': '\ntype TokenRefreshOutputSchema = {\n  refresh: string\n  access: string | null\n}\n',
 'TokenRefreshInputSchema': '\ntype TokenRefreshInputSchema = {\n  refresh: string\n}\n',
 'Schema': '\ntype Schema = {\n}\n',
 'TokenVerifyInputSchema': '\ntype TokenVerifyInputSchema = {\n  token: string\n}\n',
 'GroupOut': '\ntype GroupOut = {\n  id: number\n  name: string\n}\n'}

In [4]:
temp_api = """
export async function %s {
  const url = %s;
  const options = {
    method: "%s",
    headers: getHeaders(),
    %s
  };
  return await %s(url, options);
}
"""

class OpenApiFetchApi(OpenAPIBase):
    def __init__(self, host: str = "127.0.0.1", port:str = "8000", path:str = "/api/openapi.json", url: str = None):
        super().__init__(host, port, path, url)
        self.apis= self.get_api()
    
    def get_api(self):
        apis_dict = {}
        if self.openapi_json and 'paths' in self.openapi_json:
            for api_path, api_path_item in self.openapi_json['paths'].items():
                function_name = ''
                for method, api_operation in api_path_item.items():
                    function_name = api_operation['summary'].replace(' ', '')
                    responses = api_operation["responses"]
                    
                    parameters = api_operation["parameters"] if 'parameters' in api_operation else []
                    api_parameters = ",".join([f'{parameter["name"]} : {parameter["schema"]["type"]}' for parameter in parameters]).replace("array", "[]").replace("integer", "number")
                    api_parameters = (" ," if len(api_parameters) > 0 else "") + api_parameters

                    if api_operation.get("requestBody"):
                        api_schemas = api_operation["requestBody"]["content"]
                        if api_schemas.get("application/json"):
                            api_schemas = api_schemas["application/json"]["schema"]["$ref"].split("/")[-1]
                            api_body = "\n\t\tbody: JSON.stringify(data)" if len(api_schemas) != 0 else ""
                        elif api_schemas.get("multipart/form-data"):
                            api_schemas = api_schemas["multipart/form-data"]["schema"]["properties"]
                            api_schemas = "FormData"
                            api_body = "\n\t\tbody: data" if len(api_schemas) != 0 else ""

                        function_name += f"(data: {api_schemas} {api_parameters})"
                    else:
                        function_name += f"({api_parameters})"
                    
                    if responses["200"].get("content"):
                        schema = responses["200"]["content"]["application/json"]["schema"]
                        api_responses = schema["$ref"].split("/")[-1] if schema.get("$ref") else ""
                        function_name += f": Promise<{api_responses}>" if len(api_responses) > 0 else ""
                    api_body = ''
                    api_text = temp_api % (function_name, api_path, method.upper(), api_body, "api" if "download" not in function_name else "apiFile",)
                apis_dict[api_path] = api_text
        return apis_dict

In [None]:
class OpenApiHepler(OpenApiFetchApi, OpenApiType):
    def __init__(self, host: str = "127.0.0.1", port:str = "8000", path:str = "/api/openapi.json", url: str = None):
        super().__init__(host, port, path, url)


{'properties': {'data': {'items': {'$ref': '#/components/schemas/UserOut'}, 'title': 'Data', 'type': 'array'}}, 'required': ['data'], 'title': 'UserList', 'type': 'object'}


{'/api/token/pair': '\nexport async function ObtainToken(data: TokenObtainPairInputSchema ): Promise<TokenObtainPairOutputSchema> {\n  const url = /api/token/pair;\n  const options = {\n    method: "POST",\n    headers: getHeaders(),\n    \n  };\n  return await api(url, options);\n}\n',
 '/api/token/refresh': '\nexport async function RefreshToken(data: TokenRefreshInputSchema ): Promise<TokenRefreshOutputSchema> {\n  const url = /api/token/refresh;\n  const options = {\n    method: "POST",\n    headers: getHeaders(),\n    \n  };\n  return await api(url, options);\n}\n',
 '/api/token/verify': '\nexport async function VerifyToken(data: TokenVerifyInputSchema ): Promise<Schema> {\n  const url = /api/token/verify;\n  const options = {\n    method: "POST",\n    headers: getHeaders(),\n    \n  };\n  return await api(url, options);\n}\n',
 '/api/user': '\nexport async function CreateUser(data: UserIn ) {\n  const url = /api/user;\n  const options = {\n    method: "POST",\n    headers: getHead

In [6]:

class OpenApiHepler:
    def __init__(self, host: str = "127.0.0.1", port: str = "8000", path: str = "/api/openapi.json", url: str = None):
        self.host: str = host,
        self.port: str = port,
        self.path= path
        self.url= url if url else f"http://{host}:{port}{path}"
        
        # openapi json
        # base
        self.openapi_json: dict[str, dict] = self.get_openapi_json()
        self.components: dict[str, dict] = self.get_components()
        self.schemas: dict[str, dict[dict]] = self.get_schemas()
        
        # info list
        self.paths: dict[str, dict] = self.get_paths()
        self.tags: list[str] = self.get_tags()
        
        # main
        self.types: dict[str, list] = self.get_all_types()
        self.apis: dict[str, list] = self.get_all_apis()

    def get_openapi_json(self):
        try:
            response = requests.get(self.url)
            if response.status_code == 200:
                res = response.json()
                return res
            else:
                print(
                    "Failed to retrieve data. Status code:",
                    response.status_code,
                )
        except requests.RequestException as e:
            print("Error during request:\n", e)
    
    def get_components(self) -> dict[str, dict]:
        if 'components' in self.openapi_json:
            return self.openapi_json['components']
        
    def get_schemas(self) -> dict[str, dict]:
        if 'schemas' in self.components:
            return  self.components['schemas']
    
    def get_paths(self) -> dict[str, dict]:
        if 'paths' in self.openapi_json:
            return self.openapi_json['paths']
    
    def get_tags(self) -> list[str]:
        tags: set[str] = set()
        paths: dict[str, dict] = self.paths
        if paths:
            for path in paths:
                for method in self.paths[path]:
                    if 'tags' in self.paths[path][method]:
                        for tag in self.paths[path][method]['tags']:
                            tags.add(tag)
        return list(tags)
    
    def get_path_names(self) -> list[str]:
        path_names: list[str] = []
        paths: dict[str, dict] = self.paths
        if paths:
            for path in paths:
                path_names.append(path)
        return path_names
    

    
    def get_all_types(self):
        types_dict = {}
        if self.schemas:
            for schemas_name, schemas_item in self.schemas.items():
                type_text = ""
                for item_name, item_type in schemas_item["properties"].items():
                    required = '' if item_name in schemas_item['required'] else '?'
                    if item_type.get("anyOf"):
                        anyOf = " | ".join([anyType["type"] for anyType in item_type.get("anyOf")])
                        type_text += f"  {item_name}{required}: {anyOf}\n"
                    else:
                        item_type = item_type.get("type")
                        type_text += f"  {item_name}{required}: {item_type}\n"
                
                type_str = temp_schemas % (schemas_name, type_text)
                type_str = type_str.replace("array", "[]").replace("integer", "number")
                
                types_dict[schemas_name] = type_str
            return types_dict