Skip to content

Commit

Permalink
Add: application performance monitoring via Honeycomb (#71)
Browse files Browse the repository at this point in the history
The number of traces sent to Honeycomb is rate limited per hour,
to ensure we can stay within a predefined number of events per
month.
  • Loading branch information
TrueBrain committed Oct 16, 2021
1 parent ec9e4c0 commit fb4d6f6
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 28 deletions.
91 changes: 88 additions & 3 deletions game_coordinator/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import beeline
import click
import logging
import random
import signal

from openttd_helpers import click_helper
Expand All @@ -22,10 +24,69 @@
from .database.redis import click_database_redis
from .web import start_webserver

# Hourly budget of traces: fill_samples_bucket() refills the bucket at this
# rate and caps it at this value. main() overwrites it from
# --honeycomb-rate-limit.
TRACES_PER_HOUR = 100
# A new trace is only considered for sending with a chance of 1 in
# TRACES_SAMPLE_RATE. main() overwrites it from --honeycomb-sample-rate.
TRACES_SAMPLE_RATE = 10

log = logging.getLogger(__name__)
# trace id -> True/False: the sampling decision for every trace being tracked.
samples = {}
# [number of traces being tracked, number of those that are being sent].
samples_length = [0, 0]
# One-element list holding the budget currently available; updated in place.
samples_bucket = [0.0]


def beeline_sampler(event):
    """Decide whether an event is sent to Honeycomb, deciding once per trace.

    The first event seen for a trace id fixes the decision for that trace:
    it is kept only if the bucket has budget left and a 1-in-
    TRACES_SAMPLE_RATE draw succeeds. Every later event of the same trace
    reuses that decision. When the event without a parent arrives, the trace
    is forgotten again.

    Args:
        event: mapping of event fields; "trace.trace_id" and
            "trace.parent_id" are read from it.

    Returns:
        A ``(keep, sample_rate)`` tuple. For kept events ``sample_rate`` is
        the integer N of "1 in N traces is sent", estimated from the traces
        currently tracked. For dropped events it is 0.
    """
    trace_id = event["trace.trace_id"]

    # New trace. Check if we want to sample it.
    if trace_id not in samples:
        # Count how many new traces we have seen.
        samples_length[0] += 1

        # Check if we can send this trace.
        if samples_bucket[0] > 0 and random.randint(1, TRACES_SAMPLE_RATE) == 1:
            samples_bucket[0] -= 1

            samples[trace_id] = True
            samples_length[1] += 1
        else:
            samples[trace_id] = False

    # Calculate the result and sample-rate.
    result = samples[trace_id]
    if result:
        # Honeycomb expects "1 in N": tracked traces per sent trace, not the
        # fraction sent / tracked. A kept trace is counted in both numbers,
        # so the ratio is at least 1; the guards only protect against drift.
        sample_rate = max(1, round(samples_length[0] / max(samples_length[1], 1)))
    else:
        # The rate of a dropped event is never reported.
        sample_rate = 0

    # This trace is closing. So forget any information about it.
    if event["trace.parent_id"] is None:
        del samples[trace_id]
        samples_length[0] -= 1
        if result:
            samples_length[1] -= 1

    return result, sample_rate


async def run_server(application, bind, port, ProtocolClass):
async def fill_samples_bucket():
    """Refill the sampling bucket forever, a small amount every five seconds.

    Spreading the hourly budget over many small refills lets traces be
    sampled throughout the hour instead of all at its start.
    """
    while True:
        await asyncio.sleep(5)

        # Add five seconds' worth of the hourly budget, but never hold more
        # than one full hour of budget, so it cannot all go out at once.
        refilled = samples_bucket[0] + TRACES_PER_HOUR * 5 / 3600
        samples_bucket[0] = min(refilled, TRACES_PER_HOUR)


async def run_server(application, bind, port, ProtocolClass, honeycomb_api_key):
if honeycomb_api_key:
beeline.init(
writekey=honeycomb_api_key,
dataset="game-coordinator",
service_name=application.name,
sampler_hook=beeline_sampler,
)
asyncio.create_task(fill_samples_bucket())
log.info("Honeycomb beeline initialized")

loop = asyncio.get_event_loop()

server = await loop.create_server(
Expand Down Expand Up @@ -64,6 +125,13 @@ async def close_server(loop, app_instance, server):
@click_helper.command()
@click_logging # Should always be on top, as it initializes the logging
@click_sentry
@click.option("--honeycomb-api-key", help="Honeycomb API key.")
@click.option(
"--honeycomb-rate-limit", help="How many traces to send to Honeycomb per hour.", default=100, show_default=True
)
@click.option(
"--honeycomb-sample-rate", help="The sample rate of traces to send to Honeycomb.", default=10, show_default=True
)
@click.option(
"--bind", help="The IP to bind the server to", multiple=True, default=["::1", "127.0.0.1"], show_default=True
)
Expand Down Expand Up @@ -92,7 +160,24 @@ async def close_server(loop, app_instance, server):
@click_database_redis
@click_application_coordinator
@click_application_turn
def main(bind, app, coordinator_port, stun_port, turn_port, web_port, db, proxy_protocol):
def main(
honeycomb_api_key,
honeycomb_rate_limit,
honeycomb_sample_rate,
bind,
app,
coordinator_port,
stun_port,
turn_port,
web_port,
db,
proxy_protocol,
):
global TRACES_PER_HOUR, TRACES_SAMPLE_RATE

TRACES_PER_HOUR = honeycomb_rate_limit
TRACES_SAMPLE_RATE = honeycomb_sample_rate

loop = asyncio.get_event_loop()

db_instance = db()
Expand All @@ -113,7 +198,7 @@ def main(bind, app, coordinator_port, stun_port, turn_port, web_port, db, proxy_
port = stun_port
protocol = StunProtocol

server = loop.run_until_complete(run_server(app_instance, bind, port, protocol))
server = loop.run_until_complete(run_server(app_instance, bind, port, protocol, honeycomb_api_key))

loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.ensure_future(close_server(loop, app_instance, server)))
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(close_server(loop, app_instance, server)))
Expand Down
2 changes: 2 additions & 0 deletions game_coordinator/application/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@


class Application:
name = "coordinator"

def __init__(self, database):
if not _shared_secret:
raise Exception("Please set --shared-secret for this application")
Expand Down
21 changes: 21 additions & 0 deletions game_coordinator/application/helpers/token_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import random

from openttd_protocol import tracer
from openttd_protocol.wire.exceptions import SocketClosed

log = logging.getLogger(__name__)
Expand All @@ -25,10 +26,12 @@ def __init__(self, application, source, protocol_version, token, server):
self._timeout_task = None
self._given_up = False

@tracer.traced("token-connect")
def delete_client_token(self):
    """Drop this token from the client's connections if it is the one registered for the server."""
    connections = self._source.client.connections
    server_id = self._server.server_id

    # Only remove the entry when it still points at this very token.
    if connections[server_id] == self:
        del connections[server_id]

@tracer.traced("token-connect")
async def connect(self):
self._tracking_number = 1
self._connect_result_event = asyncio.Event()
Expand All @@ -51,6 +54,7 @@ async def connect(self):
self._connect_task = asyncio.create_task(self._connect_guard())
self._timeout_task = asyncio.create_task(self._timeout())

@tracer.traced("token-connect")
async def connected(self):
self._given_up = True
self._connect_task.cancel()
Expand All @@ -60,9 +64,11 @@ async def connected(self):

await self._application.database.stats_connect(self._connect_method, True)

@tracer.traced("token-connect")
async def abort_attempt(self, reason):
    """Schedule giving up on this connection attempt, tagging the failure as "abort-<reason>"."""
    failure_reason = f"abort-{reason}"
    give_up = self._connect_give_up(failure_reason)

    # Runs in the background; this method does not wait for it to finish.
    # NOTE(review): the task reference is not kept - confirm that is intended.
    asyncio.create_task(give_up)

@tracer.traced("token-connect")
async def connect_failed(self, tracking_number):
if tracking_number == 0:
# Client requested we stop with this connection attempt. So clean it up!
Expand All @@ -83,6 +89,7 @@ async def connect_failed(self, tracking_number):
self._tracking_number += 1
self._connect_result_event.set()

@tracer.traced("token-connect")
async def stun_result(self, prefix, interface_number, peer_type, peer_ip, peer_port):
if peer_type == "ipv6":
peer_ip = f"[{peer_ip}]"
Expand All @@ -99,6 +106,7 @@ async def stun_result(self, prefix, interface_number, peer_type, peer_ip, peer_p
# Both sides reported all their STUN results. Inform _connect().
self._stun_pairs.put_nowait(None)

@tracer.traced("token-connect")
async def stun_result_concluded(self, prefix, interface_number, result):
if result:
# Successful STUN results will call stun_result() eventually too.
Expand All @@ -109,7 +117,10 @@ async def stun_result_concluded(self, prefix, interface_number, result):
# Both sides reported all their STUN results. Inform _connect().
self._stun_pairs.put_nowait(None)

@tracer.untraced
async def _timeout(self):
tracer.add_context({"command": "connect.timeout"})

try:
await asyncio.sleep(TIMEOUT)

Expand All @@ -121,7 +132,11 @@ async def _timeout(self):
except Exception:
log.exception("Exception during _timeout()")

@tracer.untraced
@tracer.traced("token-connect")
async def _connect_guard(self):
tracer.add_context({"command": "connect.connect"})

try:
await self._connect()
except asyncio.CancelledError:
Expand All @@ -136,6 +151,7 @@ async def _connect_guard(self):

self._connect_task = None

@tracer.traced("token-connect")
async def _connect(self):
# Try connecting via direct-IPs first.
for direct_ip in self._server.direct_ips:
Expand Down Expand Up @@ -164,6 +180,7 @@ async def _connect(self):
# There are no more methods.
asyncio.create_task(self._connect_give_up("out-of-methods"))

@tracer.traced("token-connect")
async def _connect_direct_connect(self, server_ip, server_port):
ip_type = "ipv6" if server_ip.startswith("[") else "ipv4"
self._connect_method = f"direct-{ip_type}"
Expand All @@ -177,10 +194,12 @@ async def _connect_direct_connect(self, server_ip, server_port):
self._protocol_version, self.client_token, self._tracking_number, server_ip, server_port
)

@tracer.traced("token-connect")
async def _connect_stun_request(self):
    """Send a STUN request to the source first, then to the server."""
    source_protocol = self._source.protocol
    await source_protocol.send_PACKET_COORDINATOR_GC_STUN_REQUEST(
        self._protocol_version,
        self.client_token,
    )

    # The server side gets its own token for the same attempt.
    server = self._server
    await server.send_stun_request(self._protocol_version, self.server_token)

@tracer.traced("token-connect")
async def _connect_stun_connect(self, ip_type):
self._connect_method = f"stun-{ip_type}"
self._connect_result_event.clear()
Expand Down Expand Up @@ -209,6 +228,7 @@ async def _connect_stun_connect(self, ip_type):
client_peer[2],
)

