From f462b97185a19001505bfba6969cf533c47cb24a Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 18 Aug 2022 18:04:35 +0300 Subject: [PATCH 1/4] Temp state. --- spire/journal/actions.py | 31 ++++++++++++----- spire/journal/api.py | 72 ++++++++++++++++------------------------ spire/journal/search.py | 41 +++++++++++++++++++++-- spire/public/api.py | 26 ++++++++------- 4 files changed, 104 insertions(+), 66 deletions(-) diff --git a/spire/journal/actions.py b/spire/journal/actions.py index 1ef119f..641b9a8 100644 --- a/spire/journal/actions.py +++ b/spire/journal/actions.py @@ -598,6 +598,21 @@ async def get_journal_entries( return query.all() +async def _get_journal_entry( + db_session: Session, journal_entry_id: UUID +) -> JournalEntry: + """ + Returns a journal entry by its id. Raises a JournalEntryNotFound error if no such entry is + found in the database. + """ + journal_entry = ( + db_session.query(JournalEntry) + .filter(JournalEntry.id == journal_entry_id) + .one_or_none() + ) + return journal_entry + + async def delete_journal_entry( db_session: Session, journal_spec: JournalSpec, @@ -981,14 +996,14 @@ def store_search_results( result_bytes = json.dumps(result).encode("utf-8") result_key = f"{prefix}/{result_id}.json" - s3 = boto3.client("s3") - s3.put_object( - Body=result_bytes, - Bucket=bucket, - Key=result_key, - ContentType="application/json", - Metadata={"search_type": "journal"}, - ) + # s3 = boto3.client("s3") + # s3.put_object( + # Body=result_bytes, + # Bucket=bucket, + # Key=result_key, + # ContentType="application/json", + # Metadata={"search_type": "journal"}, + # ) async def get_scopes(db_session: Session, api: str) -> List[SpireOAuthScopes]: diff --git a/spire/journal/api.py b/spire/journal/api.py index ef10e6d..09b3636 100644 --- a/spire/journal/api.py +++ b/spire/journal/api.py @@ -1074,19 +1074,19 @@ async def get_entries( journal_url = "/".join(url.split("/")[:-1]) individual_responses = [] for journal_entry in entries: - tag_objects = await 
actions.get_journal_entry_tags( - db_session, - journal_spec, - journal_entry.id, - user_group_id_list=request.state.user_group_id_list, - ) - tags = [tag.tag for tag in tag_objects] + # tag_objects = await actions.get_journal_entry_tags( + # db_session, + # journal_spec, + # journal_entry.id, + # user_group_id_list=request.state.user_group_id_list, + # ) + # tags = [tag.tag for tag in tag_objects] entry_response = JournalEntryResponse( id=journal_entry.id, journal_url=journal_url, title=journal_entry.title, content=journal_entry.content, - tags=tags, + tags=journal_entry.tags, created_at=journal_entry.created_at, updated_at=journal_entry.updated_at, context_url=journal_entry.context_url, @@ -1280,21 +1280,12 @@ async def update_entry_content( es_index = journal.search_index try: - journal_entry_container = await actions.get_journal_entries( - db_session, - journal_spec, - entry_id, - request.state.user_group_id_list, + journal_entry = await actions._get_journal_entry( + db_session=db_session, journal_entry_id=entry_id ) - if len(journal_entry_container) == 0: + if journal_entry is None: raise actions.EntryNotFound() - assert len(journal_entry_container) == 1 - journal_entry = journal_entry_container[0] - except actions.JournalNotFound: - logger.error( - f"Journal not found with ID={journal_id} for user={request.state.user_id}" - ) - raise HTTPException(status_code=404) + except actions.EntryNotFound: logger.error( f"Entry not found with ID={entry_id} in journal with ID={journal_id}" @@ -1304,6 +1295,8 @@ async def update_entry_content( logger.error(f"Error listing journal entries: {str(e)}") raise HTTPException(status_code=500) + + journal_entry.title = api_request.title journal_entry.content = api_request.content db_session.add(journal_entry) @@ -1810,14 +1803,11 @@ async def create_tags( if es_index is not None: try: - entry_container = await actions.get_journal_entries( - db_session, - journal_spec, - entry_id, - 
user_group_id_list=request.state.user_group_id_list, + journal_entry = await actions._get_journal_entry( + db_session=db_session, journal_entry_id=entry_id ) - assert len(entry_container) == 1 - entry = entry_container[0] + assert journal_entry != None + entry = journal_entry all_tags = await actions.get_journal_entry_tags( db_session, journal_spec, @@ -1963,14 +1953,11 @@ async def update_tags( if es_index is not None: try: - entry_container = await actions.get_journal_entries( - db_session, - journal_spec, - entry_id, - request.state.user_group_id_list, + journal_entry = await actions._get_journal_entry( + db_session=db_session, journal_entry_id=entry_id ) - assert len(entry_container) == 1 - entry = entry_container[0] + assert journal_entry != None + entry = journal_entry all_tags_str = [tag.tag for tag in tags] search.new_entry( es_client, @@ -2062,14 +2049,11 @@ async def delete_tag( if es_index is not None: try: - entry_container = await actions.get_journal_entries( - db_session, - journal_spec, - entry_id, - user_group_id_list=request.state.user_group_id_list, + journal_entry = await actions._get_journal_entry( + db_session=db_session, journal_entry_id=entry_id ) - assert len(entry_container) == 1 - entry = entry_container[0] + assert journal_entry != None + entry = journal_entry all_tags = await actions.get_journal_entry_tags( db_session, journal_spec, @@ -2165,7 +2149,7 @@ async def search_journal( max_score: Optional[float] = 1.0 for entry in rows: - tags: List[str] = [tag.tag for tag in entry.tags] + # tags: List[str] = [tag.tag for tag in entry.tags] entry_url = f"{journal_url}/entries/{str(entry.id)}" content_url = f"{entry_url}/content" result = JournalSearchResult( @@ -2173,7 +2157,7 @@ async def search_journal( content_url=content_url, title=entry.title, content=entry.content, - tags=tags, + tags=entry.tags, created_at=str(entry.created_at), updated_at=str(entry.updated_at), score=1.0, diff --git a/spire/journal/search.py b/spire/journal/search.py 
index 1e1460e..5420584 100644 --- a/spire/journal/search.py +++ b/spire/journal/search.py @@ -14,9 +14,10 @@ import elasticsearch from elasticsearch.client import IndicesClient from elasticsearch.helpers import bulk -from sqlalchemy import and_, or_, not_ +from sqlalchemy import and_, or_, not_, func from sqlalchemy.sql.elements import BooleanClauseList -from sqlalchemy.orm import Session, Query +from sqlalchemy.orm import Session, Query + from . import actions from .data import JournalSpec, JournalEntryResponse @@ -26,6 +27,8 @@ from ..utils.settings import DEFAULT_JOURNALS_ES_INDEX, BULK_CHUNKSIZE logger = logging.getLogger(__name__) +# add sqlalchemy logger +logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) class IndexAlreadyExists(Exception): @@ -734,6 +737,40 @@ def search_database( query = query.order_by(JournalEntry.created_at.desc()) num_entries = query.count() query = query.limit(size).offset(start) + + journal_entries_temp = query.cte(name="journal_entries_temp") + + entries_ids_with_tags = ( + db_session.query(journal_entries_temp.c.id, JournalEntryTag.tag).join( + JournalEntryTag, + JournalEntryTag.journal_entry_id == journal_entries_temp.c.id, + ) + ).cte(name="entries_ids_with_tags") + + aggregated_tags = ( + db_session.query( + entries_ids_with_tags.c.id, + func.array_agg(entries_ids_with_tags.c.tag).label("tags"), + ) + .group_by(entries_ids_with_tags.c.id) + .cte(name="aggregated_tags") + ) + + query = db_session.query( + journal_entries_temp.c.id.label("id"), + aggregated_tags.c.tags.label("tags"), + journal_entries_temp.c.title.label("title"), + journal_entries_temp.c.content.label("content"), + journal_entries_temp.c.context_id.label("context_id"), + journal_entries_temp.c.context_url.label("context_url"), + journal_entries_temp.c.context_type.label("context_type"), + journal_entries_temp.c.version_id.label("version_id"), + journal_entries_temp.c.created_at.label("created_at"), + journal_entries_temp.c.updated_at.label("updated_at"), 
+ ).join(aggregated_tags, journal_entries_temp.c.id == aggregated_tags.c.id) + + + rows = query.all() return num_entries, rows diff --git a/spire/public/api.py b/spire/public/api.py index eacb49b..595b8a3 100644 --- a/spire/public/api.py +++ b/spire/public/api.py @@ -4,7 +4,7 @@ FastAPI doesn't like Generic Routes https://github.com/tiangolo/fastapi/issues/913#issuecomment """ import logging -from typing import List, Optional +from typing import List, Optional, Any from uuid import UUID from bugout.data import ( @@ -195,7 +195,7 @@ async def get_public_journal_entry_handler( @app_public.get( - "/{journal_id}/search", tags=["public journals"], response_model=BugoutSearchResults + "/{journal_id}/search", tags=["public journals"], response_model=Any ) async def search_public_journal_handler( journal_id: UUID = Path(...), @@ -205,7 +205,7 @@ async def search_public_journal_handler( offset: int = Query(0), content: bool = Query(True), db_session: Session = Depends(db.yield_connection_from_env), -) -> BugoutSearchResults: +) -> Any: """ Executes a search query against the given public journal. """ @@ -215,14 +215,16 @@ async def search_public_journal_handler( except actions.PublicJournalNotFound: raise HTTPException(status_code=404, detail="Public journal not found") - result = bugout_api.search( - token=public_user.restricted_token_id, - journal_id=public_journal.journal_id, - query=q, - filters=filters, - limit=limit, - offset=offset, - content=content, - ) + # result = bugout_api.search( + # token=public_user.restricted_token_id, + # journal_id=public_journal.journal_id, + # query=q, + # filters=filters, + # limit=limit, + # offset=offset, + # content=content, + # ) + + result = [] return result From 53c81c9ae34911733b399d2602b7accd9a2bf2fd Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 18 Aug 2022 19:41:40 +0300 Subject: [PATCH 2/4] Remove comments. 
--- spire/journal/actions.py | 16 ++++++++-------- spire/journal/api.py | 17 ++++++++--------- spire/journal/search.py | 2 -- spire/public/api.py | 26 ++++++++++++-------------- 4 files changed, 28 insertions(+), 33 deletions(-) diff --git a/spire/journal/actions.py b/spire/journal/actions.py index 18c7749..9682977 100644 --- a/spire/journal/actions.py +++ b/spire/journal/actions.py @@ -1010,14 +1010,14 @@ def store_search_results( result_bytes = json.dumps(result).encode("utf-8") result_key = f"{prefix}/{result_id}.json" - # s3 = boto3.client("s3") - # s3.put_object( - # Body=result_bytes, - # Bucket=bucket, - # Key=result_key, - # ContentType="application/json", - # Metadata={"search_type": "journal"}, - # ) + s3 = boto3.client("s3") + s3.put_object( + Body=result_bytes, + Bucket=bucket, + Key=result_key, + ContentType="application/json", + Metadata={"search_type": "journal"}, + ) async def get_scopes(db_session: Session, api: str) -> List[SpireOAuthScopes]: diff --git a/spire/journal/api.py b/spire/journal/api.py index c2e1fff..156b102 100644 --- a/spire/journal/api.py +++ b/spire/journal/api.py @@ -1074,19 +1074,19 @@ async def get_entries( journal_url = "/".join(url.split("/")[:-1]) individual_responses = [] for journal_entry in entries: - # tag_objects = await actions.get_journal_entry_tags( - # db_session, - # journal_spec, - # journal_entry.id, - # user_group_id_list=request.state.user_group_id_list, - # ) - # tags = [tag.tag for tag in tag_objects] + tag_objects = await actions.get_journal_entry_tags( + db_session, + journal_spec, + journal_entry.id, + user_group_id_list=request.state.user_group_id_list, + ) + tags = [tag.tag for tag in tag_objects] entry_response = JournalEntryResponse( id=journal_entry.id, journal_url=journal_url, title=journal_entry.title, content=journal_entry.content, - tags=journal_entry.tags, + tags=tags, created_at=journal_entry.created_at, updated_at=journal_entry.updated_at, context_url=journal_entry.context_url, @@ -2149,7 
+2149,6 @@ async def search_journal( max_score: Optional[float] = 1.0 for entry in rows: - # tags: List[str] = [tag.tag for tag in entry.tags] entry_url = f"{journal_url}/entries/{str(entry.id)}" content_url = f"{entry_url}/content" result = JournalSearchResult( diff --git a/spire/journal/search.py b/spire/journal/search.py index 5420584..e008259 100644 --- a/spire/journal/search.py +++ b/spire/journal/search.py @@ -27,8 +27,6 @@ from ..utils.settings import DEFAULT_JOURNALS_ES_INDEX, BULK_CHUNKSIZE logger = logging.getLogger(__name__) -# add sqlalchemy logger -logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) class IndexAlreadyExists(Exception): diff --git a/spire/public/api.py b/spire/public/api.py index 595b8a3..eacb49b 100644 --- a/spire/public/api.py +++ b/spire/public/api.py @@ -4,7 +4,7 @@ FastAPI doesn't like Generic Routes https://github.com/tiangolo/fastapi/issues/913#issuecomment """ import logging -from typing import List, Optional, Any +from typing import List, Optional from uuid import UUID from bugout.data import ( @@ -195,7 +195,7 @@ async def get_public_journal_entry_handler( @app_public.get( - "/{journal_id}/search", tags=["public journals"], response_model=Any + "/{journal_id}/search", tags=["public journals"], response_model=BugoutSearchResults ) async def search_public_journal_handler( journal_id: UUID = Path(...), @@ -205,7 +205,7 @@ async def search_public_journal_handler( offset: int = Query(0), content: bool = Query(True), db_session: Session = Depends(db.yield_connection_from_env), -) -> Any: +) -> BugoutSearchResults: """ Executes a search query against the given public journal. 
""" @@ -215,16 +215,14 @@ async def search_public_journal_handler( except actions.PublicJournalNotFound: raise HTTPException(status_code=404, detail="Public journal not found") - # result = bugout_api.search( - # token=public_user.restricted_token_id, - # journal_id=public_journal.journal_id, - # query=q, - # filters=filters, - # limit=limit, - # offset=offset, - # content=content, - # ) - - result = [] + result = bugout_api.search( + token=public_user.restricted_token_id, + journal_id=public_journal.journal_id, + query=q, + filters=filters, + limit=limit, + offset=offset, + content=content, + ) return result From 6879c49caeb23e515dd89d0bb0526ad6dddc07a1 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 18 Aug 2022 19:42:39 +0300 Subject: [PATCH 3/4] Black formatting. --- spire/journal/api.py | 2 -- spire/journal/search.py | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/spire/journal/api.py b/spire/journal/api.py index 156b102..83a4d09 100644 --- a/spire/journal/api.py +++ b/spire/journal/api.py @@ -1295,8 +1295,6 @@ async def update_entry_content( logger.error(f"Error listing journal entries: {str(e)}") raise HTTPException(status_code=500) - - journal_entry.title = api_request.title journal_entry.content = api_request.content db_session.add(journal_entry) diff --git a/spire/journal/search.py b/spire/journal/search.py index e008259..6c3a8c6 100644 --- a/spire/journal/search.py +++ b/spire/journal/search.py @@ -16,7 +16,7 @@ from elasticsearch.helpers import bulk from sqlalchemy import and_, or_, not_, func from sqlalchemy.sql.elements import BooleanClauseList -from sqlalchemy.orm import Session, Query +from sqlalchemy.orm import Session, Query from . 
import actions @@ -767,8 +767,6 @@ def search_database( journal_entries_temp.c.updated_at.label("updated_at"), ).join(aggregated_tags, journal_entries_temp.c.id == aggregated_tags.c.id) - - rows = query.all() return num_entries, rows From 2b8e0f8ddeea7dd0af58b2c4cef7c6cf8ce710ee Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 18 Aug 2022 20:11:04 +0300 Subject: [PATCH 4/4] Add fix --- spire/journal/actions.py | 4 ++-- spire/journal/api.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spire/journal/actions.py b/spire/journal/actions.py index 9682977..a79768f 100644 --- a/spire/journal/actions.py +++ b/spire/journal/actions.py @@ -612,9 +612,9 @@ async def get_journal_entries( return query.all() -async def _get_journal_entry( +async def get_journal_entry( db_session: Session, journal_entry_id: UUID -) -> JournalEntry: +) -> Optional[JournalEntry]: """ Returns a journal entry by its id. Raises a JournalEntryNotFound error if no such entry is found in the database. 
diff --git a/spire/journal/api.py b/spire/journal/api.py index 83a4d09..84370ef 100644 --- a/spire/journal/api.py +++ b/spire/journal/api.py @@ -1280,7 +1280,7 @@ async def update_entry_content( es_index = journal.search_index try: - journal_entry = await actions._get_journal_entry( + journal_entry = await actions.get_journal_entry( db_session=db_session, journal_entry_id=entry_id ) if journal_entry is None: @@ -1801,7 +1801,7 @@ async def create_tags( if es_index is not None: try: - journal_entry = await actions._get_journal_entry( + journal_entry = await actions.get_journal_entry( db_session=db_session, journal_entry_id=entry_id ) assert journal_entry != None @@ -1951,7 +1951,7 @@ async def update_tags( if es_index is not None: try: - journal_entry = await actions._get_journal_entry( + journal_entry = await actions.get_journal_entry( db_session=db_session, journal_entry_id=entry_id ) assert journal_entry != None @@ -2047,7 +2047,7 @@ async def delete_tag( if es_index is not None: try: - journal_entry = await actions._get_journal_entry( + journal_entry = await actions.get_journal_entry( db_session=db_session, journal_entry_id=entry_id ) assert journal_entry != None