Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the synchronous RPC mode #315

Merged
merged 21 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6e011cf
Get rid of capnp timer functionality.
LasseBlaauwbroek Jun 7, 2023
97bdeae
Disable option to run capnp without asyncio
LasseBlaauwbroek Jun 8, 2023
b29f18e
Cleanup
LasseBlaauwbroek Jun 8, 2023
af99e38
Fix and improve a bunch of tests
LasseBlaauwbroek Jun 8, 2023
854d910
Make tests run faster by reducing timeouts
LasseBlaauwbroek Jun 8, 2023
a69bc72
Remove a bunch of unused code
LasseBlaauwbroek Jun 8, 2023
4b5c421
Force server methods to be async and client calls to use await
LasseBlaauwbroek Jun 8, 2023
20868d7
Get rid of dead code
LasseBlaauwbroek Jun 8, 2023
da6a07e
No more need for promise joining
LasseBlaauwbroek Jun 8, 2023
7a8175e
Get rid of VoidPromise and (almost) Promise
LasseBlaauwbroek Jun 8, 2023
770be41
Bugfix: Attach server to on_disconnect to prevent early closing
LasseBlaauwbroek Jun 8, 2023
bc01774
Add pytest-asyncio to ci
LasseBlaauwbroek Jun 8, 2023
84d0f36
Fix formatting
LasseBlaauwbroek Jun 8, 2023
0d160fc
Fix forgotten async server method
LasseBlaauwbroek Jun 9, 2023
74ebaff
Make older python versions work
LasseBlaauwbroek Jun 9, 2023
d6261b6
Manually handle deallocation of some objects for the benefit of p3.7
LasseBlaauwbroek Jun 9, 2023
0483596
Miscellaneous
LasseBlaauwbroek Jun 9, 2023
83d610c
Remove some more c++ helper functions
LasseBlaauwbroek Jun 11, 2023
95bb528
Delete test_capability_old.py, which is mostly redundant
LasseBlaauwbroek Jun 11, 2023
5edf700
Remove .capnp fixture from test_capability_context.py
LasseBlaauwbroek Jun 11, 2023
1e94f2e
Remove the option to create servers through _new_server.
LasseBlaauwbroek Jun 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
# TODO: Disable building PyPy wheels. If the build system gets modernized, this should be
# auto-detected based on the Cython dependency.
CIBW_SKIP: pp*
CIBW_TEST_REQUIRES: pytest
CIBW_TEST_REQUIRES: pytest pytest-asyncio
CIBW_TEST_COMMAND: pytest {project}
# Only needed to make the macosx arm64 build work
CMAKE_OSX_ARCHITECTURES: "${{ matrix.arch == 'arm64' && 'arm64' || '' }}"
Expand Down
109 changes: 78 additions & 31 deletions benchmark/bin/run_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,78 @@

_this_dir = os.path.dirname(__file__)


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-l', "--langs", help="Add languages to test, ie: -l pyproto -l pyproto_cpp", action='append', default=['pycapnp'])
parser.add_argument("-r", "--reuse", help="If this flag is passed, re-use tests will be run", action='store_true')
parser.add_argument("-c", "--compression", help="If this flag is passed, compression tests will be run", action='store_true')
parser.add_argument("-i", "--scale_iters", help="Scaling factor to multiply the default iters by", type=float, default=1.0)
parser.add_argument(
"-l",
"--langs",
help="Add languages to test, ie: -l pyproto -l pyproto_cpp",
action="append",
default=["pycapnp"],
)
parser.add_argument(
"-r",
"--reuse",
help="If this flag is passed, re-use tests will be run",
action="store_true",
)
parser.add_argument(
"-c",
"--compression",
help="If this flag is passed, compression tests will be run",
action="store_true",
)
parser.add_argument(
"-i",
"--scale_iters",
help="Scaling factor to multiply the default iters by",
type=float,
default=1.0,
)

return parser.parse_args()


