
Synapse destination #900

Merged: 25 commits from 832-synapse-destination merged into devel on Feb 6, 2024

Conversation

@jorritsandbrink (Collaborator) commented Jan 18, 2024

Description

This PR adds initial support for the Synapse destination. It implements the insert_values file format and the staging_file_format parquet (using COPY INTO to load from Azure Blob Storage into Synapse), and includes user docs.

Related Issues

Additional Context

  • I aimed to reuse the code for the mssql destination as much as possible.
  • Since Synapse cannot handle multi-row inserts, I introduced a new insert_values_writer_type called select_union that is based on SELECT and UNION ALL. Relevant SO here. A sketch of the generated SQL follows this list.
  • The user can configure which table index type is used: heap or clustered_columnstore_index (see the Synapse docs about indexes for context). The default is heap, because that is the most robust choice: it works with all data types, whereas CLUSTERED COLUMNSTORE INDEX tables do not support the varchar(max), nvarchar(max), and varbinary(max) data types. The default can be overridden by setting destination.synapse.default_table_index_type in a config TOML or by exporting the DESTINATION__SYNAPSE__DEFAULT_TABLE_INDEX_TYPE env var. All data tables are created with the default table index type unless another type is specified at the resource level, by providing the x-table-index-type hint through the synapse_adapter. Dlt system tables (the ones prefixed with _dlt) always use heap as index type, regardless of any configuration, following the recommendation: "For small lookup tables, less than 60 million rows, consider using HEAP or clustered index for faster query performance." The setup for handling the table index type is inspired by the way table_format is handled.
  • I included support for the primary_key and unique column hints, implemented as PRIMARY KEY NONCLUSTERED NOT ENFORCED and UNIQUE NOT ENFORCED, respectively. Because these constraints are a little tricky (they are not enforced and can lead to inaccurate results if the user does not ensure all column values are unique), they are not applied by default; the user needs to set create_indexes to True explicitly to apply them.
  • Since the long types (text, ntext, image) are not supported on Synapse, the LongAsMax keyword is set to yes so that long types get converted to MAX types, as described here. LongAsMax was introduced in ODBC Driver 18 for SQL Server; hence, unlike for the mssql destination, ODBC Driver 17 for SQL Server is not supported for Synapse.
  • The logic for byte literals in escape_mssql_literal has been adapted to make it work for Synapse. The old logic only worked for mssql; the new logic works for both mssql and synapse.
  • OUTDATED, NO LONGER APPLIES AFTER CODE CHANGES: Synapse seems to struggle with concurrency in specific scenarios (it completely hangs, and KILL commands need to be executed to release the stuck process). Because of this, I had to set the LOAD__WORKERS env var to 1 for the test tests/load/pipeline/test_replace_disposition.py::test_replace_table_clearing[staging-optimized-synapse-no-staging]. This is now handled automatically: when auto_disable_concurrency is set to true (the default value), the number of load workers is automatically set to 1 at runtime in cases that are known to have issues with concurrency (for now, that is the case when there are tables with write_disposition == replace and replace_strategy == staging-optimized). A warning message is shown to the user in such cases, to make them aware of this intervention and to inform them of their configuration options.
  • dbt with Synapse has multiple complications. Since I could not make it pass all the tests in tests/load/pipeline/test_dbt_helper.py, I set supports_dbt to False in the DestinationTestConfiguration for Synapse.
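To illustrate the select_union writer mentioned above, here is a minimal sketch (hypothetical helper, not dlt's actual writer code; the literal rendering is naive and for illustration only, the real writer escapes values properly):

```py
from typing import Any, Sequence

def build_select_union_insert(
    table: str, columns: Sequence[str], rows: Sequence[Sequence[Any]]
) -> str:
    # Synapse rejects multi-row VALUES inserts like:
    #   INSERT INTO t (a, b) VALUES (1, 'x'), (2, 'y');
    # so each row becomes a SELECT, chained with UNION ALL.
    col_list = ", ".join(columns)
    selects = " UNION ALL ".join(
        "SELECT " + ", ".join(repr(v) for v in row)  # naive literals, sketch only
        for row in rows
    )
    return f"INSERT INTO {table} ({col_list}) {selects};"

print(build_select_union_insert("my_table", ["a", "b"], [(1, "x"), (2, "y")]))
# INSERT INTO my_table (a, b) SELECT 1, 'x' UNION ALL SELECT 2, 'y';
```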

Points regarding the support for staging_file_format parquet:

  • The implementation mostly follows redshift's implementation for staged file loading.
  • The COPY INTO command is used to load the Parquet files into Synapse tables.
  • COPY INTO does not support JSON, which is why only Parquet has been implemented.
  • Authorizing access from the Synapse workspace to the Azure Storage Account can be done in two ways (a configuration sketch follows this list):
  1. Based on AzureCredentialsWithoutDefaults, using azure_storage_sas_token. The same credentials are used both for the write into the Storage Account and for the read from the Storage Account into Synapse. This is the default option.
  2. Using the Synapse workspace managed identity. This requires that the managed identity has been assigned the Storage Blob Data Reader role (or a role with higher permissions) on the Blob container (or at the Storage Account level). The user can opt in to managed identity authentication by setting staging_use_msi to True.
  • Parquet TIME columns are not supported: Synapse reads those columns as bigint and raises an incompatibility error. This might actually be caused by a bug in the pyarrow Parquet writer, but I'm not sure about that.
  • The Parquet staging files are not deleted after the load into the Synapse table has completed. I have looked into this, though: file deletion could relatively easily be implemented as a step after the COPY INTO (potentially optional, similar to how snowflake uses keep_staged_files), but it would leave empty folders dangling behind. Also deleting the folders would require orchestration at a higher level, which is less straightforward to implement.
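A minimal configuration sketch for the staged load path, assuming the option names described above (staging_use_msi, create_indexes); the env-var spellings follow dlt's standard section layout and the placeholder values are illustrative:

```py
import os

import dlt

# Filesystem staging on Azure Blob Storage (placeholder bucket and credentials).
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "az://my-container"
# Option 1 (default): SAS token, used both for writing the parquet files and
# for the COPY INTO read from Synapse.
os.environ["DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME"] = "myaccount"
os.environ["DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_SAS_TOKEN"] = "<sas-token>"
# Option 2: opt in to the workspace managed identity for the read instead.
os.environ["DESTINATION__SYNAPSE__STAGING_USE_MSI"] = "true"
# Opt in to the non-enforced PRIMARY KEY / UNIQUE hints described above.
os.environ["DESTINATION__SYNAPSE__CREATE_INDEXES"] = "true"

pipeline = dlt.pipeline(
    pipeline_name="synapse_staged",
    destination="synapse",
    staging="filesystem",
    dataset_name="demo",
)
```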

Other changes in this PR:

  • Added a proper way to skip dbt-specific tests for destinations that don't support dbt.

netlify bot commented Jan 18, 2024

Deploy Preview for dlt-hub-docs canceled.

Latest commit: c3efe33
Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/65c0e6eb88500400089c93b3

@jorritsandbrink linked an issue Jan 18, 2024 that may be closed by this pull request

def __init__(self, schema: Schema, config: SynapseClientConfiguration) -> None:
sql_client = SynapseSqlClient(config.normalize_dataset_name(schema), config.credentials)
InsertValuesJobClient.__init__(self, schema, config, sql_client)

Collaborator:

what is this line about?

Collaborator Author:

The code mirrored MsSqlClient.__init__. While MsSqlClient can call InsertValuesJobClient.__init__ through super(), SynapseClient.__init__ needs to call it explicitly (because it inherits from MsSqlClient, and not from InsertValuesJobClient like MsSqlClient does).

That being said, I have added a new commit to simplify SynapseClient.__init__. Would you agree the new code is better?
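For readers following along, a stripped-down sketch of the inheritance point being made; class bodies and the make_*_client helpers are illustrative placeholders, not dlt's actual code:

```py
class InsertValuesJobClient:
    def __init__(self, schema, config, sql_client):
        self.schema, self.config, self.sql_client = schema, config, sql_client


def make_mssql_sql_client(config):  # hypothetical helper
    return "mssql sql client"


def make_synapse_sql_client(config):  # hypothetical helper
    return "synapse sql client"


class MsSqlClient(InsertValuesJobClient):
    def __init__(self, schema, config):
        # MsSqlClient inherits directly from InsertValuesJobClient,
        # so super() resolves to it.
        super().__init__(schema, config, make_mssql_sql_client(config))


class SynapseClient(MsSqlClient):
    def __init__(self, schema, config):
        # Here super() would resolve to MsSqlClient.__init__, which builds an
        # mssql sql client; to inject a Synapse-specific one, the grandparent
        # initializer had to be called explicitly.
        InsertValuesJobClient.__init__(self, schema, config, make_synapse_sql_client(config))
```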

@@ -165,6 +167,7 @@ class TTableSchema(TypedDict, total=False):
columns: TTableSchemaColumns
resource: Optional[str]
table_format: Optional[TTableFormat]
table_index_type: Optional[TTableIndexType]

Collaborator:

@rudolfix we can switch between 2 index types (https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index) and we have a new table hint here. I am wondering if we can re-use table_format or whether this needs to be a separate hint. I am not quite sure.

Collaborator Author:

My idea was to keep table_format limited to table formats such as Iceberg, Delta, and Hudi. Then within a certain table format, one can still choose between different table indexing types supported by that table format. Look at Hudi for example, which supports a bunch of different indexing types.

Collaborator:

this looks like a case for destination adapter. it sets per-destination options on a resource using x- hints. please look into:
https://github.com/dlt-hub/dlt/blob/devel/dlt/destinations/adapters.py
and bigquery adapter that @Pipboyguy is doing: #855

Collaborator Author:

I rewrote the code to make use of a destination adapter. Looked at qdrant_adapter and weaviate_adapter for inspiration. The synapse_adapter is slightly different because it works with a table hint, not column hints like qdrant and weaviate. Since x-table-index-type is not defined on TResourceHints, I have to ignore mypy's typeddict-unknown-key errors. Is this what you had in mind @rudolfix?
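For reference, usage of the resulting adapter looks roughly like this (a sketch based on the discussion above; the resource contents are placeholders):

```py
import dlt
from dlt.destinations.adapters import synapse_adapter

@dlt.resource
def my_table():
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

# Set the table index type for this resource only; tables without the hint
# keep the configured default (heap unless overridden).
pipeline = dlt.pipeline(destination="synapse")
pipeline.run(synapse_adapter(my_table, table_index_type="clustered_columnstore_index"))
```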

"auto_disable_concurrency",
]

def get_load_workers(self, tables: TSchemaTables, workers: int) -> int:

Collaborator:

My take is that this needs to be an interface for all destinations (sketched after this list). So:

  • Implement a method on JobClientBase "def enable_parallel_loading" which returns true as a default value
  • Evaluate this in the Load class for each destination, and set workers to 1 if it returns false
  • Implement this method for synapse; all the others fall back to true.
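A sketch of the proposed interface; the names follow the suggestion above and are not dlt's actual API (per the author's reply below, this was ultimately not needed):

```py
class JobClientBase:
    def enable_parallel_loading(self) -> bool:
        # Default: the destination supports concurrent load jobs.
        return True


class SynapseClient(JobClientBase):
    def enable_parallel_loading(self) -> bool:
        # Synapse hangs in specific concurrent scenarios, so opt out.
        return False


def effective_workers(client: JobClientBase, configured_workers: int) -> int:
    # Evaluated in the Load class for each destination.
    return configured_workers if client.enable_parallel_loading() else 1
```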

Collaborator:

yes, this is a good idea. we may need it in a few other places (e.g. the sink destination if the user wants to serialize processing).
I'd add it as an option to the base configuration of each destination. (should we do it in this PR or a follow-up?)

Collaborator Author:

This is no longer needed, at least not for what I used it for. I replaced the CTAS that caused the concurrency issues with regular CREATE TABLE statements. This makes it simpler (no need to manage the number of workers) and also faster because loads can be done concurrently.

else:
table_index_type = table.get("table_index_type")
if table_index_type == "clustered_columnstore_index":
new_columns = self._get_columstore_valid_columns(new_columns)

Collaborator:

I think instead of doing this here, you should probably implement _get_column_def_sql (extend it to also accept index type) and react to it in the type_mapper. see how we do it with athena tables.

Collaborator Author:

Now that table_index_type (generic for all destinations) has been "downgraded" to x-table-index-type (Synapse-specific), would you still recommend this?

I ask because taking this approach would require fetching x-table-index-type from the table schema in SqlJobClientBase._get_table_update_sql, and then pass this value to _get_column_def_sql(...), thus having to add table_index_type as an argument to all implementations for that function (similar to how table_format is handled). Destination-specific logic would end up in generic and other-destination-specific code (where it is unused).

Collaborator:

alright, let's keep it like this!

table["table_index_type"] = self.config.default_table_index_type
return table

def get_storage_table_index_type(self, table_name: str) -> TTableIndexType:

Collaborator:

I think you can move this to the tests if it is not used in production code

Collaborator Author:

Done.

# This can be prevented by setting the env var LOAD__WORKERS = "1".
sql.append(
f"CREATE TABLE {staging_table_name}"
" WITH ( DISTRIBUTION = ROUND_ROBIN, HEAP )" # distribution must be explicitly specified with CTAS

Collaborator:

could be configurable, but maybe let's just stick with this until someone requests otherwise. you can add a note in the docs about it

Collaborator Author:

I included a note in the docs.

@sh-rp (Collaborator) left a comment:

some more small changes

@rudolfix (Collaborator) left a comment:

@jorritsandbrink this is really good! also thanks for diving into synapse (i.e. the various types of indexes and union inserts)

@jorritsandbrink changed the title from "Synapse destination insert_values file format" to "Synapse destination" on Jan 25, 2024

@jorritsandbrink (Collaborator Author):

@sh-rp and @rudolfix I addressed all the points you raised and requested a re-review from both of you.

f"Table index type {table_index_type} is invalid. Allowed table index"
f" types are: {allowed_types}."
)
resource._hints[TABLE_INDEX_TYPE_HINT] = table_index_type # type: ignore[typeddict-unknown-key]

Collaborator:

my take here is to do this with resource.apply_hints() but extend apply_hints to accept a new arg which is a dict which then gets merged into _hints. one of the other freelancers was asking about this (or was it you) so there may be some conflicts. @rudolfix wdyt.

@sh-rp (Collaborator) commented Feb 1, 2024:

so this new arg would be something like additional_table_hints: DictStrAny.

Collaborator Author:

I extended apply_hints() to handle table hints as you suggested. Also aligned with @Pipboyguy about this on Slack.
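The extended call then looks roughly like this, per the thread above (the hint key is the one the Synapse adapter sets; a sketch):

```py
import dlt

@dlt.resource
def my_table():
    yield {"id": 1}

# Roughly what the synapse_adapter ends up doing after the extension:
# merge a destination-specific hint dict into the resource's table hints.
my_table.apply_hints(additional_table_hints={"x-table-index-type": "heap"})
```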

For all other destinations the `staging-optimized` will fall back to the behavior of the `insert-from-staging` strategy.


For all other [destinations](../dlt-ecosystem/destinations/index.md), please look at their respective documentation pages to see if and how the `staging-optimized` strategy is implemented. If it is not implemented, `dlt` will fall back to the `insert-from-staging` strategy.

Collaborator:

thanks for cleaning up these random parts all over the place, really appreciated :)


> ❗ These hints are **disabled by default**. This is because the `PRIMARY KEY` and `UNIQUE` [constraints](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-table-constraints) are tricky in Synapse: they are **not enforced** and can lead to inaccurate results if the user does not ensure all column values are unique. For the column hints to take effect, the `create_indexes` configuration needs to be set to `True`, see [additional destination options](#additional-destination-options).

## Load concurrency issue

Collaborator:

this is outdated, no?

Collaborator:

and you need to link this page in the sidebars.js. by the way you can see a preview of this here: https://deploy-preview-900--dlt-hub-docs.netlify.app/docs/dlt-ecosystem/destinations/synapse/

Collaborator Author:

Removed the outdated documentation and added the link to the sidebar.

# Copy data from staging file into Synapse table.
with self._sql_client.begin_transaction():
dataset_name = self._sql_client.dataset_name
sql = dedent(f"""

Collaborator:

out of curiosity, is dedent needed here?

Collaborator Author:

No. Functionally it doesn't make a difference, but I like that the query is properly formatted without unnecessary indentation.
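For illustration, what dedent buys here (table name and URL are placeholders; purely cosmetic, as noted):

```py
from textwrap import dedent

# Inside an indented method body, a triple-quoted SQL string inherits the
# surrounding indentation; dedent strips the common leading whitespace so the
# executed/logged statement reads cleanly.
sql = dedent("""
    COPY INTO staging_table
    FROM 'https://myaccount.blob.core.windows.net/container/path'
    WITH (FILE_TYPE = 'PARQUET')
""")
print(sql)
```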

@sh-rp (Collaborator) left a comment:

almost there, only a few small improvements left.

if additional_table_hints is not None:
# loop through provided hints and add, overwrite, or remove them
for k, v in additional_table_hints.items():
if v:

Collaborator:

This has to be "if v is not None", otherwise a value of False or [] will also be removed although it needs to stay in.

Collaborator Author:

I adjusted it.

I had it like that to maintain consistency with how the standard table hints are handled (removed when an "empty value" is provided), but I agree that isn't desired behavior for the additional table hints.
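A sketch of the merge loop after the adjustment (the helper name is hypothetical): only an explicit None removes a hint, so falsy values like False or [] are kept.

```py
from typing import Any, Dict, Optional

def merge_additional_table_hints(
    hints: Dict[str, Any], additional_table_hints: Optional[Dict[str, Any]]
) -> None:
    if additional_table_hints is not None:
        # loop through provided hints and add, overwrite, or remove them
        for k, v in additional_table_hints.items():
            if v is not None:
                hints[k] = v  # add or overwrite
            else:
                hints.pop(k, None)  # explicit None removes the hint
```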

@rudolfix (Collaborator) left a comment:

LGTM! thanks everyone working on this @jorritsandbrink @sh-rp !

@sh-rp merged commit 5b240cd into devel on Feb 6, 2024
60 of 62 checks passed
@sh-rp deleted the 832-synapse-destination branch on February 6, 2024 at 10:37

Successfully merging this pull request may close these issues.

implement synapse destination