From bb1ebe4dab525b11bbfe1a0bd4baa261549a5f93 Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Fri, 28 Apr 2023 14:42:58 +0200 Subject: [PATCH 1/8] feat: Wave share client. --- py/h2o_wave/h2o_wave/cli.py | 63 +++++++++++++++++++++++++++++------ py/h2o_wave/h2o_wave/share.py | 29 ++++++++++++++++ 2 files changed, 81 insertions(+), 11 deletions(-) create mode 100644 py/h2o_wave/h2o_wave/share.py diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 1ed31d1d58..330ffe39e1 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -12,24 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time -import tarfile +import os +import platform import shutil -import sys import socket -from contextlib import closing import subprocess +import sys +import tarfile +import time +from contextlib import closing from pathlib import Path -import platform -import uvicorn +from threading import Thread +from urllib import request +from urllib.parse import urlparse + import click +import httpx import inquirer -import os +import uvicorn from click import Choice, option -from urllib import request -from urllib.parse import urlparse +from h2o_wave.share import listen_on_socket + +from .metadata import __arch__, __platform__ from .version import __version__ -from .metadata import __platform__, __arch__ _localhost = '127.0.0.1' @@ -276,4 +281,40 @@ def learn(): from h2o_wave_university import cli cli.main() except ImportError: - print('You need to run \x1b[7;30;43mpip install h2o_wave_university\x1b[0m first.') \ No newline at end of file + print('You need to run \x1b[7;30;43mpip install h2o_wave_university\x1b[0m first.') + + +@main.command() +@click.option('--port', default='10101', help='Port your app is running on (defaults to 10101).') +@click.option('--subdomain', default='?new', help='Subdomain to use. If not available, a random one is generated.') +def share(port: str, subdomain: str): + if not port.isdigit(): + print('Port must be a number.') + exit(1) + + try: + res = httpx.get(f'https://h2oai.app/{subdomain}', headers={'Content-Type': 'application/json'}) + if res.status_code != 200: + print('Could not connect to the server.') + exit(1) + + res = res.json() + print(f'BETA: Proxying localhost:{port} ==> {res["url"]}.') + print('The URL is accesible to anyone on the internet. \x1b[7;30;43mDO NOT SHARE YOUR APP IF IT CONTAINS SENSITIVE INFO\x1b[0m.') + + threads = [] + for _ in range(res['max_conn_count']): + # Run threads as daemons so they exit when the main thread exits. + t = Thread(target=listen_on_socket, args=('127.0.0.1', int(port), 'h2oai.app', res['port']), daemon=True) + t.start() + threads.append(t) + # Block the main thread since the threads are daemons. + while True: + threads = [t for t in threads if t.is_alive()] + if not threads: + break + time.sleep(1) + + # Handle ctrl+c + except KeyboardInterrupt: + pass diff --git a/py/h2o_wave/h2o_wave/share.py b/py/h2o_wave/h2o_wave/share.py new file mode 100644 index 0000000000..39659b0449 --- /dev/null +++ b/py/h2o_wave/h2o_wave/share.py @@ -0,0 +1,29 @@ +import socket +import ssl +from threading import Thread + + +def pipe(s1: socket.socket, s2: socket.socket) -> None: + while True: + data = s1.recv(4096) + s2.sendall(data) + # Client disconnected. + if not data: + return + + +def listen_on_socket(local_ip: str, local_port: int, remote_host: str, remote_port: int) -> None: + while True: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as local_socket: + local_socket.connect((local_ip, local_port)) + remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + with ssl.wrap_socket(remote_socket) as remote_socket: + remote_socket.connect((remote_host, remote_port)) + + remote_local = Thread(target=pipe, args=(remote_socket, local_socket)) + local_remote = Thread(target=pipe, args=(local_socket, remote_socket)) + + remote_local.start() + local_remote.start() + remote_local.join() + local_remote.join() From a69c1fbe481f4f413e71f56b567836dfd562870a Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Fri, 28 Apr 2023 14:47:38 +0200 Subject: [PATCH 2/8] chore: Explain how to stop sharing. --- py/h2o_wave/h2o_wave/cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 330ffe39e1..5fca9d080e 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -301,10 +301,11 @@ def share(port: str, subdomain: str): res = res.json() print(f'BETA: Proxying localhost:{port} ==> {res["url"]}.') print('The URL is accesible to anyone on the internet. \x1b[7;30;43mDO NOT SHARE YOUR APP IF IT CONTAINS SENSITIVE INFO\x1b[0m.') + print('Press Ctrl+C to stop sharing.') threads = [] for _ in range(res['max_conn_count']): - # Run threads as daemons so they exit when the main thread exits. + # Run threads as daemons so that they do not prevent the main thread from exiting via CTRL + C. t = Thread(target=listen_on_socket, args=('127.0.0.1', int(port), 'h2oai.app', res['port']), daemon=True) t.start() threads.append(t) From 1e36c9b56e45268cebaea3d237874f04e83c6715 Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Fri, 28 Apr 2023 14:56:03 +0200 Subject: [PATCH 3/8] docs: Add better CLI help hints. --- py/h2o_wave/h2o_wave/cli.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 5fca9d080e..57ce66774a 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -285,13 +285,14 @@ def learn(): @main.command() -@click.option('--port', default='10101', help='Port your app is running on (defaults to 10101).') +@click.option('--port', default=10101, help='Port your app is running on (defaults to 10101).') @click.option('--subdomain', default='?new', help='Subdomain to use. If not available, a random one is generated.') def share(port: str, subdomain: str): - if not port.isdigit(): - print('Port must be a number.') - exit(1) + """Share your locally running app with the world. + \b + $ wave share + """ try: res = httpx.get(f'https://h2oai.app/{subdomain}', headers={'Content-Type': 'application/json'}) if res.status_code != 200: @@ -299,7 +300,7 @@ def share(port: str, subdomain: str): exit(1) res = res.json() - print(f'BETA: Proxying localhost:{port} ==> {res["url"]}.') + print(f'BETA: Proxying localhost:{port} ==> {res["url"]}') print('The URL is accesible to anyone on the internet. \x1b[7;30;43mDO NOT SHARE YOUR APP IF IT CONTAINS SENSITIVE INFO\x1b[0m.') print('Press Ctrl+C to stop sharing.') From dcd86f6e2ae847b29caec99d65a24b6859f4c8f0 Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Thu, 25 May 2023 10:20:48 +0200 Subject: [PATCH 4/8] chore: Refactor into asyncio. --- py/h2o_wave/h2o_wave/cli.py | 64 +++++++++++++++++++++-------------- py/h2o_wave/h2o_wave/share.py | 37 ++++++++++---------- 2 files changed, 56 insertions(+), 45 deletions(-) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 57ce66774a..4923128ea2 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import os import platform import shutil @@ -293,30 +294,43 @@ def share(port: str, subdomain: str): \b $ wave share """ - try: - res = httpx.get(f'https://h2oai.app/{subdomain}', headers={'Content-Type': 'application/json'}) - if res.status_code != 200: - print('Could not connect to the server.') - exit(1) - - res = res.json() - print(f'BETA: Proxying localhost:{port} ==> {res["url"]}') - print('The URL is accesible to anyone on the internet. \x1b[7;30;43mDO NOT SHARE YOUR APP IF IT CONTAINS SENSITIVE INFO\x1b[0m.') - print('Press Ctrl+C to stop sharing.') - - threads = [] - for _ in range(res['max_conn_count']): - # Run threads as daemons so that they do not prevent the main thread from exiting via CTRL + C. - t = Thread(target=listen_on_socket, args=('127.0.0.1', int(port), 'h2oai.app', res['port']), daemon=True) - t.start() - threads.append(t) - # Block the main thread since the threads are daemons. - while True: - threads = [t for t in threads if t.is_alive()] - if not threads: - break - time.sleep(1) - # Handle ctrl+c + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(_share(port, subdomain)) except KeyboardInterrupt: - pass + tasks = asyncio.all_tasks(loop) + for task in tasks: + task.cancel() + loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) + loop.close() + + +async def _share(port: str, subdomain: str): + if _scan_free_port(port) == port: + print(f'Could not connect to localhost:{port}. Please make sure your app is running.') + exit(1) + + res = httpx.get(f'https://h2oai.app/{subdomain}', headers={'Content-Type': 'application/json'}) + if res.status_code != 200: + print('Could not connect to the remote sharing server.') + exit(1) + + res = res.json() + print(f'BETA: Proxying localhost:{port} ==> {res["url"]}') + print('\x1b[7;30;43mDO NOT SHARE YOUR APP IF IT CONTAINS SENSITIVE INFO\x1b[0m.') + print('Press Ctrl+C to stop sharing.') + + max_conn_count = res['max_conn_count'] + step = 100 if max_conn_count > 10 else max_conn_count + + tasks = [] + for _ in range(max_conn_count // step): + for _ in range(step): + tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', int(port), 'h2oai.app', res['port']))) + await asyncio.sleep(1) + # Handle the rest if any. + for _ in range(max_conn_count % step): + tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', int(port), 'h2oai.app', res['port']))) + + await asyncio.gather(*tasks) diff --git a/py/h2o_wave/h2o_wave/share.py b/py/h2o_wave/h2o_wave/share.py index 39659b0449..c5eea19845 100644 --- a/py/h2o_wave/h2o_wave/share.py +++ b/py/h2o_wave/h2o_wave/share.py @@ -1,29 +1,26 @@ -import socket -import ssl -from threading import Thread +import asyncio -def pipe(s1: socket.socket, s2: socket.socket) -> None: +async def pipe(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: while True: - data = s1.recv(4096) - s2.sendall(data) - # Client disconnected. + data = await r.read(4096) if not data: - return + break + w.write(data) + await w.drain() -def listen_on_socket(local_ip: str, local_port: int, remote_host: str, remote_port: int) -> None: +async def listen_on_socket(local_host: str, local_port: int, remote_host: str, remote_port: int) -> None: while True: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as local_socket: - local_socket.connect((local_ip, local_port)) - remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - with ssl.wrap_socket(remote_socket) as remote_socket: - remote_socket.connect((remote_host, remote_port)) + try: + local_reader, local_writer = await asyncio.open_connection(local_host, local_port) + remote_reader, remote_writer = await asyncio.open_connection(remote_host, remote_port, ssl=True) - remote_local = Thread(target=pipe, args=(remote_socket, local_socket)) - local_remote = Thread(target=pipe, args=(local_socket, remote_socket)) + await asyncio.gather(pipe(local_reader, remote_writer), pipe(remote_reader, local_writer)) - remote_local.start() - local_remote.start() - remote_local.join() - local_remote.join() + # Swallow exceptions and reconnect. + except Exception: + pass + finally: + local_writer.close() + remote_writer.close() From 127b488f21cac6576217da7d7986be013e83fa9a Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Thu, 25 May 2023 11:59:03 +0200 Subject: [PATCH 5/8] fix: Make sure Windows can close sharing. --- py/h2o_wave/h2o_wave/cli.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 4923128ea2..92512f6d07 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -295,7 +295,20 @@ def share(port: str, subdomain: str): $ wave share """ - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + if 'Windows' in platform.system(): + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + + async def wakeup(): + while True: + await asyncio.sleep(1) + + # HACK: Allow Ctrl-C to work on Windows when opening multiple TCP connections. + # https://stackoverflow.com/questions/27480967/why-does-the-asyncios-event-loop-suppress-the-keyboardinterrupt-on-windows. + loop.create_task(wakeup()) + try: loop.run_until_complete(_share(port, subdomain)) except KeyboardInterrupt: From a180c44c928848408e31108e67ee1aab05684c1c Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Thu, 25 May 2023 12:16:24 +0200 Subject: [PATCH 6/8] fix: Make sure Windows can close sharing. --- py/h2o_wave/h2o_wave/cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 92512f6d07..8a59a171ae 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -294,18 +294,19 @@ def share(port: str, subdomain: str): \b $ wave share """ + if 'Windows' in platform.system(): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) if 'Windows' in platform.system(): - asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) async def wakeup(): while True: await asyncio.sleep(1) - # HACK: Allow Ctrl-C to work on Windows when opening multiple TCP connections. + # HACK: Enable Ctrl-C on Windows when opening multiple TCP connections. # https://stackoverflow.com/questions/27480967/why-does-the-asyncios-event-loop-suppress-the-keyboardinterrupt-on-windows. loop.create_task(wakeup()) From 2dfe8ab2cdb4030825224f969dd4faef550b70a8 Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Mon, 19 Jun 2023 11:49:45 +0200 Subject: [PATCH 7/8] feat: Allow specifying remote host. --- py/h2o_wave/h2o_wave/cli.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 8a59a171ae..5eb69e2c9d 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -23,7 +23,6 @@ import time from contextlib import closing from pathlib import Path -from threading import Thread from urllib import request from urllib.parse import urlparse @@ -125,15 +124,18 @@ def run(app: str, no_reload: bool, no_autostart: bool): # Try to start Wave daemon if not running or turned off. server_port = int(os.environ.get('H2O_WAVE_LISTEN', ':10101').split(':')[-1]) server_not_running = _scan_free_port(server_port) == server_port + + waved_process = None + if no_autostart: + autostart = False + else: + autostart = os.environ.get('H2O_WAVE_NO_AUTOSTART', 'false').lower() in ['false', '0', 'f'] + + waved = 'waved.exe' if 'Windows' in platform.system() else './waved' + # OS agnostic wheels do not have waved - needed for HAC. + is_waved_present = os.path.isfile(os.path.join(sys.exec_prefix, waved)) + try: - waved = 'waved.exe' if 'Windows' in platform.system() else './waved' - waved_process = None - # OS agnostic wheels do not have waved - needed for HAC. - is_waved_present = os.path.isfile(os.path.join(sys.exec_prefix, waved)) - if no_autostart: - autostart = False - else: - autostart = os.environ.get('H2O_WAVE_NO_AUTOSTART', 'false').lower() in ['false', '0', 'f'] if autostart and is_waved_present and server_not_running: waved_process = subprocess.Popen([waved], cwd=sys.exec_prefix, env=os.environ.copy(), shell=True) time.sleep(1) @@ -288,7 +290,8 @@ def learn(): @main.command() @click.option('--port', default=10101, help='Port your app is running on (defaults to 10101).') @click.option('--subdomain', default='?new', help='Subdomain to use. If not available, a random one is generated.') -def share(port: str, subdomain: str): +@click.option('--remote-host', default='h2oai.app', help='Remote host to use (defaults to h2oai.app).') +def share(port: int, subdomain: str, remote_host: str): """Share your locally running app with the world. \b @@ -311,7 +314,7 @@ async def wakeup(): loop.create_task(wakeup()) try: - loop.run_until_complete(_share(port, subdomain)) + loop.run_until_complete(_share(port, subdomain, remote_host)) except KeyboardInterrupt: tasks = asyncio.all_tasks(loop) for task in tasks: @@ -320,12 +323,12 @@ async def wakeup(): loop.close() -async def _share(port: str, subdomain: str): +async def _share(port: int, subdomain: str, remote_host: str): if _scan_free_port(port) == port: print(f'Could not connect to localhost:{port}. Please make sure your app is running.') exit(1) - res = httpx.get(f'https://h2oai.app/{subdomain}', headers={'Content-Type': 'application/json'}) + res = httpx.get(f'https://{remote_host}/{subdomain}', headers={'Content-Type': 'application/json'}) if res.status_code != 200: print('Could not connect to the remote sharing server.') exit(1) @@ -341,10 +344,10 @@ async def _share(port: str, subdomain: str): tasks = [] for _ in range(max_conn_count // step): for _ in range(step): - tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', int(port), 'h2oai.app', res['port']))) + tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', port, remote_host, res['port']))) await asyncio.sleep(1) # Handle the rest if any. for _ in range(max_conn_count % step): - tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', int(port), 'h2oai.app', res['port']))) + tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', port, remote_host, res['port']))) await asyncio.gather(*tasks) From 30d2acc247a7d42ba191a2aeb72a3bc681315ca2 Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Wed, 21 Jun 2023 10:50:29 +0200 Subject: [PATCH 8/8] chore: Add extra explanation. --- py/h2o_wave/h2o_wave/cli.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/py/h2o_wave/h2o_wave/cli.py b/py/h2o_wave/h2o_wave/cli.py index 5eb69e2c9d..9ec7a3113f 100644 --- a/py/h2o_wave/h2o_wave/cli.py +++ b/py/h2o_wave/h2o_wave/cli.py @@ -339,6 +339,8 @@ async def _share(port: int, subdomain: str, remote_host: str): print('Press Ctrl+C to stop sharing.') max_conn_count = res['max_conn_count'] + # The server can be configured to either support 10 concurrent connections (default) or more. + # If more, connect in batches of 100 for better performance. step = 100 if max_conn_count > 10 else max_conn_count tasks = []