/
cache.py
1210 lines (983 loc) · 44.7 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Handles caching of server statuses.
Server states are stored in Redis. It tracks their general info such as,
the server name, map, player counts and scores. Additionally it maintains
an index of all the tags applied to a server.
Each server is uniquely identified by its address which is a combination of
the IP address of the host and the port number -- these are represented by
:class:`Address` objects. These addresses also act as Redis keys.
Redis Schema:
-------------
Redis has a fairly limited type system meaning that the stored values are
all UTF-8 encoded strings. The API provided by this module will transparently
translate these strings to more appropriate Python types.
All keys are UTF-8 encoded. When a server address is used in a key it takes
the conventional colon-separated IP-port form, where the IP address itself is
in dotted decimal format. For example: ``0.0.0.0:8000``.
Each key is prefixed by ``serverstf`` in order to act as a kind of namespace.
Key *namespaces* are separated by forward slashes.
``SET serverstf/servers``
A set containing the addresses of all the servers in the cache. This is
the authoritative list of all servers tracked by the cache. The addresses
themselves are just UTF-8 encoded string representations of
:class:`Address`es.
``HASH serverstf/servers/<ip>:<port>``
These hashes hold the current state of the server corresponding to the
key. Each hash has the following keys:
* ``name``
* ``map``
* ``application_id``
* ``players``
The ``players`` field tracks the players on the server. It includes the
current number of players, the maximum allowed, how many are bots and
each individual player's name, score and connection time.
As this is a non-trivial structure it is stored as a JSON object (still
UTF-8 encoded.) The object contains four fields:
``current``
The current number of players.
``max``
The maximum number of players supported by the server.
``bots``
The number of players that are bots.
``scores``
An array of three-item arrays which contains (in the following order)
the player name as a string, their score as a number and their
connection duration in seconds as a float.
When one of these server status hashes is retrieved from the cache it is
translated to a :class:`Status` object.
``SET serverstf/servers/<ip>:<port>/tags``
A set containing all the tags currently applied to the server referenced
by the key. The tags themselves are UTF-8 encoded.
``NUMBER serverstf/servers/<ip>:<port>/interest``
This is an integer key which is used to track how much interest there is
in a server. It is used by the interest queue to determine whether or not
items should be re-enqueued.
``ZSET serverstf/tags/<tag>``
These sets hold any number of server addresses (formatted as described
above and UTF-8 encoded). Servers whose addresses are contained in one
of these sets are understood to have the ``<tag>`` in their own
``serverstf/servers/<ip>:<port>/tags`` set.
For the purpose of providing predictable ordering this is a sorted set
but the actual scoring algorithm is opaque.
``LIST serverstf/interesting``
This is the *interest queue*. It is a LIST which holds UTF-8
encoded JSON arrays. Each array has two items: an interest level and
a stringified :class:`Address`.
This list is actively iterated on by pollers in order to update cache
entries for servers whose address occurs in the queue.
The first item in the JSON array -- the interest level -- signals what
the interest in the server was when that particular item was added to
the queue. See :attr:`Address.interest`.
"""
import asyncio
import contextlib
import datetime
import functools
import inspect
import ipaddress
import json
import logging
import uuid
import urllib.parse
import asyncio_redis
import asyncio_redis.encoders
import iso3166
log = logging.getLogger(__name__)
class AddressError(ValueError):
    """Exception for all errors related to :class:`Address`es.

    Raised for malformed IP addresses, non-integer or out-of-range port
    numbers and unparseable ``<ip>:<port>`` strings. Subclasses
    :exc:`ValueError` so generic value-validation handlers also catch it.
    """
class PlayersError(ValueError):
    """Exception raised for invalid :class:`Players` objects.

    Covers both bad constructor arguments and malformed JSON passed to
    :meth:`Players.from_json`.
    """
class NotifierError(Exception):
    """Exception raised for all errors stemming from :class:`Notifier`s.

    Most notably raised when attempting to publish from a notifier that
    has entered the *watching* state.
    """
class CacheError(Exception):
    """Base exception for all cache related errors."""


class EmptyQueueError(CacheError):
    """Raised when attempting to pop from an empty interest queue.

    Inherits from :class:`CacheError` so a single ``except CacheError``
    handler covers every cache failure mode, as that class's docstring
    promises. Existing ``except EmptyQueueError`` and ``except
    Exception`` handlers are unaffected.
    """
class Address:
    """Represents a server address.

    Each server address is comprised of an IPv4 address and a port number.
    Addresses are hashable.

    :ivar ip: the :class:`ipaddress.IPv4Address` of the address.
    :ivar port: the port number for the address as an integer.

    :raises AddressError: if either the given IP address or port is invalid.
    """

    def __init__(self, ip, port):
        try:
            self._ip = ipaddress.IPv4Address(ip)
        except ipaddress.AddressValueError as exc:
            raise AddressError("Malformed IP address") from exc
        try:
            self._port = int(port)
        except (TypeError, ValueError) as exc:
            # int() raises TypeError for non-numeric types (e.g. None) but
            # ValueError for non-numeric strings -- such as those produced
            # by parse() -- so both must be translated to AddressError.
            raise AddressError("Port number is not an integer") from exc
        if self._port < 1 or self._port > 65535:
            raise AddressError("Port number is out of range")

    def __repr__(self):
        return "<{0.__class__.__name__} {0}>".format(self)

    def __str__(self):
        return "{0.ip}:{0.port}".format(self)

    def __hash__(self):
        # Hash on the canonical string form so equal addresses hash equal.
        return hash(str(self))

    def __eq__(self, other):
        """Check another :class:`Address` for equality.

        The other address is only considered equal if both the IP address
        and port match.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.ip == other.ip and self.port == other.port

    @classmethod
    def parse(cls, address):
        """Parse an address from a string.

        This is effectively the inverse of :meth:`__str__`. The given address
        is expected to be in the ``<ip>:<port>`` form.

        :param str address: the address to parse.

        :raises AddressError: if the string is not formatted correctly or the
            address is in any other way invalid.

        :return: a new :class:`Address` instance.
        """
        split = address.split(":", 1)
        if len(split) != 2:
            raise AddressError("Addresses must be in the form <ip/host>"
                               ":<port> but got {!r}".format(address))
        return cls(*split)

    @property
    def ip(self):
        """Get the IP of the address."""
        return self._ip

    @property
    def port(self):
        """Get the port number of the address."""
        return self._port
class Players:
    """Immutable representation of the server players at a point in time.

    :ivar current: the current number of players as an integer.
    :ivar max: the maximum number of players supported by the server as an
        integer.
    :ivar bots: the current number of players that are bots as an integer.

    :param scores: an iterable of three-item tuples containing the player's
        name as a string, their score as an integer and their connection
        duration as a :class:`datetime.timedelta`.

    :raises PlayersError: if any player's connection duration is not a
        :class:`datetime.timedelta`.
    """

    def __init__(self, *, current, max_, bots, scores):
        self._current = int(current)
        self._max = int(max_)
        self._bots = int(bots)
        normalised_scores = []
        for name, score, duration in scores:
            name = str(name)
            score = int(score)
            if not isinstance(duration, datetime.timedelta):
                raise PlayersError("Player connection duration must "
                                   "be a {} object".format(datetime.timedelta))
            normalised_scores.append((name, score, duration))
        # Stored as a tuple so the object is effectively immutable.
        self._scores = tuple(normalised_scores)

    def __repr__(self):
        return ("<{0.__class__.__name__} "
                "{0.current}/{0.max} ({0.bots} bots)>".format(self))

    def __iter__(self):
        """Iterate over players and scores.

        When iterated on this yields a tuple for each connected player
        containing their name, score and connection duration. The players
        themselves should never be considered to be in any particular order.

        .. note::
            It's possible for the number of players returned by this iterator
            to be less than or greater than that of the :attr:`current`
            players.
        """
        return iter(self._scores)

    @property
    def current(self):
        """Get the current number of players."""
        return self._current

    @property
    def max(self):
        """Get the maximum number of players."""
        return self._max

    @property
    def bots(self):
        """Get the current number of NPC players."""
        return self._bots

    @classmethod
    def from_json(cls, encoded):
        """Parse a JSON-encoded object.

        This is effectively the inverse of :meth:`to_json`.

        :raises PlayersError: if the JSON or the structure of the JSON object
            is invalid in any way.

        :return: a new :class:`Players`.
        """
        try:
            decoded = json.loads(encoded)
        except ValueError as exc:
            raise PlayersError("Bad JSON: {}".format(exc)) from exc
        # Guard against non-object payloads (e.g. a bare number) which
        # previously escaped as an unrelated TypeError from the membership
        # test below.
        if not isinstance(decoded, dict):
            raise PlayersError(
                "Expected a JSON object but got: {!r}".format(decoded))
        for field in ["current", "max", "bots", "scores"]:
            if field not in decoded:
                raise PlayersError("Missing field {!r}".format(field))
        try:
            current = int(decoded["current"])
            max_ = int(decoded["max"])
            bots = int(decoded["bots"])
        except (ValueError, TypeError) as exc:
            raise PlayersError(
                "Malformed integer fields: {}".format(exc)) from exc
        scores = []
        for entry in decoded["scores"]:
            if not isinstance(entry, list) or not len(entry) == 3:
                raise PlayersError(
                    "Malformed scores; expected list "
                    "of length 3 but got: {!r}".format(entry))
            if not isinstance(entry[0], str):
                raise PlayersError("First item must be a string "
                                   "but got: {!r}".format(entry[0]))
            if not all(isinstance(i, (int, float)) for i in entry[1:]):
                raise PlayersError("Last two items must be numbers "
                                   "but got: {!r}".format(entry[1:]))
            scores.append((entry[0], entry[1],
                           datetime.timedelta(seconds=entry[2])))
        return cls(current=current, max_=max_, bots=bots, scores=scores)

    def to_json(self):
        """Convert the players to a JSON-encoded object.

        The JSON object will have four fields: ``current``, ``max``, ``bots``
        and ``scores``. The first three are just plain numbers corresponding
        to the attributes of the same names. ``scores`` is an array of
        arrays which contain the name, score and duration (in seconds) of
        each player.

        :return: a string containing the JSON-encoded object.
        """
        object_ = {
            "current": self.current,
            "max": self.max,
            "bots": self.bots,
            "scores": [],
        }
        for name, score, duration in self:
            object_["scores"].append([name, score, duration.total_seconds()])
        return json.dumps(object_)
