
🐛 Snowflake destination: fix S3 staging COPY writing records from a different table into the same raw table #5924

Merged · 14 commits · Sep 13, 2021

Conversation

@andriikorotkov (Contributor) commented Sep 8, 2021

What

Fix the Snowflake destination's AWS S3 staging COPY writing records from a different table into the same raw table.

How

This bug occurs because Snowflake does not match the full path of the file on S3; it treats the given path as a prefix and loads every object whose key matches it. For example, suppose we have tables user and user_something. For user_something, exactly one file is found and copying is done only from it. But for user, two files are found: 1 - identifier/schema/user and 2 - identifier/schema/user_something, because path 1 is a prefix of path 2.
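To make the failure mode concrete, here is a minimal sketch of the kind of COPY statement involved. The bucket, sync id, schema, table names, and credential placeholders are all hypothetical; this is not the connector's actual query-building code:

```java
public class CopyPrefixDemo {
    public static void main(String[] args) {
        // Hypothetical staging location, for illustration only.
        String stagingPath = "s3://airbyte-bucket/sync-id/MY_SCHEMA/user";

        // Snowflake treats the FROM location as a key prefix, so this single
        // statement would load both .../MY_SCHEMA/user and
        // .../MY_SCHEMA/user_something into the tmp table for "user".
        String copyQuery = String.format(
            "COPY INTO MY_SCHEMA._airbyte_tmp_user FROM '%s' "
                + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
                + "file_format = (type = csv field_delimiter = ',')",
            stagingPath, "<key-id>", "<secret>");
        System.out.println(copyQuery);
    }
}
```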

I fixed this by adding a prefix to the table name in the file path, so that one staging path can no longer be a prefix of another.
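Here is a minimal sketch of the idea behind the fix, not the connector's exact code. It assumes Apache Commons Lang for generating the prefix, and stagingObjectName is a hypothetical helper name; the short random prefix matches the shape of the bqzactor name visible in the logs below:

```java
import java.util.Locale;
import org.apache.commons.lang3.RandomStringUtils;

public class StagingNameDemo {
    // Give each stream's staging object a short random leading prefix so that
    // no staging key can be a prefix of another stream's key.
    static String stagingObjectName(String stagingFolder, String schema, String streamName) {
        String prefix = RandomStringUtils.randomAlphabetic(3).toLowerCase(Locale.ROOT);
        return String.join("/", stagingFolder, schema, prefix + streamName);
    }

    public static void main(String[] args) {
        // Prints something like "sync-id/MY_SCHEMA/bqzactor"; "actor" can no
        // longer prefix-match another stream's file such as "actor_payment".
        System.out.println(stagingObjectName("sync-id", "MY_SCHEMA", "actor"));
    }
}
```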

Here are the logs for my test tables (actor, actor_my, actor_test, and actor_payment) without the prefix:

2021-09-09 08:40:48 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:40:48 INFO i.a.i.d.j.c.s.S3StreamCopier(createTemporaryTable):183 - {} - Preparing tmp table in destination for stream: actor, schema: AA_AIRBYTE_TEST_SCHEMA, tmp table name: _airbyte_tmp_suk_actor.
2021-09-09 08:40:49 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:40:49 INFO i.a.i.d.j.c.s.S3StreamCopier(copyStagingFileToTemporaryTable):189 - {} - Starting copy to tmp table: _airbyte_tmp_suk_actor in destination for stream: actor, schema: AA_AIRBYTE_TEST_SCHEMA, .
2021-09-09 08:40:51 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:40:51 ERROR i.a.i.d.s.SnowflakeS3StreamCopier(copyS3CsvFileIntoTable):73 - {} - **there was a download from the following files ==>> [{"file":"s3://airbyte-snowflake-integration-tests/692b491d-d39a-4e5b-a120-c44407925017/AA_AIRBYTE_TEST_SCHEMA/actor_test","status":"LOADED","rows_parsed":3,"rows_loaded":3,"error_limit":1,"errors_seen":0}, {"file":"s3://airbyte-snowflake-integration-tests/692b491d-d39a-4e5b-a120-c44407925017/AA_AIRBYTE_TEST_SCHEMA/actor_my","status":"LOADED","rows_parsed":2,"rows_loaded":2,"error_limit":1,"errors_seen":0}, {"file":"s3://airbyte-snowflake-integration-tests/692b491d-d39a-4e5b-a120-c44407925017/AA_AIRBYTE_TEST_SCHEMA/actor","status":"LOADED","rows_parsed":200,"rows_loaded":200,"error_limit":1,"errors_seen":0}, {"file":"s3://airbyte-snowflake-integration-tests/692b491d-d39a-4e5b-a120-c44407925017/AA_AIRBYTE_TEST_SCHEMA/actor_payment","status":"LOADED","rows_parsed":14596,"rows_loaded":14596,"error_limit":1,"errors_seen":0}]**
2021-09-09 08:40:51 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:40:51 INFO i.a.i.d.j.c.s.S3StreamCopier(copyStagingFileToTemporaryTable):191 - {} - Copy to tmp table _airbyte_tmp_suk_actor in destination for stream actor complete.
2021-09-09 08:40:51 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:40:51 INFO i.a.i.d.j.c.s.S3StreamCopier(createDestinationTable):197 - {} - Preparing table _airbyte_raw_actor in destination.
2021-09-09 08:40:53 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:40:53 INFO i.a.i.d.j.c.s.S3StreamCopier(createDestinationTable):199 - {} - Table _airbyte_tmp_suk_actor in destination prepared.

And the logs for my test table (actor) with the prefix:

2021-09-09 08:45:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:45:55 INFO i.a.i.d.j.c.s.S3StreamCopier(createDestinationSchema):177 - {} - Creating schema in destination if it doesn't exist: AA_AIRBYTE_TEST_SCHEMA
2021-09-09 08:45:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:45:56 INFO i.a.i.d.j.c.s.S3StreamCopier(createTemporaryTable):183 - {} - Preparing tmp table in destination for stream: actor, schema: AA_AIRBYTE_TEST_SCHEMA, tmp table name: _airbyte_tmp_sot_actor.
2021-09-09 08:45:57 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:45:57 INFO i.a.i.d.j.c.s.S3StreamCopier(copyStagingFileToTemporaryTable):189 - {} - Starting copy to tmp table: _airbyte_tmp_sot_actor in destination for stream: actor, schema: AA_AIRBYTE_TEST_SCHEMA, .
2021-09-09 08:45:58 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:45:58 ERROR i.a.i.d.s.SnowflakeS3StreamCopier(copyS3CsvFileIntoTable):73 - {} - **there was a download from the following files ==>> [{"file":"s3://airbyte-snowflake-integration-tests/2db32d83-b50b-4892-97cc-466956a96c34/AA_AIRBYTE_TEST_SCHEMA/bqzactor","status":"LOADED","rows_parsed":200,"rows_loaded":200,"error_limit":1,"errors_seen":0}]**
2021-09-09 08:45:58 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:45:58 INFO i.a.i.d.j.c.s.S3StreamCopier(copyStagingFileToTemporaryTable):191 - {} - Copy to tmp table _airbyte_tmp_sot_actor in destination for stream actor complete.
2021-09-09 08:45:58 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:45:58 INFO i.a.i.d.j.c.s.S3StreamCopier(createDestinationTable):197 - {} - Preparing table _airbyte_raw_actor in destination.
2021-09-09 08:46:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-09 08:46:00 INFO i.a.i.d.j.c.s.S3StreamCopier(createDestinationTable):199 - {} - Table _airbyte_tmp_sot_actor in destination prepared.

Recommended reading order

  1. x.java
  2. y.python

Pre-merge Checklist


Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions
  • Connector version bumped as described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to GitHub CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here


@github-actions bot added the area/connectors and area/documentation labels Sep 8, 2021
@etsybaev (Contributor) left a comment

LGTM

@danieldiamond (Contributor)

@andriikorotkov are you able to add to the description? E.g. the logs you've produced, or how the SQL for COPY INTO .. is generated incorrectly?
Or is this simply a bug in Snowflake when the file extension is omitted - and if so, are we considering the future case where we want to COPY INTO with other extensions/compressions?

@andriikorotkov (Contributor, Author)

@danieldiamond, I slightly changed how the name for the S3 file is formed and added logs; you can see them in the description. If everything looks good to you, I will merge this pull request.

@andriikorotkov marked this pull request as ready for review September 9, 2021 09:19
@danieldiamond (Contributor) left a comment

Thanks for resolving this. One suggestion for readability.

@andriikorotkov (Contributor, Author) commented Sep 9, 2021

/test connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1217465958
✅ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1217465958

@jrhizor temporarily deployed to more-secrets September 9, 2021 13:23 (inactive)
@sherifnada (Contributor) left a comment

@andriikorotkov is there a test we can add to make sure there are no regressions?

Could you also make an issue to verify this isn't happening on other DBs?

Both of these can be handled after releasing this PR since it's a critical fix.

@sherifnada (Contributor) commented Sep 9, 2021

/publish connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1219226263
✅ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/1219226263

@jrhizor temporarily deployed to more-secrets September 9, 2021 23:03 (inactive)
@sherifnada (Contributor)

@andriikorotkov I've published a version to unblock @danieldiamond -- so we can probably take care of the unit test in this PR and create the follow-up issue for other DBs.

@andriikorotkov (Contributor, Author) commented Sep 10, 2021

/test connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/1220851466
✅ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/1220851466

@jrhizor temporarily deployed to more-secrets September 10, 2021 10:24 (inactive)
@andriikorotkov (Contributor, Author) commented Sep 10, 2021

/publish connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/1221501599
✅ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/1221501599

@jrhizor temporarily deployed to more-secrets September 10, 2021 14:01 (inactive)
@andriikorotkov (Contributor, Author) commented Sep 10, 2021

@sherifnada I have reproduced and fixed this bug for Redshift as well. I also made small changes to the tests so that we don't get this error in future destinations. Now the problem is with the jobs. Can I merge this pull request even if not all jobs have passed?

@sherifnada (Contributor)

@andriikorotkov yup, you can move forward.

Successfully merging this pull request may close these issues:

  • AWS S3 Staging COPY is writing records from different table in the same raw table