-
Notifications
You must be signed in to change notification settings - Fork 1
/
provider.py
164 lines (135 loc) · 5.47 KB
/
provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from __future__ import annotations
import typing
import requests
from allauth.socialaccount.providers.base import ProviderAccount
from allauth_cas.providers import CASProvider
from django.core.files.base import ContentFile
from django.db import transaction
from django.http import HttpRequest
from django.utils.text import slugify
from phonenumber_field.phonenumber import to_python
from apps.accounts.models import User, UserProfile
from apps.accounts.models.profile import user_profile_picture_storage
from apps.accounts.services.user_profile_state_synchronizer import synchronizer
from apps.esnaccounts import logger
from apps.sections.models import Section, SectionMembership
from apps.sections.models.services.section_plugins_reconciler import SectionPluginsReconciler
if typing.TYPE_CHECKING:
from allauth.socialaccount.models import SocialAccount, SocialLogin
class ESNAccountsAccount(ProviderAccount):
    """Account representation used by ``ESNAccountsProvider``.

    Reads its data from the wrapped social account available as ``self.account``.
    """

    # Annotation only (no value is assigned here): names the attribute the
    # methods below read. Previously misspelled as ``accounts``.
    account: SocialAccount

    def get_avatar_url(self):
        """Return the ``"picture"`` entry of the account's extra data, or ``None`` if absent."""
        return self.account.extra_data.get("picture")

    def __str__(self):
        """Return the uid of the wrapped social account."""
        return self.account.uid
class ESNAccountsProvider(CASProvider):
    """CAS-based social login provider for ESN Accounts.

    On top of the provider attributes it offers two helpers that copy data
    found in a social account's ``extra_data`` into local models:
    ``update_section_membership`` (section, membership and user profile) and
    ``sync_profile_picture`` (profile picture download).
    """

    id = "esnaccounts"
    name = "ESN Accounts"
    account_class = ESNAccountsAccount

    # Role identifiers looked up in the ``roles`` list of the extra data.
    MEMBER_ROLE = "Local.activeMember"
    EDITOR_ROLE = "Local.regularBoardMember"
    ADMIN_ROLE = "Local.president"

    def extract_common_fields(self, data):
        """Map provider data to the common user fields.

        Args:
            data: A ``(uid, extra)`` pair; ``extra`` is a mapping of attributes.

        Returns:
            A dict with ``username`` (falls back to ``uid``), ``email``,
            ``first_name`` and ``last_name``; missing attributes yield ``None``.
        """
        uid, extra = data
        return {
            "username": extra.get("username", uid),
            "email": extra.get("mail"),
            "first_name": extra.get("first"),
            "last_name": extra.get("last"),
        }

    @classmethod
    def _resolve_role(cls, roles, *, promote_editor_to_admin):
        """Pick the section membership role for the given role identifiers.

        The admin role wins; an editor is promoted to admin when
        ``promote_editor_to_admin`` is true; then member; anything else is
        treated as an international.
        """
        if cls.ADMIN_ROLE in roles:
            return SectionMembership.Role.ADMIN
        if cls.EDITOR_ROLE in roles:
            if promote_editor_to_admin:
                return SectionMembership.Role.ADMIN
            return SectionMembership.Role.EDITOR
        if cls.MEMBER_ROLE in roles:
            return SectionMembership.Role.MEMBER
        return SectionMembership.Role.INTERNATIONAL

    @classmethod
    @transaction.atomic
    def update_section_membership(
        cls,
        *,
        request: HttpRequest,
        sociallogin: SocialLogin,
    ):
        """Synchronize section, membership and profile from a social login.

        Runs inside a single database transaction. Steps:

        1. Get or create the section matching the ``"sc"`` code
           (case-insensitively) and reconcile its plugins.
        2. Get or create the user's membership in that section; role and
           state are only set when the membership is created.
        3. Update or create the user profile (nationality, gender, phone
           number; state is reset to ``INCOMPLETE``), sync the profile
           picture and revalidate the profile.

        Args:
            request: The current HTTP request (currently unused).
            sociallogin: The social login whose ``user`` and ``account`` are
                synchronized.

        Raises:
            AttributeError: If the extra data carries no ``"sc"`` value.
        """
        user: User = sociallogin.user
        sa: SocialAccount = sociallogin.account
        extra = sa.extra_data

        # ``or []`` also covers a key that is present but holds ``None``.
        roles = extra.get("roles") or []
        section_name = extra.get("section")
        user_nationality = extra.get("nationality")
        # national_section = extra.get("country")

        # NOTE(review): a missing "sc" makes ``.upper()`` raise AttributeError;
        # decide whether such logins should be rejected or handled explicitly.
        section_code = extra.get("sc").upper()

        section, is_section_new = Section.objects.get_or_create(
            code__iexact=section_code,
            defaults=dict(
                name=section_name,
                code=section_code,
                # TODO: definitely not, user nationality != section assignment
                country=user_nationality,
                space_slug=slugify(section_name).lower().replace("-", ""),
            ),
        )

        SectionPluginsReconciler.reconcile(section)

        # An editor becomes admin when the section has no admin yet; the
        # query is skipped for a section that was just created.
        promote_editor_to_admin = (
            is_section_new or not section.memberships.filter(role=SectionMembership.Role.ADMIN).exists()
        )

        # role and activation only for first time
        SectionMembership.objects.get_or_create(
            user=user,
            section=section,
            defaults=dict(
                # we trust ESN Accounts, so directly active membership
                # TODO: should be as AccountsConfiguration attribute, so section can select
                #  if they want to trust
                state=SectionMembership.State.ACTIVE,
                role=cls._resolve_role(roles, promote_editor_to_admin=promote_editor_to_admin),
            ),
        )

        try:
            phone_number = to_python(extra.get("telephone"))
        except TypeError:
            # Value of a type that cannot be converted; store no phone number.
            phone_number = None

        user_profile, _ = UserProfile.objects.update_or_create(
            user=user,
            defaults=dict(
                nationality=user_nationality,
                # Anything other than "M"/"F" maps to ``None``.
                gender={
                    "M": "male",
                    "F": "female",
                }.get(extra.get("gender")),
                state=UserProfile.State.INCOMPLETE,
                phone_number=phone_number,
            ),
        )

        # NOTE(review): this performs an HTTP request while the transaction
        # is open; failures are logged and do not abort the synchronization.
        cls.sync_profile_picture(
            profile=user_profile,
            picture_url=extra.get("picture"),
        )

        synchronizer.revalidate_user_profile(user_profile)

    @staticmethod
    def sync_profile_picture(profile: UserProfile, picture_url: str | None):
        """Download ``picture_url`` and store it as the profile picture.

        Best-effort: does nothing when no URL is given or the profile already
        has a picture; a failed request (network error or non-OK response) is
        logged and otherwise ignored.

        Args:
            profile: The user profile to attach the picture to.
            picture_url: URL of the picture, or ``None``/empty to skip.
        """
        if not picture_url:
            return

        if profile.picture:
            logger.info("User profile %s already has a picture.", profile)
            return

        try:
            picture_response = requests.get(
                picture_url,
                headers={
                    "User-Agent": "Fiesta+; @esnvutbrno/fiesta-plus",
                },
                timeout=(3, 5),
            )
        except requests.RequestException as error:
            # Timeouts, DNS or connection errors must not break the caller.
            logger.warning(
                "Picture request %s for user profile %s raised %r.",
                picture_url,
                profile,
                error,
            )
            return

        if not picture_response.ok:
            logger.warning("Picture request %s for user profile %s failed.", picture_url, profile)
            return

        # The storage computes the target name; the second argument is a
        # placeholder filename.
        name = user_profile_picture_storage.upload_to(profile, "--dummy--")
        name = user_profile_picture_storage.save(name, ContentFile(picture_response.content))

        profile.picture = name
        profile.save(update_fields=["picture"], skip_hooks=True)
# Module-level list naming the provider class defined in this module
# (presumably the hook django-allauth uses to discover providers — confirm).
provider_classes = [ESNAccountsProvider]