-
Notifications
You must be signed in to change notification settings - Fork 722
/
cli.py
124 lines (89 loc) · 3.28 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import argparse
import asyncio
import logging
import time
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.signaling import BYE, add_signaling_arguments, create_signaling
def channel_log(channel, t, message):
    """Print a single data-channel event line to stdout.

    Args:
        channel: Object exposing a ``label`` attribute identifying the channel.
        t: Short marker placed between the channel label and the message
            (callers in this file pass ``"-"``, ``">"`` and ``"<"``).
        message: Payload to display; rendered with ``%s``.
    """
    fields = (channel.label, t, message)
    line = "channel(%s) %s %s" % fields
    print(line)
def channel_send(channel, message):
    """Log an outgoing message and then send it over the channel.

    Args:
        channel: Data channel to send on; must provide ``send`` and ``label``.
        message: Payload handed unchanged to ``channel.send``.
    """
    outgoing_marker = ">"
    # Log before sending so the line is printed even if ``send`` raises.
    channel_log(channel, outgoing_marker, message)
    channel.send(message)
async def consume_signaling(pc, signaling):
    """Feed incoming signaling objects to the peer connection until BYE.

    Each received object is handled by type:

    * ``RTCSessionDescription`` -- applied as the remote description; if it
      is an offer, an answer is created, set locally and sent back.
    * ``RTCIceCandidate`` -- added to the peer connection.
    * ``BYE`` -- prints ``Exiting`` and returns.

    Any other object is ignored and the loop keeps waiting.

    Args:
        pc: The peer connection to update.
        signaling: Signaling transport providing ``receive`` and ``send``.
    """
    while True:
        incoming = await signaling.receive()

        if isinstance(incoming, RTCSessionDescription):
            await pc.setRemoteDescription(incoming)
            if incoming.type != "offer":
                continue

            # send answer
            answer = await pc.createAnswer()
            await pc.setLocalDescription(answer)
            await signaling.send(pc.localDescription)
            continue

        if isinstance(incoming, RTCIceCandidate):
            await pc.addIceCandidate(incoming)
            continue

        if incoming is BYE:
            print("Exiting")
            return
# Reference instant (a ``time.perf_counter()`` reading) captured by the first
# call to ``current_stamp``; ``None`` until then.
time_start = None


def current_stamp():
    """Return the microseconds elapsed since the first call, as an ``int``.

    The first call records the reference instant and returns ``0``; every
    later call returns the time elapsed since that instant.

    The clock is ``time.perf_counter()`` rather than ``time.time()``: the
    stamps are only ever subtracted from one another to compute a round-trip
    time, so they need a monotonic, high-resolution clock that is immune to
    wall-clock adjustments (which could otherwise yield negative or wildly
    wrong durations).

    Returns:
        int: Elapsed microseconds, ``0`` on the first call.
    """
    global time_start

    now = time.perf_counter()
    if time_start is None:
        time_start = now
        return 0
    return int((now - time_start) * 1000000)
async def run_answer(pc, signaling):
    """Run the answering side of the ping/pong exchange.

    Connects the signaling transport, registers a handler that replies to
    ``ping...`` text messages on any data channel announced by the peer
    connection, then processes signaling until ``consume_signaling`` returns.

    Args:
        pc: The peer connection to attach handlers to.
        signaling: Signaling transport to connect and consume.
    """
    await signaling.connect()

    @pc.on("datachannel")
    def handle_datachannel(channel):
        channel_log(channel, "-", "created by remote party")

        @channel.on("message")
        def handle_message(message):
            channel_log(channel, "<", message)

            is_ping = isinstance(message, str) and message.startswith("ping")
            if not is_ping:
                return

            # reply, echoing back whatever followed the "ping" prefix
            channel_send(channel, "pong" + message[4:])

    await consume_signaling(pc, signaling)
async def run_offer(pc, signaling):
    """Run the offering side of the ping/pong exchange.

    Connects the signaling transport, creates a ``"chat"`` data channel,
    starts sending a ``ping <stamp>`` message every second once the channel
    opens, prints the round-trip time for every ``pong`` received, sends the
    offer, and then processes signaling until ``consume_signaling`` returns.

    Args:
        pc: The peer connection used to create the channel and the offer.
        signaling: Signaling transport to connect, send the offer on, and
            consume.
    """
    await signaling.connect()

    channel = pc.createDataChannel("chat")
    channel_log(channel, "-", "created by local party")

    # Strong reference to the background ping task. The event loop only keeps
    # weak references to tasks, so an unreferenced task may be garbage
    # collected before it finishes.
    ping_task = None

    async def send_pings():
        while True:
            channel_send(channel, "ping %d" % current_stamp())
            await asyncio.sleep(1)

    @channel.on("open")
    def on_open():
        nonlocal ping_task
        ping_task = asyncio.ensure_future(send_pings())

    @channel.on("message")
    def on_message(message):
        channel_log(channel, "<", message)

        if isinstance(message, str) and message.startswith("pong"):
            # message is "pong <stamp>"; the stamp is in microseconds.
            elapsed_ms = (current_stamp() - int(message[5:])) / 1000
            print(" RTT %.2f ms" % elapsed_ms)

    # send offer
    await pc.setLocalDescription(await pc.createOffer())
    await signaling.send(pc.localDescription)

    try:
        await consume_signaling(pc, signaling)
    finally:
        # Stop pinging once signaling is over so the task does not keep
        # sending on the channel while the connection is being torn down.
        if ping_task is not None:
            ping_task.cancel()
# Script entry point: parse the role and signaling options, then run the
# selected side of the exchange until it finishes or the user interrupts it.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Data channels ping/pong")
    parser.add_argument("role", choices=["offer", "answer"])
    parser.add_argument("--verbose", "-v", action="count")
    add_signaling_arguments(parser)

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    # Create and install the event loop explicitly. Calling
    # asyncio.get_event_loop() with no running loop is deprecated and raises
    # RuntimeError on recent Python versions. This is done before the
    # signaling and peer-connection objects are built in case they look up
    # the current loop during construction.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    signaling = create_signaling(args)
    pc = RTCPeerConnection()
    if args.role == "offer":
        coro = run_offer(pc, signaling)
    else:
        coro = run_answer(pc, signaling)

    # run event loop
    try:
        loop.run_until_complete(coro)
    except KeyboardInterrupt:
        pass
    finally:
        # Always attempt both shutdown steps: a failure while closing the
        # peer connection must not skip closing the signaling transport.
        try:
            loop.run_until_complete(pc.close())
        finally:
            loop.run_until_complete(signaling.close())