def run_one(prefix, name, mode, iters, faster, compression):
res_type = prefix
reuse = 'no-reuse'
reuse = "no-reuse"

if faster:
reuse = 'reuse'
res_type += '_reuse'
if compression != 'none':
res_type += '_' + compression

command = [os.path.join(_this_dir, prefix+"-"+name), mode, reuse, compression, str(iters)]
reuse = "reuse"
res_type += "_reuse"
if compression != "none":
res_type += "_" + compression

command = [
os.path.join(_this_dir, prefix + "-" + name),
mode,
reuse,
compression,
str(iters),
]
start = time.time()
print('running: ' + ' '.join(command), file=sys.stderr)
print("running: " + " ".join(command), file=sys.stderr)
p = Popen(command, stdout=PIPE, stderr=PIPE)
res = p.wait()
end = time.time()

data = {}

if p.returncode != 0:
sys.stderr.write(' '.join(command) + ' failed to run with errors: \n' + p.stderr.read().decode(sys.stdout.encoding) + '\n')
sys.stderr.write(
" ".join(command)
+ " failed to run with errors: \n"
+ p.stderr.read().decode(sys.stdout.encoding)
+ "\n"
)
sys.stderr.flush()

data['type'] = res_type
data['mode'] = mode
data['name'] = name
data['iters'] = iters
data['time'] = end - start
data["type"] = res_type
data["mode"] = mode
data["name"] = name
data["iters"] = iters
data["time"] = end - start

return data

Expand All @@ -55,28 +90,40 @@ def run_each(name, langs, reuse, compression, iters):
ret = []

for lang_name in langs:
ret.append(run_one(lang_name, name, 'object', iters, False, 'none'))
ret.append(run_one(lang_name, name, 'bytes', iters, False, 'none'))
ret.append(run_one(lang_name, name, "object", iters, False, "none"))
ret.append(run_one(lang_name, name, "bytes", iters, False, "none"))
if reuse:
ret.append(run_one(lang_name, name, 'object', iters, True, 'none'))
ret.append(run_one(lang_name, name, 'bytes', iters, True, 'none'))
ret.append(run_one(lang_name, name, "object", iters, True, "none"))
ret.append(run_one(lang_name, name, "bytes", iters, True, "none"))
if compression:
ret.append(run_one(lang_name, name, 'bytes', iters, True, 'packed'))
ret.append(run_one(lang_name, name, "bytes", iters, True, "packed"))
if compression:
ret.append(run_one(lang_name, name, 'bytes', iters, False, 'packed'))
ret.append(run_one(lang_name, name, "bytes", iters, False, "packed"))

return ret


def main():
args = parse_args()

os.environ['PATH'] += ':.'
os.environ["PATH"] += ":."

data = []
data += run_each('carsales', args.langs, args.reuse, args.compression, int(2000 * args.scale_iters))
data += run_each('catrank', args.langs, args.reuse, args.compression, int(100 * args.scale_iters))
data += run_each('eval', args.langs, args.reuse, args.compression, int(10000 * args.scale_iters))
json.dump(data, sys.stdout, sort_keys=True, indent=4, separators=(',', ': '))

if __name__ == '__main__':
data += run_each(
"carsales",
args.langs,
args.reuse,
args.compression,
int(2000 * args.scale_iters),
)
data += run_each(
"catrank", args.langs, args.reuse, args.compression, int(100 * args.scale_iters)
)
data += run_each(
"eval", args.langs, args.reuse, args.compression, int(10000 * args.scale_iters)
)
json.dump(data, sys.stdout, sort_keys=True, indent=4, separators=(",", ": "))


if __name__ == "__main__":
main()
63 changes: 50 additions & 13 deletions benchmark/bin/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,83 @@
import random

_this_dir = os.path.dirname(__file__)
sys.path.append(os.path.join(_this_dir, '..'))
sys.path.append(os.path.join(_this_dir, ".."))
from common import do_benchmark


