Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker does not finish off job. Source and Destination containers hang #5754

Closed
danieldiamond opened this issue Aug 31, 2021 · 22 comments · Fixed by #12544
Closed

Worker does not finish off job. Source and Destination containers hang #5754

danieldiamond opened this issue Aug 31, 2021 · 22 comments · Fixed by #12544

Comments

@danieldiamond
Copy link
Contributor

danieldiamond commented Aug 31, 2021

Enviroment

  • Airbyte version: 0.29.13-alpha
  • OS Version / Instance: AWS EC2
  • Deployment: Docker
  • Source Connector and version: MySQL 0.4.4
  • Destination Connector and version: Snowflake 0.3.13
  • Severity: Critical
  • Step where error happened: Sync

Current Behavior

Sync job just hangs after completed source and whilst the source and destination containers exist, there appears to be no worker container. Additionally, the destination is not finishes with its inserts into snowflake

Expected Behavior

Worker should send close message to destination and ensure that the source and destination containers finish. Something like

BufferedStreamConsumer(close):212 - {} - executing on success close procedure.

Logs

If applicable, please upload the logs from the failing operation.
For sync jobs, you can download the full logs from the UI by going to the sync attempt page and
clicking the download logs button at the top right of the logs display window.

LOG
2021-08-31 02:27:03 INFO () WorkerRun(call):62 - Executing worker wrapper. Airbyte version: 0.29.13-alpha
2021-08-31 02:27:04 INFO () TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.29.13-alpha
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(run):102 - start sync worker. job id: 7000 attempt id: 0
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(run):111 - configured sync modes: {myschema.tableA=incremental - append_dedup, myschema.tableb=incremental - append_dedup, myschema.tableC=incremental - append_dedup, myschema.tableD=incremental - append_dedup, myschema.tableE=incremental - append_dedup, myschema.tableF=incremental - append_dedup, myschema.tableH=incremental - append, myschema.tableI=incremental - append_dedup, myschema.tableJ=incremental - append_dedup, myschema.tableK=incremental - append_dedup}
2021-08-31 02:27:04 INFO () DefaultAirbyteDestination(start):78 - Running destination...
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - Checking if airbyte/destination-snowflake:0.3.12 exists...
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - airbyte/destination-snowflake:0.3.12 was found locally.
2021-08-31 02:27:04 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/7000/0 --network host --log-driver none airbyte/destination-snowflake:0.3.12 write --config destination_config.json --catalog destination_catalog.json
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - Checking if airbyte/source-mysql:0.4.4 exists...
2021-08-31 02:27:04 INFO () LineGobbler(voidCall):85 - airbyte/source-mysql:0.4.4 was found locally.
2021-08-31 02:27:04 INFO () DockerProcessFactory(create):146 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/7000/0 --network host --log-driver none airbyte/source-mysql:0.4.4 read --config source_config.json --catalog source_catalog.json
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(run):139 - Waiting for source thread to join.
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):246 - Destination output thread started.
2021-08-31 02:27:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):210 - Replication thread started.
2021-08-31 02:27:10 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:10 �[32mINFO�[m i.a.i.d.s.SnowflakeDestination(main):81 - {} - starting destination: class io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-31 02:27:10 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:10 �[32mINFO�[m i.a.i.s.m.MySqlSource(main):249 - {} - starting source: class io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):100 - {} - Command: WRITE
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {read=null, catalog=source_catalog.json, config=source_config.json}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):100 - {} - Command: READ
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[33mWARN�[m c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.c.SwitchingDestination(getConsumer):83 - {} - Using destination type: INSERT
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.CdcStateManager(<init>):46 - {} - Initialized CDC state with: null
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableD', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableF', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableH', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableb', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableA', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableE', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableI', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableJ, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_lzs_tableJ, outputTableName=_airbyte_raw_tableJ, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableJ', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableK', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='tableC', namespace='mynamespace'}, New Cursor Field: null. Resetting cursor value
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableb, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_mas_tableb, outputTableName=_airbyte_raw_tableb, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableH, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_grh_tableH, outputTableName=_airbyte_raw_tableH, syncMode=append}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableE, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_cvl_tableE, outputTableName=_airbyte_raw_tableE, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableC, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_shl_tableC, outputTableName=_airbyte_raw_tableC, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableD, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_tgr_tableD, outputTableName=_airbyte_raw_tableD, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableK, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_ahf_tableK, outputTableName=_airbyte_raw_tableK, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableF, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_wmm_tableF, outputTableName=_airbyte_raw_tableF, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableI, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_hla_tableI, outputTableName=_airbyte_raw_tableI, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):116 - {} - Write config: WriteConfig{streamName=tableA, namespace=my_destination, outputSchemaName=my_destination, tmpTableName=_airbyte_tmp_jdu_tableA, outputTableName=_airbyte_raw_tableA, syncMode=append_dedup}
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.b.BufferedStreamConsumer(startTracked):142 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):139 - {} - Preparing tmp tables in destination started for 10 streams
2021-08-31 02:27:11 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:11 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableJ. schema: my_destination, tmp table name: _airbyte_tmp_lzs_tableJ
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.a.i.s.m.MySqlCdcTargetPosition(targetPosition):81 - {} - Target tableJ position : tableJName: mysql-bin-changelog.164150, Position : 71313
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.a.i.d.AirbyteDebeziumHandler(getIncrementalIterators):92 - {} - using CDC: true
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m o.a.k.c.c.AbstractConfig(logAll):354 - {} - EmbeddedConfig values: 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	access.control.allow.methods = 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	access.control.allow.origin = 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	admin.listeners = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	bootstrap.servers = [localhost:9092]
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	client.dns.lookup = use_all_dns_ips
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	config.providers = []
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	connector.client.config.override.policy = None
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	key.converter = class org.apache.kafka.connect.json.JsonConverter
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	listeners = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	metric.reporters = []
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	metrics.num.samples = 2
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	metrics.recording.level = INFO
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	metrics.sample.window.ms = 30000
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	offset.flush.interval.ms = 1000
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	offset.flush.timeout.ms = 5000
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	offset.storage.tableJ.tableJname = /tmp/cdc-state-offset10480844211472211971/offset.dat
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	offset.storage.partitions = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	offset.storage.replication.factor = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	offset.storage.topic = 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	plugin.path = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	response.http.headers.config = 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	rest.advertised.host.name = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	rest.advertised.listener = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	rest.advertised.port = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	rest.extension.classes = []
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	rest.host.name = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	rest.port = 8083
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.cipher.suites = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.client.auth = none
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.endpoint.identification.algorithm = https
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.engine.factory.class = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.key.password = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.keymanager.algorithm = SunX509
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.keystore.location = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.keystore.password = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.keystore.type = JKS
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.protocol = TLSv1.3
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.provider = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.secure.random.implementation = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.trustmanager.algorithm = PKIX
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.truststore.location = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.truststore.password = null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	ssl.truststore.type = JKS
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	task.shutdown.graceful.timeout.ms = 5000
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	topic.creation.enable = true
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	topic.tracking.allow.reset = true
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	topic.tracking.enable = true
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 	value.converter = class org.apache.kafka.connect.json.JsonConverter
2021-08-31 02:27:12 INFO () JsonSchemaValidator(test):76 - JSON schema validation failed. 
errors: $: unknown found, object expected
2021-08-31 02:27:12 ERROR () DefaultAirbyteStreamFactory(lambda$create$1):83 - Validation failed: null
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m o.a.k.c.r.WorkerConfig(logPluginPathConfigProviderWarning):420 - {} - Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "table.blacklist" is deprecated and will be removed in future versions. Please use "table.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "table.blacklist" is deprecated and will be removed in future versions. Please use "table.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "database.blacklist" is deprecated and will be removed in future versions. Please use "database.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "database.blacklist" is deprecated and will be removed in future versions. Please use "database.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "column.whitelist" is deprecated and will be removed in future versions. Please use "column.include.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "column.blacklist" is deprecated and will be removed in future versions. Please use "column.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "column.whitelist" is deprecated and will be removed in future versions. Please use "column.include.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "column.blacklist" is deprecated and will be removed in future versions. Please use "column.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "column.whitelist" is deprecated and will be removed in future versions. Please use "column.include.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {} - Using configuration property "column.blacklist" is deprecated and will be removed in future versions. Please use "column.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.c.BaseSourceTask(start):100 - {} - Starting MySqlConnectorTask with configuration:


...
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Using configuration property "database.blacklist" is deprecated and will be removed in future versions. Please use "database.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Using configuration property "table.blacklist" is deprecated and will be removed in future versions. Please use "table.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[33mWARN�[m i.d.c.Configuration(lambda$getFallbackStringProperty$34):2135 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Using configuration property "column.blacklist" is deprecated and will be removed in future versions. Please use "column.exclude.list" instead.
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.MySqlConnectorTask(start):168 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Found no existing offset, so preparing to perform a snapshot
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.u.Threads(threadFactory):270 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Requested thread factory for connector MySqlConnector, id = mysource named = binlog-client
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.u.Threads(threadFactory):270 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Requested thread factory for connector MySqlConnector, id = mysource named = snapshot
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.u.Threads$3(newThread):287 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Creating thread debezium-mysqlconnector-mysource-snapshot
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):248 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Starting snapshot for jdbc:mysql://mydb.rds.amazonaws.com:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'user' with locking mode 'none'
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(logRolesForCurrentUser):1021 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Snapshot is using user 'user' with these MySQL grants:
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logRolesForCurrentUser$21):1022 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	GRANT SELECT, RELOAD, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%'
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(logServerInformation):992 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - MySQL server variables related to change data capture:
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_cache_size                             = 32768                                        
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_checksum                               = NONE                                         
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_direct_non_transactional_updates       = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_error_action                           = ABORT_SERVER                                 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_format                                 = ROW                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_group_commit_sync_delay                = 0                                            
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_group_commit_sync_no_delay_count       = 0                                            
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_gtid_simple_recovery                   = ON                                           
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_max_flush_queue_time                   = 0                                            
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_order_commits                          = ON                                           
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_row_image                              = FULL                                         
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_rows_query_log_events                  = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_stmt_cache_size                        = 32768                                        
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_transaction_dependency_history_size    = 25000                                        
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	binlog_transaction_dependency_tracking        = COMMIT_ORDER                                 
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_client                          = utf8mb4                                      
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_connection                      = utf8mb4                                      
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_database                        = utf8mb4                                      
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_tableJsystem                      = binary                                       
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_results                         = utf8mb4                                      
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_server                          = latin1                                       
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_set_system                          = utf8                                         
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	character_sets_dir                            = /rdsdbbin/mysql-5.7.33.R2/share/charsets/    
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	collation_connection                          = utf8mb4_general_ci                           
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	collation_database                            = utf8mb4_unicode_520_ci                       
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	collation_server                              = latin1_swedish_ci                            
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	enforce_gtid_consistency                      = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	gtid_executed_compression_period              = 1000                                         
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	gtid_mode                                     = OFF_PERMISSIVE                               
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	gtid_next                                     = AUTOMATIC                                    
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	gtid_owned                                    =                                              
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	gtid_purged                                   =                                              
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	innodb_api_enable_binlog                      = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	innodb_locks_unsafe_for_binlog                = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	innodb_version                                = 5.7.33                                       
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	log_statements_unsafe_for_binlog              = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	max_binlog_cache_size                         = 18446744073709547520                         
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	max_binlog_size                               = 134217728                                    
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	max_binlog_stmt_cache_size                    = 18446744073709547520                         
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	protocol_version                              = 10                                           
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	session_track_gtids                           = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	slave_type_conversions                        =                                              
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	sync_binlog                                   = 1                                            
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	system_time_zone                              = UTC                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	time_zone                                     = UTC                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	tls_version                                   = TLSv1,TLSv1.1,TLSv1.2                        
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	tx_isolation                                  = REPEATABLE-READ                              
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	tx_read_only                                  = OFF                                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	version                                       = 5.7.33-log                                   
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	version_comment                               = Source distribution                          
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	version_compile_machine                       = x86_64                                       
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$logServerInformation$19):995 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	version_compile_os                            = Linux                                        
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):282 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):342 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 2: start transaction with consistent snapshot
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):367 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 3: read list of available databases
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):375 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	 list of available databases is: [...]
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):386 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 4: read list of available tables in each database

