forked from zap-me/premio_stage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stash_endpoint.py
150 lines (140 loc) · 5.59 KB
/
stash_endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# pylint: disable=unbalanced-tuple-unpacking
import datetime
import hmac
import logging
import time

from flask import Blueprint, render_template, request, jsonify, flash, redirect

from app_core import db
from models import UserStash, UserStashRequest
import utils
from web_utils import bad_request, get_json_params
import web_utils
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint that the stash save/load routes in this module are
# registered on; its template folder is set to 'templates'.
stash_bp = Blueprint('stash_bp', __name__, template_folder='templates')
@stash_bp.route('/save', methods=['POST'])
def stash_save():
    """Create a request to save a new stash and email the owner about it.

    Expects a JSON object with the fields ``key``, ``email``, ``iv``,
    ``cyphertext`` and ``question``.

    When no stash exists for the key/email-hash pair, the request is stored
    and a save-request email is sent. When one already exists, nothing is
    stored and a "stash exists" email is sent instead. A token is returned
    in both cases.

    Returns:
        A JSON response ``{"token": ...}``; ``bad_request(INVALID_JSON)`` when
        the body is not a JSON object; or the error response produced by
        ``get_json_params``.
    """
    # silent=True makes malformed JSON come back as None so it is reported
    # through INVALID_JSON below; without it Flask aborts the request on a
    # parse error and the check below never sees the failure.
    content = request.get_json(force=True, silent=True)
    # Only a JSON object can carry the named fields (rejects null, lists, ...).
    if not isinstance(content, dict):
        return bad_request(web_utils.INVALID_JSON)
    params, err_response = get_json_params(
        content, ["key", "email", "iv", "cyphertext", "question"])
    if err_response:
        return err_response
    key, email, iv, cyphertext, question = params
    stash = UserStash.from_email_hash(db.session, key, utils.sha256(email))
    # The request object is built in both branches because its token is
    # returned either way.
    req = UserStashRequest(key, email, iv, cyphertext, question, UserStashRequest.ACTION_SAVE)
    if not stash:
        db.session.add(req)
        db.session.commit()
        utils.email_stash_save_request(logger, email, req, req.MINUTES_EXPIRY)
        logger.info('stash save request %s created', req.token)
    else:
        logger.warning('stash for email %s exists, save request %s not created', email, req.token)
        utils.email_stash_save_exists(logger, email, req)
    return jsonify(dict(token=req.token))
@stash_bp.route('/save_check/<token>')
def stash_save_check(token=None):
    """Report whether the save request identified by ``token`` is confirmed.

    Returns a JSON response ``{"confirmed": bool}``. It is ``True`` only when
    the request is found and has a created stash attached; an unknown token
    is reported as not confirmed.
    """
    save_request = UserStashRequest.from_token(db.session, token)
    is_confirmed = bool(save_request) and save_request.created_stash is not None
    return jsonify({'confirmed': is_confirmed})
@stash_bp.route('/save_confirm/<token>/<secret>', methods=['GET', 'POST'])
def stash_save_confirm(token=None, secret=None):
    """Show (GET) or process (POST) the confirmation of a stash save request.

    The request is looked up by ``token`` and rejected with a flashed message
    and a redirect to ``/`` when it is missing, expired, carries a different
    secret, or is not a save request.

    On POST, a form field ``confirm`` equal to ``'true'`` creates the stash
    from the request; any other value deletes the request. On GET the
    confirmation template is rendered.

    Args:
        token: Token identifying the stash request (from the URL).
        secret: Secret that must match the one stored on the request.

    Returns:
        A redirect to ``/`` or the rendered confirmation page.
    """
    req = UserStashRequest.from_token(db.session, token)
    if not req:
        logger.warning('stash request %s not found', token)
        # Delay the failure response (presumably to slow down token guessing).
        time.sleep(5)
        flash('STASH request not found.', 'danger')
        return redirect('/')
    # NOTE(review): naive local time; assumes req.expiry is stored the same
    # way - confirm against the model.
    if datetime.datetime.now() > req.expiry:
        time.sleep(5)
        flash('STASH request expired.', 'danger')
        return redirect('/')
    # Compare the secret in constant time so response timing does not reveal
    # how much of a guessed secret is correct. Encoding to bytes lets
    # compare_digest accept non-ASCII input from the URL.
    secret_ok = (
        isinstance(req.secret, str)
        and isinstance(secret, str)
        and hmac.compare_digest(req.secret.encode('utf-8'), secret.encode('utf-8'))
    )
    if not secret_ok:
        flash('STASH code invalid.', 'danger')
        return redirect('/')
    if req.action != req.ACTION_SAVE:
        flash('STASH action invalid.', 'danger')
        return redirect('/')
    if request.method == 'POST':
        if request.form.get('confirm') != 'true':
            db.session.delete(req)
            db.session.commit()
            flash('STASH cancelled.', 'success')
            return redirect('/')
        # A repeated submit of an already confirmed request must not create
        # a second stash.
        if req.created_stash is not None:
            flash('STASH confirmed.', 'success')
            return redirect('/')
        # Another request for the same key/email may have been confirmed
        # since this one was created; repeat the existence check that
        # the /save endpoint performs.
        if UserStash.from_email_hash(db.session, req.key, req.email_hash):
            logger.warning('stash for save request %s already exists', token)
            flash('STASH already exists.', 'danger')
            return redirect('/')
        stash = UserStash(req)
        req.created_stash = stash
        db.session.add(req)
        db.session.add(stash)
        db.session.commit()
        flash('STASH confirmed.', 'success')
        return redirect('/')
    return render_template('stash/stash_save_confirm.html', req=req)
