Skip to content

Commit

Permalink
Try to ensure live records are transmitted in order. If they end up being
Browse files Browse the repository at this point in the history
decoded out of order, put out-of-order packets aside for later reassembly so we
can recover from decode errors faster.
  • Loading branch information
davidrg committed Jun 7, 2020
1 parent f1b4f2e commit cf87802
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 21 deletions.
88 changes: 84 additions & 4 deletions weather_push/zxw_push/client/weather_push_server/tcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def __init__(self, authorisation_code,
self._outgoing_samples = {} # List of waiting samples for each station
self._previous_live_record = {} # Last live record for each station

# This is keyed by station and live sequence id
self._outgoing_weather_packets = dict()

self._stations = None
self._station_ids = None
self._image_type_ids = None
Expand Down Expand Up @@ -406,7 +409,7 @@ def flush_samples(self):
pass

def _make_live_weather_record(self, data, previous_live, previous_sample,
hardware_type, station_id):
hardware_type, station_id, sequence_id):

# We send a full uncompressed record every so often just in case a
# packet has gone missing or arrived out of order at some point.
Expand All @@ -428,7 +431,7 @@ def _make_live_weather_record(self, data, previous_live, previous_sample,
if encoded is not None:
record = LiveDataRecord()
record.station_id = station_id
record.sequence_id = self._live_sequence_id[station_id]()
record.sequence_id = sequence_id
record.field_list = field_ids
record.field_data = encoded

Expand Down Expand Up @@ -487,6 +490,15 @@ def _flush_data_buffer(self, station_id, live_data, hardware_type):
:param hardware_type: Hardware type used by the live data
:type hardware_type: str
"""

live_sequence_id = self._live_sequence_id[station_id]()

if station_id not in self._outgoing_weather_packets.keys():
self._outgoing_weather_packets[station_id] = dict()

# Reserve a transmission slot for the packet we're about to assemble.
self._outgoing_weather_packets[station_id][live_sequence_id] = None

previous_sample = yield self._confirmed_sample_func(
self._station_codes[station_id])

Expand Down Expand Up @@ -544,7 +556,8 @@ def _flush_data_buffer(self, station_id, live_data, hardware_type):

live_record = self._make_live_weather_record(
live_data, previous_live, previous_sample,
hardware_type, station_id
hardware_type, station_id,
live_sequence_id
)

# live_record is None when compression throws away *all* data (meaning
Expand All @@ -555,6 +568,10 @@ def _flush_data_buffer(self, station_id, live_data, hardware_type):

self._previous_live_record[station_id] = (live_data,
live_record.sequence_id)
else:
# Live record doesn't need transmitting. Clear its reservation.
self._outgoing_weather_packets[station_id].pop(live_sequence_id, None)
live_sequence_id = None

if len(weather_records) == 0:
# Nothing to send.
Expand All @@ -565,4 +582,67 @@ def _flush_data_buffer(self, station_id, live_data, hardware_type):
for record in weather_records:
packet.add_record(record)

self._send_packet(packet)
self._send_weather_data_packet(packet, station_id, live_sequence_id)

def _send_weather_data_packet(self, packet, station_id, live_sequence_id):
    """
    This function will ensure packets containing live weather data are sent strictly in the order they are
    prepared.

    Most of the time this should happen naturally but under certain circumstances (such is when
    there are a large number of sample records waiting to be sent) the event driven nature of packet
    assembly can result in multiple packets being assembled at one time and completed out of order.

    :param packet: Packet to transmit
    :param station_id: Station the packet is for
    :param live_sequence_id: ID of the live record in the packet, or None if the
        packet contains no live record (in which case it is sent immediately)
    """

    if live_sequence_id is None:
        self._send_packet(packet)  # No live record in the packet - transmission order doesn't matter.
        return

    # A transmission slot must have been reserved (set to None) by
    # _flush_data_buffer before the packet was assembled.
    if station_id not in self._outgoing_weather_packets:
        raise Exception("No outgoing packets reserved for station {0}".format(station_id))

    if live_sequence_id not in self._outgoing_weather_packets[station_id]:
        raise Exception("Packet {0} for station {1} has no reserved transmission slot".format(
            live_sequence_id, station_id))

    if self._outgoing_weather_packets[station_id][live_sequence_id] is not None:
        # Bug fix: the original omitted the .format() call here, so the raised
        # message contained literal "{0}"/"{1}" placeholders.
        raise Exception("Packet {0} for station {1} is already awaiting transmission!".format(
            live_sequence_id, station_id))

    log.msg("Queue data for station {0} packet {1}".format(station_id, live_sequence_id))

    # Fill the reserved slot with the now-complete packet.
    self._outgoing_weather_packets[station_id][live_sequence_id] = packet

    # NOTE(review): a plain numeric sort ignores the live sequence ID wrapping
    # around after 65534 (see the server side), so ordering is briefly wrong at
    # the wrap point — TODO confirm whether that matters here.
    waiting_packets = sorted(self._outgoing_weather_packets[station_id].keys())

    other_queued = False
    if len(waiting_packets) > 2:
        log.msg("Weather data packets with the following live sequence IDs are awaiting transmission "
                "for station {0} while transmitting live {2}: {1}".format(
                    station_id, repr(waiting_packets), live_sequence_id))
        other_queued = True

    # Transmit, in sequence order, every packet whose data has arrived. Stop at
    # the first reserved-but-unfilled slot so nothing is ever sent out of order.
    while len(waiting_packets) > 0:
        packet_id = waiting_packets.pop(0)

        this_packet = self._outgoing_weather_packets[station_id][packet_id]

        if this_packet is None:
            log.msg("No data for packet {0} yet. Stopping transmission loop. The following additional "
                    "packets remain to be transmitted: {1}".format(packet_id, repr(waiting_packets)))
            # Stop transmission until we've got data for this packet to ensure packets aren't transmitted
            # out of order.
            break

        log.msg("Transmit packet {0} for station {1}. Queue is now: {2}".format(
            packet_id, station_id, repr(waiting_packets)))
        self._send_packet(this_packet)

        # Packet is sent, remove it from the queue
        self._outgoing_weather_packets[station_id].pop(packet_id, None)

    if other_queued and len(self._outgoing_weather_packets[station_id].keys()) == 0:
        log.msg("Live queue cleared.")
145 changes: 128 additions & 17 deletions weather_push/zxw_push/server/tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, authorisation_code):

self._live_record_cache = {}
self._live_record_cache_ids = {}
self._last_live_record_id = dict()

self._sample_record_cache = {}
self._sample_record_cache_ids = []
Expand All @@ -62,6 +63,8 @@ def __init__(self, authorisation_code):

self._authorisation_code = authorisation_code

self._undecoded_live_records = dict()

def start_protocol(self, db):
"""
Called when the protocol has started. Connects to the database, etc.
Expand Down Expand Up @@ -247,15 +250,38 @@ def _reset_tracking_variables(self):
self._live_record_cache = {}
self._live_record_cache_ids = {}

@defer.inlineCallbacks
def _get_live_record(self, record_id, station_id):
    """
    Looks up a previously received live record for a station.

    The decoded-records cache is checked first. If the record isn't there,
    the un-decoded records store is tried and a decode of the stored record
    is attempted (without re-caching it or putting it aside again).

    :param record_id: Live sequence ID of the wanted record
    :param station_id: Station the record belongs to
    :return: Deferred firing with the live record, or None if it can't be
        found or decoded
    """

    # Idiom fix: membership tests use the dict directly rather than .keys().
    if station_id not in self._live_record_cache:
        log.msg("No live data cached for station {0}".format(station_id))
        # We've never seen live data for this station before.
        defer.returnValue(None)
        return

    if record_id in self._live_record_cache[station_id]:
        defer.returnValue(self._live_record_cache[station_id][record_id])
        return

    log.msg("Live record {0} for station {1} not found. Cached records are: {2}".format(
        record_id, station_id, repr(self._live_record_cache[station_id].keys())
    ))

    # Not in the cache - perhaps it arrived out of order and is sitting in the
    # un-decoded records store waiting for its own base record.
    if station_id in self._undecoded_live_records:
        if record_id in self._undecoded_live_records[station_id]:
            log.msg("Found record {0} in undecoded records store. Attempting to decode...".format(
                record_id))
            record = self._undecoded_live_records[station_id][record_id]
            # cache=False, put_aside=False: we only want the decoded value as
            # a base record; on failure the stored copy is left where it is.
            new_record = yield self._decode_live_record(record, False, False)
            if new_record is not None:
                log.msg("Successfully decoded record {0} from undecoded records store!".format(
                    record_id
                ))
                self._undecoded_live_records[station_id].pop(record_id, None)
                defer.returnValue(new_record)
                return

    defer.returnValue(None)

def _cache_live_record(self, new_live, record_id, station_id):

Expand All @@ -270,6 +296,10 @@ def _cache_live_record(self, new_live, record_id, station_id):
rid = self._live_record_cache_ids[station_id].pop(0)
del self._live_record_cache[station_id][rid]

log.msg("live cache for station {0} is now: {1}".format(
station_id, repr(sorted(self._live_record_cache[station_id]))
))

@staticmethod
def _to_real_dict(value):
result = {}
Expand Down Expand Up @@ -358,6 +388,7 @@ def _build_live_from_sample_diff(self, data, station_id, fields, hw_type):

defer.returnValue(new_live)

@defer.inlineCallbacks
def _build_live_from_live_diff(self, data, station_id, fields, hw_type, sequence_id):
"""
Decompresses a live record compressed using live-diff. If the required
Expand All @@ -374,7 +405,9 @@ def _build_live_from_live_diff(self, data, station_id, fields, hw_type, sequence
:return: Decompressed live data or None on failure
"""
other_live_id = data["live_diff_sequence"]
other_live = self._get_live_record(other_live_id, station_id)
other_live = yield self._get_live_record(other_live_id, station_id)

log.msg("Decode live {0} based on {1}".format(sequence_id, other_live_id))

if other_live is None:
# base record could not be found. This means that
Expand All @@ -384,33 +417,35 @@ def _build_live_from_live_diff(self, data, station_id, fields, hw_type, sequence
# Either we can't decode this record. Count it as an
# error and move on.
log.msg("** NOTICE: Live record decoding failed for station {2} - other live not "
"in cache. This is probably a client bug. This live is {0},"
" other is {1}".format(sequence_id, other_live_id, station_id))
return None
"in cache. Likely cause is network error or out-of-order processing. "
"This live is {0}, other is {1}".format(sequence_id, other_live_id, station_id))
defer.returnValue(None)
return

return patch_live_from_live(data, other_live, fields, hw_type)
defer.returnValue(patch_live_from_live(data, other_live, fields, hw_type))

@defer.inlineCallbacks
def _handle_live_record(self, record):
def _decode_live_record(self, record, cache=True, put_aside=True):
"""
Handles decoding and broadcasting a live record
Handles decoding a live record.
:param record: The received weather record
On success the live record will be stored in the recent live records cache and returned.
On failure None is returned.
:param record: Received weather record
:type record: LiveDataRecord
:return: True on success, False on Failure
:rtype: bool
:return: decoded live record
"""

fields = record.field_list
data = record.field_data
hw_type = self._station_id_hardware_type[record.station_id]
station_code = self._station_id_code[record.station_id]

rec_data = decode_live_data(data, hw_type, fields)

if "live_diff_sequence" in rec_data:

new_live = self._build_live_from_live_diff(
new_live = yield self._build_live_from_live_diff(
rec_data, record.station_id, fields, hw_type,
record.sequence_id
)
Expand All @@ -425,18 +460,94 @@ def _handle_live_record(self, record):
# the record.
new_live = rec_data

# We can now clear old un-decoded records from the un-decoded
# records store.
if record.station_id in self._undecoded_live_records:
for key in self._undecoded_live_records[record.station_id].keys():
# Throw away all records older than this one or all records that
# are a long way in the future (to account for the sequence ID
# wrapping around)
if key < record.sequence_id or key > record.sequence_id + 100:
log.msg("Discarding live {0} for station {1} - "
"made obsolete by receipt of full live record".format(
key, record.station_id))
self._undecoded_live_records[record.station_id].pop(key, None)

if new_live is None:
# Couldn't decompress - base record doesn't exist

if put_aside:
# We'll just put it aside for now in case the base record turns up later.
if record.station_id not in self._undecoded_live_records:
self._undecoded_live_records[record.station_id] = dict()
self._undecoded_live_records[record.station_id][record.sequence_id] = record

log.msg("Undecoded messages for station {0}: {1}".format(
record.station_id,
repr(sorted(self._undecoded_live_records[record.station_id].keys()))
))

# Decode failed.
defer.returnValue(None)
return

if cache:
self._cache_live_record(new_live, record.sequence_id,
record.station_id)

defer.returnValue(new_live)

@defer.inlineCallbacks
def _handle_live_record(self, record):
    """
    Handles decoding and broadcasting a live record
    :param record: The received weather record
    :type record: LiveDataRecord
    :return: True on success, False on Failure
    :rtype: bool
    """

    if record.station_id not in self._last_live_record_id:
        self._last_live_record_id[record.station_id] = None

    new_live = yield self._decode_live_record(record)

    if new_live is None:
        # Decode failed (the record has been put aside for later if possible).
        defer.returnValue(False)
        return

    # We only really care about *NEW* live data. If a record arrives out-of-order and its
    # older than the most recently received record we don't care about it. We still decode
    # it above so the result can end up in cache so we can recover from missing dependency
    # decode errors later on but we don't want to broadcast it.
    last_live_id = self._last_live_record_id[record.station_id]
    this_live_id = record.sequence_id
    delta = None
    if last_live_id is not None:
        delta = this_live_id - last_live_id

    # The live sequence wraps around after 65534. So 65535 == 0. So live 100 could actually
    # be more recent than live 65400. For this reason if the ID appears to jump back a long
    # way (more than 1000 live records or around 41 minutes) we'll consider this record to
    # be newer.
    if last_live_id is not None and this_live_id < last_live_id and delta > -1000:

        # This live is obsolete. Its older than the most recent live record we've received
        # for this station. No need to do anything with it.
        log.msg("Discard live {0} for station {1} as its older than the "
                "last received ({2})".format(this_live_id, record.station_id, last_live_id))
        # Bug fix: this branch originally exited with a bare `return`, firing the
        # Deferred with None instead of the documented False (both are falsy).
        defer.returnValue(False)
        return

    # NOTE(review): _decode_live_record already cached this record (cache=True
    # default), so this second call appends a duplicate ID to the cache's
    # eviction list — TODO confirm this double-caching is intended.
    self._cache_live_record(new_live, record.sequence_id,
                            record.station_id)

    station_code = self._station_id_code[record.station_id]

    # Insert decoded into the database as live data
    yield self._db.store_live_data(station_code, new_live)

    # TODO: Broadcast to message bus if configured

    self._last_live_record_id[record.station_id] = this_live_id

    defer.returnValue(True)

@defer.inlineCallbacks
Expand Down

0 comments on commit cf87802

Please sign in to comment.