def parse_args_simple():
parser = argparse.ArgumentParser()
parser.add_argument("mode", help="Mode to use for serialization, ie. object or bytes")
parser.add_argument(
"mode", help="Mode to use for serialization, ie. object or bytes"
)
parser.add_argument("reuse", help="Currently ignored")
parser.add_argument("compression", help="Valid values are none or packed")
parser.add_argument("iters", help="Number of iterations to run for", type=int)
parser.add_argument("-I", "--includes", help="Directories to add to PYTHONPATH", default='/usr/local/include')
parser.add_argument(
"-I",
"--includes",
help="Directories to add to PYTHONPATH",
default="/usr/local/include",
)

return parser.parse_args()


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("name", help="Name of the benchmark to run, eg. carsales", nargs='?', default='carsales')
parser.add_argument("-c", "--compression", help="Specify the compression type", default=None)
parser.add_argument("-s", "--suffix", help="Choose the protocol type.", default='pycapnp')
parser.add_argument("-m", "--mode", help="Specify the mode", default='object')
parser.add_argument("-i", "--iters", help="Specify the number of iterations manually. By default, it will be looked up in preset table", default=10, type=int)
parser.add_argument("-r", "--reuse", help="If this flag is passed, objects will be re-used", action='store_true')
parser.add_argument("-I", "--includes", help="Directories to add to PYTHONPATH", default='/usr/local/include')
parser.add_argument(
"name",
help="Name of the benchmark to run, eg. carsales",
nargs="?",
default="carsales",
)
parser.add_argument(
"-c", "--compression", help="Specify the compression type", default=None
)
parser.add_argument(
"-s", "--suffix", help="Choose the protocol type.", default="pycapnp"
)
parser.add_argument("-m", "--mode", help="Specify the mode", default="object")
parser.add_argument(
"-i",
"--iters",
help="Specify the number of iterations manually. By default, it will be looked up in preset table",
default=10,
type=int,
)
parser.add_argument(
"-r",
"--reuse",
help="If this flag is passed, objects will be re-used",
action="store_true",
)
parser.add_argument(
"-I",
"--includes",
help="Directories to add to PYTHONPATH",
default="/usr/local/include",
)

return parser.parse_args()


def run_test(name, mode, reuse, compression, iters, suffix, includes):
tic = default_timer()

name = name
sys.path.append(includes)
module = import_module(name + '_' + suffix)
module = import_module(name + "_" + suffix)
benchmark = module.Benchmark(compression=compression)

do_benchmark(mode=mode, benchmark=benchmark, iters=iters, reuse=reuse)
toc = default_timer()
return toc - tic


def main():
args = parse_args()
run_test(**vars(args))

if __name__ == '__main__':
main()

if __name__ == "__main__":
main()
1 change: 0 additions & 1 deletion capnp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
_StructModule,
_write_message_to_fd,
_write_packed_message_to_fd,
_Promise as Promise,
_AsyncIoStream as AsyncIoStream,
_init_capnp_api,
)
Expand Down
13 changes: 0 additions & 13 deletions capnp/helpers/asyncHelper.h

This file was deleted.

63 changes: 7 additions & 56 deletions capnp/helpers/capabilityHelper.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include "capnp/helpers/capabilityHelper.h"
#include "capnp/lib/capnp_api.h"