@stash_bp.route('/load', methods=['POST'])
def stash_load():
    """Create a request to load an existing stash and email the owner.

    Expects a JSON object with the fields ``key`` and ``email``.

    When a stash exists for the key/email-hash pair, the request is stored
    and a load-request email is sent. Otherwise nothing is stored or sent.
    A token is returned in both cases.

    Returns:
        A JSON response ``{"token": ...}``; ``bad_request(INVALID_JSON)`` when
        the body is not a JSON object; or the error response produced by
        ``get_json_params``.
    """
    # silent=True makes malformed JSON come back as None so it is reported
    # through INVALID_JSON below; without it Flask aborts the request on a
    # parse error and the check below never sees the failure.
    content = request.get_json(force=True, silent=True)
    # Only a JSON object can carry the named fields (rejects null, lists, ...).
    if not isinstance(content, dict):
        return bad_request(web_utils.INVALID_JSON)
    params, err_response = get_json_params(content, ["key", "email"])
    if err_response:
        return err_response
    key, email = params
    stash = UserStash.from_email_hash(db.session, key, utils.sha256(email))
    # The request object is built in both branches because its token is
    # returned either way.
    req = UserStashRequest(key, email, None, None, None, UserStashRequest.ACTION_LOAD)
    if stash:
        # Persist the request before emailing (same order as the /save
        # endpoint) so the emailed link never points at an unsaved request.
        db.session.add(req)
        db.session.commit()
        utils.email_stash_load_request(logger, email, req, req.MINUTES_EXPIRY)
        logger.info('stash load request %s created', req.token)
    else:
        logger.warning('stash for email %s does not exist, load request %s not created', email, req.token)
    return jsonify(dict(token=req.token))
@stash_bp.route('/load_check/<token>')
def stash_load_check(token=None):
    """Report whether a load request is confirmed and, if so, return the stash.

    Returns a JSON response with the keys ``confirmed``, ``key``, ``iv``,
    ``cyphertext`` and ``question``. The stash fields are filled in only when
    the request is found and has a loaded stash attached; otherwise
    ``confirmed`` is ``False`` and the other values are null.
    """
    load_request = UserStashRequest.from_token(db.session, token)
    stash = load_request.loaded_stash if load_request else None
    if stash is None:
        return jsonify({
            'confirmed': False,
            'key': None,
            'iv': None,
            'cyphertext': None,
            'question': None,
        })
    return jsonify({
        'confirmed': True,
        'key': stash.key,
        'iv': stash.iv,
        'cyphertext': stash.cyphertext,
        'question': stash.question,
    })
@stash_bp.route('/load_confirm/<token>/<secret>', methods=['GET', 'POST'])
def stash_load_confirm(token=None, secret=None):
    """Show (GET) or process (POST) the confirmation of a stash load request.

    The request is looked up by ``token`` and rejected with a flashed message
    and a redirect to ``/`` when it is missing, expired, carries a different
    secret, or is not a load request.

    On POST, a form field ``confirm`` equal to ``'true'`` attaches the stash
    matching the request's key/email hash to the request; any other value
    deletes the request. On GET the confirmation template is rendered.

    Args:
        token: Token identifying the stash request (from the URL).
        secret: Secret that must match the one stored on the request.

    Returns:
        A redirect to ``/`` or the rendered confirmation page.
    """
    req = UserStashRequest.from_token(db.session, token)
    if not req:
        logger.warning('stash request %s not found', token)
        # Delay the failure response (presumably to slow down token guessing).
        time.sleep(5)
        flash('STASH request not found.', 'danger')
        return redirect('/')
    # NOTE(review): naive local time; assumes req.expiry is stored the same
    # way - confirm against the model.
    if datetime.datetime.now() > req.expiry:
        time.sleep(5)
        flash('STASH request expired.', 'danger')
        return redirect('/')
    # Compare the secret in constant time so response timing does not reveal
    # how much of a guessed secret is correct. Encoding to bytes lets
    # compare_digest accept non-ASCII input from the URL.
    secret_ok = (
        isinstance(req.secret, str)
        and isinstance(secret, str)
        and hmac.compare_digest(req.secret.encode('utf-8'), secret.encode('utf-8'))
    )
    if not secret_ok:
        flash('STASH code invalid.', 'danger')
        return redirect('/')
    if req.action != req.ACTION_LOAD:
        flash('STASH action invalid.', 'danger')
        return redirect('/')
    if request.method == 'POST':
        if request.form.get('confirm') != 'true':
            db.session.delete(req)
            db.session.commit()
            flash('STASH cancelled.', 'success')
            return redirect('/')
        stash = UserStash.from_email_hash(db.session, req.key, req.email_hash)
        if not stash:
            # Without this guard the request would be flashed as confirmed
            # while loaded_stash stays empty.
            logger.warning('stash for load request %s not found', token)
            flash('STASH not found.', 'danger')
            return redirect('/')
        req.loaded_stash = stash
        db.session.add(req)
        db.session.commit()
        flash('STASH confirmed.', 'success')
        return redirect('/')
    return render_template('stash/stash_load_confirm.html', req=req)