[AIRFLOW-4184] Add an Athena helper to insert into table #4996
bryanyang0528 wants to merge 2 commits into apache:master
Conversation
Force-pushed from 9adfc83 to 660714f
ashb left a comment
The implementation needs some work.
Is this even needed? Couldn't this be done via an SQL statement in the Athena operator?
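For reference, a SQL statement such as a CTAS can already be issued through the existing `AWSAthenaOperator`; a minimal sketch, with database, table, and bucket names as placeholders:

```python
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator

# Hypothetical task: a CTAS statement run through the existing operator.
# Database, table, and bucket names are placeholders.
create_tmp_table = AWSAthenaOperator(
    task_id='athena_ctas',
    query=(
        "CREATE TABLE dst_db.tmp_user_profile "
        "WITH (external_location = 's3://my-bucket/tmp_user_profile/') "
        "AS SELECT count(*) AS cnt FROM src_db.src_table"
    ),
    database='dst_db',
    output_location='s3://my-bucket/athena-results/',
    aws_conn_id='aws_default',
)
```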
```python
    return self.conn.stop_query_execution(QueryExecutionId=query_execution_id)


class AWSAthenaHelpers(AWSAthenaHook):
```
This class name doesn't follow any of Airflow's conventions.
This feels like an operator, not a hook class.
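To illustrate the distinction, a hypothetical operator wrapping the same behaviour might look like the sketch below; the class name and parameters are invented for illustration:

```python
from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class AthenaInsertIntoTableOperator(BaseOperator):
    """Hypothetical operator shape for the behaviour in this PR."""

    @apply_defaults
    def __init__(self, src_db, src_table, dst_db, dst_table,
                 mode='error', aws_conn_id='aws_default', *args, **kwargs):
        super(AthenaInsertIntoTableOperator, self).__init__(*args, **kwargs)
        self.src_db = src_db
        self.src_table = src_table
        self.dst_db = dst_db
        self.dst_table = dst_table
        self.mode = mode
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        hook = AWSAthenaHook(aws_conn_id=self.aws_conn_id)
        # The CTAS-and-copy logic from this PR would live here,
        # driven by the hook rather than implemented on a hook subclass.
```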
Because Athena didn't originally support an INSERT INTO TABLE clause, I'm not sure what kind of class this function belongs in.
```diff
     SUCCESS_STATES = ('SUCCEEDED',)

-    def __init__(self, aws_conn_id='aws_default', sleep_time=30, *args, **kwargs):
+    def __init__(self, aws_conn_id='aws_default', region_name=None, sleep_time=30, *args, **kwargs):
```
This shouldn't be needed - this already comes from the connection
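For example, the region can live in the connection's Extra field, where the underlying `AwsHook` picks it up; a minimal sketch of such a connection definition:

```python
from airflow.models import Connection

# Hypothetical connection: the region sits in the Extra JSON rather
# than being passed to the hook's constructor.
aws_conn = Connection(
    conn_id='aws_default',
    conn_type='aws',
    extra='{"region_name": "us-east-1"}',
)
```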
```python
    def run_insert_into_table(self, src_db, src_table, dst_db, dst_table, mode='error'):
        """
        insert data in s3 from the source table to the destination table
        """
```
Why is this going via S3 rather than issuing an Athena query?
Because Athena doesn't support an INSERT INTO TABLE AS SELECT clause, we actually need to create a temp table with CREATE TABLE AS SELECT and then move the data from the temp table's location to the S3 location of the target table.
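A rough sketch of that workaround using boto3 directly; bucket, prefix, and table names are placeholders, and polling for query completion is omitted:

```python
import boto3

athena = boto3.client('athena')
s3 = boto3.resource('s3')

# 1. CTAS into a temp table whose data lands under a known prefix.
athena.start_query_execution(
    QueryString=(
        "CREATE TABLE dst_db.tmp_table "
        "WITH (external_location = 's3://my-bucket/tmp_table/') "
        "AS SELECT * FROM src_db.src_table"
    ),
    ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'},
)

# 2. Once the query has succeeded, move the temp table's files into
#    the destination table's S3 location.
bucket = s3.Bucket('my-bucket')
for obj in bucket.objects.filter(Prefix='tmp_table/'):
    new_key = 'dst_table/' + obj.key.split('/', 1)[1]
    bucket.copy({'Bucket': 'my-bucket', 'Key': obj.key}, new_key)
```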
@bryanyang0528 Can you please explain the use case for this?
When I want to add data to Athena I just upload files to the same directory and it works like a charm, as Athena scans the whole folder.
For cases where I upload files to another path, I just need to update the table partition with:
ALTER TABLE _____ ADD IF NOT EXISTS PARTITION (dt=____) LOCATION 's3://____'
https://stackoverflow.com/questions/50164744/how-to-efficiently-append-new-data-to-table-in-aws-athena
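A minimal sketch of that partition-registration approach via boto3, with placeholder names:

```python
import boto3

# Register a new partition after uploading files to its path.
# Table, partition value, and bucket names are placeholders.
athena = boto3.client('athena')
athena.start_query_execution(
    QueryString=(
        "ALTER TABLE my_db.my_table "
        "ADD IF NOT EXISTS PARTITION (dt='2019-04-01') "
        "LOCATION 's3://my-bucket/my_table/dt=2019-04-01/'"
    ),
    ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'},
)
```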
@RosterIn Thank you for your suggestion. I know ALTER TABLE can assign an S3 location to a partition, but I would like to move the data into the same S3 location as the target table after creating a new partition.
For example, I have a table called user_profile in which I aggregate user behavior data from my website. I have to select data from the source table daily and insert the result into user_profile. In Hive I would just use INSERT INTO TABLE user_profile PARTITION (dt) SELECT count(*) FROM src_table, and the data would be put into the same location as the target table.
Maybe I could combine ALTER TABLE and an S3 copy to implement this idea. Does that make sense?
@bryanyang0528 better to wait for @ashb to respond.
If (and it is still an if) we add this to Airflow, this function belongs in an Operator, not a Hook.
Additionally, you are putting S3 files in place but doing nothing to update the Glue catalog.
Overall I am skeptical that this approach is the right way of doing it. In the past I have done this sort of thing from an EMR cluster, using the Glue-compatible Hive metastore to insert into tables.
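For illustration, a sketch of that EMR route: submitting a Hive step to an existing cluster via boto3. The cluster id, table, and query are placeholders, and it assumes the cluster is configured to use the Glue Data Catalog as its Hive metastore:

```python
import boto3

emr = boto3.client('emr')
emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',  # placeholder cluster id
    Steps=[{
        'Name': 'insert_into_user_profile',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'hive', '-e',
                "INSERT INTO TABLE user_profile PARTITION (dt='2019-04-01') "
                "SELECT count(*) FROM src_table",
            ],
        },
    }],
)
```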
Thank you for the suggestion. I will close this issue and create a new one. I know EMR could do this, but it takes a long time to create an EMR cluster and run the computation on it.
Make sure you have checked all steps below.
- Jira
- Description
- Tests
- Commits
- Documentation
- Code Quality (flake8)