Support incremental_strategy and data cleaning up with boto3 #27
Conversation
Looking forward to merging this PR. I can't use this adapter while data in S3 is not being overwritten.
dbt/adapters/athena/connections.py
Outdated
glue_client = boto3.client('glue')
s3_resource = boto3.resource('s3')
partitions = glue_client.get_partitions(
    # CatalogId='awsdatacatalog', # Using this caused a permission error that 'glue:GetPartitions' is required
@tuan-seek I am afraid this will need to be configurable
@mrshu Thanks for checking this out. Can you elaborate on the scenarios where you would need to configure this CatalogId, instead of using the default AWS Account ID?
@tuan-seek please take this with a grain of salt (the bigger the better).
It is my understanding that you can have multiple Glue Data Catalogs in one account. I don't think it happens often, and by default the AWS Account ID is used -- that would probably explain why passing awsdatacatalog resulted in errors and why not passing it ended up working.
Here is what the docs for Glue's CreateDatabase method say:
The ID of the Data Catalog in which to create the database. If none is provided, the AWS account ID is used by default.
All of the other Glue Database methods take the CatalogId as a parameter, so having a non-default one does seem like a real possibility.
I wouldn't worry too much about it for now though.
Sorry @mrshu for my late response.
Yes, I agree with you that there is a possibility a non-default one will be needed. Part of the reason for my late response was taking some time to investigate whether we have such a need in our systems, and I confirmed that all our use cases use the default one. Btw, we have quite a complex data platform, with a lot of data teams interacting with it from different AWS accounts and IAM settings.
Also for that reason, I agree with you that we don't need to worry too much about making this configurable now. We can always cross that bridge when we come to it :)
{%- do partitions.append('(' + single_partition_expression + ')') -%}
{%- endfor -%}
{%- set partition_expression = partitions | join(' or ') -%}
delete from
Is DELETE FROM supported in Athena?
Running it manually on a table yields [ErrorCategory:USER_ERROR, ErrorCode:NOT_SUPPORTED], Detail:NOT_SUPPORTED: Cannot delete from non-managed Hive table in Athena (version 2).
I believe these delete from statements are getting caught in the execute function and not actually being run.
I'm not a particular fan of this, since it is not very clear and seems verbose. @tuan-seek is there a reason behind this pattern? Why not just call _clean_up_partitions directly?
The short answer is, I don't know if there is a way to call _clean_up_partitions() directly from macros. All Athena queries and AWS SDK calls are delegated back to connections.py, so I followed the same pattern here.
DELETE FROM is currently not supported in Athena, so the statement is intercepted in connections.py for our custom clean-up and never issued to Athena. If AWS supports DELETE FROM in the future, we can simply remove this custom implementation.
@tuan-seek If you add functions to the Adapter class, you can call them within Jinja. The Adapter class can be found within impl.py, and its functions can be called as macros like {% do adapter.your_function(your_parameters) %} or {{ adapter.drop_relation(existing_relation) }}.
I will add a review pointing out what would be good to migrate to the Adapter class.
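The dispatch pattern described above can be sketched with plain Jinja. The FakeAdapter below is a hypothetical stand-in for the real Adapter class (whose methods would live in impl.py and be exposed to templates by dbt); it only demonstrates how a {% do adapter.some_method(...) %} statement in a macro ends up calling a Python method:

```python
from jinja2 import Environment


class FakeAdapter:
    """Hypothetical stand-in for the dbt Adapter class; the real
    methods would live in impl.py and be exposed to Jinja by dbt."""

    def __init__(self):
        self.calls = []

    def clean_up_partitions(self, database, table, where_condition):
        # The real implementation would call Glue/S3 via boto3; here
        # we only record the call to show the invocation pattern.
        self.calls.append((database, table, where_condition))


# The `do` extension enables `{% do ... %}` statements, as used in dbt macros.
env = Environment(extensions=["jinja2.ext.do"])
adapter = FakeAdapter()
env.from_string(
    "{% do adapter.clean_up_partitions('my_db', 'my_table', 'dt=2021-01-01') %}"
).render(adapter=adapter)
```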
Sounds good. Let me try that one!
@tuan-seek Great, and a big thanks for this PR; the functionality it could provide will be appreciated by so many. Also, apologies for taking so long to review!
Likewise, thanks for starting this project. I've refactored the change as discussed. Please have a look when you have time @Tomme
dbt/adapters/athena/connections.py
Outdated
@@ -57,6 +59,74 @@ def _collect_result_set(self, query_id: str) -> AthenaResultSet:
        retry_config=self._retry_config,
    )

def _clean_up_partitions(
I would advise migrating this to the AthenaAdapter class within impl.py and calling it directly within the incremental.sql file.
dbt/adapters/athena/connections.py
Outdated
s3_bucket.objects.filter(Prefix=prefix).delete()

def _clean_up_table(
Similar to _clean_up_partitions, I would migrate this to impl.py, and it should be incorporated into the drop_relation function.
Force-pushed from 9d479e7 to 920e862
Hi @Tomme, have you had time to review the latest change?
@tuan-seek Planned for this week. Apologies for my slow replies and reviews.
Added the fix for Glue Crawler get_partitions where clause 2048 char limit
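Because the where clause passed to Glue's get_partitions is limited to 2048 characters, a long list of partition filter expressions has to be split into batches and queried separately. A hypothetical helper (not the PR's actual code) showing one way to do the chunking:

```python
def chunk_expressions(expressions, max_len=2048, joiner=" or "):
    """Group filter expressions into batches whose joined length stays
    within max_len. A single expression longer than max_len is still
    emitted as its own batch. Hypothetical helper, not the PR's code."""
    batches, current = [], []
    for expr in expressions:
        candidate = current + [expr]
        if current and len(joiner.join(candidate)) > max_len:
            # Adding this expression would exceed the limit: flush the
            # current batch and start a new one with this expression.
            batches.append(joiner.join(current))
            current = [expr]
        else:
            current = candidate
    if current:
        batches.append(joiner.join(current))
    return batches
```

Each batch would then be passed as a separate Expression to get_partitions.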
Hi @Tomme Is there any blocker for this PR to be merged? I'm planning some new feature changes and would prefer to do them in a new PR. Btw, I have updated the README to reflect the change in this PR. Feel free to update the README as you like.
Changes:
- HIVE_PATH_ALREADY_EXISTS error.