class Status:  # pylint: disable=too-many-instance-attributes
    """Immutable representation of the state of a server at a point in time.

    :ivar address: the :class:`Address` that identifies the server.
    :ivar interest: the amount of *interest* in the server's state
        expressed as an integer. The interest correlates to the number of
        clients subscribed to the server's state. High interest levels mean
        the state is updated more frequently.
    :ivar name: the display name of the server. Note that it may contain
        non-printable characters.
    :ivar map: the name of the map being played by the server.
    :ivar application_id: the Steam application ID of the game being played
        by the server. Note that this is the Steam application ID of the
        game's client not the server. For example, in the case of TF2 it is
        440 not 232250.
    :ivar players: a :class:`Players` instance containing all the players
        currently on the server. Defaults to an empty :class:`Players` object
        if not set.
    :ivar country: the country of the server as an ISO 3166 two-letter
        country identifier string.
    :ivar latitude: the latitude for the location of the server as a float.
    :ivar longitude: the longitude for the location of the server as a float.
    :ivar tags: a frozen set of all the tags applied to the server.

    :raises TypeError: if ``players`` is neither ``None`` nor a
        :class:`Players` instance, or if ``country`` is not a valid
        ISO 3166 alpha-2 code.
    """

    def __init__(self, address, *, interest, name,
                 map_, application_id, players,
                 country, latitude, longitude, tags):
        self._address = address
        self._interest = 0 if interest is None else int(interest)
        self._name = name if name is None else str(name)
        self._map = map_ if map_ is None else str(map_)
        self._application_id = application_id
        if self._application_id is not None:
            self._application_id = int(application_id)
        if players is None:
            players = Players(current=0, max_=0, bots=0, scores=[])
        if not isinstance(players, Players):
            raise TypeError("Status players must be "
                            "a {} instance or None".format(Players))
        self._players = players
        # NOTE: retains the historical TypeError (rather than ValueError)
        # so existing handlers keep working.
        if country is not None and country not in iso3166.countries_by_alpha2:
            raise TypeError("{!r} is not a valid ISO "
                            "3166 country code".format(country))
        self._country = country
        self._latitude = latitude if latitude is None else float(latitude)
        self._longitude = longitude if longitude is None else float(longitude)
        # Stored privately and exposed through a read-only property so the
        # object is immutable, consistent with every other attribute.
        self._tags = frozenset(tags)

    def __repr__(self):
        return ("<{0.__class__.__name__} {0.address} "
                "({1} tags)>".format(self, len(self.tags)))

    @property
    def address(self):
        """Get the address of the server."""
        return self._address

    @property
    def interest(self):
        """Get the current interest in the server."""
        return self._interest

    @property
    def name(self):
        """Get the server name."""
        return self._name

    @property
    def map(self):
        """Get the server map."""
        return self._map

    @property
    def application_id(self):
        """Get the server Steam application ID."""
        return self._application_id

    @property
    def players(self):
        """Get the current players."""
        return self._players

    @property
    def country(self):
        """Get the country of the server as an ISO 3166 identifier."""
        return self._country

    @property
    def latitude(self):
        """Get the latitude for the server's location."""
        return self._latitude

    @property
    def longitude(self):
        """Get the longitude for the server's location."""
        return self._longitude

    @property
    def tags(self):
        """Get the tags applied to the server as a frozenset."""
        return self._tags
