-
Notifications
You must be signed in to change notification settings - Fork 32
/
backends.py
117 lines (96 loc) · 4.25 KB
/
backends.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from celery_once.backends import Redis
from django.conf import settings
from django.contrib.auth.backends import ModelBackend
from django_redis import get_redis_connection
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from pydash import get
from redis import Sentinel
class QueueOnceRedisSentinelBackend(Redis):
def __init__(self, backend_settings):
# pylint: disable=super-init-not-called
self._sentinel = Sentinel(backend_settings['sentinels'])
self._sentinel_master = backend_settings['sentinels_master']
self.blocking_timeout = backend_settings.get("blocking_timeout", 1)
self.blocking = backend_settings.get("blocking", False)
@property
def redis(self):
return self._sentinel.master_for(self._sentinel_master)
class QueueOnceRedisBackend(Redis):
"""
Calls get_redis_connection from django_redis so that it re-uses django redis cache config.
"""
def __init__(self, backend_settings):
# pylint: disable=super-init-not-called
self.blocking_timeout = backend_settings.get("blocking_timeout", 1)
self.blocking = backend_settings.get("blocking", False)
@property
def redis(self):
return get_redis_connection('default')
class OCLOIDCAuthenticationBackend(OIDCAuthenticationBackend):
"""
1. overrides Default OIDCAuthenticationBackend
2. creates/updates user from OID to django on successful auth
"""
def create_user(self, claims):
"""Return object for a newly created user account."""
# {
# 'sub': '<str:uuid>',
# 'email_verified': <boolean>,
# 'realm_access': {
# 'roles': ['offline_access', 'default-roles-ocl', 'uma_authorization']
# },
# 'name': 'Inactive User',
# 'preferred_username': 'inactive',
# 'given_name': 'Inactive',
# 'family_name': 'User',
# 'email': 'inactive@user.com'
# }
from core.users.models import UserProfile
user = UserProfile.objects.create_user(
claims.get('preferred_username'),
email=claims.get('email'),
first_name=claims.get('given_name'),
last_name=claims.get('family_name'),
verified=claims.get('email_verified'),
company=claims.get('company', None),
location=claims.get('location', None)
)
if user.id:
user.set_checksums()
return user
def update_user(self, user, claims):
user.first_name = claims.get('given_name') or user.first_name
user.last_name = claims.get('family_name') or user.last_name
user.email = claims.get('email') or user.email
user.company = claims.get('company', None) or user.company
user.location = claims.get('location', None) or user.location
user.save()
user.set_checksums()
return user
def filter_users_by_claims(self, claims):
from core.users.models import UserProfile
username = claims.get('preferred_username')
if not username:
return UserProfile.objects.none()
return UserProfile.objects.filter(username=username)
class OCLAuthenticationBackend(ModelBackend):
"""
1. authentication backend defined in settings.AUTHENTICATION_BACKENDS.
2. switches between Django/OID Auth Backends based on type of request
3. switches to django auth if a valid django token is used in request
"""
def get_auth_backend(self, request=None):
if get(self, '_authentication_backend'):
return get(self, '_authentication_backend')
from core.services.auth.core import AuthService
if AuthService.is_valid_django_token(request) or get(settings, 'TEST_MODE', False):
klass = ModelBackend
else:
klass = AuthService.get().authentication_backend_class
self._authentication_backend = klass()
return self._authentication_backend
def authenticate(self, request, username=None, password=None, **kwargs):
return self.get_auth_backend(request).authenticate(
request=request, username=username, password=password, **kwargs)
def get_user(self, user_id):
return self.get_auth_backend().get_user(user_id=user_id)