
[DEST Snowflake] - Sync succeeded but destination tables are empty #4745

Closed

philippeboyd opened this issue Jul 14, 2021 · 12 comments · Fixed by #5783 or #5784

Comments

@philippeboyd
Contributor

Environment

  • Airbyte version: 0.27.2-alpha
  • OS Version / Instance: GCP COS
  • Deployment: Docker
  • Source Connector and version: MSSQL 0.3.2
  • Destination Connector and version: Snowflake 0.3.10 (with GCS stage)
  • Severity: High
  • Step where error happened: Sync

Current Behavior

The sync appears to be successful, but the tables (raw and normalized) in the destination (Snowflake) are empty. In the logs there appears to be only one error during the data transfer, which seems to make the whole transfer unsuccessful.


Expected Behavior

There should be exponential backoff around all the potential pitfalls during data transfer, so that a failed command is retried instead of silently aborting the sync.
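As an illustration of the kind of retry behavior being requested, here is a minimal sketch of a retry helper with exponential backoff that could be wrapped around the COPY statement. The helper, its attempt count, and its base delay are hypothetical, not Airbyte's actual API.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

public final class RetryWithBackoff {

  // Hypothetical helper: retries an action that may throw SQLException,
  // doubling the wait between attempts (exponential backoff).
  public static <T> T run(final Callable<T> action, final int maxAttempts) throws Exception {
    long delayMs = 1_000L; // illustrative base delay, not an Airbyte default
    SQLException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (final SQLException e) {
        last = e;
        if (attempt == maxAttempts) {
          break;
        }
        Thread.sleep(delayMs);
        delayMs *= 2;
      }
    }
    throw last; // all attempts exhausted
  }
}
```

Wrapping the COPY execution would then look roughly like `RetryWithBackoff.run(() -> { database.execute(copyQuery); return null; }, 5);`, where `database` and `copyQuery` stand in for the connector's own objects.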

Logs

LOG
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-13 23:56:30 �[1;31mERROR�[m i.a.i.d.b.BufferedStreamConsumer(close):203 - {} - on close failed.
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - net.snowflake.client.jdbc.SnowflakeSQLException: SQL execution canceled
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:124) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:64) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:434) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:338) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:501) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:229) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:167) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.core.SFStatement.execute(SFStatement.java:749) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:292) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at net.snowflake.client.jdbc.SnowflakeStatementV1.execute(SnowflakeStatementV1.java:342) ~[snowflake-jdbc-3.12.14.jar:3.12.14]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.db.jdbc.JdbcDatabase.lambda$execute$0(JdbcDatabase.java:54) ~[io.airbyte-airbyte-db-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.java:62) ~[io.airbyte-airbyte-db-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.db.jdbc.JdbcDatabase.execute(JdbcDatabase.java:54) ~[io.airbyte-airbyte-db-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.snowflake.SnowflakeGcsStreamCopier.copyGcsCsvFileIntoTable(SnowflakeGcsStreamCopier.java:63) ~[io.airbyte.airbyte-integrations.connectors-destination-snowflake-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.jdbc.copy.gcs.GcsStreamCopier.copyStagingFileToTemporaryTable(GcsStreamCopier.java:121) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.closeAsOneTransaction(CopyConsumerFactory.java:152) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$onCloseFunction$2(CopyConsumerFactory.java:136) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction.accept(OnCloseFunction.java:29) ~[io.airbyte.airbyte-integrations.bases-base-java-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:195) [io.airbyte.airbyte-integrations.bases-base-java-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:82) [io.airbyte.airbyte-integrations.bases-base-java-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:142) [io.airbyte.airbyte-integrations.bases-base-java-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:117) [io.airbyte.airbyte-integrations.bases-base-java-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	at io.airbyte.integrations.destination.snowflake.SnowflakeDestination.main(SnowflakeDestination.java:82) [io.airbyte.airbyte-integrations.connectors-destination-snowflake-0.27.1-alpha.jar:?]
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-13 23:56:30 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):122 - {} - Completed integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-07-13 23:56:30 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-13 23:56:30 �[32mINFO�[m i.a.i.d.s.SnowflakeDestination(main):83 - {} - completed destination: class io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-07-13 23:56:31 INFO () DefaultReplicationWorker(run):144 - Destination thread complete.
2021-07-13 23:56:31 INFO () DefaultReplicationWorker(run):172 - sync summary: io.airbyte.config.ReplicationAttemptSummary@56a72bd4[status=completed,recordsSynced=42289296,bytesSynced=109522548726,startTime=1626186650362,endTime=1626220591157]
2021-07-13 23:56:31 INFO () DefaultReplicationWorker(run):181 - Source did not output any state messages
2021-07-13 23:56:31 WARN () DefaultReplicationWorker(run):189 - State capture: No new state, falling back on input state: io.airbyte.config.State@6ff49b54[state={}]
2021-07-13 23:56:31 INFO () TemporalAttemptExecution(get):133 - Stopping cancellation check scheduling...
2021-07-13 23:56:31 INFO () RetryingTemporalAttemptExecution(get):118 - Last output present: true. Should attempt again: false
2021-07-13 23:56:31 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):202 - attempt summaries: <REDACTED>
2021-07-13 23:56:31 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):203 - sync summary: <REDACTED>
2021-07-13 23:56:31 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.27.2-alpha
2021-07-13 23:56:31 INFO () DefaultNormalizationWorker(run):61 - Running normalization.
2021-07-13 23:56:31 INFO () LineGobbler(voidCall):85 - Checking if airbyte/normalization:0.1.36 exists...
2021-07-13 23:56:31 INFO () LineGobbler(voidCall):85 - airbyte/normalization:0.1.36 was found locally.
2021-07-13 23:56:31 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/52/0/normalize --network host --log-driver none airbyte/normalization:0.1.36 run --integration-type snowflake --config destination_config.json --catalog destination_catalog.json
2021-07-13 23:56:31 INFO () LineGobbler(voidCall):85 - Running: transform-config --config destination_config.json --integration-type snowflake --out /data/52/0/normalize
2021-07-13 23:56:32 INFO () LineGobbler(voidCall):85 - Namespace(config='destination_config.json', integration_type=<DestinationType.snowflake: 'snowflake'>, out='/data/52/0/normalize')
2021-07-13 23:56:32 INFO () LineGobbler(voidCall):85 - transform_snowflake
2021-07-13 23:56:32 INFO () LineGobbler(voidCall):85 - Running: transform-catalog --integration-type snowflake --profile-config-dir /data/52/0/normalize --catalog destination_catalog.json --out /data/52/0/normalize/models/generated/ --json-column _airbyte_data
2021-07-13 23:56:32 INFO () LineGobbler(voidCall):85 - Processing destination_catalog.json...

 <REDACTED>

2021-07-13 23:56:52 INFO () LineGobbler(voidCall):85 - �[32mCompleted successfully�[0m
2021-07-13 23:56:52 INFO () LineGobbler(voidCall):85 - 
2021-07-13 23:56:52 INFO () LineGobbler(voidCall):85 - Done. PASS=22 WARN=0 ERROR=0 SKIP=0 TOTAL=22
2021-07-13 23:56:52 INFO () DefaultNormalizationWorker(run):77 - Normalization executed in 0.
2021-07-13 23:56:52 INFO () TemporalAttemptExecution(get):133 - Stopping cancellation check scheduling...

Steps to Reproduce

  1. Difficult to reproduce... Transfer 100GB+ of data from MSSQL to Snowflake
philippeboyd added the type/bug label Jul 14, 2021
philippeboyd changed the title from "Sync succeeded but destination tables are empty" to "[DEST Snowflake] - Sync succeeded but destination tables are empty" Jul 14, 2021
@marcosmarxm
Member

marcosmarxm commented Jul 14, 2021

@philippeboyd is it possible to read this and download the Snowflake logs from their side (not completely sure you can do that)? Maybe you can find more information about the error there; that could be really helpful to us.

@sherifnada
Contributor

sherifnada commented Jul 14, 2021

@philippeboyd is there any chance you could also share the logs from right before the snippet you provided above? Trying to see if there is any potentially helpful context there.

Really, as much of the log as you can share would be great.

@sherifnada
Contributor

There are two issues here:

  1. The job is marked as a success when it should have failed. A fix is WIP in Checkpointing: Partial Success in BufferedStreamConsumer (Destination) #3555
  2. The Snowflake destination connector is failing. To help us understand what we can do better, @philippeboyd, can you review the past two comments from me and Marcos so we can move forward?

@jbowlen

jbowlen commented Jul 17, 2021

Since I have the same error (MSSQL -> Snowflake via GCS): the COPY INTO command aborts after 30 minutes. Does JDBC possibly use a timeout?
Unfortunately it is difficult to test again, because the run is shown as "Succeeded" and the files were therefore deleted from GCS.

@philippeboyd
Contributor Author

@jbowlen good find! Same thing: it looks like there's a timeout at 30 minutes.

@sherifnada any possibility of customizing that timeout?
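For reference, Snowflake does expose a per-statement timeout that can be raised, either through the standard JDBC `Statement.setQueryTimeout` call or via the `STATEMENT_TIMEOUT_IN_SECONDS` session parameter. Below is a minimal, standalone sketch; the connection details are placeholders, and this is not necessarily where or how the connector sets its timeout.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public final class SnowflakeTimeoutExample {

  public static void main(final String[] args) throws Exception {
    // Placeholder connection URL and credentials.
    try (Connection conn = DriverManager.getConnection(
        "jdbc:snowflake://<account>.snowflakecomputing.com/", "<user>", "<password>");
        Statement stmt = conn.createStatement()) {

      // Option 1: raise the timeout for this session (12 hours = 43,200 seconds).
      stmt.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 43200");

      // Option 2: standard JDBC per-statement timeout; 0 means "no limit" per the JDBC spec.
      stmt.setQueryTimeout(0);

      // Placeholder COPY statement standing in for the connector's own query.
      stmt.execute("COPY INTO my_table FROM @my_gcs_stage FILE_FORMAT = (TYPE = CSV)");
    }
  }
}
```

This only shows the knobs that the driver and Snowflake expose; whether the connector honors a user-supplied value is a separate question.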

sherifnada added the area/connectors and lang/java labels Jul 19, 2021
@philippeboyd
Contributor Author

philippeboyd commented Jul 19, 2021

This line is problematic when dealing with large amounts of data:

Could the GCS/S3 copy be done in batches?

sherifnada added the priority/high label Jul 19, 2021
@sherifnada
Contributor

Thanks for the pointers, everyone! Will get to this very soon. I think the fastest solution to unblock you will be to bump the timeout by a lot for now (let's say 12 hours).

@philippeboyd what do you mean by "in batch"?

@philippeboyd
Contributor Author

@sherifnada by "in batch" I mean splitting the COPY INTO request into multiple ones. Currently, I think the COPY INTO copies from the stage recursively. Airbyte could execute that request for each subdirectory, if the structure is appropriate.

What if there are 1 billion rows to sync, or 5 billion? That COPY INTO could potentially take more than 12 hours...

I'm just trying to think of a more permanent solution that can scale to any amount of data. For sure, the 12-hour timeout will help for the time being (hopefully), but it's still a quick and temporary fix.
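A rough sketch of what such a batched copy could look like: instead of one recursive COPY INTO over the whole stage prefix, issue one COPY INTO per staged file, so no single statement runs long enough to hit a per-statement timeout. The stage name, file list, and `JdbcExecutor` interface below are placeholders for illustration, not the connector's actual code.

```java
import java.util.List;

public final class BatchedCopyExample {

  // Hypothetical list of staged CSV files produced during the sync.
  private static final List<String> STAGED_FILES = List.of(
      "sync_2021_07_13/part_000.csv",
      "sync_2021_07_13/part_001.csv",
      "sync_2021_07_13/part_002.csv");

  public static void copyInBatches(final JdbcExecutor database) throws Exception {
    for (final String file : STAGED_FILES) {
      // One COPY INTO per file keeps each statement short, so a per-statement
      // timeout is far less likely to be hit than with a single recursive copy.
      final String copyQuery = String.format(
          "COPY INTO airbyte_raw_table FROM @my_gcs_stage/%s FILE_FORMAT = (TYPE = CSV)",
          file);
      database.execute(copyQuery);
    }
  }

  // Stand-in for whatever executes SQL against Snowflake in the connector.
  public interface JdbcExecutor {
    void execute(String sql) throws Exception;
  }
}
```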

@sherifnada
Contributor

Agreed - my goal is to unblock y'all ASAP. Your suggestion is a great one, Philippe. Will follow up on it with the team.

@jbowlen

jbowlen commented Jul 20, 2021

As far as I can see, the data uploaded to GCS is not compressed. Wouldn't it be better to gzip-compress the data before uploading it? My use case is uploading from on-premise MSSQL to Snowflake, and there you would benefit from compression.
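For illustration, a minimal sketch of gzip-compressing the CSV on the fly while it is written, using only the JDK; the output path and rows are placeholders, and the GCS upload itself is out of scope. Snowflake's COPY INTO can read gzip-compressed files (COMPRESSION = GZIP, or auto-detection).

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public final class GzipCsvExample {

  public static void main(final String[] args) throws Exception {
    // Placeholder output path; in the connector this would be the staging file.
    try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
        new GZIPOutputStream(new FileOutputStream("part_000.csv.gz")),
        StandardCharsets.UTF_8))) {
      writer.write("id,name\n");
      writer.write("1,example\n");
      // The staged file can then be loaded with e.g.
      // COPY INTO my_table FROM @my_stage FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP)
    }
  }
}
```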

@jbowlen

jbowlen commented Jul 24, 2021

I created a CSV file locally with the "Local CSV" destination connector. This 55 GB file was split into 28 files, gz-compressed, uploaded to GCS, and imported into Snowflake.
The import process took 6 minutes.

I saw that there is a "best practices" issue (#4904). Possibly this will be covered there as well.

@danieldiamond
Contributor

Just stumbled upon this. I was also experiencing the 30-minute timeout! Glad that's been resolved 🎉