class Notifier:
    """Send and receive notifications about cache state changes.

    This class wraps the Redis pub/sub subsystem to provide a means of
    signalling when the cache's state changes and of letting other
    services listen for those changes.

    Never instantiate this class directly; use :meth:`AsyncCache.notifier`
    instead. Notifier objects are strictly asynchronous, so public use of
    them is not possible with synchronous caches.

    Once a notifier is used to listen for notifications it enters the
    *watching* state. While watching it cannot also be used to send
    notifications -- a limitation of the Redis pub/sub system. Any attempt
    to send notifications in this state raises :exc:`NotifierError`.
    """

    SERVER = "servers"
    TAG = "tags"

    def __init__(self, connection, encoding, namespace):
        self._connection = connection
        self._encoding = encoding
        self._namespace = namespace
        # Set lazily by _get_subscriber(); its presence marks the
        # watching state.
        self._subscriber = None

    @property
    def watching(self):
        """Determine if the notifier is in watching mode."""
        return self._subscriber is not None

    def close(self):
        """Close the Redis connection."""
        self._connection.close()

    def _channel(self, *parts):
        """Construct a Redis pub/sub channel name from constituent parts.

        :return: a bytestring containing the encoded channel name.
        """
        segments = [self._namespace, "channels"]
        segments.extend(str(part) for part in parts)
        return "/".join(segments).encode(self._encoding)

    def _check_not_watching(self):
        """Raise :exc:`NotifierError` if publishing is not possible."""
        if self.watching:
            raise NotifierError(
                "Notifier in watch mode; cannot send notifications")

    @asyncio.coroutine
    def _get_subscriber(self):
        """Get the :mod:`asyncio_redis` subscriber.

        This puts the Redis connection into pub/sub mode, making it
        impossible to send most commands; hence the notifier enters the
        *watching* state.

        :return: a :class:`asyncio_redis.Subscription`.
        """
        if not self._subscriber:
            self._subscriber = yield from self._connection.start_subscribe()
        return self._subscriber

    @asyncio.coroutine
    def notify_server(self, address):
        """Send a notification of server status update.

        This publishes a UTF-8 encoded stringified version of the given
        address to a channel dedicated to that address.

        :param Address address: the address to send the notification for.

        :raises NotifierError: if the notifier has entered watching mode.
        """
        self._check_not_watching()
        channel = self._channel(self.SERVER, address)
        log.debug("Publish %s", channel)
        payload = str(address).encode(self._encoding)
        yield from self._connection.publish(channel, payload)

    @asyncio.coroutine
    def notify_tag(self, tag, address):
        """Send a notification of a server being added to a tag.

        This publishes a UTF-8 encoded stringified version of the given
        address to a channel dedicated to the tag.

        :param str tag: the tag whose channel to publish on.
        :param Address address: the address to send the notification for.

        :raises NotifierError: if the notifier has entered watching mode.
        """
        self._check_not_watching()
        channel = self._channel(self.TAG, tag)
        log.debug("Publish %s", channel)
        payload = str(address).encode(self._encoding)
        yield from self._connection.publish(channel, payload)

    @asyncio.coroutine
    def watch_server(self, address):
        """Watch for server status updates.

        :param Address address: the address of the server to subscribe to
            updates for.
        """
        channel = self._channel(self.SERVER, address)
        subscriber = yield from self._get_subscriber()
        yield from subscriber.subscribe([channel])
        log.debug("Subscribed to %s", channel)

    @asyncio.coroutine
    def unwatch_server(self, address):
        """Stop watching for server status updates.

        This is the inverse of :meth:`watch_server`.
        """
        channel = self._channel(self.SERVER, address)
        subscriber = yield from self._get_subscriber()
        yield from subscriber.unsubscribe([channel])
        log.debug("Unsubscribed from %s", channel)

    @asyncio.coroutine
    def watch_tag(self, tag):
        """Watch a tag for updates."""
        channel = self._channel(self.TAG, tag)
        subscriber = yield from self._get_subscriber()
        yield from subscriber.subscribe([channel])
        log.debug("Subscribed to %s", channel)

    @asyncio.coroutine
    def unwatch_tag(self, tag):
        """Stop watching a tag for updates."""
        channel = self._channel(self.TAG, tag)
        subscriber = yield from self._get_subscriber()
        yield from subscriber.unsubscribe([channel])
        log.debug("Unsubscribed from %s", channel)

    @asyncio.coroutine
    def watch(self):
        """Wait for server status or tag updates.

        This coroutine blocks until a notification is published for a
        server or tag that is being actively watched. If the notifier is
        not currently watching anything (no calls to :meth:`watch_server`
        or :meth:`watch_tag` have been made) then it waits indefinitely.

        :return: a tuple containing the type of update (either
            :attr:`SERVER` or :attr:`TAG`) and a corresponding
            :class:`Address`.
        """
        subscriber = yield from self._get_subscriber()
        while True:
            message = yield from subscriber.next_published()
            # The channel layout is <namespace>/channels/<type>/<name>,
            # so the second-to-last segment is the update type.
            type_ = message.channel.decode(self._encoding).split("/")[-2]
            try:
                parsed = Address.parse(message.value.decode(self._encoding))
            except (UnicodeDecodeError, AddressError) as exc:
                log.error("Malformed address on channel "
                          "%s: %s: %s", message.channel, message.value, exc)
            else:
                return type_, parsed
