/
manager.py
398 lines (319 loc) · 12.4 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# ../auth/manager.py
"""Provides a singleton class to access player permissions."""
# =============================================================================
# >> IMPORTS
# =============================================================================
# Python Imports
# Re
import re
# Importlib
from importlib.machinery import SourceFileLoader
# Source.Python Imports
# Auth
from auth.base import Backend
# Core
from core.settings import _core_settings
# Listeners
from listeners import OnLevelEnd
# Paths
from paths import BACKENDS_PATH
# Players
from players.helpers import playerinfo_from_index
# Steam
from steam import SteamID
# =============================================================================
# >> ALL DECLARATION
# =============================================================================
# Names that are exported by a star import of this module.
__all__ = ('_AuthManager',
'auth_manager',
'ParentPermissionDict',
'ParentPermissions',
'PermissionBase',
'PlayerPermissionDict',
'PlayerPermissions',
)
# =============================================================================
# >> CONSTANTS
# =============================================================================
# Name of the guest parent. Every permission object whose name differs from
# this value gets the guest parent added when it is initialized.
GUEST_PARENT_NAME = 'guest'
# =============================================================================
# >> CLASSES
# =============================================================================
class PermissionBase(dict):
    """Base class for parent and player permissions.

    The dictionary maps permission strings to their compiled regular
    expressions. Inherited permissions are not stored in the dictionary
    itself; they are resolved through :attr:`parents`.
    """

    def __init__(self, name):
        """Initialize the object.

        :param str name:
            Name of the object. It is also used to calculate the hash value.
        """
        super().__init__()
        self.parents = set()
        self.name = name
        if self.name != GUEST_PARENT_NAME:
            # Don't update the backend, because it's a hidden group
            self.add_parent(GUEST_PARENT_NAME, update_backend=False)

    def __hash__(self):
        """Return a hash value based on the name."""
        # This is required, because we are adding dicts to sets
        return hash(self.name)

    def add(self, permission, server_id=None, update_backend=True):
        """Add a permission.

        :param str permission:
            The permission to add.
        :param int server_id:
            The server ID to which the permission should be added. If no
            server ID is given, it will be only added to this server.
        :param bool update_backend:
            If True, the backend will be updated.
        """
        # ``self.keys()`` is used on purpose: ``in self`` is overridden and
        # would also take permissions granted by wildcards/parents into
        # account.
        if (auth_manager.targets_this_server(server_id) and
                permission not in self.keys()):
            self[permission] = self._compile_permission(permission)

        # The backend is notified even if another server is targeted
        if update_backend and auth_manager.active_backend is not None:
            auth_manager.active_backend.permission_added(
                self, permission,
                auth_manager.server_id if server_id is None else server_id)

    def remove(self, permission, server_id=None, update_backend=True):
        """Remove a permission.

        :param str permission:
            The permission to remove.
        :param int server_id:
            The server ID from which the permission should be removed. If no
            server ID is given, it will be only removed from this server.
        :param bool update_backend:
            If True, the backend will be updated.
        """
        if auth_manager.targets_this_server(server_id):
            # Removing a permission that isn't stored is not an error
            self.pop(permission, None)

        # The backend is notified even if another server is targeted
        if update_backend and auth_manager.active_backend is not None:
            auth_manager.active_backend.permission_removed(
                self, permission,
                auth_manager.server_id if server_id is None else server_id)

    def add_parent(self, parent_name, update_backend=True):
        """Add a parent.

        :param str parent_name:
            Name of the parent.
        :param bool update_backend:
            If True, the backend will be updated.
        """
        parent = auth_manager.parents[parent_name]
        if parent not in self.parents:
            # TODO: Detect cycles
            self.parents.add(parent)
            parent.children.add(self)

        if update_backend and auth_manager.active_backend is not None:
            auth_manager.active_backend.parent_added(self, parent_name)

    def remove_parent(self, parent_name, update_backend=True):
        """Remove a parent.

        :param str parent_name:
            Name of the parent.
        :param bool update_backend:
            If True, the backend will be updated.
        """
        parent = auth_manager.parents[parent_name]
        if parent in self.parents:
            self.parents.remove(parent)
            parent.children.remove(self)

        if update_backend and auth_manager.active_backend is not None:
            auth_manager.active_backend.parent_removed(self, parent_name)

    @staticmethod
    def _compile_permission(permission):
        """Compile a permission.

        Every character is matched literally, except ``*``, which matches
        any sequence of characters.

        :param str permission:
            The permission to compile.
        :return:
            A compiled regular expression that is anchored at both ends.
        """
        # Escape everything first, so characters like "+" or "(" can neither
        # change the meaning of the pattern nor make the compilation fail.
        # Afterwards the escaped wildcard is turned into a catch-all group.
        escaped = re.escape(permission).replace('\\*', '(.*)')
        return re.compile('^{}$'.format(escaped))

    def __contains__(self, permission):
        """Return True if the permission is granted by this object.

        Permissions that are granted by a parent are respected as well.

        :rtype: bool
        """
        return self._has_permission(permission, [])

    def _has_permission(self, permission, name_list):
        """Return True if this object or one of its parents grants it.

        :param str permission:
            The permission to test.
        :param list name_list:
            Names of the objects that have been visited already. It is
            updated in place and used to break recursive parents.
        :rtype: bool
        """
        # Checks to see if parents are recursive
        if self.name in name_list:
            # Break if recursive
            return False

        name_list.append(self.name)
        if any(regex.match(permission) for regex in self.values()):
            return True

        return any(
            parent._has_permission(permission, name_list)
            for parent in self.parents)

    def flatten(self):
        """Return all permissions flattened recursively.

        The permissions of this object are yielded first, followed by the
        permissions of all direct and indirect parents. Every object is
        visited only once, even if it is reachable via multiple parents.

        :rtype: generator
        """
        yield from self._flatten([])

    def _flatten(self, name_list):
        """Yield the permissions of this object and its unvisited parents.

        :param list name_list:
            Names of the objects that have been visited already. It is
            updated in place and used to break recursive parents.
        :rtype: generator
        """
        if self.name in name_list:
            return

        name_list.append(self.name)

        # Iterating a dict yields its keys, i.e. the permission strings
        yield from self
        for parent in self.parents:
            yield from parent._flatten(name_list)

    def clear(self):
        """Remove all own permissions and detach the object from its parents.

        The parents themselves are not modified, except that this object is
        removed from their children.
        """
        super().clear()

        # Mirror remove_parent(): don't leave a stale reference to this
        # object in the children of its former parents.
        for parent in self.parents:
            parent.children.discard(self)

        self.parents.clear()
