-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
362 lines (299 loc) · 12.6 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
from flask import Flask, flash, redirect, render_template, request, url_for, request, url_for, session
from dotenv import load_dotenv
from repositories import user_repo, posts_repo
from werkzeug.security import generate_password_hash, check_password_hash
from flask_bcrypt import Bcrypt
import os
from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature
from flask_mailman import Mail, EmailMessage
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'a_default_secret_key_for_dev')
bcrypt=Bcrypt(app)
app.config['MAIL_SERVER'] = 'smtp.fastmail.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USERNAME'] = 'oav2008@fastmail.com'
app.config['MAIL_PASSWORD'] = 'sxgjpvwktjmqeduu'
app.config['MAIL_USE_TLS'] = False
app.config['MAIL_USE_SSL'] = True
mail = Mail(app)
hashed_password = generate_password_hash('your_plain_text_password', method='pbkdf2:sha256')
def generate_reset_token(email):
serializer = URLSafeTimedSerializer(app.secret_key)
return serializer.dumps(email, salt='email-confirm')
@app.get('/')
def fetch_all_posts():
try:
logging.debug("Attempting to fetch all posts from the database.")
posts = user_repo.find_all_posts()
if not posts:
logging.debug('No posts found')
else:
logging.debug(f'Number of posts retrieved: {len(posts)}')
return render_template('new_index.html', posts=posts)
except Exception as e:
logging.error(f'Failed to fetch posts: {e}', exc_info=True)
return "Error fetching posts", 500
@app.get('/signup')
def signup():
all_users = user_repo.get_all_users()
return render_template('new_signup.html', users=all_users)
@app.get('/signin')
def signin():
return render_template('new_signin.html')
@app.get('/create_post')
def show_create_post_form():
return render_template('new_createpost.html')
@app.route('/profile')
def see_profile():
user_id = session.get('user_id')
if user_id:
user = user_repo.get_user_by_id(user_id)
if user:
return render_template('new_profilepage.html', user=user)
else:
flash('User not found.')
return redirect(url_for('show_signin_form'))
else:
flash('No user logged in.')
return redirect(url_for('show_signin_form'))
@app.get('/message')
def send_message():
return render_template('new_message.html')
@app.get('/<int:user_id>')
def see_user(user_id):
user = user_repo.get_user_by_id(user_id)
return render_template('new_user.html', user=user)
@app.post('/signup')
def create_user():
first_name = request.form['firstName'].strip()
last_name = request.form['lastName'].strip()
email = request.form['email'].strip()
password = request.form['password'].strip()
confirm_password = request.form['confirmPassword'].strip()
if not (first_name and last_name and email and password and confirm_password):
flash('All fields are required.')
return redirect(url_for('signup'))
if password != confirm_password:
flash('Passwords do not match.')
return redirect(url_for('signup'))
encrypted_password = bcrypt.generate_password_hash(password).decode('utf-8')
# Check if user already exists
user = user_repo.find_user_by_email(email)
if user:
flash('Email already in use.')
return redirect(url_for('signup'))
# Create new user
user_repo.create_user(first_name, last_name, email, encrypted_password)
flash('Account created successfully, please log in.')
return redirect(url_for('signin'))
@app.get('/signin')
def show_signin_form():
return render_template('signin.html')
@app.post('/signin')
def signin_user():
email = request.form['email'].strip()
password = request.form['password'].strip()
print("Attempting to sign in with:", email, password)
# Find user by email
user = user_repo.find_user_by_email(email)
print("User found:", user)
# Check if user is None before trying to access its properties
if user is None:
print("No user found with that email.")
flash('Invalid email or password.')
return redirect(url_for('signin'))
# Since user is not None, it's safe to access its properties
print("Stored hash:", user.get('password', 'No password found'))
if bcrypt.check_password_hash(user['password'], password):
session['user_id'] = user['user_id']
session['username'] = user['username']
flash('You are successfully logged in.')
return redirect(url_for('fetch_all_posts'))
else:
flash('Invalid email or password.')
return redirect(url_for('signin'))
@app.post('/logout')
def logout():
# Remove user info from the session
session.pop('user_id', None)
session.pop('username', None)
flash('You have successfully logged out.')
return redirect(url_for('signin'))
@app.get('/forgot_password')
def show_forgot_password_form():
return render_template('forgot_password.html')
@app.post('/forgot_password')
def submit_forgot_password_form():
email = request.form['email'].strip()
user = user_repo.find_user_by_email(email)
if not user:
flash('No account associated with that email. Please try another email or <a href="/signup">sign up</a>.', 'error')
return redirect(url_for('show_forgot_password_form'))
if user:
user_repo.update_password_change_flag(email)
token = generate_reset_token(email)
reset_url = url_for('reset_password', token=token, _external=True)
msg = EmailMessage(
subject="Password Reset Link",
body= f"Here is your password reset link. Follow this link to reset your password: {reset_url}",
from_email="oav2008@fastmail.com",
to=[email]
)
try:
msg.send()
flash('Password reset link has been sent to your email.')
except Exception as e:
flash('An error occurred while sending the email. Please try again later.', 'error')
app.logger.error(f"Failed to send email: {e}")
return redirect(url_for('fetch_all_posts'))
@app.get('/reset_password/<token>')
def show_reset_password_form(token):
try:
serializer = URLSafeTimedSerializer(app.secret_key)
email = serializer.loads(token, salt='email-confirm', max_age=3600) # 1 hour expiration
except SignatureExpired:
flash('Your password reset link has expired.', 'error')
return redirect(url_for('forgot_password'))
except BadSignature:
flash('Invalid password reset link.', 'error')
return redirect(url_for('forgot_password'))
return render_template('reset_password.html', token=token)
@app.post('/reset_password/<token>')
def reset_password(token):
try:
serializer = URLSafeTimedSerializer(app.secret_key)
email = serializer.loads(token, salt='email-confirm', max_age=3600) # 1 hour expiration
except SignatureExpired:
flash('Your password reset link has expired.', 'error')
return redirect(url_for('forgot_password'))
except BadSignature:
flash('Invalid password reset link.', 'error')
return redirect(url_for('forgot_password'))
new_password = request.form['new_password']
confirm_password = request.form['confirm_password']
if new_password != confirm_password:
flash('Passwords do not match.', 'error')
return redirect(url_for('reset_password', token=token))
if len(new_password) < 8: # Assuming you want at least 8 characters
flash('Password must be at least 8 characters long.', 'error')
return redirect(url_for('reset_password', token=token))
# Assuming user_repo.update_user_password(email, new_password) exists and properly hashes the password before storing it
if user_repo.update_user_password(email, new_password):
flash('Your password has been updated successfully.', 'success')
return redirect(url_for('signin'))
else:
flash('An error occurred while updating your password. Please try again.', 'error')
return redirect(url_for('reset_password', token=token))
@app.post('/create_post')
def submit_create_post():
title = request.form['title'].strip()
content = request.form['content'].strip()
difficulty_level = request.form['difficulty_level'].strip()
if not (title and content and difficulty_level):
flash('Both title and content are required.')
return redirect(url_for('create_post_form')) # Assuming you have a form at this route
# Assuming user_id is retrieved from session after user logs in
user_id = session.get('user_id')
if not user_id:
flash('You must be logged in to create a post.')
return redirect(url_for('signin'))
# Create new post in the database
post_id = user_repo.create_post(user_id, title, content, difficulty_level)
if post_id:
flash('Post created successfully!')
return redirect(url_for('fetch_all_posts')) # Redirect to the homepage or another appropriate page
else:
flash('An error occurred while creating the post.')
return redirect(url_for('create_post_form'))
@app.post('/delete_post')
def delete_post():
post_id = request.form['post_id']
if post_id:
user_repo.delete_post_from_db(post_id)
flash('Post deleted successfully!')
else:
flash('Failed to delete the post.')
return redirect(url_for('fetch_all_posts'))
@app.route('/search', methods=['GET', 'POST'])
def search():
query = request.args.get('q', '').strip()
if query:
# Search for users whose usernames match the query
user_results = user_repo.search_users(query)
# Search for posts whose titles match the query
post_results_title = user_repo.search_posts_title(query)
# Search for posts whose usernames match the query
post_results_username = user_repo.search_posts_by_username(query)
# Combine post results from title search and username search
post_results_combined = post_results_title + post_results_username
# Filter posts based on username match
post_results_filtered = [post for post in post_results_combined if any(user['user_id'] == post['user_id'] for user in user_results)]
results = {
'users': user_results,
'posts': post_results_filtered
}
else:
results = None
return render_template('new_search.html', results=results)
if __name__ == "__main__":
print("FLASK_ENV:", os.environ.get("FLASK_ENV"))
print("Debug mode:", app.debug)
app.run(port=5003)
@app.post('/delete_user')
def delete_user():
user_id = request.form['user_id']
if user_id:
user_repo.delete_user(user_id)
flash('User deleted successfully!')
else:
flash('Failed to delete the user.')
return redirect(url_for('fetch_all_posts'))
# Redirect to the edit user form
@app.get('/edit_user/<int:user_id>')
def edit_user_form(user_id):
user = user_repo.get_user_by_id(user_id)
if user:
return render_template('edit_user_form.html', user=user)
else:
flash('User not found.')
return redirect(url_for('new_profilepage.html'))
@app.get('/edit_post/<int:post_id>')
def edit_post_form(post_id):
post = posts_repo.get_post_by_id(post_id)
if post:
return render_template('edit_post_form.html', post=post)
else:
flash('Post not found.')
return redirect(url_for('new_index.html'))
@app.post('/update_post')
def update_post():
post_id = request.form.get('post_id')
title = request.form.get('title')
content = request.form.get('content')
# print(post_id, title, content)
if post_id and title and content:
success = posts_repo.update_post(post_id, title, content)
if success:
flash('Post updated successfully!')
else:
flash('Failed to update the post. Please try again.')
else:
flash('Failed to update the post: Missing data')
return redirect(url_for('fetch_all_posts'))
@app.post('/update_user')
def update_user():
user_id = request.form.get('user_id')
username = request.form.get('username')
if user_id and username:
updated = user_repo.update_user(user_id, username)
session['username'] = user_repo.get_user_by_id(user_id).get('username')
if updated:
flash('User updated successfully!')
else:
flash('Failed to update the user.')
else:
flash('Invalid user ID.')
return redirect(url_for("see_profile"))