class AsyncCache:
"""Asynchronous access to a Redis state cache.
Do not instantiate this class directly. Instead use the :meth:`connect`
coroutine.
"""
# :ivar _connection: the :class:`asyncio_redis.Connection` to use.
# :ivar _loop: the :mod:`asyncio` event loop to use.
#: The encoding to use for Unicode strings.
ENCODING = "utf-8"
#: The root key namespace
NAMESPACE = "serverstf"
def __init__(self, connection, loop):
    """Bind the cache to an existing Redis connection and event loop.

    Use :meth:`connect` instead of calling this directly.
    """
    self._connection = connection
    self._loop = loop
    # Lazily-created internal Notifier; see __internal_notifier().
    self._notifier = None
    # The (interest, Address) item currently checked out of the interest
    # queue by interesting(); (None, None) when no item is active.
    self._active_iq_item = (None, None)
    # Redis key of the interest queue LIST.
    self._iq_key = self._key("interesting")
def __repr__(self):
    """Return a debugging representation naming the underlying connection."""
    return "<{} using {}>".format(type(self).__name__, self._connection)
@classmethod
@asyncio.coroutine
def connect(cls, url, loop):
    """Establish a connection to a Redis database.

    :param str url: the URL of the Redis database to connect to.
    :param loop: the :mod:`asyncio` event loop to use.

    :return: a context manager that, when entered, yields a newly
        created :class:`AsyncCache` instance which is bound to a Redis
        connection.
    """
    log.info("Connecting to cache at %s", url)
    url = urllib.parse.urlsplit(url)
    connection = yield from asyncio_redis.Connection.create(
        host=url.hostname,
        port=url.port,
        # The database index comes from the first path segment, e.g.
        # redis://host:6379/0 selects database 0. NOTE(review): raises
        # for URLs without a numeric path segment -- confirm callers
        # always supply one.
        db=int(url.path.split("/")[1]),
        loop=loop,
        # BytesEncoder: keys/values are raw bytes; this module does its
        # own UTF-8 encoding and decoding.
        encoder=asyncio_redis.encoders.BytesEncoder(),
    )
    # TODO: This isn't actually needed. Just use __enter__ and __exit__.
    @contextlib.contextmanager
    def cache_context(cache):  # pylint: disable=missing-docstring
        yield cache
        cache.close()
    return cache_context(cls(connection, loop))
@property
def loop(self):
    """Get the :mod:`asyncio` event loop used by this cache."""
    return self._loop
def close(self):
    """Close the connection to Redis.

    Once the connection is closed the object is invalidated and can no
    longer be used.
    """
    self._connection.close()
    # Hack to work around the fact that closing the connection doesn't
    # clean up tasks started by asyncio_redis; running the loop for one
    # iteration lets those tasks observe the closed connection. See:
    # https://github.com/jonathanslenders/asyncio-redis/issues/56
    if not self._loop.is_running():
        self._loop.run_until_complete(asyncio.sleep(0))
