Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test-library.yml
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,8 @@ jobs:
$CONTAINER_TAG
--hostname 0.0.0.0
--enable-web-server
--local-executor && ./tests/integration/test_integration.sh 8899
&&
./tests/integration/test_integration.sh 8899
- name: Push to GHCR
run: >-
REGISTRY_URL="ghcr.io/abhinavsingh/proxy.py";
Expand Down
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,4 @@ EXPOSE 8899/tcp
ENTRYPOINT [ "proxy" ]
CMD [ \
"--hostname=0.0.0.0" \
"--local-executor" \
]
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ lib-profile:
--num-workers 1 \
--enable-web-server \
--plugin proxy.plugin.WebServerPlugin \
--local-executor \
--backlog 65536 \
--open-file-limit 65536 \
--log-file /dev/null
Expand All @@ -160,7 +159,6 @@ lib-speedscope:
--num-workers 1 \
--enable-web-server \
--plugin proxy.plugin.WebServerPlugin \
--local-executor \
--backlog 65536 \
--open-file-limit 65536 \
--log-file /dev/null
Expand Down
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2204,8 +2204,9 @@ To run standalone benchmark for `proxy.py`, use the following command from repo
```console
❯ proxy -h
usage: -m [-h] [--enable-events] [--enable-conn-pool] [--threadless]
[--threaded] [--num-workers NUM_WORKERS] [--local-executor]
[--backlog BACKLOG] [--hostname HOSTNAME] [--port PORT]
[--threaded] [--num-workers NUM_WORKERS]
[--local-executor LOCAL_EXECUTOR] [--backlog BACKLOG]
[--hostname HOSTNAME] [--port PORT]
[--unix-socket-path UNIX_SOCKET_PATH]
[--num-acceptors NUM_ACCEPTORS] [--version] [--log-level LOG_LEVEL]
[--log-file LOG_FILE] [--log-format LOG_FORMAT]
Expand Down Expand Up @@ -2248,13 +2249,14 @@ options:
handle each client connection.
--num-workers NUM_WORKERS
Defaults to number of CPU cores.
--local-executor Default: True. Disabled by default. When enabled
acceptors will make use of local (same process)
executor instead of distributing load across remote
(other process) executors. Enable this option to
achieve CPU affinity between acceptors and executors,
instead of using underlying OS kernel scheduling
algorithm.
--local-executor LOCAL_EXECUTOR
Default: 1. Enabled by default. Use 0 to disable. When
enabled acceptors will make use of local (same
process) executor instead of distributing load across
remote (other process) executors. Enable this option
to achieve CPU affinity between acceptors and
executors, instead of using underlying OS kernel
scheduling algorithm.
--backlog BACKLOG Default: 100. Maximum number of pending connections to
proxy server
--hostname HOSTNAME Default: 127.0.0.1. Server IP address.
Expand Down
2 changes: 1 addition & 1 deletion benchmark/_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
enable_web_server=True,
disable_proxy_server=False,
num_acceptors=10,
local_executor=True,
local_executor=1,
log_file='/dev/null',
) as _:
while True:
Expand Down
2 changes: 1 addition & 1 deletion proxy/common/flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def initialize(
)
args.timeout = cast(int, opts.get('timeout', args.timeout))
args.local_executor = cast(
bool,
int,
opts.get(
'local_executor',
args.local_executor,
Expand Down
16 changes: 8 additions & 8 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@

flags.add_argument(
'--local-executor',
action='store_true',
default=DEFAULT_LOCAL_EXECUTOR,
help='Default: ' + ('True' if DEFAULT_LOCAL_EXECUTOR else 'False') + '. ' +
'Disabled by default. When enabled acceptors will make use of ' +
'local (same process) executor instead of distributing load across ' +
type=int,
default=int(DEFAULT_LOCAL_EXECUTOR),
help='Default: ' + ('1' if DEFAULT_LOCAL_EXECUTOR else '0') + '. ' +
'Enabled by default. Use 0 to disable. When enabled acceptors ' +
'will make use of local (same process) executor instead of distributing load across ' +
'remote (other process) executors. Enable this option to achieve CPU affinity between ' +
'acceptors and executors, instead of using underlying OS kernel scheduling algorithm.',
)
Expand Down Expand Up @@ -149,7 +149,7 @@ def run_once(self) -> None:
if locked:
self.lock.release()
for work in works:
if self.flags.local_executor:
if self.flags.local_executor == int(DEFAULT_LOCAL_EXECUTOR):
assert self._local_work_queue
self._local_work_queue.put(work)
else:
Expand All @@ -172,7 +172,7 @@ def run(self) -> None:
type=socket.SOCK_STREAM,
)
try:
if self.flags.local_executor:
if self.flags.local_executor == int(DEFAULT_LOCAL_EXECUTOR):
self._start_local()
self.selector.register(self.sock, selectors.EVENT_READ)
while not self.running.is_set():
Expand All @@ -181,7 +181,7 @@ def run(self) -> None:
pass
finally:
self.selector.unregister(self.sock)
if self.flags.local_executor:
if self.flags.local_executor == int(DEFAULT_LOCAL_EXECUTOR):
self._stop_local()
self.sock.close()
logger.debug('Acceptor#%d shutdown', self.idd)
Expand Down
12 changes: 11 additions & 1 deletion proxy/core/acceptor/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,17 @@ def __exit__(self, *args: Any) -> None:
def setup(self) -> None:
"""Setup acceptors."""
self._start()
logger.info('Started %d acceptors' % self.flags.num_acceptors)
execution_mode = (
'threadless (local)'
if self.flags.local_executor
else 'threadless (remote)'
) if self.flags.threadless else 'threaded'
logger.info(
'Started %d acceptors in %s mode' % (
self.flags.num_acceptors,
execution_mode,
),
)
# Send file descriptor to all acceptor processes.
fd = self.listener.fileno()
for index in range(self.flags.num_acceptors):
Expand Down
12 changes: 6 additions & 6 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .core.event import EventManager
from .common.utils import bytes_
from .common.flag import FlagParser, flags
from .common.constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL
from .common.constants import DEFAULT_LOCAL_EXECUTOR, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL
from .common.constants import DEFAULT_OPEN_FILE_LIMIT, DEFAULT_PLUGINS, DEFAULT_VERSION
from .common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_WORK_KLASS, DEFAULT_PID_FILE

Expand Down Expand Up @@ -205,19 +205,19 @@ def shutdown(self) -> None:
self._delete_pid_file()

def _write_pid_file(self) -> None:
if self.flags.pid_file is not None:
# NOTE: Multiple instances of proxy.py running on
# same host machine will currently result in overwriting the PID file
if self.flags.pid_file:
with open(self.flags.pid_file, 'wb') as pid_file:
pid_file.write(bytes_(os.getpid()))

def _delete_pid_file(self) -> None:
if self.flags.pid_file and os.path.exists(self.flags.pid_file):
if self.flags.pid_file \
and os.path.exists(self.flags.pid_file):
os.remove(self.flags.pid_file)

@property
def remote_executors_enabled(self) -> bool:
return self.flags.threadless and not self.flags.local_executor
return self.flags.threadless \
and not (self.flags.local_executor == int(DEFAULT_LOCAL_EXECUTOR))


def main(**opts: Any) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def setUp(self) -> None:
self.flags = FlagParser.initialize(
threaded=True,
work_klass=mock.MagicMock(),
local_executor=False,
local_executor=0,
)
self.acceptor = Acceptor(
idd=self.acceptor_id,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def proxy_py_subprocess(request: Any) -> Generator[int, None, None]:
'proxy_py_subprocess',
(
('--threadless'),
('--threadless --local-executor'),
('--threadless --local-executor 0'),
('--threaded'),
),
indirect=True,
Expand Down
16 changes: 8 additions & 8 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def mock_default_args(mock_args: mock.Mock) -> None:
mock_args.enable_events = DEFAULT_ENABLE_EVENTS
mock_args.enable_dashboard = DEFAULT_ENABLE_DASHBOARD
mock_args.work_klass = DEFAULT_WORK_KLASS
mock_args.local_executor = DEFAULT_LOCAL_EXECUTOR
mock_args.local_executor = int(DEFAULT_LOCAL_EXECUTOR)

@mock.patch('os.remove')
@mock.patch('os.path.exists')
Expand All @@ -93,7 +93,7 @@ def test_entry_point(
) -> None:
pid_file = os.path.join(tempfile.gettempdir(), 'pid')
mock_sleep.side_effect = KeyboardInterrupt()
mock_initialize.return_value.local_executor = False
mock_initialize.return_value.local_executor = 0
mock_initialize.return_value.enable_events = False
mock_initialize.return_value.pid_file = pid_file
entry_point()
Expand Down Expand Up @@ -141,7 +141,7 @@ def test_main_with_no_flags(
mock_sleep: mock.Mock,
) -> None:
mock_sleep.side_effect = KeyboardInterrupt()
mock_initialize.return_value.local_executor = False
mock_initialize.return_value.local_executor = 0
mock_initialize.return_value.enable_events = False
main()
mock_event_manager.assert_not_called()
Expand Down Expand Up @@ -181,7 +181,7 @@ def test_enable_events(
mock_sleep: mock.Mock,
) -> None:
mock_sleep.side_effect = KeyboardInterrupt()
mock_initialize.return_value.local_executor = False
mock_initialize.return_value.local_executor = 0
mock_initialize.return_value.enable_events = True
main()
mock_event_manager.assert_called_once()
Expand Down Expand Up @@ -228,8 +228,8 @@ def test_enable_dashboard(
mock_args = mock_parse_args.return_value
self.mock_default_args(mock_args)
mock_args.enable_dashboard = True
mock_args.local_executor = False
main(enable_dashboard=True, local_executor=False)
mock_args.local_executor = 0
main(enable_dashboard=True, local_executor=0)
mock_load_plugins.assert_called()
self.assertEqual(
mock_load_plugins.call_args_list[0][0][0], [
Expand Down Expand Up @@ -274,8 +274,8 @@ def test_enable_devtools(
mock_args = mock_parse_args.return_value
self.mock_default_args(mock_args)
mock_args.enable_devtools = True
mock_args.local_executor = False
main(enable_devtools=True, local_executor=False)
mock_args.local_executor = 0
main(enable_devtools=True, local_executor=0)
mock_load_plugins.assert_called()
self.assertEqual(
mock_load_plugins.call_args_list[0][0][0], [
Expand Down