Postgres connector creates a similar table for "Center", but tries to load data from another existing table "center" #5171

Open
cristiscu opened this issue Aug 3, 2021 · 11 comments
Labels
area/connectors (Connector related issues) · connectors/destination/snowflake · connectors/source/postgres · frozen (Not being actively worked on) · lang/java · team/destinations (Destinations team's backlog) · type/bug (Something isn't working)

Comments

@cristiscu

cristiscu commented Aug 3, 2021

Environment

  • Airbyte version: 0.28.2-alpha
  • OS Version / Instance: Amazon Linux 2 on Amazon Workspaces
  • Deployment: Docker
  • Source Connector and version: Postgres 0.3.7
  • Destination Connector and version: Snowflake 0.3.11
  • Severity: Medium
  • Step where error happened: Sync job

Current Behavior

You have both a "Center" and a "center" table in Postgres (this is valid, because quoted identifiers in Postgres are case sensitive). When you transfer data from "Center" through Airbyte from Postgres to, say, Snowflake, with normalization, a similar table is properly created, but the Sync job tries to load data from "center" instead. Delete "center", and the data is properly loaded from "Center".
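
For context, here is a minimal sketch of how the two tables can legitimately coexist, assuming a local Postgres instance and the standard PostgreSQL JDBC driver (connection details are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class QuotedIdentifiersDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details for a local Postgres instance.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "user", "password");
             Statement st = conn.createStatement()) {
            // Quoted identifiers preserve case, so these are two distinct tables.
            st.execute("CREATE TABLE \"Center\" (id int PRIMARY KEY, name text)");
            st.execute("CREATE TABLE \"center\" (id int PRIMARY KEY, name text)");
            // Unquoted names fold to lower case: SELECT * FROM Center
            // reads from "center", not from "Center".
        }
    }
}
```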

Expected Behavior

When you transfer both metadata and data from "Center" with normalization, you expect a similar table created, and data loaded from the same "Center" table. Not from another existing "center" table.

Logs

In the log below, look for the following rows:

Message contained record from a stream that was not in the catalog.
 catalog: {"streams":[{"stream":{"name":"Center", ...
 message: {"type":"RECORD","record":{"stream":"center","data":{...

2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.IllegalArgumentException: Message contained record from a stream that was not in the catalog.
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 - catalog: {"streams":[{"stream":{"name":"Center","json_schema":{"type":"object","properties":{"id":{"type":"number"},"fax":{"type":"string"},"url":{"type":"string"},"city":{"type":"string"},"logo":{"type":"string"},"name":{"type":"string"},"address":{"type":"string"},"contact":{"type":"string"},"StatusId":{"type":"number"},"province":{"type":"string"},"timezone":{"type":"string"},"tollFree":{"type":"string"},"createdAt":{"type":"string"},"legalName":{"type":"string"},"updatedAt":{"type":"string"},"postalCode":{"type":"string"},"_ab_cdc_lsn":{"type":"number"},"phoneNumber":{"type":"string"},"sequenceName":{"type":"string"},"invoicePrefix":{"type":"string"},"AuthenticationId":{"type":"string"},"preauthorization":{"type":"boolean"},"_ab_cdc_deleted_at":{"type":"number"},"_ab_cdc_updated_at":{"type":"number"},"spaBookerLocationId":{"type":"number"},"userLocaleFieldValue":{"type":"string"}}},"supported_sync_modes":["full_refresh","incremental"],"source_defined_cursor":true,"default_cursor_field":[],"source_defined_primary_key":[["id"]],"namespace":"CDC"},"sync_mode":"incremental","cursor_field":[],"destination_sync_mode":"append_dedup","primary_key":[["id"]]}]}
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 - message: {"type":"RECORD","record":{"stream":"center","data":{"id":1,"name":"**_","timezone":"America/Toronto","spaBookerLocationId":3510,"userLocaleFieldValue":"45997","createdAt":"2019-08-16T23:55:27.34661Z","updatedAt":"2019-08-16T23:55:27.34661Z","StatusId":3,"url":"https://_**.com","_ab_cdc_updated_at":1628016683316,"_ab_cdc_lsn":16161089541632,"_ab_cdc_deleted_at":null},"emitted_at":1628016678681,"namespace":"CDC"}}
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 -     at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.throwUnrecognizedStream(BufferedStreamConsumer.java:197)
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 -     at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:157)
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 -     at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:66)
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 -     at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:167)
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 -     at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:148)
2021-08-03 18:51:25 ERROR () LineGobbler(voidCall):85 -     at io.airbyte.integrations.destination.snowflake.SnowflakeDestination.main(SnowflakeDestination.java:82)
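
To help readers follow the stack trace, here is a hypothetical sketch (not the actual Airbyte source; names and types are illustrative) of the kind of catalog check in BufferedStreamConsumer that raises this error: the record's stream descriptor must match a configured catalog entry exactly, so a record tagged "center" fails when the catalog only contains "Center":

```java
import java.util.Set;

// Hypothetical sketch only; class and method names are illustrative,
// not copied from the Airbyte codebase.
public class CatalogCheckSketch {
    // A stream is identified by (namespace, name); records compare by value.
    record StreamDescriptor(String namespace, String name) {}

    private final Set<StreamDescriptor> catalogStreams;

    CatalogCheckSketch(Set<StreamDescriptor> catalogStreams) {
        this.catalogStreams = catalogStreams;
    }

    void acceptRecord(String namespace, String streamName) {
        // Exact, case-sensitive lookup: ("CDC", "center") does not match
        // ("CDC", "Center"), so the record is rejected.
        if (!catalogStreams.contains(new StreamDescriptor(namespace, streamName))) {
            throw new IllegalArgumentException(
                "Message contained record from a stream that was not in the catalog: "
                    + namespace + "." + streamName);
        }
        // ...otherwise buffer the record for loading into the destination...
    }
}
```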

Steps to Reproduce

  1. Select table "Center" to transfer from Postgres to Snowflake
  2. Reset works well, as empty tables are properly created
  3. Sync fails with a logged error "Message contained record from a stream that was not in the catalog.", as described before.



@etsybaev etsybaev self-assigned this May 10, 2022
@etsybaev
Contributor

Hi @cristiscu,
Is this still reproducible for you?

I tried to reproduce it. I created the two tables in Postgres:
[screenshots: Selection_202.png, Selection_203.png]

Then I tried to migrate them:
[screenshots: Selection_204.png, Selection_205.png]

Here is the result: both streams were migrated successfully with their data replicated, no issues:
[screenshot: Selection_206.png]

Thanks

@alexandr-shegeda
Contributor

It seems we cannot reproduce the issue with the provided information; moving this on hold until we get a response from @cristiscu.

@etsybaev
Contributor

etsybaev commented May 18, 2022

I additionally checked other connectors; the issue appears to be on the destination side, probably caused by some kind of toUpperCase()/toLowerCase() name standardization. Here are examples for the Snowflake and MySQL destinations, where records from both tables appear to be migrated into a single table (a sketch of the suspected collision follows the log below):

[screenshots: Selection_207.png, Selection_210.png]

Here is an example of the stream specs reported by the source (note the two separate streams, test_1_bigint and Test_1_bigint, in the output catalog):

2022-05-18 15:37:53 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):157 - sync summary:
io.airbyte.config.StandardSyncOutput@402b3dd7[
  standardSyncSummary=io.airbyte.config.StandardSyncSummary@529bbb6[status=completed,recordsSynced=6,bytesSynced=178,startTime=1652888271735,endTime=1652888273913,
    totalStats=io.airbyte.config.SyncStats@29bb0c80[recordsEmitted=6,bytesEmitted=178,stateMessagesEmitted=0,recordsCommitted=6],
    streamStats=[
      io.airbyte.config.StreamSyncStats@eb5387[streamName=test_1_bigint,stats=io.airbyte.config.SyncStats@7c898db6[recordsEmitted=4,bytesEmitted=136,stateMessagesEmitted=,recordsCommitted=4]],
      io.airbyte.config.StreamSyncStats@80286e9[streamName=Test_1_bigint,stats=io.airbyte.config.SyncStats@79ac5465[recordsEmitted=2,bytesEmitted=42,stateMessagesEmitted=,recordsCommitted=2]]]],
  normalizationSummary=,
  state=io.airbyte.config.State@238147bc[state={}],
  outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@551fb3da[streams=[
    io.airbyte.protocol.models.ConfiguredAirbyteStream@3466f82d[stream=io.airbyte.protocol.models.AirbyteStream@2dbaaef3[name=test_1_bigint,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"test_column":{"type":"number"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=test,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}],
    io.airbyte.protocol.models.ConfiguredAirbyteStream@650644f9[stream=io.airbyte.protocol.models.AirbyteStream@4d2fbae0[name=Test_1_bigint,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"data":{"type":"string"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=test,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}]],
  additionalProperties={}],
failures=[]]
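
To make the suspected failure mode concrete, here is a hypothetical sketch of how case-folding stream names would collapse the two source tables into one destination table. The upper-case fold is an assumption for illustration (Snowflake folds unquoted identifiers to upper case); it is not taken from the Airbyte code:

```java
import java.util.Locale;

// Hypothetical illustration: if a destination normalizes stream names with a
// case fold, two distinct Postgres tables collide on one destination table.
public class NameCollisionSketch {
    // Assumed normalization step; the fold direction is illustrative only.
    static String normalize(String streamName) {
        return streamName.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(normalize("test_1_bigint")); // TEST_1_BIGINT
        System.out.println(normalize("Test_1_bigint")); // TEST_1_BIGINT -> same table
    }
}
```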

@cristiscu
Author

I would like to keep this open a bit longer. You came up with a lot of stuff and I had no time to review it.

@etsybaev
Contributor

@cristiscu I can confirm that the issue basically exists, but it's on the destination side, since we normalize names for destination connectors. The issue turns out to be broader and requires some business decisions, as the straightforward fix would break backward compatibility. We're on it.
Thanks


@sherifnada
Contributor

relies on airbytehq/airbyte-internal-issues#206

@cristiscu
Author

guys, you can go ahead and close it if you wish, as I really don't have time right now for this. :(


@grishick
Contributor

grishick commented Jan 31, 2023

The proposals in the RFC are blocked on https://github.com/airbytehq/airbyte-internal-issues/issues/645 and #20561.

@bleonard bleonard added the frozen Not being actively worked on label Mar 22, 2024
Projects
Status: Prioritized for scoping

7 participants