Skip to content

Commit

Permalink
[AUD-704] Add full support for aggregate challenges (#1657)
Browse files Browse the repository at this point in the history
* WIP

Working aggregation

* Aggregate challenges working + tests

* PR polish

* Address PR comments
  • Loading branch information
piazzatron committed Jul 16, 2021
1 parent 809548a commit b6a74b0
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 95 deletions.
6 changes: 4 additions & 2 deletions discovery-provider/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
{
// With this setup, we get linting via pylint, typechecking via mypy,
// and formatting via black.
// and formatting via black. Comments are allowed b/c
// vscode understands this as a special 'JSON with Comments' file.
"python.linting.pylintEnabled": true, // This enables basic linting
"python.linting.mypyEnabled": true, // This gives us typing
"python.linting.lintOnSave": true, // lint on save
"python.linting.maxNumberOfProblems": 1000, // set high max errors
"python.linting.pylintCategorySeverity.convention": "Warning",
"python.linting.pylintCategorySeverity.refactor": "Warning",
"python.pythonPath": "/usr/bin/python3",
"python.formatting.provider": "black", // This gives us autoforatting
"python.formatting.provider": "black", // This gives us autoformatting
"editor.formatOnSave": true,
"python.analysis.typeCheckingMode": "off",
"python.languageServer": "Pylance",
"editor.tabSize": 4
}
204 changes: 156 additions & 48 deletions discovery-provider/src/challenges/challenge.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,75 @@
import logging
from typing import Counter, Dict, Tuple, TypedDict, List, Optional, cast
from sqlalchemy.orm.session import Session
from sqlalchemy import func
from src.models.models import ChallengeType
from abc import ABC, abstractmethod
from src.models import Challenge, UserChallenge

logger = logging.getLogger(__name__)

# DB Accessors
def fetch_user_challenges(
    session: Session, challenge_id: str, specifiers: List[str]
) -> List[UserChallenge]:
    """Return every UserChallenge row for `challenge_id` whose specifier
    appears in `specifiers`.
    """
    query = session.query(UserChallenge).filter(
        UserChallenge.challenge_id == challenge_id,
        UserChallenge.specifier.in_(specifiers),
    )
    return query.all()


class EventMetadata(TypedDict):
    """Shape of a raw challenge event as dequeued from the event bus."""

    block_number: int  # block at which the event was observed
    user_id: int  # user the event pertains to
    extra: Dict  # event-type-specific payload


class FullEventMetadata(TypedDict):
    """An `EventMetadata` augmented with its computed specifier."""

    block_number: int  # block at which the event was observed
    user_id: int  # user the event pertains to
    specifier: str  # unique key identifying this UserChallenge instance
    extra: Dict  # event-type-specific payload


class ChallengeUpdater(ABC):
    """`ChallengeUpdater` is an abstract class which provides challenge specific logic
    to an instance of a `ChallengeManager`. The only required override is
    `update_user_challenges`.
    """

    @abstractmethod
    def update_user_challenges(
        self,
        session: Session,
        event: str,
        user_challenges: List[UserChallenge],
        step_count,
    ):
        """This is usually the main required method to fill out when implementing
        a new challenge.

        Given an event type, a list of existing user challenges, and the base
        challenge type, update the given user_challenges.

        In the case of aggregate challenges, where UserChallenges are created in an
        already completed state, this method can be left as is.
        """

    def on_after_challenge_creation(self, session, metadatas: List[FullEventMetadata]):
        """Optional hook invoked after the `ChallengeManager` creates new challenges.

        If a challenge is backed by its own table, for instance, create those
        rows here.
        """

    def generate_specifier(self, user_id: int, extra: Dict) -> str:
        """Optional method to provide a custom specifier for a challenge,
        given a user_id. Defaults to the stringified user_id.
        """
        return str(user_id)

    def should_create_new_challenge(self, event: str, user_id: int, extra: Dict) -> bool:
        """Optional method called for aggregate challenges, allowing an override of
        the default behavior of creating a new UserChallenge whenever
        1) a relevant event is seen and 2) the parent challenge is not yet complete.
        """
        return True


class ChallengeManager:
    """Manages a single challenge, using an instance
    of `ChallengeUpdater` implementing the business logic of that challenge.
    """

    # NOTE(review): the first lines of this docstring are hidden in the diff
    # view being reconstructed — confirm against the full file.

    challenge_id: str
    _did_init: bool
    _updater: ChallengeUpdater
    _starting_block: Optional[int]
    _step_count: Optional[int]
    _challenge_type: ChallengeType

    def __init__(self, challenge_id: str, updater: ChallengeUpdater):
        self.challenge_id = challenge_id
        self._did_init = False
        self._updater = updater
        self._starting_block = None
        self._step_count = None
        self._challenge_type = None  # type: ignore

    def process(self, session, event_type: str, event_metadatas: List[EventMetadata]):
        """Processes a number of events for a particular event type, updating
        UserChallenges as needed.
        """
        if not self._did_init:  # lazy init on first use
            self._init_challenge(session)

        # Filter out events that took place before the starting block,
        # returning early if nothing remains.
        if self._starting_block is not None:
            event_metadatas = list(
                filter(
                    lambda x: x["block_number"] >= cast(int, self._starting_block),
                    event_metadatas,
                )
            )
        if not event_metadatas:
            return

        # Attach a specifier to every event.
        events_with_specifiers: List[FullEventMetadata] = [
            {
                "user_id": event["user_id"],
                "block_number": event["block_number"],
                "extra": event["extra"],
                "specifier": self._updater.generate_specifier(
                    event["user_id"], event["extra"]
                ),
            }
            for event in event_metadatas
        ]

        # Drop any duplicate specifiers (later events win in the map).
        events_with_specifiers_map = {
            event["specifier"]: event for event in events_with_specifiers
        }
        events_with_specifiers = list(events_with_specifiers_map.values())
        specifiers: List[str] = [e["specifier"] for e in events_with_specifiers]

        # Fetch all existing user challenges for these specifiers.
        existing_user_challenges = fetch_user_challenges(
            session, self.challenge_id, specifiers
        )
        existing_specifiers = {
            challenge.specifier for challenge in existing_user_challenges
        }

        # Determine which events still need a UserChallenge created.
        new_challenge_metadata = [
            metadata
            for metadata in events_with_specifiers
            if metadata["specifier"] not in existing_specifiers
        ]
        to_create_metadata: List[FullEventMetadata] = []
        if self._challenge_type == ChallengeType.aggregate:
            # For aggregate challenges, only create new UserChallenges if
            # completion hasn't been maxed out yet and the updater hasn't
            # overridden creation via should_create_new_challenge.

            # Count *all* existing UserChallenges per user for this challenge.
            user_ids = list({e["user_id"] for e in event_metadatas})
            all_user_challenges: List[Tuple[int, int]] = (
                session.query(
                    UserChallenge.user_id, func.count(UserChallenge.specifier)
                )
                .filter(
                    UserChallenge.challenge_id == self.challenge_id,
                    UserChallenge.user_id.in_(user_ids),
                )
                .group_by(UserChallenge.user_id)
            ).all()
            challenges_per_user = dict(all_user_challenges)
            for new_metadata in new_challenge_metadata:
                completion_count = challenges_per_user.get(new_metadata["user_id"], 0)
                if self._step_count and completion_count >= self._step_count:
                    continue
                if not self._updater.should_create_new_challenge(
                    event_type, new_metadata["user_id"], new_metadata["extra"]
                ):
                    continue
                to_create_metadata.append(new_metadata)
        else:
            to_create_metadata = new_challenge_metadata

        new_user_challenges = [
            self._create_new_challenge(metadata["user_id"], metadata["specifier"])
            for metadata in to_create_metadata
        ]
        # Do any other custom work needed after creating a challenge event
        self._updater.on_after_challenge_creation(session, to_create_metadata)

        # Update all the challenges (existing in-progress plus newly created).
        in_progress_challenges = [
            challenge
            for challenge in existing_user_challenges
            if not challenge.is_complete
        ]
        to_update = in_progress_challenges + new_user_challenges
        self._updater.update_user_challenges(
            session, event_type, to_update, self._step_count
        )

        # Stamp the completing block number onto newly completed challenges.
        for challenge in to_update:
            if challenge.is_complete:
                challenge.completed_blocknumber = events_with_specifiers_map[
                    challenge.specifier
                ]["block_number"]

        logger.debug(f"Updated challenges from event [{event_type}]: [{to_update}]")
        # Only add the new ones; existing rows are already session-tracked.
        session.add_all(new_user_challenges)

    def get_challenge_state(
        self, session: Session, specifiers: List[str]
    ) -> List[UserChallenge]:
        """Return UserChallenges for the given specifiers, in specifier order."""
        user_challenges = fetch_user_challenges(session, self.challenge_id, specifiers)
        # Re-sort the results to match the order of the requested specifiers.
        specifier_map = {
            user_challenge.specifier: user_challenge
            for user_challenge in user_challenges
        }
        return [specifier_map[s] for s in specifiers]

    # Helpers

    def _init_challenge(self, session):
        # Load the parent Challenge row and cache its immutable config.
        challenge: Challenge = (
            session.query(Challenge).filter(Challenge.id == self.challenge_id).first()
        )
        if not challenge:
            raise Exception("No matching challenge!")
        self._starting_block = challenge.starting_block
        self._step_count = challenge.step_count
        self._challenge_type = challenge.type
        self._did_init = True

    def _create_new_challenge(self, user_id: int, specifier: str):
        # Build (but do not persist) a fresh UserChallenge row.
        return UserChallenge(
            challenge_id=self.challenge_id,
            user_id=user_id,
            specifier=specifier,
            is_complete=(
                self._challenge_type == ChallengeType.aggregate
            ),  # Aggregates are made in completed state
            current_step_count=0,
        )
30 changes: 23 additions & 7 deletions discovery-provider/src/challenges/challenge_event_bus.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
from flask import current_app
from typing import Dict
from sqlalchemy.orm.session import Session
from src.utils.redis_connection import get_redis
from src.challenges.profile_challenge import profile_challenge_manager
from src.challenges.challenge_event import ChallengeEvent
Expand All @@ -26,16 +27,23 @@ def register_listener(self, event, listener):
"""Registers a listener (`ChallengeEventManager`) to listen for a particular event type."""
self._listeners[event].append(listener)

def dispatch(
    self,
    session: Session,
    event: str,
    block_number: int,
    user_id: int,
    extra: Dict = {},
):
    """Serialize an event + block_number + user_id (+ optional extra payload)
    and enqueue it onto the Redis queue. Enqueue failures are logged, not raised.
    """
    # NOTE(review): the mutable default `{}` is shared across calls; it appears
    # to be read-only here (only serialized) — confirm _event_to_json does not
    # mutate it.
    try:
        event_json = self._event_to_json(event, block_number, user_id, extra)
        logger.info(f"ChallengeEventBus: dispatch {event_json}")
        self._redis.rpush(REDIS_QUEUE_PREFIX, event_json)
    except Exception as e:
        logger.warning(f"ChallengeEventBus: error enqueuing to Redis: {e}")

def process_events(self, session, max_events=1000):
def process_events(self, session: Session, max_events=1000):
"""Dequeues `max_events` from Redis queue and processes them, forwarding to listening ChallengeManagers.
Returns the number of events it's processed.
"""
Expand All @@ -48,14 +56,17 @@ def process_events(self, session, max_events=1000):
events_dicts = list(map(self._json_to_event, events_json))

# Consolidate event types for processing
# map of {"event_type": [{ user_id: number, block_number: number }]}}
# map of {"event_type": [{ user_id: number, block_number: number, extra: {} }]}}
event_user_dict = defaultdict(lambda: [])
for event_dict in events_dicts:
event_type = event_dict["event"]
event_user_dict[event_type].append(
{
"user_id": event_dict["user_id"],
"block_number": event_dict["block_number"],
"extra": event_dict.get( # use .get to be safe since prior versions didn't have `extra`
"extra", {}
),
}
)

Expand All @@ -71,8 +82,13 @@ def process_events(self, session, max_events=1000):

# Helpers

def _event_to_json(self, event, block_number, user_id):
event_dict = {"event": event, "user_id": user_id, "block_number": block_number}
def _event_to_json(self, event: str, block_number: int, user_id: int, extra: Dict):
event_dict = {
"event": event,
"user_id": user_id,
"block_number": block_number,
"extra": extra,
}
return json.dumps(event_dict)

def _json_to_event(self, event_json):
Expand Down

0 comments on commit b6a74b0

Please sign in to comment.