Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Wave share client. #1953

Merged
merged 8 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 95 additions & 19 deletions py/h2o_wave/h2o_wave/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import tarfile
import asyncio
import os
import platform
import shutil
import sys
import socket
from contextlib import closing
import subprocess
import sys
import tarfile
import time
from contextlib import closing
from pathlib import Path
import platform
import uvicorn
from urllib import request
from urllib.parse import urlparse

import click
import httpx
import inquirer
import os
import uvicorn
from click import Choice, option
from urllib import request
from urllib.parse import urlparse
from h2o_wave.share import listen_on_socket

from .metadata import __arch__, __platform__
from .version import __version__
from .metadata import __platform__, __arch__

_localhost = '127.0.0.1'

Expand Down Expand Up @@ -119,15 +124,18 @@ def run(app: str, no_reload: bool, no_autostart: bool):
# Try to start Wave daemon if not running or turned off.
server_port = int(os.environ.get('H2O_WAVE_LISTEN', ':10101').split(':')[-1])
server_not_running = _scan_free_port(server_port) == server_port

waved_process = None
if no_autostart:
autostart = False
else:
autostart = os.environ.get('H2O_WAVE_NO_AUTOSTART', 'false').lower() in ['false', '0', 'f']

waved = 'waved.exe' if 'Windows' in platform.system() else './waved'
# OS agnostic wheels do not have waved - needed for HAC.
is_waved_present = os.path.isfile(os.path.join(sys.exec_prefix, waved))

try:
waved = 'waved.exe' if 'Windows' in platform.system() else './waved'
waved_process = None
# OS agnostic wheels do not have waved - needed for HAC.
is_waved_present = os.path.isfile(os.path.join(sys.exec_prefix, waved))
if no_autostart:
autostart = False
else:
autostart = os.environ.get('H2O_WAVE_NO_AUTOSTART', 'false').lower() in ['false', '0', 'f']
if autostart and is_waved_present and server_not_running:
waved_process = subprocess.Popen([waved], cwd=sys.exec_prefix, env=os.environ.copy(), shell=True)
time.sleep(1)
Expand Down Expand Up @@ -276,4 +284,72 @@ def learn():
from h2o_wave_university import cli
cli.main()
except ImportError:
print('You need to run \x1b[7;30;43mpip install h2o_wave_university\x1b[0m first.')
print('You need to run \x1b[7;30;43mpip install h2o_wave_university\x1b[0m first.')


@main.command()
@click.option('--port', default=10101, help='Port your app is running on (defaults to 10101).')
@click.option('--subdomain', default='?new', help='Subdomain to use. If not available, a random one is generated.')
@click.option('--remote-host', default='h2oai.app', help='Remote host to use (defaults to h2oai.app).')
def share(port: int, subdomain: str, remote_host: str):
"""Share your locally running app with the world.

\b
$ wave share
"""
if 'Windows' in platform.system():
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

if 'Windows' in platform.system():

async def wakeup():
while True:
await asyncio.sleep(1)

# HACK: Enable Ctrl-C on Windows when opening multiple TCP connections.
# https://stackoverflow.com/questions/27480967/why-does-the-asyncios-event-loop-suppress-the-keyboardinterrupt-on-windows.
loop.create_task(wakeup())

try:
loop.run_until_complete(_share(port, subdomain, remote_host))
except KeyboardInterrupt:
tasks = asyncio.all_tasks(loop)
for task in tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
loop.close()


async def _share(port: int, subdomain: str, remote_host: str):
if _scan_free_port(port) == port:
print(f'Could not connect to localhost:{port}. Please make sure your app is running.')
exit(1)

res = httpx.get(f'https://{remote_host}/{subdomain}', headers={'Content-Type': 'application/json'})
if res.status_code != 200:
print('Could not connect to the remote sharing server.')
exit(1)

res = res.json()
print(f'BETA: Proxying localhost:{port} ==> {res["url"]}')
print('\x1b[7;30;43mDO NOT SHARE YOUR APP IF IT CONTAINS SENSITIVE INFO\x1b[0m.')
print('Press Ctrl+C to stop sharing.')

max_conn_count = res['max_conn_count']
# The server can be configured to either support 10 concurrent connections (default) or more.
# If more, connect in batches of 100 for better performance.
step = 100 if max_conn_count > 10 else max_conn_count
mturoci marked this conversation as resolved.
Show resolved Hide resolved

tasks = []
for _ in range(max_conn_count // step):
for _ in range(step):
tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', port, remote_host, res['port'])))
await asyncio.sleep(1)
# Handle the rest if any.
for _ in range(max_conn_count % step):
tasks.append(asyncio.create_task(listen_on_socket('127.0.0.1', port, remote_host, res['port'])))

await asyncio.gather(*tasks)
26 changes: 26 additions & 0 deletions py/h2o_wave/h2o_wave/share.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio


async def pipe(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
while True:
data = await r.read(4096)
if not data:
break
w.write(data)
await w.drain()


async def listen_on_socket(local_host: str, local_port: int, remote_host: str, remote_port: int) -> None:
while True:
mturoci marked this conversation as resolved.
Show resolved Hide resolved
try:
local_reader, local_writer = await asyncio.open_connection(local_host, local_port)
remote_reader, remote_writer = await asyncio.open_connection(remote_host, remote_port, ssl=True)

await asyncio.gather(pipe(local_reader, remote_writer), pipe(remote_reader, local_writer))

# Swallow exceptions and reconnect.
except Exception:
pass
finally:
local_writer.close()
remote_writer.close()