@asyncio.coroutine
def __internal_notifier(self):
    """Get the internal notifier used for publishing changes.

    The return value is cached so that subsequent calls always return the
    same notifier.

    :return: a :class:`Notifier`.
    """
    if not self._notifier:
        self._notifier = yield from self.__notifier()
    return self._notifier
@asyncio.coroutine
def __notifier(self):
    """Get a notifier for the cache.

    A dedicated Redis connection is created for the notifier because
    entering pub/sub mode would prevent the cache's own connection from
    issuing normal commands (see :class:`Notifier`).

    :return: a :class:`Notifier` object connected to the same Redis
        database as the cache.
    """
    connection = yield from asyncio_redis.Connection.create(
        host=self._connection.host,
        port=self._connection.port,
        db=self._connection.protocol.db,
        loop=self._loop,
        encoder=asyncio_redis.encoders.BytesEncoder(),
    )
    return Notifier(connection, self.ENCODING, self.NAMESPACE)
def _key(self, *parts):
    """Construct a Redis key from constituent parts.

    Each part of the key is stringified, joined with forward slashes
    under the implicit leading ``serverstf`` namespace and encoded as
    UTF-8.

    :return: a bytestring containing the encoded key.
    """
    segments = [self.NAMESPACE] + [str(part) for part in parts]
    return "/".join(segments).encode(self.ENCODING)
def _random_key(self):
    """Construct a random Redis key.

    This will create a normal key (see :meth:`_key`) using a random UUID
    so it is safe to use for temporary values.

    :return: a bytestring containing the encoded key.
    """
    return self._key("random", uuid.uuid4())
@asyncio.coroutine
def __ensure(self, address):
    """Ensure the address exists in the authoritative set.

    The address is stringified and UTF-8 encoded before being added to
    the authoritative ``serverstf/servers`` set.

    :param Address address: the address to add to the cache.

    :return: ``True`` if the address didn't already exist in the cache,
        ``False`` otherwise.
    """
    # SADD returns the number of members actually added, so 1 means the
    # address was new.
    added = yield from self._connection.sadd(
        self._key("servers"), [str(address).encode(self.ENCODING)])
    return added == 1
@asyncio.coroutine
def __get(self, address):
    """Retrieve a server status from the cache.

    :param Address address: the address of the server whose status is
        to be retrieved.

    :return: a :class:`Status` representing the current state of the
        cache for the given address.
    """
    log.debug("Get %s", address)
    key_hash = self._key("servers", address)
    key_tags = self._key("servers", address, "tags")
    key_interest = self._key("servers", address, "interest")
    # Read the state hash, tag set and interest counter atomically.
    transaction = yield from self._connection.multi()
    f_hash_ = yield from transaction.hgetall_asdict(key_hash)
    f_tags = yield from transaction.smembers_asset(key_tags)
    # INCRBY 0 reads the integer value (implicitly 0 for a missing key)
    # without modifying it.
    f_interest = yield from transaction.incrby(key_interest, 0)
    yield from transaction.exec()
    tags = {tag.decode(self.ENCODING) for tag in (yield from f_tags)}
    hash_ = {key.decode(self.ENCODING):
             value.decode(self.ENCODING) for
             key, value in (yield from f_hash_).items()}
    kwargs = {
        "interest": (yield from f_interest),
        "name": hash_.get("name"),
        "map_": hash_.get("map"),
        "application_id": None,
        "players": None,
        "country": hash_.get("country"),
        "latitude": None,
        "longitude": None,
        "tags": tags,
    }
    # Numeric fields fall back to None (with a warning) when missing or
    # malformed rather than failing the whole lookup.
    try:
        kwargs["application_id"] = int(hash_.get("application_id"))
    except (ValueError, TypeError) as exc:
        log.warning("Could not convert application_id "
                    "for %s to int: %s", address, exc)
    for field in ("latitude", "longitude"):
        try:
            kwargs[field] = float(hash_.get(field))
        except (ValueError, TypeError) as exc:
            log.warning("Could not convert %s for %s "
                        "to float: %s", field, address, exc)
    try:
        kwargs["players"] = Players.from_json(hash_.get("players", ""))
    except PlayersError as exc:
        log.warning("Could not decode players "
                    "JSON object for %s: %s", address, exc)
    return Status(address, **kwargs)  # pylint: disable=missing-kwoa
