diff --git a/src/elections/crud.py b/src/elections/crud.py index 10695a4..3d99ce9 100644 --- a/src/elections/crud.py +++ b/src/elections/crud.py @@ -1,11 +1,12 @@ +from collections.abc import Sequence + import sqlalchemy from sqlalchemy.ext.asyncio import AsyncSession -from elections.tables import Election, NomineeApplication, NomineeInfo +from elections.tables import Election -async def get_all_elections(db_session: AsyncSession) -> list[Election]: - # TODO: can this return None? +async def get_all_elections(db_session: AsyncSession) -> Sequence[Election]: election_list = (await db_session.scalars( sqlalchemy .select(Election) @@ -46,100 +47,3 @@ async def delete_election(db_session: AsyncSession, slug: str) -> None: .delete(Election) .where(Election.slug == slug) ) - -# ------------------------------------------------------- # - -# TODO: switch to only using one of application or registration -async def get_all_registrations_of_user( - db_session: AsyncSession, - computing_id: str, - election_slug: str -) -> list[NomineeApplication] | None: - registrations = (await db_session.scalars( - sqlalchemy - .select(NomineeApplication) - .where( - (NomineeApplication.computing_id == computing_id) - & (NomineeApplication.nominee_election == election_slug) - ) - )).all() - return registrations - -async def get_all_registrations_in_election( - db_session: AsyncSession, - election_slug: str, -) -> list[NomineeApplication] | None: - registrations = (await db_session.scalars( - sqlalchemy - .select(NomineeApplication) - .where( - NomineeApplication.nominee_election == election_slug - ) - )).all() - return registrations - -async def add_registration( - db_session: AsyncSession, - initial_application: NomineeApplication -): - db_session.add(initial_application) - -async def update_registration( - db_session: AsyncSession, - initial_application: NomineeApplication -): - await db_session.execute( - sqlalchemy - .update(NomineeApplication) - .where( - (NomineeApplication.computing_id == initial_application.computing_id) - & (NomineeApplication.nominee_election == initial_application.nominee_election) - & (NomineeApplication.position == initial_application.position) - ) - .values(initial_application.to_update_dict()) - ) - -async def delete_registration( - db_session: AsyncSession, - computing_id: str, - election_slug: str, - position: str -): - await db_session.execute( - sqlalchemy - .delete(NomineeApplication) - .where( - (NomineeApplication.computing_id == computing_id) - & (NomineeApplication.nominee_election == election_slug) - & (NomineeApplication.position == position) - ) - ) - -# ------------------------------------------------------- # - -async def get_nominee_info( - db_session: AsyncSession, - computing_id: str, -) -> NomineeInfo | None: - return await db_session.scalar( - sqlalchemy - .select(NomineeInfo) - .where(NomineeInfo.computing_id == computing_id) - ) - -async def create_nominee_info( - db_session: AsyncSession, - info: NomineeInfo, -): - db_session.add(info) - -async def update_nominee_info( - db_session: AsyncSession, - info: NomineeInfo, -): - await db_session.execute( - sqlalchemy - .update(NomineeInfo) - .where(NomineeInfo.computing_id == info.computing_id) - .values(info.to_update_dict()) - ) diff --git a/src/elections/models.py b/src/elections/models.py index e83db14..429924b 100644 --- a/src/elections/models.py +++ b/src/elections/models.py @@ -1,6 +1,9 @@ from enum import StrEnum -from pydantic import BaseModel +from pydantic import BaseModel, Field + +from officers.constants import OfficerPositionEnum +from registrations.models import RegistrationModel class ElectionTypeEnum(StrEnum): @@ -8,26 +11,39 @@ class ElectionTypeEnum(StrEnum): BY_ELECTION = "by_election" COUNCIL_REP = "council_rep_election" -class ElectionModel(BaseModel): +class ElectionStatusEnum(StrEnum): + BEFORE_NOMINATIONS = "before_nominations" + NOMINATIONS = "nominations" + VOTING = "voting" + AFTER_VOTING = "after_voting" + +class ElectionResponse(BaseModel): slug: str name: str type: ElectionTypeEnum datetime_start_nominations: str datetime_start_voting: str datetime_end_voting: str - available_positions: str + available_positions: list[OfficerPositionEnum] + status: ElectionStatusEnum + + survey_link: str | None = Field(None, description="Only available to admins") + candidates: list[RegistrationModel] | None = Field(None, description="Only available to admins") + +class ElectionParams(BaseModel): + name: str + type: ElectionTypeEnum + datetime_start_nominations: str + datetime_start_voting: str + datetime_end_voting: str + available_positions: list[OfficerPositionEnum] | None = None + survey_link: str | None = None + +class ElectionUpdateParams(BaseModel): + type: ElectionTypeEnum | None = None + datetime_start_nominations: str | None = None + datetime_start_voting: str | None = None + datetime_end_voting: str | None = None + available_positions: list[OfficerPositionEnum] | None = None survey_link: str | None = None -class NomineeInfoModel(BaseModel): - computing_id: str - full_name: str - linked_in: str - instagram: str - email: str - discord_username: str - -class NomineeApplicationModel(BaseModel): - computing_id: str - nominee_election: str - position: str - speech: str diff --git a/src/elections/tables.py b/src/elections/tables.py index 1c248d8..1d9709c 100644 --- a/src/elections/tables.py +++ b/src/elections/tables.py @@ -1,34 +1,19 @@ from datetime import datetime from sqlalchemy import ( - Column, DateTime, - ForeignKey, - PrimaryKeyConstraint, String, - Text, ) +from sqlalchemy.orm import Mapped, mapped_column -from constants import ( - COMPUTING_ID_LEN, - DISCORD_ID_LEN, - DISCORD_NAME_LEN, - DISCORD_NICKNAME_LEN, -) from database import Base -from officers.constants import COUNCIL_REP_ELECTION_POSITIONS, GENERAL_ELECTION_POSITIONS - -# If you wish to add more elections & defaults, please see `create_election` -election_types = ["general_election", "by_election", "council_rep_election"] - -DEFAULT_POSITIONS_GENERAL_ELECTION = ",".join(GENERAL_ELECTION_POSITIONS) -DEFAULT_POSITIONS_BY_ELECTION = ",".join(GENERAL_ELECTION_POSITIONS) -DEFAULT_POSITIONS_COUNCIL_REP_ELECTION = ",".join(COUNCIL_REP_ELECTION_POSITIONS) - -STATUS_BEFORE_NOMINATIONS = "before_nominations" -STATUS_NOMINATIONS = "nominations" -STATUS_VOTING = "voting" -STATUS_AFTER_VOTING = "after_voting" +from elections.models import ( + ElectionStatusEnum, + ElectionTypeEnum, + ElectionUpdateParams, +) +from officers.constants import OfficerPositionEnum +from utils.types import StringList MAX_ELECTION_NAME = 64 MAX_ELECTION_SLUG = 64 @@ -37,16 +22,19 @@ class Election(Base): __tablename__ = "election" # Slugs are unique identifiers - slug = Column(String(MAX_ELECTION_SLUG), primary_key=True) - name = Column(String(MAX_ELECTION_NAME), nullable=False) - type = Column(String(64), default="general_election") - datetime_start_nominations = Column(DateTime, nullable=False) - datetime_start_voting = Column(DateTime, nullable=False) - datetime_end_voting = Column(DateTime, nullable=False) - - # a csv list of positions which must be elements of OfficerPosition - available_positions = Column(Text, nullable=False) - survey_link = Column(String(300)) + slug: Mapped[str] = mapped_column(String(MAX_ELECTION_SLUG), primary_key=True) + name: Mapped[str] = mapped_column(String(MAX_ELECTION_NAME), nullable=False) + type: Mapped[ElectionTypeEnum] = mapped_column(String(64), default=ElectionTypeEnum.GENERAL) + datetime_start_nominations: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + datetime_start_voting: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + datetime_end_voting: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + # a comma-separated string of positions which must be elements of OfficerPosition + # By giving it the type `StringList`, the database entry will automatically be marshalled to the correct form + # DB -> Python: str -> list[str] + # Python -> DB: list[str] -> str + available_positions: Mapped[list[OfficerPositionEnum]] = mapped_column(StringList(), nullable=False,) + survey_link: Mapped[str | None] = mapped_column(String(300)) def private_details(self, at_time: datetime) -> dict: # is serializable @@ -107,81 +95,27 @@ def to_update_dict(self) -> dict: "survey_link": self.survey_link, } + def update_from_params(self, params: ElectionUpdateParams): + update_data = params.model_dump( + exclude_unset=True, + exclude={"datetime_start_nominations", "datetime_start_voting", "datetime_end_voting"} + ) + for k, v in update_data.items(): + setattr(self, k, v) + if params.datetime_start_nominations: + self.datetime_start_nominations = datetime.fromisoformat(params.datetime_start_nominations) + if params.datetime_start_voting: + self.datetime_start_voting = datetime.fromisoformat(params.datetime_start_voting) + if params.datetime_end_voting: + self.datetime_end_voting = datetime.fromisoformat(params.datetime_end_voting) + def status(self, at_time: datetime) -> str: if at_time <= self.datetime_start_nominations: - return STATUS_BEFORE_NOMINATIONS + return ElectionStatusEnum.BEFORE_NOMINATIONS elif at_time <= self.datetime_start_voting: - return STATUS_NOMINATIONS + return ElectionStatusEnum.NOMINATIONS elif at_time <= self.datetime_end_voting: - return STATUS_VOTING + return ElectionStatusEnum.VOTING else: - return STATUS_AFTER_VOTING - -class NomineeInfo(Base): - __tablename__ = "election_nominee_info" - - computing_id = Column(String(COMPUTING_ID_LEN), primary_key=True) - full_name = Column(String(64), nullable=False) - linked_in = Column(String(128)) - instagram = Column(String(128)) - email = Column(String(64)) - discord_username = Column(String(DISCORD_NICKNAME_LEN)) - - def to_update_dict(self) -> dict: - return { - "computing_id": self.computing_id, - "full_name": self.full_name, - - "linked_in": self.linked_in, - "instagram": self.instagram, - "email": self.email, - "discord_username": self.discord_username, - } - - def as_serializable(self) -> dict: - # NOTE: this function is currently the same as to_update_dict since the contents - # have a different invariant they're upholding, which may cause them to change if a - # new property is introduced. For example, dates must be converted into strings - # to be serialized, but must not for update dictionaries. - return { - "computing_id": self.computing_id, - "full_name": self.full_name, - - "linked_in": self.linked_in, - "instagram": self.instagram, - "email": self.email, - "discord_username": self.discord_username, - } - -class NomineeApplication(Base): - __tablename__ = "election_nominee_application" - - # TODO: add index for nominee_election? - computing_id = Column(ForeignKey("election_nominee_info.computing_id"), primary_key=True) - nominee_election = Column(ForeignKey("election.slug"), primary_key=True) - position = Column(String(64), primary_key=True) - - speech = Column(Text) - - __table_args__ = ( - PrimaryKeyConstraint(computing_id, nominee_election, position), - ) - - def serializable_dict(self) -> dict: - return { - "computing_id": self.computing_id, - "nominee_election": self.nominee_election, - "position": self.position, - - "speech": self.speech, - } - - def to_update_dict(self) -> dict: - return { - "computing_id": self.computing_id, - "nominee_election": self.nominee_election, - "position": self.position, - - "speech": self.speech, - } + return ElectionStatusEnum.AFTER_VOTING diff --git a/src/elections/urls.py b/src/elections/urls.py index 351430c..caae581 100644 --- a/src/elections/urls.py +++ b/src/elections/urls.py @@ -1,65 +1,110 @@ -import re -from datetime import datetime +import datetime from fastapi import APIRouter, HTTPException, Request, status from fastapi.responses import JSONResponse import database -import elections import elections.crud import elections.tables -from elections.models import ElectionModel, NomineeApplicationModel, NomineeInfoModel -from elections.tables import Election, NomineeApplication, NomineeInfo, election_types -from officers.constants import OfficerPosition -from officers.crud import get_active_officer_terms +import nominees.crud +import registrations.crud +from elections.models import ( + ElectionParams, + ElectionResponse, + ElectionTypeEnum, + ElectionUpdateParams, +) +from elections.tables import Election +from officers.constants import COUNCIL_REP_ELECTION_POSITIONS, GENERAL_ELECTION_POSITIONS, OfficerPositionEnum from permission.types import ElectionOfficer, WebsiteAdmin -from utils.shared_models import SuccessFailModel -from utils.urls import is_logged_in +from utils.shared_models import DetailModel, SuccessResponse +from utils.urls import get_current_user, slugify router = APIRouter( - prefix="/elections", - tags=["elections"], + prefix="/election", + tags=["election"], ) -def _slugify(text: str) -> str: - """Creates a unique slug based on text passed in. Assumes non-unicode text.""" - return re.sub(r"[\W_]+", "-", text.strip().replace("/", "").replace("&", "")) - -async def _validate_user( +async def get_user_permissions( request: Request, db_session: database.DBSession, ) -> tuple[bool, str | None, str | None]: - logged_in, session_id, computing_id = await is_logged_in(request, db_session) - if not logged_in or not computing_id: + session_id, computing_id = await get_current_user(request, db_session) + if not session_id or not computing_id: return False, None, None - # where valid means elections officer or website admin + # where valid means election officer or website admin has_permission = await ElectionOfficer.has_permission(db_session, computing_id) if not has_permission: has_permission = await WebsiteAdmin.has_permission(db_session, computing_id) return has_permission, session_id, computing_id -# elections ------------------------------------------------------------- # +def _default_election_positions(election_type: ElectionTypeEnum) -> list[OfficerPositionEnum]: + if election_type == ElectionTypeEnum.GENERAL: + available_positions = GENERAL_ELECTION_POSITIONS + elif election_type == ElectionTypeEnum.BY_ELECTION: + available_positions = GENERAL_ELECTION_POSITIONS + elif election_type == ElectionTypeEnum.COUNCIL_REP: + available_positions = COUNCIL_REP_ELECTION_POSITIONS + return available_positions + + +def _raise_if_bad_election_data( + slug: str, + election_type: str, + datetime_start_nominations: datetime.datetime, + datetime_start_voting: datetime.datetime, + datetime_end_voting: datetime.datetime, + available_positions: list[OfficerPositionEnum] +): + if election_type not in ElectionTypeEnum: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"unknown election type {election_type}", + ) + + if datetime_start_nominations > datetime_start_voting or datetime_start_voting > datetime_end_voting: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="dates must be in order from earliest to latest", + ) + + for position in available_positions: + if position not in OfficerPositionEnum: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"unknown position found in position list {position}", + ) + + if len(slug) > elections.tables.MAX_ELECTION_SLUG: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"election slug '{slug}' is too long", + ) @router.get( - "/list", - description="Returns a list of all elections & their status", - response_model=list[ElectionModel] + "", + description="Returns a list of all election & their status", + response_model=list[ElectionResponse], + responses={ + 404: { "description": "No election found" } + }, + operation_id="get_all_elections" ) async def list_elections( request: Request, db_session: database.DBSession, ): - is_admin, _, _ = await _validate_user(request, db_session) + is_admin, _, _ = await get_user_permissions(request, db_session) election_list = await elections.crud.get_all_elections(db_session) if election_list is None or len(election_list) == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail="no elections found" + detail="no election found" ) - current_time = datetime.now() + current_time = datetime.datetime.now(tz=datetime.UTC) if is_admin: election_metadata_list = [ election.private_details(current_time) @@ -78,29 +123,33 @@ async def list_elections( description=""" Retrieves the election data for an election by name. Returns private details when the time is allowed. - If user is an admin or elections officer, returns computing ids for each candidate as well. + If user is an admin or election officer, returns computing ids for each candidate as well. """, - response_model=ElectionModel + response_model=ElectionResponse, + responses={ + 404: { "description": "Election of that name doesn't exist", "model": DetailModel } + }, + operation_id="get_election_by_name" ) async def get_election( request: Request, db_session: database.DBSession, - election_name: str, + election_name: str ): - current_time = datetime.now() - slugified_name = _slugify(election_name) + current_time = datetime.datetime.now(tz=datetime.UTC) + slugified_name = slugify(election_name) election = await elections.crud.get_election(db_session, slugified_name) if election is None: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status_code=status.HTTP_404_NOT_FOUND, detail=f"election with slug {slugified_name} does not exist" ) - is_valid_user, _, _ = await _validate_user(request, db_session) + is_valid_user, _, _ = await get_user_permissions(request, db_session) if current_time >= election.datetime_start_voting or is_valid_user: election_json = election.private_details(current_time) - all_nominations = await elections.crud.get_all_registrations_in_election(db_session, slugified_name) + all_nominations = await registrations.crud.get_all_registrations_in_election(db_session, slugified_name) if not all_nominations: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -108,14 +157,14 @@ async def get_election( ) election_json["candidates"] = [] - available_positions_list = election.available_positions.split(",") + available_positions_list = election.available_positions for nomination in all_nominations: if nomination.position not in available_positions_list: # ignore any positions that are **no longer** active continue # NOTE: if a nominee does not input their legal name, they are not considered a nominee - nominee_info = await elections.crud.get_nominee_info(db_session, nomination.computing_id) + nominee_info = await nominees.crud.get_nominee_info(db_session, nomination.computing_id) if nominee_info is None: continue @@ -143,94 +192,52 @@ async def get_election( return JSONResponse(election_json) -def _raise_if_bad_election_data( - name: str, - election_type: str, - datetime_start_nominations: datetime, - datetime_start_voting: datetime, - datetime_end_voting: datetime, - available_positions: str | None, -): - if election_type not in election_types: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"unknown election type {election_type}", - ) - elif not ( - (datetime_start_nominations <= datetime_start_voting) - and (datetime_start_voting <= datetime_end_voting) - ): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="dates must be in order from earliest to latest", - ) - elif available_positions is not None: - for position in available_positions.split(","): - if position not in OfficerPosition.position_list(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"unknown position found in position list {position}", - ) - elif len(_slugify(name)) > elections.tables.MAX_ELECTION_SLUG: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"election slug {_slugify(name)} is too long", - ) - @router.post( - "/{election_name:str}", + "", description="Creates an election and places it in the database. Returns election json on success", - response_model=ElectionModel + response_model=ElectionResponse, + responses={ + 400: { "description": "Invalid request.", "model": DetailModel }, + 500: { "model": DetailModel }, + }, + operation_id="create_election" ) async def create_election( request: Request, + body: ElectionParams, db_session: database.DBSession, - election_name: str, - election_type: str, - datetime_start_nominations: datetime, - datetime_start_voting: datetime, - datetime_end_voting: datetime, - # allows None, which assigns it to the default - available_positions: str | None = None, - survey_link: str | None = None, ): - # ensure that election name is not "list" as it will collide with endpoint - if election_name == "list": - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="cannot use that election name", - ) - - if available_positions is None: - if election_type == "general_election": - available_positions = elections.tables.DEFAULT_POSITIONS_GENERAL_ELECTION - elif election_type == "by_election": - available_positions = elections.tables.DEFAULT_POSITIONS_BY_ELECTION - elif election_type == "council_rep_election": - available_positions = elections.tables.DEFAULT_POSITIONS_COUNCIL_REP_ELECTION - else: + if body.available_positions is None: + if body.type not in ElectionTypeEnum: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail=f"invalid election type {election_type} for available positions" + detail=f"invalid election type {body.type} for available positions" ) - slugified_name = _slugify(election_name) - current_time = datetime.now() + available_positions = _default_election_positions(body.type) + else: + available_positions = body.available_positions + + slugified_name = slugify(body.name) + current_time = datetime.datetime.now(tz=datetime.UTC) + start_nominations = datetime.datetime.fromisoformat(body.datetime_start_nominations) + start_voting = datetime.datetime.fromisoformat(body.datetime_start_voting) + end_voting = datetime.datetime.fromisoformat(body.datetime_end_voting) + + # TODO: We might be able to just use a validation function from Pydantic or SQLAlchemy to check this _raise_if_bad_election_data( - election_name, - election_type, - datetime_start_nominations, - datetime_start_voting, - datetime_end_voting, - available_positions, + slugified_name, + body.type, + start_nominations, + start_voting, + end_voting, + available_positions ) - is_valid_user, _, _ = await _validate_user(request, db_session) + is_valid_user, _, _ = await get_user_permissions(request, db_session) if not is_valid_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="must have election officer or admin permission", - # TODO: is this header actually required? - headers={"WWW-Authenticate": "Basic"}, + detail="must have election officer or admin permission" ) elif await elections.crud.get_election(db_session, slugified_name) is not None: # don't overwrite a previous election @@ -243,18 +250,23 @@ async def create_election( db_session, Election( slug = slugified_name, - name = election_name, - type = election_type, - datetime_start_nominations = datetime_start_nominations, - datetime_start_voting = datetime_start_voting, - datetime_end_voting = datetime_end_voting, + name = body.name, + type = body.type, + datetime_start_nominations = start_nominations, + datetime_start_voting = start_voting, + datetime_end_voting = end_voting, available_positions = available_positions, - survey_link = survey_link + survey_link = body.survey_link ) ) await db_session.commit() election = await elections.crud.get_election(db_session, slugified_name) + if election is None: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="couldn't fetch newly created election" + ) return JSONResponse(election.private_details(current_time)) @router.patch( @@ -267,81 +279,81 @@ async def create_election( Returns election json on success. """, - response_model=ElectionModel + response_model=ElectionResponse, + responses={ + 400: { "model": DetailModel }, + 401: { "description": "Bad request", "model": DetailModel }, + 500: { "description": "Failed to find updated election", "model": DetailModel } + }, + operation_id="update_election" ) async def update_election( request: Request, + body: ElectionUpdateParams, db_session: database.DBSession, election_name: str, - election_type: str, - datetime_start_nominations: datetime, - datetime_start_voting: datetime, - datetime_end_voting: datetime, - available_positions: str, - survey_link: str | None = None, ): - slugified_name = _slugify(election_name) - current_time = datetime.now() - _raise_if_bad_election_data( - election_name, - election_type, - datetime_start_nominations, - datetime_start_voting, - datetime_end_voting, - available_positions, - ) - - is_valid_user, _, _ = await _validate_user(request, db_session) + is_valid_user, _, _ = await get_user_permissions(request, db_session) if not is_valid_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="must have election officer or admin permission", - headers={"WWW-Authenticate": "Basic"}, + detail="must have election officer or admin permission" ) - elif await elections.crud.get_election(db_session, slugified_name) is None: + + slugified_name = slugify(election_name) + election = await elections.crud.get_election(db_session, slugified_name) + if not election: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status_code=status.HTTP_404_NOT_FOUND, detail=f"election with slug {slugified_name} does not exist", ) + election.update_from_params(body) + + # TODO: We might be able to just use a validation function from Pydantic or SQLAlchemy to check this + _raise_if_bad_election_data( + slugified_name, + election.type, + election.datetime_start_nominations, + election.datetime_start_voting, + election.datetime_end_voting, + election.available_positions, + ) + # NOTE: If you update available positions, people will still *technically* be able to update their # registrations, however they will not be returned in the results. await elections.crud.update_election( db_session, - Election( - slug = slugified_name, - name = election_name, - type = election_type, - datetime_start_nominations = datetime_start_nominations, - datetime_start_voting = datetime_start_voting, - datetime_end_voting = datetime_end_voting, - available_positions = available_positions, - survey_link = survey_link - ) + election ) + await db_session.commit() election = await elections.crud.get_election(db_session, slugified_name) - return JSONResponse(election.private_details(current_time)) + if election is None: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="couldn't find updated election") + return JSONResponse(election.private_details(datetime.datetime.now(tz=datetime.UTC))) @router.delete( "/{election_name:str}", description="Deletes an election from the database. Returns whether the election exists after deletion.", - response_model=SuccessFailModel + response_model=SuccessResponse, + responses={ + 401: { "description": "Need to be logged in as an admin.", "model": DetailModel } + }, + operation_id="delete_election" ) async def delete_election( request: Request, db_session: database.DBSession, election_name: str ): - slugified_name = _slugify(election_name) - is_valid_user, _, _ = await _validate_user(request, db_session) + slugified_name = slugify(election_name) + is_valid_user, _, _ = await get_user_permissions(request, db_session) if not is_valid_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="must have election officer permission", - # TODO: is this header actually required? - headers={"WWW-Authenticate": "Basic"}, + detail="must have election officer permission" ) await elections.crud.delete_election(db_session, slugified_name) @@ -349,305 +361,3 @@ async def delete_election( old_election = await elections.crud.get_election(db_session, slugified_name) return JSONResponse({"success": old_election is None}) - -# registration ------------------------------------------------------------- # - -@router.get( - "/registration/{election_name:str}", - description="get your election registration(s)", - response_model=list[NomineeApplicationModel] -) -async def get_election_registrations( - request: Request, - db_session: database.DBSession, - election_name: str -): - slugified_name = _slugify(election_name) - logged_in, _, computing_id = await is_logged_in(request, db_session) - if not logged_in: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="must be logged in to get election registrations" - ) - - if await elections.crud.get_election(db_session, slugified_name) is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"election with slug {slugified_name} does not exist" - ) - - registration_list = await elections.crud.get_all_registrations_of_user(db_session, computing_id, slugified_name) - if registration_list is None: - return JSONResponse([]) - return JSONResponse([ - item.serializable_dict() for item in registration_list - ]) - -@router.post( - "/registration/{election_name:str}", - description="register for a specific position in this election, but doesn't set a speech", -) -async def register_in_election( - request: Request, - db_session: database.DBSession, - election_name: str, - position: str -): - logged_in, _, computing_id = await is_logged_in(request, db_session) - if not logged_in: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="must be logged in to register in election" - ) - if position not in OfficerPosition.position_list(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"invalid position {position}" - ) - - if await elections.crud.get_nominee_info(db_session, computing_id) is None: - # ensure that the user has a nominee info entry before allowing registration to occur. - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="must have submitted nominee info before registering" - ) - - current_time = datetime.now() - slugified_name = _slugify(election_name) - election = await elections.crud.get_election(db_session, slugified_name) - if election is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"election with slug {slugified_name} does not exist" - ) - elif position not in election.available_positions.split(","): - # NOTE: We only restrict creating a registration for a position that doesn't exist, - # not updating or deleting one - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"{position} is not available to register for in this election" - ) - elif election.status(current_time) != elections.tables.STATUS_NOMINATIONS: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="registrations can only be made during the nomination period" - ) - elif await elections.crud.get_all_registrations_of_user(db_session, computing_id, slugified_name): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="you are already registered in this election" - ) - - # TODO: associate specific elections officers with specific elections, then don't - # allow any elections officer running an election to register for it - - await elections.crud.add_registration(db_session, NomineeApplication( - computing_id=computing_id, - nominee_election=slugified_name, - position=position, - speech=None - )) - await db_session.commit() - -@router.patch( - "/registration/{election_name:str}/{ccid_of_registrant}", - description="update the application of a specific registrant" -) -async def update_registration( - request: Request, - db_session: database.DBSession, - election_name: str, - ccid_of_registrant: str, - position: str, - speech: str | None, -): - # check if logged in - logged_in, _, computing_id = await is_logged_in(request, db_session) - if not logged_in: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="must be logged in to update election registration" - ) - # Leave this for now, can remove self_updates if no longer needed. - is_self_update = (computing_id == ccid_of_registrant) - is_officer = await get_active_officer_terms(db_session, computing_id) - # check if the computing_id is of a valid officer or the right applicant - if not is_officer and not is_self_update: # returns [] if user is currently not an officer - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="only valid **current** officers or the applicant can update registrations" - ) - - if position not in OfficerPosition.position_list(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"invalid position {position}" - ) - - current_time = datetime.now() - slugified_name = _slugify(election_name) - election = await elections.crud.get_election(db_session, slugified_name) - if election is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"election with slug {slugified_name} does not exist" - ) - - # self updates can only be done during nomination period. Officer updates can be done whenever - elif election.status(current_time) != elections.tables.STATUS_NOMINATIONS and is_self_update: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="speeches can only be updated during the nomination period" - ) - - elif not await elections.crud.get_all_registrations_of_user(db_session, ccid_of_registrant, slugified_name): - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="applicant not yet registered in this election" - ) - - await elections.crud.update_registration(db_session, NomineeApplication( - computing_id=ccid_of_registrant, - nominee_election=slugified_name, - position=position, - speech=speech - )) - await db_session.commit() - -@router.delete( - "/registration/{election_name:str}/{position:str}", - description="revoke your registration for a specific position in this election" -) -async def delete_registration( - request: Request, - db_session: database.DBSession, - election_name: str, - position: str, -): - logged_in, _, computing_id = await is_logged_in(request, db_session) - if not logged_in: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="must be logged in to delete election registration" - ) - elif position not in OfficerPosition.position_list(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"invalid position {position}" - ) - - current_time = datetime.now() - slugified_name = _slugify(election_name) - election = await elections.crud.get_election(db_session, slugified_name) - if election is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"election with slug {slugified_name} does not exist" - ) - elif election.status(current_time) != elections.tables.STATUS_NOMINATIONS: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="registration can only be revoked during the nomination period" - ) - elif not await elections.crud.get_all_registrations_of_user(db_session, computing_id, slugified_name): - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="you are not yet registered in this election" - ) - - await elections.crud.delete_registration(db_session, computing_id, slugified_name, position) - await db_session.commit() - -# nominee info ------------------------------------------------------------- # - -@router.get( - "/nominee/info", - description="Nominee info is always publically tied to elections, so be careful!", - response_model=NomineeInfoModel -) -async def get_nominee_info( - request: Request, - db_session: database.DBSession, -): - logged_in, _, computing_id = await is_logged_in(request, db_session) - if not logged_in: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="must be logged in to get your nominee info" - ) - - nominee_info = await elections.crud.get_nominee_info(db_session, computing_id) - if nominee_info is None: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="You don't have any nominee info yet" - ) - - return JSONResponse(nominee_info.as_serializable()) - -@router.put( - "/nominee/info", - description="Will create or update nominee info. Returns an updated copy of their nominee info.", - response_model=NomineeInfoModel -) -async def provide_nominee_info( - request: Request, - db_session: database.DBSession, - full_name: str | None = None, - linked_in: str | None = None, - instagram: str | None = None, - email: str | None = None, - discord_username: str | None = None, -): - logged_in, _, computing_id = await is_logged_in(request, db_session) - if not logged_in: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="must be logged in to update nominee info" - ) - - updated_data = {} - # Only update fields that were provided - if full_name is not None: - updated_data["full_name"] = full_name - if linked_in is not None: - updated_data["linked_in"] = linked_in - if instagram is not None: - updated_data["instagram"] = instagram - if email is not None: - updated_data["email"] = email - if discord_username is not None: - updated_data["discord_username"] = discord_username - - existing_info = await elections.crud.get_nominee_info(db_session, computing_id) - # if not already existing, create it - if not existing_info: - # check if full name is passed - if "full_name" not in updated_data: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="full name is required when creating a nominee info" - ) - # unpack dictionary and expand into NomineeInfo class - new_nominee_info = NomineeInfo(computing_id=computing_id, **updated_data) - # create a new nominee - await elections.crud.create_nominee_info(db_session, new_nominee_info) - # else just update the partial data - else: - merged_data = { - "computing_id": computing_id, - "full_name": existing_info.full_name, - "linked_in": existing_info.linked_in, - "instagram": existing_info.instagram, - "email": existing_info.email, - "discord_username": existing_info.discord_username, - } - # update the dictionary with new data - merged_data.update(updated_data) - updated_nominee_info = NomineeInfo(**merged_data) - await elections.crud.update_nominee_info(db_session, updated_nominee_info) - - await db_session.commit() - - nominee_info = await elections.crud.get_nominee_info(db_session, computing_id) - return JSONResponse(nominee_info.as_serializable()) diff --git a/src/load_test_db.py b/src/load_test_db.py index 690fbd8..9d470a3 100644 --- a/src/load_test_db.py +++ b/src/load_test_db.py @@ -12,9 +12,11 @@ # tables, or the current python context will not be able to find them & they won't be loaded from auth.crud import create_user_session, update_site_user from database import SQLALCHEMY_TEST_DATABASE_URL, Base, DatabaseSessionManager -from elections.crud import create_election, create_nominee_info, update_election -from elections.tables import Election, NomineeInfo -from officers.constants import OfficerPosition +from elections.crud import create_election, update_election +from elections.tables import Election +from nominees.crud import create_nominee_info +from nominees.tables import NomineeInfo +from officers.constants import OfficerPositionEnum from officers.crud import ( create_new_officer_info, create_new_officer_term, @@ -22,6 +24,8 @@ update_officer_term, ) from officers.tables import OfficerInfo, OfficerTerm +from registrations.crud import add_registration +from registrations.tables import NomineeApplication async def reset_db(engine): @@ -125,7 +129,7 @@ async def load_test_officers_data(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id="abc11", - position=OfficerPosition.VICE_PRESIDENT, + position=OfficerPositionEnum.VICE_PRESIDENT, start_date=date.today() - timedelta(days=365), end_date=date.today() - timedelta(days=1), @@ -142,7 +146,7 @@ async def load_test_officers_data(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id="abc11", - position=OfficerPosition.EXECUTIVE_AT_LARGE, + position=OfficerPositionEnum.EXECUTIVE_AT_LARGE, start_date=date.today(), end_date=None, @@ -159,7 +163,7 @@ async def load_test_officers_data(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id="abc33", - position=OfficerPosition.PRESIDENT, + position=OfficerPositionEnum.PRESIDENT, start_date=date.today(), end_date=date.today() + timedelta(days=365), @@ -177,7 +181,7 @@ async def load_test_officers_data(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id="abc22", - position=OfficerPosition.DIRECTOR_OF_ARCHIVES, + position=OfficerPositionEnum.DIRECTOR_OF_ARCHIVES, start_date=date.today(), end_date=date.today() + timedelta(days=365), @@ -208,7 +212,7 @@ async def load_test_officers_data(db_session: AsyncSession): await update_officer_term(db_session, OfficerTerm( computing_id="abc33", - position=OfficerPosition.PRESIDENT, + position=OfficerPositionEnum.PRESIDENT, start_date=date.today(), end_date=date.today() + timedelta(days=365), @@ -243,7 +247,7 @@ async def load_sysadmin(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id=SYSADMIN_COMPUTING_ID, - position=OfficerPosition.FIRST_YEAR_REPRESENTATIVE, + position=OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE, start_date=date.today() - timedelta(days=(365*3)), end_date=date.today() - timedelta(days=(365*2)), @@ -260,7 +264,7 @@ async def load_sysadmin(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id=SYSADMIN_COMPUTING_ID, - position=OfficerPosition.SYSTEM_ADMINISTRATOR, + position=OfficerPositionEnum.SYSTEM_ADMINISTRATOR, start_date=date.today() - timedelta(days=365), end_date=None, @@ -278,7 +282,7 @@ async def load_sysadmin(db_session: AsyncSession): await create_new_officer_term(db_session, OfficerTerm( computing_id=SYSADMIN_COMPUTING_ID, - position=OfficerPosition.DIRECTOR_OF_ARCHIVES, + position=OfficerPositionEnum.DIRECTOR_OF_ARCHIVES, start_date=date.today() + timedelta(days=365*1), end_date=date.today() + timedelta(days=365*2), @@ -295,7 +299,7 @@ async def load_sysadmin(db_session: AsyncSession): await db_session.commit() async def load_test_elections_data(db_session: AsyncSession): - print("loading elections data...") + print("loading election data...") await create_election(db_session, Election( slug="test-election-1", name="test election 1", @@ -303,7 +307,7 @@ async def load_test_elections_data(db_session: AsyncSession): datetime_start_nominations=datetime.now() - timedelta(days=400), datetime_start_voting=datetime.now() - timedelta(days=395, hours=4), datetime_end_voting=datetime.now() - timedelta(days=390, hours=8), - available_positions="president,vice-president", + available_positions=["president", "vice-president"], survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" )) await update_election(db_session, Election( @@ -313,7 +317,7 @@ async def load_test_elections_data(db_session: AsyncSession): datetime_start_nominations=datetime.now() - timedelta(days=400), datetime_start_voting=datetime.now() - timedelta(days=395, hours=4), datetime_end_voting=datetime.now() - timedelta(days=390, hours=8), - available_positions="president,vice-president,treasurer", + available_positions=["president", "vice-president", "treasurer"], survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" )) await create_election(db_session, Election( @@ -323,7 +327,7 @@ async def load_test_elections_data(db_session: AsyncSession): datetime_start_nominations=datetime.now() - timedelta(days=1), datetime_start_voting=datetime.now() + timedelta(days=7), datetime_end_voting=datetime.now() + timedelta(days=14), - available_positions="president,vice-president,treasurer", + available_positions=["president", "vice-president", "treasurer"], survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5 (oh yeah)" )) await create_nominee_info(db_session, NomineeInfo( @@ -349,7 +353,7 @@ async def load_test_elections_data(db_session: AsyncSession): datetime_start_nominations=datetime.now() - timedelta(days=5), datetime_start_voting=datetime.now() - timedelta(days=1, hours=4), datetime_end_voting=datetime.now() + timedelta(days=5, hours=8), - available_positions="president,vice-president,treasurer", + available_positions=["president", "vice-president" ,"treasurer"], survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" )) await create_election(db_session, Election( @@ -359,11 +363,20 @@ async def load_test_elections_data(db_session: AsyncSession): datetime_start_nominations=datetime.now() + timedelta(days=5), datetime_start_voting=datetime.now() + timedelta(days=10, hours=4), datetime_end_voting=datetime.now() + timedelta(days=15, hours=8), - available_positions="president,vice-president,treasurer", + available_positions=["president" ,"vice-president", "treasurer"], survey_link=None )) await db_session.commit() +async def load_test_election_nominee_application_data(db_session: AsyncSession): + await add_registration(db_session, NomineeApplication( + computing_id=SYSADMIN_COMPUTING_ID, + nominee_election="test-election-2", + position="vice-president", + speech=None + )) + await db_session.commit() + # ----------------------------------------------------------------- # async def async_main(sessionmanager): @@ -373,6 +386,7 @@ async def async_main(sessionmanager): await load_test_officers_data(db_session) await load_sysadmin(db_session) await load_test_elections_data(db_session) + await load_test_election_nominee_application_data(db_session) if __name__ == "__main__": response = input(f"This will reset the {SQLALCHEMY_TEST_DATABASE_URL} database, are you okay with this? (y/N): ") diff --git a/src/main.py b/src/main.py index 0a5433b..d326ee6 100755 --- a/src/main.py +++ b/src/main.py @@ -9,8 +9,10 @@ import auth.urls import database import elections.urls +import nominees.urls import officers.urls import permission.urls +import registrations.urls from constants import IS_PROD logging.basicConfig(level=logging.DEBUG) @@ -55,6 +57,8 @@ app.include_router(auth.urls.router) app.include_router(elections.urls.router) +app.include_router(registrations.urls.router) +app.include_router(nominees.urls.router) app.include_router(officers.urls.router) app.include_router(permission.urls.router) diff --git a/src/nominees/crud.py b/src/nominees/crud.py new file mode 100644 index 0000000..372ed91 --- /dev/null +++ b/src/nominees/crud.py @@ -0,0 +1,32 @@ +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncSession + +from nominees.tables import NomineeInfo + + +async def get_nominee_info( + db_session: AsyncSession, + computing_id: str, +) -> NomineeInfo | None: + return await db_session.scalar( + sqlalchemy + .select(NomineeInfo) + .where(NomineeInfo.computing_id == computing_id) + ) + +async def create_nominee_info( + db_session: AsyncSession, + info: NomineeInfo, +): + db_session.add(info) + +async def update_nominee_info( + db_session: AsyncSession, + info: NomineeInfo, +): + await db_session.execute( + sqlalchemy + .update(NomineeInfo) + .where(NomineeInfo.computing_id == info.computing_id) + .values(info.to_update_dict()) + ) diff --git a/src/nominees/models.py b/src/nominees/models.py new file mode 100644 index 0000000..095c108 --- /dev/null +++ b/src/nominees/models.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel + + +class NomineeInfoModel(BaseModel): + computing_id: str + full_name: str + linked_in: str + instagram: str + email: str + discord_username: str + +class NomineeInfoUpdateParams(BaseModel): + full_name: str | None = None + linked_in: str | None = None + instagram: str | None = None + email: str | None = None + discord_username: str | None = None + diff --git a/src/nominees/tables.py b/src/nominees/tables.py new file mode 100644 index 0000000..f2203bf --- /dev/null +++ b/src/nominees/tables.py @@ -0,0 +1,47 @@ +from sqlalchemy import ( + String, +) +from sqlalchemy.orm import Mapped, mapped_column + +from constants import ( + COMPUTING_ID_LEN, + DISCORD_NICKNAME_LEN, +) +from database import Base + + +class NomineeInfo(Base): + __tablename__ = "election_nominee_info" + + computing_id: Mapped[str] = mapped_column(String(COMPUTING_ID_LEN), primary_key=True) + full_name: Mapped[str] = mapped_column(String(64), nullable=False) + linked_in: Mapped[str] = mapped_column(String(128)) + instagram: Mapped[str] = mapped_column(String(128)) + email: Mapped[str] = mapped_column(String(64)) + discord_username: Mapped[str] = mapped_column(String(DISCORD_NICKNAME_LEN)) + + def to_update_dict(self) -> dict: + return { + "full_name": self.full_name, + + "linked_in": self.linked_in, + "instagram": self.instagram, + "email": self.email, + "discord_username": self.discord_username, + } + + def serialize(self) -> dict: + # NOTE: this function is currently the same as to_update_dict since the contents + # have a different invariant they're upholding, which may cause them to change if a + # new property is introduced. For example, dates must be converted into strings + # to be serialized, but must not for update dictionaries. + return { + "computing_id": self.computing_id, + "full_name": self.full_name, + + "linked_in": self.linked_in, + "instagram": self.instagram, + "email": self.email, + "discord_username": self.discord_username, + } + diff --git a/src/nominees/urls.py b/src/nominees/urls.py new file mode 100644 index 0000000..15d8136 --- /dev/null +++ b/src/nominees/urls.py @@ -0,0 +1,104 @@ +from fastapi import APIRouter, HTTPException, Request, status +from fastapi.responses import JSONResponse + +import database +import nominees.crud +from nominees.models import ( + NomineeInfoModel, + NomineeInfoUpdateParams, +) +from nominees.tables import NomineeInfo +from utils.urls import admin_or_raise + +router = APIRouter( + prefix="/nominee", + tags=["nominee"], +) + +@router.get( + "/{computing_id:str}", + description="Nominee info is always publically tied to election, so be careful!", + response_model=NomineeInfoModel, + responses={ + 404: { "description": "nominee doesn't exist" } + }, + operation_id="get_nominee" +) +async def get_nominee_info( + request: Request, + db_session: database.DBSession, + computing_id: str +): + # Putting this one behind the admin wall since it has contact information + await admin_or_raise(request, db_session) + nominee_info = await nominees.crud.get_nominee_info(db_session, computing_id) + if nominee_info is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="nominee doesn't exist" + ) + + return JSONResponse(nominee_info.serialize()) + +@router.patch( + "/{computing_id:str}", + description="Will create or update nominee info. Returns an updated copy of their nominee info.", + response_model=NomineeInfoModel, + responses={ + 500: { "description": "Failed to retrieve updated nominee." } + }, + operation_id="update_nominee" +) +async def provide_nominee_info( + request: Request, + db_session: database.DBSession, + body: NomineeInfoUpdateParams, + computing_id: str +): + # TODO: There needs to be a lot more validation here. + await admin_or_raise(request, db_session) + + updated_data = {} + # Only update fields that were provided + if body.full_name is not None: + updated_data["full_name"] = body.full_name + if body.linked_in is not None: + updated_data["linked_in"] = body.linked_in + if body.instagram is not None: + updated_data["instagram"] = body.instagram + if body.email is not None: + updated_data["email"] = body.email + if body.discord_username is not None: + updated_data["discord_username"] = body.discord_username + + existing_info = await nominees.crud.get_nominee_info(db_session, computing_id) + # if not already existing, create it + if not existing_info: + # unpack dictionary and expand into NomineeInfo class + new_nominee_info = NomineeInfo(computing_id=computing_id, **updated_data) + # create a new nominee + await nominees.crud.create_nominee_info(db_session, new_nominee_info) + # else just update the partial data + else: + merged_data = { + "computing_id": computing_id, + "full_name": existing_info.full_name, + "linked_in": existing_info.linked_in, + "instagram": existing_info.instagram, + "email": existing_info.email, + "discord_username": existing_info.discord_username, + } + # update the dictionary with new data + merged_data.update(updated_data) + updated_nominee_info = NomineeInfo(**merged_data) + await nominees.crud.update_nominee_info(db_session, updated_nominee_info) + + await db_session.commit() + + nominee_info = await nominees.crud.get_nominee_info(db_session, computing_id) + if not nominee_info: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="failed to get updated nominee" + ) + return JSONResponse(nominee_info.serialize()) diff --git a/src/officers/constants.py b/src/officers/constants.py index a60a8cc..9784829 100644 --- a/src/officers/constants.py +++ b/src/officers/constants.py @@ -1,4 +1,7 @@ -class OfficerPosition: +from enum import StrEnum + + +class OfficerPositionEnum(StrEnum): PRESIDENT = "president" VICE_PRESIDENT = "vice-president" TREASURER = "treasurer" @@ -14,7 +17,7 @@ class OfficerPosition: EXECUTIVE_AT_LARGE = "executive at large" FIRST_YEAR_REPRESENTATIVE = "first year representative" - ELECTIONS_OFFICER = "elections officer" + ELECTIONS_OFFICER = "election officer" SFSS_COUNCIL_REPRESENTATIVE = "sfss council representative" FROSH_WEEK_CHAIR = "frosh week chair" @@ -22,12 +25,13 @@ class OfficerPosition: WEBMASTER = "webmaster" SOCIAL_MEDIA_MANAGER = "social media manager" +class OfficerPosition: @staticmethod - def position_list() -> list[str]: + def position_list() -> list[OfficerPositionEnum]: return _OFFICER_POSITION_LIST @staticmethod - def length_in_semesters(position: str) -> int | None: + def length_in_semesters(position: OfficerPositionEnum) -> int | None: # TODO (#101): ask the committee to maintain a json file with all the important details from the constitution """How many semester position is active for, according to the CSSS Constitution""" if position not in _LENGTH_MAP: @@ -37,7 +41,7 @@ def length_in_semesters(position: str) -> int | None: return _LENGTH_MAP[position] @staticmethod - def to_email(position: str) -> str | None: + def to_email(position: OfficerPositionEnum) -> str | None: return _EMAIL_MAP.get(position, None) @staticmethod @@ -47,13 +51,13 @@ def num_active(position: str) -> int | None: """ # None means there can be any number active if ( - position == OfficerPosition.EXECUTIVE_AT_LARGE - or position == OfficerPosition.FIRST_YEAR_REPRESENTATIVE + position == OfficerPositionEnum.EXECUTIVE_AT_LARGE + or position == OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE ): return 2 elif ( - position == OfficerPosition.FROSH_WEEK_CHAIR - or position == OfficerPosition.SOCIAL_MEDIA_MANAGER + position == OfficerPositionEnum.FROSH_WEEK_CHAIR + or position == OfficerPositionEnum.SOCIAL_MEDIA_MANAGER ): return None else: @@ -65,132 +69,132 @@ def is_signer(position: str) -> bool: If the officer is a signing authority of the CSSS """ return ( - position == OfficerPosition.PRESIDENT - or position == OfficerPosition.VICE_PRESIDENT - or position == OfficerPosition.TREASURER - or position == OfficerPosition.DIRECTOR_OF_RESOURCES - or position == OfficerPosition.DIRECTOR_OF_EVENTS + position == OfficerPositionEnum.PRESIDENT + or position == OfficerPositionEnum.VICE_PRESIDENT + or position == OfficerPositionEnum.TREASURER + or position == OfficerPositionEnum.DIRECTOR_OF_RESOURCES + or position == OfficerPositionEnum.DIRECTOR_OF_EVENTS ) @staticmethod def expected_positions() -> list[str]: # TODO (#93): use this function in the daily cronjobs return [ - OfficerPosition.PRESIDENT, - OfficerPosition.VICE_PRESIDENT, - OfficerPosition.TREASURER, - - OfficerPosition.DIRECTOR_OF_RESOURCES, - OfficerPosition.DIRECTOR_OF_EVENTS, - OfficerPosition.DIRECTOR_OF_EDUCATIONAL_EVENTS, - OfficerPosition.ASSISTANT_DIRECTOR_OF_EVENTS, - OfficerPosition.DIRECTOR_OF_COMMUNICATIONS, - #OfficerPosition.DIRECTOR_OF_OUTREACH, # TODO (#101): when https://github.com/CSSS/documents/pull/9/files merged - OfficerPosition.DIRECTOR_OF_MULTIMEDIA, - OfficerPosition.DIRECTOR_OF_ARCHIVES, - OfficerPosition.EXECUTIVE_AT_LARGE, + OfficerPositionEnum.PRESIDENT, + OfficerPositionEnum.VICE_PRESIDENT, + OfficerPositionEnum.TREASURER, + + OfficerPositionEnum.DIRECTOR_OF_RESOURCES, + OfficerPositionEnum.DIRECTOR_OF_EVENTS, + OfficerPositionEnum.DIRECTOR_OF_EDUCATIONAL_EVENTS, + OfficerPositionEnum.ASSISTANT_DIRECTOR_OF_EVENTS, + OfficerPositionEnum.DIRECTOR_OF_COMMUNICATIONS, + #OfficerPositionEnum.DIRECTOR_OF_OUTREACH, # TODO (#101): when https://github.com/CSSS/documents/pull/9/files merged + OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA, + OfficerPositionEnum.DIRECTOR_OF_ARCHIVES, + OfficerPositionEnum.EXECUTIVE_AT_LARGE, # TODO (#101): expect these only during fall & spring semesters. - #OfficerPosition.FIRST_YEAR_REPRESENTATIVE, + #OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE, #ElectionsOfficer, - OfficerPosition.SFSS_COUNCIL_REPRESENTATIVE, - OfficerPosition.FROSH_WEEK_CHAIR, + OfficerPositionEnum.SFSS_COUNCIL_REPRESENTATIVE, + OfficerPositionEnum.FROSH_WEEK_CHAIR, - OfficerPosition.SYSTEM_ADMINISTRATOR, - OfficerPosition.WEBMASTER, + OfficerPositionEnum.SYSTEM_ADMINISTRATOR, + OfficerPositionEnum.WEBMASTER, ] _EMAIL_MAP = { - OfficerPosition.PRESIDENT: "csss-president-current@sfu.ca", - OfficerPosition.VICE_PRESIDENT: "csss-vp-current@sfu.ca", - OfficerPosition.TREASURER: "csss-treasurer-current@sfu.ca", - - OfficerPosition.DIRECTOR_OF_RESOURCES: "csss-dor-current@sfu.ca", - OfficerPosition.DIRECTOR_OF_EVENTS: "csss-doe-current@sfu.ca", - OfficerPosition.DIRECTOR_OF_EDUCATIONAL_EVENTS: "csss-doee-current@sfu.ca", - OfficerPosition.ASSISTANT_DIRECTOR_OF_EVENTS: "csss-adoe-current@sfu.ca", - OfficerPosition.DIRECTOR_OF_COMMUNICATIONS: "csss-doc-current@sfu.ca", - #OfficerPosition.DIRECTOR_OF_OUTREACH, - OfficerPosition.DIRECTOR_OF_MULTIMEDIA: "csss-domm-current@sfu.ca", - OfficerPosition.DIRECTOR_OF_ARCHIVES: "csss-doa-current@sfu.ca", - OfficerPosition.EXECUTIVE_AT_LARGE: "csss-eal-current@sfu.ca", - OfficerPosition.FIRST_YEAR_REPRESENTATIVE: "csss-fyr-current@sfu.ca", - - OfficerPosition.ELECTIONS_OFFICER: "csss-elections@sfu.ca", - OfficerPosition.SFSS_COUNCIL_REPRESENTATIVE: "csss-councilrep@sfu.ca", - OfficerPosition.FROSH_WEEK_CHAIR: "csss-froshchair@sfu.ca", - - OfficerPosition.SYSTEM_ADMINISTRATOR: "csss-sysadmin@sfu.ca", - OfficerPosition.WEBMASTER: "csss-webmaster@sfu.ca", - OfficerPosition.SOCIAL_MEDIA_MANAGER: "N/A", + OfficerPositionEnum.PRESIDENT: "csss-president-current@sfu.ca", + OfficerPositionEnum.VICE_PRESIDENT: "csss-vp-current@sfu.ca", + OfficerPositionEnum.TREASURER: "csss-treasurer-current@sfu.ca", + + OfficerPositionEnum.DIRECTOR_OF_RESOURCES: "csss-dor-current@sfu.ca", + OfficerPositionEnum.DIRECTOR_OF_EVENTS: "csss-doe-current@sfu.ca", + OfficerPositionEnum.DIRECTOR_OF_EDUCATIONAL_EVENTS: "csss-doee-current@sfu.ca", + OfficerPositionEnum.ASSISTANT_DIRECTOR_OF_EVENTS: "csss-adoe-current@sfu.ca", + OfficerPositionEnum.DIRECTOR_OF_COMMUNICATIONS: "csss-doc-current@sfu.ca", + #OfficerPositionEnum.DIRECTOR_OF_OUTREACH, + OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA: "csss-domm-current@sfu.ca", + OfficerPositionEnum.DIRECTOR_OF_ARCHIVES: "csss-doa-current@sfu.ca", + OfficerPositionEnum.EXECUTIVE_AT_LARGE: "csss-eal-current@sfu.ca", + OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE: "csss-fyr-current@sfu.ca", + + OfficerPositionEnum.ELECTIONS_OFFICER: "csss-election@sfu.ca", + OfficerPositionEnum.SFSS_COUNCIL_REPRESENTATIVE: "csss-councilrep@sfu.ca", + OfficerPositionEnum.FROSH_WEEK_CHAIR: "csss-froshchair@sfu.ca", + + OfficerPositionEnum.SYSTEM_ADMINISTRATOR: "csss-sysadmin@sfu.ca", + OfficerPositionEnum.WEBMASTER: "csss-webmaster@sfu.ca", + OfficerPositionEnum.SOCIAL_MEDIA_MANAGER: "N/A", } # None, means that the length of the position does not have a set length in semesters _LENGTH_MAP = { - OfficerPosition.PRESIDENT: 3, - OfficerPosition.VICE_PRESIDENT: 3, - OfficerPosition.TREASURER: 3, - - OfficerPosition.DIRECTOR_OF_RESOURCES: 3, - OfficerPosition.DIRECTOR_OF_EVENTS: 3, - OfficerPosition.DIRECTOR_OF_EDUCATIONAL_EVENTS: 3, - OfficerPosition.ASSISTANT_DIRECTOR_OF_EVENTS: 3, - OfficerPosition.DIRECTOR_OF_COMMUNICATIONS: 3, - #OfficerPosition.DIRECTOR_OF_OUTREACH: 3, - OfficerPosition.DIRECTOR_OF_MULTIMEDIA: 3, - OfficerPosition.DIRECTOR_OF_ARCHIVES: 3, - OfficerPosition.EXECUTIVE_AT_LARGE: 1, - OfficerPosition.FIRST_YEAR_REPRESENTATIVE: 2, - - OfficerPosition.ELECTIONS_OFFICER: None, - OfficerPosition.SFSS_COUNCIL_REPRESENTATIVE: 3, - OfficerPosition.FROSH_WEEK_CHAIR: None, - - OfficerPosition.SYSTEM_ADMINISTRATOR: None, - OfficerPosition.WEBMASTER: None, - OfficerPosition.SOCIAL_MEDIA_MANAGER: None, + OfficerPositionEnum.PRESIDENT: 3, + OfficerPositionEnum.VICE_PRESIDENT: 3, + OfficerPositionEnum.TREASURER: 3, + + OfficerPositionEnum.DIRECTOR_OF_RESOURCES: 3, + OfficerPositionEnum.DIRECTOR_OF_EVENTS: 3, + OfficerPositionEnum.DIRECTOR_OF_EDUCATIONAL_EVENTS: 3, + OfficerPositionEnum.ASSISTANT_DIRECTOR_OF_EVENTS: 3, + OfficerPositionEnum.DIRECTOR_OF_COMMUNICATIONS: 3, + #OfficerPositionEnum.DIRECTOR_OF_OUTREACH: 3, + OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA: 3, + OfficerPositionEnum.DIRECTOR_OF_ARCHIVES: 3, + OfficerPositionEnum.EXECUTIVE_AT_LARGE: 1, + OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE: 2, + + OfficerPositionEnum.ELECTIONS_OFFICER: None, + OfficerPositionEnum.SFSS_COUNCIL_REPRESENTATIVE: 3, + OfficerPositionEnum.FROSH_WEEK_CHAIR: None, + + OfficerPositionEnum.SYSTEM_ADMINISTRATOR: None, + OfficerPositionEnum.WEBMASTER: None, + OfficerPositionEnum.SOCIAL_MEDIA_MANAGER: None, } _OFFICER_POSITION_LIST = [ - OfficerPosition.PRESIDENT, - OfficerPosition.VICE_PRESIDENT, - OfficerPosition.TREASURER, - - OfficerPosition.DIRECTOR_OF_RESOURCES, - OfficerPosition.DIRECTOR_OF_EVENTS, - OfficerPosition.DIRECTOR_OF_EDUCATIONAL_EVENTS, - OfficerPosition.ASSISTANT_DIRECTOR_OF_EVENTS, - OfficerPosition.DIRECTOR_OF_COMMUNICATIONS, - #OfficerPosition.DIRECTOR_OF_OUTREACH, - OfficerPosition.DIRECTOR_OF_MULTIMEDIA, - OfficerPosition.DIRECTOR_OF_ARCHIVES, - OfficerPosition.EXECUTIVE_AT_LARGE, - OfficerPosition.FIRST_YEAR_REPRESENTATIVE, - - OfficerPosition.ELECTIONS_OFFICER, - OfficerPosition.SFSS_COUNCIL_REPRESENTATIVE, - OfficerPosition.FROSH_WEEK_CHAIR, - - OfficerPosition.SYSTEM_ADMINISTRATOR, - OfficerPosition.WEBMASTER, - OfficerPosition.SOCIAL_MEDIA_MANAGER, + OfficerPositionEnum.PRESIDENT, + OfficerPositionEnum.VICE_PRESIDENT, + OfficerPositionEnum.TREASURER, + + OfficerPositionEnum.DIRECTOR_OF_RESOURCES, + OfficerPositionEnum.DIRECTOR_OF_EVENTS, + OfficerPositionEnum.DIRECTOR_OF_EDUCATIONAL_EVENTS, + OfficerPositionEnum.ASSISTANT_DIRECTOR_OF_EVENTS, + OfficerPositionEnum.DIRECTOR_OF_COMMUNICATIONS, + #OfficerPositionEnum.DIRECTOR_OF_OUTREACH, + OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA, + OfficerPositionEnum.DIRECTOR_OF_ARCHIVES, + OfficerPositionEnum.EXECUTIVE_AT_LARGE, + OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE, + + OfficerPositionEnum.ELECTIONS_OFFICER, + OfficerPositionEnum.SFSS_COUNCIL_REPRESENTATIVE, + OfficerPositionEnum.FROSH_WEEK_CHAIR, + + OfficerPositionEnum.SYSTEM_ADMINISTRATOR, + OfficerPositionEnum.WEBMASTER, + OfficerPositionEnum.SOCIAL_MEDIA_MANAGER, ] GENERAL_ELECTION_POSITIONS = [ - OfficerPosition.PRESIDENT, - OfficerPosition.VICE_PRESIDENT, - OfficerPosition.TREASURER, - - OfficerPosition.DIRECTOR_OF_RESOURCES, - OfficerPosition.DIRECTOR_OF_EVENTS, - OfficerPosition.DIRECTOR_OF_EDUCATIONAL_EVENTS, - OfficerPosition.ASSISTANT_DIRECTOR_OF_EVENTS, - OfficerPosition.DIRECTOR_OF_COMMUNICATIONS, - #OfficerPosition.DIRECTOR_OF_OUTREACH, - OfficerPosition.DIRECTOR_OF_MULTIMEDIA, - OfficerPosition.DIRECTOR_OF_ARCHIVES, + OfficerPositionEnum.PRESIDENT, + OfficerPositionEnum.VICE_PRESIDENT, + OfficerPositionEnum.TREASURER, + + OfficerPositionEnum.DIRECTOR_OF_RESOURCES, + OfficerPositionEnum.DIRECTOR_OF_EVENTS, + OfficerPositionEnum.DIRECTOR_OF_EDUCATIONAL_EVENTS, + OfficerPositionEnum.ASSISTANT_DIRECTOR_OF_EVENTS, + OfficerPositionEnum.DIRECTOR_OF_COMMUNICATIONS, + #OfficerPositionEnum.DIRECTOR_OF_OUTREACH, + OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA, + OfficerPositionEnum.DIRECTOR_OF_ARCHIVES, ] COUNCIL_REP_ELECTION_POSITIONS = [ - OfficerPosition.SFSS_COUNCIL_REPRESENTATIVE, + OfficerPositionEnum.SFSS_COUNCIL_REPRESENTATIVE, ] diff --git a/src/officers/crud.py b/src/officers/crud.py index 5ebc91e..a2e1b6f 100644 --- a/src/officers/crud.py +++ b/src/officers/crud.py @@ -1,4 +1,3 @@ -import logging from datetime import datetime import sqlalchemy @@ -15,8 +14,6 @@ OfficerData, ) -_logger = logging.getLogger(__name__) - # NOTE: this module should not do any data validation; that should be done in the urls.py or higher layer async def current_officers( diff --git a/src/officers/tables.py b/src/officers/tables.py index 32eb447..23575a0 100644 --- a/src/officers/tables.py +++ b/src/officers/tables.py @@ -1,15 +1,16 @@ from __future__ import annotations +from datetime import datetime + from sqlalchemy import ( - Column, - Date, + DateTime, ForeignKey, Integer, String, Text, ) +from sqlalchemy.orm import Mapped, mapped_column -# from sqlalchemy.orm import relationship from constants import ( COMPUTING_ID_LEN, DISCORD_ID_LEN, @@ -18,6 +19,7 @@ GITHUB_USERNAME_LEN, ) from database import Base +from officers.constants import OfficerPositionEnum # A row represents an assignment of a person to a position. @@ -26,27 +28,27 @@ class OfficerTerm(Base): __tablename__ = "officer_term" # TODO (#98): create a unique constraint for (computing_id, position, start_date). - id = Column(Integer, primary_key=True, autoincrement=True) + id: Mapped[str] = mapped_column(Integer, primary_key=True, autoincrement=True) - computing_id = Column( + computing_id: Mapped[str] = mapped_column( String(COMPUTING_ID_LEN), ForeignKey("site_user.computing_id"), nullable=False, ) - position = Column(String(128), nullable=False) - start_date = Column(Date, nullable=False) + position: Mapped[OfficerPositionEnum] = mapped_column(String(128), nullable=False) + start_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) # end_date is only not-specified for positions that don't have a length (ie. webmaster) - end_date = Column(Date, nullable=True) + end_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True) - nickname = Column(String(128), nullable=True) - favourite_course_0 = Column(String(64), nullable=True) - favourite_course_1 = Column(String(64), nullable=True) + nickname: Mapped[str] = mapped_column(String(128), nullable=True) + favourite_course_0: Mapped[str] = mapped_column(String(64), nullable=True) + favourite_course_1: Mapped[str] = mapped_column(String(64), nullable=True) # programming language - favourite_pl_0 = Column(String(64), nullable=True) - favourite_pl_1 = Column(String(64), nullable=True) - biography = Column(Text, nullable=True) - photo_url = Column(Text, nullable=True) # some urls get big, best to let it be a string + favourite_pl_0: Mapped[str] = mapped_column(String(64), nullable=True) + favourite_pl_1: Mapped[str] = mapped_column(String(64), nullable=True) + biography: Mapped[str] = mapped_column(Text, nullable=True) + photo_url: Mapped[str] = mapped_column(Text, nullable=True) # some urls get big, best to let it be a string def serializable_dict(self) -> dict: return { @@ -102,32 +104,32 @@ def to_update_dict(self) -> dict: class OfficerInfo(Base): __tablename__ = "officer_info" - computing_id = Column( + computing_id: Mapped[str] = mapped_column( String(COMPUTING_ID_LEN), ForeignKey("site_user.computing_id"), primary_key=True, ) # TODO (#71): we'll need to use SFU's API to get the legal name for users - legal_name = Column(String(128), nullable=False) # some people have long names, you never know - phone_number = Column(String(24), nullable=True) + legal_name: Mapped[str] = mapped_column(String(128), nullable=False) # some people have long names, you never know + phone_number: Mapped[str] = mapped_column(String(24), nullable=True) # TODO (#99): add unique constraints to discord_id (stops users from stealing the username of someone else) - discord_id = Column(String(DISCORD_ID_LEN), nullable=True) - discord_name = Column(String(DISCORD_NAME_LEN), nullable=True) + discord_id: Mapped[str] = mapped_column(String(DISCORD_ID_LEN), nullable=True) + discord_name: Mapped[str] = mapped_column(String(DISCORD_NAME_LEN), nullable=True) # this is their nickname in the csss server - discord_nickname = Column(String(DISCORD_NICKNAME_LEN), nullable=True) + discord_nickname: Mapped[str] = mapped_column(String(DISCORD_NICKNAME_LEN), nullable=True) # Technically 320 is the most common max-size for emails, but we'll use 256 instead, # since it's reasonably large (input validate this too) # TODO (#99): add unique constraint to this (stops users from stealing the username of someone else) - google_drive_email = Column(String(256), nullable=True) + google_drive_email: Mapped[str] = mapped_column(String(256), nullable=True) # TODO (#99): add unique constraint to this (stops users from stealing the username of someone else) - github_username = Column(String(GITHUB_USERNAME_LEN), nullable=True) + github_username: Mapped[str] = mapped_column(String(GITHUB_USERNAME_LEN), nullable=True) # TODO (#22): add support for giving executives bitwarden access automagically - # has_signed_into_bitwarden = Column(Boolean) + # has_signed_into_bitwarden: Mapped[str] = mapped_column(Boolean) def serializable_dict(self) -> dict: return { diff --git a/src/officers/urls.py b/src/officers/urls.py index 8cb82f3..8ce70f7 100755 --- a/src/officers/urls.py +++ b/src/officers/urls.py @@ -179,7 +179,7 @@ async def new_officer_term( @router.patch( "/info/{computing_id}", description=""" - After elections, officer computing ids are input into our system. + After election, officer computing ids are input into our system. If you have been elected as a new officer, you may authenticate with SFU CAS, then input your information & the valid token for us. Admins may update this info. """ diff --git a/src/permission/types.py b/src/permission/types.py index 3b9db50..2a1acf6 100644 --- a/src/permission/types.py +++ b/src/permission/types.py @@ -8,7 +8,7 @@ import officers.crud import utils from data.semesters import step_semesters -from officers.constants import OfficerPosition +from officers.constants import OfficerPositionEnum class OfficerPrivateInfo: @@ -34,11 +34,11 @@ class ElectionOfficer: @staticmethod async def has_permission(db_session: database.DBSession, computing_id: str) -> bool: """ - An current elections officer has access to all elections, prior elections officers have no access. + An current election officer has access to all election, prior election officers have no access. """ officer_terms = await officers.crud.current_officers(db_session, True) current_election_officer = officer_terms.get( - officers.constants.OfficerPosition.ELECTIONS_OFFICER + officers.constants.OfficerPositionEnum.ELECTIONS_OFFICER ) if current_election_officer is not None: for election_officer in current_election_officer[1]: @@ -51,12 +51,12 @@ async def has_permission(db_session: database.DBSession, computing_id: str) -> b return False class WebsiteAdmin: - WEBSITE_ADMIN_POSITIONS: ClassVar[list[str]] = [ - OfficerPosition.PRESIDENT, - OfficerPosition.VICE_PRESIDENT, - OfficerPosition.DIRECTOR_OF_ARCHIVES, - OfficerPosition.SYSTEM_ADMINISTRATOR, - OfficerPosition.WEBMASTER, + WEBSITE_ADMIN_POSITIONS: ClassVar[list[OfficerPositionEnum]] = [ + OfficerPositionEnum.PRESIDENT, + OfficerPositionEnum.VICE_PRESIDENT, + OfficerPositionEnum.DIRECTOR_OF_ARCHIVES, + OfficerPositionEnum.SYSTEM_ADMINISTRATOR, + OfficerPositionEnum.WEBMASTER, ] @staticmethod @@ -77,3 +77,4 @@ async def has_permission_or_raise( ) -> bool: if not await WebsiteAdmin.has_permission(db_session, computing_id): raise HTTPException(status_code=401, detail=errmsg) + return True diff --git a/src/registrations/crud.py b/src/registrations/crud.py new file mode 100644 index 0000000..6d135b5 --- /dev/null +++ b/src/registrations/crud.py @@ -0,0 +1,91 @@ +from collections.abc import Sequence + +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncSession + +from officers.constants import OfficerPositionEnum +from registrations.tables import NomineeApplication + + +async def get_all_registrations_of_user( + db_session: AsyncSession, + computing_id: str, + election_slug: str +) -> Sequence[NomineeApplication] | None: + registrations = (await db_session.scalars( + sqlalchemy + .select(NomineeApplication) + .where( + (NomineeApplication.computing_id == computing_id) + & (NomineeApplication.nominee_election == election_slug) + ) + )).all() + return registrations + +async def get_one_registration_in_election( + db_session: AsyncSession, + computing_id: str, + election_slug: str, + position: OfficerPositionEnum, +) -> NomineeApplication | None: + registration = (await db_session.scalar( + sqlalchemy + .select(NomineeApplication) + .where( + NomineeApplication.computing_id == computing_id, + NomineeApplication.nominee_election == election_slug, + NomineeApplication.position == position + ) + )) + return registration + +async def get_all_registrations_in_election( + db_session: AsyncSession, + election_slug: str, +) -> Sequence[NomineeApplication] | None: + registrations = (await db_session.scalars( + sqlalchemy + .select(NomineeApplication) + .where( + NomineeApplication.nominee_election == election_slug + ) + )).all() + return registrations + +async def add_registration( + db_session: AsyncSession, + initial_application: NomineeApplication +): + db_session.add(initial_application) + +async def update_registration( + db_session: AsyncSession, + initial_application: NomineeApplication +): + await db_session.execute( + sqlalchemy + .update(NomineeApplication) + .where( + (NomineeApplication.computing_id == initial_application.computing_id) + & (NomineeApplication.nominee_election == initial_application.nominee_election) + & (NomineeApplication.position == initial_application.position) + ) + .values(initial_application.to_update_dict()) + ) + +async def delete_registration( + db_session: AsyncSession, + computing_id: str, + election_slug: str, + position: OfficerPositionEnum +): + await db_session.execute( + sqlalchemy + .delete(NomineeApplication) + .where( + (NomineeApplication.computing_id == computing_id) + & (NomineeApplication.nominee_election == election_slug) + & (NomineeApplication.position == position) + ) + ) + diff --git a/src/registrations/models.py b/src/registrations/models.py new file mode 100644 index 0000000..7a26a68 --- /dev/null +++ b/src/registrations/models.py @@ -0,0 +1,28 @@ +from pydantic import BaseModel + +from officers.constants import OfficerPositionEnum + + +class RegistrationModel(BaseModel): + position: OfficerPositionEnum + full_name: str + linked_in: str + instagram: str + email: str + discord_username: str + speech: str + +class NomineeApplicationParams(BaseModel): + computing_id: str + position: OfficerPositionEnum + +class NomineeApplicationUpdateParams(BaseModel): + position: OfficerPositionEnum | None = None + speech: str | None = None + +class NomineeApplicationModel(BaseModel): + computing_id: str + nominee_election: str + position: OfficerPositionEnum + speech: str | None = None + diff --git a/src/registrations/tables.py b/src/registrations/tables.py new file mode 100644 index 0000000..27b98aa --- /dev/null +++ b/src/registrations/tables.py @@ -0,0 +1,45 @@ +from sqlalchemy import ForeignKey, PrimaryKeyConstraint, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from database import Base +from officers.constants import OfficerPositionEnum +from registrations.models import NomineeApplicationUpdateParams + + +class NomineeApplication(Base): + __tablename__ = "election_nominee_application" + + computing_id: Mapped[str] = mapped_column(ForeignKey("election_nominee_info.computing_id"), primary_key=True) + nominee_election: Mapped[str] = mapped_column(ForeignKey("election.slug"), primary_key=True) + position: Mapped[OfficerPositionEnum] = mapped_column(String(64), primary_key=True) + + speech: Mapped[str | None] = mapped_column(Text) + + __table_args__ = ( + PrimaryKeyConstraint(computing_id, nominee_election, position), + ) + + def serialize(self) -> dict: + return { + "computing_id": self.computing_id, + "nominee_election": self.nominee_election, + "position": self.position, + + "speech": self.speech, + } + + def to_update_dict(self) -> dict: + return { + "computing_id": self.computing_id, + "nominee_election": self.nominee_election, + "position": self.position, + + "speech": self.speech, + } + + def update_from_params(self, params: NomineeApplicationUpdateParams): + update_data = params.model_dump(exclude_unset=True) + for k, v in update_data.items(): + setattr(self, k, v) + + diff --git a/src/registrations/urls.py b/src/registrations/urls.py new file mode 100644 index 0000000..1fe552f --- /dev/null +++ b/src/registrations/urls.py @@ -0,0 +1,256 @@ +import datetime + +from fastapi import APIRouter, HTTPException, Request, status +from fastapi.responses import JSONResponse + +import database +import elections.crud +import nominees.crud +import registrations.crud +from elections.models import ( + ElectionStatusEnum, +) +from officers.constants import OfficerPositionEnum +from registrations.models import ( + NomineeApplicationModel, + NomineeApplicationParams, + NomineeApplicationUpdateParams, +) +from registrations.tables import NomineeApplication +from utils.shared_models import DetailModel, SuccessResponse +from utils.urls import admin_or_raise, logged_in_or_raise, slugify + +router = APIRouter( + prefix="/registration", + tags=["registration"], +) + +@router.get( + "/{election_name:str}", + description="get all the registrations of a single election", + response_model=list[NomineeApplicationModel], + responses={ + 401: { "description": "Not logged in", "model": DetailModel }, + 404: { "description": "Election with slug does not exist", "model": DetailModel } + }, + operation_id="get_election_registrations" +) +async def get_election_registrations( + request: Request, + db_session: database.DBSession, + election_name: str +): + await logged_in_or_raise(request, db_session) + + slugified_name = slugify(election_name) + if await elections.crud.get_election(db_session, slugified_name) is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"election with slug {slugified_name} does not exist" + ) + + registration_list = await registrations.crud.get_all_registrations_in_election(db_session, slugified_name) + if registration_list is None: + return JSONResponse([]) + return JSONResponse([ + item.serialize() for item in registration_list + ]) + +@router.post( + "/{election_name:str}", + description="Register for a specific position in this election, but doesn't set a speech. Returns the created entry.", + response_model=NomineeApplicationModel, + responses={ + 400: { "description": "Bad request", "model": DetailModel }, + 401: { "description": "Not logged in", "model": DetailModel }, + 403: { "description": "Not an admin", "model": DetailModel }, + 404: { "description": "No election found", "model": DetailModel }, + }, + operation_id="register" +) +async def register_in_election( + request: Request, + db_session: database.DBSession, + body: NomineeApplicationParams, + election_name: str +): + await admin_or_raise(request, db_session) + + if body.position not in OfficerPositionEnum: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"invalid position {body.position}" + ) + + if await nominees.crud.get_nominee_info(db_session, body.computing_id) is None: + # ensure that the user has a nominee info entry before allowing registration to occur. + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="must have submitted nominee info before registering" + ) + + slugified_name = slugify(election_name) + election = await elections.crud.get_election(db_session, slugified_name) + if election is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"election with slug {slugified_name} does not exist" + ) + + if body.position not in election.available_positions: + # NOTE: We only restrict creating a registration for a position that doesn't exist, + # not updating or deleting one + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"{body.position} is not available to register for in this election" + ) + + if election.status(datetime.datetime.now(tz=datetime.UTC)) != ElectionStatusEnum.NOMINATIONS: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="registrations can only be made during the nomination period" + ) + + if await registrations.crud.get_all_registrations_of_user(db_session, body.computing_id, slugified_name): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="person is already registered in this election" + ) + + # TODO: associate specific election officers with specific election, then don't + # allow any election officer running an election to register for it + await registrations.crud.add_registration(db_session, NomineeApplication( + computing_id=body.computing_id, + nominee_election=slugified_name, + position=body.position, + speech=None + )) + await db_session.commit() + + registrant = await registrations.crud.get_one_registration_in_election( + db_session, body.computing_id, slugified_name, body.position + ) + if not registrant: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="failed to find new registrant" + ) + return registrant + +@router.patch( + "/{election_name:str}/{position:str}/{computing_id:str}", + description="update the application of a specific registrant and return the changed entry", + response_model=NomineeApplicationModel, + responses={ + 400: { "description": "Bad request", "model": DetailModel }, + 401: { "description": "Not logged in", "model": DetailModel }, + 403: { "description": "Not an admin", "model": DetailModel }, + 404: { "description": "No election found", "model": DetailModel }, + }, + operation_id="update_registration" +) +async def update_registration( + request: Request, + db_session: database.DBSession, + body: NomineeApplicationUpdateParams, + election_name: str, + computing_id: str, + position: OfficerPositionEnum +): + await admin_or_raise(request, db_session) + + if body.position not in OfficerPositionEnum: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"invalid position {body.position}" + ) + + slugified_name = slugify(election_name) + election = await elections.crud.get_election(db_session, slugified_name) + if election is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"election with slug {slugified_name} does not exist" + ) + + # self updates can only be done during nomination period. Officer updates can be done whenever + if election.status(datetime.datetime.now(tz=datetime.UTC)) != ElectionStatusEnum.NOMINATIONS: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="speeches can only be updated during the nomination period" + ) + + registration = await registrations.crud.get_one_registration_in_election(db_session, computing_id, slugified_name, position) + if not registration: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="no registration record found" + ) + + registration.update_from_params(body) + + await registrations.crud.update_registration(db_session, registration) + await db_session.commit() + + registrant = await registrations.crud.get_one_registration_in_election( + db_session, registration.computing_id, slugified_name, registration.position + ) + if not registrant: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="failed to find changed registrant" + ) + return registrant + +@router.delete( + "/{election_name:str}/{position:str}/{computing_id:str}", + description="delete the registration of a person", + response_model=SuccessResponse, + responses={ + 400: { "description": "Bad request", "model": DetailModel }, + 401: { "description": "Not logged in", "model": DetailModel }, + 403: { "description": "Not an admin", "model": DetailModel }, + 404: { "description": "No election or registrant found", "model": DetailModel }, + }, + operation_id="delete_registration" +) +async def delete_registration( + request: Request, + db_session: database.DBSession, + election_name: str, + position: OfficerPositionEnum, + computing_id: str +): + await admin_or_raise(request, db_session) + + if position not in OfficerPositionEnum: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"invalid position {position}" + ) + + slugified_name = slugify(election_name) + election = await elections.crud.get_election(db_session, slugified_name) + if election is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"election with slug {slugified_name} does not exist" + ) + + if election.status(datetime.datetime.now(tz=datetime.UTC)) != ElectionStatusEnum.NOMINATIONS: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="registration can only be revoked during the nomination period" + ) + + if not await registrations.crud.get_all_registrations_of_user(db_session, computing_id, slugified_name): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"{computing_id} was not registered in election {slugified_name} for {position}" + ) + + await registrations.crud.delete_registration(db_session, computing_id, slugified_name, position) + await db_session.commit() + old_election = await registrations.crud.get_one_registration_in_election(db_session, computing_id, slugified_name, position) + return JSONResponse({"success": old_election is None}) + diff --git a/src/utils/shared_models.py b/src/utils/shared_models.py index 9f5766d..9d2032b 100644 --- a/src/utils/shared_models.py +++ b/src/utils/shared_models.py @@ -1,7 +1,7 @@ from pydantic import BaseModel -class SuccessFailModel(BaseModel): +class SuccessResponse(BaseModel): success: bool class DetailModel(BaseModel): diff --git a/src/utils/types.py b/src/utils/types.py new file mode 100644 index 0000000..71f99df --- /dev/null +++ b/src/utils/types.py @@ -0,0 +1,23 @@ + +from sqlalchemy import Dialect +from sqlalchemy.types import Text, TypeDecorator + + +class StringList(TypeDecorator): + impl = Text + cache_ok = True + + def process_bind_param(self, value, dialect: Dialect) -> str: + if value is None: + return "" + if not isinstance(value, list): + raise ValueError("Must be a list") + + return ",".join(value) + + def process_result_value(self, value, dialect: Dialect) -> list[str]: + if value is None or value == "": + return [] + return value.split(",") + + diff --git a/src/utils/urls.py b/src/utils/urls.py index 53f66dd..0313300 100644 --- a/src/utils/urls.py +++ b/src/utils/urls.py @@ -1,10 +1,17 @@ -from fastapi import HTTPException, Request +import re + +from fastapi import HTTPException, Request, status import auth import auth.crud import database +from permission.types import ElectionOfficer, WebsiteAdmin + # TODO: move other utils into this module +def slugify(text: str) -> str: + """Creates a unique slug based on text passed in. Assumes non-unicode text.""" + return re.sub(r"[\W_]+", "-", text.strip().replace("/", "").replace("&", "")) async def logged_in_or_raise( request: Request, @@ -21,17 +28,41 @@ async def logged_in_or_raise( return session_id, session_computing_id -async def is_logged_in( - request: Request, - db_session: database.DBSession -) -> tuple[bool, str | None, str | None]: - """gets the user's computing_id, or raises an exception if the current request is not logged in""" +async def get_current_user(request: Request, db_session: database.DBSession) -> tuple[str, str] | tuple[None, None]: + """ + Gets information about the currently logged in user. + + Args: + request: The request being checked + db_session: The current database session + + Returns: + A tuple of either (None, None) if there is no logged in user or a tuple (session ID, computing ID) + """ session_id = request.cookies.get("session_id", None) if session_id is None: - return False, None, None + return None, None session_computing_id = await auth.crud.get_computing_id(db_session, session_id) if session_computing_id is None: - return False, None, None + return None, None + + return session_id, session_computing_id + +async def admin_or_raise(request: Request, db_session: database.DBSession) -> tuple[str, str]: + session_id, computing_id = await get_current_user(request, db_session) + if not session_id or not computing_id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="must be logged in" + ) + + # where valid means election officer or website admin + if (await ElectionOfficer.has_permission(db_session, computing_id)) or (await WebsiteAdmin.has_permission(db_session, computing_id)): + return session_id, computing_id + else: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="must be an admin" + ) - return True, session_id, session_computing_id diff --git a/tests/integration/test_elections.py b/tests/integration/test_elections.py index 466c11b..b79220f 100644 --- a/tests/integration/test_elections.py +++ b/tests/integration/test_elections.py @@ -1,31 +1,23 @@ import asyncio -import json -from datetime import date, datetime, timedelta +import datetime +from datetime import timedelta import pytest from httpx import ASGITransport, AsyncClient -import load_test_db -from auth.crud import create_user_session, get_computing_id, update_site_user -from database import SQLALCHEMY_TEST_DATABASE_URL, DatabaseSessionManager -from main import app +from src import load_test_db +from src.auth.crud import create_user_session +from src.database import SQLALCHEMY_TEST_DATABASE_URL, DatabaseSessionManager from src.elections.crud import ( - add_registration, - create_election, - create_nominee_info, - delete_election, - delete_registration, - # election crud get_all_elections, - get_all_registrations_in_election, - # election registration crud - get_all_registrations_of_user, get_election, - # info crud +) +from src.main import app +from src.nominees.crud import ( get_nominee_info, - update_election, - update_nominee_info, - update_registration, +) +from src.registrations.crud import ( + get_all_registrations_in_election, ) @@ -95,15 +87,13 @@ async def test_read_elections(database_setup): # API endpoint testing (without AUTH)-------------------------------------- @pytest.mark.anyio async def test_endpoints(client, database_setup): - - - response = await client.get("/elections/list") + response = await client.get("/election") assert response.status_code == 200 assert response.json() != {} - # Returns private details when the time is allowed. If user is an admin or elections officer, returns computing ids for each candidate as well. + # Returns private details when the time is allowed. If user is an admin or election officer, returns computing ids for each candidate as well. election_name = "test election 2" - response = await client.get(f"/elections/{election_name}") + response = await client.get(f"/election/{election_name}") assert response.status_code == 200 assert response.json() != {} # if candidates filled, enure unauthorized values remain hidden @@ -111,51 +101,53 @@ async def test_endpoints(client, database_setup): for cand in response.json()["candidates"]: assert "computing_id" not in cand + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # Only authorized users can access registrations get - response = await client.get(f"/elections/registration/{election_name}") + response = await client.get(f"/registration/{election_name}") assert response.status_code == 401 - response = await client.get("/elections/nominee/info") + response = await client.get("/nominee/pkn4") assert response.status_code == 401 - - - response = await client.post(f"/elections/{election_name}", params={ - "election_type": "general_election", + response = await client.post("/election", json={ + "name": election_name, + "type": "general_election", "datetime_start_nominations": "2025-08-18T09:00:00Z", "datetime_start_voting": "2025-09-03T09:00:00Z", "datetime_end_voting": "2025-09-18T23:59:59Z", - "available_positions": "president", + "available_positions": ["president"], "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" - }) assert response.status_code == 401 # unauthorized access to create an election - response = await client.post(f"/elections/registration/{election_name}", params={ + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed + response = await client.post("/registration/{test-election-1}", json={ + "computing_id": "1234567", "position": "president", - }) assert response.status_code == 401 # unauthorized access to register candidates - response = await client.patch(f"/elections/{election_name}", params={ - "name": "test election 4", - "election_type": "general_election", + response = await client.patch(f"/election/{election_name}", json={ + "type": "general_election", "datetime_start_nominations": "2025-08-18T09:00:00Z", "datetime_start_voting": "2025-09-03T09:00:00Z", "datetime_end_voting": "2025-09-18T23:59:59Z", - "available_positions": "president,treasurer", + "available_positions": ["president", "treasurer"], "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) assert response.status_code == 401 - response = await client.patch(f"/elections/registration/{election_name}/pkn4", params={ + # TODO: Move these tests to a registrations test function + response = await client.patch(f"/registration/{election_name}/vice-president/{load_test_db.SYSADMIN_COMPUTING_ID}", json={ "position": "president", "speech": "I would like to run for president because I'm the best in Valorant at SFU." }) assert response.status_code == 401 - response = await client.put("/elections/nominee/info", params={ + response = await client.patch("/nominee/jdo12", json={ "full_name": "John Doe VI", "linked_in": "linkedin.com/john-doe-vi", "instagram": "john_vi", @@ -164,10 +156,11 @@ async def test_endpoints(client, database_setup): }) assert response.status_code == 401 - response = await client.delete(f"/elections/{election_name}") + response = await client.delete(f"/election/{election_name}") assert response.status_code == 401 - response = await client.delete(f"/elections/registration/{election_name}/president") + # TODO: Move these tests to a registrations test function + response = await client.delete(f"/registration/{election_name}/vice-president/{load_test_db.SYSADMIN_COMPUTING_ID}") assert response.status_code == 401 @@ -182,13 +175,13 @@ async def test_endpoints_admin(client, database_setup): client.cookies = { "session_id": session_id } # test that more info is given if logged in & with access to it - response = await client.get("/elections/list") + response = await client.get("/election") assert response.status_code == 200 assert response.json() != {} - # Returns private details when the time is allowed. If user is an admin or elections officer, returns computing ids for each candidate as well. + # Returns private details when the time is allowed. If user is an admin or election officer, returns computing ids for each candidate as well. election_name = "test election 2" - response = await client.get(f"/elections/{election_name}") + response = await client.get(f"/election/{election_name}") assert response.status_code == 200 assert response.json() != {} # if candidates filled, enure unauthorized values remain hidden @@ -196,59 +189,58 @@ async def test_endpoints_admin(client, database_setup): for cand in response.json()["candidates"]: assert "computing_id" in cand + # TODO: Move these tests to a registrations test function # ensure that registrations can be viewed - response = await client.get(f"/elections/registration/{election_name}") + response = await client.get(f"/election/{election_name}") assert response.status_code == 200 # ensure that authorized users can create an election - response = await client.post("/elections/testElection4", params={ - "election_type": "general_election", - "datetime_start_nominations": (datetime.now() - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.now() + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.now() + timedelta(days=14)).isoformat(), - "available_positions": "president,treasurer", + response = await client.post("/election", json={ + "name": "testElection4", + "type": "general_election", + "datetime_start_nominations": (datetime.datetime.now(tz=datetime.UTC) - timedelta(days=1)).isoformat(), + "datetime_start_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=7)).isoformat(), + "datetime_end_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=14)).isoformat(), + "available_positions": ["president", "treasurer"], "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) assert response.status_code == 200 - # ensure that user can create elections without knowing each position type - response = await client.post("/elections/byElection4", params={ - "election_type": "by_election", - "datetime_start_nominations": (datetime.now() - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.now() + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.now() + timedelta(days=14)).isoformat(), + # ensure that user can create election without knowing each position type + response = await client.post("/election", json={ + "name": "byElection4", + "type": "by_election", + "datetime_start_nominations": (datetime.datetime.now(tz=datetime.UTC) - timedelta(days=1)).isoformat(), + "datetime_start_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=7)).isoformat(), + "datetime_end_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=14)).isoformat(), "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) assert response.status_code == 200 - # try creating an invalid election name - response = await client.post("/elections/list", params={ - "election_type": "by_election", - "datetime_start_nominations": (datetime.now() - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.now() + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.now() + timedelta(days=14)).isoformat(), - "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" - }) - assert response.status_code == 400 - - - - + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # try to register for a past election -> should say nomination period expired - response = await client.post("/elections/registration/test election 1", params={ + testElection1 = "test election 1" + response = await client.post(f"/registration/{testElection1}", json={ + "computing_id": load_test_db.SYSADMIN_COMPUTING_ID, "position": "president", }) assert response.status_code == 400 assert "nomination period" in response.json()["detail"] - # try to register for an invalid position - response = await client.post(f"/elections/registration/{election_name}", params={ + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed + # try to register for an invalid position will just throw a 422 + response = await client.post(f"/registration/{election_name}", json={ + "computing_id": load_test_db.SYSADMIN_COMPUTING_ID, "position": "CEO", }) - assert response.status_code == 400 - assert "invalid position" in response.json()["detail"] + assert response.status_code == 422 + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # try to register in an unknown election - response = await client.post("/elections/registration/unknownElection12345", params={ + response = await client.post("/registration/unknownElection12345", json={ + "computing_id": load_test_db.SYSADMIN_COMPUTING_ID, "position": "president", }) assert response.status_code == 404 @@ -256,67 +248,79 @@ async def test_endpoints_admin(client, database_setup): + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # register for an election correctly - response = await client.post(f"/elections/registration/{election_name}", params={ + response = await client.post(f"/registration/{election_name}", json={ + "computing_id": "jdo12", "position": "president", }) assert response.status_code == 200 + + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # ensure that the above registration exists and is valid - response = await client.get(f"/elections/registration/{election_name}") + response = await client.get(f"/registration/{election_name}") assert response.status_code == 200 + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # duplicate registration - response = await client.post(f"/elections/registration/{election_name}", params={ + response = await client.post(f"/registration/{election_name}", json={ + "computing_id": "jdo12", "position": "president", }) assert response.status_code == 400 assert "registered" in response.json()["detail"] - - # update the above election - response = await client.patch("/elections/testElection4", params={ + response = await client.patch("/election/testElection4", json={ "election_type": "general_election", - "datetime_start_nominations": (datetime.now() - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.now() + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.now() + timedelta(days=14)).isoformat(), - "available_positions": "president,vice-president,treasurer", # update this + "datetime_start_nominations": (datetime.datetime.now(tz=datetime.UTC) - timedelta(days=1)).isoformat(), + "datetime_start_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=7)).isoformat(), + "datetime_end_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=14)).isoformat(), + "available_positions": ["president", "vice-president", "treasurer"], # update this "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) assert response.status_code == 200 + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # update the registration - response = await client.patch(f"/elections/registration/{election_name}/pkn4", params={ - "position": "president", + await client.patch(f"/registration/{election_name}/vice-president/pkn4", json={ "speech": "Vote for me as treasurer" }) assert response.status_code == 200 + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # try updating a non-registered election - response = await client.patch("/elections/registration/testElection4/pkn4", params={ + response = await client.patch("/registration/testElection4/pkn4", json={ "position": "president", "speech": "Vote for me as president, I am good at valorant." }) assert response.status_code == 404 # delete an election - response = await client.delete("/elections/testElection4") + response = await client.delete("/election/testElection4") assert response.status_code == 200 + # TODO: Move these tests to a registrations test function + # ensure that registrations can be viewed # delete a registration - response = await client.delete(f"/elections/registration/{election_name}/president") + response = await client.delete(f"/registration/{election_name}/president/jdo12") assert response.status_code == 200 # get nominee info - response = await client.get("/elections/nominee/info") + response = await client.get(f"/nominee/{load_test_db.SYSADMIN_COMPUTING_ID}") assert response.status_code == 200 # update nominee info - response = await client.put("/elections/nominee/info", params={ + response = await client.patch(f"/nominee/{load_test_db.SYSADMIN_COMPUTING_ID}", json={ "full_name": "Puneet N", "linked_in": "linkedin.com/not-my-linkedin", }) assert response.status_code == 200 - response = await client.get("/elections/nominee/info") + response = await client.get(f"/nominee/{load_test_db.SYSADMIN_COMPUTING_ID}") assert response.status_code == 200