Skip to content

Commit

Permalink
Refactored userhelper
Browse files Browse the repository at this point in the history
  • Loading branch information
loleg committed Mar 27, 2024
1 parent ecc9702 commit 0f72d03
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 72 deletions.
86 changes: 86 additions & 0 deletions dribdat/public/userhelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
"""Helper functions for user lists."""

from dribdat.user.models import User, Activity
from urllib.parse import quote, quote_plus

from sqlalchemy import or_, func

import re


# Removes markdown and HTML tags
RE_NO_TAGS = re.compile(r'\!\[[^\]]*\]\([^\)]+\)|\[|\]|<[^>]+>')


def get_users_by_search(search_by):
"""Collects all users."""
users = User.query.filter_by(active=True)
if search_by and len(search_by) > 2:
q = search_by.replace('@', '').lower()
q = "%%%s%%" % q
if '@' in search_by:
users = users.filter(or_(
User.email.ilike(q),
User.username.ilike(q)
))
else:
users = users.filter(or_(
User.my_story.ilike(q),
User.my_goals.ilike(q),
))
# TODO: pagination!
if users.count() > 200:
# Only the first 200 participants are shown
users = users.limit(200)
# Provide certificate if available
if users:
return sorted(users.all(), key=lambda x: x.username)
return []


def filter_users_by_search(users, search_by=None, role_call=None):
"""Collects the participating users."""
if not users: return [], ''
# Quick (actually, rather slow..) search filter:
if role_call and len(role_call) > 2:
usearch = []
for u in users:
for r in u.roles:
if role_call in str(r):
usearch.append(u)
search_by = role_call
break
elif search_by and len(search_by) > 2:
usearch = []
qq = search_by.replace('@', '').lower()
for u in users:
if qq in u.username.lower() or qq in u.email.lower():
usearch.append(u)
elif (u.my_story and qq in u.my_story.lower()) or \
(u.my_goals and qq in u.my_goals.lower()):
usearch.append(u)
else:
usearch = users
search_by = ''
return usearch, search_by


def get_dribs_paginated(page=1, per_page=10, host_url=''):
"""Collects the latest activities ("dribs")."""
latest_dribs = Activity.query.filter(or_(
Activity.action == "post",
Activity.name == "boost")).order_by(Activity.id.desc())
dribs = latest_dribs.paginate(page=page, per_page=per_page)
dribs.items = [
d for d in dribs.items
if not d.project.is_hidden and d.content]
# Generate social links
for d in dribs.items:
d.share = {
'text': quote(" ".join([
RE_NO_TAGS.sub('', d.content or d.project.name),
d.project.event.hashtags or '#dribdat']).strip()),
'url': quote_plus(host_url + d.project.url)
}
return dribs
80 changes: 12 additions & 68 deletions dribdat/public/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@
"""Public section, including homepage and signup."""
from dribdat.utils import load_event_presets
from flask import (Blueprint, request, render_template, flash, url_for,
redirect, current_app, jsonify)
redirect, current_app, jsonify)
from flask_login import login_required, current_user
from dribdat.user.models import User, Event, Project, Activity
from dribdat.public.forms import NewEventForm
from dribdat.public.userhelper import (get_users_by_search,
filter_users_by_search, get_dribs_paginated)
from dribdat.database import db
from dribdat.extensions import cache
from dribdat.aggregation import GetEventUsers
from dribdat.user import getProjectStages, isUserActive
from urllib.parse import quote, quote_plus, urlparse
from urllib.parse import urlparse
from datetime import datetime
from sqlalchemy import and_, or_, func
import re
from sqlalchemy import and_

blueprint = Blueprint('public', __name__, static_folder="../static")

# Loads confiuration for events
EVENT_PRESET = load_event_presets()

# Removes markdown and HTML tags
RE_NO_TAGS = re.compile(r'\!\[[^\]]*\]\([^\)]+\)|\[|\]|<[^>]+>')


def current_event():
"""Just get a current event."""
Expand Down Expand Up @@ -219,31 +217,11 @@ def event_participants(event_id):
"""Show list of participants of an event."""
event = Event.query.filter_by(id=event_id).first_or_404()
users = GetEventUsers(event)
cert_path = None
search_by = request.args.get('q') or ''
role_call = request.args.get('r') or ''
# Quick (actually, rather slow..) search filter
if role_call and len(role_call) > 2:
usearch = []
for u in users:
for r in u.roles:
if role_call in str(r):
usearch.append(u)
search_by = role_call
break
elif search_by and len(search_by) > 2:
usearch = []
qq = search_by.replace('@', '').lower()
for u in users:
if qq in u.username.lower() or qq in u.email.lower():
usearch.append(u)
elif (u.my_story and qq in u.my_story.lower()) or \
(u.my_goals and qq in u.my_goals.lower()):
usearch.append(u)
else:
usearch = users
search_by = ''
usearch, search_by = filter_users_by_search(users, search_by, role_call)
# Provide certificate if available
cert_path = None
if current_user and not current_user.is_anonymous:
cert_path = current_user.get_cert_path(event)
usercount = len(usearch) if usearch else 0
Expand All @@ -258,34 +236,14 @@ def event_participants(event_id):
@blueprint.route("/participants")
def all_participants():
"""Show list of participants of an event."""
users = User.query.filter_by(active=True)
search_by = request.args.get('q')
if search_by and len(search_by) > 2:
q = search_by.replace('@', '').lower()
q = "%%%s%%" % q
if '@' in search_by:
users = users.filter(or_(
User.email.ilike(q),
User.username.ilike(q)
))
else:
users = users.filter(or_(
User.my_story.ilike(q),
User.my_goals.ilike(q),
))
else:
users = users.limit(50).all()
search_by = ''
# Provide certificate if available
if users:
users = sorted(users, key=lambda x: x.username)
usercount = len(users)
else:
usercount = 0
users = get_users_by_search(search_by)
if len(users) == 200:
flash('Only the first 200 participants are shown.', 'info')
return render_template("public/eventusers.html",
q=search_by,
participants=users,
usercount=usercount,
usercount=len(users),
active="participants")