@tracer.traced("token-connect")
async def _connect_turn_connect(self):
self._connect_method = "turn"
self._connect_result_event.clear()
Expand Down Expand Up @@ -240,6 +260,7 @@ async def _connect_turn_connect(self):
connection_string,
)

@tracer.traced("token-connect")
async def _connect_give_up(self, failure_reason):
# Because we are async, it can happen more than one way suggests we
# should give up. For example, a user sends a "stop", but directly
Expand Down
14 changes: 14 additions & 0 deletions game_coordinator/application/helpers/token_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import pproxy

from openttd_protocol import tracer
from openttd_protocol.protocol.coordinator import ConnectionType
from openttd_protocol.protocol.game import GameProtocol
from openttd_protocol.wire.exceptions import SocketClosed
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(self, application, source, protocol_version, token, server):
def delete_client_token(self):
    """Do nothing; this method has no work to perform here."""
    return None

@tracer.traced("token-verify")
async def connect(self):
self._pending_detection_tasks = []
self._stun_concluded = set()
Expand All @@ -63,6 +65,7 @@ async def connect(self):

self._conclude_task = asyncio.create_task(self._conclude_detection())

@tracer.traced("token-verify")
async def abort_attempt(self, reason):
self._conclude_task.cancel()
for task in self._pending_detection_tasks:
Expand All @@ -72,6 +75,7 @@ async def abort_attempt(self, reason):
await self._application.database.stats_verify("abort")
self._application.delete_token(self.token)

