Iterable Sync timeout/cancelation due to Memory Error/Heartbeat Failure #7558

Closed
pprithvi opened this issue Nov 2, 2021 · 27 comments · Fixed by #7591, #7780 or #8091

pprithvi commented Nov 2, 2021

Environment

  • Airbyte version: 0.30.18-alpha
  • OS Version / Instance: Linux, AWS EC2
  • Deployment: Docker
  • Source Connector and version: Iterable - 0.1.9
  • Destination Connector and version: AWS S3 0.1.10
  • Severity: Critical
  • Step where error happened: Sync Job

Current Behavior

The sync is automatically cancelled or times out while pulling data from the email_click and email_send tables. The logs show a MemoryError and a heartbeat failure. The EC2 instance shows no resource spikes and is otherwise functioning normally.
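
For context, the traceback in the first attempt's log ends inside `requests`, where `r.content` joins the whole response body into a single bytes object. A minimal sketch of a streaming alternative follows, assuming the export endpoint returns newline-delimited JSON (an assumption, not confirmed in this report). `parse_json_lines` and `stream_records` are hypothetical names, not the connector's actual code:

```python
import json
from typing import Any, Dict, Iterable, Iterator


def parse_json_lines(lines: Iterable[bytes]) -> Iterator[Dict[str, Any]]:
    """Yield one decoded record per non-empty line, without buffering them all."""
    for raw in lines:
        if not raw:
            # iter_lines() can yield empty keep-alive lines; skip them.
            continue
        yield json.loads(raw)


def stream_records(url: str, headers: Dict[str, str], params: Dict[str, str]) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: read a large export lazily instead of via r.content."""
    import requests  # third-party; imported lazily so the parser above has no dependency

    # stream=True defers the body download until it is iterated.
    with requests.get(url, headers=headers, params=params, stream=True, timeout=(10, 300)) as resp:
        resp.raise_for_status()
        yield from parse_json_lines(resp.iter_lines())
```

With this shape, memory use depends on the size of one record rather than on the size of the full export.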

Expected Behavior

Sync should initialize and bring in data.

Log Attempt 1:

2021-10-29 06:00:29 INFO () WorkerRun(call):42 - Executing worker wrapper. Airbyte version: 0.30.18-alpha
2021-10-29 06:00:29 INFO () TemporalAttemptExecution(get):94 - Executing worker wrapper. Airbyte version: 0.30.18-alpha
2021-10-29 06:00:29 WARN () Databases(createPostgresDatabaseWithRetry):38 - Waiting for database to become available...
2021-10-29 06:00:29 INFO () JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2021-10-29 06:00:29 INFO () Databases(createPostgresDatabaseWithRetry):55 - Database available!
2021-10-29 06:00:29 INFO () DefaultReplicationWorker(run):82 - start sync worker. job id: 1186 attempt id: 0
2021-10-29 06:00:29 INFO () DefaultReplicationWorker(run):91 - configured sync modes: {null.email_send=full_refresh - append}
2021-10-29 06:00:29 INFO () DefaultAirbyteDestination(start):58 - Running destination...
2021-10-29 06:00:29 INFO () LineGobbler(voidCall):65 - Checking if airbyte/destination-******** exists...
2021-10-29 06:00:29 INFO () LineGobbler(voidCall):65 - airbyte/destination-******** was found locally.
2021-10-29 06:00:29 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1186/0 --network host --log-driver none airbyte/destination-******** write --config destination_config.json --catalog destination_catalog.json
2021-10-29 06:00:29 INFO () LineGobbler(voidCall):65 - Checking if airbyte/source-******** exists...
2021-10-29 06:00:29 INFO () LineGobbler(voidCall):65 - airbyte/source-******** was found locally.
2021-10-29 06:00:29 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1186/0 --network host --log-driver none airbyte/source-******** read --config source_config.json --catalog source_catalog.json --state input_state.json
2021-10-29 06:00:29 INFO () DefaultReplicationWorker(run):119 - Waiting for source thread to join.
2021-10-29 06:00:29 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):226 - Destination output thread started.
2021-10-29 06:00:29 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):190 - Replication thread started.
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.destination.s3.S3Destination
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: WRITE
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-10-29 06:00:31 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:31 INFO i.a.i.d.s.S3FormatConfigs(getS3FormatConfig):42 - {} - S3 format config: {"format_type":"JSONL"}
2021-10-29 06:00:32 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:32 INFO i.a.i.d.s.j.S3JsonlWriter(<init>):70 - {} - Full S3 path for stream 'email_send': ***********
2021-10-29 06:00:32 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:00:32 INFO a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 - {} - Initiated multipart upload to *********** with full ID *************
2021-10-29 06:52:17 INFO () DefaultAirbyteStreamFactory(internalLog):90 - Starting syncing SourceIterable
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "*********", line 33, in <module>
2021-10-29 06:52:17 INFO () DefaultAirbyteStreamFactory(internalLog):90 - Syncing stream: email_send 
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     launch(source, sys.argv[1:])
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 117, in launch
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     for message in source_entrypoint.run(parsed_args):
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 108, in run
2021-10-29 06:52:17 ERROR () DefaultAirbyteStreamFactory(internalLog):88 - Encountered an exception while reading stream SourceIterable
Traceback (most recent call last):
  File "**********", line 110, in read
    logger=logger, stream_instance=stream_instance, configured_stream=configured_stream, connector_state=connector_state
  File "**********", line 135, in _read_stream
    for record in record_iterator:
  File "**********", line 181, in _read_full_refresh
    for record in records:
  File "**********", line 228, in read_records
    response = self._send_request(request)
  File "**********", line 94, in retry
    ret = target(*args, **kwargs)
  File "**********", line 94, in retry
    ret = target(*args, **kwargs)
  File "**********", line 193, in _send_request
    response: requests.Response = self._session.send(request)
  File "**********", line 697, in send
    r.content
  File "**********", line 831, in content
    self._content = b''.join(self.iter_content(CONTENT_CHUNK_SIZE)) or b''
MemoryError

2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     for message in generator:
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 114, in read
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     raise e
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 110, in read
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     logger=logger, stream_instance=stream_instance, configured_stream=configured_stream, connector_state=connector_state
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 135, in _read_stream
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     for record in record_iterator:
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 181, in _read_full_refresh
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     for record in records:
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 228, in read_records
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     response = self._send_request(request)
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 94, in retry
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     ret = target(*args, **kwargs)
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 94, in retry
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     ret = target(*args, **kwargs)
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 193, in _send_request
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     response: requests.Response = self._session.send(request)
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 697, in send
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     r.content
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -   File "**********", line 831, in content
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 -     self._content = b''.join(self.iter_content(CONTENT_CHUNK_SIZE)) or b''
2021-10-29 06:52:17 ERROR () LineGobbler(voidCall):65 - MemoryError
2021-10-29 06:52:18 INFO () DefaultReplicationWorker(run):121 - Source thread complete.
2021-10-29 06:52:18 INFO () DefaultReplicationWorker(run):122 - Waiting for destination thread to join.
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO i.a.i.d.s.w.BaseS3Writer(close):118 - {} - Uploading remaining data for stream 'email_send'.
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 WARN a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to *********** with id *********]: Completed
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO i.a.i.d.s.w.BaseS3Writer(close):120 - {} - Upload completed for stream 'email_send'.
2021-10-29 06:52:18 INFO () JsonSchemaValidator(test):56 - JSON schema validation failed. 
errors: $: null found, object expected
2021-10-29 06:52:18 ERROR () DefaultAirbyteStreamFactory(lambda$create$1):63 - Validation failed: null
2021-10-29 06:52:18 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 06:52:18 INFO i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2021-10-29 06:52:18 INFO () DefaultReplicationWorker(run):124 - Destination thread complete.
2021-10-29 06:52:18 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:115) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
2021-10-29 06:52:18 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@6ff70434[status=failed,recordsSynced=0,bytesSynced=0,startTime=1635487229532,endTime=1635490338698]
2021-10-29 06:52:18 INFO () DefaultReplicationWorker(run):161 - Source did not output any state messages
2021-10-29 06:52:18 WARN () DefaultReplicationWorker(run):169 - State capture: No new state, falling back on input state: io.airbyte.config.State@380a8029[state={}]
2021-10-29 06:52:18 INFO () TemporalAttemptExecution(get):115 - Stopping cancellation check scheduling...
2021-10-29 06:52:18 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):177 - sync summary: io.airbyte.config.StandardSyncOutput@15016098[standardSyncSummary=io.airbyte.config.StandardSyncSummary@523c37ff[status=failed,recordsSynced=0,bytesSynced=0,startTime=1635487229532,endTime=1635490338698],state=io.airbyte.config.State@380a8029[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@1f3b546a[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@f54adfa[stream=io.airbyte.protocol.models.AirbyteStream@114f1928[name=email_send,jsonSchema={"type":["null","object"],"properties":{"email":{"type":["null","string"]},"channelId":{"type":["null","integer"]},"contentId":{"type":["null","integer"]},"createdAt":{"type":["null","string"],"format":"date-time"},"messageId":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"messageBusId":{"type":["null","string"]},"messageTypeId":{"type":["null","integer"]},"transactionalData":{"type":["null","object"],"properties":{"id":{"type":["null","string"]},"sku":{"type":["null","string"]},"url":{"type":["null","string"]},"name":{"type":["null","string"]},"email":{"type":["null","string"]},"price":{"type":["null","integer"]},"handle":{"type":["null","string"]},"vendor":{"type":["null","string"]},"discount":{"type":["null","integer"]},"imageUrl":{"type":["null","string"]},"createdAt":{"type":["null","string"],"format":"date-time"},"eventName":{"type":["null","string"]},"inventory":{"type":["null","integer"]},"campaignId":{"type":["null","integer"]},"categories":{"type":["null","array"],"items":{}},"product_id":{"type":["null","string"]},"templateId":{"type":["null","integer"]},"description":{"type":["null","string"]},"itblInternal":{"type":["null","object"],"pro
perties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"product_type":{"type":["null","string"]},"eventUpdatedAt":{"type":["null","string"],"format":"date-time"},"compare_at_price":{"type":["null","number"]}}}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}]]

Attempt 2:

2021-10-29 07:47:24 INFO () WorkerRun(call):42 - Executing worker wrapper. Airbyte version: 0.30.18-alpha
2021-10-29 07:47:24 INFO () TemporalAttemptExecution(get):94 - Executing worker wrapper. Airbyte version: 0.30.18-alpha
2021-10-29 07:47:24 WARN () Databases(createPostgresDatabaseWithRetry):38 - Waiting for database to become available...
2021-10-29 07:47:24 INFO () JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2021-10-29 07:47:24 INFO () Databases(createPostgresDatabaseWithRetry):55 - Database available!
2021-10-29 07:47:24 INFO () DefaultReplicationWorker(run):82 - start sync worker. job id: 1186 attempt id: 2
2021-10-29 07:47:24 INFO () DefaultReplicationWorker(run):91 - configured sync modes: {null.email_send=full_refresh - append}
2021-10-29 07:47:24 INFO () DefaultAirbyteDestination(start):58 - Running destination...
2021-10-29 07:47:24 INFO () LineGobbler(voidCall):65 - Checking if airbyte/destination-********* exists...
2021-10-29 07:47:25 INFO () LineGobbler(voidCall):65 - airbyte/destination-********* was found locally.
2021-10-29 07:47:25 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1186/2 --network host --log-driver none airbyte/destination-********* write --config destination_config.json --catalog destination_catalog.json
2021-10-29 07:47:25 INFO () LineGobbler(voidCall):65 - Checking if airbyte/source-********* exists...
2021-10-29 07:47:25 INFO () LineGobbler(voidCall):65 - airbyte/source-********* was found locally.
2021-10-29 07:47:25 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/1186/2 --network host --log-driver none airbyte/source-********* read --config source_config.json --catalog source_catalog.json --state input_state.json
2021-10-29 07:47:25 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):226 - Destination output thread started.
2021-10-29 07:47:25 INFO () DefaultReplicationWorker(run):119 - Waiting for source thread to join.
2021-10-29 07:47:25 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):190 - Replication thread started.
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.destination.s3.S3Destination
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: WRITE
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-10-29 07:47:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:26 INFO i.a.i.d.s.S3FormatConfigs(getS3FormatConfig):42 - {} - S3 format config: {"format_type":"JSONL"}
2021-10-29 07:47:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:27 INFO i.a.i.d.s.j.S3JsonlWriter(<init>):70 - {} - Full S3 path for stream 'email_send': *********
2021-10-29 07:47:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-29 07:47:27 INFO a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 - {} - Initiated multipart upload to ********* with full ID .**********
2021-10-29 08:11:14 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):179 - Running sync worker cancellation...
2021-10-29 08:11:14 INFO () DefaultReplicationWorker(cancel):250 - Cancelling replication worker...
2021-10-29 08:11:24 INFO () DefaultReplicationWorker(cancel):258 - Cancelling destination...
2021-10-29 08:11:24 INFO () DefaultAirbyteDestination(cancel):114 - Attempting to cancel destination process...
2021-10-29 08:11:24 INFO () DefaultAirbyteDestination(cancel):119 - Destination process exists, cancelling...
2021-10-29 08:11:25 INFO () DefaultAirbyteDestination(cancel):121 - Cancelled destination process!
2021-10-29 08:11:25 INFO () DefaultReplicationWorker(cancel):265 - Cancelling source...
2021-10-29 08:11:25 WARN () LineGobbler(voidCall):68 - airbyte-destination gobbler IOException: Stream closed. Typically happens when cancelling a job.
2021-10-29 08:11:25 INFO () DefaultAirbyteSource(cancel):121 - Attempting to cancel source process...
2021-10-29 08:11:25 INFO () DefaultAirbyteSource(cancel):126 - Source process exists, cancelling...
2021-10-29 08:11:25 INFO () DefaultReplicationWorker(run):121 - Source thread complete.
2021-10-29 08:11:25 INFO () DefaultReplicationWorker(run):122 - Waiting for destination thread to join.
2021-10-29 08:11:25 INFO () DefaultReplicationWorker(run):124 - Destination thread complete.
2021-10-29 08:11:25 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
io.airbyte.workers.WorkerException: Source process exit with code 143. This warning is normal if the job was cancelled.
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:115) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
	Suppressed: io.airbyte.workers.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:108) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:101) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:832) [?:?]
2021-10-29 08:11:25 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@52c3c33b[status=cancelled,recordsSynced=0,bytesSynced=0,startTime=1635493644989,endTime=1635495085421]
2021-10-29 08:11:25 INFO () DefaultReplicationWorker(run):161 - Source did not output any state messages
2021-10-29 08:11:25 WARN () DefaultReplicationWorker(run):169 - State capture: No new state, falling back on input state: io.airbyte.config.State@2f536649[state={}]
2021-10-29 08:11:25 INFO () TemporalAttemptExecution(get):115 - Stopping cancellation check scheduling...
2021-10-29 08:11:25 INFO () DefaultAirbyteSource(cancel):128 - Cancelled source process!
2021-10-29 08:11:25 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):183 - Interrupting worker thread...
2021-10-29 08:11:25 INFO () TemporalAttemptExecution(lambda$getCancellationChecker$2):186 - Cancelling completable future...
2021-10-29 08:11:25 WARN () CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):51 - Job either timeout-ed or was cancelled.
2021-10-29 08:11:25 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):177 - sync summary: io.airbyte.config.StandardSyncOutput@3238405f[standardSyncSummary=io.airbyte.config.StandardSyncSummary@44ba25c3[status=cancelled,recordsSynced=0,bytesSynced=0,startTime=1635493644989,endTime=1635495085421],state=io.airbyte.config.State@2f536649[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@5f434a72[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@46bc94fe[stream=io.airbyte.protocol.models.AirbyteStream@36761ece[name=email_send,jsonSchema={"type":["null","object"],"properties":{"email":{"type":["null","string"]},"channelId":{"type":["null","integer"]},"contentId":{"type":["null","integer"]},"createdAt":{"type":["null","string"],"format":"date-time"},"messageId":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"messageBusId":{"type":["null","string"]},"messageTypeId":{"type":["null","integer"]},"transactionalData":{"type":["null","object"],"properties":{"id":{"type":["null","string"]},"sku":{"type":["null","string"]},"url":{"type":["null","string"]},"name":{"type":["null","string"]},"email":{"type":["null","string"]},"price":{"type":["null","integer"]},"handle":{"type":["null","string"]},"vendor":{"type":["null","string"]},"discount":{"type":["null","integer"]},"imageUrl":{"type":["null","string"]},"createdAt":{"type":["null","string"],"format":"date-time"},"eventName":{"type":["null","string"]},"inventory":{"type":["null","integer"]},"campaignId":{"type":["null","integer"]},"categories":{"type":["null","array"],"items":{}},"product_id":{"type":["null","string"]},"templateId":{"type":["null","integer"]},"description":{"type":["null","string"]},"itblInternal":{"type":["null","object"],
"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"product_type":{"type":["null","string"]},"eventUpdatedAt":{"type":["null","string"],"format":"date-time"},"compare_at_price":{"type":["null","number"]}}}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}]]
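
Exit code 143 in the second attempt follows the common convention of 128 plus the signal number, so it corresponds to signal 15 (SIGTERM). That is consistent with the source process being cancelled from outside rather than crashing on its own, unlike exit code 1 in the first attempt. A small sketch of that arithmetic; `describe_exit_code` is a hypothetical helper, not part of Airbyte:

```python
import signal


def describe_exit_code(code: int) -> str:
    """Interpret a process exit code using the 128 + signal-number convention."""
    if code == 0:
        return "exited normally"
    if code > 128:
        signum = code - 128
        try:
            name = signal.Signals(signum).name
        except ValueError:
            # Not a signal number known on this platform.
            name = f"signal {signum}"
        return f"terminated by {name}"
    return f"exited with error status {code}"


print(describe_exit_code(143))  # second attempt: source process
print(describe_exit_code(1))    # first attempt: source process
```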

Attempt 3:

2021-10-22 02:54:54 INFO () WorkerRun(call):42 - Executing worker wrapper. Airbyte version: 0.30.18-alpha
2021-10-22 02:54:54 INFO () TemporalAttemptExecution(get):94 - Executing worker wrapper. Airbyte version: 0.30.18-alpha
2021-10-22 02:54:54 WARN () Databases(createPostgresDatabaseWithRetry):38 - Waiting for database to become available...
2021-10-22 02:54:54 INFO () JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2021-10-22 02:54:54 INFO () Databases(createPostgresDatabaseWithRetry):55 - Database available!
2021-10-22 02:54:54 INFO () DefaultReplicationWorker(run):82 - start sync worker. job id: 970 attempt id: 0
2021-10-22 02:54:54 INFO () DefaultReplicationWorker(run):91 - configured sync modes: {null.email_send=incremental - append}
2021-10-22 02:54:54 INFO () DefaultAirbyteDestination(start):58 - Running destination...
2021-10-22 02:54:54 INFO () LineGobbler(voidCall):65 - Checking if airbyte/destination-******** exists...
2021-10-22 02:54:54 INFO () LineGobbler(voidCall):65 - airbyte/destination-******** was found locally.
2021-10-22 02:54:54 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/970/0 --network host --log-driver none airbyte/destination-******** write --config destination_config.json --catalog destination_catalog.json
2021-10-22 02:54:54 INFO () LineGobbler(voidCall):65 - Checking if airbyte/source-******** exists...
2021-10-22 02:54:55 INFO () LineGobbler(voidCall):65 - airbyte/source-******** was found locally.
2021-10-22 02:54:55 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/970/0 --network host --log-driver none airbyte/source-******** read --config source_config.json --catalog source_catalog.json --state input_state.json
2021-10-22 02:54:55 INFO () DefaultReplicationWorker(run):119 - Waiting for source thread to join.
2021-10-22 02:54:55 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):190 - Replication thread started.
2021-10-22 02:54:55 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):226 - Destination output thread started.
2021-10-22 02:54:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:56 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.destination.s3.S3Destination
2021-10-22 02:54:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:56 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2021-10-22 02:54:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:56 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: WRITE
2021-10-22 02:54:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:56 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2021-10-22 02:54:57 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:57 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-10-22 02:54:57 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:57 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-10-22 02:54:57 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:57 INFO i.a.i.d.s.S3FormatConfigs(getS3FormatConfig):42 - {} - S3 format config: {"format_type":"Parquet","page_size_kb":1024,"block_size_mb":128,"compression_codec":"SNAPPY","dictionary_encoding":true,"max_padding_size_mb":8,"dictionary_page_size_kb":1024}
2021-10-22 02:54:57 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-10-22 02:54:57 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):78 - {} - Airbyte message consumer: failed.
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - Exception in thread "main" java.lang.IllegalStateException: Field categories.items has no type
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getTypes(JsonToAvroSchemaConverter.java:82)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getNonNullTypes(JsonToAvroSchemaConverter.java:70)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getNullableFieldTypes(JsonToAvroSchemaConverter.java:223)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getSingleFieldType(JsonToAvroSchemaConverter.java:186)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.lambda$getNullableFieldTypes$4(JsonToAvroSchemaConverter.java:226)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getNullableFieldTypes(JsonToAvroSchemaConverter.java:234)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getAvroSchema(JsonToAvroSchemaConverter.java:162)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getSingleFieldType(JsonToAvroSchemaConverter.java:196)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.lambda$getNullableFieldTypes$4(JsonToAvroSchemaConverter.java:226)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getNullableFieldTypes(JsonToAvroSchemaConverter.java:234)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter.getAvroSchema(JsonToAvroSchemaConverter.java:162)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory.create(ProductionWriterFactory.java:58)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.S3Consumer.startTracked(S3Consumer.java:103)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.start(FailureTrackingAirbyteMessageConsumer.java:54)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:162)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:148)
2021-10-22 02:54:57 ERROR () LineGobbler(voidCall):65 - 	at io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:49)
2021-10-22 03:35:05 WARN () ActivityExecutionContextImpl(doHeartBeat):153 - Heartbeat failed
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9.890651415s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.5:7233]]]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.recordActivityTaskHeartbeat(WorkflowServiceGrpc.java:2710) ~[temporal-serviceclient-1.0.4.jar:?]
	at io.temporal.internal.sync.ActivityExecutionContextImpl.sendHeartbeatRequest(ActivityExecutionContextImpl.java:203) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.ActivityExecutionContextImpl.doHeartBeat(ActivityExecutionContextImpl.java:147) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.ActivityExecutionContextImpl.heartbeat(ActivityExecutionContextImpl.java:108) ~[temporal-sdk-1.0.4.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:48) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:192) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
2021-10-22 03:37:28 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 120.998125495s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.5:7233]]]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
	at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
	at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
	at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
	at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:104) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:124) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:103) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:47) ~[io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:45) [io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
	at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:23) [io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
	at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:94) [io.airbyte-airbyte-commons-0.30.18-alpha.jar:?]
	at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:78) [io.airbyte-airbyte-commons-0.30.18-alpha.jar:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
2021-10-22 03:41:03 INFO () DefaultReplicationWorker(run):121 - Source thread complete.
2021-10-22 03:41:03 INFO () DefaultReplicationWorker(run):122 - Waiting for destination thread to join.
2021-10-22 03:41:03 INFO () DefaultReplicationWorker(run):124 - Destination thread complete.
2021-10-22 03:41:03 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
io.airbyte.workers.WorkerException: Source process exit with code 137. This warning is normal if the job was cancelled.
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:115) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
	Suppressed: io.airbyte.workers.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:108) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:101) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:832) [?:?]
2021-10-22 03:41:03 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@58ee97cb[status=failed,recordsSynced=0,bytesSynced=0,startTime=1634871294910,endTime=1634874063206]
2021-10-22 03:41:03 INFO () DefaultReplicationWorker(run):161 - Source did not output any state messages
2021-10-22 03:41:03 WARN () DefaultReplicationWorker(run):169 - State capture: No new state, falling back on input state: io.airbyte.config.State@6094205d[state={}]
2021-10-22 03:41:03 INFO () TemporalAttemptExecution(get):115 - Stopping cancellation check scheduling...
2021-10-22 03:41:03 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):177 - sync summary: io.airbyte.config.StandardSyncOutput@37e41c3e[standardSyncSummary=io.airbyte.config.StandardSyncSummary@28dab60f[status=failed,recordsSynced=0,bytesSynced=0,startTime=1634871294910,endTime=1634874063206],state=io.airbyte.config.State@6094205d[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@7ed8378e[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@24383a3e[stream=io.airbyte.protocol.models.AirbyteStream@33aac76b[name=email_send,jsonSchema={"type":["null","object"],"properties":{"email":{"type":["null","string"]},"channelId":{"type":["null","integer"]},"contentId":{"type":["null","integer"]},"createdAt":{"type":["null","string"],"format":"date-time"},"messageId":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"messageBusId":{"type":["null","string"]},"messageTypeId":{"type":["null","integer"]},"transactionalData":{"type":["null","object"],"properties":{"id":{"type":["null","string"]},"sku":{"type":["null","string"]},"url":{"type":["null","string"]},"name":{"type":["null","string"]},"email":{"type":["null","string"]},"price":{"type":["null","integer"]},"handle":{"type":["null","string"]},"vendor":{"type":["null","string"]},"discount":{"type":["null","integer"]},"imageUrl":{"type":["null","string"]},"createdAt":{"type":["null","string"],"format":"date-time"},"eventName":{"type":["null","string"]},"inventory":{"type":["null","integer"]},"campaignId":{"type":["null","integer"]},"categories":{"type":["null","array"],"items":{}},"product_id":{"type":["null","string"]},"templateId":{"type":["null","integer"]},"description":{"type":["null","string"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"product_type":{"type":["null","string"]},"eventUpdatedAt":{"type":["null","string"],"format":"date-time"},"compare_at_price":{"type":["null","number"]}}}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[[id]],namespace=<null>,additionalProperties={}],syncMode=incremental,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}]]

logs-1186-2 Edit.txt
logs-970-0 Edit.txt
logs-1186-0 Edit.txt
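
For reference, the destination failure in the log above (`Field categories.items has no type`) lines up with the `email_send` schema printed in the sync summary, where `categories` is declared as `{"type":["null","array"],"items":{}}`. Below is a minimal Python sketch for spotting such fields in a stream's JSON schema before syncing. The helper name `find_untyped_array_items` is hypothetical and not part of Airbyte, and the sketch only walks `properties` and single-schema `items`.

```python
def find_untyped_array_items(schema, path=""):
    """Yield dotted paths of array fields whose `items` schema declares no type.

    Assumption: `schema` is a plain dict loaded from a JSON schema; `$ref`,
    `oneOf`, and similar keywords are not followed in this sketch.
    """
    if not isinstance(schema, dict):
        return
    types = schema.get("type", [])
    if isinstance(types, str):
        types = [types]
    if "array" in types:
        items = schema.get("items")
        items_path = f"{path}.items" if path else "items"
        if not isinstance(items, dict) or "type" not in items:
            # Missing, empty, or list-style `items` are all reported here.
            yield items_path
        else:
            yield from find_untyped_array_items(items, items_path)
    for name, sub_schema in schema.get("properties", {}).items():
        child_path = f"{path}.{name}" if path else name
        yield from find_untyped_array_items(sub_schema, child_path)


# Trimmed-down fragment of the email_send schema from the log above.
email_send_fragment = {
    "type": ["null", "object"],
    "properties": {
        "email": {"type": ["null", "string"]},
        "transactionalData": {
            "type": ["null", "object"],
            "properties": {
                "categories": {"type": ["null", "array"], "items": {}},
                "sku": {"type": ["null", "string"]},
            },
        },
    },
}

untyped = list(find_untyped_array_items(email_send_fragment))
print(untyped)
```

Running this on the fragment reports `transactionalData.categories.items`, the same field the Avro schema converter rejects.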

@pprithvi pprithvi added the type/bug Something isn't working label Nov 2, 2021
@avida
Contributor

avida commented Nov 2, 2021

Hi @tredencegithub, it looks like the sync fails when trying to download all of the email send data with the export request.

What is the EC2 instance type/size? Would it make sense to temporarily increase the instance memory while we are fixing this?
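
On the memory question: the log above reports `Source process exit with code 137`. By the usual shell convention, an exit code above 128 means the process was terminated by signal `code - 128`, so 137 corresponds to signal 9 (SIGKILL). That is what an out-of-memory kill typically looks like, although a cancelled job can produce it too, as the log message itself notes. A small sketch for decoding such codes follows. The helper name `describe_exit_code` is made up for illustration, and the signal names assume Linux.

```python
import signal


def describe_exit_code(code: int) -> str:
    """Translate a container/shell exit code into a short description."""
    if code > 128:
        signum = code - 128
        try:
            name = signal.Signals(signum).name
        except ValueError:
            # Signal number not known on this platform.
            name = f"signal {signum}"
        return f"killed by {name}"
    if code == 0:
        return "exited normally"
    return f"exited with error status {code}"


print(describe_exit_code(137))  # source process in the log above
print(describe_exit_code(1))    # destination process in the log above
```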

@avida avida self-assigned this Nov 2, 2021
@VasylLazebnyk VasylLazebnyk added this to the Connectors 2021-11-12 milestone Nov 2, 2021
@pprithvi
Author

pprithvi commented Nov 3, 2021

@avida The EC2 instance type is t3.xlarge, which should easily accommodate the volume; I do not see any spike in CPU utilization while syncing. I have also set 8GB for the worker and 4GB for the server using JAVA_OPTS. I see that the issue has been added to the current sprint. Is there an estimated timeline for it?

@avida
Contributor

avida commented Nov 3, 2021

@tredencegithub Yeah, t3.xlarge is more than enough. I am going to submit a fix today.

@pprithvi
Author

pprithvi commented Nov 3, 2021

Much appreciated.

@avida
Contributor

avida commented Nov 3, 2021

@tredencegithub Done. Please try updating the Iterable connector to 0.1.10; it should help.

@avida avida reopened this Nov 3, 2021
@pprithvi
Author

pprithvi commented Nov 3, 2021

@avida It errored out again at 49M records.
error.txt

Here's the error log:

2021-11-03 19:51:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 49195000
2021-11-03 19:51:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 49196000
2021-11-03 19:51:04 ERROR () DefaultAirbyteStreamFactory(internalLog):88 - Encountered an exception while reading stream SourceIterable
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 697, in _update_chunk_length
self.chunk_left = int(line, 16)

ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 438, in _error_catcher
yield
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 764, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 701, in _update_chunk_length
raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 758, in generate
for chunk in self.raw.stream(chunk_size, decode_content=True):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 572, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 793, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 455, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 106, in read
internal_config=internal_config,
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in _read_stream
for record in record_iterator:
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 181, in _read_incremental
for record_counter, record_data in enumerate(records, start=1):
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/streams/http/http.py", line 353, in read_records
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
File "/airbyte/integration_code/source_iterable/api.py", line 112, in parse_response
for obj in response.iter_lines():
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 802, in iter_lines
for chunk in self.iter_content(chunk_size=chunk_size, decode_unicode=decode_unicode):
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 761, in generate
raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 697, in _update_chunk_length
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - self.chunk_left = int(line, 16)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - ValueError: invalid literal for int() with base 16: b''
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 -
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - During handling of the above exception, another exception occurred:
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 -
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 438, in _error_catcher
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - yield
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 764, in read_chunked
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - self._update_chunk_length()
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 701, in _update_chunk_length
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - raise InvalidChunkLength(self, line)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 -
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - During handling of the above exception, another exception occurred:
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 -
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 758, in generate
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for chunk in self.raw.stream(chunk_size, decode_content=True):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 572, in stream
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for line in self.read_chunked(amt, decode_content=decode_content):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 793, in read_chunked
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - self._original_response.close()
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - self.gen.throw(type, value, traceback)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 455, in _error_catcher
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - raise ProtocolError("Connection broken: %r" % e, e)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 -
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - During handling of the above exception, another exception occurred:
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 -
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/airbyte/integration_code/main.py", line 13, in <module>
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - launch(source, sys.argv[1:])
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/entrypoint.py", line 108, in launch
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for message in source_entrypoint.run(parsed_args):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/entrypoint.py", line 99, in run
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for message in generator:
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 110, in read
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - raise e
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 106, in read
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - internal_config=internal_config,
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in _read_stream
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for record in record_iterator:
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 181, in _read_incremental
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for record_counter, record_data in enumerate(records, start=1):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/streams/http/http.py", line 353, in read_records
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/airbyte/integration_code/source_iterable/api.py", line 112, in parse_response
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for obj in response.iter_lines():
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 802, in iter_lines
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - for chunk in self.iter_content(chunk_size=chunk_size, decode_unicode=decode_unicode):
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 761, in generate
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - raise ChunkedEncodingError(e)
2021-11-03 19:51:04 ERROR () LineGobbler(voidCall):65 - requests.exceptions.ChunkedEncodingError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
2021-11-03 19:51:05 INFO () DefaultReplicationWorker(run):121 - Source thread complete.
2021-11-03 19:51:05 INFO () DefaultReplicationWorker(run):122 - Waiting for destination thread to join.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 - {} - Airbyte message consumer: succeeded.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):118 - {} - Uploading remaining data for stream 'channels'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 WARN a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(complete):367 - {} - [Manager uploading to channels/2021_11_03_1635946042216_0.jsonl with id x: Uploading leftover stream [Part number 1 containing 0.00 MB]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to channels/2021_11_03_1635946042216_0.jsonl with id x: Finished uploading [Part number 1 containing 0.00 MB]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to /channels/2021_11_03_1635946042216_0.jsonl with id x: Completed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):120 - {} - Upload completed for stream 'channels'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):118 - {} - Uploading remaining data for stream 'email_click'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 WARN a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to /email_click/2021_11_03_1635946042216_0.jsonl with id x: Finished uploading [Part number 121 containing 8.52 MB]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to email_click/2021_11_03_1635946042216_0.jsonl with id x: Completed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):120 - {} - Upload completed for stream 'email_click'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):118 - {} - Uploading remaining data for stream 'campaigns'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 WARN a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(complete):367 - {} - [Manager uploading to campaigns/2021_11_03_1635946042216_0.jsonl with id x: Uploading leftover stream [Part number 1 containing 1.26 MB]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to campaigns/2021_11_03_1635946042216_0.jsonl with id x: Finished uploading [Part number 1 containing 1.26 MB]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to iterable/campaigns/2021_11_03_1635946042216_0.jsonl with id x: Completed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):120 - {} - Upload completed for stream 'campaigns'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO i.a.i.d.s.w.BaseS3Writer(close):118 - {} - Uploading remaining data for stream 'email_open'.
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 WARN a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-11-03 19:51:05 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:05 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to email_open/2021_11_03_1635946042216_0.jsonl with id x Finished uploading [Part number 2859 containing 8.42 MB]
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 INFO a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to email_open/2021_11_03_1635946042216_0.jsonl with id x: Completed
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m i.a.i.d.s.w.BaseS3Writer(close):120 - {} - Upload completed for stream 'email_open'.
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m i.a.i.d.s.w.BaseS3Writer(close):118 - {} - Uploading remaining data for stream 'email_send'.
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[33mWARN�[m a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to email_send/2021_11_03_1635946042216_0.jsonl with id x: Finished uploading [Part number 2552 containing 8.13 MB]
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to email_send/2021_11_03_1635946042216_0.jsonl with id x: Completed
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 �[32mINFO�[m i.a.i.d.s.w.BaseS3Writer(close):120 - {} - Upload completed for stream 'email_send'.
2021-11-03 19:51:06 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):231 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@20f04a4[type=STATE,log=,spec=,connectionStatus=,catalog=,record=,state=io.airbyte.protocol.models.AirbyteStateMessage@468ef039[data={"email_click":{"createdAt":"2021-11-03 13:09:43"},"email_open":{"createdAt":"2021-11-03 13:37:11"}},additionalProperties={}],additionalProperties={}]
2021-11-03 19:51:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-03 19:51:06 INFO i.a.i.b.IntegrationRunner(run):153 - {} - Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2021-11-03 19:51:07 INFO () DefaultReplicationWorker(run):124 - Destination thread complete.
2021-11-03 19:51:07 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:115) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-03 19:51:07 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@11f3acdb[status=failed,recordsSynced=49196831,bytesSynced=23557993238,startTime=1635946039318,endTime=1635969067053]
2021-11-03 19:51:07 INFO () DefaultReplicationWorker(run):159 - Source output at least one state message
2021-11-03 19:51:07 INFO () DefaultReplicationWorker(run):165 - State capture: Updated state to: Optional[io.airbyte.config.State@5f3c91a9[state={"email_click":{"createdAt":"2021-11-03 13:09:43"},"email_open":{"createdAt":"2021-11-03 13:37:11"}}]]
2021-11-03 19:51:07 INFO () TemporalAttemptExecution(get):115 - Stopping cancellation check scheduling...
2021-11-03 19:51:07 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):177 - sync summary: io.airbyte.config.StandardSyncOutput@609a14f0[standardSyncSummary=io.airbyte.config.StandardSyncSummary@386d1d03[status=failed,recordsSynced=49196831,bytesSynced=23557993238,startTime=1635946039318,endTime=1635969067053],state=io.airbyte.config.State@5f3c91a9[state={"email_click":{"createdAt":"2021-11-03 13:09:43"},"email_open":{"createdAt":"2021-11-03 13:37:11"}}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@74918347[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@5459ab3c[stream=io.airbyte.protocol.models.AirbyteStream@64d408d3[name=campaigns,jsonSchema={"type":["null","object"],"properties":{"id":{"type":["null","integer"]},"name":{"type":["null","string"]},"type":{"type":["null","string"]},"labels":{"type":["null","array"],"items":{}},"endedAt":{"type":["null","integer"]},"listIds":{"type":["null","array"],"items":{}},"startAt":{"type":["null","integer"]},"sendSize":{"type":["null","number"]},"createdAt":{"type":["null","integer"]},"updatedAt":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"workflowId":{"type":["null","number"]},"campaignState":{"type":["null","string"]},"messageMedium":{"type":["null","string"]},"createdByUserId":{"type":["null","string"]},"updatedByUserId":{"type":["null","string"]},"suppressionListIds":{"type":["null","array"],"items":{}},"recurringCampaignId":{"type":["null","number"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}], 
io.airbyte.protocol.models.ConfiguredAirbyteStream@3401944f[stream=io.airbyte.protocol.models.AirbyteStream@4cf2b233[name=channels,jsonSchema={"type":["null","object"],"properties":{"id":{"type":["null","number"]},"name":{"type":["null","string"]},"channelType":{"type":["null","string"]},"messageMedium":{"type":["null","string"]}}},supportedSyncModes=[full_refresh],sourceDefinedCursor=,defaultCursorField=[],sourceDefinedPrimaryKey=[[id]],namespace=,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}], io.airbyte.protocol.models.ConfiguredAirbyteStream@556be0da[stream=io.airbyte.protocol.models.AirbyteStream@11bde834[name=email_click,jsonSchema={"type":["null","object"],"properties":{"ip":{"type":["null","string"]},"url":{"type":["null","string"]},"city":{"type":["null","string"]},"email":{"type":["null","string"]},"region":{"type":["null","string"]},"country":{"type":["null","string"]},"contentId":{"type":["null","integer"]},"createdAt":{"type":["null","string"],"format":"date-time"},"hrefIndex":{"type":["null","integer"]},"messageId":{"type":["null","string"]},"userAgent":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"userAgentDevice":{"type":["null","string"]}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[],namespace=,additionalProperties={}],syncMode=incremental,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[],additionalProperties={}], 
io.airbyte.protocol.models.ConfiguredAirbyteStream@29a6a5a0[stream=io.airbyte.protocol.models.AirbyteStream@709cf98d[name=email_open,jsonSchema={"type":["null","object"],"properties":{"ip":{"type":["null","string"]},"city":{"type":["null","string"]},"email":{"type":["null","string"]},"region":{"type":["null","string"]},"country":{"type":["null","string"]},"createdAt":{"type":["null","string"],"format":"date-time"},"messageId":{"type":["null","string"]},"userAgent":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"userAgentDevice":{"type":["null","string"]}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[],namespace=,additionalProperties={}],syncMode=incremental,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[],additionalProperties={}], 
io.airbyte.protocol.models.ConfiguredAirbyteStream@7afbba74[stream=io.airbyte.protocol.models.AirbyteStream@4f91157[name=email_send,jsonSchema={"type":["null","object"],"properties":{"email":{"type":["null","string"]},"channelId":{"type":["null","integer"]},"contentId":{"type":["null","integer"]},"createdAt":{"type":["null","string"],"format":"date-time"},"messageId":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"messageBusId":{"type":["null","string"]},"messageTypeId":{"type":["null","integer"]},"transactionalData":{"type":["null","object"],"properties":{"id":{"type":["null","string"]},"sku":{"type":["null","string"]},"url":{"type":["null","string"]},"name":{"type":["null","string"]},"email":{"type":["null","string"]},"price":{"type":["null","integer"]},"handle":{"type":["null","string"]},"vendor":{"type":["null","string"]},"discount":{"type":["null","integer"]},"imageUrl":{"type":["null","string"]},"createdAt":{"type":["null","string"],"format":"date-time"},"eventName":{"type":["null","string"]},"inventory":{"type":["null","integer"]},"campaignId":{"type":["null","integer"]},"categories":{"type":["null","array"],"items":{}},"product_id":{"type":["null","string"]},"templateId":{"type":["null","integer"]},"description":{"type":["null","string"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"product_type":{"type":["null","string"]},"eventUpdatedAt":{"type":["null","string"],"format":"date-time"},"compare_at_price":{"type":["null","number"]}}}}},supportedSyncModes=[full_refresh, 
incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[],namespace=,additionalProperties={}],syncMode=incremental,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[[id]],additionalProperties={}]],additionalProperties={}]]

@avida
Contributor

avida commented Nov 4, 2021

Good news: it looks like the memory issue is fixed. But it also introduced another defect: the connector tries to fetch a large amount of data in a single request. The Iterable API uses chunked encoding to transfer large analytics reports.
According to the logs, you synced ~22 GB of data, which takes a long time; the server closed the connection, which led to a ChunkedEncodingError exception. As a solution, I can break the large request into smaller ones based on the start_date parameter.

On the other hand, I can't issue a lot of requests, because the /api/export/data.json endpoint has a limit of 4 requests per minute.
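
For illustration only, a client could stay under a limit like 4 requests per minute by spacing its calls at least `period / max_calls` seconds apart. The sketch below is not the connector's code; `MinIntervalThrottle` and its parameters are hypothetical names, and the clock and sleep functions are injectable only so the behaviour can be checked without waiting.

```python
import time


class MinIntervalThrottle:
    """Spaces calls at least period / max_calls seconds apart (hypothetical helper)."""

    def __init__(self, max_calls=4, period=60.0, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = period / max_calls  # 15 s for 4 calls per minute
        self._clock = clock
        self._sleep = sleep
        self._last_call = None  # time of the previous call, if any

    def wait(self):
        """Block until the next call is allowed; return the seconds waited."""
        now = self._clock()
        waited = 0.0
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                waited = remaining
                now = self._clock()
        self._last_call = now
        return waited
```

A caller would invoke `wait()` immediately before each export request, so smaller requests are paced rather than rejected.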

@tredencegithub What start_date parameter are you using?

@pprithvi
Author

pprithvi commented Nov 8, 2021

@avida I am trying to sync from 01-06-2019. The problem is that I can only select a start date, not an end date. So if I select 01-06-2019, I have to sync up to today; I have no option to break the sync into parts, because I can only change the start date and the end date is always the current date. Please advise.

@avida
Contributor

avida commented Nov 8, 2021

@tredencegithub Thanks, I suppose breaking this period into 90-day ranges will be enough. I will change the connector's code to do it automatically.
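
As a rough sketch of the idea, a sync window can be split into consecutive ranges of at most 90 days, each of which would become its own request. `date_ranges` and its parameters are hypothetical names for this example and are not taken from the connector's code.

```python
from datetime import date, timedelta


def date_ranges(start, end, days=90):
    """Yield consecutive (range_start, range_end) pairs covering [start, end)."""
    step = timedelta(days=days)
    current = start
    while current < end:
        # The last range is shortened so it never runs past `end`.
        range_end = min(current + step, end)
        yield current, range_end
        current = range_end


# Example: a six-month window is split into two full ranges and a short remainder.
for range_start, range_end in date_ranges(date(2021, 1, 1), date(2021, 7, 1)):
    print(range_start.isoformat(), "->", range_end.isoformat())
```

Each range ends where the next one begins, so no day is skipped or requested twice as long as the API treats the end of a range as exclusive; that is an assumption to verify against the API.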

@pprithvi
Author

pprithvi commented Nov 8, 2021

@avida, I just ran a sync again today and it looks like it's failing again.

2021-11-08 14:10:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 81479000
2021-11-08 14:10:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 81480000
2021-11-08 14:10:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-08 14:10:07 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):78 - {} - Airbyte message consumer: failed.
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - Exception in thread "main" java.lang.IndexOutOfBoundsException: This stream was allocated the part numbers from 1 (inclusive) to 10001 (exclusive)and it has gone beyond the end.
2021-11-08 14:10:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-08 14:10:07 WARN i.a.i.d.s.w.BaseS3Writer(close):114 - {} - Failure detected. Aborting upload of stream 'email_send'...
2021-11-08 14:10:07 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-08 14:10:07 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:121)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.checkSize(MultiPartOutputStream.java:110)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.write(MultiPartOutputStream.java:143)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:321)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:325)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:159)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:257)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.newLine(PrintWriter.java:568)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.println(PrintWriter.java:711)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.println(PrintWriter.java:822)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.jsonl.S3JsonlWriter.write(S3JsonlWriter.java:85)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.S3Consumer.acceptTracked(S3Consumer.java:133)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:66)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:167)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:148)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:49)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - Suppressed: java.lang.IndexOutOfBoundsException: This stream was allocated the part numbers from 1 (inclusive) to 10001 (exclusive)and it has gone beyond the end.
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:121)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.close(MultiPartOutputStream.java:164)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:353)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:168)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.BufferedWriter.close(BufferedWriter.java:269)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.close(PrintWriter.java:415)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.jsonl.S3JsonlWriter.closeWhenFail(S3JsonlWriter.java:97)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.writer.BaseS3Writer.close(BaseS3Writer.java:115)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.S3Consumer.close(S3Consumer.java:139)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:82)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:161)
2021-11-08 14:10:07 ERROR () LineGobbler(voidCall):65 - ... 2 more

@pprithvi
Author

pprithvi commented Nov 9, 2021

@avida Have you updated the code?

@avida
Contributor

avida commented Nov 10, 2021

@tredencegithub Yep, please update the Iterable connector to version 0.1.12.

@PVMer

PVMer commented Nov 11, 2021

@avida Thank you so much, I was able to move 143M records with the output as CSV. Trying the same with JSON for better efficiency.

@PVMer

PVMer commented Nov 12, 2021

@avida Hi Dmytro,
I updated the connector as you suggested, and the sync worked when the output was in CSV format; I was able to move 140M records. But there was a parsing issue in the transactional_data column of the email_send table, which is of object data type. I have to bring it in as JSON so that I can preserve the data and un-nest it. When syncing with JSON output, the sync fails at 81M records. Here is the log:

2021-11-11 20:14:17 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 81329000
2021-11-11 20:14:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 81330000
2021-11-11 20:14:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 81331000
2021-11-11 20:14:19 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-11 20:14:19 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):78 - {} - Airbyte message consumer: failed.
2021-11-11 20:14:19 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-11 20:14:19 WARN i.a.i.d.s.w.BaseS3Writer(close):114 - {} - Failure detected. Aborting upload of stream 'email_send'...
2021-11-11 20:14:19 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-11 20:14:19 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - Exception in thread "main" java.lang.IndexOutOfBoundsException: This stream was allocated the part numbers from 1 (inclusive) to 10001 (exclusive)and it has gone beyond the end.
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:121)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.checkSize(MultiPartOutputStream.java:110)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.write(MultiPartOutputStream.java:143)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:321)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:325)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:159)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:257)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.newLine(PrintWriter.java:568)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.println(PrintWriter.java:711)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.println(PrintWriter.java:822)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.jsonl.S3JsonlWriter.write(S3JsonlWriter.java:85)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.S3Consumer.acceptTracked(S3Consumer.java:133)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:66)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:167)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:148)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:49)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - Suppressed: java.lang.IndexOutOfBoundsException: This stream was allocated the part numbers from 1 (inclusive) to 10001 (exclusive)and it has gone beyond the end.
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:121)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at alex.mojaki.s3upload.MultiPartOutputStream.close(MultiPartOutputStream.java:164)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:353)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:168)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.BufferedWriter.close(BufferedWriter.java:269)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at java.base/java.io.PrintWriter.close(PrintWriter.java:415)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.jsonl.S3JsonlWriter.closeWhenFail(S3JsonlWriter.java:97)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.writer.BaseS3Writer.close(BaseS3Writer.java:115)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.destination.s3.S3Consumer.close(S3Consumer.java:139)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:82)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:161)
2021-11-11 20:14:19 ERROR () LineGobbler(voidCall):65 - ... 2 more
2021-11-11 20:14:19 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 81332000
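
The exception above says the stream was allotted part numbers 1 through 10000, so the largest single file that can be uploaded is roughly the part size multiplied by 10000. A back-of-the-envelope sketch of that arithmetic follows; the function names are hypothetical, and it assumes every part is filled to the same fixed size, which real uploads may not do.

```python
MAX_PARTS = 10_000  # from the exception: part numbers 1 (inclusive) to 10001 (exclusive)
MIB = 1024 * 1024


def max_upload_bytes(part_size_mb, max_parts=MAX_PARTS):
    """Largest file, in bytes, that fits in max_parts parts of part_size_mb each."""
    return part_size_mb * MIB * max_parts


def min_part_size_mb(total_bytes, max_parts=MAX_PARTS):
    """Smallest whole-MB part size that fits total_bytes into max_parts parts."""
    per_part_bytes = -(-total_bytes // max_parts)  # ceiling division
    return -(-per_part_bytes // MIB)


# Example: capacity of one file at two candidate part sizes.
for size_mb in (5, 20):
    print(size_mb, "MB parts ->", max_upload_bytes(size_mb), "bytes per file")
```

Under these assumptions, a stream whose output for a single file exceeds that capacity will run out of part numbers regardless of how many records remain.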

@avida
Contributor

avida commented Nov 12, 2021

Hey @PVMer, happy that the Iterable source works now. It looks like this time there is an issue with the S3 destination. I see you are using 0.1.10. Could you please update to the latest version, 0.1.13? I'm not sure it will help, but at least the stack trace would be relevant. @tuliren could you help, please?

@avida avida reopened this Nov 12, 2021
@sherifnada
Contributor

relevant issue: #6090

@PVMer could you follow along on that issue for the details of the fix? I'll close this issue; please reopen it if there are further issues in the Iterable connector.

@pprithvi
Author

@sherifnada @avida

I tried increasing the part size from 5 MB to 20 MB so that the upload would not reach the AWS limit of 10000 parts, but I got the error below while the source connector was chunking.

2021-11-15 21:33:35 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 34927000
2021-11-15 21:33:36 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 34928000
2021-11-15 21:33:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 34929000
2021-11-15 21:33:37 ERROR () DefaultAirbyteStreamFactory(internalLog):88 - Encountered an exception while reading stream SourceIterable
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 697, in _update_chunk_length
self.chunk_left = int(line, 16)
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 438, in _error_catcher
yield
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 764, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 701, in _update_chunk_length
raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 758, in generate
for chunk in self.raw.stream(chunk_size, decode_content=True):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 572, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 793, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 455, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 106, in read
internal_config=internal_config,
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in _read_stream
for record in record_iterator:
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 181, in _read_incremental
for record_counter, record_data in enumerate(records, start=1):
File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/streams/http/http.py", line 353, in read_records
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
File "/airbyte/integration_code/source_iterable/iterable_streams.py", line 132, in parse_response
for obj in response.iter_lines():
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 802, in iter_lines
for chunk in self.iter_content(chunk_size=chunk_size, decode_unicode=decode_unicode):
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 761, in generate
raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 697, in _update_chunk_length
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - self.chunk_left = int(line, 16)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - ValueError: invalid literal for int() with base 16: b''
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 -
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - During handling of the above exception, another exception occurred:
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 -
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 438, in _error_catcher
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - yield
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 764, in read_chunked
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - self._update_chunk_length()
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 701, in _update_chunk_length
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - raise InvalidChunkLength(self, line)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 -
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - During handling of the above exception, another exception occurred:
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 -
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 758, in generate
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for chunk in self.raw.stream(chunk_size, decode_content=True):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 572, in stream
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for line in self.read_chunked(amt, decode_content=decode_content):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 793, in read_chunked
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - self._original_response.close()
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/contextlib.py", line 130, in exit
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - self.gen.throw(type, value, traceback)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 455, in _error_catcher
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - raise ProtocolError("Connection broken: %r" % e, e)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 -
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - During handling of the above exception, another exception occurred:
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 -
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - Traceback (most recent call last):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/airbyte/integration_code/main.py", line 13, in
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - launch(source, sys.argv[1:])
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/entrypoint.py", line 108, in launch
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for message in source_entrypoint.run(parsed_args):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/entrypoint.py", line 99, in run
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for message in generator:
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 110, in read
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - raise e
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 106, in read
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - internal_config=internal_config,
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in _read_stream
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for record in record_iterator:
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 181, in _read_incremental
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for record_counter, record_data in enumerate(records, start=1):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/streams/http/http.py", line 353, in read_records
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/airbyte/integration_code/source_iterable/iterable_streams.py", line 132, in parse_response
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for obj in response.iter_lines():
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 802, in iter_lines
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - for chunk in self.iter_content(chunk_size=chunk_size, decode_unicode=decode_unicode):
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 761, in generate
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - raise ChunkedEncodingError(e)
2021-11-15 21:33:37 ERROR () LineGobbler(voidCall):65 - requests.exceptions.ChunkedEncodingError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
2021-11-15 21:33:38 INFO () DefaultReplicationWorker(run):121 - Source thread complete.
2021-11-15 21:33:38 INFO () DefaultReplicationWorker(run):122 - Waiting for destination thread to join.
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):60 - {} - Airbyte message consumer: succeeded.
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO i.a.i.d.s.w.BaseS3Writer(close):102 - {} - Uploading remaining data for stream 'email_send'.
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO a.m.s.MultiPartOutputStream(close):158 - {} - Called close() on [MultipartOutputStream for parts 1 - 10000]
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 WARN a.m.s.MultiPartOutputStream(close):160 - {} - [MultipartOutputStream for parts 1 - 10000] is already closed
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - {} - [Manager uploading to #/iterable/email_send/2021_11_15_1636984606446_0.jsonl with id ya5XcMO.2...RDhrmjoPn]: Finished uploading [Part number 1256 containing 13.97 MB]
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO a.m.s.StreamTransferManager(complete):395 - {} - [Manager uploading to #/iterable/email_send/2021_11_15_1636984606446_0.jsonl with id ya5XcMO.2...RDhrmjoPn]: Completed
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO i.a.i.d.s.w.BaseS3Writer(close):104 - {} - Upload completed for stream 'email_send'.
2021-11-15 21:33:38 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):231 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@101a22e0[type=STATE,log=,spec=,connectionStatus=,catalog=,record=,state=io.airbyte.protocol.models.AirbyteStateMessage@1eddce93[data={"email_send":{"createdAt":"2020-11-21T23:56:55+00:00"}},additionalProperties={}],additionalProperties={}]
2021-11-15 21:33:38 INFO () DefaultAirbyteStreamFactory(lambda$create$0):53 - 2021-11-15 21:33:38 INFO i.a.i.b.IntegrationRunner(run):133 - {} - Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2021-11-15 21:33:39 INFO () DefaultReplicationWorker(run):124 - Destination thread complete.
2021-11-15 21:33:39 ERROR () DefaultReplicationWorker(run):128 - Sync worker failed.
io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:115) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:126) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:32) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$1(TemporalAttemptExecution.java:145) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-15 21:33:39 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@2686a69a[status=failed,recordsSynced=34929609,bytesSynced=22457863686,startTime=1636984603258,endTime=1637012019107]
2021-11-15 21:33:39 INFO () DefaultReplicationWorker(run):159 - Source output at least one state message
2021-11-15 21:33:39 INFO () DefaultReplicationWorker(run):165 - State capture: Updated state to: Optional[io.airbyte.config.State@117a3667[state={"email_send":{"createdAt":"2020-11-21T23:56:55+00:00"}}]]
2021-11-15 21:33:39 INFO () TemporalAttemptExecution(get):115 - Stopping cancellation check scheduling...
2021-11-15 21:33:39 INFO () SyncWorkflow$ReplicationActivityImpl(replicate):177 - sync summary: io.airbyte.config.StandardSyncOutput@7d25f259[standardSyncSummary=io.airbyte.config.StandardSyncSummary@d88d5d7[status=failed,recordsSynced=34929609,bytesSynced=22457863686,startTime=1636984603258,endTime=1637012019107],state=io.airbyte.config.State@117a3667[state={"email_send":{"createdAt":"2020-11-21T23:56:55+00:00"}}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@4a3a8cd0[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@2b0ef3c7[stream=io.airbyte.protocol.models.AirbyteStream@796b6bc5[name=email_send,jsonSchema={"type":["null","object"],"properties":{"email":{"type":["null","string"]},"channelId":{"type":["null","integer"]},"contentId":{"type":["null","integer"]},"createdAt":{"type":["null","string"],"format":"date-time"},"messageId":{"type":["null","string"]},"campaignId":{"type":["null","integer"]},"templateId":{"type":["null","integer"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"messageBusId":{"type":["null","string"]},"messageTypeId":{"type":["null","integer"]},"transactionalData":{"type":["null","object"],"properties":{"id":{"type":["null","string"]},"sku":{"type":["null","string"]},"url":{"type":["null","string"]},"name":{"type":["null","string"]},"email":{"type":["null","string"]},"price":{"type":["null","integer"]},"handle":{"type":["null","string"]},"vendor":{"type":["null","string"]},"discount":{"type":["null","integer"]},"imageUrl":{"type":["null","string"]},"createdAt":{"type":["null","string"],"format":"date-time"},"eventName":{"type":["null","string"]},"inventory":{"type":["null","integer"]},"campaignId":{"type":["null","integer"]},"categories":{"type":["null","array"],"items":{}},"product_id":{"type":["null","string"]},"templateId":{"type":["null","integer"]},"description":{
"type":["null","string"]},"itblInternal":{"type":["null","object"],"properties":{"documentCreatedAt":{"type":["null","string"],"format":"date-time"},"documentUpdatedAt":{"type":["null","string"],"format":"date-time"}}},"product_type":{"type":["null","string"]},"eventUpdatedAt":{"type":["null","string"],"format":"date-time"},"compare_at_price":{"type":["null","number"]}}}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[createdAt],sourceDefinedPrimaryKey=[],namespace=,additionalProperties={}],syncMode=incremental,cursorField=[createdAt],destinationSyncMode=append,primaryKey=[],additionalProperties={}]],additionalProperties={}]]

Please advise on this.

@sherifnada sherifnada reopened this Nov 16, 2021
@sherifnada sherifnada removed this from the Connectors 2021-11-12 milestone Nov 16, 2021
@sherifnada sherifnada added this to the Connectors Nov 26 2021 milestone Nov 16, 2021
@avida
Contributor

avida commented Nov 17, 2021

@tredencegithub The problem is that your request covers so much data that the API closes the connection prematurely. I assume this can happen any time a connection takes too long.
There are two options:

  1. Save state if the connection breaks and retry with an updated range.
  2. Add a max date range option to the spec so you can adjust it to your specific case.

Option 1 is preferable, but I read the docs and couldn't find a guarantee that the Iterable source returns data sorted by datetime, so it could lead to the loss of some records. That's why I would prefer to go with option 2. @sherifnada what do you think?
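
To make option 2 concrete, here is a minimal sketch of splitting a sync window into slices no longer than a configured maximum. The function name `date_slices` and the `max_days` parameter are hypothetical and only illustrate the idea; this is not the connector's actual code or spec.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def date_slices(
    start: datetime, end: datetime, max_days: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (slice_start, slice_end) windows of at most max_days.

    Hypothetical helper: max_days stands in for a user-configurable
    "max date range" option.
    """
    if max_days <= 0:
        raise ValueError("max_days must be positive")
    step = timedelta(days=max_days)
    cursor = start
    while cursor < end:
        # The last slice is shortened so it never runs past `end`.
        slice_end = min(cursor + step, end)
        yield cursor, slice_end
        cursor = slice_end


if __name__ == "__main__":
    window_start = datetime(2020, 11, 1, tzinfo=timezone.utc)
    window_end = datetime(2020, 12, 1, tzinfo=timezone.utc)
    for lo, hi in date_slices(window_start, window_end, max_days=7):
        print(lo.isoformat(), "->", hi.isoformat())
```

Each slice would become one export request, so a smaller `max_days` means shorter-lived connections at the cost of more requests.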

@avida
Contributor

avida commented Nov 17, 2021

Sherif suggested option 3:
Use a dynamic slice range and adjust it based on the time needed for the previous request. I like this idea and will start working on it.
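
A rough sketch of how such an adjustment could look, assuming we aim for a target request duration and scale the next slice accordingly. The function name, the 60 second target, the 0.5x to 2x step limits, and the min/max bounds are all assumptions made for illustration; they are not taken from the merged pull request.

```python
from datetime import timedelta


def next_slice_range(
    current: timedelta,
    request_seconds: float,
    target_seconds: float = 60.0,
    min_range: timedelta = timedelta(hours=1),
    max_range: timedelta = timedelta(days=90),
) -> timedelta:
    """Return the slice range to use for the next request.

    Hypothetical helper: shrink the range when the previous request was
    slower than the target, grow it when the request was faster.
    """
    if request_seconds <= 0:
        # No usable timing information, keep the current range.
        return current
    factor = target_seconds / request_seconds
    # Limit how much the range can change in one step (assumed bounds).
    factor = max(0.5, min(2.0, factor))
    proposed = current * factor
    # Keep the result inside the allowed window.
    return max(min_range, min(max_range, proposed))


if __name__ == "__main__":
    current_range = timedelta(days=10)
    for seconds in (120.0, 30.0, 60.0):
        print(seconds, "->", next_slice_range(current_range, seconds))
```

Capping the per-step change keeps one unusually slow or fast request from swinging the range too far.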

@PVMer

PVMer commented Nov 18, 2021

@avida Thank you for the update, Dmytro.

@avida
Contributor

avida commented Nov 22, 2021

@PVMer @tredencegithub Pull request merged, please update Iterable to version 0.1.13. Hope it works now :)

@pprithvi
Author

@avida Thank you, I have started the sync and will update you when it completes.

@pprithvi
Author

@avida It has errored out :( It says deadline exceeded. I had increased the part size from 5 MB to 20 MB to make sure it would not hit the 10000 part limit. Output format is JSON.

2021-11-24 19:00:20 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 89342000
2021-11-24 20:10:45 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: context deadline exceeded
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:104) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:124) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:103) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:47) ~[io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:45) [io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:23) [io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:94) [io.airbyte-airbyte-commons-0.30.18-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:78) [io.airbyte-airbyte-commons-0.30.18-alpha.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-24 20:14:25 WARN () GrpcRetryer(retryWithResult):152 - Retrying after failure
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 120.998969249s. [closed=[], open=[[remote_addr=airbyte-temporal/172.18.0.6:7233]]]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156) ~[grpc-stub-1.40.0.jar:1.40.0]
at io.temporal.api.workflowservice.v1.WorkflowServiceGrpc$WorkflowServiceBlockingStub.getWorkflowExecutionHistory(WorkflowServiceGrpc.java:2638) ~[temporal-serviceclient-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.lambda$getInstanceCloseEvent$1(WorkflowExecutionUtils.java:256) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.GrpcRetryer.retryWithResult(GrpcRetryer.java:127) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getInstanceCloseEvent(WorkflowExecutionUtils.java:244) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.common.WorkflowExecutionUtils.getWorkflowExecutionResult(WorkflowExecutionUtils.java:132) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:346) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowStubImpl.getResult(WorkflowStubImpl.java:328) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.startWorkflow(WorkflowInvocationHandler.java:315) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler$SyncWorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:270) ~[temporal-sdk-1.0.4.jar:?]
at io.temporal.internal.sync.WorkflowInvocationHandler.invoke(WorkflowInvocationHandler.java:178) ~[temporal-sdk-1.0.4.jar:?]
at com.sun.proxy.$Proxy38.run(Unknown Source) ~[?:?]
at io.airbyte.workers.temporal.TemporalClient.lambda$submitSync$3(TemporalClient.java:104) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.execute(TemporalClient.java:124) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalClient.submitSync(TemporalClient.java:103) ~[io.airbyte-airbyte-workers-0.30.18-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory.lambda$createSupplier$0(TemporalWorkerRunFactory.java:47) ~[io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:45) [io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
at io.airbyte.scheduler.app.worker_run.WorkerRun.call(WorkerRun.java:23) [io.airbyte.airbyte-scheduler-app-0.30.18-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.execute(LifecycledCallable.java:94) [io.airbyte-airbyte-commons-0.30.18-alpha.jar:?]
at io.airbyte.commons.concurrency.LifecycledCallable.call(LifecycledCallable.java:78) [io.airbyte-airbyte-commons-0.30.18-alpha.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
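
For reference, the part-size reasoning above can be checked with some quick arithmetic. This is only a sketch: it assumes 1 MB means 1024 * 1024 bytes and that every part is filled to the configured size, which the actual uploader may not do. The helper names are made up for illustration.

```python
MAX_PARTS = 10_000  # the part limit mentioned in the comment above

BYTES_PER_MB = 1024 * 1024  # assumption: MB is treated as MiB


def max_upload_bytes(part_size_mb: int, max_parts: int = MAX_PARTS) -> int:
    """Largest object that fits if every part is exactly part_size_mb."""
    return part_size_mb * BYTES_PER_MB * max_parts


def parts_needed(total_bytes: int, part_size_mb: int) -> int:
    """Number of full-size parts required for total_bytes (rounded up)."""
    part_bytes = part_size_mb * BYTES_PER_MB
    return -(-total_bytes // part_bytes)  # ceiling division


if __name__ == "__main__":
    for size_mb in (5, 20):
        gib = max_upload_bytes(size_mb) / 1024**3
        print(f"{size_mb} MB parts: up to {gib:.1f} GiB per object")
```

Under these assumptions, going from 5 MB to 20 MB parts raises the per-object ceiling by a factor of four.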

@avida
Contributor

avida commented Nov 25, 2021

@tredencegithub Looks like the source works fine and the problem is on the destination side, which was killed by the Airbyte platform for not responding. @sherifnada @ChristopheDuong What should we do in this case? Forward the issue to the core team?

@sherifnada
Contributor

@tredencegithub could you share the full sync log?

@PVMer

PVMer commented Nov 30, 2021

@sherifnada @avida @ChristopheDuong Here is the full sync log as requested.

logs-2912-0 .txt

@sherifnada
Contributor

sherifnada commented Dec 1, 2021

This appears to be the same issue as #4975. @PVMer please follow there for updates.