::kj::Promise<kj::Own<PyRefCounter>> convert_to_pypromise(kj::Own<capnp::RemotePromise<capnp::DynamicStruct>> promise) {
return promise->then([](capnp::Response<capnp::DynamicStruct>&& response) {
::kj::Promise<kj::Own<PyRefCounter>> convert_to_pypromise(capnp::RemotePromise<capnp::DynamicStruct> promise) {
return promise.then([](capnp::Response<capnp::DynamicStruct>&& response) {
return stealPyRef(wrap_dynamic_struct_reader(response)); } );
}

Expand Down Expand Up @@ -58,75 +58,26 @@ void check_py_error() {
}
}

inline kj::Promise<kj::Own<PyRefCounter>> maybeUnwrapPromise(PyObject * result) {
check_py_error();
auto promise = extract_promise(result);
Py_DECREF(result);
return kj::mv(*promise);
}

kj::Promise<kj::Own<PyRefCounter>> wrapPyFunc(kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> arg) {
GILAcquire gil;
// Creates an owned reference, which will be destroyed in maybeUnwrapPromise
PyObject * result = PyObject_CallFunctionObjArgs(func->obj, arg->obj, NULL);
return maybeUnwrapPromise(result);
}

kj::Promise<kj::Own<PyRefCounter>> wrapPyFuncNoArg(kj::Own<PyRefCounter> func) {
GILAcquire gil;
// Creates an owned reference, which will be destroyed in maybeUnwrapPromise
PyObject * result = PyObject_CallFunctionObjArgs(func->obj, NULL);
return maybeUnwrapPromise(result);
}

kj::Promise<kj::Own<PyRefCounter>> wrapRemoteCall(kj::Own<PyRefCounter> func, capnp::Response<capnp::DynamicStruct> & arg) {
GILAcquire gil;
// Creates an owned reference, which will be destroyed in maybeUnwrapPromise
PyObject * ret = wrap_remote_call(func->obj, arg);
return maybeUnwrapPromise(ret);
check_py_error();
return stealPyRef(result);
}

::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<kj::Promise<kj::Own<PyRefCounter>>> promise,
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Promise<kj::Own<PyRefCounter>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func) {
if(error_func->obj == Py_None)
return promise->then(kj::mvCapture(func, [](auto func, kj::Own<PyRefCounter> arg) {
return promise.then(kj::mvCapture(func, [](auto func, kj::Own<PyRefCounter> arg) {
return wrapPyFunc(kj::mv(func), kj::mv(arg)); } ));
else
return promise->then
return promise.then
(kj::mvCapture(func, [](auto func, kj::Own<PyRefCounter> arg) {
return wrapPyFunc(kj::mv(func), kj::mv(arg)); }),
kj::mvCapture(error_func, [](auto error_func, kj::Exception arg) {
return wrapPyFunc(kj::mv(error_func), stealPyRef(wrap_kj_exception(arg))); } ));
}

::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<::capnp::RemotePromise<::capnp::DynamicStruct>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func) {
if(error_func->obj == Py_None)
return promise->then(kj::mvCapture(func, [](auto func, capnp::Response<capnp::DynamicStruct>&& arg) {
return wrapRemoteCall(kj::mv(func), arg); } ));
else
return promise->then
(kj::mvCapture(func, [](auto func, capnp::Response<capnp::DynamicStruct>&& arg) {
return wrapRemoteCall(kj::mv(func), arg); }),
kj::mvCapture(error_func, [](auto error_func, kj::Exception arg) {
return wrapPyFunc(kj::mv(error_func), stealPyRef(wrap_kj_exception(arg))); } ));
}

::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<kj::Promise<void>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func) {
if(error_func->obj == Py_None)
return promise->then(kj::mvCapture(func, [](auto func) { return wrapPyFuncNoArg(kj::mv(func)); } ));
else
return promise->then(kj::mvCapture(func, [](auto func) { return wrapPyFuncNoArg(kj::mv(func)); }),
kj::mvCapture(error_func, [](auto error_func, kj::Exception arg) {
return wrapPyFunc(kj::mv(error_func), stealPyRef(wrap_kj_exception(arg))); } ));
}

::kj::Promise<kj::Own<PyRefCounter>> then(kj::Promise<kj::Array<kj::Own<PyRefCounter>> > && promise) {
return promise.then([](kj::Array<kj::Own<PyRefCounter>>&& arg) {
return stealPyRef(convert_array_pyobject(arg)); } );
}

kj::Promise<void> PythonInterfaceDynamicImpl::call(capnp::InterfaceSchema::Method method,
capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context) {
auto methodName = method.getProto().getName();
Expand Down
Loading
Loading