class PlayerPermissions(PermissionBase):
    """Permission container that belongs to a single player."""

    def __init__(self, name, steamid64):
        """Create the container and remember the player's SteamID64.

        :param str name:
            A SteamID2, SteamID3 or SteamID64 value.
        :param int steamid64:
            The SteamID64 value that was also used to store the object in the
            :class:`PlayerPermissionDict` object.
        """
        # Independent of the base class initialization, so it can be stored
        # up front.
        self.steamid64 = steamid64
        super().__init__(name)
class ParentPermissions(PermissionBase):
    """Permission container that represents a parent."""

    def __init__(self, name):
        """Create the container with an empty set of children.

        :param str name:
            Name of the parent.
        """
        super().__init__(name)

        # Objects that have this parent; maintained by the add_parent() and
        # remove_parent() methods of the permission objects.
        self.children = set()
class _PermissionDict(dict):
    """Storage for permission objects."""

    def clear(self):
        """Clear every stored object and then empty the storage itself."""
        for stored in self.values():
            stored.clear()

        super().clear()
class ParentPermissionDict(_PermissionDict):
    """Storage that creates missing :class:`ParentPermissions` on access."""

    def __missing__(self, parent_name):
        """Create, store and return a :class:`ParentPermissions` object.

        Called by the dictionary when ``parent_name`` is not stored yet.

        :param str parent_name:
            The name of the parent to retrieve.
        :rtype: ParentPermissions
        """
        permissions = ParentPermissions(parent_name)
        self[parent_name] = permissions
        return permissions
class PlayerPermissionDict(_PermissionDict):
    """Storage that creates missing :class:`PlayerPermissions` on access."""

    def __missing__(self, steamid):
        """Create, store and return a :class:`PlayerPermissions` object.

        Called by the dictionary when ``steamid`` is not stored yet.

        :param str/int steamid:
            A SteamID2, SteamID3 or SteamID64 value.
        :rtype: PlayerPermissions
        """
        # Integers are used as the key and as the name without conversion
        if isinstance(steamid, int):
            permissions = PlayerPermissions(steamid, steamid)
            self[steamid] = permissions
            return permissions

        steamid64 = SteamID.parse(steamid).to_uint64()

        # The player might be stored already under the SteamID64 value
        if steamid64 in self:
            return self[steamid64]

        # We got a SteamID in a string format, so we can store it by using
        # its SteamID64 value, but keep the original name.
        permissions = PlayerPermissions(steamid, steamid64)
        self[steamid64] = permissions
        return permissions
