Skip to content
This repository has been archived by the owner on Aug 1, 2021. It is now read-only.

Commit

Permalink
Voice Gateway v3 (#80)
Browse files Browse the repository at this point in the history
* [reqs] add wsaccel to performance reqs

* Support voice gateway v3
  • Loading branch information
b1naryth1ef committed Jan 22, 2018
1 parent 9ac9e26 commit 88504f7
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 138 deletions.
223 changes: 93 additions & 130 deletions disco/voice/client.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
from __future__ import print_function

import gevent
import socket
import struct
import time

try:
import nacl.secret
except ImportError:
print('WARNING: nacl is not installed, voice support is disabled')

from holster.enum import Enum
from holster.emitter import Emitter

from disco.gateway.encoding.json import JSONEncoder
from disco.util.websocket import Websocket
from disco.util.logging import LoggingClass
from disco.voice.packets import VoiceOPCode
from disco.gateway.packets import OPCode
from disco.voice.packets import VoiceOPCode
from disco.voice.udp import UDPVoiceClient

VoiceState = Enum(
DISCONNECTED=0,
AWAITING_ENDPOINT=1,
AUTHENTICATING=2,
CONNECTING=3,
CONNECTED=4,
VOICE_CONNECTING=5,
VOICE_CONNECTED=6,
RECONNECTING=1,
AWAITING_ENDPOINT=2,
AUTHENTICATING=3,
AUTHENTICATED=4,
CONNECTING=5,
CONNECTED=6,
VOICE_CONNECTING=7,
VOICE_CONNECTED=8,
)


Expand All @@ -36,98 +32,15 @@ def __init__(self, msg, client):
super(VoiceException, self).__init__(msg)


class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
super(UDPVoiceClient, self).__init__()
self.vc = vc

# The underlying UDP socket
self.conn = None

# Connection information
self.ip = None
self.port = None

self.run_task = None
self.connected = False

# Buffer used for encoding/sending frames
self._buffer = bytearray(24)
self._buffer[0] = 0x80
self._buffer[1] = 0x78

def send_frame(self, frame, sequence=None, timestamp=None):
# Convert the frame to a bytearray
frame = bytearray(frame)

# Pack the rtc header into our buffer
struct.pack_into('>H', self._buffer, 2, sequence or self.vc.sequence)
struct.pack_into('>I', self._buffer, 4, timestamp or self.vc.timestamp)
struct.pack_into('>i', self._buffer, 8, self.vc.ssrc)

if self.vc.mode == 'xsalsa20_poly1305_suffix':
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
raw = self.vc.secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce
else:
# Now encrypt the payload with the nonce as a header
raw = self.vc.secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext

# Send the header (sans nonce padding) plus the payload
self.send(self._buffer[:12] + raw)

# Increment our sequence counter
self.vc.sequence += 1
if self.vc.sequence >= 65535:
self.vc.sequence = 0

def run(self):
while True:
self.conn.recvfrom(4096)

def send(self, data):
self.conn.sendto(data, (self.ip, self.port))

def disconnect(self):
self.run_task.kill()

def connect(self, host, port, timeout=10, addrinfo=None):
self.ip = socket.gethostbyname(host)
self.port = port

self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

if addrinfo:
ip, port = addrinfo
else:
# Send discovery packet
packet = bytearray(70)
struct.pack_into('>I', packet, 0, self.vc.ssrc)
self.send(packet)

# Wait for a response
try:
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout)
except gevent.Timeout:
return (None, None)

# Read IP and port
ip = str(data[4:]).split('\x00', 1)[0]
port = struct.unpack('<H', data[-2:])[0]

# Spawn read thread so we don't max buffers
self.connected = True
self.run_task = gevent.spawn(self.run)

return (ip, port)


class VoiceClient(LoggingClass):
VOICE_GATEWAY_VERSION = 3

SUPPORTED_MODES = {
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305',
}

def __init__(self, channel, encoder=None):
def __init__(self, channel, encoder=None, max_reconnects=5):
super(VoiceClient, self).__init__()

if not channel.is_voice:
Expand All @@ -136,10 +49,13 @@ def __init__(self, channel, encoder=None):
self.channel = channel
self.client = self.channel.client
self.encoder = encoder or JSONEncoder
self.max_reconnects = max_reconnects

# Bind to some WS packets
self.packets = Emitter()
self.packets.on(VoiceOPCode.HELLO, self.on_voice_hello)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)

# State + state change emitter
Expand All @@ -152,18 +68,15 @@ def __init__(self, channel, encoder=None):
self.ssrc = None
self.port = None
self.mode = None
self.secret_box = None
self.udp = None

# Voice data state
self.sequence = 0
self.timestamp = 0

self.update_listener = None

# Websocket connection
self.ws = None
self.heartbeat_task = None

self._session_id = None
self._reconnects = 0
self._update_listener = None
self._heartbeat_task = None

def __repr__(self):
return u'<VoiceClient {}>'.format(self.channel)
Expand All @@ -174,9 +87,17 @@ def set_state(self, state):
self.state = state
self.state_emitter.emit(state, prev_state)

def heartbeat(self, interval):
def _connect_and_run(self):
self.ws = Websocket('wss://' + self.endpoint + '/v={}'.format(self.VOICE_GATEWAY_VERSION))
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()

