In [1]:
from aiohttp import web
from aiohttp.typedefs import Handler
import asyncio
import autokeras as ak
from hooks import Hook
import nest_asyncio
import pickle
import sys
from tblib import pickling_support
from typing import Any, Callable, Coroutine

nest_asyncio.apply()
pickling_support.install()

In [2]:
is_running = True
models: dict[str, ak.AutoModel] = {}

def stream(handler: Callable[[web.Request], Coroutine[Any, Any, Any]]) -> Handler:
    async def stream(request: web.Request) -> web.StreamResponse:
        response = web.StreamResponse()
        await response.prepare(request)
        current_loop = asyncio.get_event_loop()

        def write(type: bytes):
            return lambda __s: current_loop.run_until_complete(
                response.write(type + b":::::" + bytes(__s, 'utf-8'))
            )

        with Hook(sys.stdout.write, write(b'stdout')):
            with Hook(sys.stderr.write, write(b'stderr')):
                try:
                    result = await handler(request)
                except BaseException as exception:
                    result = exception
            
        await response.write(b"result:::::" + pickle.dumps(result))
        return response

    return stream

@stream
async def get_or_create_model(request: web.Request) -> str:
    body = await request.read()
    args, kwargs = pickle.loads(body)
    model = ak.AutoModel(*args, **kwargs)
    models[model.project_name] = model
    return model.project_name

@stream
async def get_model_attribute(request: web.Request) -> Any:
    return getattr(models[request.match_info['project_name']], request.match_info['attribute'])

@stream
async def perform_model_method(request: web.Request) -> Any:
    model = models[request.match_info['project_name']]
    args, kwargs = pickle.loads(await request.read())
    return getattr(model, request.match_info['method'])(*args, **kwargs)

app = web.Application(client_max_size=1024 ** 3)
app.add_routes(
    [
        web.get("/models/{project_name}/{attribute}", get_model_attribute),
        web.post("/models/{project_name}/{method}", perform_model_method),
        web.put("/models", get_or_create_model),
    ]
)
runner = web.AppRunner(app)
await runner.setup()

site = web.TCPSite(runner, "localhost", 8082)
await site.start()

while is_running:
    try:
        await asyncio.sleep(0.25)
    except BaseException:
        is_running = False
        break

await site.stop()

INFO:tensorflow:Reloading Oracle from existing project /Users/brad/github/bradhilton/trade-ideas/data/autokeras/labels/automodel22/oracle.json
INFO:tensorflow:Reloading Tuner from /Users/brad/github/bradhilton/trade-ideas/data/autokeras/labels/automodel22/tuner0.json


2022-07-19 15:12:02.482743: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