...
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):448 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	snapshot continuing with database(s): [mydbt]
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(readBinlogPosition):906 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 5: read binlog position of MySQL primary server
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$readBinlogPosition$18):918 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - 	 using binlog 'mysql-bin-changelog.164150' at position '71313' and gtid ''
2021-08-31 02:27:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:12 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):494 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 6: generating DROP and CREATE statements to reflect current database schemas:
2021-08-31 02:27:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:14 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableb. schema: my_destination, tmp table name: _airbyte_tmp_mas_tableb
2021-08-31 02:27:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:14 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):595 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: scanning contents of 10 tables while still in transaction
2021-08-31 02:27:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:14 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):646 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - scanning table 'myschema.tableJ' (1 of 10 tables)
2021-08-31 02:27:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:14 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):651 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - For table 'myschema.tableJ' using select statement: 'SELECT * FROM `mydbt`.`tableJ`'
2021-08-31 02:27:14 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:14 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 10000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:00.541
2021-08-31 02:27:15 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:15 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableH. schema: my_destination, tmp table name: _airbyte_tmp_grh_tableH
2021-08-31 02:27:16 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:16 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableE. schema: my_destination, tmp table name: _airbyte_tmp_cvl_tableE
2021-08-31 02:27:17 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:17 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableC. schema: my_destination, tmp table name: _airbyte_tmp_shl_tableC
2021-08-31 02:27:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:18 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableD. schema: my_destination, tmp table name: _airbyte_tmp_tgr_tableD
2021-08-31 02:27:19 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:19 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableK. schema: my_destination, tmp table name: _airbyte_tmp_ahf_tableK
2021-08-31 02:27:20 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:20 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableF. schema: my_destination, tmp table name: _airbyte_tmp_wmm_tableF
2021-08-31 02:27:21 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:21 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableI. schema: my_destination, tmp table name: _airbyte_tmp_hla_tableI
2021-08-31 02:27:22 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:22 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):143 - {} - Preparing tmp table in destination started for stream tableA. schema: my_destination, tmp table name: _airbyte_tmp_jdu_tableA
2021-08-31 02:27:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:23 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):149 - {} - Preparing tables in destination completed.
2021-08-31 02:27:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 1000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 2000
2021-08-31 02:27:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:15 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 20000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:00.928
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 3000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 4000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 5000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 6000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 7000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 8000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 9000
2021-08-31 02:27:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 10000
2021-08-31 02:27:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:24 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 11000
2021-08-31 02:27:28 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:24 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 30000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:10.384
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 12000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 13000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 14000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 15000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 16000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 17000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 18000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 19000
2021-08-31 02:27:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 20000
2021-08-31 02:27:28 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:28 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 21000
2021-08-31 02:27:33 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:28 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 40000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:14.358
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 22000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 23000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 24000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 25000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 26000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 27000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 28000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 29000
2021-08-31 02:27:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 30000
2021-08-31 02:27:33 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:33 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 31000
2021-08-31 02:27:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:33 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 50000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:19.459
2021-08-31 02:27:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 32000
2021-08-31 02:27:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 33000
2021-08-31 02:27:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 34000
2021-08-31 02:27:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 35000
2021-08-31 02:27:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 36000
2021-08-31 02:27:38 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 37000
2021-08-31 02:27:38 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 38000
2021-08-31 02:27:38 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 39000
2021-08-31 02:27:38 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 40000
2021-08-31 02:27:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:38 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:41 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 41000
2021-08-31 02:27:41 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:38 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 60000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:23.843
2021-08-31 02:27:41 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 42000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 43000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 44000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 46000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 47000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 48000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 49000
2021-08-31 02:27:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 50000
2021-08-31 02:27:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:42 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 51000
2021-08-31 02:27:45 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:42 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 70000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:28.241
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 52000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 53000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 54000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 55000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 56000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 57000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 58000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 59000
2021-08-31 02:27:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 60000
2021-08-31 02:27:45 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:45 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 61000
2021-08-31 02:27:49 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:45 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 80000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:31.539
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 62000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 63000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 64000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 65000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 66000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 67000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 68000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 69000
2021-08-31 02:27:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 70000
2021-08-31 02:27:49 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:49 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:49 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:49 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 90000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:35.423
2021-08-31 02:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 71000
2021-08-31 02:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 72000
2021-08-31 02:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 73000
2021-08-31 02:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 74000
2021-08-31 02:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 75000
2021-08-31 02:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 76000
2021-08-31 02:27:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 77000
2021-08-31 02:27:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 78000
2021-08-31 02:27:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 79000
2021-08-31 02:27:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 80000
2021-08-31 02:27:53 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:53 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:53 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 100000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:38.811
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 81000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 82000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 83000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 84000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 85000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 86000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 87000
2021-08-31 02:27:56 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 88000
2021-08-31 02:27:57 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 89000
2021-08-31 02:27:57 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 90000
2021-08-31 02:27:57 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:57 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:27:59 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 91000
2021-08-31 02:28:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:27:57 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 110000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:42.68
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 92000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 93000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 94000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 95000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 96000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 97000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 98000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 99000
2021-08-31 02:28:00 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 100000
2021-08-31 02:28:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:28:00 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 101000
2021-08-31 02:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:28:00 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 120000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:46.228
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 102000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 103000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 104000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 105000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 106000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 107000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 108000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 109000
2021-08-31 02:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 110000
2021-08-31 02:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:28:04 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 02:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 111000
2021-08-31 02:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 02:28:04 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 130000 of 12540934 rows scanned from table 'myschema.tableJ' after 00:00:50.307
2021-08-31 02:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 112000
2021-08-31 02:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 113000
2021-08-31 02:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 114000