def _heartbeat(self, interval):
while True:
self.send(VoiceOPCode.HEARTBEAT, time.time() * 1000)
self.send(VoiceOPCode.HEARTBEAT, time.time())
gevent.sleep(interval / 1000)

def set_speaking(self, value):
Expand All @@ -192,6 +113,11 @@ def send(self, op, data):
'd': data,
}), self.encoder.OPCODE)

def on_voice_hello(self, data):
self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self)
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'] * 0.75)
self.set_state(VoiceState.AUTHENTICATED)

def on_voice_ready(self, data):
self.log.info('[%s] Recived Voice READY payload, attempting to negotiate voice connection w/ remote', self)
self.set_state(VoiceState.CONNECTING)
Expand All @@ -206,8 +132,6 @@ def on_voice_ready(self, data):
else:
raise Exception('Failed to find a supported voice mode')

self.heartbeat_task = gevent.spawn(self.heartbeat, data['heartbeat_interval'])

self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.endpoint, self.port)
self.udp = UDPVoiceClient(self)
ip, port = self.udp.connect(self.endpoint, self.port)
Expand All @@ -227,11 +151,15 @@ def on_voice_ready(self, data):
},
})

def on_voice_resumed(self, data):
self.log.info('[%s] Recieved resumed', self)
self.set_state(VoiceState.CONNECTED)

def on_voice_sdp(self, sdp):
self.log.info('[%s] Recieved session description, connection completed', self)

# Create a secret box for encryption/decryption
self.secret_box = nacl.secret.SecretBox(bytes(bytearray(sdp['secret_key'])))
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key'])))

# Toggle speaking state so clients learn of our SSRC
self.set_speaking(True)
Expand All @@ -253,12 +181,8 @@ def on_voice_server_update(self, data):
self.set_state(VoiceState.AUTHENTICATING)

self.endpoint = data.endpoint.split(':', 1)[0]
self.ws = Websocket('wss://' + self.endpoint)
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()

self._connect_and_run()

def on_message(self, msg):
try:
Expand All @@ -271,25 +195,60 @@ def on_error(self, err):
self.log.error('[%s] Voice websocket error: %s', self, err)

def on_open(self):
if self._session_id:
return self.send(VoiceOPCode.RESUME, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self._session_id,
'token': self.token,
})

self._session_id = self.client.gw.session_id

self.send(VoiceOPCode.IDENTIFY, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self.client.gw.session_id,
'session_id': self._session_id,
'token': self.token,
})

def on_close(self, code, error):
self.log.warning('[%s] Voice websocket disconnected (%s, %s)', self, code, error)
def on_close(self, code, reason):
self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects)

if self._heartbeat_task:
self._heartbeat_task.kill()

# If we're not in a connected state, don't try to resume/reconnect
if self.state != VoiceState.CONNECTED:
return

self.log.info('[%s] Attempting Websocket Resumption', self)
self._reconnects += 1

if self.state == VoiceState.CONNECTED:
self.log.info('Attempting voice reconnection')
self.connect()
if self.max_reconnects and self._reconnects > self.max_reconnects:
raise VoiceException('Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects))

self.set_state(VoiceState.RECONNECTING)

# Don't resume for these error codes:
if code and 4000 <= code <= 4016:
self._session_id = None

if self.udp and self.udp.connected:
self.udp.disconnect()

wait_time = (self._reconnects - 1) * 5
self.log.info(
'[%s] Will attempt to %s after %s seconds', self, 'resume' if self._session_id else 'reconnect', wait_time)
gevent.sleep(wait_time)

self._connect_and_run()

def connect(self, timeout=5, mute=False, deaf=False):
self.log.debug('[%s] Attempting connection', self)
self.set_state(VoiceState.AWAITING_ENDPOINT)

self.update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update)
self._update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update)

self.client.gw.send(OPCode.VOICE_STATE_UPDATE, {
'self_mute': mute,
Expand All @@ -299,17 +258,18 @@ def connect(self, timeout=5, mute=False, deaf=False):
})

if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout):
self.disconnect()
raise VoiceException('Failed to connect to voice', self)

def disconnect(self):
self.log.debug('[%s] disconnect called', self)
self.set_state(VoiceState.DISCONNECTED)

if self.heartbeat_task:
self.heartbeat_task.kill()
self.heartbeat_task = None
if self._heartbeat_task:
self._heartbeat_task.kill()
self._heartbeat_task = None

if self.ws and self.ws.sock.connected:
if self.ws and self.ws.sock and self.ws.sock.connected:
self.ws.close()

if self.udp and self.udp.connected:
Expand All @@ -324,3 +284,6 @@ def disconnect(self):

def send_frame(self, *args, **kwargs):
self.udp.send_frame(*args, **kwargs)

def increment_timestamp(self, *args, **kwargs):
self.udp.increment_timestamp(*args, **kwargs)
5 changes: 5 additions & 0 deletions disco/voice/packets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@
HEARTBEAT=3,
SESSION_DESCRIPTION=4,
SPEAKING=5,
HEARTBEAT_ACK=6,
RESUME=7,
HELLO=8,
RESUMED=9,
CLIENT_DISCONNECT=13,
)

0 comments on commit 88504f7

Please sign in to comment.