Expand Down Expand Up @@ -421,21 +379,7 @@ def dribs():
"""Show the latest logged posts."""
page = int(request.args.get('page') or 1)
per_page = int(request.args.get('limit') or 10)
latest_dribs = Activity.query.filter(or_(
Activity.action == "post",
Activity.name == "boost")).order_by(Activity.id.desc())
dribs = latest_dribs.paginate(page=page, per_page=per_page)
dribs.items = [
d for d in dribs.items
if not d.project.is_hidden and d.content]
# Generate social links
for d in dribs.items:
d.share = {
'text': quote(" ".join([
RE_NO_TAGS.sub('', d.content or d.project.name),
d.project.event.hashtags or '#dribdat']).strip()),
'url': quote_plus(request.host_url + d.project.url)
}
dribs = get_dribs_paginated(page, per_page, request.host_url)
return render_template("public/dribs.html",
current_event=current_event(),
endpoint='public.dribs', active='dribs',
Expand Down
6 changes: 4 additions & 2 deletions dribdat/user/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def set_from_data(self, data):
self.username = data['username']
self.webpage_url = data['webpage_url']
if 'email' not in data:
data['email'] = "%s@%d.localdomain" % (self.username, data['id'])
self.email = data['email']
if not self.email or not '@' in self.email:
self.email = "%s@localhost.localdomain" % self.username
else:
self.email = data['email']
self.updated_at = dt.datetime.utcnow()

def socialize(self):
Expand Down
45 changes: 45 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
import pytz
from base64 import b64decode

from dribdat.user.models import Role, User, Event
from dribdat.user.constants import stageProjectToNext
Expand All @@ -28,6 +29,16 @@ def test_get_by_id(self):
retrieved = User.get_by_id(user.id)
assert retrieved == user

newdata = { 'username': 'bar', 'webpage_url': '#' }
user.set_from_data(newdata)
assert user.username == 'bar'
assert user.webpage_url == newdata['webpage_url']
assert 'localdomain' not in user.email

user = User()
user.set_from_data(newdata)
assert 'localdomain' in user.email

def test_created_at_defaults_to_datetime(self):
"""Test creation date."""
user = User(username='foo', email='foo@bar.com')
Expand Down Expand Up @@ -68,6 +79,26 @@ def test_roles(self):
user.save()
assert role in user.roles

def test_social(self):
"""Check social network profile."""
user = UserFactory()
user.socialize()
assert user.cardtype == ''
assert 'gravatar' in user.carddata
user.webpage_url = 'https://github.com/dribdat'
user.socialize()
assert user.cardtype == 'github'
assert 'dribdat' in user.carddata
user.webpage_url = 'https://gitlab.com/dribdat'
user.email = b64decode(b'b2xAdXRvdS5jaA==').decode("utf-8")
user.socialize()
assert user.cardtype == 'gitlab'
assert 'identicon' in user.carddata
user.webpage_url = 'https://linkedin.com/in/loleg'
user.socialize()
assert user.cardtype == 'linkedin-square'
assert 'avatar' in user.carddata


@pytest.mark.usefixtures('db')
class TestEvent:
Expand Down Expand Up @@ -137,6 +168,20 @@ def test_event_projects(self, db):
assert p1 in event.current_projects()
assert p3 not in event.current_projects()

def test_event_certify(self, db):
event = EventFactory()
user = UserFactory()
user.save()
assert user.may_certify() == (False, 'projects')
project = ProjectFactory()
project.event = event
project.save()
ProjectActivity(project, 'star', user)
assert user.may_certify() == (False, 'event')
event.certificate_path = 'https://testcert.cc/{username}'
event.save()
assert user.may_certify()[0]


@pytest.mark.usefixtures('db')
class TestProject:
Expand Down
27 changes: 25 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
See: http://webtest.readthedocs.org/
"""
from .factories import ProjectFactory, EventFactory, UserFactory
from dribdat.public import views
from dribdat.aggregation import ProjectActivity
from dribdat.public import views, userhelper
from dribdat.aggregation import ProjectActivity, GetEventUsers
from datetime import datetime

class TestViews:
Expand Down Expand Up @@ -68,6 +68,29 @@ def test_feeds_rss(self, event, testapp):
view_rss = testapp.get('/feeds/dribs.xml')
assert 'content:encoded' in view_rss

querypage = userhelper.get_dribs_paginated()
assert len(querypage.items) == 1

# Test personal view
user_rss = testapp.get('/feeds/user/' + user.username)
assert 'Hello, world' in user_rss

def test_user_helpers(self, event, testapp):
"""Functions used in user search."""

user = UserFactory()
user.username = 'Bob'
user.save()
assert user in userhelper.get_users_by_search('@bob')

event = EventFactory()
event.save()
project = ProjectFactory()
project.event = event
project.save()
ProjectActivity(project, 'star', user)
assert project in user.joined_projects()
users = GetEventUsers(event)
assert user in users
assert user in userhelper.filter_users_by_search(users, '@bob')[0]

0 comments on commit 0f72d03

Please sign in to comment.