# TODO: This is too large; needs refactoring.
@asyncio.coroutine
def __set(self, status):  # pylint: disable=too-many-locals
    """Commit a server status to the cache.

    This sets the primary server state HASH key and the tags SET for the
    server. Both the HASH and SET are completely overridden in a MULTI
    block. If any fields on the server status are ``None`` then they will
    not be added to the hash.

    All hash fields are converted to strings and encoded as UTF-8. The
    tags are also UTF-8 encoded.

    As well as updating server-specific keys this will update the global
    tag SETs. The UTF-8 encoded stringified address is added to the new
    global tag SETs as part of the MULTI transaction. For tags that have
    been removed by the new status the address is removed from the
    corresponding tag SETs outside of the transaction.

    Note that the :attr:`Status.interest` field is ignored when setting
    the state.

    :param Status status: the new status for the server.
    """
    address = str(status.address).encode(self.ENCODING)
    key_hash = self._key("servers", status.address)
    key_tags = self._key("servers", status.address, "tags")
    # Build the new hash, dropping None-valued fields entirely.
    hash_ = {}
    for attribute in {"name", "map", "application_id",
                      "country", "latitude", "longitude"}:
        value = getattr(status, attribute)
        if value is not None:
            hash_[attribute] = str(value)
    hash_["players"] = status.players.to_json()
    hash_ = {key.encode(self.ENCODING):
             value.encode(self.ENCODING) for key, value in hash_.items()}
    tags = {tag.encode(self.ENCODING) for tag in status.tags}
    # Make sure the address is in the authoritative server set first.
    yield from self.__ensure(status.address)
    transaction = yield from self._connection.multi()
    # Snapshot the old tag set inside the transaction so removed tags
    # can be computed after it commits.
    f_old_tags = yield from transaction.smembers(key_tags)
    yield from transaction.delete([key_hash, key_tags])
    yield from transaction.hmset(key_hash, hash_)
    yield from transaction.sadd(key_tags, (t for t in tags))
    for tag in status.tags:
        key_tag = self._key("tags", tag)
        yield from transaction.sadd(key_tag, [address])
    yield from transaction.exec()
    notifier = yield from self.__internal_notifier()
    yield from notifier.notify_server(status.address)
    # Tags the server no longer carries are removed from the global tag
    # sets outside the transaction, as documented above.
    old_tags = (yield from (yield from f_old_tags).asset())
    removed_tags = old_tags - tags
    for old_tag in removed_tags:
        key_old_tag = self._key("tags", old_tag)
        yield from self._connection.srem(key_old_tag, [address])
    new_tags = tags - old_tags
    for tag in new_tags:
        yield from notifier.notify_tag(
            tag.decode(self.ENCODING), status.address)
    log.debug("Set %s with %i tags (%i removed)",
              status.address, len(status.tags), len(removed_tags))
@asyncio.coroutine
def subscribe(self, address):
    """Increase the interest in an address.

    This will increase the interest for a server and add an entry in the
    interest queue for it.

    :param Address address: the address of the server to increase the
        interest for.
    """
    key_interest = self._key("servers", address, "interest")
    # INCR initialises a missing key to 0 before incrementing.
    interest = yield from self._connection.incr(key_interest)
    yield from self.__push_iq(interest, address)
    log.debug("Interest in %s now %i", address, interest)
def _encode_iq_item(self, interest, address):
    """Encode an item for the interest queue.

    The interest and stringified address are packed into a two-element
    JSON array which is then UTF-8 encoded.

    :param int interest: the interest value for the address.
    :param Address address: the server address.

    :return: a bytestring containing the encoded interest queue item.
    """
    payload = [int(interest), str(address)]
    return json.dumps(payload).encode(self.ENCODING)
def _decode_iq_item(self, encoded):
    """Decode an item from the interest queue.

    The given item should be a UTF-8 encoded JSON array with two
    elements. The first element should be a number and the second a
    string that will be parsed as an :class:`Address`.

    :param bytes encoded: the encoded JSON array from the interest queue.

    :raises ValueError: if the queue item couldn't be decoded.

    :return: a two-tuple containing an interest value and an
        :class:`Address`.
    """
    try:
        item_decoded = json.loads(encoded.decode(self.ENCODING))
    except (UnicodeDecodeError, ValueError) as exc:
        raise ValueError(exc) from exc
    if not isinstance(item_decoded, list) or len(item_decoded) != 2:
        raise ValueError("Must be an array of length 2")
    interest_raw, address_raw = item_decoded
    try:
        interest = int(interest_raw)
    except (TypeError, ValueError) as exc:
        # Bug fix: the message was previously built logging-style with a
        # separate %s argument, so callers saw an unformatted tuple.
        raise ValueError(
            "Interest must be an integer: {}".format(exc)) from exc
    # AddressError subclasses ValueError, so parse failures propagate as
    # the documented exception type.
    return interest, Address.parse(str(address_raw))