2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45534000
2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45535000
2021-08-31 07:27:48 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:48 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 2620000 of 2354186 rows scanned from table 'myschema.tableA' after 00:16:08.399
2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45536000
2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45537000
2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45538000
2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45539000
2021-08-31 07:27:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45540000
2021-08-31 07:27:48 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:48 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 07:27:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45541000
2021-08-31 07:27:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45542000
2021-08-31 07:27:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45543000
2021-08-31 07:27:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45544000
2021-08-31 07:27:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45545000
2021-08-31 07:27:51 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:51 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 2630000 of 2354186 rows scanned from table 'myschema.tableA' after 00:16:11.743
2021-08-31 07:27:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45546000
2021-08-31 07:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45547000
2021-08-31 07:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45548000
2021-08-31 07:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45549000
2021-08-31 07:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45550000
2021-08-31 07:27:52 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:52 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 07:27:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45551000
2021-08-31 07:27:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45552000
2021-08-31 07:27:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45553000
2021-08-31 07:27:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45554000
2021-08-31 07:27:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45555000
2021-08-31 07:27:54 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:52 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 2640000 of 2354186 rows scanned from table 'myschema.tableA' after 00:16:12.391
2021-08-31 07:27:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45556000
2021-08-31 07:27:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45557000
2021-08-31 07:27:55 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45558000
2021-08-31 07:27:55 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45559000
2021-08-31 07:27:55 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45560000
2021-08-31 07:27:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:55 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 07:27:57 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45561000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45562000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45563000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45564000
2021-08-31 07:27:58 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:55 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 2650000 of 2354186 rows scanned from table 'myschema.tableA' after 00:16:15.385
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45565000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45566000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45567000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45568000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45569000
2021-08-31 07:27:58 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45570000
2021-08-31 07:27:58 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:58 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45571000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45572000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45573000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45574000
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:27:58 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):677 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - 2660000 of 2354186 rows scanned from table 'myschema.tableA' after 00:16:18.606
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45575000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45576000
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:01 �[32mINFO�[m i.d.c.m.SnapshotReader(lambda$execute$16):687 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: - Completed scanning a total of 2661367 rows from table 'myschema.tableA' after 00:16:21.172
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:01 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):724 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 7: scanned 45595383 rows in 10 tables in 05:00:46.833
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:01 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):761 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Step 8: committing transaction
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:01 �[32mINFO�[m i.d.c.m.SnapshotReader(execute):840 - {dbz.connectorContext=snapshot, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Completed snapshot in 05:00:48.803
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45577000
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:01 �[32mINFO�[m i.d.j.JdbcConnection(lambda$doClose$3):945 - {} - Connection gracefully closed
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45578000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45579000
2021-08-31 07:28:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45580000
2021-08-31 07:28:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:01 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45581000
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45582000
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45583000
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45584000
2021-08-31 07:28:04 ERROR () LineGobbler(voidCall):85 - Aug 31, 2021 7:28:04 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
2021-08-31 07:28:04 ERROR () LineGobbler(voidCall):85 - INFO: Connected to mydb.rds.amazonaws.com:3306 at mysql-bin-changelog.164150/71313 (sid:5700, cid:126306)
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45585000
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.ChainedReader(startNextReader):201 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Transitioning from the snapshot reader to the binlog reader
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45586000
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.BinlogReader(doStart):367 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - GTID set purged on server: 
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.u.Threads$3(newThread):287 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Creating thread debezium-mysqlconnector-mydb-binlog-client
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.u.Threads$3(newThread):287 - {} - Creating thread debezium-mysqlconnector-mydb-binlog-client
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45587000
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.BinlogReader$ReaderThreadLifecycleListener(onConnect):1128 - {dbz.connectorContext=binlog, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Connected to MySQL binlog at mydb.rds.amazonaws.com:3306, starting at binlog tableJ 'mysql-bin-changelog.164150', pos=71313, skipping 0 events plus 0 rows
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.BinlogReader(doStart):415 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Waiting for keepalive thread to start
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.u.Threads$3(newThread):287 - {dbz.connectorContext=binlog, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Creating thread debezium-mysqlconnector-mydb-binlog-client
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45588000
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45589000
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.BinlogReader(doStart):422 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Keepalive thread is running
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45590000
2021-08-31 07:28:04 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 10000
2021-08-31 07:28:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45591000
2021-08-31 07:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45592000
2021-08-31 07:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45593000
2021-08-31 07:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45594000
2021-08-31 07:28:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45595000
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.a.i.s.m.MySqlCdcTargetPosition(reachedTargetPosition):103 - {} - Signalling close because record's binlog tableJ : mysql-bin-changelog.164150 , position : 71517 is after target tableJ : mysql-bin-changelog.164150 , target position : 71313
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.e.EmbeddedEngine(stop):996 - {} - Stopping the embedded engine
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.e.EmbeddedEngine(stop):1004 - {} - Waiting for PT5M for connector to stop
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.c.BaseSourceTask(stop):192 - {} - Stopping down connector
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.MySqlConnectorTask(doStop):453 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Stopping MySQL connector task
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.ChainedReader(stop):121 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - ChainedReader: Stopping the binlog reader
2021-08-31 07:28:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:28:04 �[32mINFO�[m i.d.c.m.AbstractReader(stop):140 - {dbz.connectorContext=task, dbz.connectorName=myConnector, dbz.connectorType=MySQL} - Discarding 5375 unsent record(s) due to the connector shutting down
2021-08-31 07:43:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45596000
2021-08-31 07:43:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45597000
2021-08-31 07:43:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45598000
2021-08-31 07:43:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):223 - Records read: 45599000
2021-08-31 07:43:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:43:05 �[32mINFO�[m i.a.i.s.m.MySqlCdcStateHandler(saveState):61 - {} - debezium state: {.....
2021-08-31 07:43:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:43:05 �[32mINFO�[m i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):200 - {} - Closing database connection pool.
2021-08-31 07:43:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:43:05 �[32mINFO�[m i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):202 - {} - Closed database connection pool.
2021-08-31 07:43:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:43:05 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 07:43:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 07:43:05 �[32mINFO�[m i.a.i.s.m.MySqlSource(main):251 - {} - completed source: class io.airbyte.integrations.source.mysql.MySqlSource

The logs above are associated with a sync that fails. What i should be expected after that last line is this: (logs from a successful sync)

snippet from a successful sync with the same source + destination connectors
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:34 �[32mINFO�[m i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:34 �[32mINFO�[m i.a.i.s.m.MySqlSource(main):251 - {} - completed source: class io.airbyte.integrations.source.mysql.MySqlSource
2021-08-31 01:55:36 INFO () DefaultReplicationWorker(run):141 - Source thread complete.
2021-08-31 01:55:36 INFO () DefaultReplicationWorker(run):142 - Waiting for destination thread to join.
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:36 �[32mINFO�[m i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:36 �[32mINFO�[m i.a.i.d.b.BufferedStreamConsumer(close):212 - {} - executing on success close procedure.
2021-08-31 01:55:36 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:36 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 62
2021-08-31 01:55:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:38 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 237
2021-08-31 01:55:39 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:39 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 1442
2021-08-31 01:55:41 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:41 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 48
2021-08-31 01:55:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:42 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 77
2021-08-31 01:55:44 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:44 �[32mINFO�[m i.a.i.d.s.SnowflakeSqlOperations(insertRecords):56 - {} - actual size of batch: 21
2021-08-31 01:55:45 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-08-31 01:55:45 �[32mINFO�[m i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):176 - {} - Finalizing tables in destination started for 12 streams
...

Steps to Reproduce

These aren't scientific, but Description of the schema of the table. It has 60m rows, amounting to about 1GB of data on the DB server.

  • fails to work in CDC
  • but confirming it works in non-CDC (appears to conflate to 17gb in the job status even though schema is all integers - see schema provided)

Screen Shot 2021-09-23 at 12 12 22 am

So presumably the repro steps would be

  1. Create a DB with the same schema and fill it up with that many records
  2. Run a sync and wait for the logs to just stop
  3. profile the connector/java process to understand where CPU time is going

Implementation hints

We should:

  1. Reproduce the issue
  2. understand why it's happening
  3. Figure out the right solution. please reach out to @sherifnada as needed for rubberducking.
@danieldiamond danieldiamond added the type/bug Something isn't working label Aug 31, 2021
@danieldiamond
Copy link
Contributor Author

Just tried the same connection with MySQL 0.4.3 and it works

@Phlair
Copy link
Contributor

Phlair commented Sep 2, 2021

breaking change likely from this PR , could also therefore be affecting CDC on MSSQL & Postgres in the same way.

@subodh1810
Copy link
Contributor

weird! am not sure how my PR could have caused this.

@danieldiamond
Copy link
Contributor Author

Related?

#4322

@subodh1810
Copy link
Contributor

I spent some time trying to reproduce it and I dont think its happening cause of my changed from this PR #5600

Currently we dont handle the scenario when the worker dies. If a the worker dies then then our source and destination containers would just hang. I could not reproduce why the worker dies but perhaps its because of resource exhaustion. As part of this issue we should handle the scenario when the worker dies and what should happen to the source and destination containers in that case.

@cgardens I would like to have your thoughts on this. Few things that we can do is the source and destination containers should check if the parent worker exists or not, if parent worker doesnt exist, the source and destination containers should self kill.

@jrhizor
Copy link
Contributor

jrhizor commented Sep 7, 2021

We actually handle this explicitly for the Kubernetes case by having another container in the connector pod that kills it if it can't reach back to the worker pod (which is serving a heartbeat signal).

It's a bit harder in docker-compose since we'd need more of a "supervisor" container to manage killing other pods (it'd need to keep track of relevant pods and handle the heartbeat checking).

@danieldiamond
Copy link
Contributor Author

To confirm, the solution here is to kill source and destination containers? Because that doesn't seem like a viable fix if this happens every time ie never move onto the normalisation phase

@subodh1810
Copy link
Contributor

@danieldiamond agreed! we will be taking this up and releasing the correct fix

@davinchia davinchia added the priority/medium Medium priority label Sep 9, 2021
@danieldiamond
Copy link
Contributor Author

danieldiamond commented Sep 10, 2021

CONTAINER ID        NAME                  CPU %               MEM USAGE / LIMIT     MEM %               NET I/O             BLOCK I/O           PIDS
d60c85d16526        thirsty_brahmagupta   100.90%             1.405GiB / 15.38GiB   9.14%               0B / 0B             231MB / 751MB       35
98d821ed12e5        angry_elion           0.07%               1.044GiB / 15.38GiB   6.79%               0B / 0B             227MB / 0B          132
3f4995606fc7        airbyte-db            0.09%               62.71MiB / 15.38GiB   0.40%               54.4MB / 1.13GB     53.1MB / 209kB      41
0c2f8f3d8d4f        airbyte-scheduler     0.04%               302.2MiB / 15.38GiB   1.92%               975MB / 12.2MB      277MB / 0B          39
c1433fbc0188        airbyte-server        23.27%              480.7MiB / 15.38GiB   3.05%               139MB / 1.14GB      320MB / 110MB       45
a07ab1e72be0        airbyte-webapp        0.02%               5.172MiB / 15.38GiB   0.03%               1.15GB / 1.15GB     27MB / 24.6kB       5
a64a5cde0ddb        airbyte-worker        0.06%               851.5MiB / 15.38GiB   5.41%               3.96MB / 3.73MB     417MB / 0B          119
7d6f14def198        airbyte-temporal      1.07%               61.82MiB / 15.38GiB   0.39%               27.5MB / 45.4MB     399MB / 24.6kB      16
^C
$ docker ps -a
CONTAINER ID        IMAGE                                  COMMAND                  CREATED             STATUS                   PORTS                                                                      NAMES
d60c85d16526        airbyte/source-mysql:0.4.4             "/airbyte/base.sh re…"   2 hours ago         Up 2 hours                                                                                          thirsty_brahmagupta
98d821ed12e5        airbyte/destination-snowflake:0.3.14   "/airbyte/base.sh wr…"   2 hours ago         Up 2 hours                                                                                          angry_elion
3f4995606fc7        airbyte/db:0.29.17-alpha               "docker-entrypoint.s…"   4 hours ago         Up 4 hours               5432/tcp                                                                   airbyte-db
0c2f8f3d8d4f        airbyte/scheduler:0.29.17-alpha        "/bin/bash -c bin/${…"   4 hours ago         Up 4 hours                                                                                          airbyte-scheduler
c1433fbc0188        airbyte/server:0.29.17-alpha           "/bin/bash -c bin/${…"   4 hours ago         Up 4 hours               8000/tcp, 0.0.0.0:8001->8001/tcp                                           airbyte-server
a07ab1e72be0        airbyte/webapp:0.29.17-alpha           "/docker-entrypoint.…"   4 hours ago         Up 4 hours               0.0.0.0:8000->80/tcp                                                       airbyte-webapp
8a59f335d32d        airbyte/init:0.29.17-alpha             "/bin/sh -c './scrip…"   4 hours ago         Exited (0) 4 hours ago                                                                              init
a64a5cde0ddb        airbyte/worker:0.29.17-alpha           "/bin/bash -c bin/${…"   4 hours ago         Up 4 hours                                                                                          airbyte-worker
7d6f14def198        temporalio/auto-setup:1.7.0            "/entrypoint.sh /bin…"   8 days ago          Up 4 hours               6933-6935/tcp, 6939/tcp, 7234-7235/tcp, 7239/tcp, 0.0.0.0:7233->7233/tcp   airbyte-temporal

Airbyte version: 0.29.17-alpha
OS Version / Instance: AWS EC2
Deployment: Docker
Source Connector and version: MySQL 0.4.4
Destination Connector and version: Snowflake 0.3.14

@subodh1810
Running with 0.4.4 where the worker dies issue occurs. Here the worker is still alive but the sync is hanging.

2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):200 - {} - Closing database connection pool.
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$read$2):202 - {} - Closed database connection pool.
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.source.mysql.MySqlSource
2021-09-10 03:24:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):73 - 2021-09-10 03:24:01 INFO i.a.i.s.m.MySqlSource(main):251 - {} - completed source: class io.airbyte.integrations.source.mysql.MySqlSource

@danieldiamond
Copy link
Contributor Author

danieldiamond commented Sep 10, 2021

tl;dr confirming Worker container is still alive but getting stuck before providing DefaultReplicationWorker(run):141 - Source thread complete

and that this issue is occurring for both MySQL 0.4.3 and 0.4.4

@danieldiamond
Copy link
Contributor Author

danieldiamond commented Sep 11, 2021

More updates. This could be a resource issue or I lucked out on a random try but I just doubled the instance to m5.2xlarge and the larger connector that consistently failed has now worked with source 0.4.4

@danieldiamond danieldiamond changed the title Worker dies causing source and destination containers to hang Worker does not finish off job. Source and Destination containers hang Sep 12, 2021
@danieldiamond
Copy link
Contributor Author

danieldiamond commented Sep 12, 2021

So I think I've ruled out it being a resource issue as I've tried another connector with the m5.2xlarge (after the previous one successfully completed) and it hangs.

What is interesting though is that in this hanging state (where source container is at ~100% CPU and worker container is at ~0.04% CPU) I run docker restart airbyte-worker, which then triggers the COPY INTO commands in snowflake but the actual job doesn't change state. Still hanging even though the _AIRBYTE_RAW tables were created

@danieldiamond
Copy link
Contributor Author

Update: I attempted to retry this connection (MySQL CDC for one table that is 60m rows and ~5gb data in source - although successful sync shows ~17gb in job sync status when trying it as STANDARD).
Anyways, it appears to push past the hanging stage! 🎉 but then I run into another issue, which spits out this error repeatedly to no end

2021-09-26 02:45:03 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: UNAVAILABLE: Network closed for unknown reason
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
	at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
	at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:124) ~[io.airbyte-airbyte-workers-0.29.22-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:144) ~[io.airbyte-airbyte-workers-0.29.22-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:123) ~[io.airbyte-airbyte-workers-0.29.22-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:67) ~[io.airbyte.airbyte-scheduler-app-0.29.22-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:65) [io.airbyte.airbyte-scheduler-app-0.29.22-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:43) [io.airbyte.airbyte-scheduler-app-0.29.22-alpha.jar:?]
	at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:114) [io.airbyte-airbyte-commons-0.29.22-alpha.jar:?]
	at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:98) [io.airbyte-airbyte-commons-0.29.22-alpha.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]

@danieldiamond
Copy link
Contributor Author

Updates:
I successful sync a 60M row table that I was previously having this issue with. but now attempting to try the entire schema (~250M rows) and we're back at the hanging issue

MySQL 0.4.8
Snowflake 0.3.16
CDC
AWS S3 Staging
0.30.20-alpha

@joshuataylor
Copy link

joshuataylor commented Nov 18, 2021

Seeing the same thing for Postgres -> Snowflake with v0.32.1-alpha:

2021-11-18 08:42:25 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 74243000
�[35mdestination�[0m - 2021-11-18 08:42:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:26 �[32mINFO�[m a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 46 containing 10.01 MB]
�[35mdestination�[0m - 2021-11-18 08:42:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:26 �[32mINFO�[m a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 47 containing 10.01 MB]
�[35mdestination�[0m - 2021-11-18 08:42:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:26 �[32mINFO�[m a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 48 containing 10.01 MB]
�[35mdestination�[0m - 2021-11-18 08:42:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-18 08:42:27 �[32mINFO�[m a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to s3bucketname/69e1d112-3730-46b9-b8f1-0505a4d675b4/public/mytable_00000 with id Cu_bOmfrl...it7f84ja_]: Finished uploading [Part number 49 containing 10.01 MB]
2021-11-18 08:57:58 WARN () ActivityExecutionContextImpl(doHeartBeat):153 - Heartbeat failed
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 6.648743300s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.4:7233]]]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.recordActivityTaskHeartbeat(WorkflowServiceGrpc.java:2710) ~[temporal-serviceclient-1.0.4.jar:?]
	at io.temporal.internal.sync.ActivityExecutionContextImpl.sendHeartbeatRequest(ActivityExecutionContextImpl.java:203) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.ActivityExecutionContextImpl.doHeartBeat(ActivityExecutionContextImpl.java:147) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.ActivityExecutionContextImpl.heartbeat(ActivityExecutionContextImpl.java:108) ~[temporal-sdk-1.0.4.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:46) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:216) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
�[35mdestination�[0m - 2021-11-18 08:57:58 ERROR () LineGobbler(voidCall):82 - /airbyte/javabase.sh: line 12:     9 Killed                  /airbyte/bin/"$APPLICATION" "$@"
2021-11-18 08:57:58 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 120.999879060s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.4:7233]]]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
	at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
	at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:110) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:131) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:109) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:51) ~[io.airbyte.airbyte-scheduler-app-0.32.1-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:51) [io.airbyte.airbyte-scheduler-app-0.32.1-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:22) [io.airbyte.airbyte-scheduler-app-0.32.1-alpha.jar:?]
	at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:94) [io.airbyte-airbyte-commons-0.32.1-alpha.jar:?]
	at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:78) [io.airbyte-airbyte-commons-0.32.1-alpha.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-18 09:43:23 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$3):203 - Running sync worker cancellation...
2021-11-18 09:43:23 INFO () DefaultReplicationWorker(cancel):250 - Cancelling replication worker...
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(cancel):258 - Cancelling destination...
2021-11-18 09:43:33 INFO () DefaultAirbyteDestination(cancel):120 - Attempting to cancel destination process...
2021-11-18 09:43:33 INFO () DefaultAirbyteDestination(cancel):125 - Destination process exists, cancelling...
2021-11-18 09:43:33 INFO () DefaultAirbyteDestination(cancel):127 - Cancelled destination process!
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(cancel):265 - Cancelling source...
2021-11-18 09:43:33 INFO () DefaultAirbyteSource(cancel):128 - Attempting to cancel source process...
2021-11-18 09:43:33 INFO () DefaultAirbyteSource(cancel):133 - Source process exists, cancelling...
2021-11-18 09:43:33 WARN () LineGobbler(voidCall):86 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2021-11-18 09:43:33 INFO () DefaultAirbyteSource(cancel):135 - Cancelled source process!
2021-11-18 09:43:33 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$3):207 - Interrupting worker thread...
2021-11-18 09:43:33 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$3):210 - Cancelling completable future...
2021-11-18 09:43:33 INFO () TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling...
2021-11-18 09:43:33 WARN () CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):49 - Job either timeout-ed or was cancelled.
2021-11-18 09:43:33 WARN () POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=73f4cfbc-0bc5-3073-9135-8409eb33ec1b, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:213) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:48) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:216) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-18 09:43:33 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Broken pipe
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
	at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:120) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
	Suppressed: io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:122) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:101) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:832) [?:?]
	Suppressed: java.io.IOException: Stream closed
		at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:442) ~[?:?]
		at java.io.OutputStream.write(OutputStream.java:162) ~[?:?]
		at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
		at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
		at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:321) ~[?:?]
		at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:325) ~[?:?]
		at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:159) ~[?:?]
		at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251) ~[?:?]
		at java.io.BufferedWriter.flush(BufferedWriter.java:257) ~[?:?]
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.notifyEndOfStream(DefaultAirbyteDestination.java:93) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:106) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:101) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.RuntimeException: java.io.IOException: Broken pipe
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:214) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
	... 1 more
Caused by: java.io.IOException: Broken pipe
	at java.io.FileOutputStream.writeBytes(Native Method) ~[?:?]
	at java.io.FileOutputStream.write(FileOutputStream.java:347) ~[?:?]
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
	at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:312) ~[?:?]
	at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:290) ~[?:?]
	at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:131) ~[?:?]
	at java.io.OutputStreamWriter.write(OutputStreamWriter.java:208) ~[?:?]
	at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
	at java.io.BufferedWriter.write(BufferedWriter.java:233) ~[?:?]
	at java.io.Writer.write(Writer.java:249) ~[?:?]
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:85) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:33) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:199) ~[io.airbyte-airbyte-workers-0.32.1-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
	... 1 more
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@6ce23dad[status=cancelled,recordsSynced=74243233,bytesSynced=35433328953,startTime=1637214273337,endTime=1637228613362]
2021-11-18 09:43:33 INFO () DefaultReplicationWorker(run):159 - Source output at least one state message
2021-11-18 09:43:33 WARN () DefaultReplicationWorker(run):172 - State capture: No state retained.

@joshuataylor
Copy link

Running with 51866fd seems to work and actually finish the job 👍.

@danieldiamond
Copy link
Contributor Author

I've created a clean k8s deployment of airbyte with one connector, one table. And this still occurs. I'm not sure how/if any user is using airbyte to migrate large tables with CDC

Airbyte version: 0.35.12-alpha
OS Version / Instance: AWS EC2
Deployment: Kubernetes
Source Connector and version: MySQL 0.5.1
Destination Connector and version: Snowflake 0.4.8
Severity: Critical

@danieldiamond
Copy link
Contributor Author

here are four seperate users with various sources/destinations that seem to be experiencing this issue:

@VitaliiMaltsev
Copy link
Contributor

Airbyte version: 0.35.15-alpha
Deployment: Cloud
Source Connector and version: MySQL Strict Encrypt 0.1.3 (CDC)
Destination Connector and version: Snowflake 0.4.8 (Internal Staging and Copy S3)

@danieldiamond I did some lengthy experiments in cloud using the same database schema as in the description. The number of rows is 300 million. In none of the cases did the connection lead to a hang, but at the same time I confirm the failed connection with errors

  1. https://cloud.airbyte.io/workspaces/071c149e-9bca-4a99-84c6-3c9b02f566a7/connections/70a22d3a-dceb-46a9-bf7d-14ce7157b8bb

Caused by: java.net.SocketException: Broken pipe at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420) ~[?:?] at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) ~[?:?] at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826) ~[?:?] at java.net.Socket$SocketOutputStream.write(Socket.java:1035) ~[?:?] at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?] at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?] at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?] at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?] at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?] at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?] at java.io.BufferedWriter.write(BufferedWriter.java:233) ~[?:?] at java.io.Writer.write(Writer.java:249) ~[?:?] at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90) ~[io.airbyte-airbyte-workers-0.35.15-alpha.jar:?] at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$5(DefaultReplicationWorker.java:277) ~[io.airbyte-airbyte-workers-0.35.15-alpha.jar:?] at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?] ... 1 more

  1. https://cloud.airbyte.io/workspaces/071c149e-9bca-4a99-84c6-3c9b02f566a7/connections/35ff4ee3-e84a-4150-abcb-c72089e52ff2

