Postgres to MySQL fails if source has a column with json data #4583

Closed
julianopiovezan opened this issue Jul 6, 2021 · 1 comment · Fixed by #4825

julianopiovezan commented Jul 6, 2021

Environment

  • Airbyte version: 0.26.2-alpha
  • OS Version / Instance: Debian bullseye/sid, AWS EC2
  • Deployment: Docker
  • Source Connector and version: Postgres 0.3.5
  • Destination Connector and version: MySQL 0.1.6
  • Severity: Critical
  • Step where error happened: Sync job

Current Behavior

The sync job fails when the source table has a column containing JSON data. The error is:

ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Invalid JSON text: "Missing a comma or '}' after an object member." at position 92 in value for column '_airbyte_tmp_pgf_metabase_test._airbyte_data'.

Expected Behavior

The sync should complete successfully.

Logs

Sync log

2021-07-06 20:44:58 INFO () WorkerRun(call):62 - Executing worker wrapper. Airbyte version: 0.26.2-alpha
2021-07-06 20:44:58 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.26.2-alpha
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(run):102 - start sync worker. job id: 1309 attempt id: 0
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(run):111 - configured sync modes: {public.test=full_refresh - overwrite}
2021-07-06 20:44:58 INFO () DefaultAirbyteDestination(start):78 - Running destination...
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - Checking if airbyte/destination-mysql:0.1.6 exists...
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - airbyte/destination-mysql:0.1.6 was found locally.
2021-07-06 20:44:58 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1309/0 --network host airbyte/destination-mysql:0.1.6 write --config destination_config.json --catalog destination_catalog.json
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - Checking if airbyte/source-postgres:0.3.5 exists...
2021-07-06 20:44:58 INFO () LineGobbler(voidCall):85 - airbyte/source-postgres:0.3.5 was found locally.
2021-07-06 20:44:58 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1309/0 --network host airbyte/source-postgres:0.3.5 read --config source_config.json --catalog source_catalog.json --state input_state.json
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):237 - Destination output thread started.
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(run):139 - Waiting for source thread to join.
2021-07-06 20:44:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):207 - Replication thread started.
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.d.m.MySQLDestination(main):110 - {} - starting destination: class io.airbyte.integrations.destination.mysql.MySQLDestination
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationRunner(run):78 - {} - Running integration: io.airbyte.integrations.destination.mysql.MySQLDestination
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationRunner(run):82 - {} - Command: WRITE
2021-07-06 20:45:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.b.IntegrationRunner(run):83 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:04 INFO i.a.i.s.p.PostgresSource(main):336 - {} - starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationRunner(run):78 - {} - Running integration: io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {read=null, catalog=source_catalog.json, state=input_state.json, config=source_config.json}
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationRunner(run):82 - {} - Command: READ
2021-07-06 20:45:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:05 INFO i.a.i.b.IntegrationRunner(run):83 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='input_state.json'}
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=metabase-test, namespace=null, outputSchemaName=airbyte_raw, tmpTableName=_airbyte_tmp_pgf_metabase_test, outputTableName=_airbyte_raw_metabase_test, syncMode=overwrite}
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):120 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):139 - {} - Preparing tmp tables in destination started for 1 streams
2021-07-06 20:45:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:06 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream metabase-test. schema: airbyte_raw, tmp table name: _airbyte_tmp_pgf_metabase_test
2021-07-06 20:45:10 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:10 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):149 - {} - Preparing tables in destination completed.
2021-07-06 20:45:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:11 INFO i.a.i.s.p.PostgresSource(isCdc):287 - {} - using CDC: false
2021-07-06 20:45:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:11 INFO i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):98 - {} - Attempting to get metadata from the database to see if we can connect.
2021-07-06 20:45:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:12 INFO i.a.i.s.r.CdcStateManager(<init>):46 - {} - Initialized CDC state with: null
2021-07-06 20:45:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:12 INFO i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='test', namespace='public'}, New Cursor Field: null. Resetting cursor value
2021-07-06 20:45:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:18 INFO i.a.i.s.p.PostgresSource(isCdc):287 - {} - using CDC: false
2021-07-06 20:45:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:18 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):460 - {} - Queueing query for table: test
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):200 - {} - Closing database connection pool.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):202 - {} - Closed database connection pool.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.b.IntegrationRunner(run):118 - {} - Completed integration: io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.s.p.PostgresSource(main):338 - {} - completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2021-07-06 20:45:21 INFO () DefaultReplicationWorker(run):141 - Source thread complete.
2021-07-06 20:45:21 INFO () DefaultReplicationWorker(run):142 - Waiting for destination thread to join.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-07-06 20:45:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-07-06 20:45:21 INFO i.a.i.d.b.BufferedStreamConsumer(close):190 - {} - executing on success close procedure.
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Invalid JSON text: "Missing a comma or '}' after an object member." at position 92 in value for column '_airbyte_tmp_pgf_metabase_test._airbyte_data'.
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.lambda$loadDataIntoTable$0(MySQLSqlOperations.java:90)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.java:61)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.loadDataIntoTable(MySQLSqlOperations.java:76)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.insertRecords(MySQLSqlOperations.java:62)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.jdbc.JdbcBufferedConsumerFactory.lambda$recordWriterFunction$2(JdbcBufferedConsumerFactory.java:167)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.flushQueueToDestination(BufferedStreamConsumer.java:164)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:191)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:82)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:138)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:113)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLDestination.main(MySQLDestination.java:111)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - Caused by: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Invalid JSON text: "Missing a comma or '}' after an object member." at position 92 in value for column '_airbyte_tmp_pgf_metabase_test._airbyte_data'.
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:104)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:764)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:648)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:194)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:194)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - at io.airbyte.integrations.destination.mysql.MySQLSqlOperations.lambda$loadDataIntoTable$0(MySQLSqlOperations.java:87)
2021-07-06 20:45:21 ERROR () LineGobbler(voidCall):85 - ... 10 more
2021-07-06 20:45:22 INFO () DefaultReplicationWorker(run):144 - Destination thread complete.
2021-07-06 20:45:22 WARN () DefaultAirbyteDestination(close):125 - Destination process might not have shut down correctly. destination process alive: false, destination process exit value: 1. This warning is normal if the job was cancelled.
2021-07-06 20:45:22 INFO () DefaultReplicationWorker(run):169 - sync summary: io.airbyte.config.ReplicationAttemptSummary@db6c580[status=completed,recordsSynced=1,bytesSynced=206,startTime=1625604298259,endTime=1625604322001]
2021-07-06 20:45:22 INFO () DefaultReplicationWorker(run):178 - Source did not output any state messages
2021-07-06 20:45:22 WARN () DefaultReplicationWorker(run):186 - State capture: No new state, falling back on input state: io.airbyte.config.State@289cf40e[state={}]
2021-07-06 20:45:22 INFO () TemporalAttemptExecution(get):133 - Stopping cancellation check scheduling...
2021-07-06 20:45:22 INFO () RetryingTemporalAttemptExecution(get):118 - Last output present: true. Should attempt again: false
2021-07-06 20:45:22 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):200 - attempt summaries: [io.airbyte.config.ReplicationOutput@1657875c[replicationAttemptSummary=io.airbyte.config.ReplicationAttemptSummary@db6c580[status=completed,recordsSynced=1,bytesSynced=206,startTime=1625604298259,endTime=1625604322001],state=io.airbyte.config.State@289cf40e[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@259c0837[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@75889e09[stream=io.airbyte.protocol.models.AirbyteStream@3fd3a21a[name=metabase-test,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"model":{"type":"string"},"topic":{"type":"string"},"details":{"type":"string"},"user_id":{"type":"number"},"model_id":{"type":"number"},"table_id":{"type":"number"},"custom_id":{"type":"string"},"timestamp":{"type":"string"},"database_id":{"type":"number"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}]]]
2021-07-06 20:45:22 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):201 - sync summary: io.airbyte.config.StandardSyncOutput@61c57430[standardSyncSummary=io.airbyte.config.StandardSyncSummary@4324c960[status=completed,recordsSynced=1,bytesSynced=206,startTime=1625604298259,endTime=1625604322001],state=io.airbyte.config.State@289cf40e[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@259c0837[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@75889e09[stream=io.airbyte.protocol.models.AirbyteStream@3fd3a21a[name=metabase-test,jsonSchema={"type":"object","properties":{"id":{"type":"number"},"model":{"type":"string"},"topic":{"type":"string"},"details":{"type":"string"},"user_id":{"type":"number"},"model_id":{"type":"number"},"table_id":{"type":"number"},"custom_id":{"type":"string"},"timestamp":{"type":"string"},"database_id":{"type":"number"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}]]

Source table definition and test data


create table test
(
	id integer,
	topic varchar(32),
	timestamp timestamp with time zone,
	user_id integer,
	model varchar(16),
	model_id integer,
	database_id integer,
	table_id integer,
	custom_id varchar(48),
	details varchar
);

insert into public.test (id, topic, timestamp, user_id, model, model_id, database_id, table_id, custom_id, details) values (1, 'install', '2017-08-22 17:22:14.519000', null, 'install', null, null, null, null, '{"name":"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2","description":null}');
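
One possible explanation, not confirmed anywhere in this report: the `details` value is itself JSON text, so once the whole row is serialized into `_airbyte_data` its quotes must be backslash-escaped. If any step before the MySQL JSON column consumes those backslashes, the payload is no longer valid JSON. The Python sketch below only illustrates that hypothesis. The `record` dict is an abbreviated, hypothetical stand-in for the real record, and removing the backslashes is a simulation, not the connector's actual code path.

```python
import json

# The `details` value from the INSERT statement above: JSON stored as plain text.
details = (
    '{"name":"Conferência Faturamento - Custo - Taxas - Margem - '
    'Resumo ano inicial até -2","description":null}'
)

# Hypothetical, abbreviated stand-in for the record written to _airbyte_data.
record = {"id": 1, "topic": "install", "details": details}
serialized = json.dumps(record, ensure_ascii=False)

# With the escapes intact, the nested JSON text survives a round trip.
assert json.loads(serialized)["details"] == details

# ASSUMPTION: something between serialization and the JSON column drops the
# backslashes. Simulate that by removing them from the serialized payload.
unescaped = serialized.replace("\\", "")


def is_valid_json(text: str) -> bool:
    """Return True if `text` parses as a JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print(is_valid_json(serialized))  # True
print(is_valid_json(unescaped))   # False
```

In the unescaped payload the `details` string ends right after its opening `{`, so a parser then expects a comma or a closing brace, which resembles the "Missing a comma or '}' after an object member" message in the log.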

Steps to Reproduce

  1. Create the test table in a Postgres database with the statements above.
  2. Set up a Postgres source.
  3. Set up a MySQL destination.
  4. Set up a connection.
  5. Run the sync.

julianopiovezan added the type/bug label (Something isn't working) on Jul 6, 2021
marcosmarxm added the area/connectors label (Connector related issues) on Jul 7, 2021

marcosmarxm (Member) commented

Thanks for reporting this, @julianopiovezan. I added it to the connector roadmap, and it will be prioritized soon.