diff --git a/.github/workflows/pytest_unit.yml b/.github/workflows/pytest_unit.yml index b48479d..6f75618 100644 --- a/.github/workflows/pytest_unit.yml +++ b/.github/workflows/pytest_unit.yml @@ -29,6 +29,4 @@ jobs: pip install -r requirements.txt - name: Run unit tests - run: | - source ./venv/bin/activate - pytest ./tests/unit -v + run: PYTHONPATH=src ./venv/bin/python -m pytest ./tests/unit -v diff --git a/pyproject.toml b/pyproject.toml index 79e80cf..ff7761a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,12 +7,15 @@ requires-python = ">= 3.11" # older versions untested, but we use new features o Homepage = "https://api.sfucsss.org/" [tool.pytest.ini_options] -pythonpath = "./src/" +pythonpath = ["src"] log_cli = true log_cli_level = "INFO" testpaths = [ "tests", -] + ] +norecursedirs = "tests/wip" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" [tool.ruff] line-length = 120 diff --git a/src/alembic/env.py b/src/alembic/env.py index b3338bf..c7ac8ab 100644 --- a/src/alembic/env.py +++ b/src/alembic/env.py @@ -9,7 +9,9 @@ import blog.tables import database import elections.tables +import nominees.tables import officers.tables +import registrations.tables from alembic import context # this is the Alembic Config object, which provides diff --git a/src/alembic/versions/0a2c458d1ddd_site_user_timestamps_nullable.py b/src/alembic/versions/0a2c458d1ddd_site_user_timestamps_nullable.py new file mode 100644 index 0000000..6121feb --- /dev/null +++ b/src/alembic/versions/0a2c458d1ddd_site_user_timestamps_nullable.py @@ -0,0 +1,40 @@ +"""site_user_timestamps_nullable + +Revision ID: 0a2c458d1ddd +Revises: a5c42bcdda5c +Create Date: 2025-09-28 20:52:02.486734 + +""" +from collections.abc import Sequence + +from sqlalchemy.dialects import postgresql + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "0a2c458d1ddd" +down_revision: str | None = "a5c42bcdda5c" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("site_user", "first_logged_in", + existing_type=postgresql.TIMESTAMP(), + nullable=True) + op.alter_column("site_user", "last_logged_in", + existing_type=postgresql.TIMESTAMP(), + nullable=True) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("site_user", "last_logged_in", + existing_type=postgresql.TIMESTAMP(), + nullable=False) + op.alter_column("site_user", "first_logged_in", + existing_type=postgresql.TIMESTAMP(), + nullable=False) + # ### end Alembic commands ### diff --git a/src/alembic/versions/0c717bd88d06_elections_timestamp_datetime.py b/src/alembic/versions/0c717bd88d06_elections_timestamp_datetime.py new file mode 100644 index 0000000..e8e1b2f --- /dev/null +++ b/src/alembic/versions/0c717bd88d06_elections_timestamp_datetime.py @@ -0,0 +1,53 @@ +"""elections_timestamp_datetime + +Revision ID: 0c717bd88d06 +Revises: 0a2c458d1ddd +Create Date: 2025-09-28 22:25:28.864945 + +""" +from collections.abc import Sequence +from typing import Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "0c717bd88d06" +down_revision: str | None = "0a2c458d1ddd" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("election", "datetime_start_nominations", + existing_type=sa.DATE(), + type_=sa.DateTime(), + existing_nullable=False) + op.alter_column("election", "datetime_start_voting", + existing_type=sa.DATE(), + type_=sa.DateTime(), + existing_nullable=False) + op.alter_column("election", "datetime_end_voting", + existing_type=sa.DATE(), + type_=sa.DateTime(), + existing_nullable=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("election", "datetime_end_voting", + existing_type=sa.DateTime(), + type_=sa.DATE(), + existing_nullable=False) + op.alter_column("election", "datetime_start_voting", + existing_type=sa.DateTime(), + type_=sa.DATE(), + existing_nullable=False) + op.alter_column("election", "datetime_start_nominations", + existing_type=sa.DateTime(), + type_=sa.DATE(), + existing_nullable=False) + # ### end Alembic commands ### diff --git a/src/alembic/versions/876041e5b41c_elections_officers_datetime_to_date.py b/src/alembic/versions/876041e5b41c_elections_officers_datetime_to_date.py new file mode 100644 index 0000000..5bad3af --- /dev/null +++ b/src/alembic/versions/876041e5b41c_elections_officers_datetime_to_date.py @@ -0,0 +1,54 @@ +"""elections-officers-datetime-to-date + +Revision ID: 876041e5b41c +Revises: 243190df5588 +Create Date: 2025-09-28 18:01:03.913302 + +""" +from collections.abc import Sequence +from typing import Union + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "876041e5b41c" +down_revision: str | None = "243190df5588" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("election", "datetime_start_nominations", + existing_type=postgresql.TIMESTAMP(), + type_=sa.Date(), + existing_nullable=False) + op.alter_column("election", "datetime_start_voting", + existing_type=postgresql.TIMESTAMP(), + type_=sa.Date(), + existing_nullable=False) + op.alter_column("election", "datetime_end_voting", + existing_type=postgresql.TIMESTAMP(), + type_=sa.Date(), + existing_nullable=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("election", "datetime_end_voting", + existing_type=sa.Date(), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + op.alter_column("election", "datetime_start_voting", + existing_type=sa.Date(), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + op.alter_column("election", "datetime_start_nominations", + existing_type=sa.Date(), + type_=postgresql.TIMESTAMP(), + existing_nullable=False) + # ### end Alembic commands ### diff --git a/src/alembic/versions/a5c42bcdda5c_update_officer_term_constraint_71.py b/src/alembic/versions/a5c42bcdda5c_update_officer_term_constraint_71.py new file mode 100644 index 0000000..b15244e --- /dev/null +++ b/src/alembic/versions/a5c42bcdda5c_update_officer_term_constraint_71.py @@ -0,0 +1,38 @@ +"""update_officer_term_constraint_71 + +Revision ID: a5c42bcdda5c +Revises: 876041e5b41c +Create Date: 2025-09-28 18:03:54.856781 + +""" +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "a5c42bcdda5c" +down_revision: str | None = "876041e5b41c" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column("election", "type", + existing_type=sa.VARCHAR(length=64), + type_=sa.String(length=32), + nullable=False) + op.create_unique_constraint(op.f("uq_officer_term_computing_id"), "officer_term", ["computing_id", "position", "start_date"]) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint(op.f("uq_officer_term_computing_id"), "officer_term", type_="unique") + op.alter_column("election", "type", + existing_type=sa.String(length=32), + type_=sa.VARCHAR(length=64), + nullable=True) + # ### end Alembic commands ### diff --git a/src/auth/tables.py b/src/auth/tables.py index b6ffe07..b2471cb 100644 --- a/src/auth/tables.py +++ b/src/auth/tables.py @@ -37,16 +37,21 @@ class SiteUser(Base): ) # first and last time logged into the CSSS API - first_logged_in: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now()) - last_logged_in: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.now()) + first_logged_in: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + last_logged_in: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) # optional user information for display purposes profile_picture_url: Mapped[str | None] = mapped_column(Text, nullable=True) def serialize(self) -> dict[str, str | int | bool | None]: - return { + + res = { "computing_id": self.computing_id, - "first_logged_in": self.first_logged_in.isoformat(), - "last_logged_in": self.last_logged_in.isoformat(), "profile_picture_url": self.profile_picture_url } + if self.first_logged_in is not None: + res["first_logged_in"] = self.first_logged_in.isoformat() + if self.last_logged_in is not None: + res["last_logged_in"] = self.last_logged_in.isoformat() + + return res diff --git a/src/cron/daily.py b/src/cron/daily.py index 00615d1..37e949b 100644 --- a/src/cron/daily.py +++ b/src/cron/daily.py @@ -6,7 +6,7 @@ import github import google_api import utils -from database import _db_session +from database import get_db_session from officers.crud import all_officers, get_user_by_username _logger = logging.getLogger(__name__) @@ -55,7 +55,7 @@ async def update_github_permissions(db_session): _logger.info("updated github permissions") async def update_permissions(): - db_session = _db_session() + db_session = get_db_session() update_google_permissions(db_session) db_session.commit() diff --git a/src/database.py b/src/database.py index 2e872bd..470c7ca 100644 --- a/src/database.py +++ b/src/database.py @@ -102,7 +102,7 @@ def setup_database(): # TODO: where is sys.stdout piped to? I want all these to go to a specific logs folder sessionmanager = DatabaseSessionManager( SQLALCHEMY_TEST_DATABASE_URL if os.environ.get("LOCAL") else SQLALCHEMY_DATABASE_URL, - { "echo": True }, + { "echo": False }, ) @contextlib.asynccontextmanager @@ -116,10 +116,9 @@ async def lifespan(app: FastAPI): await sessionmanager.close() -async def _db_session(): +async def get_db_session(): async with sessionmanager.session() as session: yield session -# TODO: what does this do again? -DBSession = Annotated[AsyncSession, Depends(_db_session)] +DBSession = Annotated[AsyncSession, Depends(get_db_session)] diff --git a/src/elections/tables.py b/src/elections/tables.py index 1d9709c..270631a 100644 --- a/src/elections/tables.py +++ b/src/elections/tables.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import date, datetime from sqlalchemy import ( DateTime, @@ -24,10 +24,10 @@ class Election(Base): # Slugs are unique identifiers slug: Mapped[str] = mapped_column(String(MAX_ELECTION_SLUG), primary_key=True) name: Mapped[str] = mapped_column(String(MAX_ELECTION_NAME), nullable=False) - type: Mapped[ElectionTypeEnum] = mapped_column(String(64), default=ElectionTypeEnum.GENERAL) - datetime_start_nominations: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - datetime_start_voting: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - datetime_end_voting: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + type: Mapped[ElectionTypeEnum] = mapped_column(String(32), default=ElectionTypeEnum.GENERAL) + datetime_start_nominations: Mapped[datetime] = mapped_column(DateTime, nullable=False) + datetime_start_voting: Mapped[datetime] = mapped_column(DateTime, nullable=False) + datetime_end_voting: Mapped[datetime] = mapped_column(DateTime, nullable=False) # a comma-separated string of positions which must be elements of OfficerPosition # By giving it the type `StringList`, the database entry will automatically be marshalled to the correct form diff --git a/src/elections/urls.py b/src/elections/urls.py index caae581..3dbbf39 100644 --- a/src/elections/urls.py +++ b/src/elections/urls.py @@ -25,7 +25,7 @@ tags=["election"], ) -async def get_user_permissions( +async def get_election_permissions( request: Request, db_session: database.DBSession, ) -> tuple[bool, str | None, str | None]: @@ -88,7 +88,7 @@ def _raise_if_bad_election_data( description="Returns a list of all election & their status", response_model=list[ElectionResponse], responses={ - 404: { "description": "No election found" } + 404: { "description": "No election found", "model": DetailModel } }, operation_id="get_all_elections" ) @@ -96,7 +96,7 @@ async def list_elections( request: Request, db_session: database.DBSession, ): - is_admin, _, _ = await get_user_permissions(request, db_session) + is_admin, _, _ = await get_election_permissions(request, db_session) election_list = await elections.crud.get_all_elections(db_session) if election_list is None or len(election_list) == 0: raise HTTPException( @@ -104,7 +104,7 @@ async def list_elections( detail="no election found" ) - current_time = datetime.datetime.now(tz=datetime.UTC) + current_time = datetime.datetime.now() if is_admin: election_metadata_list = [ election.private_details(current_time) @@ -136,7 +136,7 @@ async def get_election( db_session: database.DBSession, election_name: str ): - current_time = datetime.datetime.now(tz=datetime.UTC) + current_time = datetime.datetime.now() slugified_name = slugify(election_name) election = await elections.crud.get_election(db_session, slugified_name) if election is None: @@ -145,7 +145,7 @@ async def get_election( detail=f"election with slug {slugified_name} does not exist" ) - is_valid_user, _, _ = await get_user_permissions(request, db_session) + is_valid_user, _, _ = await get_election_permissions(request, db_session) if current_time >= election.datetime_start_voting or is_valid_user: election_json = election.private_details(current_time) @@ -218,7 +218,7 @@ async def create_election( available_positions = body.available_positions slugified_name = slugify(body.name) - current_time = datetime.datetime.now(tz=datetime.UTC) + current_time = datetime.datetime.now() start_nominations = datetime.datetime.fromisoformat(body.datetime_start_nominations) start_voting = datetime.datetime.fromisoformat(body.datetime_start_voting) end_voting = datetime.datetime.fromisoformat(body.datetime_end_voting) @@ -233,7 +233,7 @@ async def create_election( available_positions ) - is_valid_user, _, _ = await get_user_permissions(request, db_session) + is_valid_user, _, _ = await get_election_permissions(request, db_session) if not is_valid_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -293,7 +293,7 @@ async def update_election( db_session: database.DBSession, election_name: str, ): - is_valid_user, _, _ = await get_user_permissions(request, db_session) + is_valid_user, _, _ = await get_election_permissions(request, db_session) if not is_valid_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -332,7 +332,7 @@ async def update_election( election = await elections.crud.get_election(db_session, slugified_name) if election is None: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="couldn't find updated election") - return JSONResponse(election.private_details(datetime.datetime.now(tz=datetime.UTC))) + return JSONResponse(election.private_details(datetime.datetime.now())) @router.delete( "/{election_name:str}", @@ -349,7 +349,7 @@ async def delete_election( election_name: str ): slugified_name = slugify(election_name) - is_valid_user, _, _ = await get_user_permissions(request, db_session) + is_valid_user, _, _ = await get_election_permissions(request, db_session) if not is_valid_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, diff --git a/src/load_test_db.py b/src/load_test_db.py index 9d470a3..e59f3a1 100644 --- a/src/load_test_db.py +++ b/src/load_test_db.py @@ -298,6 +298,58 @@ async def load_sysadmin(db_session: AsyncSession): )) await db_session.commit() +WEBMASTER_COMPUTING_ID = "jbriones" +async def load_webmaster(db_session: AsyncSession): + # put your computing id here for testing purposes + print(f"loading new webmaster '{WEBMASTER_COMPUTING_ID}'") + await create_user_session(db_session, f"temp_id_{WEBMASTER_COMPUTING_ID}", WEBMASTER_COMPUTING_ID) + await create_new_officer_info(db_session, OfficerInfo( + legal_name="Jon Andre Briones", + discord_id=None, + discord_name=None, + discord_nickname=None, + + computing_id=WEBMASTER_COMPUTING_ID, + phone_number=None, + github_username=None, + google_drive_email=None, + )) + await create_new_officer_term(db_session, OfficerTerm( + computing_id=WEBMASTER_COMPUTING_ID, + + position=OfficerPositionEnum.FIRST_YEAR_REPRESENTATIVE, + start_date=date.today() - timedelta(days=(365*3)), + end_date=date.today() - timedelta(days=(365*2)), + + nickname="Jon Andre Briones", + favourite_course_0="CMPT 379", + favourite_course_1="CMPT 371", + + favourite_pl_0="TypeScript", + favourite_pl_1="C#", + + biography="o hey fellow kids \n\n\n I can newline", + photo_url=None, + )) + await create_new_officer_term(db_session, OfficerTerm( + computing_id=WEBMASTER_COMPUTING_ID, + + position=OfficerPositionEnum.WEBMASTER, + start_date=date.today() - timedelta(days=365), + end_date=None, + + nickname="G2", + favourite_course_0="CMPT 379", + favourite_course_1="CMPT 295", + + favourite_pl_0="Rust", + favourite_pl_1="C", + + biography="The systems are good o7", + photo_url=None, + )) + await db_session.commit() + async def load_test_elections_data(db_session: AsyncSession): print("loading election data...") await create_election(db_session, Election( @@ -385,6 +437,7 @@ async def async_main(sessionmanager): await load_test_auth_data(db_session) await load_test_officers_data(db_session) await load_sysadmin(db_session) + await load_webmaster(db_session) await load_test_elections_data(db_session) await load_test_election_nominee_application_data(db_session) diff --git a/src/nominees/tables.py b/src/nominees/tables.py index f2203bf..23dff17 100644 --- a/src/nominees/tables.py +++ b/src/nominees/tables.py @@ -14,11 +14,11 @@ class NomineeInfo(Base): __tablename__ = "election_nominee_info" computing_id: Mapped[str] = mapped_column(String(COMPUTING_ID_LEN), primary_key=True) - full_name: Mapped[str] = mapped_column(String(64), nullable=False) - linked_in: Mapped[str] = mapped_column(String(128)) - instagram: Mapped[str] = mapped_column(String(128)) - email: Mapped[str] = mapped_column(String(64)) - discord_username: Mapped[str] = mapped_column(String(DISCORD_NICKNAME_LEN)) + full_name: Mapped[str] = mapped_column(String(64)) + linked_in: Mapped[str] = mapped_column(String(128), nullable=True) + instagram: Mapped[str] = mapped_column(String(128), nullable=True) + email: Mapped[str] = mapped_column(String(64), nullable=True) + discord_username: Mapped[str] = mapped_column(String(DISCORD_NICKNAME_LEN), nullable=True) def to_update_dict(self) -> dict: return { diff --git a/src/nominees/urls.py b/src/nominees/urls.py index 15d8136..f1b6d9d 100644 --- a/src/nominees/urls.py +++ b/src/nominees/urls.py @@ -8,7 +8,7 @@ NomineeInfoUpdateParams, ) from nominees.tables import NomineeInfo -from utils.urls import admin_or_raise +from utils.urls import AdminTypeEnum, admin_or_raise router = APIRouter( prefix="/nominee", @@ -30,7 +30,7 @@ async def get_nominee_info( computing_id: str ): # Putting this one behind the admin wall since it has contact information - await admin_or_raise(request, db_session) + await admin_or_raise(request, db_session, AdminTypeEnum.Election) nominee_info = await nominees.crud.get_nominee_info(db_session, computing_id) if nominee_info is None: raise HTTPException( @@ -56,7 +56,7 @@ async def provide_nominee_info( computing_id: str ): # TODO: There needs to be a lot more validation here. - await admin_or_raise(request, db_session) + await admin_or_raise(request, db_session, AdminTypeEnum.Election) updated_data = {} # Only update fields that were provided diff --git a/src/officers/constants.py b/src/officers/constants.py index 9784829..59aaf9c 100644 --- a/src/officers/constants.py +++ b/src/officers/constants.py @@ -1,5 +1,8 @@ from enum import StrEnum +# OFFICER FIELD CONSTRAINTS +OFFICER_POSITION_MAX = 128 +OFFICER_LEGAL_NAME_MAX = 128 class OfficerPositionEnum(StrEnum): PRESIDENT = "president" diff --git a/src/officers/crud.py b/src/officers/crud.py index a2e1b6f..d9fd332 100644 --- a/src/officers/crud.py +++ b/src/officers/crud.py @@ -1,7 +1,9 @@ -from datetime import datetime +from collections.abc import Sequence +from datetime import date, datetime import sqlalchemy from fastapi import HTTPException +from sqlalchemy.ext.asyncio import AsyncSession import auth.crud import auth.tables @@ -9,84 +11,88 @@ import utils from data import semesters from officers.constants import OfficerPosition +from officers.models import OfficerInfoResponse, OfficerTermCreate from officers.tables import OfficerInfo, OfficerTerm -from officers.types import ( - OfficerData, -) # NOTE: this module should not do any data validation; that should be done in the urls.py or higher layer async def current_officers( db_session: database.DBSession, - include_private: bool -) -> dict[str, list[OfficerData]]: +) -> list[OfficerInfoResponse]: """ Get info about officers that are active. Go through all active & complete officer terms. Returns a mapping between officer position and officer terms """ - query = ( - sqlalchemy - .select(OfficerTerm) - .order_by(OfficerTerm.start_date.desc()) - ) - query = utils.is_active_officer(query) + curr_time = date.today() + query = (sqlalchemy.select(OfficerTerm, OfficerInfo) + .join(OfficerInfo, OfficerTerm.computing_id == OfficerInfo.computing_id) + .where((OfficerTerm.start_date <= curr_time) & (OfficerTerm.end_date >= curr_time)) + .order_by(OfficerTerm.start_date.desc()) + ) - officer_terms = (await db_session.scalars(query)).all() - officer_data = {} - for term in officer_terms: - officer_info_query = ( - sqlalchemy - .select(OfficerInfo) - .where(OfficerInfo.computing_id == term.computing_id) - ) - officer_info = (await db_session.scalars(officer_info_query)).first() - if officer_info is None: - # TODO (#93): make sure there are daily checks that this data actually exists - continue - elif term.position not in officer_data: - officer_data[term.position] = [] + result: Sequence[sqlalchemy.Row[tuple[OfficerTerm, OfficerInfo]]] = (await db_session.execute(query)).all() + officer_list = [] + for term, officer in result: + officer_list.append(OfficerInfoResponse( + legal_name = officer.legal_name, + is_active = True, + position = term.position, + start_date = term.start_date, + end_date = term.end_date, + biography = term.biography, + csss_email = OfficerPosition.to_email(term.position), - officer_data[term.position] += [ - OfficerData.from_data(term, officer_info, include_private, is_active=True) - ] + discord_id = officer.discord_id, + discord_name = officer.discord_name, + discord_nickname = officer.discord_nickname, + computing_id = officer.computing_id, + phone_number = officer.phone_number, + github_username = officer.github_username, + google_drive_email = officer.google_drive_email, + photo_url = term.photo_url + )) - return officer_data + return officer_list async def all_officers( - db_session: database.DBSession, - include_private_data: bool, + db_session: AsyncSession, include_future_terms: bool -) -> list[OfficerData]: +) -> list[OfficerInfoResponse]: """ This could be a lot of data, so be careful """ # NOTE: paginate data if needed - query = ( - sqlalchemy - .select(OfficerTerm) - # Ordered recent first - .order_by(OfficerTerm.start_date.desc()) - ) + query = (sqlalchemy.select(OfficerTerm, OfficerInfo) + .join(OfficerInfo, OfficerTerm.computing_id == OfficerInfo.computing_id) + .order_by(OfficerTerm.start_date.desc()) + ) + if not include_future_terms: query = utils.has_started_term(query) + result: Sequence[sqlalchemy.Row[tuple[OfficerTerm, OfficerInfo]]] = (await db_session.execute(query)).all() + officer_list = [] + for term, officer in result: + officer_list.append(OfficerInfoResponse( + legal_name = officer.legal_name, + is_active = utils.is_active_term(term), + position = term.position, + start_date = term.start_date, + end_date = term.end_date, + biography = term.biography, + csss_email = OfficerPosition.to_email(term.position), - officer_data_list = [] - officer_terms = (await db_session.scalars(query)).all() - for term in officer_terms: - officer_info = await db_session.scalar( - sqlalchemy - .select(OfficerInfo) - .where(OfficerInfo.computing_id == term.computing_id) - ) - officer_data_list += [OfficerData.from_data( - term, - officer_info, - include_private_data, - utils.is_active_term(term) - )] + discord_id = officer.discord_id, + discord_name = officer.discord_name, + discord_nickname = officer.discord_nickname, + computing_id = officer.computing_id, + phone_number = officer.phone_number, + github_username = officer.github_username, + google_drive_email = officer.google_drive_email, + photo_url = term.photo_url + )) - return officer_data_list + return officer_list async def get_officer_info_or_raise(db_session: database.DBSession, computing_id: str) -> OfficerInfo: officer_term = await db_session.scalar( @@ -98,6 +104,19 @@ async def get_officer_info_or_raise(db_session: database.DBSession, computing_id raise HTTPException(status_code=404, detail=f"officer_info for computing_id={computing_id} does not exist yet") return officer_term +async def get_new_officer_info_or_raise(db_session: database.DBSession, computing_id: str) -> OfficerInfo: + """ + This check is for after a create/update + """ + officer_term = await db_session.scalar( + sqlalchemy + .select(OfficerInfo) + .where(OfficerInfo.computing_id == computing_id) + ) + if officer_term is None: + raise HTTPException(status_code=500, detail=f"failed to fetch {computing_id} after update") + return officer_term + async def get_officer_terms( db_session: database.DBSession, computing_id: str, @@ -142,14 +161,17 @@ async def current_officer_positions(db_session: database.DBSession, computing_id officer_term_list = await get_active_officer_terms(db_session, computing_id) return [term.position for term in officer_term_list] -async def get_officer_term_by_id(db_session: database.DBSession, term_id: int) -> OfficerTerm: +async def get_officer_term_by_id_or_raise(db_session: database.DBSession, term_id: int, is_new: bool = False) -> OfficerTerm: officer_term = await db_session.scalar( sqlalchemy .select(OfficerTerm) .where(OfficerTerm.id == term_id) ) if officer_term is None: - raise HTTPException(status_code=400, detail=f"Could not find officer_term with id={term_id}") + if is_new: + raise HTTPException(status_code=500, detail=f"could not find new officer_term with id={term_id}") + else: + raise HTTPException(status_code=404, detail=f"could not find officer_term with id={term_id}") return officer_term async def create_new_officer_info( @@ -161,8 +183,8 @@ async def create_new_officer_info( # if computing_id has not been created as a site_user yet, add them db_session.add(auth.tables.SiteUser( computing_id=new_officer_info.computing_id, - first_logged_in=datetime.now(), - last_logged_in=datetime.now() + first_logged_in=None, + last_logged_in=None )) existing_officer_info = await db_session.scalar( diff --git a/src/officers/models.py b/src/officers/models.py new file mode 100644 index 0000000..67c1b2c --- /dev/null +++ b/src/officers/models.py @@ -0,0 +1,100 @@ +from datetime import date + +from pydantic import BaseModel, ConfigDict, Field + +from officers.constants import OFFICER_LEGAL_NAME_MAX, OfficerPositionEnum + +OFFICER_PRIVATE_INFO = { + "discord_id", + "discord_name", + "discord_nickname", + "computing_id", + "phone_number", + "github_username", + "google_drive_email", + "photo_url" +} + +class OfficerInfoBaseModel(BaseModel): + # TODO (#71): compute this using SFU's API & remove from being uploaded + legal_name: str = Field(..., max_length=OFFICER_LEGAL_NAME_MAX) + position: OfficerPositionEnum + start_date: date + end_date: date | None = None + +class OfficerInfoResponse(OfficerInfoBaseModel): + """ + Response when fetching public officer data + """ + is_active: bool + nickname: str | None = None + biography: str | None = None + csss_email: str | None = None + + # Private data + discord_id: str | None = None + discord_name: str | None = None + discord_nickname: str | None = None + computing_id: str | None = None + phone_number: str | None = None + github_username: str | None = None + google_drive_email: str | None = None + photo_url: str | None = None + +class OfficerSelfUpdate(BaseModel): + """ + Used when an Officer is updating their own information + """ + nickname: str | None = None + discord_id: str | None = None + discord_name: str | None = None + discord_nickname: str | None = None + biography: str | None = None + phone_number: str | None = None + github_username: str | None = None + google_drive_email: str | None = None + +class OfficerUpdate(OfficerSelfUpdate): + """ + Used when an admin is updating an Officer's info + """ + legal_name: str | None = Field(None, max_length=OFFICER_LEGAL_NAME_MAX) + position: OfficerPositionEnum | None = None + start_date: date | None = None + end_date: date | None = None + +class OfficerTermBaseModel(BaseModel): + computing_id: str + position: OfficerPositionEnum + start_date: date + +class OfficerTermCreate(OfficerTermBaseModel): + """ + Params to create a new Officer Term + """ + legal_name: str + +class OfficerTermResponse(OfficerTermCreate): + id: int + end_date: date | None = None + favourite_course_0: str | None = None + favourite_course_1: str | None = None + favourite_pl_0: str | None = None + favourite_pl_1: str | None = None + biography: str | None = None + photo_url: str | None = None + + +class OfficerTermUpdate(BaseModel): + nickname: str | None = None + favourite_course_0: str | None = None + favourite_course_1: str | None = None + favourite_pl_0: str | None = None + favourite_pl_1: str | None = None + biography: str | None = None + + # Admin only + position: OfficerPositionEnum | None = None + start_date: date | None = None + end_date: date | None = None + photo_url: str | None = None # Block this, just in case diff --git a/src/officers/tables.py b/src/officers/tables.py index 23575a0..15567eb 100644 --- a/src/officers/tables.py +++ b/src/officers/tables.py @@ -1,13 +1,14 @@ from __future__ import annotations -from datetime import datetime +from datetime import date, datetime from sqlalchemy import ( - DateTime, + Date, ForeignKey, Integer, String, Text, + UniqueConstraint, ) from sqlalchemy.orm import Mapped, mapped_column @@ -19,7 +20,8 @@ GITHUB_USERNAME_LEN, ) from database import Base -from officers.constants import OfficerPositionEnum +from officers.constants import OFFICER_LEGAL_NAME_MAX, OFFICER_POSITION_MAX, OfficerPositionEnum +from officers.models import OfficerSelfUpdate, OfficerTermUpdate, OfficerUpdate # A row represents an assignment of a person to a position. @@ -27,7 +29,6 @@ class OfficerTerm(Base): __tablename__ = "officer_term" - # TODO (#98): create a unique constraint for (computing_id, position, start_date). id: Mapped[str] = mapped_column(Integer, primary_key=True, autoincrement=True) computing_id: Mapped[str] = mapped_column( @@ -36,10 +37,10 @@ class OfficerTerm(Base): nullable=False, ) - position: Mapped[OfficerPositionEnum] = mapped_column(String(128), nullable=False) - start_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + position: Mapped[OfficerPositionEnum] = mapped_column(String(OFFICER_POSITION_MAX), nullable=False) + start_date: Mapped[date] = mapped_column(Date, nullable=False) # end_date is only not-specified for positions that don't have a length (ie. webmaster) - end_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True) + end_date: Mapped[date] = mapped_column(Date, nullable=True) nickname: Mapped[str] = mapped_column(String(128), nullable=True) favourite_course_0: Mapped[str] = mapped_column(String(64), nullable=True) @@ -50,6 +51,8 @@ class OfficerTerm(Base): biography: Mapped[str] = mapped_column(Text, nullable=True) photo_url: Mapped[str] = mapped_column(Text, nullable=True) # some urls get big, best to let it be a string + __table_args__ = (UniqueConstraint("computing_id", "position", "start_date"),) # This needs a comma to work + def serializable_dict(self) -> dict: return { "id": self.id, @@ -68,6 +71,15 @@ def serializable_dict(self) -> dict: "photo_url": self.photo_url, } + def update_from_params(self, params: OfficerTermUpdate, admin_update: bool = True): + if admin_update: + update_data = params.model_dump(exclude_unset=True) + else: + update_data = params.model_dump(exclude_unset=True, exclude={"position", "start_date", "end_date", "photo_url"}) + for k, v in update_data.items(): + setattr(self, k, v) + + def is_filled_in(self): return ( # photo & end_date don't have to be uploaded for the term to be "filled" @@ -111,7 +123,7 @@ class OfficerInfo(Base): ) # TODO (#71): we'll need to use SFU's API to get the legal name for users - legal_name: Mapped[str] = mapped_column(String(128), nullable=False) # some people have long names, you never know + legal_name: Mapped[str] = mapped_column(String(OFFICER_LEGAL_NAME_MAX), nullable=False) # some people have long names, you never know phone_number: Mapped[str] = mapped_column(String(24), nullable=True) # TODO (#99): add unique constraints to discord_id (stops users from stealing the username of someone else) @@ -147,6 +159,11 @@ def serializable_dict(self) -> dict: "google_drive_email": self.google_drive_email, } + def update_from_params(self, params: OfficerUpdate | OfficerSelfUpdate): + update_data = params.model_dump(exclude_unset=True) + for k, v in update_data.items(): + setattr(self, k, v) + def is_filled_in(self): return ( self.computing_id is not None diff --git a/src/officers/urls.py b/src/officers/urls.py index 8ce70f7..b74e839 100755 --- a/src/officers/urls.py +++ b/src/officers/urls.py @@ -1,18 +1,24 @@ -import logging - -from fastapi import APIRouter, Body, HTTPException, Request -from fastapi.responses import JSONResponse, PlainTextResponse +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import JSONResponse import auth.crud import database import officers.crud import utils +from officers.constants import OfficerPositionEnum +from officers.models import ( + OFFICER_PRIVATE_INFO, + OfficerInfoResponse, + OfficerSelfUpdate, + OfficerTermCreate, + OfficerTermResponse, + OfficerTermUpdate, + OfficerUpdate, +) from officers.tables import OfficerInfo, OfficerTerm -from officers.types import InitialOfficerInfo, OfficerInfoUpload, OfficerTermUpload from permission.types import OfficerPrivateInfo, WebsiteAdmin -from utils.urls import logged_in_or_raise - -_logger = logging.getLogger(__name__) +from utils.shared_models import DetailModel, SuccessResponse +from utils.urls import admin_or_raise, is_website_admin, logged_in_or_raise router = APIRouter( prefix="/officers", @@ -25,44 +31,51 @@ async def _has_officer_private_info_access( request: Request, db_session: database.DBSession -) -> tuple[None | str, None | str, bool]: +) -> tuple[bool, str | None,]: """determine if the user has access to private officer info""" session_id = request.cookies.get("session_id", None) if session_id is None: - return None, None, False + return False, None computing_id = await auth.crud.get_computing_id(db_session, session_id) if computing_id is None: - return session_id, None, False + return False, None has_private_access = await OfficerPrivateInfo.has_permission(db_session, computing_id) - return session_id, computing_id, has_private_access + return has_private_access, computing_id # ---------------------------------------- # # endpoints @router.get( "/current", - description="Get information about all the officers. More information is given if you're authenticated & have access to private executive data.", + description="Get information about the current officers. With no authorization, only get basic info.", + response_model=dict[OfficerPositionEnum, OfficerInfoResponse], + operation_id="get_current_officers" ) async def current_officers( - # the request headers request: Request, db_session: database.DBSession, ): - _, _, has_private_access = await _has_officer_private_info_access(request, db_session) - current_officers = await officers.crud.current_officers(db_session, has_private_access) - return JSONResponse({ - position: [ - officer_data.serializable_dict() - for officer_data in officer_data_list - ] - for position, officer_data_list in current_officers.items() - }) + has_private_access, _ = await _has_officer_private_info_access(request, db_session) + + curr_officers = await officers.crud.current_officers(db_session) + exclude = OFFICER_PRIVATE_INFO if not has_private_access else {} + + res = {} + for officer in curr_officers: + res[officer.position] = officer.model_dump(exclude=exclude, mode="json") + + return JSONResponse(res) @router.get( "/all", - description="Information for all execs from all exec terms" + description="Information for all execs from all exec terms", + response_model=list[OfficerInfoResponse], + responses={ + 403: { "description": "not authorized to view private info", "model": DetailModel } + }, + operation_id="get_all_officers" ) async def all_officers( request: Request, @@ -71,15 +84,17 @@ async def all_officers( # and may only be accessed by that officer and executives. All other officer terms are public. include_future_terms: bool = False, ): - _, computing_id, has_private_access = await _has_officer_private_info_access(request, db_session) + has_private_access, computing_id = await _has_officer_private_info_access(request, db_session) if include_future_terms: is_website_admin = (computing_id is not None) and (await WebsiteAdmin.has_permission(db_session, computing_id)) if not is_website_admin: raise HTTPException(status_code=401, detail="only website admins can view all executive terms that have not started yet") - all_officers = await officers.crud.all_officers(db_session, has_private_access, include_future_terms) - return JSONResponse([ - officer_data.serializable_dict() + all_officers = await officers.crud.all_officers(db_session, include_future_terms) + exclude = OFFICER_PRIVATE_INFO if not has_private_access else {} + + return JSONResponse(content=[ + officer_data.model_dump(exclude=exclude, mode="json") for officer_data in all_officers ]) @@ -89,6 +104,12 @@ async def all_officers( Get term info for an executive. All term info is public for all past or active terms. Future terms can only be accessed by website admins. """, + response_model=list[OfficerTermResponse], + responses={ + 401: { "description": "not logged in", "model": DetailModel }, + 403: { "description": "not authorized to view private info", "model": DetailModel } + }, + operation_id="get_officer_terms_by_id" ) async def get_officer_terms( request: Request, @@ -112,8 +133,13 @@ async def get_officer_terms( ]) @router.get( - "/info/{computing_id}", + "/info/{computing_id:str}", description="Get officer info for the current user, if they've ever been an exec. Only admins can get info about another user.", + response_model=OfficerInfoResponse, + responses={ + 403: { "description": "not authorized to view author user info", "model": DetailModel } + }, + operation_id="get_officer_info_by_id" ) async def get_officer_info( request: Request, @@ -136,20 +162,19 @@ async def get_officer_info( Only the sysadmin, president, or DoA can submit this request. It will usually be the DoA. Updates the system with a new officer, and enables the user to login to the system to input their information. """, + response_model=SuccessResponse, + responses={ + 403: { "description": "must be a website admin", "model": DetailModel }, + 500: { "model": DetailModel }, + }, + operation_id="create_officer_term" ) -async def new_officer_term( +async def create_officer_term( request: Request, db_session: database.DBSession, - officer_info_list: list[InitialOfficerInfo] = Body(), # noqa: B008 + officer_info_list: list[OfficerTermCreate], ): - """ - If the current computing_id is not already an officer, officer_info will be created for them. - """ - for officer_info in officer_info_list: - officer_info.valid_or_raise() - - _, session_computing_id = await logged_in_or_raise(request, db_session) - await WebsiteAdmin.has_permission_or_raise(db_session, session_computing_id) + await admin_or_raise(request, db_session) for officer_info in officer_info_list: # if user with officer_info.computing_id has never logged into the website before, @@ -157,7 +182,7 @@ async def new_officer_term( await officers.crud.create_new_officer_info(db_session, OfficerInfo( computing_id = officer_info.computing_id, # TODO (#71): use sfu api to get legal name from officer_info.computing_id - legal_name = "default name", + legal_name = officer_info.legal_name, phone_number = None, discord_id = None, @@ -174,7 +199,7 @@ async def new_officer_term( )) await db_session.commit() - return PlainTextResponse("ok") + return JSONResponse({ "success": True }) @router.patch( "/info/{computing_id}", @@ -182,119 +207,96 @@ async def new_officer_term( After election, officer computing ids are input into our system. If you have been elected as a new officer, you may authenticate with SFU CAS, then input your information & the valid token for us. Admins may update this info. - """ + """, + response_model=OfficerInfoResponse, + responses={ + 403: { "description": "must be a website admin", "model": DetailModel }, + 500: { "description": "failed to fetch after update", "model": DetailModel }, + }, + operation_id="update_officer_info" ) -async def update_info( +async def update_officer_info( request: Request, db_session: database.DBSession, computing_id: str, - officer_info_upload: OfficerInfoUpload = Body() # noqa: B008 + officer_info_upload: OfficerUpdate | OfficerSelfUpdate ): - officer_info_upload.valid_or_raise() - _, session_computing_id = await logged_in_or_raise(request, db_session) + is_site_admin, _, session_computing_id = await is_website_admin(request, db_session) - if computing_id != session_computing_id: - await WebsiteAdmin.has_permission_or_raise( - db_session, session_computing_id, - errmsg="must have website admin permissions to update another user" - ) + if computing_id != session_computing_id and not is_site_admin: + raise HTTPException(status_code=403, detail="you may not update other officers") old_officer_info = await officers.crud.get_officer_info_or_raise(db_session, computing_id) - validation_failures, corrected_officer_info = await officer_info_upload.validate(computing_id, old_officer_info) + old_officer_info.update_from_params(officer_info_upload) + await officers.crud.update_officer_info(db_session, old_officer_info) # TODO (#27): log all important changes just to a .log file & persist them for a few years - success = await officers.crud.update_officer_info(db_session, corrected_officer_info) - if not success: - raise HTTPException(status_code=400, detail="officer_info does not exist yet, please create the officer info entry first") - await db_session.commit() - updated_officer_info = await officers.crud.get_officer_info_or_raise(db_session, computing_id) - return JSONResponse({ - "officer_info": updated_officer_info.serializable_dict(), - "validation_failures": validation_failures, - }) + updated_officer_info = await officers.crud.get_new_officer_info_or_raise(db_session, computing_id) + return JSONResponse(updated_officer_info.serializable_dict()) @router.patch( "/term/{term_id}", - description="" + description="Update the information for an Officer's term", + response_model=OfficerTermResponse, + responses={ + 403: { "description": "must be a website admin", "model": DetailModel }, + 500: { "description": "failed to fetch after update", "model": DetailModel }, + }, + operation_id="update_officer_term_by_id" ) -async def update_term( +async def update_officer_term( request: Request, db_session: database.DBSession, term_id: int, - officer_term_upload: OfficerTermUpload = Body(), # noqa: B008 + body: OfficerTermUpdate ): """ A website admin may change the position & term length however they wish. """ - officer_term_upload.valid_or_raise() - _, session_computing_id = await logged_in_or_raise(request, db_session) + is_site_admin, _, session_computing_id = await is_website_admin(request, db_session) - old_officer_term = await officers.crud.get_officer_term_by_id(db_session, term_id) - if old_officer_term.computing_id != session_computing_id: - await WebsiteAdmin.has_permission_or_raise( - db_session, session_computing_id, - errmsg="must have website admin permissions to update another user" - ) - elif utils.is_past_term(old_officer_term): - await WebsiteAdmin.has_permission_or_raise( - db_session, session_computing_id, - errmsg="only website admins can update past terms" - ) + old_officer_term = await officers.crud.get_officer_term_by_id_or_raise(db_session, term_id) + if not is_site_admin: + if old_officer_term.computing_id != session_computing_id: + raise HTTPException(status_code=403, detail="you may not update other officer terms") - if ( - officer_term_upload.computing_id != old_officer_term.computing_id - or officer_term_upload.position != old_officer_term.position - or officer_term_upload.start_date != old_officer_term.start_date - or officer_term_upload.end_date != old_officer_term.end_date - ): - await WebsiteAdmin.has_permission_or_raise( - db_session, session_computing_id, - errmsg="only admins can write new versions of position, start_date, and end_date" - ) + if utils.is_past_term(old_officer_term): + raise HTTPException(status_code=403, detail="you may not update past terms") + + old_officer_term.update_from_params(body, is_site_admin) # TODO (#27): log all important changes to a .log file - success = await officers.crud.update_officer_term( - db_session, - officer_term_upload.to_officer_term(term_id) - ) - if not success: - raise HTTPException(status_code=400, detail="the associated officer_term does not exist yet, please create it first") + await officers.crud.update_officer_term(db_session, old_officer_term) await db_session.commit() - new_officer_term = await officers.crud.get_officer_term_by_id(db_session, term_id) - return JSONResponse({ - "officer_term": new_officer_term.serializable_dict(), - # none for now, but may be added if frontend requests - "validation_failures": [], - }) + new_officer_term = await officers.crud.get_officer_term_by_id_or_raise(db_session, term_id) + return JSONResponse(new_officer_term.serializable_dict()) @router.delete( - "/term/{term_id}", + "/term/{term_id:int}", description="Remove the specified officer term. Only website admins can run this endpoint. BE CAREFUL WITH THIS!", + response_model=SuccessResponse, + responses={ + 401: { "description": "must be logged in", "model": DetailModel }, + 403: { "description": "must be a website admin", "model": DetailModel }, + 500: { "description": "server error", "model": DetailModel }, + }, + operation_id="delete_officer_term_by_id" ) -async def remove_officer( +async def delete_officer_term( request: Request, db_session: database.DBSession, term_id: int, ): - _, session_computing_id = await logged_in_or_raise(request, db_session) - await WebsiteAdmin.has_permission_or_raise( - db_session, session_computing_id, - errmsg="must have website admin permissions to remove a term" - ) - - deleted_officer_term = await officers.crud.get_officer_term_by_id(db_session, term_id) + await admin_or_raise(request, db_session) # TODO (#27): log all important changes to a .log file - # TODO (#100): return whether the deletion succeeded or not await officers.crud.delete_officer_term_by_id(db_session, term_id) await db_session.commit() - return JSONResponse({ - "officer_term": deleted_officer_term.serializable_dict(), - }) + return SuccessResponse(success=True) diff --git a/src/permission/types.py b/src/permission/types.py index 2a1acf6..f0a3765 100644 --- a/src/permission/types.py +++ b/src/permission/types.py @@ -76,5 +76,5 @@ async def has_permission_or_raise( errmsg:str = "must have website admin permissions" ) -> bool: if not await WebsiteAdmin.has_permission(db_session, computing_id): - raise HTTPException(status_code=401, detail=errmsg) + raise HTTPException(status_code=403, detail=errmsg) return True diff --git a/src/registrations/urls.py b/src/registrations/urls.py index 1fe552f..f723c4f 100644 --- a/src/registrations/urls.py +++ b/src/registrations/urls.py @@ -18,7 +18,7 @@ ) from registrations.tables import NomineeApplication from utils.shared_models import DetailModel, SuccessResponse -from utils.urls import admin_or_raise, logged_in_or_raise, slugify +from utils.urls import AdminTypeEnum, admin_or_raise, logged_in_or_raise, slugify router = APIRouter( prefix="/registration", @@ -74,7 +74,7 @@ async def register_in_election( body: NomineeApplicationParams, election_name: str ): - await admin_or_raise(request, db_session) + await admin_or_raise(request, db_session, AdminTypeEnum.Election) if body.position not in OfficerPositionEnum: raise HTTPException( @@ -105,7 +105,7 @@ async def register_in_election( detail=f"{body.position} is not available to register for in this election" ) - if election.status(datetime.datetime.now(tz=datetime.UTC)) != ElectionStatusEnum.NOMINATIONS: + if election.status(datetime.datetime.now()) != ElectionStatusEnum.NOMINATIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="registrations can only be made during the nomination period" @@ -157,7 +157,7 @@ async def update_registration( computing_id: str, position: OfficerPositionEnum ): - await admin_or_raise(request, db_session) + await admin_or_raise(request, db_session, AdminTypeEnum.Election) if body.position not in OfficerPositionEnum: raise HTTPException( @@ -174,7 +174,7 @@ async def update_registration( ) # self updates can only be done during nomination period. Officer updates can be done whenever - if election.status(datetime.datetime.now(tz=datetime.UTC)) != ElectionStatusEnum.NOMINATIONS: + if election.status(datetime.datetime.now()) != ElectionStatusEnum.NOMINATIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="speeches can only be updated during the nomination period" @@ -221,7 +221,7 @@ async def delete_registration( position: OfficerPositionEnum, computing_id: str ): - await admin_or_raise(request, db_session) + await admin_or_raise(request, db_session, AdminTypeEnum.Election) if position not in OfficerPositionEnum: raise HTTPException( @@ -237,7 +237,7 @@ async def delete_registration( detail=f"election with slug {slugified_name} does not exist" ) - if election.status(datetime.datetime.now(tz=datetime.UTC)) != ElectionStatusEnum.NOMINATIONS: + if election.status(datetime.datetime.now()) != ElectionStatusEnum.NOMINATIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="registration can only be revoked during the nomination period" diff --git a/src/utils/__init__.py b/src/utils/__init__.py index c52bd4c..62c95b6 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -34,7 +34,7 @@ def is_active_officer(query: Select) -> Select: ) ) -def has_started_term(query: Select) -> bool: +def has_started_term(query: Select) -> Select[tuple[OfficerTerm]]: return query.where( OfficerTerm.start_date <= date.today() ) diff --git a/src/utils/urls.py b/src/utils/urls.py index 0313300..6d4c391 100644 --- a/src/utils/urls.py +++ b/src/utils/urls.py @@ -1,4 +1,5 @@ import re +from enum import Enum from fastapi import HTTPException, Request, status @@ -8,6 +9,11 @@ from permission.types import ElectionOfficer, WebsiteAdmin +class AdminTypeEnum(Enum): + Full = 1 + Election = 2 + + # TODO: move other utils into this module def slugify(text: str) -> str: """Creates a unique slug based on text passed in. Assumes non-unicode text.""" @@ -49,7 +55,8 @@ async def get_current_user(request: Request, db_session: database.DBSession) -> return session_id, session_computing_id -async def admin_or_raise(request: Request, db_session: database.DBSession) -> tuple[str, str]: +# TODO: Add an election admin version that checks the election attempting to be modified as well +async def admin_or_raise(request: Request, db_session: database.DBSession, admintype: AdminTypeEnum = AdminTypeEnum.Full) -> tuple[str, str]: session_id, computing_id = await get_current_user(request, db_session) if not session_id or not computing_id: raise HTTPException( @@ -58,11 +65,20 @@ async def admin_or_raise(request: Request, db_session: database.DBSession) -> tu ) # where valid means election officer or website admin - if (await ElectionOfficer.has_permission(db_session, computing_id)) or (await WebsiteAdmin.has_permission(db_session, computing_id)): + if (await WebsiteAdmin.has_permission(db_session, computing_id)) or (admintype is AdminTypeEnum.Election and await ElectionOfficer.has_permission(db_session, computing_id)): return session_id, computing_id - else: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="must be an admin" - ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="must be an admin" + ) + +async def is_website_admin(request: Request, db_session: database.DBSession) -> tuple[bool, str | None, str | None]: + session_id, computing_id = await get_current_user(request, db_session) + if session_id is None or computing_id is None: + return False, session_id, computing_id + + if (await WebsiteAdmin.has_permission(db_session, computing_id)): + return True, session_id, computing_id + + return False, session_id, computing_id diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..88c8230 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,56 @@ +# Configuration of Pytest +import logging +from collections.abc import AsyncGenerator +from typing import Any + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient + +from src import load_test_db +from src.auth.crud import create_user_session, remove_user_session +from src.database import SQLALCHEMY_TEST_DATABASE_URL, DatabaseSessionManager +from src.main import app + + +# This might be able to be moved to `package` scope as long as I inject it to every test function +@pytest.fixture(scope="session") +def suppress_sqlalchemy_logs(): + logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING) + yield + logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def database_setup(): + # reset the database again, just in case + print("Resetting DB...") + sessionmanager = DatabaseSessionManager(SQLALCHEMY_TEST_DATABASE_URL, {"echo": False}, check_db=False) + # this resets the contents of the database to be whatever is from `load_test_db.py` + await load_test_db.async_main(sessionmanager) + print("Done setting up!") + yield sessionmanager + await sessionmanager.close() + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def client() -> AsyncGenerator[Any, None]: + # base_url is just a random placeholder url + # ASGITransport is just telling the async client to pass all requests to app + # `async with` syntax used so that the connecton will automatically be closed once done + async with AsyncClient(transport=ASGITransport(app), base_url="http://test") as client: + yield client + +@pytest_asyncio.fixture(scope="function", loop_scope="session") +async def db_session(database_setup): + async with database_setup.session() as session: + yield session + +@pytest_asyncio.fixture(scope="module", loop_scope="session") +async def admin_client(database_setup, client): + session_id = "temp_id_" + load_test_db.SYSADMIN_COMPUTING_ID + client.cookies = { "session_id": session_id } + async with database_setup.session() as session: + await create_user_session(session, session_id, load_test_db.SYSADMIN_COMPUTING_ID) + yield client + await remove_user_session(session, session_id) + + diff --git a/tests/integration/test_elections.py b/tests/integration/test_elections.py index b79220f..bf402a9 100644 --- a/tests/integration/test_elections.py +++ b/tests/integration/test_elections.py @@ -61,7 +61,6 @@ async def test_read_elections(database_setup): election_false = await get_election(db_session, "this-not-a-election") assert election_false is None - # Test getting specific election election = await get_election(db_session, "test-election-1") assert election is not None @@ -198,9 +197,9 @@ async def test_endpoints_admin(client, database_setup): response = await client.post("/election", json={ "name": "testElection4", "type": "general_election", - "datetime_start_nominations": (datetime.datetime.now(tz=datetime.UTC) - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=14)).isoformat(), + "datetime_start_nominations": (datetime.datetime.now() - timedelta(days=1)).isoformat(), + "datetime_start_voting": (datetime.datetime.now() + timedelta(days=7)).isoformat(), + "datetime_end_voting": (datetime.datetime.now() + timedelta(days=14)).isoformat(), "available_positions": ["president", "treasurer"], "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) @@ -209,9 +208,9 @@ async def test_endpoints_admin(client, database_setup): response = await client.post("/election", json={ "name": "byElection4", "type": "by_election", - "datetime_start_nominations": (datetime.datetime.now(tz=datetime.UTC) - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=14)).isoformat(), + "datetime_start_nominations": (datetime.datetime.now() - timedelta(days=1)).isoformat(), + "datetime_start_voting": (datetime.datetime.now() + timedelta(days=7)).isoformat(), + "datetime_end_voting": (datetime.datetime.now() + timedelta(days=14)).isoformat(), "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) assert response.status_code == 200 @@ -276,9 +275,9 @@ async def test_endpoints_admin(client, database_setup): # update the above election response = await client.patch("/election/testElection4", json={ "election_type": "general_election", - "datetime_start_nominations": (datetime.datetime.now(tz=datetime.UTC) - timedelta(days=1)).isoformat(), - "datetime_start_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=7)).isoformat(), - "datetime_end_voting": (datetime.datetime.now(tz=datetime.UTC) + timedelta(days=14)).isoformat(), + "datetime_start_nominations": (datetime.datetime.now() - timedelta(days=1)).isoformat(), + "datetime_start_voting": (datetime.datetime.now() + timedelta(days=7)).isoformat(), + "datetime_end_voting": (datetime.datetime.now() + timedelta(days=14)).isoformat(), "available_positions": ["president", "vice-president", "treasurer"], # update this "survey_link": "https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5" }) diff --git a/tests/integration/test_officers.py b/tests/integration/test_officers.py index dd8ba0c..d2c43b7 100644 --- a/tests/integration/test_officers.py +++ b/tests/integration/test_officers.py @@ -1,118 +1,116 @@ -import asyncio # NOTE: don't comment this out; it's required import json from datetime import date, timedelta import pytest -from httpx import ASGITransport, AsyncClient +from httpx import AsyncClient -import load_test_db -from auth.crud import create_user_session -from database import SQLALCHEMY_TEST_DATABASE_URL, DatabaseSessionManager -from main import app -from officers.constants import OfficerPosition -from officers.crud import all_officers, current_officers, get_active_officer_terms +from src import load_test_db +from src.officers.constants import OfficerPositionEnum +from src.officers.crud import all_officers, current_officers, get_active_officer_terms # TODO: setup a database on the CI machine & run this as a unit test then (since # this isn't really an integration test) -@pytest.fixture(scope="session") -def anyio_backend(): - return "asyncio" - -@pytest.fixture(scope="session") -async def client(): - # base_url is just a random placeholder url - # ASGITransport is just telling the async client to pass all requests to app - async with AsyncClient(transport=ASGITransport(app), base_url="http://test") as client: - yield client - -# run this again for every function -@pytest.fixture(scope="function") -async def database_setup(): - # reset the database again, just in case - print("Resetting DB...") - sessionmanager = DatabaseSessionManager(SQLALCHEMY_TEST_DATABASE_URL, {"echo": False}, check_db=False) - await DatabaseSessionManager.test_connection(SQLALCHEMY_TEST_DATABASE_URL) - # this resets the contents of the database to be whatever is from `load_test_db.py` - await load_test_db.async_main(sessionmanager) - print("Done setting up!") - - return sessionmanager - -# TODO: switch to mark.anyio -@pytest.mark.asyncio -async def test__read_execs(database_setup): - sessionmanager = await database_setup - async with sessionmanager.session() as db_session: - # test that reads from the database succeeded as expected - assert (await get_active_officer_terms(db_session, "blarg")) == [] - assert (await get_active_officer_terms(db_session, "abc22")) != [] - - abc11_officer_terms = await get_active_officer_terms(db_session, "abc11") - assert len(abc11_officer_terms) == 1 - assert abc11_officer_terms[0].computing_id == "abc11" - assert abc11_officer_terms[0].position == OfficerPosition.EXECUTIVE_AT_LARGE - assert abc11_officer_terms[0].start_date is not None - assert abc11_officer_terms[0].nickname == "the holy A" - assert abc11_officer_terms[0].favourite_course_0 == "CMPT 361" - assert abc11_officer_terms[0].biography == "Hi! I'm person A and I want school to be over ; _ ;" - - current_exec_team = await current_officers(db_session, include_private=False) - assert current_exec_team is not None - assert len(current_exec_team.keys()) == 4 - assert next(iter(current_exec_team.keys())) == OfficerPosition.EXECUTIVE_AT_LARGE - assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 361" - assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.to_email(OfficerPosition.EXECUTIVE_AT_LARGE) - assert next(iter(current_exec_team.values()))[0].private_data is None - - current_exec_team = await current_officers(db_session, include_private=True) - assert current_exec_team is not None - assert len(current_exec_team) == 4 - assert next(iter(current_exec_team.keys())) == OfficerPosition.EXECUTIVE_AT_LARGE - assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 361" - assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.to_email(OfficerPosition.EXECUTIVE_AT_LARGE) - assert next(iter(current_exec_team.values()))[0].private_data is not None - assert next(iter(current_exec_team.values()))[0].private_data.computing_id == "abc11" - - all_terms = await all_officers(db_session, include_private_data=True, include_future_terms=False) - assert len(all_terms) == 6 - - all_terms = await all_officers(db_session, include_private_data=True, include_future_terms=True) - assert len(all_terms) == 7 +pytestmark = pytest.mark.asyncio(loop_scope="session") + +async def test__read_execs(db_session): + # test that reads from the database succeeded as expected + assert (await get_active_officer_terms(db_session, "blarg")) == [] + assert (await get_active_officer_terms(db_session, "abc22")) != [] + + abc11_officer_terms = await get_active_officer_terms(db_session, "abc11") + assert len(abc11_officer_terms) == 1 + assert abc11_officer_terms[0].computing_id == "abc11" + assert abc11_officer_terms[0].position == OfficerPositionEnum.EXECUTIVE_AT_LARGE + assert abc11_officer_terms[0].start_date is not None + assert abc11_officer_terms[0].nickname == "the holy A" + assert abc11_officer_terms[0].favourite_course_0 == "CMPT 361" + assert abc11_officer_terms[0].biography == "Hi! I'm person A and I want school to be over ; _ ;" + + current_exec_team = await current_officers(db_session) + assert current_exec_team is not None + assert len(current_exec_team) == 3 + # assert next(iter(current_exec_team)) == OfficerPositionEnum.EXECUTIVE_AT_LARGE + # assert next(iter(current_exec_team))["favourite_course_0"] == "CMPT 361" + # assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.to_email(OfficerPositionEnum.EXECUTIVE_AT_LARGE) + # assert next(iter(current_exec_team.values()))[0].private_data is None + + current_exec_team = await current_officers(db_session) + assert current_exec_team is not None + assert len(current_exec_team) == 3 + # assert next(iter(current_exec_team.keys())) == OfficerPositionEnum.EXECUTIVE_AT_LARGE + # assert next(iter(current_exec_team.values()))[0].favourite_course_0 == "CMPT 361" + # assert next(iter(current_exec_team.values()))[0].csss_email == OfficerPosition.to_email(OfficerPositionEnum.EXECUTIVE_AT_LARGE) + # assert next(iter(current_exec_team.values()))[0].private_data is not None + # assert next(iter(current_exec_team.values()))[0].private_data.computing_id == "abc11" + + all_terms = await all_officers(db_session, include_future_terms=False) + assert len(all_terms) == 8 + #async def test__update_execs(database_setup): # # TODO: the second time an update_officer_info call occurs, the user should be updated with info # pass -@pytest.mark.anyio -async def test__endpoints(client, database_setup): - # `database_setup` resets & loads the test database - +async def test__get_officers(client): + # private data shouldn't be leaked response = await client.get("/officers/current") assert response.status_code == 200 assert response.json() != {} - assert len(response.json().values()) == 4 - assert not response.json()["executive at large"][0]["private_data"] + assert len(response.json().values()) == 3 + assert "computing_id" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "discord_id" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "discord_name" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "discord_nickname" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "phone_number" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "github_username" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "google_drive_email" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + assert "photo_url" not in response.json()[OfficerPositionEnum.EXECUTIVE_AT_LARGE] + + assert "computing_id" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "discord_id" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "discord_name" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "discord_nickname" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "phone_number" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "github_username" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "google_drive_email" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + assert "photo_url" not in response.json()[OfficerPositionEnum.DIRECTOR_OF_ARCHIVES] + + assert "computing_id" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "discord_id" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "discord_name" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "discord_nickname" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "phone_number" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "github_username" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "google_drive_email" not in response.json()[OfficerPositionEnum.PRESIDENT] + assert "photo_url" not in response.json()[OfficerPositionEnum.PRESIDENT] response = await client.get("/officers/all?include_future_terms=false") assert response.status_code == 200 assert response.json() != [] - assert len(response.json()) == 6 - assert response.json()[0]["private_data"] is None + assert len(response.json()) == 8 + assert "computing_id" not in response.json()[0] + assert "discord_id" not in response.json()[0] + assert "discord_name" not in response.json()[0] + assert "discord_nickname" not in response.json()[0] + assert "phone_number" not in response.json()[0] + assert "github_username" not in response.json()[0] + assert "google_drive_email" not in response.json()[0] + assert "photo_url" not in response.json()[0] response = await client.get("/officers/all?include_future_terms=true") assert response.status_code == 401 +async def test__get_officer_terms(client: AsyncClient): response = await client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=false") assert response.status_code == 200 - assert response.json() != [] assert len(response.json()) == 2 assert response.json()[0]["nickname"] == "G2" assert response.json()[1]["nickname"] == "G1" response = await client.get("/officers/terms/balargho?include_future_terms=false") assert response.status_code == 200 - assert response.json() == [] + assert len(response.json()) == 0 response = await client.get("/officers/terms/abc11?include_future_terms=true") assert response.status_code == 401 @@ -122,32 +120,39 @@ async def test__endpoints(client, database_setup): response = await client.get(f"/officers/info/{load_test_db.SYSADMIN_COMPUTING_ID}") assert response.status_code == 401 - response = await client.post("officers/term", content=json.dumps([{ +async def test__post_officer_terms(client: AsyncClient): + # Only admins can create new terms + response = await client.post("officers/term", json=[{ "computing_id": "ehbc12", - "position": OfficerPosition.DIRECTOR_OF_MULTIMEDIA, - "start_date": "2025-12-29" - }])) + "position": OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA, + "start_date": "2025-12-29", + "legal_name": "Eh Bc" + }]) assert response.status_code == 401 - response = await client.post("officers/term", content=json.dumps([{ + # Position must be one of the enum positions + response = await client.post("officers/term", json=[{ "computing_id": "ehbc12", "position": "balargho", - "start_date": "2025-12-29" - }])) - assert response.status_code == 400 - - response = await client.patch("officers/info/abc11", content=json.dumps({ + "start_date": "2025-12-29", + "legal_name": "Eh Bc" + }]) + assert response.status_code == 422 + +async def test__patch_officer_term(client: AsyncClient): + # Only admins can update new terms + response = await client.patch("officers/info/abc11", json={ "legal_name": "fancy name", "phone_number": None, "discord_name": None, "github_username": None, "google_drive_email": None, - })) - assert response.status_code == 401 + }) + assert response.status_code == 403 response = await client.patch("officers/term/1", content=json.dumps({ "computing_id": "abc11", - "position": OfficerPosition.VICE_PRESIDENT, + "position": OfficerPositionEnum.VICE_PRESIDENT, "start_date": (date.today() - timedelta(days=365)).isoformat(), "end_date": (date.today() - timedelta(days=1)).isoformat(), @@ -159,71 +164,67 @@ async def test__endpoints(client, database_setup): "favourite_pl_1": "5", "biography": "hello" })) - assert response.status_code == 401 + assert response.status_code == 403 response = await client.delete("officers/term/1") assert response.status_code == 401 -@pytest.mark.anyio -async def test__endpoints_admin(client, database_setup): - # login as website admin - session_id = "temp_id_" + load_test_db.SYSADMIN_COMPUTING_ID - async with database_setup.session() as db_session: - await create_user_session(db_session, session_id, load_test_db.SYSADMIN_COMPUTING_ID) - - client.cookies = { "session_id": session_id } - +async def test__get_current_officers_admin(admin_client): # test that more info is given if logged in & with access to it - response = await client.get("/officers/current") + response = await admin_client.get("/officers/current") assert response.status_code == 200 - assert response.json() != {} - assert len(response.json().values()) == 4 - assert response.json()["executive at large"][0]["private_data"] + curr_officers = response.json() + assert len(curr_officers) == 3 + assert curr_officers["executive at large"]["computing_id"] is not None - response = await client.get("/officers/all?include_future_terms=true") +async def test__get_all_officers_admin(admin_client): + response = await admin_client.get("/officers/all?include_future_terms=true") assert response.status_code == 200 - assert response.json() != [] - print(len(response.json())) - assert len(response.json()) == 7 - assert response.json()[1]["private_data"]["phone_number"] == "1234567890" + assert len(response.json()) == 9 + assert response.json()[1]["phone_number"] == "1234567890" - response = await client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=false") +async def test__get_officer_term_admin(admin_client): + response = await admin_client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=false") assert response.status_code == 200 assert response.json() != [] assert len(response.json()) == 2 - response = await client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=true") + response = await admin_client.get(f"/officers/terms/{load_test_db.SYSADMIN_COMPUTING_ID}?include_future_terms=true") assert response.status_code == 200 assert response.json() != [] assert len(response.json()) == 3 - response = await client.get("/officers/info/abc11") + response = await admin_client.get("/officers/terms/ehbc12?include_future_terms=true") + assert response.status_code == 200 + assert response.json() == [] + +async def test__get_officer_info_admin(admin_client): + response = await admin_client.get("/officers/info/abc11") assert response.status_code == 200 assert response.json() != {} assert response.json()["legal_name"] == "Person A" - response = await client.get(f"/officers/info/{load_test_db.SYSADMIN_COMPUTING_ID}") + response = await admin_client.get(f"/officers/info/{load_test_db.SYSADMIN_COMPUTING_ID}") assert response.status_code == 200 assert response.json() != {} - response = await client.get("/officers/info/balargho") + response = await admin_client.get("/officers/info/balargho") assert response.status_code == 404 - response = await client.get("/officers/terms/ehbc12?include_future_terms=true") - assert response.status_code == 200 - assert response.json() == [] - - response = await client.post("officers/term", content=json.dumps([{ +async def test__post_officer_term_admin(admin_client): + response = await admin_client.post("officers/term", json=[{ "computing_id": "ehbc12", - "position": OfficerPosition.DIRECTOR_OF_MULTIMEDIA, - "start_date": "2025-12-29" - }])) + "position": OfficerPositionEnum.DIRECTOR_OF_MULTIMEDIA, + "start_date": "2025-12-29", + "legal_name": "Eh Bc" + }]) assert response.status_code == 200 - response = await client.get("/officers/terms/ehbc12?include_future_terms=true") + response = await admin_client.get("/officers/terms/ehbc12?include_future_terms=true") assert response.status_code == 200 assert response.json() != [] assert len(response.json()) == 1 - response = await client.patch("officers/info/abc11", content=json.dumps({ +async def test__patch_officer_info_admin(admin_client): + response = await admin_client.patch("officers/info/abc11", content=json.dumps({ "legal_name": "Person A2", "phone_number": "12345asdab67890", "discord_name": "person_a_yeah", @@ -231,10 +232,14 @@ async def test__endpoints_admin(client, database_setup): "google_drive_email": "person_a@gmail.com", })) assert response.status_code == 200 - assert response.json()["officer_info"] != {} - assert len(response.json()["validation_failures"]) == 3 - - response = await client.patch("officers/info/aaabbbc", content=json.dumps({ + resJson = response.json() + assert resJson["legal_name"] == "Person A2" + assert resJson["phone_number"] == "12345asdab67890" + assert resJson["discord_name"] == "person_a_yeah" + assert resJson["github_username"] == "person_a" + assert resJson["google_drive_email"] == "person_a@gmail.com" + + response = await admin_client.patch("officers/info/aaabbbc", content=json.dumps({ "legal_name": "Person AABBCC", "phone_number": "1234567890", "discord_name": None, @@ -243,9 +248,10 @@ async def test__endpoints_admin(client, database_setup): })) assert response.status_code == 404 - response = await client.patch("officers/term/1", content=json.dumps({ - "computing_id": "abc11", - "position": OfficerPosition.TREASURER, +async def test__patch_officer_term_admin(admin_client): + target_id = 1 + response = await admin_client.patch(f"officers/term/{target_id}", json={ + "position": OfficerPositionEnum.TREASURER, "start_date": (date.today() - timedelta(days=365)).isoformat(), "end_date": (date.today() - timedelta(days=1)).isoformat(), "nickname": "1", @@ -254,29 +260,50 @@ async def test__endpoints_admin(client, database_setup): "favourite_pl_0": "4", "favourite_pl_1": "5", "biography": "hello o77" - })) + }) assert response.status_code == 200 - response = await client.get("/officers/terms/abc11?include_future_terms=true") + response = await admin_client.get("/officers/terms/abc11?include_future_terms=true") assert response.status_code == 200 - assert response.json() != [] - assert response.json()[1]["position"] == OfficerPosition.TREASURER - assert response.json()[0]["favourite_course_0"] != "2" - assert response.json()[1]["biography"] == "hello o77" - - async with database_setup.session() as db_session: - all_terms = await all_officers(db_session, include_private_data=True, include_future_terms=True) - assert len(all_terms) == 8 - - response = await client.delete("officers/term/1") + modifiedTerm = next((item for item in response.json() if item["id"] == target_id), None) + print(modifiedTerm) + assert modifiedTerm is not None + assert modifiedTerm["position"] == OfficerPositionEnum.TREASURER + assert modifiedTerm["start_date"] == (date.today() - timedelta(days=365)).isoformat() + assert modifiedTerm["end_date"] == (date.today() - timedelta(days=1)).isoformat() + assert modifiedTerm["nickname"] == "1" + assert modifiedTerm["favourite_course_0"] == "2" + assert modifiedTerm["favourite_course_1"] == "3" + assert modifiedTerm["favourite_pl_0"] == "4" + assert modifiedTerm["favourite_pl_1"] == "5" + assert modifiedTerm["biography"] == "hello o77" + + # other one shouldn't be modified assert response.status_code == 200 - response = await client.delete("officers/term/2") + modifiedTerm = next((item for item in response.json() if item["id"] == target_id + 1), None) + print(modifiedTerm) + assert modifiedTerm is not None + assert modifiedTerm["position"] == OfficerPositionEnum.EXECUTIVE_AT_LARGE + assert modifiedTerm["start_date"] != (date.today() - timedelta(days=365)).isoformat() + assert modifiedTerm["end_date"] != (date.today() - timedelta(days=1)).isoformat() + assert modifiedTerm["nickname"] != "1" + assert modifiedTerm["favourite_course_0"] != "2" + assert modifiedTerm["favourite_course_1"] != "3" + assert modifiedTerm["favourite_pl_0"] != "4" + assert modifiedTerm["favourite_pl_1"] != "5" + assert modifiedTerm["biography"] != "hello o77" + + response = await admin_client.get("officers/all?include_future_terms=True") + assert len(response.json()) == 10 + + response = await admin_client.delete("officers/term/1") assert response.status_code == 200 - response = await client.delete("officers/term/3") + response = await admin_client.delete("officers/term/2") assert response.status_code == 200 - response = await client.delete("officers/term/4") + response = await admin_client.delete("officers/term/3") + assert response.status_code == 200 + response = await admin_client.delete("officers/term/4") assert response.status_code == 200 - async with database_setup.session() as db_session: - all_terms = await all_officers(db_session, include_private_data=True, include_future_terms=True) - assert len(all_terms) == (8 - 4) + response = await admin_client.get("officers/all?include_future_terms=True") + assert len(response.json()) == 6 diff --git a/tests/integration/test_discord.py b/tests/wip/test_discord.py similarity index 100% rename from tests/integration/test_discord.py rename to tests/wip/test_discord.py diff --git a/tests/integration/test_github.py b/tests/wip/test_github.py similarity index 100% rename from tests/integration/test_github.py rename to tests/wip/test_github.py diff --git a/tests/integration/test_google.py b/tests/wip/test_google.py similarity index 100% rename from tests/integration/test_google.py rename to tests/wip/test_google.py