Is it possible to use this to verify JWT without users? #22
Comments
@mgonto totally, you'd need to build your own authentication backend. |
@jpadilla do you have a link to something I can read to find out about this? I'm a Django n00b, sorry :D. Thanks! |
So I should fork your project, change that code and then use my fork, right? If that's the case, thanks and close the issue :) |
You could define your own or subclass this one from code base. Be sure to update Django REST Framework's DEFAULT_AUTHENTICATION_CLASSES to contain your module. |
Ohh understood. Thanks for the info :). Martin Gontovnikas Twitter: @mgonto (https://twitter.com/mgonto) On Friday, 13 de June de 2014 at 17:48, José Padilla wrote:
|
@mgonto No problem |
The following example demonstrates how to authenticate without involving the User model. We use it to authenticate service-related requests. myapp/authentication.py
myapp/settings.py
|
Hey Guys, from rest_framework_jwt.serializers import *
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
def jwt_payload_handler_secret_based(token):
    """
    Build the JWT payload for a secret-based (user-less) token.

    Args:
        token: the stored token record. It must expose ``last_login``
            (a datetime). When it also exposes ``token_key``, that key is
            what gets embedded in the payload; otherwise ``token`` itself
            is embedded unchanged.

    Returns:
        dict with ``token``, ``exp`` and ``jwt_indee_verification_id``,
        plus ``orig_iat`` / ``aud`` / ``iss`` when the corresponding
        ``api_settings`` options are set.
    """
    # NOTE: the DeprecationWarning about `email` and `user_id` that used to
    # be raised here was copied from the user-based handler; this payload
    # contains neither field, so the warning was removed.
    payload = {
        # Embed the key string rather than the record: the record is needed
        # for `last_login` below, but presumably cannot be JSON-encoded.
        'token': getattr(token, 'token_key', token),
        'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
        # The last-login timestamp lets the authenticator reject tokens
        # issued before a newer login.
        'jwt_indee_verification_id': str(token.last_login.timestamp()),
    }

    # Include original issued at time for a brand new token,
    # to allow token refresh
    if api_settings.JWT_ALLOW_REFRESH:
        payload['orig_iat'] = timegm(datetime.utcnow().utctimetuple())

    if api_settings.JWT_AUDIENCE is not None:
        payload['aud'] = api_settings.JWT_AUDIENCE

    if api_settings.JWT_ISSUER is not None:
        payload['iss'] = api_settings.JWT_ISSUER

    return payload
class ServiceOnlyJSONWebTokenSerializer(JSONWebTokenSerializer):
    """
    Serializer that exchanges a service secret for a JSON Web Token.

    Input fields:
        jwt_indee_service_id (required): key of the stored ``FycToken``.
        tdvalue (optional): token lifetime in seconds for this token only.

    Returns a JSON Web Token that can be used to authenticate later calls.
    """

    def __init__(self, *args, **kwargs):
        """
        Declare the secret-based fields instead of username/password.
        """
        # super(JSONWebTokenSerializer, self) starts the lookup *after*
        # JSONWebTokenSerializer in the MRO, so that class's own __init__
        # is skipped (presumably to avoid its username/password fields).
        super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
        self.fields['tdvalue'] = serializers.IntegerField(required=False)
        self.fields['jwt_indee_service_id'] = serializers.CharField(
            required=True)

    def validate(self, attrs):
        """
        Validate the submitted secret and mint a token for it.

        Returns:
            dict with the encoded ``token`` and the matching ``FycToken``
            record under ``user``.

        Raises:
            serializers.ValidationError: the secret is missing, unknown,
                or disabled.
        """
        # The previous implementation required username/password values
        # that this serializer never declares, so validation could never
        # succeed. Only the service id is checked now.
        service_id = attrs.get('jwt_indee_service_id')
        if not service_id:
            msg = _('Must include "jwt_indee_service_id".')
            raise serializers.ValidationError(msg)

        try:
            fycTokenObj = FycToken.objects.get(token_key=service_id)
        except FycToken.DoesNotExist:
            msg = _(
                'Some error occured while logging in.')
            raise serializers.ValidationError(msg)

        if not fycTokenObj.active:
            msg = _('Token is disabled.')
            raise serializers.ValidationError(msg)

        # save the login time stamp
        fycTokenObj.last_login = datetime.now()
        fycTokenObj.save()

        # api_settings is shared by every request, so a per-token lifetime
        # override must be undone once the payload has been built.
        default_delta = api_settings.JWT_EXPIRATION_DELTA
        lifetime_seconds = attrs.get('tdvalue')
        try:
            if lifetime_seconds is not None:
                api_settings.JWT_EXPIRATION_DELTA = timedelta(
                    seconds=lifetime_seconds)
            # The handler reads `last_login`, so it needs the record, not
            # just the key string.
            payload = jwt_payload_handler_secret_based(fycTokenObj)
        finally:
            api_settings.JWT_EXPIRATION_DELTA = default_delta

        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
        return {
            'token': jwt_encode_handler(payload),
            'user': fycTokenObj
        }
class ServiceOnlyJSONWebTokenAuthentication(JSONWebTokenAuthentication):
    """
    Overriding the default JSONWebTokenAuthentication to accomodate tokens for apps.
    https://github.com/GetBlimp/django-rest-framework-jwt/issues/22#issuecomment-166451133
    https://www.django-rest-framework.org/api-guide/authentication/
    """

    def authenticate_credentials(self, payload):
        """
        Resolve a decoded JWT payload to an authenticated service client.

        Args:
            payload: the decoded JWT payload (a dict-like object).

        Returns:
            AuthenticatedServiceClient wrapping the matching FycToken.

        Raises:
            exceptions.AuthenticationFailed: the payload carries no service
                id, the token is unknown, disabled or expired, or a newer
                login has superseded this JWT.
        """
        jwt_indee_service_id = jwt_get_indee_service_id_from_payload(payload)
        jwt_indee_verification_id = jwt_get_indee_ll_id_from_payload(payload)

        if not jwt_indee_service_id:
            msg = _('Invalid Token.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            fycTokenObj = FycToken.objects.get(token_id=jwt_indee_service_id)
        except FycToken.DoesNotExist:
            # The lookup is on FycToken, so that model's DoesNotExist is
            # the one raised; catching User.DoesNotExist let it escape.
            msg = _("Couldn't find the Token. Please reach out to support@indee.tv")
            raise exceptions.AuthenticationFailed(msg)

        if not fycTokenObj.active:
            msg = _('Token has been disabled by the sender.')
            raise exceptions.AuthenticationFailed(msg)

        if fycTokenObj.expiry_date < timezone.now():
            msg = _('Token has expired.')
            raise exceptions.AuthenticationFailed(msg)

        # A token that was never logged in cannot match any issued JWT;
        # treat it as superseded instead of crashing on None.timestamp().
        last_login = fycTokenObj.last_login
        if (last_login is None
                or jwt_indee_verification_id != str(last_login.timestamp())):
            msg = _('This login was overriden, concurrent logins are not allowed.')
            raise exceptions.AuthenticationFailed(msg)

        # AuthenticatedServiceClient derives its pk from the record's id,
        # so the record must be handed over.
        return AuthenticatedServiceClient(fycTokenObj)


# In the urls.py I have
from rest_framework_jwt.views import ObtainJSONWebToken
urlpatterns += patterns('',
url(r'^api-token-auth/test/', ObtainJSONWebToken.as_view(
serializer_class=ServiceOnlyJSONWebTokenSerializer)),
) When I try and generate the token I see this error: Could anyone provide some pointers on this? Thanks. I also checked #349 and #145 but wasn't able to figure this out. Thanks |
@Aameer You can just use PyJWT library for signing and decoding custom tokens. Alternatively, Django has built-in functionality for signing strings and tokens: |
@Alex3917 Thanks for your comment. What I ended up doing is shown below. It keeps my current drf-jwt setup working as expected, and adds a parallel system that uses secrets alone as the authentication mechanism while re-using many of the pieces drf-jwt provides. I would have loved to see something like this built into drf-jwt, so that users could be decoupled entirely. URLS
from apps.myapps.api_views import FycTokenGenrator
urlpatterns += patterns('',
url(r'^loginWithSecret/',
FycTokenGenrator.as_view(),
name='fyc-token'),
) VIEWSfrom rest_framework import permissions, status, generics
from apps.mytokenapp.models import FycToken
from apps.mytokenapp.serializers import FycTokenSerializerOpen
from apps.myapps.utils import CustomJSONWebTokenAuthentication,\
ServiceOnlyJSONWebTokenAuthentication, jwt_payload_handler_token_based,
get_encoded_jwt
class FycTokenGenrator(generics.RetrieveUpdateDestroyAPIView):
    """
    API Endpoint to get the secret from user and reply with jwt token.
    For now added same auth over it to test.

    Only POST does real work; GET, PUT, PATCH and DELETE answer with
    HTTP 405.
    """
    authentication_classes = (ServiceOnlyJSONWebTokenAuthentication,)
    permission_classes = (permissions.AllowAny,)
    serializer_class = FycTokenSerializerOpen

    def get_permissions(self):
        """
        Override the permissions for POST
        """
        if self.request.method == 'POST':
            self.permission_classes = (permissions.AllowAny,)
        return super(FycTokenGenrator, self).get_permissions()

    def get(self, request):
        """Reject GET with HTTP 405."""
        return Response({'message': "GET method not allowed"},
                        status=status.HTTP_405_METHOD_NOT_ALLOWED)

    def post(self, request):
        """
        Exchange the secret in ``jwt_service_id`` for an encoded JWT.

        Responds with 200 and ``{'token': ...}`` on success, 403 when the
        secret is missing or unknown, and 500 on any unexpected failure.
        """
        try:
            token_key = request.data.get('jwt_service_id', None)
            if not token_key:
                msg = 'Some error occured while logging in.'
                return Response({'message': msg},
                                status=status.HTTP_403_FORBIDDEN)

            try:
                fycTokenObj = FycToken.objects.get(fyctoken_key=token_key)
            except FycToken.DoesNotExist:
                # Only an unknown secret is "forbidden"; any other lookup
                # failure falls through to the 500 handler below.
                msg = 'Some error occured while logging in.'
                return Response({'message': msg},
                                status=status.HTTP_403_FORBIDDEN)

            # Record the login first: the payload embeds this timestamp.
            fycTokenObj.last_login = timezone.now()
            fycTokenObj.save()

            payload = jwt_payload_handler_token_based(fycTokenObj)
            encoded_jwt = get_encoded_jwt(payload)
            return Response(
                {'token': encoded_jwt}, status=status.HTTP_200_OK)
        except Exception:
            # Last-resort boundary: never leak internals to the client.
            message = "Some error occured while logging in"
            return Response(
                {"message": message},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def put(self, request):
        """Reject PUT with HTTP 405."""
        return Response({'message': "Put method not allowed"},
                        status=status.HTTP_405_METHOD_NOT_ALLOWED)

    def patch(self, request):
        """Reject PATCH with HTTP 405."""
        return Response({'message': "Patch method not allowed"},
                        status=status.HTTP_405_METHOD_NOT_ALLOWED)

    def delete(self, request):
        """Reject DELETE with HTTP 405."""
        return Response({'message': "Delete method not allowed"},
                        status=status.HTTP_405_METHOD_NOT_ALLOWED)


# AuthMiddleWare
import jwt
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
def jwt_payload_handler_token_based(tokenObj):
    """
    Build the JWT payload for a secret-based (user-less) token.

    Args:
        tokenObj: the stored token record; ``fyctoken_key`` and
            ``last_login`` (a datetime) are read from it.

    Returns:
        dict with ``jwt_service_id``, ``exp`` and ``jwt_verification_id``,
        plus ``orig_iat`` / ``aud`` / ``iss`` when the corresponding
        ``api_settings`` options are set.
    """
    # NOTE: two leftovers were removed here: a DeprecationWarning about
    # `email` / `user_id` (fields this payload never had) and a debugging
    # print() that wrote every payload to stdout.
    payload = {
        'jwt_service_id': tokenObj.fyctoken_key,
        'exp': datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
        # The last-login timestamp lets the authenticator reject tokens
        # issued before a newer login.
        'jwt_verification_id': str(tokenObj.last_login.timestamp()),
    }

    # Include original issued at time for a brand new token,
    # to allow token refresh
    if api_settings.JWT_ALLOW_REFRESH:
        payload['orig_iat'] = timegm(datetime.utcnow().utctimetuple())

    if api_settings.JWT_AUDIENCE is not None:
        payload['aud'] = api_settings.JWT_AUDIENCE

    if api_settings.JWT_ISSUER is not None:
        payload['iss'] = api_settings.JWT_ISSUER

    return payload
def get_encoded_jwt(payload):
    """
    Sign ``payload`` with ``settings.SECRET_KEY`` using HS256 and return
    whatever ``jwt.encode`` produces.
    """
    return jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')
class AuthenticatedServiceClient(object):
    """
    Stand-in for a user object, representing a client that authenticated
    with a service secret rather than a user account.
    """

    # Offset added to the token record's id so the resulting pk does not
    # clash with the ids of the actual Users' model.
    # (any suggestions to improve this portion are welcome!)
    total_users_supported_without_clashing_throttle = 999999999

    def __init__(self, fycTokenObj=None):
        """
        Args:
            fycTokenObj: the record holding the secret; its ``id`` is
                offset to form ``self.pk``. When omitted, ``pk`` is None
                (previously this case raised AttributeError).
        """
        if fycTokenObj is None:
            self.pk = None
        else:
            # model id which has secrets
            self.pk = (
                self.total_users_supported_without_clashing_throttle
                + fycTokenObj.id
            )

    def is_authenticated(self, fycTokenObj=None):
        """
        Always return True. This is a way to tell if the token has been
        authenticated
        """
        return True
def jwt_get_ll_id_from_payload(payload):
    """
    Gets the last login unix time stamp from the payload.

    Returns the value stored under ``jwt_verification_id``, or None when
    the key is absent.
    """
    return payload.get('jwt_verification_id')
def jwt_get_service_id_from_payload(payload):
    """
    Gets the service id from the payload.

    Returns the value stored under ``jwt_service_id``, or None when the
    key is absent.
    """
    return payload.get('jwt_service_id')
# TOKEN MIDDLE WARE
class ServiceOnlyJSONWebTokenAuthentication(JSONWebTokenAuthentication):
    """
    Overriding the default JSONWebTokenAuthentication to accomodate tokens for apps.
    https://github.com/GetBlimp/django-rest-framework-jwt/issues/22#issuecomment-166451133
    https://www.django-rest-framework.org/api-guide/authentication/
    """

    def authenticate_credentials(self, payload):
        """
        Resolve a decoded JWT payload to an authenticated service client.

        Args:
            payload: the decoded JWT payload (a dict-like object).

        Returns:
            AuthenticatedServiceClient wrapping the matching FycToken.

        Raises:
            exceptions.AuthenticationFailed: the payload carries no service
                id, the token is unknown, disabled or expired, or a newer
                login has superseded this JWT.
        """
        jwt_service_id = jwt_get_service_id_from_payload(payload)
        jwt_verification_id = jwt_get_ll_id_from_payload(payload)

        if not jwt_service_id:
            msg = _('Invalid Token.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            # from model which has the secret
            fycTokenObj = FycToken.objects.get(fyctoken_key=jwt_service_id)
        except FycToken.DoesNotExist:
            # Only "no such token" is an authentication failure; other
            # errors (e.g. database problems) are left to propagate.
            msg = _("Couldn't find the Token.")
            raise exceptions.AuthenticationFailed(msg)

        if not fycTokenObj.active:
            msg = _('Token has been disabled by the sender.')
            raise exceptions.AuthenticationFailed(msg)

        if fycTokenObj.expiry_date < timezone.now():
            msg = _('Token has expired.')
            raise exceptions.AuthenticationFailed(msg)

        # A token that was never logged in cannot match any issued JWT;
        # treat it as superseded instead of crashing on None.timestamp().
        last_login = fycTokenObj.last_login
        if (last_login is None
                or jwt_verification_id != str(last_login.timestamp())):
            msg = _('This login was overriden, concurrent logins are not allowed.')
            raise exceptions.AuthenticationFailed(msg)

        return AuthenticatedServiceClient(fycTokenObj)
Is it possible to use the JWT signature check to get the userId, but without User model and without checking if the user exists in our DB?
Thanks!
The text was updated successfully, but these errors were encountered: