
Sync job hanging after source job container has completed #8218

Closed
benmoriceau opened this issue Nov 23, 2021 · 15 comments
Environment

  • Airbyte version: 0.32.5, plus dev worker
  • OS Version / Instance: OpenShift Kubernetes Cluster
  • Deployment: Kubernetes via helm chart
  • Source Connector and version: source-db2 0.1.3
  • Destination Connector and version: destination-postgres 0.3.12
  • Severity: High
  • Step where error happened: Sync job

Current Behavior

With both source and destination set up, I start a manual sync job.

Both the source and destination containers start up in the cluster, and I see records being read and created on the destination side.
The source container completes successfully, but the job appears to hang: some records are extracted and inserted on the Postgres side, but not all of them.
After this, the destination container and the job keep running, and the logs stop.

When I create and exec into the source-db2 container and run the extraction manually, all records/JSON lines are output successfully, so the issue doesn't seem to be on the source side. I don't get any error logs from the destination either. What I see is that the `while (!cancelled.get() && !source.isFinished())` loop seems to block forever.

To debug this further I've added some extra logging to `DefaultReplicationWorker.java` and `DefaultAirbyteSource.java`, where it seems the call to `final var isEmpty = !messageIterator.hasNext();` is what blocks.
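For context on why `hasNext()` can block indefinitely: the worker reads the source's stdout through a stream-backed iterator, and `hasNext()` only returns once a line or EOF arrives. Here is a minimal, self-contained sketch using a plain `BufferedReader` over a pipe standing in for the worker/source socket (this is my own illustration, not Airbyte's actual classes):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Iterator;

public class HasNextBlockDemo {
    public static void main(String[] args) throws Exception {
        // A pipe stands in for the socket between the worker and the source container.
        PipedOutputStream sourceSide = new PipedOutputStream();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new PipedInputStream(sourceSide)));
        Iterator<String> messages = reader.lines().iterator();

        Thread worker = new Thread(() ->
                // hasNext() blocks until a line or EOF arrives on the stream.
                System.out.println("hasNext returned: " + messages.hasNext()));
        worker.start();

        worker.join(500); // give the worker time to block on the empty pipe
        System.out.println("worker still blocked: " + worker.isAlive());

        sourceSide.close(); // closing the write side finally delivers EOF
        worker.join();
    }
}
```

Once the write side is closed, `hasNext()` returns `false` and the thread exits; if EOF is never delivered (e.g. the connection stays half-open after the source container exits), the thread blocks forever, which matches the hang described above.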

**DefaultReplicationWorker.java** (I also added an overloaded `isFinished(boolean)` method to `DefaultAirbyteSource.java`, just to log a bit more there too):

```java
  private static Runnable getReplicationRunnable(final AirbyteSource source,
                                                 final AirbyteDestination destination,
                                                 final AtomicBoolean cancelled,
                                                 final AirbyteMapper mapper,
                                                 final MessageTracker sourceMessageTracker,
                                                 final Map<String, String> mdc) {
    return () -> {
      MDC.setContextMap(mdc);
      LOGGER.info("Replication thread started.");
      var recordsRead = 0;
      try {
        while (!cancelled.get() && !source.isFinished()) {
          if (recordsRead > 51000) {
            LOGGER.info("Records read: {}, cancelled: {}, isFinished: {}", recordsRead, cancelled.get(), source.isFinished());
          }

          final Optional<AirbyteMessage> messageOptional = source.attemptRead();
          if (recordsRead > 51000) {
            LOGGER.info("AttemptRead");
          }
          if (messageOptional.isPresent()) {
            final AirbyteMessage message = mapper.mapMessage(messageOptional.get());
            if (recordsRead > 51000) {
              LOGGER.info("Message: {}", message.toString());
            }

            sourceMessageTracker.accept(message);
            destination.accept(message);
            recordsRead += 1;

            if (recordsRead > 51000) {
              LOGGER.info("Message Accepted: recordsRead {}", recordsRead);
              LOGGER.info("Cancelled? {}", cancelled.get());
              LOGGER.info("isFinished? {}", source.isFinished(true));
            }

            if (recordsRead % 1000 == 0) {
              LOGGER.info("Records read: {}", recordsRead);
            }
          }
        }
        destination.notifyEndOfStream();
      } catch (final Exception e) {
        if (!cancelled.get()) {
          // Although this thread is closed first, it races with the source's closure and can attempt one
          // final read after the source is closed before it's terminated.
          // This read will fail and throw an exception. Because of this, throw exceptions only if the worker
          // was not cancelled.
          throw new RuntimeException(e);
        }
      }
    };
  }
```

**DefaultAirbyteSource.java**

```java
  @Override
  public boolean isFinished() {
    Preconditions.checkState(sourceProcess != null);
    // As this check is done on every message read, it is important for this operation to be efficient.
    // Short circuit early to avoid checking the underlying process.
    final var isEmpty = !messageIterator.hasNext();
    if (!isEmpty) {
      return false;
    }

    return !sourceProcess.isAlive() && !messageIterator.hasNext();
  }

  @Override
  public boolean isFinished(final boolean logMore) {
    Preconditions.checkState(sourceProcess != null);
    // As this check is done on every message read, it is important for this operation to be efficient.
    // Short circuit early to avoid checking the underlying process.
    final var isEmpty = !messageIterator.hasNext();
    if (logMore) {
      LOGGER.debug("isEmpty -> hasNext: {}", isEmpty);
    }
    if (!isEmpty) {
      return false;
    }

    if (logMore) {
      LOGGER.debug("isAlive? {}", sourceProcess.isAlive());
    }
    return !sourceProcess.isAlive() && !messageIterator.hasNext();
  }
```

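For what it's worth, while debugging I found it helpful to think about guarding the blocking read with a deadline so that the hang fails loudly instead of idling. This is only a hypothetical sketch (`readWithTimeout` is my own helper, not an Airbyte API):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedReadGuard {
    // Run a potentially blocking read with a deadline, so a hang surfaces as a
    // TimeoutException instead of a silently idle job.
    static <T> T readWithTimeout(Callable<T> read, long millis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(read).get(millis, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow(); // interrupt the stuck reader thread
        }
    }

    public static void main(String[] args) throws Exception {
        // A read that never completes, standing in for the blocked hasNext().
        Callable<String> stuck = () -> { Thread.sleep(60_000); return "never"; };
        try {
            readWithTimeout(stuck, 200);
        } catch (TimeoutException e) {
            System.out.println("read timed out instead of hanging");
        }
    }
}
```

With a guard like this, a stuck `hasNext()` would surface as a `TimeoutException` after the deadline rather than leaving the job running indefinitely.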
I'd hoped this was resolved by #8036 or the other related PRs, but that doesn't seem to be the case.

Note: I'm running all of the containers as non-root by wrapping the Dockerfiles (see #7872). I have also deployed Kubernetes NetworkPolicies to allow communication between the containers in our cluster.

Expected Behavior

The job should complete successfully and not hang.

Logs

Here's one instance where I cancelled the job after five minutes of inactivity. I have also waited longer (1h+) to see if it resolves itself, and I have the logs for those runs as well.
In rare cases I also get a log line from the destination similar to this:

destination - 2021-11-23 16:46:56 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:46:56 ERROR i.a.i.b.IntegrationRunner(consumeWriteStream):149 - {} - Received invalid message: {"type":"RECORD","record":{"stream":"RENTOBJTRANSACT","data":{"RENTOBJTRANSACTID":790218,"COMPANYID":20,"RENTOBJID":8498,"RENTCONTRACTID":791353,"TRANSACTTYPE":"RC","CHECKOUTSTATIONID":50816,"CHECKOUTDATE":"2020-05-25T00:00:00Z","CHECKOUTTIME":"1970-01-01T17:11:00Z","CHECKOUTEMPLOYEEID":2206,"CHECKOUTPOSITIONID":776,"PLANCHECKINSTATIONID":50816,"PLANCHECKINDATE":"2020-05-28T00:00:00Z","PLANCHECKINTIME":"1970-01-01T16:00:00Z","PLANCHECKINPOSITIONID":776,"BILLINGCHECKOUTVALUEEXIST":0,"BILLINGCHECKINVALUEEXIST":0,"CHECKOUTTIMESTAMP":"2020-05-26T16:15:41Z","CUSTKEY":591588,"AMOUNTSUMTOTAL":463.65,"AMOUNTSUMDISC":0,"AMOUNTSUMNET":463.65,"AMOUNTSUMTAX":97.3665,"AMOUNTSUMGROSS":561.0165,"COUNTDRIVER":1,"RCPRINTPOS":"01","BILLPRINTPOS":"01","PRICELISTID":50622,"CLASSIFICATIONID":60141,"KM":0,"TRANSACTYEAR":2020,"TRANSACTMONTH":5,"EXTPRICECALC":0,"DISCPERCENT":0,"CONTRACTTYPE":"10","CALCRENTMONTH":0,"CALCRENTWEEK":0,"CALCRENTDAY":2,"CALC

But in most runs I don't.

LOG
2021-11-23 16:40:29 INFO WorkerRun(call):49 - Executing worker wrapper. Airbyte version: 0.32.5-alpha
2021-11-23 16:40:33 INFO TemporalAttemptExecution(get):116 - Executing worker wrapper. Airbyte version: 0.32.5-alpha
2021-11-23 16:40:33 WARN Databases(createPostgresDatabaseWithRetry):41 - Waiting for database to become available...
2021-11-23 16:40:33 INFO JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2021-11-23 16:40:33 INFO Databases(createPostgresDatabaseWithRetry):58 - Database available!
2021-11-23 16:40:35 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:40:36 WARN JsonMetaSchema(newValidator):338 - Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-11-23 16:40:36 WARN JsonMetaSchema(newValidator):338 - Unknown keyword existingJavaType - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-11-23 16:40:36 INFO DefaultReplicationWorker(run):99 - start sync worker. job id: 9 attempt id: 0
2021-11-23 16:40:36 INFO DefaultReplicationWorker(run):108 - configured sync modes: {CRM.RENTOBJ=full_refresh - append, CRM.RENTOBJTRANSACT=full_refresh - append}
2021-11-23 16:40:36 INFO DefaultAirbyteDestination(start):64 - Running destination...
2021-11-23 16:40:36 INFO KubeProcessFactory(create):106 - airbyte-destination-postgres-worker-9-0-hjvtz stdoutLocalPort = 9024
2021-11-23 16:40:36 INFO KubeProcessFactory(create):109 - airbyte-destination-postgres-worker-9-0-hjvtz stderrLocalPort = 9025
2021-11-23 16:40:36 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):428 - Creating stdout socket server...
2021-11-23 16:40:36 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):439 - Creating stderr socket server...
2021-11-23 16:40:37 INFO KubePodProcess(<init>):387 - Creating pod...
2021-11-23 16:40:41 INFO KubePodProcess(waitForInitPodToRun):229 - Waiting for init container to be ready before copying files...
2021-11-23 16:40:41 INFO KubePodProcess(waitForInitPodToRun):232 - Init container present..
2021-11-23 16:40:41 INFO KubePodProcess(waitForInitPodToRun):235 - Init container ready..
2021-11-23 16:40:41 INFO KubePodProcess(<init>):392 - Copying files...
2021-11-23 16:40:41 INFO KubePodProcess(copyFilesToKubeConfigVolume):207 - Uploading file: destination_config.json
2021-11-23 16:40:41 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-23 16:40:41 INFO KubePodProcess(copyFilesToKubeConfigVolume):207 - Uploading file: destination_catalog.json
2021-11-23 16:40:42 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-23 16:40:42 INFO KubePodProcess(copyFilesToKubeConfigVolume):207 - Uploading file: FINISHED_UPLOADING
2021-11-23 16:40:42 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-23 16:40:42 INFO KubePodProcess(<init>):395 - Waiting until pod is ready...
2021-11-23 16:40:43 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):430 - Setting stdout...
2021-11-23 16:40:43 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):441 - Setting stderr...
2021-11-23 16:40:43 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-destination-postgres-worker-9-0-hjvtz&resourceVersion=227660379&watch=true...
2021-11-23 16:40:44 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2102e4b3
2021-11-23 16:40:44 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@1298d6aa
2021-11-23 16:40:44 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-23 16:40:44 INFO KubePodProcess(<init>):409 - Reading pod IP...
2021-11-23 16:40:44 INFO KubePodProcess(<init>):411 - Pod IP: 192.168.130.30
2021-11-23 16:40:44 INFO KubePodProcess(<init>):414 - Creating stdin socket...
2021-11-23 16:40:44 INFO KubeProcessFactory(create):106 - airbyte-source-db2-worker-9-0-hvnav stdoutLocalPort = 9026
2021-11-23 16:40:44 INFO KubeProcessFactory(create):109 - airbyte-source-db2-worker-9-0-hvnav stderrLocalPort = 9027
2021-11-23 16:40:44 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):428 - Creating stdout socket server...
2021-11-23 16:40:44 INFO KubePodProcess(<init>):387 - Creating pod...
2021-11-23 16:40:44 INFO KubePodProcess(waitForInitPodToRun):229 - Waiting for init container to be ready before copying files...
2021-11-23 16:40:44 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):439 - Creating stderr socket server...
2021-11-23 16:40:45 INFO KubePodProcess(waitForInitPodToRun):232 - Init container present..
2021-11-23 16:40:45 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-source-db2-worker-9-0-hvnav&resourceVersion=227660403&watch=true...
2021-11-23 16:40:46 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2b82c21c
2021-11-23 16:40:46 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@5bd5d9d1
2021-11-23 16:40:46 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-23 16:40:46 INFO KubePodProcess(waitForInitPodToRun):235 - Init container ready..
2021-11-23 16:40:46 INFO KubePodProcess(<init>):392 - Copying files...
2021-11-23 16:40:46 INFO KubePodProcess(copyFilesToKubeConfigVolume):207 - Uploading file: source_config.json
2021-11-23 16:40:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:40:46 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-23 16:40:46 INFO KubePodProcess(copyFilesToKubeConfigVolume):207 - Uploading file: source_catalog.json
2021-11-23 16:40:46 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-23 16:40:46 INFO KubePodProcess(copyFilesToKubeConfigVolume):207 - Uploading file: FINISHED_UPLOADING
2021-11-23 16:40:46 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-23 16:40:46 INFO KubePodProcess(<init>):395 - Waiting until pod is ready...
2021-11-23 16:40:46 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-source-db2-worker-9-0-hvnav&resourceVersion=227660415&watch=true...
2021-11-23 16:40:47 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):430 - Setting stdout...
2021-11-23 16:40:47 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):441 - Setting stderr...
2021-11-23 16:40:48 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@5d3e8b39
2021-11-23 16:40:48 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@5e00a178
2021-11-23 16:40:48 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-23 16:40:48 INFO KubePodProcess(<init>):409 - Reading pod IP...
2021-11-23 16:40:48 INFO KubePodProcess(<init>):411 - Pod IP: 192.168.129.64
2021-11-23 16:40:48 INFO KubePodProcess(<init>):418 - Using null stdin output stream...
2021-11-23 16:40:48 INFO DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):260 - Destination output thread started.
2021-11-23 16:40:48 INFO DefaultReplicationWorker(run):136 - Waiting for source thread to join.
2021-11-23 16:40:48 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):207 - Replication thread started.
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 INFO i.a.i.d.p.PostgresDestination(main):69 - {} - starting destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 INFO i.a.i.b.IntegrationRunner(run):76 - {} - Running integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 INFO i.a.i.b.IntegrationRunner(run):80 - {} - Command: WRITE
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 INFO i.a.i.b.IntegrationRunner(run):81 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:44 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.b.s.SshTunnel(getInstance):170 - {} - Starting connection with method: NO_TUNNEL
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=RENTOBJ, namespace=raw_ams, outputSchemaName=raw_ams, tmpTableName=_airbyte_tmp_yli_RENTOBJ, outputTableName=_airbyte_raw_RENTOBJ, syncMode=append}
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=RENTOBJTRANSACT, namespace=raw_ams, outputSchemaName=raw_ams, tmpTableName=_airbyte_tmp_jjo_RENTOBJTRANSACT, outputTableName=_airbyte_raw_RENTOBJTRANSACT, syncMode=append}
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):124 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):121 - {} - Preparing tmp tables in destination started for 2 streams
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream RENTOBJ. schema: raw_ams, tmp table name: _airbyte_tmp_yli_RENTOBJ
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream RENTOBJTRANSACT. schema: raw_ams, tmp table name: _airbyte_tmp_jjo_RENTOBJTRANSACT
destination - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:45 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):131 - {} - Preparing tables in destination completed.
source - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.s.d.Db2Source(main):48 - {} - starting source: class io.airbyte.integrations.source.db2.Db2Source
source - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.source.db2.Db2Source
source - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {read=null, catalog=source_catalog.json, config=source_config.json}
source - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: READ
source - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
source - 2021-11-23 16:40:48 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
source - 2021-11-23 16:40:49 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
source - 2021-11-23 16:40:49 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.s.r.CdcStateManager(<init>):46 - {} - Initialized CDC state with: null
source - 2021-11-23 16:40:49 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='RENTOBJTRANSACT', namespace='CRM'}, New Cursor Field: null. Resetting cursor value
source - 2021-11-23 16:40:49 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:48 INFO i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='RENTOBJ', namespace='CRM'}, New Cursor Field: null. Resetting cursor value
source - 2021-11-23 16:40:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:51 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):460 - {} - Queueing query for table: RENTOBJ
source - 2021-11-23 16:40:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:40:51 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):460 - {} - Queueing query for table: RENTOBJTRANSACT
2021-11-23 16:40:56 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:40:59 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 1000
2021-11-23 16:41:04 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 2000
2021-11-23 16:41:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:41:06 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 3000
2021-11-23 16:41:08 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 4000
2021-11-23 16:41:10 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 5000
2021-11-23 16:41:12 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 6000
2021-11-23 16:41:14 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 7000
2021-11-23 16:41:16 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 8000
2021-11-23 16:41:16 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:41:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 9000
2021-11-23 16:41:19 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 10000
2021-11-23 16:41:21 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 11000
2021-11-23 16:41:22 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 12000
2021-11-23 16:41:24 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 13000
2021-11-23 16:41:25 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 14000
2021-11-23 16:41:26 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:41:26 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 15000
2021-11-23 16:41:27 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 16000
2021-11-23 16:41:29 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 17000
source - 2021-11-23 16:41:30 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:41:20 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 10000
2021-11-23 16:41:30 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 18000
2021-11-23 16:41:31 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 19000
2021-11-23 16:41:32 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 20000
2021-11-23 16:41:34 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 21000
2021-11-23 16:41:35 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 22000
2021-11-23 16:41:36 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:41:36 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 23000
2021-11-23 16:41:38 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 24000
2021-11-23 16:41:38 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 25000
2021-11-23 16:41:40 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 26000
2021-11-23 16:41:41 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 27000
source - 2021-11-23 16:41:42 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:41:33 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 20000
2021-11-23 16:41:42 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 28000
2021-11-23 16:41:43 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 29000
2021-11-23 16:41:44 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 30000
2021-11-23 16:41:45 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 31000
2021-11-23 16:41:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:41:46 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 32000
2021-11-23 16:41:47 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 33000
2021-11-23 16:41:48 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 34000
2021-11-23 16:41:50 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 35000
2021-11-23 16:41:51 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 36000
2021-11-23 16:41:52 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 37000
source - 2021-11-23 16:41:53 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:41:45 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 30000
2021-11-23 16:41:53 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 38000
2021-11-23 16:41:55 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 39000
2021-11-23 16:41:56 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:41:56 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 40000
2021-11-23 16:41:57 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 41000
2021-11-23 16:41:58 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 42000
2021-11-23 16:41:59 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 43000
2021-11-23 16:42:00 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 44000
2021-11-23 16:42:01 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 45000
2021-11-23 16:42:03 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 46000
2021-11-23 16:42:04 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 47000
source - 2021-11-23 16:42:05 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:41:57 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 40000
2021-11-23 16:42:05 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 48000
2021-11-23 16:42:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:42:06 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 49000
2021-11-23 16:42:07 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 50000
2021-11-23 16:42:07 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):237 - Records read: 51000
2021-11-23 16:42:07 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51001

REDACTED a lot of similar lines to keep the issue within 65k chars

2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):212 - Records read: 51725, cancelled: false, isFinished: false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):217 - AttemptRead
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):222 - Message: io.airbyte.protocol.models.AirbyteMessage@436b7b93[type=RECORD,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=io.airbyte.protocol.models.AirbyteRecordMessage@46c09463[stream=RENTOBJTRANSACT,data=...REDACTED but valid json...,emittedAt=1637685648466,namespace=raw_ams,additionalProperties={}],state=<null>,additionalProperties={}]
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51726
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):231 - Cancelled? false
2021-11-23 16:42:18 DEBUG DefaultAirbyteSource(isFinished):102 - isEmpty -> hasNext
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):232 - isFinished? false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):212 - Records read: 51726, cancelled: false, isFinished: false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):217 - AttemptRead
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):222 - Message: io.airbyte.protocol.models.AirbyteMessage@2aa2b44a[type=RECORD,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=io.airbyte.protocol.models.AirbyteRecordMessage@421bf639[stream=RENTOBJTRANSACT,data=...REDACTED but valid json...,emittedAt=1637685648466,namespace=raw_ams,additionalProperties={}],state=<null>,additionalProperties={}]
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51727
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):231 - Cancelled? false
2021-11-23 16:42:18 DEBUG DefaultAirbyteSource(isFinished):102 - isEmpty -> hasNext
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):232 - isFinished? false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):212 - Records read: 51727, cancelled: false, isFinished: false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):217 - AttemptRead
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):222 - Message: io.airbyte.protocol.models.AirbyteMessage@62ce442[type=RECORD,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=io.airbyte.protocol.models.AirbyteRecordMessage@33b9b4d8[stream=RENTOBJTRANSACT,data=...REDACTED but valid json...,emittedAt=1637685648466,namespace=raw_ams,additionalProperties={}],state=<null>,additionalProperties={}]
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51728
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):231 - Cancelled? false
2021-11-23 16:42:18 DEBUG DefaultAirbyteSource(isFinished):102 - isEmpty -> hasNext
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):232 - isFinished? false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):212 - Records read: 51728, cancelled: false, isFinished: false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):217 - AttemptRead
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):222 - Message: io.airbyte.protocol.models.AirbyteMessage@48a3a110[type=RECORD,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=io.airbyte.protocol.models.AirbyteRecordMessage@4d73ad9c[stream=RENTOBJTRANSACT,data=...REDACTED but valid json...,emittedAt=1637685648466,namespace=raw_ams,additionalProperties={}],state=<null>,additionalProperties={}]
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51729
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):231 - Cancelled? false
2021-11-23 16:42:18 DEBUG DefaultAirbyteSource(isFinished):102 - isEmpty -> hasNext
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):232 - isFinished? false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):212 - Records read: 51729, cancelled: false, isFinished: false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):217 - AttemptRead
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):222 - Message: io.airbyte.protocol.models.AirbyteMessage@77af2bf1[type=RECORD,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=io.airbyte.protocol.models.AirbyteRecordMessage@56b59751[stream=RENTOBJTRANSACT,data=...REDACTED but valid json...,emittedAt=1637685648466,namespace=raw_ams,additionalProperties={}],state=<null>,additionalProperties={}]
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51730
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):231 - Cancelled? false
2021-11-23 16:42:18 DEBUG DefaultAirbyteSource(isFinished):102 - isEmpty -> hasNext
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):232 - isFinished? false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):212 - Records read: 51730, cancelled: false, isFinished: false
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):217 - AttemptRead
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):222 - Message: io.airbyte.protocol.models.AirbyteMessage@9da79f9[type=RECORD,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=io.airbyte.protocol.models.AirbyteRecordMessage@65b7e5b2[stream=RENTOBJTRANSACT,data=...REDACTED but valid json...,emittedAt=1637685648466,namespace=raw_ams,additionalProperties={}],state=<null>,additionalProperties={}]
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):230 - Message Accepted: recordsRead 51731
2021-11-23 16:42:18 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):231 - Cancelled? false
2021-11-23 16:42:26 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:42:36 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:42:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:42:56 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:43:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:43:16 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:43:26 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:43:36 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:43:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:43:56 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:44:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:44:16 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:44:26 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:44:36 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:44:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:44:56 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:45:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:45:16 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:45:26 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:45:36 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:45:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:45:56 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:46:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:46:16 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:46:26 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc

I manually pressed the cancel button here.

2021-11-23 16:47:07 DEBUG JobSubmitter(lambda$submitJob$2):135 - Job id 9 succeeded
2021-11-23 16:47:07 DEBUG JobSubmitter(lambda$submitJob$4):166 - Job id 9 cleared
2021-11-23 16:46:36 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:46:46 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:46:46 INFO TemporalAttemptExecution(lambda$getCancellationChecker$3):203 - Running sync worker cancellation...
2021-11-23 16:46:46 INFO DefaultReplicationWorker(cancel):284 - Cancelling replication worker...
2021-11-23 16:46:56 INFO DefaultReplicationWorker(cancel):292 - Cancelling destination...
2021-11-23 16:46:56 INFO DefaultAirbyteDestination(cancel):120 - Attempting to cancel destination process...
2021-11-23 16:46:56 INFO DefaultAirbyteDestination(cancel):125 - Destination process exists, cancelling...
2021-11-23 16:46:56 INFO KubePodProcess(destroy):490 - Destroying Kube process: airbyte-destination-postgres-worker-9-0-hjvtz
�[35mdestination�[0m - 2021-11-23 16:46:56 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:46:56 �[1;31mERROR�[m i.a.i.b.IntegrationRunner(consumeWriteStream):149 - {} - Received invalid message: {"type":"RECORD","record":{"stream":"RENTOBJTRANSACT","data":{"RENTOBJTRANSACTID":790218,"COMPANYID":20,"RENTOBJID":8498,"RENTCONTRACTID":791353,"TRANSACTTYPE":"RC","CHECKOUTSTATIONID":50816,"CHECKOUTDATE":"2020-05-25T00:00:00Z","CHECKOUTTIME":"1970-01-01T17:11:00Z","CHECKOUTEMPLOYEEID":2206,"CHECKOUTPOSITIONID":776,"PLANCHECKINSTATIONID":50816,"PLANCHECKINDATE":"2020-05-28T00:00:00Z","PLANCHECKINTIME":"1970-01-01T16:00:00Z","PLANCHECKINPOSITIONID":776,"BILLINGCHECKOUTVALUEEXIST":0,"BILLINGCHECKINVALUEEXIST":0,"CHECKOUTTIMESTAMP":"2020-05-26T16:15:41Z","CUSTKEY":591588,"AMOUNTSUMTOTAL":463.65,"AMOUNTSUMDISC":0,"AMOUNTSUMNET":463.65,"AMOUNTSUMTAX":97.3665,"AMOUNTSUMGROSS":561.0165,"COUNTDRIVER":1,"RCPRINTPOS":"01","BILLPRINTPOS":"01","PRICELISTID":50622,"CLASSIFICATIONID":60141,"KM":0,"TRANSACTYEAR":2020,"TRANSACTMONTH":5,"EXTPRICECALC":0,"DISCPERCENT":0,"CONTRACTTYPE":"10","CALCRENTMONTH":0,"CALCRENTWEEK":0,"CALCRENTDAY":2,"CALC
�[35mdestination�[0m - 2021-11-23 16:46:56 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:46:56 �[32mINFO�[m i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):60 - {} - Airbyte message consumer: succeeded.
�[35mdestination�[0m - 2021-11-23 16:46:56 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-23 16:46:56 �[32mINFO�[m i.a.i.d.b.BufferedStreamConsumer(close):199 - {} - executing on success close procedure.
2021-11-23 16:46:56 WARN LineGobbler(voidCall):86 - airbyte-destination gobbler IOException: Socket closed. Typically happens when cancelling a job.
2021-11-23 16:46:56 DEBUG KubePodProcess(close):528 - Closed airbyte-destination-postgres-worker-9-0-hjvtz
2021-11-23 16:46:56 INFO KubePodProcess(destroy):496 - Destroyed Kube process: airbyte-destination-postgres-worker-9-0-hjvtz
2021-11-23 16:47:06 WARN WorkerUtils(closeProcess):145 - Process is still alive after calling destroy. Attempting to destroy forcibly...
2021-11-23 16:47:06 INFO KubePodProcess(destroy):490 - Destroying Kube process: airbyte-destination-postgres-worker-9-0-hjvtz
2021-11-23 16:47:06 DEBUG KubePodProcess(close):528 - Closed airbyte-destination-postgres-worker-9-0-hjvtz
2021-11-23 16:47:06 INFO KubePodProcess(destroy):496 - Destroyed Kube process: airbyte-destination-postgres-worker-9-0-hjvtz
2021-11-23 16:47:06 INFO DefaultAirbyteDestination(cancel):127 - Cancelled destination process!
2021-11-23 16:47:06 INFO DefaultReplicationWorker(cancel):299 - Cancelling source...
2021-11-23 16:47:06 INFO DefaultAirbyteSource(cancel):142 - Attempting to cancel source process...
2021-11-23 16:47:06 INFO DefaultAirbyteSource(cancel):147 - Source process exists, cancelling...
2021-11-23 16:47:06 INFO KubePodProcess(destroy):490 - Destroying Kube process: airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO DefaultReplicationWorker(run):138 - Source thread complete.
2021-11-23 16:47:06 DEBUG KubePodProcess(close):528 - Closed airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO DefaultReplicationWorker(run):139 - Waiting for destination thread to join.
2021-11-23 16:47:06 INFO DefaultReplicationWorker(run):141 - Destination thread complete.
2021-11-23 16:47:06 DEBUG DefaultAirbyteSource(close):128 - Closing source process
2021-11-23 16:47:06 INFO KubePodProcess(destroy):496 - Destroyed Kube process: airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 DEBUG WorkerUtils(gentleClose):50 - Gently closing process airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO KubePodProcess(getReturnCode):560 - Unable to find pod airbyte-source-db2-worker-9-0-hvnav to retrieve exit value. Defaulting to  value 143. This is expected if the job was cancelled.
2021-11-23 16:47:06 INFO KubePodProcess(getReturnCode):560 - Unable to find pod airbyte-source-db2-worker-9-0-hvnav to retrieve exit value. Defaulting to  value 143. This is expected if the job was cancelled.
2021-11-23 16:47:06 DEBUG KubePodProcess(close):528 - Closed airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 DEBUG KubePodProcess(close):528 - Closed airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO KubePodProcess(exitValue):598 - Closed all resources for pod airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO KubePodProcess(exitValue):598 - Closed all resources for pod airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO KubePodProcess(getReturnCode):560 - Unable to find pod airbyte-source-db2-worker-9-0-hvnav to retrieve exit value. Defaulting to  value 143. This is expected if the job was cancelled.
2021-11-23 16:47:06 DEBUG KubePodProcess(close):528 - Closed airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO KubePodProcess(exitValue):598 - Closed all resources for pod airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO DefaultAirbyteSource(cancel):149 - Cancelled source process!
2021-11-23 16:47:06 INFO KubePodProcess(getReturnCode):560 - Unable to find pod airbyte-source-db2-worker-9-0-hvnav to retrieve exit value. Defaulting to  value 143. This is expected if the job was cancelled.
2021-11-23 16:47:06 DEBUG KubePodProcess(close):528 - Closed airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO KubePodProcess(exitValue):598 - Closed all resources for pod airbyte-source-db2-worker-9-0-hvnav
2021-11-23 16:47:06 INFO TemporalAttemptExecution(lambda$getCancellationChecker$3):207 - Interrupting worker thread...
2021-11-23 16:47:06 INFO TemporalAttemptExecution(lambda$getCancellationChecker$3):210 - Cancelling completable future...
2021-11-23 16:47:06 WARN CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):49 - Job either timeout-ed or was cancelled.
2021-11-23 16:47:06 INFO TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling...
2021-11-23 16:47:06 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-23 16:47:06 WARN POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=5c8e3ad2-5d74-3776-9b24-9f6fe3132ee3, activityType=Replicate, attempt=1
java.util.concurrent.CancellationException: null
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2468) ~[?:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$3(TemporalAttemptExecution.java:213) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.temporal.CancellationHandler$TemporalCancellationHandler.checkAndHandleCancellation(CancellationHandler.java:48) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getCancellationChecker$4(TemporalAttemptExecution.java:216) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
2021-11-23 16:47:06 WARN CancellationHandler$TemporalCancellationHandler(checkAndHandleCancellation):49 - Job either timeout-ed or was cancelled.
2021-11-23 16:47:06 ERROR DefaultReplicationWorker(run):145 - Sync worker failed.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [airbyte-source-db2-worker-9-0-hvnav]  in namespace: [airbyte]  failed.
	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:224) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:185) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:86) ~[kubernetes-client-5.3.1.jar:?]
	at io.airbyte.workers.process.KubePodProcess.getReturnCode(KubePodProcess.java:557) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.process.KubePodProcess.exitValue(KubePodProcess.java:591) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.lang.Process.hasExited(Process.java:333) ~[?:?]
	at java.lang.Process.isAlive(Process.java:323) ~[?:?]
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:134) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:143) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:49) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
	Suppressed: java.net.SocketException: Socket closed
		at sun.nio.ch.NioSocketImpl.ensureOpenAndConnected(NioSocketImpl.java:165) ~[?:?]
		at sun.nio.ch.NioSocketImpl.beginWrite(NioSocketImpl.java:366) ~[?:?]
		at sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:411) ~[?:?]
		at sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440) ~[?:?]
		at sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826) ~[?:?]
		at java.net.Socket$SocketOutputStream.write(Socket.java:1052) ~[?:?]
		at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:242) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:312) ~[?:?]
		at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:290) ~[?:?]
		at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:131) ~[?:?]
		at java.io.OutputStreamWriter.write(OutputStreamWriter.java:208) ~[?:?]
		at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
		at java.io.BufferedWriter.flush(BufferedWriter.java:256) ~[?:?]
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.notifyEndOfStream(DefaultAirbyteDestination.java:93) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
		at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:106) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:118) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
		at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:49) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.io.InterruptedIOException: interrupted
	at okio.Timeout.throwIfReached(Timeout.java:146) ~[okio-1.17.2.jar:?]
	at okio.Okio$1.write(Okio.java:76) ~[okio-1.17.2.jar:?]
	at okio.AsyncTimeout$1.write(AsyncTimeout.java:180) ~[okio-1.17.2.jar:?]
	at okio.RealBufferedSink.flush(RealBufferedSink.java:224) ~[okio-1.17.2.jar:?]
	at okhttp3.internal.http2.Http2Writer.flush(Http2Writer.java:121) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:275) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:238) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http2.Http2ExchangeCodec.writeRequestHeaders(Http2ExchangeCodec.java:116) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.connection.Exchange.writeRequestHeaders(Exchange.java:72) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:43) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:43) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:133) ~[kubernetes-client-5.3.1.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at io.fabric8.kubernetes.client.utils.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:42) ~[kubernetes-client-5.3.1.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68) ~[kubernetes-client-5.3.1.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createApplicableInterceptors$6(HttpClientUtils.java:284) ~[kubernetes-client-5.3.1.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[okhttp-3.14.9.jar:?]
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[okhttp-3.14.9.jar:?]
	at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229) ~[okhttp-3.14.9.jar:?]
	at okhttp3.RealCall.execute(RealCall.java:81) ~[okhttp-3.14.9.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:485) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:448) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:415) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:397) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:924) ~[kubernetes-client-5.3.1.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:219) ~[kubernetes-client-5.3.1.jar:?]
	... 11 more
2021-11-23 16:47:06 INFO DefaultReplicationWorker(run):169 - sync summary: io.airbyte.config.ReplicationAttemptSummary@125477b7[status=cancelled,recordsSynced=51731,bytesSynced=132213741,startTime=1637685636770,endTime=1637686026968]
2021-11-23 16:47:06 INFO DefaultReplicationWorker(run):178 - Source did not output any state messages
2021-11-23 16:47:06 WARN DefaultReplicationWorker(run):189 - State capture: No state retained.

Steps to Reproduce

  1. Click "Sync Now"
  2. Wait

Are you willing to submit a PR?

I would, but I have reached the limit of my debugging ability.

@benmoriceau
Copy link
Contributor Author

benmoriceau commented Nov 23, 2021

Original comment from @lmossman:

Hello, I took a look at your issue but I couldn't find any obvious answer as to why your job is hanging. It seems you have narrowed the problem down to the process hanging on

final var isEmpty = !messageIterator.hasNext();

in the DefaultAirbyteSource, which in turn calls BufferedReader#readLine. From what I could find, others have hit the same problem with BufferedReader#readLine hanging. It could indicate an issue in the source where it is not emitting an end-of-line for some record, causing the call to readLine to block indefinitely.

@Dracyr Does it always hang after record 51731? Or does the last record logged vary from one run to the next?

Also I am going to reassign this ticket to @benmoriceau as he is the current Platform OC engineer.
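
For context on the readLine behavior described above, here is a minimal, self-contained illustration (a stand-in using StringReader, not Airbyte's actual socket-backed stream): BufferedReader#readLine only returns once it sees a line terminator or end-of-stream, so a socket that stops producing data without ever closing leaves the caller blocked.

```java
import java.io.BufferedReader;
import java.io.StringReader;

public class ReadLineDemo {
    public static void main(String[] args) throws Exception {
        // StringReader has a definite end-of-stream, so every readLine() returns.
        // A socket InputStream that goes quiet without delivering EOF would
        // instead block forever on the equivalent of the third call below.
        BufferedReader reader = new BufferedReader(new StringReader("line1\nline2"));
        System.out.println(reader.readLine()); // "line1" (terminated by \n)
        System.out.println(reader.readLine()); // "line2" (terminated by end-of-stream)
        System.out.println(reader.readLine()); // null: stream exhausted
    }
}
```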

@benmoriceau
Copy link
Contributor Author

Hello @Dracyr,

We accidentally transferred your original ticket to another project, which shouldn't have happened. Let's restore it here. I am trying to figure out who would be the best person to address the issue you reported; @lmossman had some questions that will help us figure that out.

Thanks,

@lmossman
Copy link
Contributor

lmossman commented Nov 23, 2021

Here was the response that @Dracyr provided to my question over slack, since they could not post on the ticket after it was transferred:

Thanks for taking the time!

Yeah, I find it a bit weird. When I've run the source container on its own and executed the entrypoint/read script, I got all the output I was expecting (i.e. all valid JSON lines, ending with some logging output as the source closes). But I'll try to run it again, save the output, and take a deeper look for any missing EOL/EOF markers too.

It's not always the same record, but it is semi-consistent: three consecutive runs stopped at 51180, 51469, and 51990.

@benmoriceau
Copy link
Contributor Author


Do you think it could be a size limit on DB2?

@Dracyr
Copy link
Contributor

Dracyr commented Nov 24, 2021

Size limit on what exactly?

If I generate all the records in the container

# Source container running in cluster, running 'sleep infinity' with same source_config.json/source_catalog.json copied in
pv@localhost$: kubectl -n airbyte exec -it airbyte-source-db2-worker-pv /bin/bash
airbyte@airbyte-source-db2-worker-pv:/config$ eval "$AIRBYTE_ENTRYPOINT read --config source_config.json --catalog source_catalog.json" > records.txt

After copying the resulting file back to my local machine for inspection, the file looks fine. It contains 53248 lines, and piping all the JSON lines to jq shows no errors.

# remove log output, pipe to jq to verify proper json
cat records.txt | sed '/^2021-11-24/d' | jq -e . >/dev/null 2>&1; echo ${PIPESTATUS[1]}

@Dracyr
Copy link
Contributor

Dracyr commented Nov 24, 2021

I added a socket read timeout, which throws a SocketTimeoutException if the read blocks for more than 5s.

diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java
index 8cb4f4a3f..315eccb11 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java
@@ -37,6 +37,7 @@ import java.io.OutputStream;
 import java.lang.ProcessHandle.Info;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.AbstractMap;
@@ -448,6 +449,8 @@ public class KubePodProcess extends Process {
         LOGGER.info("Creating stdout socket server...");
         final var socket = stdoutServerSocket.accept(); // blocks until connected
         LOGGER.info("Setting stdout...");
+        socket.setSoTimeout(5000);
+
         this.stdout = socket.getInputStream();
       } catch (final IOException e) {
         e.printStackTrace(); // todo: propagate exception / join at the end of constructor
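
To sketch what this change does in isolation (a stand-alone demo, not Airbyte's KubePodProcess): with SO_TIMEOUT set, a read on a socket whose peer has gone silent fails after the timeout with a SocketTimeoutException instead of blocking indefinitely. The port and the 500 ms timeout here are arbitrary demo values.

```java
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SoTimeoutDemo {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            // The "writer" connects but never sends anything, mimicking a
            // source pod that has stopped producing output without closing.
            Socket writer = new Socket("localhost", server.getLocalPort());
            Socket reader = server.accept();
            reader.setSoTimeout(500); // fail the read after 500 ms instead of blocking forever
            try (InputStream in = reader.getInputStream()) {
                in.read(); // would block indefinitely without SO_TIMEOUT
                System.out.println("read returned");
            } catch (SocketTimeoutException e) {
                System.out.println("timed out as expected");
            }
            writer.close();
        }
    }
}
```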
Full logs from the run (I removed one of the streams, so there are ~10k fewer records than before):
2021-11-24 12:47:31 INFO WorkerRun(call):49 - Executing worker wrapper. Airbyte version: 0.32.5-alpha
2021-11-24 12:47:35 INFO TemporalAttemptExecution(get):116 - Executing worker wrapper. Airbyte version: 0.32.5-alpha
2021-11-24 12:47:35 WARN Databases(createPostgresDatabaseWithRetry):41 - Waiting for database to become available...
2021-11-24 12:47:35 INFO JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2021-11-24 12:47:36 INFO Databases(createPostgresDatabaseWithRetry):58 - Database available!
2021-11-24 12:47:38 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:47:39 WARN JsonMetaSchema(newValidator):338 - Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-11-24 12:47:39 WARN JsonMetaSchema(newValidator):338 - Unknown keyword existingJavaType - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-11-24 12:47:39 INFO DefaultReplicationWorker(run):100 - start sync worker. job id: 18 attempt id: 0
2021-11-24 12:47:39 INFO DefaultReplicationWorker(run):109 - configured sync modes: {CRM.RENTOBJTRANSACT=full_refresh - append}
2021-11-24 12:47:39 INFO DefaultAirbyteDestination(start):64 - Running destination...
2021-11-24 12:47:39 INFO KubeProcessFactory(create):106 - airbyte-destination-postgres-worker-18-0-beige stdoutLocalPort = 9024
2021-11-24 12:47:39 INFO KubeProcessFactory(create):109 - airbyte-destination-postgres-worker-18-0-beige stderrLocalPort = 9025
2021-11-24 12:47:39 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):449 - Creating stdout socket server...
2021-11-24 12:47:39 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):470 - Creating stderr socket server...
2021-11-24 12:47:40 INFO KubePodProcess(<init>):408 - Creating pod...
2021-11-24 12:47:44 INFO KubePodProcess(waitForInitPodToRun):234 - Waiting for init container to be ready before copying files...
2021-11-24 12:47:44 INFO KubePodProcess(waitForInitPodToRun):237 - Init container present..
2021-11-24 12:47:44 INFO KubePodProcess(waitForInitPodToRun):240 - Init container ready..
2021-11-24 12:47:44 INFO KubePodProcess(<init>):413 - Copying files...
2021-11-24 12:47:44 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: destination_config.json
2021-11-24 12:47:45 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:45 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: destination_catalog.json
2021-11-24 12:47:46 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:46 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: FINISHED_UPLOADING
2021-11-24 12:47:46 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:46 INFO KubePodProcess(<init>):416 - Waiting until pod is ready...
2021-11-24 12:47:47 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):451 - Setting stdout...
2021-11-24 12:47:47 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):472 - Setting stderr...
2021-11-24 12:47:47 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-destination-postgres-worker-18-0-beige&resourceVersion=228009353&watch=true...
2021-11-24 12:47:47 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@65e7bfb
2021-11-24 12:47:47 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@4455902a
2021-11-24 12:47:47 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-24 12:47:47 INFO KubePodProcess(<init>):430 - Reading pod IP...
2021-11-24 12:47:47 INFO KubePodProcess(<init>):432 - Pod IP: 192.168.130.57
2021-11-24 12:47:47 INFO KubePodProcess(<init>):435 - Creating stdin socket...
2021-11-24 12:47:47 INFO KubeProcessFactory(create):106 - airbyte-source-db2-worker-18-0-tvuuf stdoutLocalPort = 9026
2021-11-24 12:47:48 INFO KubeProcessFactory(create):109 - airbyte-source-db2-worker-18-0-tvuuf stderrLocalPort = 9027
2021-11-24 12:47:48 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):449 - Creating stdout socket server...
2021-11-24 12:47:48 INFO KubePodProcess(<init>):408 - Creating pod...
2021-11-24 12:47:48 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):470 - Creating stderr socket server...
2021-11-24 12:47:48 INFO KubePodProcess(waitForInitPodToRun):234 - Waiting for init container to be ready before copying files...
2021-11-24 12:47:48 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-source-db2-worker-18-0-tvuuf&resourceVersion=228009373&watch=true...
2021-11-24 12:47:48 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:47:48 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@211449d8
2021-11-24 12:47:48 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@5c2216ba
2021-11-24 12:47:48 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-24 12:47:48 INFO KubePodProcess(waitForInitPodToRun):237 - Init container present..
2021-11-24 12:47:48 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-source-db2-worker-18-0-tvuuf&resourceVersion=228009381&watch=true...
2021-11-24 12:47:49 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2777576f
2021-11-24 12:47:49 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@99d1b69
2021-11-24 12:47:49 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-24 12:47:49 INFO KubePodProcess(waitForInitPodToRun):240 - Init container ready..
2021-11-24 12:47:49 INFO KubePodProcess(<init>):413 - Copying files...
2021-11-24 12:47:49 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: input_state.json
2021-11-24 12:47:49 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:49 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: source_config.json
2021-11-24 12:47:49 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:49 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: source_catalog.json
2021-11-24 12:47:50 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:50 INFO KubePodProcess(copyFilesToKubeConfigVolume):212 - Uploading file: FINISHED_UPLOADING
2021-11-24 12:47:50 DEBUG WebSocketStreamHandler(close):119 - Successfully closed socket.
2021-11-24 12:47:50 INFO KubePodProcess(<init>):416 - Waiting until pod is ready...
2021-11-24 12:47:50 DEBUG AbstractWatchManager(runWatch):158 - Watching https://10.254.0.1/api/v1/namespaces/airbyte/pods?fieldSelector=metadata.name%3Dairbyte-source-db2-worker-18-0-tvuuf&resourceVersion=228009390&watch=true...
2021-11-24 12:47:50 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$5):451 - Setting stdout...
2021-11-24 12:47:50 INFO KubePodProcess(lambda$setupStdOutAndStdErrListeners$6):472 - Setting stderr...
2021-11-24 12:47:51 DEBUG AbstractWatchManager(close):182 - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@7ae9144e
2021-11-24 12:47:51 DEBUG AbstractWatchManager(closeWebSocket):169 - Closing websocket okhttp3.internal.ws.RealWebSocket@79677315
2021-11-24 12:47:51 DEBUG AbstractWatchManager(closeExecutorService):98 - Closing ExecutorService
2021-11-24 12:47:51 INFO KubePodProcess(<init>):430 - Reading pod IP...
2021-11-24 12:47:51 INFO KubePodProcess(<init>):432 - Pod IP: 192.168.129.86
2021-11-24 12:47:51 INFO KubePodProcess(<init>):439 - Using null stdin output stream...
2021-11-24 12:47:51 INFO DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):268 - Destination output thread started.
2021-11-24 12:47:51 INFO DefaultReplicationWorker(run):137 - Waiting for source thread to join.
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:47 INFO i.a.i.d.p.PostgresDestination(main):69 - {} - starting destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
2021-11-24 12:47:51 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):208 - Replication thread started.
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.b.IntegrationRunner(run):76 - {} - Running integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
source - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:51 INFO i.a.i.s.d.Db2Source(main):48 - {} - starting source: class io.airbyte.integrations.source.db2.Db2Source
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.b.IntegrationRunner(run):80 - {} - Command: WRITE
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.b.IntegrationRunner(run):81 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.b.s.SshTunnel(getInstance):170 - {} - Starting connection with method: NO_TUNNEL
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=RENTOBJTRANSACT, namespace=raw_ams, outputSchemaName=raw_ams, tmpTableName=_airbyte_tmp_dqp_RENTOBJTRANSACT, outputTableName=_airbyte_raw_RENTOBJTRANSACT, syncMode=append}
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):124 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):121 - {} - Preparing tmp tables in destination started for 1 streams
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream RENTOBJTRANSACT. schema: raw_ams, tmp table name: _airbyte_tmp_dqp_RENTOBJTRANSACT
destination - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:48 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):131 - {} - Preparing tables in destination completed.
source - 2021-11-24 12:47:51 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:51 INFO i.a.i.b.IntegrationRunner(run):96 - {} - Running integration: io.airbyte.integrations.source.db2.Db2Source
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 INFO i.a.i.b.IntegrationCliParser(parseOptions):135 - {} - integration args: {read=null, catalog=source_catalog.json, state=input_state.json, config=source_config.json}
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 INFO i.a.i.b.IntegrationRunner(run):100 - {} - Command: READ
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 INFO i.a.i.b.IntegrationRunner(run):101 - {} - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='input_state.json'}
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 INFO i.a.i.s.r.CdcStateManager(<init>):46 - {} - Initialized CDC state with: null
source - 2021-11-24 12:47:52 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:52 INFO i.a.i.s.r.StateManager(createCursorInfoForStream):137 - {} - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='RENTOBJTRANSACT', namespace='CRM'}, New Cursor Field: null. Resetting cursor value
source - 2021-11-24 12:47:55 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:47:55 INFO i.a.i.s.r.AbstractRelationalDbSource(queryTableFullRefresh):460 - {} - Queueing query for table: RENTOBJTRANSACT
2021-11-24 12:47:58 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:03 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 1000
2021-11-24 12:48:06 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 2000
2021-11-24 12:48:08 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:09 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 3000
2021-11-24 12:48:11 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 4000
2021-11-24 12:48:13 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 5000
2021-11-24 12:48:15 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 6000
2021-11-24 12:48:17 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 7000
2021-11-24 12:48:18 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:19 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 8000
2021-11-24 12:48:21 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 9000
source - 2021-11-24 12:48:23 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:48:08 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 10000
2021-11-24 12:48:23 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 10000
2021-11-24 12:48:25 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 11000
2021-11-24 12:48:26 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 12000
2021-11-24 12:48:27 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 13000
2021-11-24 12:48:28 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:29 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 14000
2021-11-24 12:48:30 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 15000
2021-11-24 12:48:32 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 16000
2021-11-24 12:48:33 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 17000
2021-11-24 12:48:34 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 18000
2021-11-24 12:48:35 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 19000
source - 2021-11-24 12:48:36 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:48:27 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 20000
2021-11-24 12:48:36 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 20000
2021-11-24 12:48:38 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 21000
2021-11-24 12:48:38 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:40 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 22000
2021-11-24 12:48:42 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 23000
2021-11-24 12:48:43 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 24000
2021-11-24 12:48:45 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 25000
2021-11-24 12:48:46 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 26000
2021-11-24 12:48:47 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 27000
2021-11-24 12:48:48 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:48 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 28000
2021-11-24 12:48:49 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 29000
source - 2021-11-24 12:48:50 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:48:42 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 30000
2021-11-24 12:48:50 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 30000
2021-11-24 12:48:52 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 31000
2021-11-24 12:48:53 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 32000
2021-11-24 12:48:54 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 33000
2021-11-24 12:48:55 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 34000
2021-11-24 12:48:56 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 35000
2021-11-24 12:48:57 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 36000
2021-11-24 12:48:58 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:48:58 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 37000
2021-11-24 12:48:59 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 38000
2021-11-24 12:49:00 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 39000
source - 2021-11-24 12:49:01 INFO DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-24 12:48:54 INFO i.a.i.s.r.AbstractRelationalDbSource(lambda$createReadIterator$6):315 - {} - Reading stream RENTOBJTRANSACT. Records read: 40000
2021-11-24 12:49:01 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 40000
2021-11-24 12:49:02 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 41000
2021-11-24 12:49:03 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 42000
2021-11-24 12:49:04 INFO DefaultReplicationWorker(lambda$getReplicationRunnable$2):238 - Records read: 43000
2021-11-24 12:49:08 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:49:10 DEBUG DefaultAirbyteSource(close):131 - Closing source process
2021-11-24 12:49:10 DEBUG WorkerUtils(gentleClose):50 - Gently closing process airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 INFO KubePodProcess(getReturnCode):614 - Exit code for pod airbyte-source-db2-worker-18-0-tvuuf is 0
2021-11-24 12:49:10 DEBUG KubePodProcess(close):559 - Closed airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 DEBUG KubePodProcess(close):559 - Closed airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 DEBUG KubePodProcess(close):559 - Closed airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 DEBUG KubePodProcess(close):559 - Closed airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-source-db2-worker-18-0-tvuuf
2021-11-24 12:49:10 DEBUG DefaultAirbyteDestination(close):109 - Closing destination process
2021-11-24 12:49:10 DEBUG WorkerUtils(gentleClose):50 - Gently closing process airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:18 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:49:28 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:49:41 DEBUG JobSubmitter(lambda$submitJob$2):135 - Job id 18 succeeded
2021-11-24 12:49:41 DEBUG JobSubmitter(lambda$submitJob$4):166 - Job id 18 cleared
2021-11-24 12:49:38 DEBUG LogClientSingleton(setJobMdc):136 - Setting kube job mdc
2021-11-24 12:49:40 INFO KubePodProcess(getReturnCode):614 - Exit code for pod airbyte-destination-postgres-worker-18-0-beige is 0
2021-11-24 12:49:40 DEBUG KubePodProcess(close):559 - Closed airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 DEBUG KubePodProcess(close):559 - Closed airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 DEBUG KubePodProcess(close):559 - Closed airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 DEBUG KubePodProcess(close):559 - Closed airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 INFO KubePodProcess(exitValue):629 - Closed all resources for pod airbyte-destination-postgres-worker-18-0-beige
2021-11-24 12:49:40 ERROR DefaultReplicationWorker(run):146 - Sync worker failed.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.UncheckedIOException: java.net.SocketTimeoutException: Read timed out
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
	at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:138) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:50) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.RuntimeException: java.io.UncheckedIOException: java.net.SocketTimeoutException: Read timed out
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:256) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
	... 1 more
Caused by: java.io.UncheckedIOException: java.net.SocketTimeoutException: Read timed out
	at java.io.BufferedReader$1.hasNext(BufferedReader.java:577) ~[?:?]
	at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:?]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294) ~[?:?]
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206) ~[?:?]
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169) ~[?:?]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300) ~[?:?]
	at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:?]
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.isFinished(DefaultAirbyteSource.java:89) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:211) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
	... 1 more
Caused by: java.net.SocketTimeoutException: Read timed out
	at sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:283) ~[?:?]
	at sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309) ~[?:?]
	at sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350) ~[?:?]
	at sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803) ~[?:?]
	at java.net.Socket$SocketInputStream.read(Socket.java:982) ~[?:?]
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:297) ~[?:?]
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) ~[?:?]
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188) ~[?:?]
	at java.io.InputStreamReader.read(InputStreamReader.java:181) ~[?:?]
	at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:?]
	at java.io.BufferedReader.readLine(BufferedReader.java:326) ~[?:?]
	at java.io.BufferedReader.readLine(BufferedReader.java:392) ~[?:?]
	at java.io.BufferedReader$1.hasNext(BufferedReader.java:574) ~[?:?]
	at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:?]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294) ~[?:?]
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206) ~[?:?]
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169) ~[?:?]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300) ~[?:?]
	at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:?]
	at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.isFinished(DefaultAirbyteSource.java:89) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at io.airbyte.workers.DefaultReplicationWorker.lambda$getReplicationRunnable$2(DefaultReplicationWorker.java:211) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[?:?]
	... 1 more
2021-11-24 12:49:40 INFO DefaultReplicationWorker(run):170 - sync summary: io.airbyte.config.ReplicationAttemptSummary@4f807927[status=failed,recordsSynced=43745,bytesSynced=116945875,startTime=1637758059172,endTime=1637758180299]
2021-11-24 12:49:40 INFO DefaultReplicationWorker(run):179 - Source did not output any state messages
2021-11-24 12:49:40 WARN DefaultReplicationWorker(run):187 - State capture: No new state, falling back on input state: io.airbyte.config.State@4917da36[state={}]
2021-11-24 12:49:40 INFO TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling...
2021-11-24 12:49:40 INFO ReplicationActivityImpl(replicate):121 - sync summary: io.airbyte.config.StandardSyncOutput@3bec5959[standardSyncSummary=io.airbyte.config.StandardSyncSummary@7f74a45f[status=failed,recordsSynced=43745,bytesSynced=116945875,startTime=1637758059172,endTime=1637758180299],state=io.airbyte.config.State@4917da36[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@1709445f[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@8075d05[stream=io.airbyte.protocol.models.AirbyteStream@51f8fbcd[name=RENTOBJTRANSACT,jsonSchema={"type":"object","properties":{"KM":{"type":"number"},"NOTE":{"type":"string"},"SUBNO":{"type":"number"},"PLANKM":{"type":"number"},"CUSTKEY":{"type":"number"},"EVENTID":{"type":"string"},"RCPOSNO":{"type":"number"},"SYS_END":{"type":"string"},"TRANSNO":{"type":"number"},"EXTCODE1":{"type":"string"},"EXTCODE2":{"type":"string"},"EXTCODE3":{"type":"string"},"EXTCODE4":{"type":"string"},"EXTCODE5":{"type":"string"},"ISACTIVE":{"type":"number"},"PRCALCKM":{"type":"number"},"ROBSTATE":{"type":"string"},"TRANSNOC":{"type":"string"},"TRANS_ID":{"type":"string"},"CID_CREDA":{"type":"string"},"CID_CRENO":{"type":"number"},"CID_MODDA":{"type":"string"},"CID_MODNO":{"type":"number"},"COLLECTKM":{"type":"number"},"COMPANYID":{"type":"number"},"ERRORTYPE":{"type":"string"},"EXTPLNAME":{"type":"string"},"HASDAMAGE":{"type":"number"},"ISCOLLECT":{"type":"number"},"RENTOBJID":{"type":"number"},"SENDINDEX":{"type":"number"},"SYS_START":{"type":"string"},"CUSTKEYREP":{"type":"number"},"DELIVERYKM":{"type":"number"},"ISDELIVERY":{"type":"number"},"LICENCEEID":{"type":"number"},"QUEUESTATE":{"type":"string"},"RCPRINTPOS":{"type":"string"},"BILLINGFLAG":{"type":"string"},"CALCRENTDAY":{"type":"number"},"CALCRENTMIN":{"type":"number"},"CHECKINDATE":{"type":"string"},"CHECKINNOTE":{"type":"string"},"CHECKINTIME":{"type":"string"},"COLLECTADDR":{"type":"number"},"COLLECTDATE":{"type":"string"},"COLLECTTIME":{"type":"string
"},"COUNTDRIVER":{"type":"number"},"CUSTLOCADDR":{"type":"number"},"DISCPERCENT":{"type":"number"},"PRCALCKMSUM":{"type":"number"},"PRICELISTID":{"type":"number"},"SEWRITESUMS":{"type":"number"},"AMOUNTSUMNET":{"type":"number"},"AMOUNTSUMTAX":{"type":"number"},"BILLPRINTPOS":{"type":"string"},"CALCKMPLINCL":{"type":"number"},"CALCPRICEDAY":{"type":"number"},"CALCPRICEMIN":{"type":"number"},"CALCRENTHOUR":{"type":"number"},"CALCRENTWEEK":{"type":"number"},"CHECKOUTDATE":{"type":"string"},"CHECKOUTNOTE":{"type":"string"},"CHECKOUTTIME":{"type":"string"},"CONTRACTTYPE":{"type":"string"},"DELIVERYADDR":{"type":"number"},"DELIVERYDATE":{"type":"string"},"DELIVERYTIME":{"type":"string"},"EXTPRICECALC":{"type":"number"},"ISSTATIONCHG":{"type":"number"},"PRICECALCMSG":{"type":"string"},"TRANSACTTYPE":{"type":"string"},"TRANSACTWEEK":{"type":"number"},"TRANSACTYEAR":{"type":"number"},"AMOUNTPARTSUM":{"type":"number"},"AMOUNTSUMDISC":{"type":"number"},"AUTOPRICELIST":{"type":"number"},"CALCKMCHARGED":{"type":"number"},"CALCPRICEHOUR":{"type":"number"},"CALCPRICEWEEK":{"type":"number"},"CALCRENTMONTH":{"type":"number"},"CONTRACTSTATE":{"type":"string"},"COUNTPRINTMOV":{"type":"number"},"COUNTTIREDATA":{"type":"number"},"ISLICENCEECHG":{"type":"number"},"ORG_CALCCODE1":{"type":"string"},"ORG_CALCCODE2":{"type":"string"},"ORG_CALCCODE3":{"type":"string"},"ORG_CALCCODE4":{"type":"string"},"ORG_CALCCODE5":{"type":"string"},"ORG_CALCCODE6":{"type":"string"},"ORG_CALCCODE7":{"type":"string"},"ORG_CALCCODE8":{"type":"string"},"ORG_CALCCODE9":{"type":"string"},"ORG_REGNUMBER":{"type":"string"},"TRANSACTMONTH":{"type":"number"},"AMOUNTSUMGROSS":{"type":"number"},"AMOUNTSUMTOTAL":{"type":"number"},"BUSINESSAREAID":{"type":"number"},"CALCPRICEMONTH":{"type":"number"},"CHECKINTANKCAP":{"type":"string"},"DOWRITEHISTORY":{"type":"number"},"IGNOREBUFFERCI":{"type":"number"},"IGNOREBUFFERCO":{"type":"number"},"ISALIENRENTOBJ":{"type":"number"},"ORG_CALCCODE10":{"type":"string"},"ORG_CALCCODE1
1":{"type":"string"},"ORG_CALCCODE12":{"type":"string"},"ORG_CLASSIFIC1":{"type":"number"},"ORG_CLASSIFIC2":{"type":"number"},"ORG_CLASSIFIC3":{"type":"number"},"ORG_CLASSIFIC4":{"type":"number"},"ORG_CLASSIFIC5":{"type":"number"},"ORG_CLASSIFIC6":{"type":"number"},"ORG_CLASSIFIC7":{"type":"number"},"ORG_CLASSIFIC8":{"type":"number"},"ORG_CLASSIFIC9":{"type":"number"},"ORG_UNITNUMBER":{"type":"string"},"PLANKMCALCBASE":{"type":"string"},"RENTCONTRACTID":{"type":"number"},"BROKERSTATIONID":{"type":"number"},"CHECKOUTTANKCAP":{"type":"string"},"COLLECTCUSTPERS":{"type":"number"},"CURRENTPOSITION":{"type":"number"},"DELIVERCUSTPERS":{"type":"number"},"ORG_CLASSIFIC10":{"type":"number"},"ORG_CLASSIFIC11":{"type":"number"},"ORG_CLASSIFIC12":{"type":"number"},"PLANCHECKINDATE":{"type":"string"},"PLANCHECKINTIME":{"type":"string"},"PRESERVICEROTID":{"type":"number"},"RELOCATIONROTID":{"type":"number"},"SUCSERVICEROTID":{"type":"number"},"CALCRENTDAYTOTAL":{"type":"number"},"CALCRENTTOTALMIN":{"type":"number"},"CANCELLATIONDATE":{"type":"string"},"CANCELLATIONEMPL":{"type":"number"},"CANCELLATIONTYPE":{"type":"string"},"CHECKINKILOMETER":{"type":"number"},"CHECKINSTATIONID":{"type":"number"},"CHECKINTANKLITRE":{"type":"number"},"CHECKINTIMESTAMP":{"type":"string"},"CLASSIFICATIONID":{"type":"number"},"COLLECTSERVICEID":{"type":"number"},"CUSTKEYSECRETARY":{"type":"number"},"EXTPRICECALCTYPE":{"type":"string"},"MOVEMENTCHAINEND":{"type":"string"},"MULTIPLEOBJPOSNO":{"type":"number"},"PLANCHECKOUTDATE":{"type":"string"},"PLANCHECKOUTTIME":{"type":"string"},"CHECKINEMPLOYEEID":{"type":"number"},"CHECKINPOSITIONID":{"type":"number"},"CHECKOUTKILOMETER":{"type":"number"},"CHECKOUTSTATIONID":{"type":"number"},"CHECKOUTTANKLITRE":{"type":"number"},"CHECKOUTTIMESTAMP":{"type":"string"},"DAMAGEBALANCETYPE":{"type":"string"},"DELIVERYSERVICEID":{"type":"number"},"PLANKMPERCALCBASE":{"type":"number"},"PRCALCCHECKINDATE":{"type":"string"},"PRCALCCHECKINTIME":{"type":"string"},"RENTOBJT
RANSACTID":{"type":"number"},"BILLINGCHECKINDATE":{"type":"string"},"BILLINGCHECKINTIME":{"type":"string"},"CANCELLATIONNOTICE":{"type":"string"},"CHECKOUTEMPLOYEEID":{"type":"number"},"CHECKOUTPOSITIONID":{"type":"number"},"CLASSIFICATIONCODE":{"type":"string"},"ISSELECTEDMULTIPLE":{"type":"number"},"MOVEMENTCHAINBEGIN":{"type":"string"},"PRCALCCHECKOUTDATE":{"type":"string"},"PRCALCCHECKOUTTIME":{"type":"string"},"RENTDURATIONINDAYS":{"type":"number"},"AMOUNTSUMTAXROUNDED":{"type":"number"},"BILLINGCHECKOUTDATE":{"type":"string"},"BILLINGCHECKOUTTIME":{"type":"string"},"CONTRACTCUST_RIDREP":{"type":"number"},"CALCCHECKINTIMESTAMP":{"type":"string"},"CLASSIFICATIONIDCUST":{"type":"number"},"PLANCHECKINSTATIONID":{"type":"number"},"CALCCHECKOUTTIMESTAMP":{"type":"string"},"COUNTCOCIPROTABDYNVAL":{"type":"number"},"PLANCHECKINPOSITIONID":{"type":"number"},"RENEWPLANCIDISALLOWED":{"type":"number"},"CLASSIFICATIONIDORIGIN":{"type":"number"},"CNTCOCIPROTWILLGENSERV":{"type":"number"},"ISCLASSIFICATIONLOCKED":{"type":"number"},"PRCALCCHECKINSTATIONID":{"type":"number"},"PRCALCCHECKINTIMESTAMP":{"type":"string"},"PRCALCCHECKINPOSITIONID":{"type":"number"},"PRCALCCHECKOUTTIMESTAMP":{"type":"string"},"RENTCONTRACTIDDEPENDSTO":{"type":"number"},"RENTOBJTRANSACTPARENTID":{"type":"number"},"BILLINGCHECKINVALUEEXIST":{"type":"number"},"BILLINGCHECKOUTSTATIONID":{"type":"number"},"COUNTRENTOBJTRANSACTPROP":{"type":"number"},"BILLINGCHECKOUTVALUEEXIST":{"type":"number"},"PLANCHECKINDATEATCHECKOUT":{"type":"string"},"PLANCHECKINTIMEATCHECKOUT":{"type":"string"},"CHECKBREAKDOWNDATACOMPLETED":{"type":"number"},"CONTRACTSTATEFORCHARGINGMOVEMENT":{"type":"string"}}},supportedSyncModes=[full_refresh, 
incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[RENTOBJTRANSACTID]],namespace=raw_ams,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=append,primaryKey=[[RENTOBJTRANSACTID]],additionalProperties={}]],additionalProperties={}]]
2021-11-24 12:49:40 INFO ConfigRepository(updateConnectionState):518 - Updating connection 09fbbf30-9629-41cf-98f7-eb8aee3f9fa6 state: io.airbyte.config.State@2aded8b4[state={}]
2021-11-24 12:49:40 INFO DatabaseConfigPersistence(updateConfigRecord):273 - Updating STANDARD_SYNC_STATE record 09fbbf30-9629-41cf-98f7-eb8aee3f9fa6

At least I don't have to cancel it manually now, but it still fails after a different number of records read each time.

@sherifnada
Contributor

sherifnada commented Nov 24, 2021

Helpful context from @Dracyr :

I've run a pipeline against a db2 instance (with the same schema, but not as much data) locally, using docker-compose. This succeeded.
The db2 instance I'm testing against is only accessible from our cloud infrastructure, so that's harder.

I've also created and exec'd into a source-db2 container, and run it with the same catalog/config, and could output all the data to the shell successfully as well.

# Source container running in cluster, running 'sleep infinity' with same source_config.json/source_catalog.json copied in
pv@localhost$: kubectl -n airbyte exec -it airbyte-source-db2-worker-pv /bin/bash
airbyte@airbyte-source-db2-worker-pv:/config$ eval "$AIRBYTE_ENTRYPOINT read --config source_config.json --catalog source_catalog.json" > records.txt

This confirms that the source connector can successfully run using exactly the same settings as the source connector in the failing sync:

  • credentials
  • schema selection
  • infrastructure (it even ran on exactly the same pod)

So the issue is probably in how the worker consumes records from the connector
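
One quick way to quantify the gap (a sketch; `records.txt` is the file captured by the manual run above, and the pattern assumes the Airbyte message format, where each data row is one JSON line containing `"type":"RECORD"`):

```shell
# Count RECORD messages captured by the manual in-pod run; compare this
# number against the "Records read" counter in the hanging sync's worker log.
grep -c '"type": *"RECORD"' records.txt
```

If the two numbers match, the connector emitted everything and the loss is on the worker's read side.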

@Dracyr
Contributor

Dracyr commented Nov 24, 2021

I also tried to rule out the network: I started up a destination pod, piped the data through to the other side's stdout, and printed it in the shell.

# Destination pod, open socat, print to stdout
$ socat -d TCP-L:9001 STDOUT
# Source pod, socat container, ip of 'airbyte-destination-postgres-worker-pv'
$ cat /pipes/stdout | socat -d - TCP:192.168.130.40:9001
# Source pod, pipe to stdout pipe
airbyte@airbyte-source-db2-worker-pv:/config$ eval "$AIRBYTE_ENTRYPOINT read --config source_config.json --catalog source_catalog.json" > /pipes/stdout

All records made it through to the destination pod successfully as well, which further points to the issue being in how the worker consumes the records.
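
A cheap way to make that check byte-for-byte is to tee the stream through a hash on both ends (a sketch with a toy producer standing in for the connector; in the real setup the producer would be the `eval "$AIRBYTE_ENTRYPOINT read ..."` command and the copy would go to `/pipes/stdout`):

```shell
# tee forks the stream: one copy continues down the pipe (a file here,
# the named pipe in the real setup), the other is hashed in place.
printf 'rec1\nrec2\nrec3\n' | tee /tmp/stream.copy | sha256sum
# Hash the copy that "arrived"; identical digests mean no bytes were lost.
sha256sum /tmp/stream.copy
```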

@Dracyr
Contributor

Dracyr commented Nov 26, 2021

Another update: I experimented with other tables in the source database to see if the behavior changed, and for some combinations I can get successful syncs.

  • table_ren
    • completed, recordsSynced=7925, bytesSynced=15095256, duration=20s
    • Tried three times, each worked
  • table_car, table_cla, table_zco
    • completed, recordsSynced=8092, bytesSynced=3938856, duration=24s
  • table_car, table_cla, table_zco, table_ren
    • Fails, tried four times.

So it doesn't look like an issue with the source data itself, since the tables can be synced on their own but not together.

I also tried connecting it to another postgres (so source postgres, destination postgres), and the final log before hanging was Records read: 126000 after 45s. So it might also be something happening when the job is close to finishing, or has finished?

So I'm assuming something in the k8s cluster or networking is responsible somehow. If you have some code you'd like me to run in order to test something, I'd be able to compile and run it as well.

@Dracyr
Contributor

Dracyr commented Dec 3, 2021

I'm a bit at the end of my abilities to look into this further; if you have any ideas of something to try out or run, let me know.

@sherifnada sherifnada added the area/platform issues related to the platform label Dec 3, 2021
@danieldiamond
Contributor

FYI I still experience this issue with EC2 Docker and 0.33.11-alpha
See original issue: #5754

@lmossman
Contributor

@Dracyr sorry for the delay on a response here.

If you are still able to, could you kubectl describe the source and destination pods while they are in this hanging state, and paste the output here?

@Dracyr
Contributor

Dracyr commented Dec 16, 2021

Hey!

Sorry, that's going to be difficult right now. I was setting up this ELT pipeline for my client as a Christmas gift, one of the final things before my project ended, and now I'm moving on to another one.

From memory however:

  • The source container completed, so there would be no pod to kubectl describe
  • The destination pod was running, but IIRC there were no differences between a successful or failing run (except things like the ports which are different for every run)

@edgao
Contributor

edgao commented Oct 27, 2022

@grishick is this intended to be in the destinations in-progress column?

@davinchia
Contributor

Closing since this is old and doesn't seem relevant.