class _AuthManager(dict):
    """Manages backends and configuration files.

    The dictionary maps case-folded backend names to the backend instances
    that have been found by :meth:`find_and_add_available_backends`.
    """

    def __init__(self):
        """Initialize the object."""
        super().__init__()
        self.parents = ParentPermissionDict()
        self.players = PlayerPermissionDict()
        self.active_backend = None
        self.server_id = -1

    def find_and_add_available_backends(self):
        """Find and add all available backends.

        Every ``*.py`` file in ``BACKENDS_PATH`` is loaded. The first
        :class:`Backend` instance found in a module is stored under its
        case-folded name; further instances in the same module are ignored.

        :raise ValueError:
            Raised if a file does not contain a backend.
        """
        for backend in BACKENDS_PATH.glob('*.py'):
            name = 'auth.backend.' + backend.basename().splitext()[0]
            loader = SourceFileLoader(name, str(backend))

            # NOTE(review): the importlib documentation marks load_module()
            # as deprecated in favour of exec_module(); consider migrating.
            module = loader.load_module(name)
            for var in vars(module).values():
                if isinstance(var, Backend):
                    self[var.name.casefold()] = var
                    break
            else:
                # The loop has not been left via "break", so the module
                # doesn't contain a backend instance.
                raise ValueError(
                    'Found no backend or multiple backends in "{}".'.format(
                        backend))

    def load(self):
        """Load the auth manager.

        The server ID and the name of the backend to activate are read from
        the ``AUTH_SETTINGS`` section of the core settings.
        """
        self.server_id = int(_core_settings['AUTH_SETTINGS']['server_id'])
        self.set_active_backend(_core_settings['AUTH_SETTINGS']['backend'])

    def unload(self):
        """Unload the auth manager."""
        self._unload_active_backend()

    def set_active_backend(self, backend_name):
        """Set the active backend.

        The currently active backend (if any) is unloaded first.

        :param str backend_name:
            Name of the backend. The lookup is case-insensitive.
        :raise ValueError:
            Raised if the backend does not exist.
        """
        try:
            backend = self[backend_name.casefold()]
        except KeyError:
            # The KeyError is an implementation detail, so don't chain it
            raise ValueError(
                'Backend "{}" does not exist.'.format(backend_name)) from None

        self._unload_active_backend()
        backend.load()
        self.active_backend = backend

    def _unload_active_backend(self):
        """Unload the active backend if there is one.

        All stored parent and player permissions are cleared as well.
        """
        if self.active_backend is not None:
            self.active_backend.unload()
            self.parents.clear()
            self.players.clear()
            self.active_backend = None

    def is_backend_loaded(self, backend_name):
        """Return True if the given backend is currently loaded.

        :param str backend_name:
            Name of the backend. The comparison is case-insensitive.
        :rtype: bool
        """
        if self.active_backend is None:
            return False

        # Backends are registered with their case-folded name, so both sides
        # need to be case-folded to get a consistent result.
        return backend_name.casefold() == self.active_backend.name.casefold()

    def get_player_permissions(self, index):
        """Return the permissions of the player with the given index.

        :param int index:
            Index of the player.

        .. seealso:: :meth:`get_player_permissions_from_steamid`
        """
        return self.get_player_permissions_from_steamid(
            playerinfo_from_index(index).steamid)

    def get_player_permissions_from_steamid(self, steamid):
        """Return the permissions of a player.

        :param str/int steamid:
            The SteamID2, SteamID3 or SteamID64 of a player.
        :return:
            If the given SteamID is invalid (e.g. 'BOT'), None will be
            returned.
        :rtype: PlayerPermissions
        """
        try:
            return self.players[steamid]
        except ValueError:
            return None

    def is_player_authorized(self, index, permission):
        """Return True if the player has been granted the given permission.

        :param int index:
            Index of the player.
        :param str permission:
            The permission to test.
        :rtype: bool
        """
        permissions = self.get_player_permissions(index)
        if permissions is None:
            return False

        return permission in permissions

    def get_parent_permissions(self, parent_name):
        """Return the parent permissions.

        :param str parent_name:
            Name of the parent.
        :rtype: ParentPermissions
        """
        return self.parents[parent_name]

    def is_parent_authorized(self, parent_name, permission):
        """Return True if the parent has been granted the given permission.

        :param str parent_name:
            Name of the parent.
        :param str permission:
            The permission to test.
        :rtype: bool
        """
        return permission in self.get_parent_permissions(parent_name)

    def targets_this_server(self, server_id):
        """Return whether the server ID targets this server.

        This is the case if the server ID is None, -1 or equal to the ID of
        this server.

        :param int server_id:
            A server ID to test.
        :rtype: bool
        """
        return server_id in (-1, self.server_id, None)
#: The singleton object of :class:`_AuthManager`. The permission classes of
#: this module use it to access the active backend and the server ID.
auth_manager = _AuthManager()
@OnLevelEnd
def _on_level_end():
    """Synchronize the active backend (if there is one) on level end."""
    active = auth_manager.active_backend
    if active is None:
        return

    active.sync()