@asyncio.coroutine
def __push_iq(self, interest, address):
    """Push an item into the interest queue.

    The item will be encoded as a JSON array and added to the end of
    the queue.

    :param int interest: the interest level to record for the address.
    :param Address address: the server address to enqueue.
    """
    yield from self._connection.rpush(
        self._iq_key, [self._encode_iq_item(interest, address)])
@asyncio.coroutine
def update_interest_queue(self):
    """Reinsert address into the interest queue.

    This should be called after calls to :meth:`interesting` in order to
    reinsert the address back into the interest queue. The address is
    only reinserted if there is still sufficient interest in it.
    Otherwise this method does nothing.

    Calling this when no item is checked out of the queue (no prior
    successful call to :meth:`interesting`) is a no-op.
    """
    interest, address = self._active_iq_item
    if address is None:
        # No active item; previously this fell through to __get(None)
        # and failed with an obscure error.
        return
    status = yield from self.__get(address)
    if status.interest >= interest:
        # Re-enqueue with the interest recorded when it was popped.
        yield from self.__push_iq(interest, address)
    self._active_iq_item = (None, None)
@asyncio.coroutine
def interesting(self):
    """Get an address from the interest queue.

    Once finished with the address you must call
    :meth:`update_interest_queue` which will reinsert the address back
    into the queue if necessary. If not done then subsequent calls to
    this method will throw an exception.

    :raises CacheError: if a previous item is still checked out.
    :raises EmptyQueueError: if the interest queue is empty.

    :return: an :class:`Address` from the interest queue.
    """
    if self._active_iq_item != (None, None):
        raise CacheError("There is already an active interest queue item. "
                         "Did you forget to call update_interest_queue?")
    # Keep popping until a decodable item is found; malformed items are
    # logged and discarded.
    while self._active_iq_item == (None, None):
        item_raw = yield from self._connection.lpop(self._iq_key)
        if not item_raw:
            # LPOP returns None once the queue is exhausted.
            raise EmptyQueueError
        try:
            self._active_iq_item = self._decode_iq_item(item_raw)
        except ValueError as exc:
            log.warning("Bad interest queue item: %s", exc)
    return self._active_iq_item[1]
@asyncio.coroutine
def __fetch_addresses_from_cursor(self, cursor, queue):
    """Fetch addresses from a Redis cursor.

    This will read all available values from a Redis cursor, convert them
    to :class:`Address` and then place them into an asynchronous queue,
    returning when the end of the cursor is reached.

    The cursor is expected to yield UTF-8 encoded stringified
    :class:`Address`es.

    :param asyncio.cursors.Cursor cursor: the Redis cursor to fetch
        results from.
    :param FiniteAsyncQueue queue: the queue to place :class:`Address`es
        in.
    """
    # NOTE(review): presumably entering the queue's context marks it
    # finished on exit so consumers can detect the end of the stream --
    # confirm against FiniteAsyncQueue's implementation.
    with queue:
        while True:
            item = yield from cursor.fetchone()
            if item is None:
                # End of the cursor.
                return
            try:
                yield from queue.put(
                    Address.parse(item.decode(self.ENCODING)))
            except (UnicodeDecodeError, AddressError) as exc:
                # Skip malformed entries rather than aborting the fetch.
                log.warning("Bad address from cursor %s: %s", cursor, exc)
@asyncio.coroutine
def all(self):
    """Get all the addresses in the cache.

    This returns a queue which will be populated with addresses that are
    held by the cache.

    :return: a :class:`FiniteAsyncQueue` containing :class:`Address`es.
    """
    key = self._key("servers")
    queue = FiniteAsyncQueue(loop=self._loop)
    cursor = yield from self._connection.sscan(key)
    # The fetch runs as a background task so callers can start consuming
    # the queue immediately.
    asyncio.Task(self.__fetch_addresses_from_cursor(cursor, queue))
    return queue
@asyncio.coroutine
def search(self, *, include=None, exclude=None):
"""Search for addresses by tags.
:param include: a sequence of tags that addresses must have.
:param exclude: a sequence of tags that will be used to filter the
the set of addresses.