forked from vimalloc/flask-jwt-extended
-
Notifications
You must be signed in to change notification settings - Fork 1
/
redis_blacklist.py
138 lines (118 loc) · 5.51 KB
/
redis_blacklist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Redis is a very quick in memory store. The benefits of using redis is that
# things will generally speedy, and it can be (mostly) persistent by dumping
# the data to disk (see: https://redis.io/topics/persistence). The drawbacks
# to using redis is you have a higher chance of encountering data loss (in
# this case, 'forgetting' that a token was revoked), when events like
# power outages occur.
#
# When does it make sense to use redis for a blacklist? If you are blacklisting
# every token on logout, and not doing nothing besides that (such as keeping
# track of what tokens are blacklisted, providing options to un-revoke
# blacklisted tokens, or view tokens that are currently active for a user),
# then redis is a great choice. In the worst case, a few tokens might slip
# between the cracks in the case of a power outage or other such event, but
# 99.99% of the time tokens will be properly blacklisted.
#
# Redis also has the benefit of supporting an expires time when storing data.
# Utilizing this, you will not need to manually prune down the stored tokens
# to keep it from blowing up over time. This code includes how to do this.
#
# If you intend to use some other features in your blacklist (tracking
# what tokens are currently active, option to revoke or unrevoke specific
# tokens, etc), data integrity is probably more important to you then
# raw performance. In this case a database solution (such as postgres) is
# probably a better fit for your blacklist. Check out the "database_blacklist"
# example for how that might work.
import redis
from datetime import timedelta
from quart import Quart, request, jsonify
from quart_jwt_extended import (
JWTManager,
create_access_token,
create_refresh_token,
get_jti,
jwt_refresh_token_required,
get_jwt_identity,
jwt_required,
get_raw_jwt,
)
app = Quart(__name__)
app.secret_key = "ChangeMe!"
# Setup the quart-jwt-extended extension. See:
ACCESS_EXPIRES = timedelta(minutes=15)
REFRESH_EXPIRES = timedelta(days=30)
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = ACCESS_EXPIRES
app.config["JWT_REFRESH_TOKEN_EXPIRES"] = REFRESH_EXPIRES
app.config["JWT_BLACKLIST_ENABLED"] = True
app.config["JWT_BLACKLIST_TOKEN_CHECKS"] = ["access", "refresh"]
jwt = JWTManager(app)
# Setup our redis connection for storing the blacklisted tokens
revoked_store = redis.StrictRedis(
host="localhost", port=6379, db=0, decode_responses=True
)
# Create our function to check if a token has been blacklisted. In this simple
# case, we will just store the tokens jti (unique identifier) in redis
# whenever we create a new token (with the revoked status being 'false'). This
# function will return the revoked status of a token. If a token doesn't
# exist in this store, we don't know where it came from (as we are adding newly
# created tokens to our store with a revoked status of 'false'). In this case
# we will consider the token to be revoked, for safety purposes.
@jwt.token_in_blacklist_loader
def check_if_token_is_revoked(decrypted_token):
jti = decrypted_token["jti"]
entry = revoked_store.get(jti)
if entry is None:
return True
return entry == "true"
@app.route("/auth/login", methods=["POST"])
async def login():
username = (await request.get_json()).get("username", None)
password = (await request.get_json()).get("password", None)
if username != "test" or password != "test":
return {"msg": "Bad username or password"}, 401
# Create our JWTs
access_token = create_access_token(identity=username)
refresh_token = create_refresh_token(identity=username)
# Store the tokens in redis with a status of not currently revoked. We
# can use the `get_jti()` method to get the unique identifier string for
# each token. We can also set an expires time on these tokens in redis,
# so they will get automatically removed after they expire. We will set
# everything to be automatically removed shortly after the token expires
access_jti = get_jti(encoded_token=access_token)
refresh_jti = get_jti(encoded_token=refresh_token)
revoked_store.set(access_jti, "false", ACCESS_EXPIRES * 1.2)
revoked_store.set(refresh_jti, "false", REFRESH_EXPIRES * 1.2)
ret = {"access_token": access_token, "refresh_token": refresh_token}
return ret, 201
# A blacklisted refresh tokens will not be able to access this endpoint
@app.route("/auth/refresh", methods=["POST"])
@jwt_refresh_token_required
async def refresh():
# Do the same thing that we did in the login endpoint here
current_user = get_jwt_identity()
access_token = create_access_token(identity=current_user)
access_jti = get_jti(encoded_token=access_token)
revoked_store.set(access_jti, "false", ACCESS_EXPIRES * 1.2)
ret = {"access_token": access_token}
return ret, 201
# Endpoint for revoking the current users access token
@app.route("/auth/access_revoke", methods=["DELETE"])
@jwt_required
async def logout():
jti = get_raw_jwt()["jti"]
revoked_store.set(jti, "true", ACCESS_EXPIRES * 1.2)
return {"msg": "Access token revoked"}, 200
# Endpoint for revoking the current users refresh token
@app.route("/auth/refresh_revoke", methods=["DELETE"])
@jwt_refresh_token_required
async def logout2():
jti = get_raw_jwt()["jti"]
revoked_store.set(jti, "true", REFRESH_EXPIRES * 1.2)
return {"msg": "Refresh token revoked"}, 200
# A blacklisted access token will not be able to access this any more
@app.route("/protected", methods=["GET"])
@jwt_required
async def protected():
return {"hello": "world"}
if __name__ == "__main__":
app.run()