-
Notifications
You must be signed in to change notification settings - Fork 189
/
views.py
171 lines (135 loc) · 6.18 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
from social_core.exceptions import MissingBackend
from social_django.utils import load_strategy, load_backend
from social_django.views import NAMESPACE
from oauth2_provider.contrib.rest_framework import OAuth2Authentication
from oauth2_provider.models import Application, AccessToken
from oauth2_provider.settings import oauth2_settings
from oauth2_provider.views.mixins import OAuthLibMixin
from rest_framework import permissions
from rest_framework import status
from rest_framework.decorators import api_view, authentication_classes, permission_classes
from rest_framework.response import Response
from rest_framework.views import APIView
from .oauth2_backends import KeepRequestCore
from .oauth2_endpoints import SocialTokenServer
class CsrfExemptMixin(object):
"""
Exempts the view from CSRF requirements.
NOTE:
This should be the left-most mixin of a view.
"""
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(CsrfExemptMixin, self).dispatch(*args, **kwargs)
class TokenView(CsrfExemptMixin, OAuthLibMixin, APIView):
"""
Implements an endpoint to provide access tokens
The endpoint is used in the following flows:
* Authorization code
* Password
* Client credentials
"""
server_class = oauth2_settings.OAUTH2_SERVER_CLASS
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
oauthlib_backend_class = oauth2_settings.OAUTH2_BACKEND_CLASS
permission_classes = (permissions.AllowAny,)
def post(self, request, *args, **kwargs):
# Use the rest framework `.data` to fake the post body of the django request.
mutable_data = request.data.copy()
request._request.POST = request._request.POST.copy()
for key, value in mutable_data.items():
request._request.POST[key] = value
url, headers, body, status = self.create_token_response(request._request)
response = Response(data=json.loads(body), status=status)
for k, v in headers.items():
response[k] = v
return response
class ConvertTokenView(CsrfExemptMixin, OAuthLibMixin, APIView):
"""
Implements an endpoint to convert a provider token to an access token
The endpoint is used in the following flows:
* Authorization code
* Client credentials
"""
server_class = SocialTokenServer
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
oauthlib_backend_class = KeepRequestCore
permission_classes = (permissions.AllowAny,)
def post(self, request, *args, **kwargs):
# Use the rest framework `.data` to fake the post body of the django request.
mutable_data = request.data.copy()
request._request.POST = request._request.POST.copy()
for key, value in mutable_data.items():
request._request.POST[key] = value
url, headers, body, status = self.create_token_response(request._request)
response = Response(data=json.loads(body), status=status)
for k, v in headers.items():
response[k] = v
return response
class RevokeTokenView(CsrfExemptMixin, OAuthLibMixin, APIView):
"""
Implements an endpoint to revoke access or refresh tokens
"""
server_class = oauth2_settings.OAUTH2_SERVER_CLASS
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
oauthlib_backend_class = oauth2_settings.OAUTH2_BACKEND_CLASS
permission_classes = (permissions.AllowAny,)
def post(self, request, *args, **kwargs):
# Use the rest framework `.data` to fake the post body of the django request.
mutable_data = request.data.copy()
request._request.POST = request._request.POST.copy()
for key, value in mutable_data.items():
request._request.POST[key] = value
url, headers, body, status = self.create_revocation_response(request._request)
response = Response(data=json.loads(body) if body else '', status=status if body else 204)
for k, v in headers.items():
response[k] = v
return response
@api_view(['POST'])
@authentication_classes([OAuth2Authentication])
@permission_classes([permissions.IsAuthenticated])
def invalidate_sessions(request):
client_id = request.data.get("client_id", None)
if client_id is None:
return Response({
"client_id": ["This field is required."]
}, status=status.HTTP_400_BAD_REQUEST)
try:
app = Application.objects.get(client_id=client_id)
except Application.DoesNotExist:
return Response({
"detail": "The application linked to the provided client_id could not be found."
}, status=status.HTTP_400_BAD_REQUEST)
tokens = AccessToken.objects.filter(user=request.user, application=app)
tokens.delete()
return Response({}, status=status.HTTP_204_NO_CONTENT)
class DisconnectBackendView(APIView):
"""
An endpoint for disconnect social auth backend providers such as Facebook.
"""
permission_classes = (permissions.IsAuthenticated, )
def get_object(self, queryset=None):
return self.request.user
def post(self, request, *args, **kwargs):
backend = request.data.get("backend", None)
if backend is None:
return Response({
"backend": ["This field is required."]
}, status=status.HTTP_400_BAD_REQUEST)
association_id = request.data.get("association_id", None)
if association_id is None:
return Response({
"association_id": ["This field is required."]
}, status=status.HTTP_400_BAD_REQUEST)
strategy = load_strategy(request=request)
try:
backend = load_backend(strategy, backend, reverse(NAMESPACE + ":complete", args=(backend,)))
except MissingBackend:
return Response({"backend": ["Invalid backend."]}, status=status.HTTP_400_BAD_REQUEST)
backend.disconnect(user=self.get_object(), association_id=association_id, *args, **kwargs)
return Response(status=status.HTTP_204_NO_CONTENT)