-
Notifications
You must be signed in to change notification settings - Fork 46
/
CH_02_02_server.py
34 lines (27 loc) · 1.07 KB
/
CH_02_02_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
import websockets
import click
from contextlib import suppress
async def blastoff(websocket):
click.secho(">> begin blastoff")
for i in range(25):
await websocket.send(f"\n\n\n>> {' ' * i}🚀")
await asyncio.sleep(0.03)
for i in range(3):
await asyncio.sleep(0.5)
await websocket.send(f"\n\n\n>> 🚀🚀🚀🚀🚀🚀🚀🚀 BLASTOFF 🚀🚀🚀🚀🚀🚀🚀🚀 <<")
async def huston(websocket, path):
click.clear()
async for message in websocket:
if "yes" in message.lower():
click.secho(">> begin countdown")
for i in reversed(range(1, 11)):
await websocket.send(f"\n\n\n>> Taking off in: {i}")
await asyncio.sleep(0.8)
with suppress(Exception):
await blastoff(websocket)
PORT = 8765
click.secho(f"--- listening for websocket connections on port: {PORT} ---")
start_server = websockets.serve(huston, "localhost", PORT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()