diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e4a09e40d..8b505e30d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -36,7 +36,7 @@ repos: language_version: python3 - repo: https://github.com/PyCQA/bandit - rev: 1.7.5 + rev: 1.8.0 hooks: - id: bandit args: ["-c", "pyproject.toml"] diff --git a/docs/reference/experimental/async/table.md b/docs/reference/experimental/async/table.md index bca487e31..e9deb74be 100644 --- a/docs/reference/experimental/async/table.md +++ b/docs/reference/experimental/async/table.md @@ -53,6 +53,29 @@ at your own risk. ::: synapseclient.models.ColumnType [](){ #json-sub-column-reference-async } ::: synapseclient.models.JsonSubColumn +[](){ #sumfilesize-reference-async } +::: synapseclient.models.SumFileSizes +[](){ #query-reference-async } +::: synapseclient.models.Query +[](){ #query-bundle-request-reference-async } +::: synapseclient.models.QueryBundleRequest +[](){ #query-job-reference-async } +::: synapseclient.models.QueryJob +[](){ #query-next-page-token-reference-async } +::: synapseclient.models.QueryNextPageToken +[](){ #query-result-reference-async } +::: synapseclient.models.QueryResult +[](){ #query-result-bundle-reference-async } +::: synapseclient.models.QueryResultBundle +[](){ #query-result-output-reference-async } +::: synapseclient.models.QueryResultOutput +[](){ #row-reference-async } +::: synapseclient.models.Row +[](){ #rowset-reference-async } +::: synapseclient.models.RowSet +[](){ #select-column-reference-async } +::: synapseclient.models.SelectColumn + [](){ #column-change-reference-async } diff --git a/docs/reference/experimental/sync/table.md b/docs/reference/experimental/sync/table.md index 13d6aafe7..8f2f99974 100644 --- a/docs/reference/experimental/sync/table.md +++ b/docs/reference/experimental/sync/table.md @@ -64,6 +64,30 @@ at your own risk. ::: synapseclient.models.ColumnType [](){ #json-sub-column-reference-sync } ::: synapseclient.models.JsonSubColumn +[](){ #sumfilesize-reference-sync } +::: synapseclient.models.SumFileSizes +[](){ #query-reference-sync } +::: synapseclient.models.Query +[](){ #query-bundle-request-reference-sync } +::: synapseclient.models.QueryBundleRequest +[](){ #query-job-reference-sync } +::: synapseclient.models.QueryJob +[](){ #query-next-page-token-reference-sync } +::: synapseclient.models.QueryNextPageToken +[](){ #query-result-reference-sync } +::: synapseclient.models.QueryResult +[](){ #query-result-bundle-reference-sync } +::: synapseclient.models.QueryResultBundle +[](){ #query-result-output-reference-sync } +::: synapseclient.models.QueryResultOutput +[](){ #row-reference-sync } +::: synapseclient.models.Row +[](){ #rowset-reference-sync } +::: synapseclient.models.RowSet +[](){ #select-column-reference-sync } +::: synapseclient.models.SelectColumn + + [](){ #column-change-reference-sync } diff --git a/synapseclient/client.py b/synapseclient/client.py index 78ae3c7f1..d78f5f017 100644 --- a/synapseclient/client.py +++ b/synapseclient/client.py @@ -7386,9 +7386,17 @@ def getTableColumns(self, table): for result in self.restGET(uri)["results"]: yield Column(**result) - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Use the `query` functions from `synapseclient.models.Table` instead. 
" + "Check the docstring for the replacement function example.", + ) def tableQuery(self, query: str, resultsAs: str = "csv", **kwargs): """ + **Deprecated with replacement.** This method will be removed in 5.0.0. + Use the `query` or `query_async` functions from [synapseclient.models.Table][] instead. + Query a Synapse Table. You can receive query results either as a generator over rows or as a CSV file. For smallish tables, either @@ -7421,6 +7429,69 @@ def tableQuery(self, query: str, resultsAs: str = "csv", **kwargs): # Sets the max timeout to 5 minutes. syn.table_query_timeout = 300 + Example: Using this function (DEPRECATED) + Getting query results as a DataFrame: + + results = syn.tableQuery("SELECT * FROM syn12345") + df = results.asDataFrame() + + Getting query results as a CSV file: + + results = syn.tableQuery("SELECT * FROM syn12345", resultsAs="csv", downloadLocation="./my_data/") + + Example: Migration to new method +   + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import query, query_async + + # Login to Synapse + syn = Synapse() + syn.login() + + # Synchronous query (recommended for most use cases) + results = query(query="SELECT * FROM syn12345") + print(results) + + # Query with specific options + results = query( + query="SELECT * FROM syn12345", + include_row_id_and_row_version=True, + convert_to_datetime=True + ) + print(results) + + # Download query results to a CSV file + file_path = query( + query="SELECT * FROM syn12345", + download_location="./my_data/", + separator=",", + quote_character='"', + header=True + ) + print(f"Results downloaded to: {file_path}") + + # Asynchronous query (for advanced use cases) + async def async_query_example(): + results = await query_async(query="SELECT * FROM syn12345") + print(results) + + # Download query results to a CSV file asynchronously + file_path = await query_async( + query="SELECT * FROM syn12345", + download_location="./my_data/", + separator=",", + quote_character='"', + header=True + ) + print(f"Results downloaded to: {file_path}") + + # Run the async example + asyncio.run(async_query_example()) + ``` + """ if resultsAs.lower() == "rowset": return TableQueryResult(self, query, **kwargs) @@ -7435,7 +7506,11 @@ def tableQuery(self, query: str, resultsAs: str = "csv", **kwargs): "Unknown return type requested from tableQuery: " + str(resultsAs) ) - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Use the `query_part_mask` methods on the `Table`, `EntityView`, `SubmissionView`, `MaterializedView`, or `Dataset` classes instead.", + ) def _queryTable( self, query: str, @@ -7445,6 +7520,9 @@ def _queryTable( partMask=None, ) -> TableQueryResult: """ + **Deprecated with replacement.** This method will be removed in 5.0.0. + Use the `query_part_mask` or `query_part_mask_async` methods on the `Table`, `EntityView`, `SubmissionView`, `MaterializedView`, or `Dataset` classes instead. + Query a table and return the first page of results as a [QueryResultBundle](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/QueryResultBundle.html). If the result contains a *nextPageToken*, following pages a retrieved by calling [_queryTableNext][]. 
@@ -7462,6 +7540,46 @@ Returns: The first page of results as a QueryResultBundle + + Example: Using this function (DEPRECATED) + Query a table with part mask: + + result = syn._queryTable( + query="SELECT * FROM syn123", + partMask=0x1 + ) + + Example: Migration to new method +   + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import Table + + # Login to Synapse + syn = Synapse() + syn.login() + + table = Table(id="syn123") + result = table.query_part_mask( + query="SELECT * FROM syn123", + part_mask=0x1 # QUERY_RESULTS + ) + + # Or using the async version + async def async_query_example(): + table = Table(id="syn123") + result = await table.query_part_mask_async( + query="SELECT * FROM syn123", + part_mask=0x1 # QUERY_RESULTS + ) + print(result) + + # Run the async example + asyncio.run(async_query_example()) + ``` + """ # See: query_bundle_request = { @@ -7487,9 +7605,16 @@ return self._waitForAsync(uri=uri, request=query_bundle_request) - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Use the `query_part_mask` method on the `Table`, `EntityView`, `SubmissionView`, `MaterializedView`, or `Dataset` classes instead.", + ) def _queryTableNext(self, nextPageToken: str, tableId: str) -> TableQueryResult: """ + **Deprecated with replacement.** This method will be removed in 5.0.0. + Use the `query_part_mask` or `query_part_mask_async` methods on the `Table`, `EntityView`, `SubmissionView`, `MaterializedView`, or `Dataset` classes instead. + Retrieve following pages if the result contains a *nextPageToken* Arguments: @@ -7498,11 +7623,59 @@ Returns: The following page of results as a QueryResultBundle + + Example: Using this function (DEPRECATED) + Get the next page of results: + + result = syn._queryTableNext( + nextPageToken="some_token", + tableId="syn123" + ) + + Example: Migration to new method +   + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import Table + + # Login to Synapse + syn = Synapse() + syn.login() + + table = Table(id="syn123") + + # For pagination, use LIMIT and OFFSET directly in the SQL query instead of nextPageToken. + # LIMIT controls the number of rows returned per page, OFFSET skips the specified number of rows. + # For example: LIMIT 100 OFFSET 100 returns rows 101-200, LIMIT 100 OFFSET 200 returns rows 201-300, etc. + result = table.query_part_mask( + query="SELECT * FROM syn123 LIMIT 100 OFFSET 100", + part_mask=0x1 # QUERY_RESULTS + ) + + # Or using the async version + async def async_query_example(): + table = Table(id="syn123") + result = await table.query_part_mask_async( + query="SELECT * FROM syn123 LIMIT 100 OFFSET 100", + part_mask=0x1 # QUERY_RESULTS + ) + print(result) + + # Run the async example + asyncio.run(async_query_example()) + ``` """ uri = "/entity/{id}/table/query/nextPage/async".format(id=tableId) return self._waitForAsync(uri=uri, request=nextPageToken) - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Use the `_chunk_and_upload_csv` method on the `synapseclient.models.Table` class instead. "
+ "Check the docstring for the replacement function example.", + ) def _uploadCsv( self, filepath: str, @@ -7516,6 +7689,9 @@ linesToSkip: int = 0, ) -> dict: """ + **Deprecated with replacement.** This method will be removed in 5.0.0. + Use the `_chunk_and_upload_csv` method on [synapseclient.models.Table][] instead for efficient CSV processing. + Send an [UploadToTableRequest](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/UploadToTableRequest.html) to Synapse. Arguments: @@ -7533,6 +7709,54 @@ Returns: [UploadToTableResult](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/UploadToTableResult.html) + + Example: Using this function (DEPRECATED) + Uploading a CSV file to an existing table: + + response = syn._uploadCsv( + filepath='/path/to/table.csv', + schema='syn123' + ) + + Example: Migration to new method +   + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import Table, CsvTableDescriptor + + TABLE_ID = "syn123" # Replace with your table ID + PATH = "/path/to/table.csv" + + # Login to Synapse + syn = Synapse() + syn.login() + + # Create CSV table descriptor with your settings + csv_descriptor = CsvTableDescriptor( + is_first_line_header=True, + separator=",", # or your custom separator + quote_character='"', # or your custom quote character + escape_character="\\", # or your custom escape character + ) + + # Define an async function to upload CSV with chunk method + async def upload_csv_with_chunk_method(table_id, path): + table = await Table(id=table_id).get_async(include_columns=True) + await table._chunk_and_upload_csv( + path_to_csv=path, + insert_size_bytes=900 * 1024 * 1024, # 900MB chunks + csv_table_descriptor=csv_descriptor, + schema_change_request=None, # Add schema changes if needed + client=syn, + job_timeout=600, + additional_changes=None # Add additional changes if needed + ) + + # Run the async function + asyncio.run(upload_csv_with_chunk_method(table_id=TABLE_ID, path=PATH)) + ``` """ fileHandleId = wrap_async_to_sync( @@ -7561,7 +7785,11 @@ return response - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "This is a private function and has no direct replacement.", + ) def _check_table_transaction_response(self, response): for result in response["results"]: result_type = result["concreteType"] @@ -7600,7 +7828,12 @@ % (result_type, result) ) - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Use the `query` or `query_async` functions from `synapseclient.models.Table` with `download_location` parameter instead. " + "Check the docstring for the replacement function example.", + ) def _queryTableCsv( self, query: str, @@ -7613,6 +7846,9 @@ downloadLocation: str = None, ) -> Tuple: """ + **Deprecated with replacement.** This method will be removed in 5.0.0. + Use the `query` or `query_async` functions from [synapseclient.models.Table][] with `download_location` parameter instead. + Query a Synapse Table and download a CSV file containing the results. Sends a [DownloadFromTableRequest](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/DownloadFromTableRequest.html) to Synapse.
@@ -7630,6 +7866,55 @@ def _queryTableCsv( Returns: A tuple containing a [DownloadFromTableResult](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/DownloadFromTableResult.html) + Example: Using this function (DEPRECATED) + Query a table and download CSV: + + result, csv_path = syn._queryTableCsv( + query="SELECT * FROM syn123", + downloadLocation="/path/to/download", + quoteCharacter='"', + separator="," + ) + + Example: Migration to new method +   + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import query, query_async + + # Login to Synapse + syn = Synapse() + syn.login() + + # Synchronous query with CSV download + csv_path = query( + query="SELECT * FROM syn123", + download_location="/path/to/download", + quote_character='"', + separator=",", + header=True, + include_row_id_and_row_version=True + ) + print(f"CSV downloaded to: {csv_path}") + + # Asynchronous query with CSV download + async def async_csv_download(): + csv_path = await query_async( + query="SELECT * FROM syn123", + download_location="/path/to/download", + quote_character='"', + separator=",", + header=True, + include_row_id_and_row_version=True + ) + print(f"CSV downloaded to: {csv_path}") + + # Run the async example + asyncio.run(async_csv_download()) + ``` + The DownloadFromTableResult object contains these fields: * headers: ARRAY, The list of ColumnModel IDs that describes the rows of this set. * resultsFileHandleId: STRING, The resulting file handle ID can be used to download the CSV file created by @@ -8035,7 +8320,10 @@ def downloadTableColumns(self, table, columns, downloadLocation=None, **kwargs): return file_handle_to_path_map - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. This is a private function and has no direct replacement.", + ) def _build_table_download_file_handle_list(self, table, columns, downloadLocation): # ------------------------------------------------------------ # build list of file handles to download @@ -8077,7 +8365,10 @@ def _build_table_download_file_handle_list(self, table, columns, downloadLocatio warnings.warn("Weird file handle: %s" % file_handle_id) return file_handle_associations, file_handle_to_path_map - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. This is a private function and has no direct replacement.", + ) def _get_default_view_columns(self, view_type, view_type_mask=None): """Get default view columns""" uri = f"/column/tableview/defaults?viewEntityType={view_type}" @@ -8085,7 +8376,10 @@ def _get_default_view_columns(self, view_type, view_type_mask=None): uri += f"&viewTypeMask={view_type_mask}" return [Column(**col) for col in self.restGET(uri)["list"]] - # TODO: Deprecate method in https://sagebionetworks.jira.com/browse/SYNPY-1632 + @deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. 
This is a private function and has no direct replacement.", + ) def _get_annotation_view_columns( self, scope_ids: list, view_type: str, view_type_mask: str = None ) -> list: diff --git a/synapseclient/core/constants/concrete_types.py b/synapseclient/core/constants/concrete_types.py index dce70f541..38863289c 100644 --- a/synapseclient/core/constants/concrete_types.py +++ b/synapseclient/core/constants/concrete_types.py @@ -91,3 +91,17 @@ "org.sagebionetworks.repo.model.schema.GetValidationSchemaRequest" ) CREATE_SCHEMA_REQUEST = "org.sagebionetworks.repo.model.schema.CreateSchemaRequest" + +# Query Table as a CSV +# https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/DownloadFromTableResult.html +QUERY_TABLE_CSV_REQUEST = ( + "org.sagebionetworks.repo.model.table.DownloadFromTableRequest" +) + +# Query Table Bundle Request +# https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/QueryBundleRequest.html +QUERY_BUNDLE_REQUEST = "org.sagebionetworks.repo.model.table.QueryBundleRequest" + +QUERY_RESULT = "org.sagebionetworks.repo.model.table.QueryResult" + +QUERY_TABLE_CSV_RESULT = "org.sagebionetworks.repo.model.table.DownloadFromTableResult" diff --git a/synapseclient/models/__init__.py b/synapseclient/models/__init__.py index c9c957e96..5c0bce346 100644 --- a/synapseclient/models/__init__.py +++ b/synapseclient/models/__init__.py @@ -18,6 +18,7 @@ from synapseclient.models.submissionview import SubmissionView from synapseclient.models.table import Table from synapseclient.models.table_components import ( + ActionRequiredCount, AppendableRowSetRequest, Column, ColumnChange, @@ -28,8 +29,18 @@ JsonSubColumn, PartialRow, PartialRowSet, + Query, + QueryBundleRequest, + QueryJob, + QueryNextPageToken, + QueryResult, QueryResultBundle, + QueryResultOutput, + Row, + RowSet, SchemaStorageStrategy, + SelectColumn, + SumFileSizes, TableSchemaChangeRequest, TableUpdateTransaction, UploadToTableRequest, @@ -66,9 +77,9 @@ "Table", "Column", "ColumnType", + "SumFileSizes", "FacetType", "JsonSubColumn", - "QueryResultBundle", "query_async", "query", "query_part_mask_async", @@ -83,6 +94,17 @@ "CsvTableDescriptor", "MaterializedView", "VirtualTable", + "ActionRequiredCount", + "QueryBundleRequest", + "QueryNextPageToken", + "QueryResult", + "QueryResultBundle", + "QueryResultOutput", + "QueryJob", + "Query", + "Row", + "RowSet", + "SelectColumn", # Dataset models "Dataset", "EntityRef", diff --git a/synapseclient/models/mixins/asynchronous_job.py b/synapseclient/models/mixins/asynchronous_job.py index 4eaf9a339..ddd24a7fa 100644 --- a/synapseclient/models/mixins/asynchronous_job.py +++ b/synapseclient/models/mixins/asynchronous_job.py @@ -14,6 +14,8 @@ AGENT_CHAT_REQUEST, CREATE_SCHEMA_REQUEST, GET_VALIDATION_SCHEMA_REQUEST, + QUERY_BUNDLE_REQUEST, + QUERY_TABLE_CSV_REQUEST, TABLE_UPDATE_TRANSACTION_REQUEST, ) from synapseclient.core.exceptions import ( @@ -27,6 +29,8 @@ TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async", GET_VALIDATION_SCHEMA_REQUEST: "/schema/type/validation/async", CREATE_SCHEMA_REQUEST: "/schema/type/create/async", + QUERY_TABLE_CSV_REQUEST: "/entity/{entityId}/table/download/csv/async", + QUERY_BUNDLE_REQUEST: "/entity/{entityId}/table/query/async", } diff --git a/synapseclient/models/mixins/table_components.py b/synapseclient/models/mixins/table_components.py index 259c269d1..9e9fa3040 100644 --- a/synapseclient/models/mixins/table_components.py +++ b/synapseclient/models/mixins/table_components.py @@ -30,13 +30,22 @@ 
put_entity_id_bundle2, ) from synapseclient.core.async_utils import async_to_sync, otel_trace_method +from synapseclient.core.download.download_functions import ( + download_by_file_handle, + ensure_download_location_is_directory, +) from synapseclient.core.exceptions import SynapseTimeoutError from synapseclient.core.upload.multipart_upload_async import ( multipart_upload_dataframe_async, multipart_upload_file_async, multipart_upload_partial_file_async, ) -from synapseclient.core.utils import MB, log_dataclass_diff, merge_dataclass_entities +from synapseclient.core.utils import ( + MB, + extract_synapse_id_from_query, + log_dataclass_diff, + merge_dataclass_entities, +) from synapseclient.models import Activity from synapseclient.models.services.search import get_id from synapseclient.models.services.storable_entity_components import ( @@ -52,10 +61,15 @@ CsvTableDescriptor, PartialRow, PartialRowSet, + Query, + QueryBundleRequest, + QueryJob, + QueryNextPageToken, QueryResultBundle, + QueryResultOutput, + Row, SchemaStorageStrategy, SnapshotRequest, - SumFileSizes, TableSchemaChangeRequest, TableUpdateTransaction, UploadToTableRequest, @@ -97,6 +111,15 @@ DATA_FRAME_TYPE = TypeVar("pd.DataFrame") SERIES_TYPE = TypeVar("pd.Series") +LIST_COLUMN_TYPES = { + "STRING_LIST", + "INTEGER_LIST", + "BOOLEAN_LIST", + "DATE_LIST", + "ENTITYID_LIST", + "USERID_LIST", +} + def test_import_pandas() -> None: """This function is called within other functions and methods to ensure that pandas is installed.""" @@ -117,6 +140,410 @@ def test_import_pandas() -> None: raise +def row_labels_from_id_and_version(rows): + return ["_".join(map(str, row)) for row in rows] + + +def row_labels_from_rows(rows: List[Row]) -> List[Row]: + return row_labels_from_id_and_version( + [ + ( + (row.row_id, row.version_number, row.etag) + if row.etag + else (row.row_id, row.version_number) + ) + for row in rows + ] + ) + + +async def _query_table_csv( + query: str, + synapse: Synapse, + header: bool = True, + include_row_id_and_row_version: bool = True, + # for csvTableDescriptor + quote_character: str = '"', + escape_character: str = "\\", + line_end: str = os.linesep, + separator: str = ",", + # END for csvTableDescriptor + file_name: str = None, + additional_filters: Dict[str, Any] = None, + selected_facets: Dict[str, Any] = None, + include_entity_etag: bool = False, + select_file_column: int = None, + select_file_version_column: int = None, + offset: int = None, + sort: List[Dict[str, Any]] = None, + download_location: str = None, +) -> Tuple[QueryJob, str]: + """ + Query a Synapse Table and download a CSV file containing the results. + + Sends a [DownloadFromTableRequest](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/DownloadFromTableRequest.html) to Synapse. + + Arguments: + query: The SQL query string to execute against the table. + synapse: An authenticated Synapse client instance used for making the API call. + header: Should the first line contain the column names as a header in the + resulting file? Set to True to include the headers, False otherwise. + The default value is True. + include_row_id_and_row_version: Should the first two columns contain the row ID + and row version? The default value is True. + quote_character: The character used for quoting fields in the CSV. + The default is a double quote ("). + escape_character: The character used for escaping special characters in the CSV. + The default character '\\' will be used if this is not provided by the caller. 
+ line_end: The string used to separate lines in the CSV. + The default is the system's line separator. + separator: The character used to separate fields in the CSV. + The default is a comma (,). + file_name: The optional name for the downloaded table file. + additional_filters: Appends additional filters to the SQL query. These are + applied before facets. Filters within the list have an AND relationship. + If a WHERE clause already exists on the SQL query or facets are selected, + it will also be ANDed with the query generated by these additional filters. + selected_facets: The selected facet filters to apply to the query. + include_entity_etag: Optional, default False. When True, query results against + views will include the Etag of each entity in the results. Note: The etag + is necessary to update Entities in the view. + select_file_column: The id of the column used to select file entities + (e.g. to fetch the action required for download). The column needs to be + an ENTITYID type column and be part of the schema of the underlying table/view. + select_file_version_column: The id of the column used as the version for + selecting file entities when required (e.g. to add a materialized view + query to the download cart with version enabled). The column needs to be + an INTEGER type column and be part of the schema of the underlying table/view. + offset: The optional offset into the results for pagination. + sort: Optional list of sort items to specify the ordering of results. + download_location: The download location + + Returns: + A tuple containing the download result (QueryJob object) and the path to the downloaded CSV file. + The download result is a dictionary containing information about the download. + """ + + csv_descriptor = CsvTableDescriptor( + separator=separator, + escape_character=escape_character, + quote_character=quote_character, + line_end=line_end, + is_first_line_header=header, + ) + + entity_id = extract_synapse_id_from_query(query) + query_job_request = QueryJob( + entity_id=entity_id, + sql=query, + write_header=header, + csv_table_descriptor=csv_descriptor, + include_row_id_and_row_version=include_row_id_and_row_version, + file_name=file_name, + additional_filters=additional_filters, + selected_facets=selected_facets, + include_entity_etag=include_entity_etag, + select_file_column=select_file_column, + select_file_version_column=select_file_version_column, + offset=offset, + sort=sort, + ) + + download_from_table_result = await query_job_request.send_job_and_wait_async( + synapse_client=synapse + ) + + file_handle_id = download_from_table_result.results_file_handle_id + cached_file_path = synapse.cache.get( + file_handle_id=file_handle_id, path=download_location + ) + if cached_file_path is not None: + return download_from_table_result, cached_file_path + + if download_location: + download_dir = ensure_download_location_is_directory( + download_location=download_location + ) + else: + download_dir = synapse.cache.get_cache_dir(file_handle_id=file_handle_id) + + os.makedirs(download_dir, exist_ok=True) + filename = f"SYNAPSE_TABLE_QUERY_{file_handle_id}.csv" + path = await download_by_file_handle( + file_handle_id=file_handle_id, + synapse_id=extract_synapse_id_from_query(query), + entity_type="TableEntity", + destination=os.path.join(download_dir, filename), + synapse_client=synapse, + ) + return download_from_table_result, path + + +def _query_table_next_page( + next_page_token: "QueryNextPageToken", table_id: str, synapse: Synapse +) -> "QueryResultBundle": 
+ """ + Retrieve following pages if the result contains a *nextPageToken* + + Arguments: + next_page_token: Forward this token to get the next page of results. + table_id: The Synapse ID of the table + synapse: An authenticated Synapse client instance used for making the API call. + + Returns: + The following page of results as a QueryResultBundle + + """ + uri = "/entity/{id}/table/query/nextPage/async".format(id=table_id) + result = synapse._waitForAsync(uri=uri, request=next_page_token.token) + return QueryResultBundle.fill_from_dict(data=result) + + +async def _query_table_row_set( + query: str, + synapse: Synapse, + limit: int = None, + offset: int = None, + part_mask=None, +) -> "QueryResultBundle": + """ + Executes a SQL query against a Synapse table and returns the resulting row set. + + Args: + query (str): The SQL query string to execute. + synapse (Synapse): An authenticated Synapse client instance. + limit (int, optional): Maximum number of rows to return. Defaults to None. + offset (int, optional): Number of rows to skip before starting to return rows. Defaults to None. + part_mask (optional): Bit mask to specify which parts of the query result bundle to return. See Synapse REST docs for details. + + Returns: a QueryResultBundle object + """ + entity_id = extract_synapse_id_from_query(query) + query_cls = Query( + sql=query, + include_entity_etag=True, + limit=limit, + offset=offset, + ) + query_request = query_cls.to_synapse_request() + query_bundle_request = QueryBundleRequest( + entity_id=entity_id, query=query_request, part_mask=part_mask + ) + + completed_request = await query_bundle_request.send_job_and_wait_async( + synapse_client=synapse + ) + + return QueryResultBundle( + query_result=completed_request.query_result, + query_count=completed_request.query_count, + select_columns=completed_request.select_columns, + max_rows_per_page=completed_request.max_rows_per_page, + column_models=completed_request.column_models, + facets=completed_request.facets, + sum_file_sizes=completed_request.sum_file_sizes, + last_updated_on=completed_request.last_updated_on, + combined_sql=completed_request.combined_sql, + actions_required=completed_request.actions_required, + ) + + +async def _table_query( + query: str, synapse: Optional[Synapse] = None, results_as: str = "csv", **kwargs +) -> Union["QueryResultBundle", Tuple["QueryJob", str]]: + """ + Query a Synapse Table. + + Optional keyword arguments differ for the two return types of `rowset` or `csv`: + + - For `rowset`, you can specify: + - `limit`: Maximum number of rows to return. + - `offset`: Number of rows to skip before starting to return rows. + - `part_mask`: Bit mask to specify which parts of the query result bundle to return. + + - For `csv`, you can specify: + - `quote_character`: Character used for quoting fields. Default is double quote ("). + - `escape_character`: Character used for escaping special characters. Default is backslash (\). + - `line_end`: Character(s) used to terminate lines. Default is system line separator. + - `separator`: Character used to separate fields. Default is comma (,). + - `header`: Whether to include a header row. Default is True. + - `include_row_id_and_row_version`: Whether to include row ID and version in the output. Default is True. + - `file_name`: Optional name for the downloaded table file. + - `additional_filters`: Additional filters to append to the SQL query. + - `selected_facets`: Selected facet filters to apply to the query. 
+ - `include_entity_etag`: Whether to include entity etag in view results. Default is False. + - `select_file_column`: Column ID for selecting file entities. + - `select_file_version_column`: Column ID for file version selection. + - `offset`: Optional offset into the results for pagination. + - `sort`: Optional list of sort items for result ordering. + - `download_location`: Location to download the CSV file. + + + Returns: + If `results_as` is "rowset", returns a QueryResultBundle object. + If `results_as` is "csv", returns a tuple of (QueryJob, csv_path). + """ + client = Synapse.get_client(synapse_client=synapse) + + if results_as.lower() == "rowset": + return await _query_table_row_set(query=query, synapse=client, **kwargs) + + elif results_as.lower() == "csv": + result, csv_path = await _query_table_csv( + query=query, + synapse=client, + quote_character=kwargs.get("quote_character", DEFAULT_QUOTE_CHARACTER), + escape_character=kwargs.get("escape_character", DEFAULT_ESCAPSE_CHAR), + line_end=kwargs.get("line_end", str(os.linesep)), + separator=kwargs.get("separator", DEFAULT_SEPARATOR), + header=kwargs.get("header", True), + include_row_id_and_row_version=kwargs.get( + "include_row_id_and_row_version", True + ), + file_name=kwargs.get("file_name", None), + additional_filters=kwargs.get("additional_filters", None), + selected_facets=kwargs.get("selected_facets", None), + include_entity_etag=kwargs.get("include_entity_etag", False), + select_file_column=kwargs.get("select_file_column", None), + select_file_version_column=kwargs.get("select_file_version_column", None), + offset=kwargs.get("offset", None), + sort=kwargs.get("sort", None), + download_location=kwargs.get("download_location", None), + ) + + return result, csv_path + + +def _rowset_to_pandas_df( + query_result_bundle: QueryResultBundle, + synapse: Synapse, + row_id_and_version_in_index: bool = True, + **kwargs, +) -> "DATA_FRAME_TYPE": + """ + Converts a Synapse table query rowset result to a pandas DataFrame. + + Arguments: + query_result_bundle: The query result bundle containing rows and headers from a Synapse + table query. This is typically the response from a table query operation + that includes query results, headers, and pagination information. + see here: https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/QueryResultBundle.html + synapse: An authenticated Synapse client instance used for making additional + API calls when pagination is required to fetch subsequent pages of results. + row_id_and_version_in_index: If True, uses ROW_ID, ROW_VERSION (and ROW_ETAG + if present) as the DataFrame index. + **kwargs: Additional keyword arguments (currently unused but maintained for + API compatibility). + + Returns: + A pandas DataFrame containing all the query results. 
+ """ + test_import_pandas() + import collections + + import pandas as pd + + def construct_rownames(query_result_bundle, offset=0): + try: + return ( + row_labels_from_rows( + query_result_bundle.query_result.query_results.rows + ) + if row_id_and_version_in_index + else None + ) + except KeyError: + # if we don't have row id and version, just number the rows + # python3 cast range to list for safety + return list(range(offset, offset + len(rowset["rows"]))) + + # first page of rows + offset = 0 + rownames = construct_rownames(query_result_bundle, offset) + query_result = query_result_bundle.query_result + rowset = query_result.query_results + + if not rowset: + raise ValueError("The provided query_result_bundle has no 'rowset' data.") + + rows = rowset.rows or [] + headers = rowset.headers + + offset += len(rows) + series = collections.OrderedDict() + + if not row_id_and_version_in_index: + # Since we use an OrderedDict this must happen before we construct the other columns + # add row id, verison, and etag as rows + append_etag = False # only useful when (not row_id_and_version_in_index), hooray for lazy variables! + series["ROW_ID"] = pd.Series(name="ROW_ID", data=[row.row_id for row in rows]) + series["ROW_VERSION"] = pd.Series( + name="ROW_VERSION", + data=[row.version_number for row in rows], + ) + + row_etag = [row.etag for row in rows] + if any(row_etag): + append_etag = True + series["ROW_ETAG"] = pd.Series(name="ROW_ETAG", data=row_etag) + + for i, header in enumerate(headers): + column_name = header.name + series[column_name] = pd.Series( + name=column_name, + data=[row.values[i] for row in rows], + index=rownames, + ) + + next_page_token = query_result.next_page_token + + while next_page_token: + # see QueryResult: https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/QueryResult.html + # see RowSet: https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/RowSet.html + result = _query_table_next_page( + next_page_token=next_page_token, table_id=rowset.table_id, synapse=synapse + ) + rowset = result.query_result.query_results + next_page_token = result.query_result.next_page_token + + rownames = construct_rownames(rowset, offset) + offset += len(rowset.rows) + + if not row_id_and_version_in_index: + # TODO: Look into why this isn't being assigned + series["ROW_ID"].append( + pd.Series(name="ROW_ID", data=[row.id for row in rowset.rows]) + ) + series["ROW_VERSION"].append( + pd.Series( + name="ROW_VERSION", + data=[row.version_number for row in rowset.rows], + ) + ) + if append_etag: + series["ROW_ETAG"] = pd.Series( + name="ROW_ETAG", + data=[row.etag for row in rowset.rows], + ) + + for i, header in enumerate(rowset.headers): + column_name = header.name + series[column_name] = pd.concat( + [ + series[column_name], + pd.Series( + name=column_name, + data=[row.values[i] for row in rowset.rows], + index=rownames, + ), + ], + # can't verify integrity when indices are just numbers instead of 'rowid_rowversion' + verify_integrity=row_id_and_version_in_index, + ) + + return pd.DataFrame(data=series) + + @dataclass class TableBase: """Base class for any `Table`-like entities in Synapse. @@ -2052,7 +2479,7 @@ def query( *, synapse_client: Optional[Synapse] = None, **kwargs, - ) -> Union[DATA_FRAME_TYPE, str]: + ) -> Union["DATA_FRAME_TYPE", str]: """Query for data on a table stored in Synapse. The results will always be returned as a Pandas DataFrame unless you specify a `download_location` in which case the results will be downloaded to that location. 
There are a number of @@ -2131,7 +2558,8 @@ def query_part_mask( part_mask: int, *, synapse_client: Optional[Synapse] = None, - ) -> QueryResultBundle: + **kwargs, + ) -> "QueryResultOutput": """Query for data on a table stored in Synapse. This is a more advanced use case of the `query` function that allows you to determine what additional metadata about the table or query should also be returned. If you do not need this @@ -2148,7 +2576,6 @@ part_mask: The bitwise OR of the part mask values you want to return in the results. The following list of part masks are implemented to be returned in the results: - - Query Results (queryResults) = 0x1 - Query Count (queryCount) = 0x2 - The sum of the file sizes (sumFileSizesBytes) = 0x40 @@ -2187,8 +2614,7 @@ print(result) ``` """ - # Replaced at runtime - return QueryResultBundle(result=None) + return QueryResultOutput() @async_to_sync @@ -2209,7 +2635,7 @@ async def query_async( *, synapse_client: Optional[Synapse] = None, **kwargs, - ) -> DATA_FRAME_TYPE: + ) -> Union["DATA_FRAME_TYPE", str]: """Query for data on a table stored in Synapse. The results will always be returned as a Pandas DataFrame unless you specify a `download_location` in which case the results will be downloaded to that location. There are a number of @@ -2283,7 +2709,6 @@ asyncio.run(main()) ``` """ - loop = asyncio.get_event_loop() client = Synapse.get_client(synapse_client=synapse_client) @@ -2295,26 +2720,43 @@ # pandas.read_csv. During implementation a determination on how large of a CSV # that can be loaded from Memory will be needed. When that limit is reached we # should continue to force the download of those results to disk. - - # TODO: Replace method in https://sagebionetworks.jira.com/browse/SYNPY-1632 - results = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).tableQuery( - query=query, - includeRowIdAndRowVersion=include_row_id_and_row_version, - quoteCharacter=quote_character, - escapeCharacter=escape_character, - lineEnd=line_end, - separator=separator, - header=header, - downloadLocation=download_location, - ), + result, csv_path = await _table_query( + query=query, + include_row_id_and_row_version=include_row_id_and_row_version, + quote_character=quote_character, + escape_character=escape_character, + line_end=line_end, + separator=separator, + header=header, + download_location=download_location, ) + if download_location: - return results.filepath - return results.asDataFrame( - rowIdAndVersionInIndex=False, - convert_to_datetime=convert_to_datetime, + return csv_path + + date_columns = [] + list_columns = [] + dtype = {} + + if result.headers is not None: + for column in result.headers: + if column.column_type == "STRING": + # we want to identify string columns so that pandas doesn't try to + # automatically parse strings in a string column to other data types + dtype[column.name] = str + elif column.column_type in LIST_COLUMN_TYPES: + list_columns.append(column.name) + elif column.column_type == "DATE" and convert_to_datetime: + date_columns.append(column.name) + + return csv_to_pandas_df( + filepath=csv_path, + separator=separator or DEFAULT_SEPARATOR, + quote_char=quote_character or DEFAULT_QUOTE_CHARACTER, + escape_char=escape_character or DEFAULT_ESCAPSE_CHAR, + row_id_and_version_in_index=False, + date_columns=date_columns if date_columns else None, + list_columns=list_columns if list_columns else None, **kwargs, ) @@ -2324,7 +2766,8 @@
async def query_part_mask_async( part_mask: int, *, synapse_client: Optional[Synapse] = None, - ) -> QueryResultBundle: + **kwargs, + ) -> "QueryResultOutput": """Query for data on a table stored in Synapse. This is a more advanced use case of the `query` function that allows you to determine what addiitional metadata about the table or query should also be returned. If you do not need this @@ -2352,7 +2795,7 @@ async def query_part_mask_async( instance from the Synapse class constructor. Returns: - The results of the query as a Pandas DataFrame. + The results of the query as a QueryResultOutput object. Example: Querying for data with a part mask This example shows how to use the bitwise `OR` of Python to combine the @@ -2389,36 +2832,32 @@ async def main(): client = Synapse.get_client(synapse_client=synapse_client) client.logger.info(f"Running query: {query}") - - # TODO: Replace method in https://sagebionetworks.jira.com/browse/SYNPY-1632 - results = await loop.run_in_executor( - None, - lambda: Synapse.get_client(synapse_client=synapse_client).tableQuery( - query=query, - resultsAs="rowset", - partMask=part_mask, - ), + limit = kwargs.get("limit", None) + offset = kwargs.get("offset", None) + + results = await _table_query( + query=query, + results_as="rowset", + part_mask=part_mask, + limit=limit, + offset=offset, ) - # TODO: Replace method in https://sagebionetworks.jira.com/browse/SYNPY-1632 as_df = await loop.run_in_executor( None, - lambda: results.asDataFrame(rowIdAndVersionInIndex=False), + lambda: _rowset_to_pandas_df( + query_result_bundle=results, + synapse=client, + row_id_and_version_in_index=False, + ), ) - return QueryResultBundle( + return QueryResultOutput.fill_from_dict( result=as_df, - count=results.count, - sum_file_sizes=( - SumFileSizes( - sum_file_size_bytes=results.sumFileSizes.get( - "sumFileSizesBytes", None - ), - greater_than=results.sumFileSizes.get("greaterThan", None), - ) - if results.sumFileSizes - else None - ), - last_updated_on=results.lastUpdatedOn, + data={ + "count": results.query_count, + "last_updated_on": results.last_updated_on, + "sum_file_sizes": results.sum_file_sizes, + }, ) diff --git a/synapseclient/models/table_components.py b/synapseclient/models/table_components.py index 7a187bfd0..8effc36de 100644 --- a/synapseclient/models/table_components.py +++ b/synapseclient/models/table_components.py @@ -1,3 +1,4 @@ +import json import os from dataclasses import dataclass, field, replace from enum import Enum @@ -17,7 +18,13 @@ from synapseclient import Column as Synapse_Column from synapseclient.core.async_utils import async_to_sync, skip_async_to_sync from synapseclient.core.constants import concrete_types -from synapseclient.core.utils import delete_none_keys +from synapseclient.core.constants.concrete_types import ( + QUERY_BUNDLE_REQUEST, + QUERY_RESULT, + QUERY_TABLE_CSV_REQUEST, + QUERY_TABLE_CSV_RESULT, +) +from synapseclient.core.utils import delete_none_keys, from_unix_epoch_time from synapseclient.models.mixins.asynchronous_job import AsynchronousCommunicator from synapseclient.models.protocols.table_protocol import ColumnSynchronousProtocol @@ -29,23 +36,23 @@ @dataclass class SumFileSizes: - sum_file_size_bytes: int - """The sum of the file size in bytes.""" + """ + A model for the sum of file sizes in a query result bundle. - greater_than: bool - """When true, the actual sum of the files sizes is greater than the value provided - with 'sum_file_size_bytes'. 
When false, the actual sum of the files sizes is equals - the value provided with 'sum_file_size_bytes'""" + This result is modeled from: + """ + + sum_file_size_bytes: int = None + """The sum of the file size in bytes.""" + greater_than: bool = None + """When true, the actual sum of the files sizes is greater than the value provided with 'sumFileSizesBytes'. When false, the actual sum of the files sizes is equals the value provided with 'sumFileSizesBytes'""" @dataclass -class QueryResultBundle: +class QueryResultOutput: """ The result of querying Synapse with an included `part_mask`. This class contains a subnet of the available items that may be returned by specifying a `part_mask`. - - - This result is modeled from: """ result: "DATA_FRAME_TYPE" @@ -65,6 +72,36 @@ class QueryResultBundle: updated. Use mask = 0x80 to include in the bundle. This is returned in the ISO8601 format like `2000-01-01T00:00:00.000Z`.""" + @classmethod + def fill_from_dict( + cls, result: "DATA_FRAME_TYPE", data: Dict[str, Any] + ) -> "QueryResultOutput": + """ + Create a QueryResultOutput from a result DataFrame and dictionary response. + + Arguments: + result: The pandas DataFrame result from the query. + data: The dictionary response from the REST API containing metadata. + + Returns: + A QueryResultOutput instance. + """ + sum_file_sizes = ( + SumFileSizes( + sum_file_size_bytes=data["sum_file_sizes"].sum_file_size_bytes, + greater_than=data["sum_file_sizes"].greater_than, + ) + if data.get("sum_file_sizes") + else None + ) + + return cls( + result=result, + count=data.get("count", None), + sum_file_sizes=sum_file_sizes, + last_updated_on=data.get("last_updated_on", None), + ) + @dataclass class CsvTableDescriptor: @@ -82,7 +119,7 @@ class CsvTableDescriptor: line_end: str = os.linesep """The line feed terminator to be used for the resulting file. The default value of '\n' will be used if this is not provided by the caller.""" - is_file_line_header: bool = True + is_first_line_header: bool = True """Is the first line a header? The default value of 'true' will be used if this is not provided by the caller.""" def to_synapse_request(self): @@ -92,7 +129,7 @@ def to_synapse_request(self): "quoteCharacter": self.quote_character, "escapeCharacter": self.escape_character, "lineEnd": self.line_end, - "isFirstLineHeader": self.is_file_line_header, + "isFirstLineHeader": self.is_first_line_header, } delete_none_keys(request) return request @@ -482,6 +519,527 @@ def __repr__(self) -> str: return self.value +@dataclass +class Row: + """ + Represents a single row of a TableEntity. + + This result is modeled from: + """ + + row_id: Optional[int] = None + """The immutable ID issued to a new row.""" + + version_number: Optional[int] = None + """The version number of this row. Each row version is immutable, so when a row + is updated a new version is created.""" + + etag: Optional[str] = None + """For queries against EntityViews with query.includeEntityEtag=true, this field + will contain the etag of the entity. Will be null for all other cases.""" + + values: Optional[List[str]] = None + """The values for each column of this row. To delete a row, set this to an empty list: []""" + + def to_boolean(value): + """ + Convert a string to boolean, case insensitively, + where true values are: true, t, and 1 and false values are: false, f, 0. + Raise a ValueError for all other values. 
+ """ + if value is None: + raise ValueError("Can't convert None to boolean.") + + if isinstance(value, bool): + return value + + if isinstance(value, str): + lower_value = value.lower() + if lower_value in ["true", "t", "1"]: + return True + if lower_value in ["false", "f", "0"]: + return False + + raise ValueError(f"Can't convert {value} to boolean.") + + @staticmethod + def cast_values(values, headers): + """ + Convert a row of table query results from strings to the correct column type. + + See: + """ + if len(values) != len(headers): + raise ValueError( + f"The number of columns in the csv file does not match the given headers. {len(values)} fields, {len(headers)} headers" + ) + + result = [] + for header, field in zip(headers, values): # noqa: F402 + columnType = header.get("columnType", "STRING") + + # convert field to column type + if field is None or field == "": + result.append(None) + elif columnType in { + "STRING", + "ENTITYID", + "FILEHANDLEID", + "LARGETEXT", + "USERID", + "LINK", + }: + result.append(field) + elif columnType == "DOUBLE": + result.append(float(field)) + elif columnType == "INTEGER": + result.append(int(field)) + elif columnType == "BOOLEAN": + result.append(Row.to_boolean(field)) + elif columnType == "DATE": + result.append(from_unix_epoch_time(field)) + elif columnType in { + "STRING_LIST", + "INTEGER_LIST", + "BOOLEAN_LIST", + "ENTITYID_LIST", + "USERID_LIST", + }: + result.append(json.loads(field)) + elif columnType == "DATE_LIST": + result.append(json.loads(field, parse_int=from_unix_epoch_time)) + else: + # default to string for unknown column type + result.append(field) + + return result + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "Row": + """Create a Row from a dictionary response.""" + return cls( + row_id=data.get("rowId"), + version_number=data.get("versionNumber"), + etag=data.get("etag"), + values=data.get("values"), + ) + + +@dataclass +class ActionRequiredCount: + """ + Represents a single action that the user will need to take in order to download one or more files. + + This result is modeled from: + """ + + action: Optional[Dict[str, Any]] = None + """An action that the user must take in order to download a file.""" + + count: Optional[int] = None + """The number of files that require this action.""" + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "ActionRequiredCount": + """Create an ActionRequiredCount from a dictionary response.""" + return cls( + action=data.get("action", None), + count=data.get("count", None), + ) + + +@dataclass +class SelectColumn: + """ + A column model contains the metadata of a single column of a TableEntity. + + This result is modeled from: + """ + + name: Optional[str] = None + """The required display name of the column""" + + column_type: Optional[ColumnType] = None + """The column type determines the type of data that can be stored in a column. + Switching between types (using a transaction with TableUpdateTransactionRequest + in the "changes" list) is generally allowed except for switching to "_LIST" + suffixed types. 
In such cases, a new column must be created and data must be + copied over manually""" + + id: Optional[str] = None + """The optional ID of the select column, if this is a direct column selected""" + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "SelectColumn": + """Create a SelectColumn from a dictionary response.""" + column_type = None + column_type_value = data.get("columnType") + if column_type_value: + try: + column_type = ColumnType(column_type_value) + except ValueError: + column_type = None + return cls( + name=data.get("name"), + column_type=column_type, + id=data.get("id"), + ) + + +@dataclass +class QueryNextPageToken: + """ + Token for retrieving the next page of query results. + + This result is modeled from: + """ + + concrete_type: Optional[str] = None + """The concrete type of this object""" + + entity_id: Optional[str] = None + """The ID of the entity (table/view) being queried""" + + token: Optional[str] = None + """The token for the next page.""" + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "QueryNextPageToken": + """Create a QueryNextPageToken from a dictionary response.""" + return cls( + concrete_type=data.get("concreteType"), + entity_id=data.get("entityId"), + token=data.get("token"), + ) + + +@dataclass +class RowSet: + """ + Represents a set of row of a TableEntity. + + This result is modeled from: + """ + + concrete_type: Optional[str] = None + """The concrete type of this object""" + + table_id: Optional[str] = None + """The ID of the TableEntity than owns these rows""" + + etag: Optional[str] = None + """Any RowSet returned from Synapse will contain the current etag of the change set. + To update any rows from a RowSet the etag must be provided with the POST.""" + + headers: Optional[List[SelectColumn]] = None + """The list of SelectColumns that describes the rows of this set.""" + + rows: Optional[List[Row]] = field(default_factory=list) + """The Rows of this set. The index of each row value aligns with the index of each header.""" + + @classmethod + def cast_row( + cls, row: Dict[str, Any], headers: List[Dict[str, Any]] + ) -> Dict[str, Any]: + """ + Cast the values in a single row to their appropriate column types. + + This method takes a row dictionary containing string values from a table query + response and converts them to the correct Python types based on the column + headers. For example, converts string "123" to integer 123 for INTEGER columns, + or string "true" to boolean True for BOOLEAN columns. + + Arguments: + row: A dictionary representing a single table row with keys that need to be cast to proper types. + headers: A list of header dictionaries, each containing column metadata + including 'columnType' which determines how to cast the corresponding + value in the row. + + Returns: + The same row dictionary with the 'values' field updated to contain + properly typed values instead of strings. + """ + row["values"] = Row.cast_values(row["values"], headers) + return row + + @classmethod + def cast_row_set(cls, rows: List[Row], headers: List[Dict[str, Any]]) -> List[Row]: + """ + Cast the values in multiple rows to their appropriate column types. + + This method takes a list of row dictionaries containing string values from a table query + response and converts them to the correct Python types based on the column headers. + It applies the same type casting logic as `cast_row` to each row in the collection. 
+ + Arguments: + rows: A list of row dictionaries, each representing a single table row with + field contains a list of string values that need to be cast to proper types. + headers: A list of header dictionaries, each containing column metadata + including 'columnType' which determines how to cast the corresponding + values in each row. + + Returns: + A list of row dictionaries with the 'values' field in each row updated to + contain properly typed values instead of strings. + """ + rows = [cls.cast_row(row, headers) for row in rows] + return rows + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "RowSet": + """Create a RowSet from a dictionary response.""" + headers_data = data.get("headers") + rows_data = data.get("rows") + + # Handle headers - convert to SelectColumn objects + headers = None + if headers_data and isinstance(headers_data, list): + headers = [SelectColumn.fill_from_dict(header) for header in headers_data] + + # Handle rows - cast values and convert to Row objects + rows = None + if rows_data and isinstance(rows_data, list): + # Cast row values based on header types if headers are available + if headers_data and isinstance(headers_data, list): + rows_data = cls.cast_row_set(rows_data, headers_data) + # Convert to Row objects + rows = [Row.fill_from_dict(row) for row in rows_data] + + return cls( + concrete_type=data.get("concreteType"), + table_id=data.get("tableId"), + etag=data.get("etag"), + headers=headers, + rows=rows, + ) + + +@dataclass +class QueryResult: + """ + A page of query result. + + This result is modeled from: + """ + + query_results: RowSet + """Represents a set of row of a TableEntity (RowSet)""" + + concrete_type: str = QUERY_RESULT + """The concrete type of this object""" + + next_page_token: Optional[QueryNextPageToken] = None + """Token for retrieving the next page of results, if available""" + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "QueryResult": + """Create a QueryResult from a dictionary response.""" + next_page_token = None + query_results = data.get("queryResults", None) + + if data.get("nextPageToken", None): + next_page_token = QueryNextPageToken.fill_from_dict(data["nextPageToken"]) + + if data.get("queryResults", None): + query_results = RowSet.fill_from_dict(data["queryResults"]) + + return cls( + concrete_type=data.get("concreteType"), + query_results=query_results, + next_page_token=next_page_token, + ) + + +@dataclass +class QueryJob(AsynchronousCommunicator): + """ + A query job that can be submitted to Synapse and return a DownloadFromTableResult. + + This class combines query request parameters with the ability to receive + query results through the AsynchronousCommunicator pattern. + + Request modeled from: + + Response modeled from: + """ + + # Request parameters + entity_id: str + """The ID of the entity (table/view) being queried""" + + concrete_type: str = QUERY_TABLE_CSV_REQUEST + "The concrete type of the request (usually DownloadFromTableRequest)" + + write_header: Optional[bool] = True + """Should the first line contain the columns names as a header in the resulting file? Set to 'true' to include the headers else, 'false'. The default value is 'true'.""" + + include_row_id_and_row_version: Optional[bool] = True + """Should the first two columns contain the row ID and row version? 
The default value is 'true'.""" + + csv_table_descriptor: Optional[CsvTableDescriptor] = None + """The description of a csv for upload or download.""" + + file_name: Optional[str] = None + """The optional name for the downloaded table.""" + + sql: Optional[str] = None + """The SQL query to execute""" + + additional_filters: Optional[List[Dict[str, Any]]] = None + """Appends additional filters to the SQL query. These are applied before facets. Filters within the list have an AND relationship. If a WHERE clause already exists on the SQL query or facets are selected, it will also be ANDed with the query generated by these additional filters.""" + """TODO: create QueryFilter dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651""" + + selected_facets: Optional[List[Dict[str, Any]]] = None + """The selected facet filters.""" + """TODO: create FacetColumnRequest dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651""" + + include_entity_etag: Optional[bool] = False + """"Optional, default false. When true, a query results against views will include the Etag of each entity in the results. Note: The etag is necessary to update Entities in the view.""" + + select_file_column: Optional[int] = None + """The id of the column used to select file entities (e.g. to fetch the action required for download). The column needs to be an ENTITYID type column and be part of the schema of the underlying table/view.""" + + select_file_version_column: Optional[int] = None + """The id of the column used as the version for selecting file entities when required (e.g. to add a materialized view query to the download cart with version enabled). The column needs to be an INTEGER type column and be part of the schema of the underlying table/view.""" + + offset: Optional[int] = None + """The optional offset into the results""" + + limit: Optional[int] = None + """The optional limit to the results""" + + sort: Optional[List[Dict[str, Any]]] = None + """The sort order for the query results (ARRAY)""" + """TODO: Add SortItem dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651""" + + # Response attributes (filled after job completion) + job_id: Optional[str] = None + """The job ID returned from the async job""" + + results_file_handle_id: Optional[str] = None + """The file handle ID of the results CSV file""" + + table_id: Optional[str] = None + """The ID of the table that was queried""" + + etag: Optional[str] = None + """The etag of the table""" + + headers: Optional[List[SelectColumn]] = None + """The column headers from the query result""" + + response_concrete_type: Optional[str] = QUERY_TABLE_CSV_RESULT + """The concrete type of the response (usually DownloadFromTableResult)""" + + def to_synapse_request(self) -> Dict[str, Any]: + """Convert to DownloadFromTableRequest format for async job submission.""" + + csv_table_descriptor = None + if self.csv_table_descriptor: + csv_table_descriptor = self.csv_table_descriptor.to_synapse_request() + + synapse_request = { + "concreteType": QUERY_TABLE_CSV_REQUEST, + "entityId": self.entity_id, + "csvTableDescriptor": csv_table_descriptor, + "sql": self.sql, + "writeHeader": self.write_header, + "includeRowIdAndRowVersion": self.include_row_id_and_row_version, + "includeEntityEtag": self.include_entity_etag, + "fileName": self.file_name, + "additionalFilters": self.additional_filters, + "selectedFacet": self.selected_facets, + "selectFileColumns": self.select_file_column, + "selectFileVersionColumns": self.select_file_version_column, + "offset": 
self.offset, + "sort": self.sort, + } + delete_none_keys(synapse_request) + return synapse_request + + def fill_from_dict(self, synapse_response: Dict[str, Any]) -> "Self": + """Fill the job results from Synapse response.""" + # Fill response attributes from DownloadFromTableResult + headers = None + headers_data = synapse_response.get("headers") + if headers_data and isinstance(headers_data, list): + headers = [SelectColumn.fill_from_dict(header) for header in headers_data] + + self.job_id = synapse_response.get("jobId") + self.response_concrete_type = synapse_response.get("concreteType") + self.results_file_handle_id = synapse_response.get("resultsFileHandleId") + self.table_id = synapse_response.get("tableId") + self.etag = synapse_response.get("etag") + self.headers = headers + + return self + + +@dataclass +class Query: + """ + Represents a SQL query with optional parameters. + + This result is modeled from: + """ + + sql: str + """The SQL query string""" + + additional_filters: Optional[List[Dict[str, Any]]] = None + """Appends additional filters to the SQL query. These are applied before facets. + Filters within the list have an AND relationship. If a WHERE clause already exists + on the SQL query or facets are selected, it will also be ANDed with the query + generated by these additional filters.""" + """TODO: create QueryFilter dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651""" + + selected_facets: Optional[List[Dict[str, Any]]] = None + """The selected facet filters""" + """TODO: create FacetColumnRequest dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651""" + + include_entity_etag: Optional[bool] = False + """Optional, default false. When true, a query results against views will include + the Etag of each entity in the results. Note: The etag is necessary to update + Entities in the view.""" + + select_file_column: Optional[int] = None + """The id of the column used to select file entities (e.g. to fetch the action + required for download). The column needs to be an ENTITYID type column and be + part of the schema of the underlying table/view.""" + + select_file_version_column: Optional[int] = None + """The id of the column used as the version for selecting file entities when required + (e.g. to add a materialized view query to the download cart with version enabled). 
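As an aside, a minimal sketch of assembling the CSV download request built by `QueryJob.to_synapse_request` above; the entity ID and SQL are placeholders, and the import path follows the unit tests further down rather than a documented public entry point.

```python
from synapseclient.models.table_components import CsvTableDescriptor, QueryJob

# Assemble a CSV download request for a hypothetical table.
job = QueryJob(
    entity_id="syn1234",
    sql="SELECT * FROM syn1234",
    write_header=True,
    include_row_id_and_row_version=True,
    csv_table_descriptor=CsvTableDescriptor(
        separator=",",
        quote_character='"',
        escape_character="\\",
        line_end="\n",
        is_first_line_header=True,
    ),
)

# Request body for the asynchronous job service; None-valued keys are stripped.
print(job.to_synapse_request())
```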
+ The column needs to be an INTEGER type column and be part of the schema of the + underlying table/view.""" + + offset: Optional[int] = None + """The optional offset into the results""" + + limit: Optional[int] = None + """The optional limit to the results""" + + sort: Optional[List[Dict[str, Any]]] = None + """The sort order for the query results (ARRAY)""" + """TODO: Add SortItem dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651 """ + + def to_synapse_request(self) -> Dict[str, Any]: + """Converts the Query object into a dictionary that can be passed into the REST API.""" + result = { + "sql": self.sql, + "additionalFilters": self.additional_filters, + "selectedFacets": self.selected_facets, + "includeEntityEtag": self.include_entity_etag, + "selectFileColumn": self.select_file_column, + "selectFileVersionColumn": self.select_file_version_column, + "offset": self.offset, + "limit": self.limit, + "sort": self.sort, + } + delete_none_keys(result) + return result + + @dataclass class JsonSubColumn: """For column of type JSON that represents the combination of multiple @@ -783,6 +1341,215 @@ def to_synapse_request(self) -> Dict[str, Any]: return result +@dataclass +class QueryResultBundle: + """ + A bundle of information about a query result. + + This result is modeled from: + """ + + concrete_type: str = QUERY_TABLE_CSV_REQUEST + """The concrete type of this object""" + + query_result: QueryResult = None + """A page of query result""" + + query_count: Optional[int] = None + """The total number of rows that match the query. Use mask = 0x2 to include in the + bundle.""" + + select_columns: Optional[List[SelectColumn]] = None + """The list of SelectColumns from the select clause. Use mask = 0x4 to include in + the bundle.""" + + max_rows_per_page: Optional[int] = None + """The maximum number of rows that can be retrieved in a single call. This is a + function of the columns that are selected in the query. Use mask = 0x8 to include + in the bundle.""" + + column_models: Optional[List[Column]] = None + """The list of ColumnModels for the table. Use mask = 0x10 to include in the bundle.""" + + facets: Optional[List[Dict[str, Any]]] = None + """TODO: create facets dataclass""" + """The list of facets for the search results. Use mask = 0x20 to include in the bundle.""" + + sum_file_sizes: Optional[SumFileSizes] = None + """The sum of the file size for all files in the given view query. Use mask = 0x40 + to include in the bundle.""" + + last_updated_on: Optional[str] = None + """The date-time when this table/view was last updated. Note: Since views are + eventually consistent a view might still be out-of-date even if it was recently + updated. Use mask = 0x80 to include in the bundle. This is returned in the + ISO8601 format like `2000-01-01T00:00:00.000Z`.""" + + combined_sql: Optional[str] = None + """The SQL that is combination of a the input SQL, FacetRequests, AdditionalFilters, + Sorting, and Pagination. Use mask = 0x100 to include in the bundle.""" + + actions_required: Optional[List[ActionRequiredCount]] = None + """The first 50 actions required to download the files that are part of the query. 
+ Use mask = 0x200 to include them in the bundle.""" + + @classmethod + def fill_from_dict(cls, data: Dict[str, Any]) -> "QueryResultBundle": + """Create a QueryResultBundle from a dictionary response.""" + # Handle sum_file_sizes + sum_file_sizes = None + sum_file_sizes_data = data.get("sumFileSizes") + if sum_file_sizes_data: + sum_file_sizes = SumFileSizes( + sum_file_size_bytes=sum_file_sizes_data.get("sumFileSizesBytes"), + greater_than=sum_file_sizes_data.get("greaterThan"), + ) + + # Handle query_result + query_result = None + query_result_data = data.get("queryResult") + if query_result_data: + query_result = QueryResult.fill_from_dict(query_result_data) + + # Handle select_columns + select_columns = None + select_columns_data = data.get("selectColumns") + if select_columns_data and isinstance(select_columns_data, list): + select_columns = [ + SelectColumn.fill_from_dict(col) for col in select_columns_data + ] + + # Handle actions_required + actions_required = None + actions_required_data = data.get("actionsRequired") + if actions_required_data and isinstance(actions_required_data, list): + actions_required = [ + ActionRequiredCount.fill_from_dict(action) + for action in actions_required_data + ] + + # Handle column_models + column_models = None + column_models_data = data.get("columnModels") + if column_models_data and isinstance(column_models_data, list): + column_models = [Column().fill_from_dict(col) for col in column_models_data] + + return cls( + concrete_type=data.get("concreteType"), + query_result=query_result, + query_count=data.get("queryCount"), + select_columns=select_columns, + max_rows_per_page=data.get("maxRowsPerPage"), + column_models=column_models, + facets=data.get("facets"), + sum_file_sizes=sum_file_sizes, + last_updated_on=data.get("lastUpdatedOn"), + combined_sql=data.get("combinedSql"), + actions_required=actions_required, + ) + + +@dataclass +class QueryBundleRequest(AsynchronousCommunicator): + """ + A query bundle request that can be submitted to Synapse to retrieve query results with metadata. + + This class combines query request parameters with the ability to receive + a QueryResultBundle through the AsynchronousCommunicator pattern. + + The partMask determines which parts of the result bundle are included: + - Query Results (queryResults) = 0x1 + - Query Count (queryCount) = 0x2 + - Select Columns (selectColumns) = 0x4 + - Max Rows Per Page (maxRowsPerPage) = 0x8 + - The Table Columns (columnModels) = 0x10 + - Facet statistics for each faceted column (facetStatistics) = 0x20 + - The sum of the file sizes (sumFileSizesBytes) = 0x40 + - The last updated on date (lastUpdatedOn) = 0x80 + - The combined SQL query including additional filters (combinedSql) = 0x100 + - The list of actions required for any file in the query (actionsRequired) = 0x200 + + This result is modeled from: + """ + + # Request parameters + entity_id: str + """The ID of the entity (table/view) being queried""" + + query: Query + """The SQL query with parameters""" + + concrete_type: str = QUERY_BUNDLE_REQUEST + """The concrete type of this request""" + + part_mask: Optional[int] = None + """Optional integer mask to request specific parts. 
Default includes all parts if not specified.""" + + # Response attributes (filled after job completion from QueryResultBundle) + query_result: Optional[QueryResult] = None + """A page of query result""" + + query_count: Optional[int] = None + """The total number of rows that match the query""" + + select_columns: Optional[List[SelectColumn]] = None + """The list of SelectColumns from the select clause""" + + max_rows_per_page: Optional[int] = None + """The maximum number of rows that can be retrieved in a single call""" + + column_models: Optional[List[Dict[str, Any]]] = None + """The list of ColumnModels for the table""" + + facets: Optional[List[Dict[str, Any]]] = None + """The list of facets for the search results""" + + sum_file_sizes: Optional[SumFileSizes] = None + """The sum of the file size for all files in the given view query""" + + last_updated_on: Optional[str] = None + """The date-time when this table/view was last updated""" + + combined_sql: Optional[str] = None + """The SQL that is combination of a the input SQL, FacetRequests, AdditionalFilters, Sorting, and Pagination""" + + actions_required: Optional[List[ActionRequiredCount]] = None + """The first 50 actions required to download the files that are part of the query""" + + def to_synapse_request(self) -> Dict[str, Any]: + """Convert to QueryBundleRequest format for async job submission.""" + result = { + "concreteType": self.concrete_type, + "entityId": self.entity_id, + "query": self.query, + } + + if self.part_mask is not None: + result["partMask"] = self.part_mask + + delete_none_keys(result) + return result + + def fill_from_dict(self, synapse_response: Dict[str, Any]) -> "Self": + """Fill the request results from Synapse response (QueryResultBundle).""" + # Use QueryResultBundle's fill_from_dict logic to populate response fields + bundle = QueryResultBundle.fill_from_dict(synapse_response) + + # Copy all the result fields from the bundle + self.query_result = bundle.query_result + self.query_count = bundle.query_count + self.select_columns = bundle.select_columns + self.max_rows_per_page = bundle.max_rows_per_page + self.column_models = bundle.column_models + self.facets = bundle.facets + self.sum_file_sizes = bundle.sum_file_sizes + self.last_updated_on = bundle.last_updated_on + self.combined_sql = bundle.combined_sql + self.actions_required = bundle.actions_required + + return self + + class SchemaStorageStrategy(str, Enum): """Enum used to determine how to store the schema of a table in Synapse.""" diff --git a/synapseclient/table.py b/synapseclient/table.py index a68040813..9956de0a6 100644 --- a/synapseclient/table.py +++ b/synapseclient/table.py @@ -263,10 +263,20 @@ def column_ids(columns): return [col.id for col in columns if "id" in col] +@deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Moved to synapseclient.models.mixins.table_components. ", +) def row_labels_from_id_and_version(rows): return ["_".join(map(str, row)) for row in rows] +@deprecated( + version="4.9.0", + reason="To be removed in 5.0.0. " + "Moved to synapseclient.models.mixins.table_components. 
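For illustration, a sketch of composing a `Query`, inspecting its REST payload, and wrapping it in a `QueryBundleRequest` with a part mask; the mask constants mirror the bit values listed in the `QueryBundleRequest` docstring, and the import path is the one used by the unit tests below.

```python
from synapseclient.models.table_components import Query, QueryBundleRequest

query = Query(
    sql="SELECT col1, col2 FROM syn123456",
    include_entity_etag=True,
    offset=0,
    limit=100,
)

# Unset (None) fields are dropped, so only populated keys are sent:
# {'sql': 'SELECT col1, col2 FROM syn123456', 'includeEntityEtag': True,
#  'offset': 0, 'limit': 100}
print(query.to_synapse_request())

# Part-mask bits as listed in the QueryBundleRequest docstring.
QUERY_RESULTS = 0x1
QUERY_COUNT = 0x2
SELECT_COLUMNS = 0x4

request = QueryBundleRequest(
    entity_id="syn123456",
    query=query,
    part_mask=QUERY_RESULTS | QUERY_COUNT | SELECT_COLUMNS,
)

# partMask is only included when explicitly set; None-valued keys are stripped.
print(request.to_synapse_request())
```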
", +) def row_labels_from_rows(rows): return row_labels_from_id_and_version( [ diff --git a/tests/integration/synapseclient/models/async/test_entityview_async.py b/tests/integration/synapseclient/models/async/test_entityview_async.py index 19c1775cf..80eee1050 100644 --- a/tests/integration/synapseclient/models/async/test_entityview_async.py +++ b/tests/integration/synapseclient/models/async/test_entityview_async.py @@ -238,9 +238,42 @@ async def test_update_rows_and_annotations( entityview = await entityview.store_async(synapse_client=self.syn) self.schedule_for_cleanup(entityview.id) - # SPY on the CSV conversion function to verify different input paths - spy_csv_file_conversion = mocker.spy(table_module, "csv_to_pandas_df") + # Custom wrapper to capture call stack + original_csv_to_pandas_df = table_module.csv_to_pandas_df + call_info = [] + + def csv_wrapper(*args, **kwargs): + import traceback + + stack = traceback.extract_stack() + + # Find the calling function (skip the wrapper itself) + calling_function = None + for frame in reversed(stack[:-1]): # Skip current frame + if "_upsert_rows_async" in frame.name: + calling_function = "_upsert_rows_async" + break + elif "query_async" in frame.name: + calling_function = "query_async" + break + else: + pass + + call_info.append( + { + "caller": calling_function, + "args": args, + "kwargs": kwargs, + "filepath": kwargs.get("filepath", args[0] if args else None), + } + ) + return original_csv_to_pandas_df(*args, **kwargs) + + # Patch the csv_to_pandas_df function to use the wrapper + mock_csv_to_pandas_df = mocker.patch.object( + table_module, "csv_to_pandas_df", side_effect=csv_wrapper + ) # Create test data for all files test_data = { "id": [file.id for file in files], @@ -259,7 +292,7 @@ async def test_update_rows_and_annotations( for method in update_methods: # Reset the spy for each method - spy_csv_file_conversion.reset_mock() + call_info.clear() # WHEN I update rows using different input types if method == "csv": @@ -278,7 +311,10 @@ async def test_update_rows_and_annotations( ) # THEN the CSV conversion function should be called - spy_csv_file_conversion.assert_called_once() + _upsert_rows_async_calls = [ + call for call in call_info if call["caller"] == "_upsert_rows_async" + ] + assert len(_upsert_rows_async_calls) == 1 elif method == "dataframe": # Use DataFrame @@ -290,7 +326,10 @@ async def test_update_rows_and_annotations( ) # THEN the CSV conversion function should NOT be called - spy_csv_file_conversion.assert_not_called() + _upsert_rows_async_calls = [ + call for call in call_info if call["caller"] == "_upsert_rows_async" + ] + assert len(_upsert_rows_async_calls) == 0 else: # dict # Use dictionary @@ -302,7 +341,10 @@ async def test_update_rows_and_annotations( ) # THEN the CSV conversion function should NOT be called - spy_csv_file_conversion.assert_not_called() + _upsert_rows_async_calls = [ + call for call in call_info if call["caller"] == "_upsert_rows_async" + ] + assert len(_upsert_rows_async_calls) == 0 # THEN the columns should exist in the entity view assert "column_string" in entityview.columns diff --git a/tests/integration/synapseclient/models/async/test_table_async.py b/tests/integration/synapseclient/models/async/test_table_async.py index 52c851827..06b0f2709 100644 --- a/tests/integration/synapseclient/models/async/test_table_async.py +++ b/tests/integration/synapseclient/models/async/test_table_async.py @@ -815,7 +815,7 @@ async def test_store_rows_as_csv_being_split_and_uploaded( assert len(results) == 200 # 
AND The spy should have been called in multiple batches - assert spy_send_job.call_count == 4 + assert spy_send_job.call_count == 5 async def test_store_rows_as_df_being_split_and_uploaded( self, project_model: Project, mocker: MockerFixture @@ -879,7 +879,7 @@ async def test_store_rows_as_df_being_split_and_uploaded( # AND The spy should have been called in multiple batches # Note: DataFrames have a minimum of 100 rows per batch - assert spy_send_job.call_count == 2 + assert spy_send_job.call_count == 3 @skip("Skip in normal testing because the large size makes it slow") async def test_store_rows_as_large_df_being_split_and_uploaded( @@ -1068,7 +1068,7 @@ async def test_upsert_operations_with_various_data_sources( # We should have 9 total rows now (6 from before + 3 new) assert len(results) == 9 # The spy should have been called for update and insert operations - assert spy_send_job.call_count == 2 + assert spy_send_job.call_count == 4 # Test 4: Dry run operation # WHEN I perform a dry run upsert @@ -1079,8 +1079,7 @@ async def test_upsert_operations_with_various_data_sources( } ) - # Reset the spy to count just this operation - spy_send_job.reset_mock() + spy_table_update = mocker.spy(table_module, "_push_row_updates_to_synapse") await table.upsert_rows_async( values=dry_run_data, @@ -1098,7 +1097,7 @@ async def test_upsert_operations_with_various_data_sources( # The values from the previous update should still be in place assert 99 not in results["column_key_2"].values # The spy should not have been called - assert spy_send_job.call_count == 0 + assert spy_table_update.call_count == 0 async def test_upsert_with_multi_value_key(self, project_model: Project) -> None: """Test upserting rows using multiple columns as the primary key.""" @@ -1274,7 +1273,7 @@ async def test_upsert_with_large_data_and_batching( assert len(results) == 6 # AND multiple batch jobs should have been created due to batching settings - assert spy_send_job.call_count == 5 # More batches due to small size settings + assert spy_send_job.call_count == 7 # More batches due to small size settings async def test_upsert_all_data_types( self, mocker: MockerFixture, project_model: Project diff --git a/tests/integration/synapseclient/models/synchronous/test_entityview.py b/tests/integration/synapseclient/models/synchronous/test_entityview.py index 6b4a9890f..7af33226c 100644 --- a/tests/integration/synapseclient/models/synchronous/test_entityview.py +++ b/tests/integration/synapseclient/models/synchronous/test_entityview.py @@ -236,8 +236,42 @@ async def test_update_rows_and_annotations( entityview = entityview.store(synapse_client=self.syn) self.schedule_for_cleanup(entityview.id) - # SPY on the CSV conversion function to verify different input paths - spy_csv_file_conversion = mocker.spy(table_module, "csv_to_pandas_df") + # Custom wrapper to capture call stack + original_csv_to_pandas_df = table_module.csv_to_pandas_df + call_info = [] + + def csv_wrapper(*args, **kwargs): + import traceback + + stack = traceback.extract_stack() + + # Find the calling function (skip the wrapper itself) + calling_function = None + for frame in reversed(stack[:-1]): # Skip current frame + if "_upsert_rows_async" in frame.name: + calling_function = "_upsert_rows_async" + break + elif "query_async" in frame.name: + calling_function = "query_async" + break + else: + pass + + call_info.append( + { + "caller": calling_function, + "args": args, + "kwargs": kwargs, + "filepath": kwargs.get("filepath", args[0] if args else None), + } + ) + + return 
original_csv_to_pandas_df(*args, **kwargs) + + # Patch the csv_to_pandas_df function to use the wrapper + mock_csv_to_pandas_df = mocker.patch.object( + table_module, "csv_to_pandas_df", side_effect=csv_wrapper + ) # Create test data for all files test_data = { @@ -257,7 +291,7 @@ async def test_update_rows_and_annotations( for method in update_methods: # Reset the spy for each method - spy_csv_file_conversion.reset_mock() + call_info.clear() # WHEN I update rows using different input types if method == "csv": @@ -276,7 +310,10 @@ async def test_update_rows_and_annotations( ) # THEN the CSV conversion function should be called - spy_csv_file_conversion.assert_called_once() + _upsert_rows_async_calls = [ + call for call in call_info if call["caller"] == "_upsert_rows_async" + ] + assert len(_upsert_rows_async_calls) == 1 elif method == "dataframe": # Use DataFrame @@ -288,7 +325,10 @@ async def test_update_rows_and_annotations( ) # THEN the CSV conversion function should NOT be called - spy_csv_file_conversion.assert_not_called() + _upsert_rows_async_calls = [ + call for call in call_info if call["caller"] == "_upsert_rows_async" + ] + assert len(_upsert_rows_async_calls) == 0 else: # dict # Use dictionary @@ -300,7 +340,10 @@ async def test_update_rows_and_annotations( ) # THEN the CSV conversion function should NOT be called - spy_csv_file_conversion.assert_not_called() + _upsert_rows_async_calls = [ + call for call in call_info if call["caller"] == "_upsert_rows_async" + ] + assert len(_upsert_rows_async_calls) == 0 # THEN the columns should exist in the entity view assert "column_string" in entityview.columns diff --git a/tests/integration/synapseclient/models/synchronous/test_table.py b/tests/integration/synapseclient/models/synchronous/test_table.py index 85f1e21d8..1ba4b88b5 100644 --- a/tests/integration/synapseclient/models/synchronous/test_table.py +++ b/tests/integration/synapseclient/models/synchronous/test_table.py @@ -793,7 +793,7 @@ async def test_store_rows_as_csv_being_split_and_uploaded( assert len(results) == 200 # AND The spy should have been called in multiple batches - assert spy_send_job.call_count == 4 + assert spy_send_job.call_count == 5 async def test_store_rows_as_df_being_split_and_uploaded( self, project_model: Project, mocker: MockerFixture @@ -857,7 +857,7 @@ async def test_store_rows_as_df_being_split_and_uploaded( # AND The spy should have been called in multiple batches # Note: DataFrames have a minimum of 100 rows per batch - assert spy_send_job.call_count == 2 + assert spy_send_job.call_count == 3 @skip("Skip in normal testing because the large size makes it slow") async def test_store_rows_as_large_df_being_split_and_uploaded( @@ -1040,7 +1040,7 @@ async def test_upsert_operations_with_various_data_sources( # We should have 9 total rows now (6 from before + 3 new) assert len(results) == 9 # The spy should have been called for update and insert operations - assert spy_send_job.call_count == 2 + assert spy_send_job.call_count == 4 # Test 4: Dry run operation # WHEN I perform a dry run upsert @@ -1051,8 +1051,7 @@ async def test_upsert_operations_with_various_data_sources( } ) - # Reset the spy to count just this operation - spy_send_job.reset_mock() + spy_table_update = mocker.spy(table_module, "_push_row_updates_to_synapse") table.upsert_rows( values=dry_run_data, @@ -1068,7 +1067,7 @@ async def test_upsert_operations_with_various_data_sources( # The values from the previous update should still be in place assert 99 not in results["column_key_2"].values 
# The spy should not have been called - assert spy_send_job.call_count == 0 + assert spy_table_update.call_count == 0 async def test_upsert_with_multi_value_key(self, project_model: Project) -> None: """Test upserting rows using multiple columns as the primary key.""" @@ -1238,7 +1237,7 @@ async def test_upsert_with_large_data_and_batching( assert len(results) == 6 # AND multiple batch jobs should have been created due to batching settings - assert spy_send_job.call_count == 5 # More batches due to small size settings + assert spy_send_job.call_count == 7 # More batches due to small size settings async def test_upsert_all_data_types( self, mocker: MockerFixture, project_model: Project diff --git a/tests/unit/synapseclient/mixins/unit_test_table_components.py b/tests/unit/synapseclient/mixins/unit_test_table_components.py index ab27fbb16..0f480836c 100644 --- a/tests/unit/synapseclient/mixins/unit_test_table_components.py +++ b/tests/unit/synapseclient/mixins/unit_test_table_components.py @@ -10,6 +10,11 @@ from synapseclient import Synapse from synapseclient.api import ViewEntityType, ViewTypeMask +from synapseclient.core.constants.concrete_types import ( + QUERY_BUNDLE_REQUEST, + QUERY_RESULT, + QUERY_TABLE_CSV_REQUEST, +) from synapseclient.core.utils import MB from synapseclient.models import Activity, Column, ColumnType from synapseclient.models.mixins.table_components import ( @@ -26,8 +31,26 @@ ViewSnapshotMixin, ViewStoreMixin, ViewUpdateMixin, + _query_table_csv, + _query_table_next_page, + _query_table_row_set, +) +from synapseclient.models.table_components import ( + ActionRequiredCount, + ColumnType, + CsvTableDescriptor, + Query, + QueryBundleRequest, + QueryJob, + QueryNextPageToken, + QueryResult, + QueryResultBundle, + QueryResultOutput, + Row, + RowSet, + SelectColumn, + SumFileSizes, ) -from synapseclient.table import TableQueryResult POST_COLUMNS_PATCH = "synapseclient.models.mixins.table_components.post_columns" GET_ID_PATCH = "synapseclient.models.mixins.table_components.get_id" @@ -927,6 +950,264 @@ async def test_upsert_rows_async(self): ) +class TestQuery: + """Test suite for the Query.to_synapse_request method.""" + + def test_to_synapse_request_with_minimal_data(self): + """Test to_synapse_request with only required SQL parameter.""" + # GIVEN a Query with minimal parameters + query = Query(sql="SELECT * FROM syn123456") + + # WHEN calling to_synapse_request + result = query.to_synapse_request() + + # THEN verify only sql and includeEntityEtag are included (None values are deleted) + expected = {"sql": "SELECT * FROM syn123456", "includeEntityEtag": False} + assert result == expected + + def test_to_synapse_request_with_all_parameters(self): + """Test to_synapse_request with all parameters specified.""" + # GIVEN a Query with all parameters + additional_filters = [ + { + "concreteType": "org.example.Filter1", + "column": "col1", + "operator": "EQUALS", + "values": ["value1"], + }, + { + "concreteType": "org.example.Filter2", + "column": "col2", + "operator": "GREATER_THAN", + "values": [10], + }, + ] + selected_facets = [ + { + "concreteType": "org.example.FacetColumnRangeRequest", + "columnName": "age", + "min": "18", + "max": "65", + }, + { + "concreteType": "org.example.FacetColumnValuesRequest", + "columnName": "category", + "facetValues": ["A", "B"], + }, + ] + sort_items = [ + {"column": "name", "direction": "ASC"}, + {"column": "date_created", "direction": "DESC"}, + ] + + query = Query( + sql="SELECT col1, col2, col3 FROM syn123456", + 
additional_filters=additional_filters, + selected_facets=selected_facets, + include_entity_etag=True, + select_file_column=123, + select_file_version_column=456, + offset=50, + limit=100, + sort=sort_items, + ) + + # WHEN calling to_synapse_request + result = query.to_synapse_request() + + # THEN verify all parameters are included + expected = { + "sql": "SELECT col1, col2, col3 FROM syn123456", + "additionalFilters": additional_filters, + "selectedFacets": selected_facets, + "includeEntityEtag": True, + "selectFileColumn": 123, + "selectFileVersionColumn": 456, + "offset": 50, + "limit": 100, + "sort": sort_items, + } + assert result == expected + + def test_to_synapse_request_with_partial_parameters(self): + """Test to_synapse_request with some parameters specified.""" + # GIVEN a Query with partial parameters + query = Query( + sql="SELECT COUNT(*) FROM syn123456", + include_entity_etag=False, + offset=0, + limit=50, + ) + + # WHEN calling to_synapse_request + result = query.to_synapse_request() + + # THEN verify only specified parameters are included + expected = { + "sql": "SELECT COUNT(*) FROM syn123456", + "includeEntityEtag": False, + "offset": 0, + "limit": 50, + } + assert result == expected + + +class TestQueryBundleRequest: + """Test suite for the QueryBundleRequest.to_synapse_request and fill_from_dict methods.""" + + @pytest.fixture + def sample_query(self): + """Sample Query object for testing.""" + return Query( + sql="SELECT * FROM syn123456", include_entity_etag=True, offset=0, limit=100 + ) + + @pytest.fixture + def sample_query_result_bundle_data(self): + """Sample QueryResultBundle response data for testing.""" + return { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResultBundle", + "queryResult": { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResult", + "queryResults": { + "concreteType": "org.sagebionetworks.repo.model.table.RowSet", + "tableId": "syn123456", + "etag": "rowset-etag", + "headers": [{"name": "col1", "columnType": "STRING", "id": "123"}], + "rows": [ + {"rowId": 1, "versionNumber": 1, "values": ["test_value"]} + ], + }, + }, + "queryCount": 250, + "selectColumns": [ + {"name": "col1", "columnType": "STRING", "id": "123"}, + {"name": "col2", "columnType": "INTEGER", "id": "124"}, + ], + "maxRowsPerPage": 100, + "columnModels": [ + {"name": "col1", "columnType": "STRING", "id": "123"}, + {"name": "col2", "columnType": "INTEGER", "id": "124"}, + ], + "facets": [ + { + "concreteType": "org.sagebionetworks.repo.model.table.FacetColumnResultValues", + "columnName": "status", + "facetType": "enumeration", + "facetValues": [ + {"value": "active", "count": 100, "isSelected": False} + ], + } + ], + "sumFileSizes": {"sumFileSizesBytes": 2048576, "greaterThan": True}, + "lastUpdatedOn": "2025-08-27T12:30:45.678Z", + "combinedSql": "SELECT * FROM syn123456 WHERE status = 'active' LIMIT 100 OFFSET 0", + "actionsRequired": [ + { + "action": { + "concreteType": "org.sagebionetworks.repo.model.download.MeetAccessRequirement", + "accessRequirementId": 12345, + }, + "count": 5, + } + ], + } + + def test_to_synapse_request_with_minimal_parameters(self, sample_query): + """Test to_synapse_request with minimal parameters.""" + # GIVEN a QueryBundleRequest with minimal parameters + request = QueryBundleRequest(entity_id="syn123456", query=sample_query) + + # WHEN calling to_synapse_request + result = request.to_synapse_request() + + # THEN verify the correct request structure + expected = { + "concreteType": QUERY_BUNDLE_REQUEST, + "entityId": 
"syn123456", + "query": sample_query, + } + assert result == expected + + def test_to_synapse_request_with_part_mask(self, sample_query): + """Test to_synapse_request with part_mask specified.""" + # GIVEN a QueryBundleRequest with part_mask + part_mask = 0x1 | 0x2 | 0x4 + request = QueryBundleRequest( + entity_id="syn789012", query=sample_query, part_mask=part_mask + ) + + # WHEN calling to_synapse_request + result = request.to_synapse_request() + + # THEN verify part_mask is included + expected = { + "concreteType": QUERY_BUNDLE_REQUEST, + "entityId": "syn789012", + "query": sample_query, + "partMask": part_mask, + } + assert result == expected + + def test_fill_from_dict_with_complete_bundle( + self, sample_query, sample_query_result_bundle_data + ): + """Test fill_from_dict with complete QueryResultBundle response.""" + # GIVEN a QueryBundleRequest and complete response data + request = QueryBundleRequest( + entity_id="syn123456", query=sample_query, part_mask=0x3FF + ) + + # WHEN calling fill_from_dict + result = request.fill_from_dict(sample_query_result_bundle_data) + + # THEN verify all response attributes are set + assert result is request # Should return self + + # Verify nested QueryResult + assert isinstance(request.query_result, QueryResult) + assert ( + request.query_result.concrete_type + == "org.sagebionetworks.repo.model.table.QueryResult" + ) + assert isinstance(request.query_result.query_results, RowSet) + assert request.query_result.query_results.table_id == "syn123456" + + # Verify scalar fields + assert request.query_count == 250 + assert request.max_rows_per_page == 100 + assert request.last_updated_on == "2025-08-27T12:30:45.678Z" + assert ( + request.combined_sql + == "SELECT * FROM syn123456 WHERE status = 'active' LIMIT 100 OFFSET 0" + ) + + # Verify SelectColumns + assert len(request.select_columns) == 2 + assert isinstance(request.select_columns[0], SelectColumn) + assert request.select_columns[0].name == "col1" + assert request.select_columns[0].column_type == ColumnType.STRING + + # Verify ColumnModels + assert len(request.column_models) == 2 + assert isinstance(request.column_models[0], Column) + assert request.column_models[0].name == "col1" + + # Verify Facets (stored as raw data) + assert len(request.facets) == 1 + assert request.facets[0]["columnName"] == "status" + + # Verify SumFileSizes + assert isinstance(request.sum_file_sizes, SumFileSizes) + assert request.sum_file_sizes.sum_file_size_bytes == 2048576 + assert request.sum_file_sizes.greater_than == True + + # Verify ActionsRequired + assert len(request.actions_required) == 1 + assert isinstance(request.actions_required[0], ActionRequiredCount) + assert request.actions_required[0].count == 5 + + class TestViewUpdateMixin: @pytest.fixture(autouse=True, scope="function") def init_syn(self, syn: Synapse) -> None: @@ -967,6 +1248,136 @@ async def test_update_rows_async(self): ) +class TestQueryResultBundle: + """Test suite for the QueryResultBundle.fill_from_dict method.""" + + @pytest.fixture + def sample_query_result_data(self): + """Sample QueryResult data for testing.""" + return { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResult", + "queryResults": { + "concreteType": "org.sagebionetworks.repo.model.table.RowSet", + "tableId": "syn123456", + "etag": "rowset-etag", + "headers": [ + {"name": "col1", "columnType": "STRING", "id": "123"}, + {"name": "col2", "columnType": "INTEGER", "id": "124"}, + ], + "rows": [ + {"rowId": 1, "versionNumber": 1, "values": ["test1", "100"]}, + 
{"rowId": 2, "versionNumber": 1, "values": ["test2", "200"]}, + ], + }, + "nextPageToken": { + "concreteType": "org.sagebionetworks.repo.model.table.QueryNextPageToken", + "entityId": "syn123456", + "token": "next-page-token-abc", + }, + } + + @pytest.fixture + def sample_select_columns_data(self): + """Sample SelectColumn data for testing.""" + return [ + {"name": "col1", "columnType": "STRING", "id": "123"}, + {"name": "col2", "columnType": "INTEGER", "id": "124"}, + {"name": "col3", "columnType": "BOOLEAN", "id": "125"}, + ] + + @pytest.fixture + def sample_sum_file_sizes_data(self): + """Sample SumFileSizes data for testing.""" + return {"sumFileSizesBytes": 1048576, "greaterThan": False} + + def test_fill_from_dict_with_complete_data( + self, + sample_query_result_data, + sample_select_columns_data, + sample_sum_file_sizes_data, + ): + """Test fill_from_dict with complete QueryResultBundle data.""" + # GIVEN complete QueryResultBundle data + data = { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResultBundle", + "queryResult": sample_query_result_data, + "queryCount": 150, + "selectColumns": sample_select_columns_data, + "maxRowsPerPage": 100, + "columnModels": [ + {"name": "col1", "columnType": "STRING", "id": "123"}, + {"name": "col2", "columnType": "INTEGER", "id": "124"}, + ], + "facets": [ + { + "concreteType": "org.sagebionetworks.repo.model.table.FacetColumnResultValues", + "columnName": "status", + "facetType": "enumeration", + "facetValues": [ + {"value": "active", "count": 50, "isSelected": False}, + {"value": "inactive", "count": 25, "isSelected": True}, + ], + } + ], + "sumFileSizes": sample_sum_file_sizes_data, + "lastUpdatedOn": "2025-08-20T15:30:45.123Z", + "combinedSql": "SELECT col1, col2 FROM syn123456 WHERE status = 'active'", + } + + # WHEN calling fill_from_dict + result = QueryResultBundle.fill_from_dict(data) + + # THEN verify all attributes are set correctly + assert ( + result.concrete_type + == "org.sagebionetworks.repo.model.table.QueryResultBundle" + ) + + # Verify nested QueryResult + assert isinstance(result.query_result, QueryResult) + assert ( + result.query_result.concrete_type + == "org.sagebionetworks.repo.model.table.QueryResult" + ) + assert isinstance(result.query_result.query_results, RowSet) + assert result.query_result.query_results.table_id == "syn123456" + + # Verify scalar fields + assert result.query_count == 150 + assert result.max_rows_per_page == 100 + assert result.last_updated_on == "2025-08-20T15:30:45.123Z" + assert ( + result.combined_sql + == "SELECT col1, col2 FROM syn123456 WHERE status = 'active'" + ) + + # Verify SelectColumns + assert len(result.select_columns) == 3 + assert isinstance(result.select_columns[0], SelectColumn) + assert result.select_columns[0].name == "col1" + assert result.select_columns[0].column_type == ColumnType.STRING + assert result.select_columns[1].name == "col2" + assert result.select_columns[1].column_type == ColumnType.INTEGER + assert result.select_columns[2].name == "col3" + assert result.select_columns[2].column_type == ColumnType.BOOLEAN + + # Verify ColumnModels + assert len(result.column_models) == 2 + assert result.column_models[0].name == "col1" + assert result.column_models[1].column_type == "INTEGER" + + # Verify Facets (stored as raw data) + assert len(result.facets) == 1 + assert result.facets[0]["columnName"] == "status" + assert result.facets[0]["facetType"] == "enumeration" + assert len(result.facets[0]["facetValues"]) == 2 + + # Verify SumFileSizes + assert 
isinstance(result.sum_file_sizes, SumFileSizes) + assert result.sum_file_sizes.sum_file_size_bytes == 1048576 + assert result.sum_file_sizes.greater_than == False + + class TestQueryMixin: fake_query = "SELECT * FROM syn123" @@ -984,16 +1395,39 @@ async def test_query_async(self): # GIVEN a TestClass instance test_instance = self.ClassForTest() - # Create a mock TableQueryResult without calling __init__ - mock_query_result = MagicMock(spec=TableQueryResult) - mock_query_result.asDataFrame.return_value = pd.DataFrame( - {"col1": ["A", "B"], "col2": [1, 2]} + mock_query_job = QueryJob( + entity_id="syn1234", + sql="SELECT * FROM syn1234", + # Response attributes populated after job completion + job_id="1234", + results_file_handle_id="5678", + table_id="syn1234", + etag="test_etag", + headers=[ + SelectColumn(name="col1", column_type=ColumnType.STRING, id="111"), + SelectColumn(name="col2", column_type=ColumnType.INTEGER, id="222"), + ], + response_concrete_type="org.sagebionetworks.repo.model.table.DownloadFromTableResult", + ) + + # CREATE a mock table query result + mock_df = pd.DataFrame( + {"test_col": ["random string1"], "test_col2": ["random string2"]} ) + mock_query_result = mock_query_job, "dummy.csv" # WHEN I call query_async - with patch.object( - self.syn, "tableQuery", return_value=mock_query_result - ) as mock_table_query: + with ( + patch( + "synapseclient.models.mixins.table_components._table_query", + return_value=mock_query_result, + ) as mock_table_query, + patch( + "synapseclient.models.mixins.table_components.csv_to_pandas_df", + return_value=mock_df, + ) as mock_csv_to_pandas_df, + patch.object(os, "linesep", str(os.linesep)), + ): result = await test_instance.query_async( query=self.fake_query, synapse_client=self.syn ) @@ -1001,39 +1435,159 @@ async def test_query_async(self): # THEN mock_table_query should be called with correct args mock_table_query.assert_called_once_with( query=self.fake_query, - includeRowIdAndRowVersion=True, - quoteCharacter='"', - escapeCharacter="\\", - lineEnd=str(os.linesep), + include_row_id_and_row_version=True, + quote_char='"', + escape_char="\\", + line_end=str(os.linesep), separator=",", header=True, - downloadLocation=None, + download_location=None, + ) + + # AND csv_to_pandas_df should be called with correct args + mock_csv_to_pandas_df.assert_called_once_with( + filepath="dummy.csv", + separator=",", + quote_char='"', + escape_char="\\", + row_id_and_version_in_index=False, + date_columns=None, + list_columns=None, ) - # AND mock_as_data_frame should be called with correct args - mock_query_result.asDataFrame.assert_called_once_with( - rowIdAndVersionInIndex=False, - convert_to_datetime=False, + # AND the result should match expected DataFrame + assert result.equals(mock_df) + + async def test_query_async_with_date_and_list_columns(self): + # GIVEN a TestClass instance + test_instance = self.ClassForTest() + + # CREATE a mock table query result with headers containing date and list columns + mock_df = pd.DataFrame( + { + "date_col": ["2024-01-01", "2024-01-02"], + "list_col": [["item1", "item2"], ["item3", "item4"]], + "string_col": ["A", "B"], + } + ) + + csv_table_descriptor = CsvTableDescriptor( + quote_character='"', + escape_character="\\", + line_end=os.linesep, + separator=",", + is_first_line_header=True, + ) + + # Mock query result with headers that include date and list column types + mock_query_job_response = QueryJob( + entity_id="syn123", + sql="SELECT * FROM syn123", + csv_table_descriptor=csv_table_descriptor, + 
include_row_id_and_row_version=True, + job_id="test-job-12345", + response_concrete_type="org.sagebionetworks.repo.model.table.DownloadFromTableResult", + results_file_handle_id="file-handle-67890", + table_id="syn123", + etag="test-etag-abc123", + headers=[ + SelectColumn(name="date_col", column_type=ColumnType.DATE), + SelectColumn(name="list_col", column_type=ColumnType.STRING_LIST), + SelectColumn(name="string_col", column_type=ColumnType.STRING), + ], + ) + + mock_query_result_with_headers = ( + mock_query_job_response, + "dummy.csv", + ) + + # WHEN I call query_async with convert_to_datetime=True + with ( + patch( + "synapseclient.models.mixins.table_components._table_query", + return_value=mock_query_result_with_headers, + ) as mock_table_query, + patch( + "synapseclient.models.mixins.table_components.csv_to_pandas_df", + return_value=mock_df, + ) as mock_csv_to_pandas_df, + patch.object(os, "linesep", str(os.linesep)), + ): + result = await test_instance.query_async( + query=self.fake_query, convert_to_datetime=True, synapse_client=self.syn + ) + + # THEN mock_table_query should be called with correct args + mock_table_query.assert_called_once_with( + query=self.fake_query, + include_row_id_and_row_version=True, + quote_char='"', + escape_char="\\", + line_end=str(os.linesep), + separator=",", + header=True, + download_location=None, + ) + + # AND csv_to_pandas_df should be called with date_columns and list_columns populated + mock_csv_to_pandas_df.assert_called_once_with( + filepath="dummy.csv", + separator=",", + quote_char='"', + escape_char="\\", + row_id_and_version_in_index=False, + date_columns=["date_col"], # Should contain the DATE column + list_columns=["list_col"], # Should contain the STRING_LIST column ) # AND the result should match expected DataFrame - assert result.equals(pd.DataFrame({"col1": ["A", "B"], "col2": [1, 2]})) + assert result.equals(mock_df) async def test_query_part_mask_async(self): # GIVEN a TestClass instance test_instance = self.ClassForTest() - # Create mock query result with all possible part mask returns - mock_query_result = MagicMock(spec=TableQueryResult) - mock_query_result.asDataFrame.return_value = pd.DataFrame( - {"col1": ["A", "B"], "col2": [1, 2]} + # Create mock QueryResultBundle + mock_query_result_bundle = QueryResultBundle( + concrete_type="org.sagebionetworks.repo.model.table.QueryResultBundle", + query_result=QueryResult( + concrete_type="org.sagebionetworks.repo.model.table.QueryResult", + query_results=RowSet( + concrete_type="org.sagebionetworks.repo.model.table.RowSet", + table_id="syn123", + etag="test etag", + headers=[ + SelectColumn( + name="test_col", column_type=ColumnType.STRING, id="242777" + ), + SelectColumn( + name="test_col2", column_type=ColumnType.STRING, id="242778" + ), + ], + rows=[ + Row( + row_id=1, + version_number=1, + values=["random string1", "random string2"], + ), + Row( + row_id=2, + version_number=1, + values=["random string3", "random string4"], + ), + ], + ), + next_page_token=None, + ), + query_count=2, + last_updated_on="2025-08-17T09:50:35.248Z", + ) + + # Create expected DataFrame result + expected_df = pd.DataFrame( + {"test_col": ["random string1"], "test_col2": ["random string2"]} ) - mock_query_result.count = 2 - mock_query_result.sumFileSizes = { - "sumFileSizesBytes": 1000, - "greaterThan": False, - } - mock_query_result.lastUpdatedOn = "2024-02-21" # Set up part mask combining all options QUERY_RESULTS = 0x1 @@ -1043,9 +1597,16 @@ async def test_query_part_mask_async(self): part_mask = 
QUERY_RESULTS | QUERY_COUNT | SUM_FILE_SIZES | LAST_UPDATED_ON # WHEN I call query_part_mask_async - with patch.object( - self.syn, "tableQuery", return_value=mock_query_result - ) as mock_table_query: + with ( + patch( + "synapseclient.models.mixins.table_components._table_query", + return_value=mock_query_result_bundle, + ) as mock_table_query, + patch( + "synapseclient.models.mixins.table_components._rowset_to_pandas_df", + return_value=expected_df, + ) as mock_rowset_to_pandas_df, + ): result = await test_instance.query_part_mask_async( query=self.fake_query, part_mask=part_mask, synapse_client=self.syn ) @@ -1053,44 +1614,89 @@ async def test_query_part_mask_async(self): # THEN mock_table_query should be called with correct args mock_table_query.assert_called_once_with( query=self.fake_query, - resultsAs="rowset", - partMask=part_mask, + results_as="rowset", + part_mask=part_mask, + limit=None, + offset=None, ) - - # AND mock_as_data_frame should be called - mock_query_result.asDataFrame.assert_called_once_with( - rowIdAndVersionInIndex=False + # AND mock_rowset_to_pandas_df should be called with correct args + mock_rowset_to_pandas_df.assert_called_once_with( + query_result_bundle=mock_query_result_bundle, + synapse=self.syn, + row_id_and_version_in_index=False, ) - - # AND the result should contain all requested parts - assert result.result.equals( - pd.DataFrame({"col1": ["A", "B"], "col2": [1, 2]}) + # THEN mock_table_query should be called with correct args + mock_table_query.assert_called_once_with( + query=self.fake_query, + results_as="rowset", + part_mask=part_mask, + limit=None, + offset=None, ) - assert result.count == 2 - assert result.sum_file_sizes.sum_file_size_bytes == 1000 - assert result.sum_file_sizes.greater_than is False - assert result.last_updated_on == "2024-02-21" + # AND the result should be a QueryResultOutput with expected values + assert isinstance(result, QueryResultOutput) + assert result.result.equals(expected_df) + assert result.count == mock_query_result_bundle.query_count + assert result.last_updated_on == mock_query_result_bundle.last_updated_on + assert result.sum_file_sizes is None # Not set in mock, should be None async def test_query_part_mask_async_minimal(self): # GIVEN a TestClass instance test_instance = self.ClassForTest() - # Create mock with just query results - mock_query_result = MagicMock(spec=TableQueryResult) - mock_query_result.asDataFrame.return_value = pd.DataFrame( - {"col1": ["A", "B"], "col2": [1, 2]} + mock_query_result = QueryResult( + concrete_type="org.sagebionetworks.repo.model.table.QueryResult", + query_results=RowSet( + concrete_type="org.sagebionetworks.repo.model.table.RowSet", + table_id="syn456", + etag="etag", + headers=[ + SelectColumn( + name="test_col", column_type=ColumnType.STRING, id="242777" + ), + SelectColumn( + name="test_col2", column_type=ColumnType.STRING, id="242778" + ), + ], + rows=[ + Row( + row_id=1, + version_number=1, + values=["random string1", "random string2"], + ), + Row( + row_id=2, + version_number=1, + values=["random string3", "random string4"], + ), + ], + ), + next_page_token=None, + ) + mock_query_result_bundle = QueryResultBundle( + concrete_type="org.sagebionetworks.repo.model.table.QueryResult", + query_result=mock_query_result, + ) + + # Create expected DataFrame result + expected_df = pd.DataFrame( + {"test_col": ["random string1"], "test_col2": ["random string2"]} ) - mock_query_result.count = None - mock_query_result.sumFileSizes = None - mock_query_result.lastUpdatedOn = None 
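A hedged sketch of the part-mask query flow these tests exercise; the `Table(id=...)` construction and its import are assumptions, while the `query_part_mask_async` arguments and the `QueryResultOutput` fields mirror the assertions in the surrounding tests.

```python
import asyncio

from synapseclient import Synapse
from synapseclient.models import Table  # assumed import path for the table model

QUERY_RESULTS = 0x1
QUERY_COUNT = 0x2
SUM_FILE_SIZES = 0x40
LAST_UPDATED_ON = 0x80


async def main():
    syn = Synapse()
    syn.login()

    table = Table(id="syn123")  # hypothetical construction of a table model
    output = await table.query_part_mask_async(
        query="SELECT * FROM syn123",
        part_mask=QUERY_RESULTS | QUERY_COUNT | SUM_FILE_SIZES | LAST_UPDATED_ON,
        synapse_client=syn,
    )

    # QueryResultOutput fields are populated according to the requested parts:
    print(output.result)           # pandas DataFrame of the query results
    print(output.count)            # total number of matching rows
    print(output.sum_file_sizes)   # SumFileSizes (bytes plus greater-than flag)
    print(output.last_updated_on)  # ISO 8601 timestamp


asyncio.run(main())
```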
# Use just QUERY_RESULTS mask part_mask = 0x1 # QUERY_RESULTS only # WHEN I call query_part_mask_async - with patch.object( - self.syn, "tableQuery", return_value=mock_query_result - ) as mock_table_query: + with ( + patch( + "synapseclient.models.mixins.table_components._table_query", + return_value=mock_query_result_bundle, + ) as mock_table_query, + patch( + "synapseclient.models.mixins.table_components._rowset_to_pandas_df", + return_value=expected_df, + ) as mock_rowset_to_pandas_df, + ): result = await test_instance.query_part_mask_async( query=self.fake_query, part_mask=part_mask, synapse_client=self.syn ) @@ -1098,22 +1704,24 @@ async def test_query_part_mask_async_minimal(self): # THEN mock_table_query should be called with correct args mock_table_query.assert_called_once_with( query=self.fake_query, - resultsAs="rowset", - partMask=part_mask, + results_as="rowset", + part_mask=part_mask, + limit=None, + offset=None, ) - # AND mock_as_data_frame should be called - mock_query_result.asDataFrame.assert_called_once_with( - rowIdAndVersionInIndex=False + mock_rowset_to_pandas_df.assert_called_once_with( + query_result_bundle=mock_query_result_bundle, + synapse=self.syn, + row_id_and_version_in_index=False, ) - # AND the result should contain only the query results - assert result.result.equals( - pd.DataFrame({"col1": ["A", "B"], "col2": [1, 2]}) - ) + # AND the result should be a QueryResultOutput with expected values + assert isinstance(result, QueryResultOutput) + assert result.result.equals(expected_df) assert result.count is None - assert result.sum_file_sizes is None assert result.last_updated_on is None + assert result.sum_file_sizes is None class TestViewSnapshotMixin: @@ -1225,3 +1833,938 @@ async def test_delete_rows_async(self): assert result.equals( pd.DataFrame({"ROW_ID": ["A", "B"], "ROW_VERSION": [1, 2]}) ) + + +class TestQueryTableCsv: + """Test suite for the _query_table_csv function.""" + + @pytest.fixture + def mock_synapse(self): + """Create a mock Synapse client.""" + synapse = MagicMock(spec=Synapse) + synapse._waitForAsync = MagicMock() + synapse.cache = MagicMock() + synapse.cache.get = MagicMock() + synapse.cache.get_cache_dir = MagicMock() + return synapse + + @pytest.fixture + def sample_query(self): + """Sample SQL query for testing.""" + return "SELECT * FROM syn1234" + + @pytest.fixture + def mock_query_job_response(self, sample_query): + """Sample query job response.""" + # Create a mock query job response after calling send_job_and_wait_async + return QueryJob( + entity_id="syn1234", + sql=sample_query, + # Response attributes populated after job completion + job_id="1234", + results_file_handle_id="5678", + table_id="syn1234", + etag="test_etag", + headers=[ + SelectColumn(name="col1", column_type=ColumnType.STRING, id="111"), + SelectColumn(name="col2", column_type=ColumnType.INTEGER, id="222"), + ], + response_concrete_type="org.sagebionetworks.repo.model.table.DownloadFromTableResult", + ) + + @pytest.fixture + def sample_file_path(self): + """Sample file path for downloaded CSV.""" + return "/path/to/downloaded/file.csv" + + @pytest.mark.asyncio + async def test_query_table_csv_request_generation(self, sample_query): + """Test that QueryJob generates the correct synapse request.""" + # GIVEN custom parameters for CSV formatting + custom_params = { + "quote_character": "'", + "escape_character": "/", + "line_end": "\n", + "separator": ";", + "is_first_line_header": False, + } + csv_table_descriptor = CsvTableDescriptor(**custom_params) + + # WHEN 
creating a QueryJob with these parameters + query_job = QueryJob( + entity_id="syn1234", + sql=sample_query, + include_row_id_and_row_version=False, + write_header=False, + csv_table_descriptor=csv_table_descriptor, + ) + + # THEN verify the to_synapse_request() method generates the correct request + synapse_request = query_job.to_synapse_request() + + assert ( + synapse_request["concreteType"] + == "org.sagebionetworks.repo.model.table.DownloadFromTableRequest" + ) + assert synapse_request["entityId"] == "syn1234" + assert synapse_request["sql"] == sample_query + assert synapse_request["writeHeader"] == False + assert synapse_request["includeRowIdAndRowVersion"] == False + assert synapse_request["csvTableDescriptor"]["isFirstLineHeader"] == False + assert synapse_request["csvTableDescriptor"]["quoteCharacter"] == "'" + assert synapse_request["csvTableDescriptor"]["escapeCharacter"] == "/" + assert synapse_request["csvTableDescriptor"]["lineEnd"] == "\n" + assert synapse_request["csvTableDescriptor"]["separator"] == ";" + + @pytest.mark.asyncio + async def test_query_table_csv_basic_functionality( + self, mock_synapse, sample_query, sample_file_path, mock_query_job_response + ): + """Test basic functionality of _query_table_csv.""" + # GIVEN + mock_synapse.cache.get.return_value = None + mock_synapse.cache.get_cache_dir.return_value = "/cache/dir" + + with ( + patch( + "synapseclient.models.mixins.table_components.extract_synapse_id_from_query" + ) as mock_extract_id, + patch( + "synapseclient.models.mixins.table_components.ensure_download_location_is_directory" + ) as mock_ensure_dir, + patch( + "synapseclient.models.mixins.table_components.download_by_file_handle" + ) as mock_download, + patch("os.makedirs") as mock_makedirs, + patch( + "synapseclient.models.table_components.QueryJob.send_job_and_wait_async" + ) as mock_send_job_and_wait_async, + ): + mock_extract_id.return_value = "syn1234" + mock_download.return_value = sample_file_path + + mock_send_job_and_wait_async.return_value = mock_query_job_response + + # WHEN calling the function + completed_query_job, file_path = await _query_table_csv( + query=sample_query, synapse=mock_synapse + ) + + # THEN ensure download file is correct + assert file_path == sample_file_path + assert completed_query_job.entity_id == "syn1234" + + # Verify API call was made correctly + mock_send_job_and_wait_async.assert_called_once() + + # Verify the completed job has the expected response data + assert completed_query_job.results_file_handle_id == "5678" + assert completed_query_job.job_id == "1234" + assert completed_query_job.table_id == "syn1234" + assert len(completed_query_job.headers) == 2 + + @pytest.mark.asyncio + async def test_query_table_csv_with_download_location( + self, mock_synapse, sample_query, sample_file_path, mock_query_job_response + ): + """Test _query_table_csv with specified download location.""" + # GIVEN a custom download location + download_location = "/custom/download/path" + mock_synapse.cache.get.return_value = None + + with ( + patch( + "synapseclient.models.mixins.table_components.extract_synapse_id_from_query" + ) as mock_extract_id, + patch( + "synapseclient.models.mixins.table_components.ensure_download_location_is_directory" + ) as mock_ensure_dir, + patch( + "synapseclient.models.mixins.table_components.download_by_file_handle" + ) as mock_download, + patch("os.makedirs") as mock_makedirs, + patch( + "synapseclient.models.table_components.QueryJob.send_job_and_wait_async" + ) as mock_send_job_and_wait_async, + ): + 
mock_extract_id.return_value = "syn1234" + mock_ensure_dir.return_value = download_location + mock_download.return_value = sample_file_path + mock_send_job_and_wait_async.return_value = mock_query_job_response + + # WHEN calling the function with a download location + result = await _query_table_csv( + query=sample_query, + synapse=mock_synapse, + download_location=download_location, + ) + + # THEN verify ensure_download_location_is_directory is called with the correct location + mock_ensure_dir.assert_called_once_with(download_location=download_location) + mock_makedirs.assert_called_once_with(download_location, exist_ok=True) + assert result == (mock_query_job_response, sample_file_path) + + +class TestQueryResultOutput: + """Test suite for the QueryResultOutput.fill_from_dict method.""" + + @pytest.fixture + def sample_dataframe(self): + """Sample pandas DataFrame for testing.""" + import pandas as pd + + return pd.DataFrame( + {"col1": ["A", "B", "C"], "col2": [1, 2, 3], "col3": ["X", "Y", "Z"]} + ) + + def test_fill_from_dict_with_full_data(self, sample_dataframe): + """Test fill_from_dict with complete data including sum_file_sizes.""" + # GIVEN a complete data dictionary + data = { + "count": 100, + "last_updated_on": "2025-08-20T10:00:00.000Z", + "sum_file_sizes": SumFileSizes( + sum_file_size_bytes=1024000, greater_than=False + ), + } + # WHEN calling fill_from_dict + result = QueryResultOutput.fill_from_dict(result=sample_dataframe, data=data) + + # THEN verify all attributes are set correctly + assert result.result.equals(sample_dataframe) + assert result.count == 100 + assert result.last_updated_on == "2025-08-20T10:00:00.000Z" + assert result.sum_file_sizes.sum_file_size_bytes == 1024000 + assert result.sum_file_sizes.greater_than == False + + +class TestRow: + """Test suite for the Row class.""" + + @pytest.fixture + def sample_row_data(self): + """Sample row data for testing.""" + return { + "rowId": 12345, + "versionNumber": 1, + "etag": "test-etag-123", + "values": ["A", "1", "true", "160000000"], + } + + @pytest.fixture + def sample_headers(self): + """Sample headers for testing cast_values method.""" + return [ + {"columnType": "STRING", "name": "string_col"}, + {"columnType": "INTEGER", "name": "int_col"}, + {"columnType": "BOOLEAN", "name": "bool_col"}, + {"columnType": "DATE", "name": "date_col"}, + ] + + def test_fill_from_dict_complete_data(self, sample_row_data): + """Test fill_from_dict with complete row data.""" + # WHEN creating Row from dictionary + row = Row.fill_from_dict(sample_row_data) + + # THEN verify all fields are populated correctly + assert row.row_id == 12345 + assert row.version_number == 1 + assert row.etag == "test-etag-123" + assert row.values == ["A", "1", "true", "160000000"] + + @pytest.mark.parametrize( + "value,expected", + [ + (True, True), + (False, False), + ("true", True), + ("True", True), + ("TRUE", True), + ("t", True), + ("T", True), + ("1", True), + ("false", False), + ("False", False), + ("FALSE", False), + ("f", False), + ("F", False), + ("0", False), + ], + ) + def test_to_boolean_valid_values(self, value, expected): + """Test to_boolean method with valid boolean values.""" + # WHEN calling to_boolean with valid values + result = Row.to_boolean(value) + + # THEN verify correct boolean conversion + assert result == expected + assert isinstance(result, bool) + + @pytest.mark.parametrize( + "invalid_value", + [ + "invalid", + "yes", + "no", + "2", + "", + None, + ], + ) + def test_to_boolean_invalid_values(self, invalid_value): + 
"""Test to_boolean method with invalid values.""" + # WHEN calling to_boolean with invalid values + # THEN verify ValueError is raised + with pytest.raises( + ValueError, match=f"Can't convert {invalid_value} to boolean" + ): + Row.to_boolean(invalid_value) + + def test_cast_values_string_column(self): + """Test cast_values with STRING column type.""" + # GIVEN string values and headers + values = ["hello", "world", "test"] + headers = [ + {"columnType": "STRING"}, + {"columnType": "STRING"}, + {"columnType": "STRING"}, + ] + + # WHEN casting values + result = Row.cast_values(values, headers) + + # THEN verify strings are preserved + assert result == ["hello", "world", "test"] + + def test_cast_values_integer_column(self): + """Test cast_values with INTEGER column type.""" + # GIVEN integer values and headers + values = ["123", "456", "789"] + headers = [ + {"columnType": "INTEGER"}, + {"columnType": "INTEGER"}, + {"columnType": "INTEGER"}, + ] + + # WHEN casting values + result = Row.cast_values(values, headers) + + # THEN verify integers are converted + assert result == [123, 456, 789] + assert all(isinstance(val, int) for val in result) + + +class TestActionRequiredCount: + """Test suite for the ActionRequiredCount.fill_from_dict method.""" + + def test_fill_from_dict_with_complete_data(self): + """Test fill_from_dict with complete action data.""" + # GIVEN complete action data + data = { + "action": { + "concreteType": "org.sagebionetworks.repo.model.download.MeetAccessRequirement", + "accessRequirementId": 12345, + }, + "count": 42, + } + + # WHEN calling fill_from_dict + result = ActionRequiredCount.fill_from_dict(data) + + # THEN verify all attributes are set correctly + assert result.action == data["action"] + assert result.count == 42 + + +class TestSelectColumn: + """Test suite for the SelectColumn.fill_from_dict method.""" + + def test_fill_from_dict_with_complete_data(self): + """Test fill_from_dict with complete column data.""" + # GIVEN complete column data + data = {"name": "test_column", "columnType": "STRING", "id": "123456"} + + # WHEN calling fill_from_dict + result = SelectColumn.fill_from_dict(data) + + # THEN verify all attributes are set correctly + assert result.name == "test_column" + assert result.column_type == ColumnType.STRING + assert result.id == "123456" + + def test_fill_from_dict_with_valid_column_types(self): + """Test fill_from_dict with all valid column types.""" + valid_column_types = [ + "STRING", + "DOUBLE", + "INTEGER", + "BOOLEAN", + "DATE", + "FILEHANDLEID", + "ENTITYID", + "LINK", + "MEDIUMTEXT", + "LARGETEXT", + "USERID", + "STRING_LIST", + "INTEGER_LIST", + "USERID_LIST", + "JSON", + ] + + for column_type_str in valid_column_types: + # GIVEN data with valid column type + data = { + "name": f"test_{column_type_str.lower()}", + "columnType": column_type_str, + "id": "123", + } + + # WHEN calling fill_from_dict + result = SelectColumn.fill_from_dict(data) + + # THEN verify column type is converted correctly + assert result.column_type == ColumnType(column_type_str) + assert result.name == f"test_{column_type_str.lower()}" + assert result.id == "123" + + +class TestQueryResult: + """Test suite for the QueryResult.fill_from_dict method.""" + + @pytest.fixture + def sample_rowset_data(self): + """Sample RowSet data for testing.""" + return { + "concreteType": "org.sagebionetworks.repo.model.table.RowSet", + "tableId": "syn123456", + "etag": "rowset-etag", + "headers": [{"name": "col1", "columnType": "STRING", "id": "123"}], + "rows": [{"rowId": 1, 
"versionNumber": 1, "values": ["test_value"]}], + } + + @pytest.fixture + def sample_next_page_token_data(self): + """Sample QueryNextPageToken data for testing.""" + return { + "concreteType": "org.sagebionetworks.repo.model.table.QueryNextPageToken", + "entityId": "syn123456", + "token": "next-page-token-xyz", + } + + def test_fill_from_dict_with_complete_data( + self, sample_rowset_data, sample_next_page_token_data + ): + """Test fill_from_dict with complete QueryResult data.""" + # GIVEN complete QueryResult data + data = { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResult", + "queryResults": sample_rowset_data, + "nextPageToken": sample_next_page_token_data, + } + + # WHEN calling fill_from_dict + result = QueryResult.fill_from_dict(data) + + # THEN verify all attributes are set correctly + assert ( + result.concrete_type == "org.sagebionetworks.repo.model.table.QueryResult" + ) + + # Verify nested RowSet + assert isinstance(result.query_results, RowSet) + assert ( + result.query_results.concrete_type + == "org.sagebionetworks.repo.model.table.RowSet" + ) + assert result.query_results.table_id == "syn123456" + assert result.query_results.etag == "rowset-etag" + + # Verify nested QueryNextPageToken + assert isinstance(result.next_page_token, QueryNextPageToken) + assert ( + result.next_page_token.concrete_type + == "org.sagebionetworks.repo.model.table.QueryNextPageToken" + ) + assert result.next_page_token.entity_id == "syn123456" + assert result.next_page_token.token == "next-page-token-xyz" + + +class TestRowSet: + """Test suite for the RowSet.fill_from_dict method.""" + + @pytest.fixture + def sample_row_data(self): + """Sample row data for testing.""" + return [ + { + "rowId": 1, + "versionNumber": 1, + "etag": "etag-1", + "values": ["A", "1", "true"], + }, + { + "rowId": 2, + "versionNumber": 2, + "etag": "etag-2", + "values": ["B", "2", "false"], + }, + ] + + @pytest.fixture + def sample_header_data(self): + """Sample header data for testing.""" + return [ + {"name": "col1", "columnType": "STRING", "id": "123"}, + {"name": "col2", "columnType": "INTEGER", "id": "124"}, + {"name": "col3", "columnType": "BOOLEAN", "id": "125"}, + ] + + def test_fill_from_dict_with_complete_data( + self, sample_row_data, sample_header_data + ): + """Test fill_from_dict with complete RowSet data.""" + # GIVEN complete RowSet data + data = { + "concreteType": "org.sagebionetworks.repo.model.table.RowSet", + "tableId": "syn123456", + "etag": "table-etag-123", + "headers": sample_header_data, + "rows": sample_row_data, + } + + # WHEN calling fill_from_dict + result = RowSet.fill_from_dict(data) + + # THEN verify all attributes are set correctly + assert result.concrete_type == "org.sagebionetworks.repo.model.table.RowSet" + assert result.table_id == "syn123456" + assert result.etag == "table-etag-123" + + # Verify headers + assert len(result.headers) == 3 + assert result.headers[0].name == "col1" + assert result.headers[0].column_type == ColumnType.STRING + assert result.headers[0].id == "123" + assert result.headers[1].name == "col2" + assert result.headers[1].column_type == ColumnType.INTEGER + assert result.headers[1].id == "124" + + # Verify rows + assert len(result.rows) == 2 + assert result.rows[0].row_id == 1 + assert result.rows[0].version_number == 1 + assert result.rows[0].etag == "etag-1" + assert result.rows[0].values == ["A", 1, True] + assert result.rows[1].row_id == 2 + assert result.rows[1].version_number == 2 + assert result.rows[1].etag == "etag-2" + assert 
result.rows[1].values == ["B", 2, False] + + +class TestQueryNextPageToken: + """Test suite for the QueryNextPageToken.fill_from_dict method.""" + + def test_fill_from_dict_with_complete_data(self): + """Test fill_from_dict with complete token data.""" + # GIVEN complete token data + data = { + "concreteType": "org.sagebionetworks.repo.model.table.QueryNextPageToken", + "entityId": "syn123456", + "token": "next-page-token-12345", + } + + # WHEN calling fill_from_dict + result = QueryNextPageToken.fill_from_dict(data) + + # THEN verify all attributes are set correctly + assert ( + result.concrete_type + == "org.sagebionetworks.repo.model.table.QueryNextPageToken" + ) + assert result.entity_id == "syn123456" + assert result.token == "next-page-token-12345" + + +class TestQueryJob: + """Test suite for the QueryJob.to_synapse_request and fill_from_dict methods.""" + + @pytest.fixture + def sample_csv_descriptor(self): + """Sample CsvTableDescriptor for testing.""" + return CsvTableDescriptor( + quote_character="'", + escape_character="/", + line_end="\n", + separator=";", + ) + + def test_to_synapse_request_with_defaults(self): + """Test to_synapse_request with default parameters.""" + # GIVEN a QueryJob with minimal parameters (using defaults) + job = QueryJob(entity_id="syn123456", sql="SELECT * FROM syn123456") + + # WHEN calling to_synapse_request + result = job.to_synapse_request() + + # THEN verify default values are set correctly + expected = { + "concreteType": QUERY_TABLE_CSV_REQUEST, + "entityId": "syn123456", + "sql": "SELECT * FROM syn123456", + "writeHeader": True, # Default value + "includeRowIdAndRowVersion": True, # Default value + "includeEntityEtag": False, # Default value + } + assert result == expected + + def test_to_synapse_request_with_none_values(self): + """Test that None values are properly excluded from request.""" + # GIVEN a QueryJob with some None values + job = QueryJob( + entity_id="syn123456", + sql="SELECT * FROM syn123456", + csv_table_descriptor=None, # Should be excluded + include_entity_etag=None, # Should be excluded + ) + + # WHEN calling to_synapse_request + result = job.to_synapse_request() + + # THEN verify None values are not included + assert "csvTableDescriptor" not in result + assert "includeEntityEtag" not in result + + def test_to_synapse_request_csv_descriptor_integration(self, sample_csv_descriptor): + """Test that CsvTableDescriptor is properly integrated in request.""" + # GIVEN a QueryJob with CsvTableDescriptor + job = QueryJob( + entity_id="syn123456", + sql="SELECT * FROM syn123456", + csv_table_descriptor=sample_csv_descriptor, + ) + + # WHEN calling to_synapse_request + result = job.to_synapse_request() + + # THEN verify CsvTableDescriptor is included correctly + assert "csvTableDescriptor" in result + csv_desc = result["csvTableDescriptor"] + assert csv_desc["quoteCharacter"] == "'" + assert csv_desc["escapeCharacter"] == "/" + assert csv_desc["lineEnd"] == "\n" + assert csv_desc["separator"] == ";" + + def test_fill_from_dict_with_complete_response(self): + """Test fill_from_dict with complete DownloadFromTableResult response.""" + # GIVEN a QueryJob and complete response data + job = QueryJob(entity_id="syn123456", sql="SELECT * FROM syn123456") + response_data = { + "jobId": "async-job-12345", + "concreteType": "org.sagebionetworks.repo.model.table.DownloadFromTableResult", + "resultsFileHandleId": "file-handle-67890", + "tableId": "syn123456", + "etag": "table-etag-abc123", + "headers": [ + {"name": "col1", "columnType": "STRING", 
"id": "111"}, + {"name": "col2", "columnType": "INTEGER", "id": "222"}, + ], + } + + # WHEN calling fill_from_dict + result = job.fill_from_dict(response_data) + + # THEN verify all response attributes are set + assert result is job # Should return self + assert job.job_id == "async-job-12345" + assert ( + job.response_concrete_type + == "org.sagebionetworks.repo.model.table.DownloadFromTableResult" + ) + assert job.results_file_handle_id == "file-handle-67890" + assert job.table_id == "syn123456" + assert job.etag == "table-etag-abc123" + + # Verify the nested SelectColumns + assert isinstance(result.headers, list) + assert len(result.headers) == 2 + assert isinstance(result.headers[0], SelectColumn) + assert isinstance(result.headers[1], SelectColumn) + assert result.headers[0].name == "col1" + assert result.headers[0].column_type == "STRING" + assert result.headers[0].id == "111" + assert result.headers[1].name == "col2" + assert result.headers[1].column_type == "INTEGER" + assert result.headers[1].id == "222" + + +class TestQueryTableRowSet: + """Test suite for the _query_table_row_set function.""" + + @pytest.fixture + def mock_synapse_client(self): + """Mock Synapse client for testing.""" + mock_client = MagicMock() + return mock_client + + @pytest.fixture + def sample_query_result_bundle(self): + """Sample QueryResultBundle response.""" + return QueryResultBundle( + query_result=QueryResult( + concrete_type=QUERY_RESULT, + query_results=RowSet( + table_id="syn123456", + etag="test-etag", + headers=[ + SelectColumn( + name="test_col", column_type=ColumnType.STRING, id="242777" + ), + SelectColumn( + name="test_col2", column_type=ColumnType.STRING, id="242778" + ), + ], + rows=[ + Row( + row_id=1, + version_number=1, + etag=None, + values=["random string1", "random string2"], + ) + ], + ), + next_page_token=None, + ), + query_count=1, + select_columns=[ + SelectColumn(name="col1", column_type=ColumnType.STRING, id="111"), + SelectColumn(name="col2", column_type=ColumnType.INTEGER, id="222"), + ], + max_rows_per_page=1000, + column_models=[ + Column( + id="242777", + name="test_col", + column_type=ColumnType.STRING, + facet_type=None, + default_value=None, + maximum_size=50, + maximum_list_length=None, + enum_values=None, + json_sub_columns=None, + ), + Column( + id="242778", + name="test_col2", + column_type=ColumnType.STRING, + facet_type=None, + default_value=None, + maximum_size=50, + maximum_list_length=None, + enum_values=None, + json_sub_columns=None, + ), + ], + facets=[], + sum_file_sizes=SumFileSizes(sum_file_size_bytes=1024, greater_than=False), + last_updated_on="2025-08-26T21:38:31.677Z", + combined_sql="SELECT col1, col2 FROM syn123456", + actions_required=None, + ) + + @pytest.mark.asyncio + async def test_query_table_row_set_basic( + self, mock_synapse_client, sample_query_result_bundle + ): + """Test basic query_table_row_set functionality.""" + # GIVEN a query and mock response + query = "SELECT col1, col2 FROM syn123456" + + with ( + patch( + "synapseclient.models.mixins.table_components.extract_synapse_id_from_query", + return_value="syn123456", + ) as mock_extract_id, + patch.object( + QueryBundleRequest, + "send_job_and_wait_async", + return_value=sample_query_result_bundle, + ) as mock_send_job, + ): + # WHEN calling _query_table_row_set + result = await _query_table_row_set( + query=query, + synapse=mock_synapse_client, + ) + + # THEN verify the result + assert isinstance(result, QueryResultBundle) + assert result.query_count == 1 + assert result.query_result == 
sample_query_result_bundle.query_result + assert result.select_columns == sample_query_result_bundle.select_columns + assert result.sum_file_sizes == sample_query_result_bundle.sum_file_sizes + assert result.last_updated_on == sample_query_result_bundle.last_updated_on + assert result.combined_sql == sample_query_result_bundle.combined_sql + assert result.column_models == sample_query_result_bundle.column_models + assert result.facets == sample_query_result_bundle.facets + assert ( + result.actions_required == sample_query_result_bundle.actions_required + ) + + # Verify extract_synapse_id_from_query was called correctly + mock_extract_id.assert_called_once_with(query) + + # Verify send_job_and_wait_async was called correctly + mock_send_job.assert_called_once_with(synapse_client=mock_synapse_client) + + @pytest.mark.asyncio + async def test_query_table_row_set_with_parameters( + self, mock_synapse_client, sample_query_result_bundle + ): + """Test _query_table_row_set with all optional parameters.""" + # GIVEN a query with all parameters + query = "SELECT col1, col2 FROM syn123456" + limit = 100 + offset = 50 + part_mask = 0x1 | 0x2 | 0x4 # Query results + count + select columns + + with ( + patch( + "synapseclient.models.mixins.table_components.extract_synapse_id_from_query", + return_value="syn123456", + ) as mock_extract_id, + patch( + "synapseclient.models.mixins.table_components.Query" + ) as mock_query_class, + patch.object( + QueryBundleRequest, + "send_job_and_wait_async", + return_value=sample_query_result_bundle, + ) as mock_send_job, + ): + # Create mock instances + mock_query_instance = MagicMock() + mock_query_class.return_value = mock_query_instance + + # WHEN calling _query_table_row_set with parameters + result = await _query_table_row_set( + query=query, + synapse=mock_synapse_client, + limit=limit, + offset=offset, + part_mask=part_mask, + ) + # THEN verify the Query was created with correct parameters + mock_query_class.assert_called_once_with( + sql=query, + include_entity_etag=True, + limit=limit, + offset=offset, + ) + + # THEN verify the result structure + assert isinstance(result, QueryResultBundle) + assert result.query_count == 1 + assert result.query_result == sample_query_result_bundle.query_result + + # Verify the QueryBundleRequest was created with correct parameters + mock_send_job.assert_called_once_with(synapse_client=mock_synapse_client) + + +class TestQueryTableNextPage: + """Test suite for the _query_table_next_page function.""" + + @pytest.fixture(autouse=True, scope="function") + def init_syn(self, syn: Synapse) -> None: + self.syn = syn + + @pytest.fixture + def sample_table_id(self): + """Sample table ID for testing.""" + return "syn123456" + + @pytest.fixture + def sample_next_page_token(self): + """Sample QueryNextPageToken for testing.""" + token = MagicMock(spec=QueryNextPageToken) + token.token = "sample_token_string" + return token + + @pytest.fixture + def sample_synapse_response(self): + """Sample response from Synapse API.""" + return { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResultBundle", + "queryResult": { + "concreteType": "org.sagebionetworks.repo.model.table.QueryResult", + "queryResults": { + "concreteType": "org.sagebionetworks.repo.model.table.RowSet", + "tableId": "syn123456", + "etag": "test-etag", + "headers": [ + {"name": "col1", "columnType": "STRING", "id": "12345"}, + {"name": "col2", "columnType": "INTEGER", "id": "12346"}, + ], + "rows": [ + {"rowId": 1, "versionNumber": 1, "values": ["test1", "100"]}, + 
{"rowId": 2, "versionNumber": 1, "values": ["test2", "200"]}, + ], + }, + "nextPageToken": None, + }, + "queryCount": 100, + "lastUpdatedOn": "2025-08-20T10:00:00.000Z", + "selectColumns": [ + {"name": "column1", "columnType": "STRING", "id": "12345"} + ], + } + + async def test_query_table_next_page_basic_functionality( + self, sample_table_id, sample_next_page_token, sample_synapse_response + ): + """Test basic functionality of _query_table_next_page. Next page token is None""" + with patch( + "synapseclient.client.Synapse._waitForAsync", + return_value=sample_synapse_response, + ) as mock_wait_for_async: + # WHEN calling _query_table_next_page function + result = _query_table_next_page( + next_page_token=sample_next_page_token, + table_id=sample_table_id, + synapse=self.syn, + ) + # Verify API call was made correctly + mock_wait_for_async.assert_called_once_with( + uri="/entity/syn123456/table/query/nextPage/async", + request="sample_token_string", + ) + + # Verify the QueryResultBundle was populated correctly from the response + assert ( + result.concrete_type + == "org.sagebionetworks.repo.model.table.QueryResultBundle" + ) + assert result.query_count == 100 + assert result.last_updated_on == "2025-08-20T10:00:00.000Z" + + # Verify the nested QueryResult + assert isinstance(result.query_result, QueryResult) + assert ( + result.query_result.concrete_type + == "org.sagebionetworks.repo.model.table.QueryResult" + ) + assert result.query_result.next_page_token is None + + # Verify the nested RowSet + assert isinstance(result.query_result.query_results, RowSet) + assert result.query_result.query_results.table_id == sample_table_id + assert result.query_result.query_results.etag == "test-etag" + assert len(result.query_result.query_results.headers) == 2 + assert len(result.query_result.query_results.rows) == 2 + + # Verify the nested SelectColumns + assert isinstance(result.select_columns, list) + assert len(result.select_columns) == 1 + assert result.select_columns[0].name == "column1" + assert result.select_columns[0].column_type == "STRING" + assert result.select_columns[0].id == "12345"