@tracer.traced("token-verify")
async def stun_result(self, prefix, interface_number, peer_type, peer_ip, peer_port):
peer_ip = ipaddress.IPv6Address(peer_ip) if peer_type == "ipv6" else ipaddress.IPv4Address(peer_ip)

Expand All @@ -82,6 +86,7 @@ async def stun_result(self, prefix, interface_number, peer_type, peer_ip, peer_p
task = asyncio.create_task(self._start_detection(interface_number, peer_ip))
self._pending_detection_tasks.append(task)

@tracer.traced("token-verify")
async def stun_result_concluded(self, prefix, interface_number, result):
if result:
# Successful STUN results will call stun_result() eventually too.
Expand All @@ -93,7 +98,11 @@ async def stun_result_concluded(self, prefix, interface_number, result):
if len(self._stun_concluded) == 2:
self._stun_done_event.set()

@tracer.untraced
@tracer.traced("token-verify")
async def _start_detection(self, interface_number, server_ip):
tracer.add_context({"command": "verify.start"})

try:
await asyncio.wait_for(self._create_connection(server_ip, self._server.server_port), TIMEOUT_DIRECT_CONNECT)

Expand Down Expand Up @@ -123,7 +132,11 @@ async def _start_detection(self, interface_number, server_ip):

self._pending_detection_tasks.remove(asyncio.current_task())

@tracer.untraced
@tracer.traced("token-verify")
async def _conclude_detection(self):
tracer.add_context({"command": "verify.conclude"})

try:
await asyncio.wait_for(self._stun_done_event.wait(), TIMEOUT_VERIFY)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -158,6 +171,7 @@ async def _conclude_detection(self):

self._application.delete_token(self.token)

@tracer.traced("token-verify")
async def _create_connection(self, server_ip, server_port):
connected = asyncio.Event()

Expand Down
2 changes: 2 additions & 0 deletions game_coordinator/application/stun.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@


class Application:
name = "stun"

def __init__(self, database):
self.database = database

Expand Down
2 changes: 2 additions & 0 deletions game_coordinator/application/turn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@


class Application:
name = "turn"

def __init__(self, database):
if not _turn_address:
raise Exception("Please set --turn-address for this application")
Expand Down

0 comments on commit fb4d6f6

Please sign in to comment.