Caused by: java.net.SocketException: Connection reset by peer at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420) ~[?:?] at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) ~[?:?] at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826) ~[?:?] at java.net.Socket$SocketOutputStream.write(Socket.java:1035) ~[?:?] at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?] at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?] at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?] at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?] at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?]
3) https://cloud.airbyte.io/workspaces/071c149e-9bca-4a99-84c6-3c9b02f566a7/connections/a509a140-589b-4771-8850-359b416264ff

Caused by: java.net.SocketException: Connection timed out at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420) at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826) at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035) at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) at java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) at java.base/java.io.BufferedWriter.write(BufferedWriter.java:233) at java.base/java.io.Writer.write(Writer.java:249) at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90) at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$5(DefaultReplicationWorker.java:277) ... 4 more

@danieldiamond
Copy link
Contributor Author

@VitaliiMaltsev are you suggesting that it doesn't work in the cloud either?

separately, rereading earlier comments: @jrhizor @subodh1810

We actually handle this explicitly for the Kubernetes case by having another container in the connector pod that kills it if it can't reach back to the worker pod (which is serving a heartbeat signal).

It's a bit harder in docker-compose since we'd need more of a "supervisor" container to manage killing other pods (it'd need to keep track of relevant pods and handle the heartbeat checking).

i've tried this now with k8s and an unreasonable amount of resources to ensure this isn't a resource/memory issue. the job still hangs after reading all the records. then is the issue still the situation where the worker dies (if this is explicitly handled that in k8s, then that might not be working as expected)

@danieldiamond
Copy link
Contributor Author

FYI @VitaliiMaltsev this seems to be the exact same issue, with a lot more context if you're interested
#8218

@VitaliiMaltsev
Copy link
Contributor

All syncs permanently failed with latest master branch
io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:252) ~[temporal-sdk-1.6.0.jar:?] at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:209) ~[temporal-sdk-1.6.0.jar:?] at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:193) ~[temporal-sdk-1.6.0.jar:?] at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:151) ~[temporal-sdk-1.6.0.jar:?] at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73) ~[temporal-sdk-1.6.0.jar:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?] at java.lang.Thread.run(Thread.java:833) [?:?] Caused by: java.util.concurrent.CancellationException at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478) ~[?:?] at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:217) ~[io.airbyte-airbyte-workers-0.35.16-alpha.jar:?] at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:53) ~[io.airbyte-airbyte-workers-0.35.16-alpha.jar:?] at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:220) ~[io.airbyte-airbyte-workers-0.35.16-alpha.jar:?] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?] ... 3 more

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: Done
Development

Successfully merging a pull request may close this issue.