Destination Weaviate: update vector: connection to: OpenAI API failed with status: 500 error #31991

Closed
1 task
valebedu opened this issue Oct 31, 2023 · 14 comments · Fixed by #32038

Comments

@valebedu

Connector Name

destination-weaviate

Connector Version

0.2.5

What step the error happened?

During the sync

Relevant information

Context

I'm syncing a table of 308k articles from Postgres to Weaviate, and the sync failed because OpenAI returned a 500 error for a single element.

update vector: connection to: OpenAI API failed with status: 500 error

The current batch of 128 elements failed; it should be retried, or perhaps it's better to stop the sync and retry the batch on the next job attempt.

But all the previous lines that were correctly added to Weaviate and vectorized should not be re-added and re-vectorized; that incurs time and cost that can be avoided.
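
For illustration only, here is a minimal sketch of the generic Airbyte destination checkpointing pattern that would avoid this (not the actual destination-weaviate code; flush_to_weaviate is a hypothetical placeholder): state messages are emitted back only after the records before them have been flushed, so a retried attempt can resume from the last checkpoint instead of re-vectorizing everything.

# Hypothetical sketch of the Airbyte destination checkpointing pattern.
from typing import Iterable, List
from airbyte_cdk.models import AirbyteMessage, Type

def flush_to_weaviate(records: List) -> None:
    # Placeholder for the real batched write + vectorization step.
    ...

def write(input_messages: Iterable[AirbyteMessage]) -> Iterable[AirbyteMessage]:
    buffer: List = []
    for message in input_messages:
        if message.type == Type.RECORD:
            buffer.append(message.record)
        elif message.type == Type.STATE:
            flush_to_weaviate(buffer)   # persist everything received before this state
            buffer.clear()
            yield message               # checkpoint: these records won't be re-sent on retry
    if buffer:
        flush_to_weaviate(buffer)       # flush any trailing records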

Obtained Result

Lines that were correctly added are re-vectorized

Expected Result

Lines that were correctly added are marked as added and are not re-vectorized

Step To Reproduce

Unfortunately, to reproduce it you need to sync a large amount of (really small) data to Weaviate in order to generate a lot of vectors, and be unlucky enough to get a 500 error.

Or you can force the error with a proxy, as sketched below.
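
For example, a hedged sketch using mitmproxy (assuming Weaviate's outbound HTTPS traffic can be routed through the proxy, e.g. via HTTPS_PROXY): an addon that answers every call to api.openai.com with a synthetic 500.

# force_openai_500.py - hypothetical mitmproxy addon, only for reproducing the failure
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    # Short-circuit any request to the OpenAI API with a synthetic 500 response
    if flow.request.pretty_host.endswith("api.openai.com"):
        flow.response = http.Response.make(
            500,
            b'{"error": {"message": "forced 500 for testing"}}',
            {"Content-Type": "application/json"},
        )

# run with: mitmdump -s force_openai_500.py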

Config

  • Airbyte machine: t3.large (2 vCPU, 8 GB mem)
  • Airbyte config: default with a memory limit override: JOB_MAIN_CONTAINER_MEMORY_LIMIT=4096Mi
  • Weaviate machine: t4g.large (2 vCPU, 8 GB mem)
  • Weaviate config: default with only the OpenAI vectorizer and GOMEMLIMIT=2048MiB
  • Weaviate connector config:
    • Chunk size: 7372
    • Text fields to embed: publisher, publication_date, language_code, title, body, used to build a custom field with a safe token length
    • Text splitter: By separator, with Keep separator enabled
    • Embedding: No external embedding, letting Weaviate handle the vectorization
    • Batch size: 128 (default)

Relevant log output

2023-10-30 20:57:14 INFO i.a.w.t.TemporalAttemptExecution(get):124 - Docker volume job log path: /tmp/workspace/5/0/logs.log
2023-10-30 20:57:14 INFO i.a.w.t.TemporalAttemptExecution(get):129 - Executing worker wrapper. Airbyte version: 0.50.33
2023-10-30 20:57:14 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):290 - Attempt 0 to save workflow id for cancellation
2023-10-30 20:57:14 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:14 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:14 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:14 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:14 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2023-10-30 20:57:14 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:14 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- START CHECK -----
2023-10-30 20:57:14 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:14 INFO i.a.c.i.LineGobbler(voidCall):149 - Checking if airbyte/source-postgres:3.2.14 exists...
2023-10-30 20:57:14 INFO i.a.c.i.LineGobbler(voidCall):149 - airbyte/source-postgres:3.2.14 was found locally.
2023-10-30 20:57:14 INFO i.a.w.p.DockerProcessFactory(create):140 - Creating docker container = source-postgres-check-5-0-yacmj with resources io.airbyte.config.ResourceRequirements@5d70e9e3[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=4096Mi,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@63e36538[hosts=[10.0.5.200, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2023-10-30 20:57:14 INFO i.a.w.p.DockerProcessFactory(create):187 - Preparing command: docker run --rm --init -i -w /data/5/0 --log-driver none --name source-postgres-check-5-0-yacmj --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:3.2.14 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.50.33 -e WORKER_JOB_ID=5 --memory=4096Mi airbyte/source-postgres:3.2.14 check --config source_config.json
2023-10-30 20:57:14 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):186 - Reading messages from protocol version 0.2.0
2023-10-30 20:57:16 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:16 INFO i.a.i.s.p.PostgresSource(main):714 - starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.c.i.b.IntegrationCliParser(parseOptions):126 - integration args: {check=null, config=source_config.json}
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.c.i.b.IntegrationRunner(runInternal):132 - Running integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.c.i.b.IntegrationRunner(runInternal):133 - Command: CHECK
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.c.i.b.IntegrationRunner(runInternal):134 - Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'}
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.c.i.b.s.SshTunnel(getInstance):204 - Starting connection with method: NO_TUNNEL
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.i.s.p.PostgresUtils(isCdc):59 - using CDC: false
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):770 - REQUIRED toSslJdbcParam require
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO c.z.h.HikariDataSource(<init>):80 - HikariPool-1 - Starting...
2023-10-30 20:57:17 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:17 INFO c.z.h.HikariDataSource(<init>):82 - HikariPool-1 - Start completed.
2023-10-30 20:57:18 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:18 INFO i.a.i.s.p.PostgresUtils(isCdc):59 - using CDC: false
2023-10-30 20:57:18 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:18 INFO i.a.i.s.p.PostgresUtils(isCdc):59 - using CDC: false
2023-10-30 20:57:18 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:18 INFO i.a.c.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):141 - Attempting to get metadata from the database to see if we can connect.
2023-10-30 20:57:19 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:19 INFO c.z.h.HikariDataSource(close):350 - HikariPool-1 - Shutdown initiated...
2023-10-30 20:57:19 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:19 INFO c.z.h.HikariDataSource(close):352 - HikariPool-1 - Shutdown completed.
2023-10-30 20:57:19 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:19 INFO i.a.c.i.b.IntegrationRunner(runInternal):228 - Completed integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2023-10-30 20:57:19 INFO i.a.w.i.VersionedAirbyteStreamFactory(toAirbyteMessage):385 - 2023-10-30 20:57:19 INFO i.a.i.s.p.PostgresSource(main):716 - completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2023-10-30 20:57:19 INFO i.a.w.g.DefaultCheckConnectionWorker(run):117 - Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@3290291a[status=succeeded,message=<null>,additionalProperties={}]
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- END CHECK -----
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:19 INFO i.a.w.t.TemporalAttemptExecution(get):124 - Docker volume job log path: /tmp/workspace/5/0/logs.log
2023-10-30 20:57:19 INFO i.a.w.t.TemporalAttemptExecution(get):129 - Executing worker wrapper. Airbyte version: 0.50.33
2023-10-30 20:57:19 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):290 - Attempt 0 to save workflow id for cancellation
2023-10-30 20:57:19 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:19 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:19 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:19 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:19 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- START CHECK -----
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - Checking if airbyte/destination-weaviate:0.2.5 exists...
2023-10-30 20:57:19 INFO i.a.c.i.LineGobbler(voidCall):149 - airbyte/destination-weaviate:0.2.5 was found locally.
2023-10-30 20:57:19 INFO i.a.w.p.DockerProcessFactory(create):140 - Creating docker container = destination-weaviate-check-5-0-kggyx with resources io.airbyte.config.ResourceRequirements@5e7d5b0d[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=4096Mi,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@56754615[hosts=[http://10.0.6.217:8080, api.openai.com, api.cohere.ai, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2023-10-30 20:57:19 INFO i.a.w.p.DockerProcessFactory(create):187 - Preparing command: docker run --rm --init -i -w /data/5/0 --log-driver none --name destination-weaviate-check-5-0-kggyx --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-weaviate:0.2.5 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.50.33 -e WORKER_JOB_ID=5 --memory=4096Mi airbyte/destination-weaviate:0.2.5 check --config source_config.json
2023-10-30 20:57:19 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):186 - Reading messages from protocol version 0.2.0
2023-10-30 20:57:23 INFO i.a.w.g.DefaultCheckConnectionWorker(run):117 - Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@e8f91fe[status=succeeded,message=<null>,additionalProperties={}]
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- END CHECK -----
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:23 INFO i.a.w.t.TemporalAttemptExecution(get):124 - Docker volume job log path: /tmp/workspace/5/0/logs.log
2023-10-30 20:57:23 INFO i.a.w.t.TemporalAttemptExecution(get):129 - Executing worker wrapper. Airbyte version: 0.50.33
2023-10-30 20:57:23 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):290 - Attempt 0 to save workflow id for cancellation
2023-10-30 20:57:23 INFO i.a.w.g.BufferedReplicationWorker(run):152 - start sync worker. job id: 5 attempt id: 0
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- START REPLICATION -----
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-10-30 20:57:23 INFO i.a.w.i.DefaultAirbyteDestination(start):92 - Running destination...
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2023-10-30 20:57:23 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - Checking if airbyte/destination-weaviate:0.2.5 exists...
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - Checking if airbyte/source-postgres:3.2.14 exists...
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - airbyte/source-postgres:3.2.14 was found locally.
2023-10-30 20:57:23 INFO i.a.w.p.DockerProcessFactory(create):140 - Creating docker container = source-postgres-read-5-0-guyng with resources io.airbyte.config.ResourceRequirements@7785950[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=4096Mi,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@355f66fe[hosts=[10.0.5.200, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2023-10-30 20:57:23 INFO i.a.w.p.DockerProcessFactory(create):187 - Preparing command: docker run --rm --init -i -w /data/5/0 --log-driver none --name source-postgres-read-5-0-guyng -e CONCURRENT_SOURCE_STREAM_READ=false --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:3.2.14 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.50.33 -e WORKER_JOB_ID=5 --memory=4096Mi airbyte/source-postgres:3.2.14 read --config source_config.json --catalog source_catalog.json
2023-10-30 20:57:23 INFO i.a.c.i.LineGobbler(voidCall):149 - airbyte/destination-weaviate:0.2.5 was found locally.
2023-10-30 20:57:23 INFO i.a.w.p.DockerProcessFactory(create):140 - Creating docker container = destination-weaviate-write-5-0-fejty with resources io.airbyte.config.ResourceRequirements@71588002[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=4096Mi,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@35adc925[hosts=[http://10.0.6.217:8080, api.openai.com, api.cohere.ai, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2023-10-30 20:57:23 INFO i.a.w.p.DockerProcessFactory(create):187 - Preparing command: docker run --rm --init -i -w /data/5/0 --log-driver none --name destination-weaviate-write-5-0-fejty --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-weaviate:0.2.5 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.50.33 -e WORKER_JOB_ID=5 --memory=4096Mi airbyte/destination-weaviate:0.2.5 write --config destination_config.json --catalog destination_catalog.json
2023-10-30 20:57:23 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):186 - Reading messages from protocol version 0.2.0
2023-10-30 20:57:23 INFO i.a.w.i.VersionedAirbyteMessageBufferedWriterFactory(createWriter):41 - Writing messages to protocol version 0.2.0
2023-10-30 20:57:23 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):186 - Reading messages from protocol version 0.2.0
2023-10-30 20:57:23 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):371 - readFromSource: start
2023-10-30 20:57:23 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):98 - Starting source heartbeat check. Will check every 1 minutes.
2023-10-30 20:57:23 INFO i.a.w.g.BufferedReplicationWorker(processMessage):407 - processMessage: start
2023-10-30 20:57:23 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):481 - readFromDestination: start
2023-10-30 20:57:23 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):448 - writeToDestination: start
2023-10-30 20:57:30 destination > Begin writing to the destination...
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 INFO i.a.i.s.p.PostgresSource(main):714 - starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 INFO i.a.c.i.b.IntegrationCliParser(parseOptions):126 - integration args: {read=null, catalog=source_catalog.json, config=source_config.json}
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 INFO i.a.c.i.b.IntegrationRunner(runInternal):132 - Running integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 INFO i.a.c.i.b.IntegrationRunner(runInternal):133 - Command: READ
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 INFO i.a.c.i.b.IntegrationRunner(runInternal):134 - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:30 source > 2023-10-30 20:57:30 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:31 source > 2023-10-30 20:57:31 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:31 source > 2023-10-30 20:57:31 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO i.a.c.i.b.s.SshTunnel(getInstance):204 - Starting connection with method: NO_TUNNEL
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO i.a.i.s.p.PostgresUtils(isCdc):59 - using CDC: false
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO i.a.c.i.s.r.s.StateManagerFactory(createStateManager):55 - Stream state manager selected to manage state object with type STREAM.
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream):192 - No cursor field set in catalog but not present in state. Stream: www_query_article, New Cursor Field: updated_at. Resetting cursor value
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):770 - REQUIRED toSslJdbcParam require
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO c.z.h.HikariDataSource(<init>):80 - HikariPool-1 - Starting...
2023-10-30 20:57:32 source > 2023-10-30 20:57:32 INFO c.z.h.HikariDataSource(<init>):82 - HikariPool-1 - Start completed.
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.i.s.p.PostgresUtils(isCdc):59 - using CDC: false
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.c.i.s.j.AbstractJdbcSource(logPreSyncDebugData):462 - Data source product recognized as PostgreSQL:14.8
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.i.s.p.PostgresSource(logPreSyncDebugData):271 - Discovering indexes for schema "www_query", table "article"
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.i.s.p.PostgresSource(logPreSyncDebugData):283 - Xmin Status : {Number of wraparounds: 0, Xmin Transaction Value: 161132847, Xmin Raw Value: 161132847
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.c.i.s.j.AbstractJdbcSource(discoverInternal):170 - Internal schemas to exclude: [catalog_history, information_schema, pg_catalog, pg_internal]
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):31 - Set initial fetch size: 10 rows
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):40 - Set new fetch size: 939584 rows
2023-10-30 20:57:34 source > 2023-10-30 20:57:34 INFO i.a.c.d.j.s.TwoStageSizeEstimator(getTargetBufferByteSize):71 - Max memory limit: 3221225472, JDBC buffer size: 1932735283
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.i.s.p.PostgresUtils(isCdc):59 - using CDC: false
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.i.s.p.PostgresUtils(isXmin):163 - using Xmin: false
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream):192 - No cursor field set in catalog but not present in state. Stream: www_query_article, New Cursor Field: updated_at. Resetting cursor value
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.i.s.p.c.CursorBasedCtidUtils(lambda$reclassifyCategorisedCtidStream$2):113 - Reclassified www_query.article as standard stream
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.i.s.p.PostgresSource(getIncrementalIterators):593 - No Streams will be synced via ctid.
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.i.s.p.PostgresSource(getIncrementalIterators):600 - Streams to be synced via cursor : 1
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.i.s.p.PostgresSource(getIncrementalIterators):601 - Streams: www_query.article
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.c.i.s.j.AbstractJdbcSource(queryTableFullRefresh):114 - Queueing query for table: article
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.c.i.s.r.RelationalDbQueryUtils(lambda$queryTable$0):73 - Queueing query: SELECT "uuid","publisher","article_type","publication_date","language_code","title","body","short_body","industries","formats","categories","created_at","updated_at" FROM "www_query"."article" ORDER BY "updated_at" ASC
2023-10-30 20:57:36 source > 2023-10-30 20:57:36 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):31 - Set initial fetch size: 10 rows
2023-10-30 21:01:04 source > 2023-10-30 21:01:04 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):40 - Set new fetch size: 7262 rows
2023-10-30 21:01:06 source > 2023-10-30 21:01:06 INFO i.a.c.d.j.s.TwoStageSizeEstimator(getTargetBufferByteSize):71 - Max memory limit: 3221225472, JDBC buffer size: 1932735283
2023-10-30 21:01:06 source > 2023-10-30 21:01:06 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:33:07.030398Z (count 1)
2023-10-30 21:01:06 destination > /usr/local/lib/python3.9/site-packages/weaviate/warnings.py:80: DeprecationWarning: Dep002: You are batching manually. This means you are NOT using the client's built-in
2023-10-30 21:01:06 destination >             multi-threading. Setting `batch_size` in `client.batch.configure()`  to an int value will enabled automatic
2023-10-30 21:01:06 destination >             batching. See:
2023-10-30 21:01:06 destination >             https://weaviate.io/developers/weaviate/current/restful-api-references/batch.html#example-request-1
2023-10-30 21:01:06 destination >   warnings.warn(
2023-10-30 21:01:06 source > 2023-10-30 21:01:06 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:33:08.070224Z (count 1)
...
2023-10-30 21:01:08 source > 2023-10-30 21:01:08 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:33:51.451842Z (count 1)
2023-10-30 21:01:08 source > 2023-10-30 21:01:08 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):40 - Set new fetch size: 33381 rows
2023-10-30 21:01:08 source > 2023-10-30 21:01:08 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:33:52.688181Z (count 1)
...
2023-10-30 21:05:36 source > 2023-10-30 21:04:56 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:34:28.780096Z (count 1)
2023-10-30 21:05:36 source > 2023-10-30 21:04:56 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):40 - Set new fetch size: 24114 rows
2023-10-30 21:05:50 source > 2023-10-30 21:05:08 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:34:29.907637Z (count 1)
...
2023-10-30 21:07:16 source > 2023-10-30 21:06:32 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:34:40.942464Z (count 1)
2023-10-30 21:07:32 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 5000 (24 MB)
2023-10-30 21:07:32 source > 2023-10-30 21:06:47 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:34:43.963349Z (count 1)
...
2023-10-30 21:14:21 source > 2023-10-30 21:13:55 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:40:06.884466Z (count 1)
2023-10-30 21:14:21 source > 2023-10-30 21:13:55 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 10000
2023-10-30 21:14:22 source > 2023-10-30 21:13:55 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:40:08.129606Z (count 1)
...
2023-10-30 21:15:56 source > 2023-10-30 21:15:15 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:40:27.508221Z (count 1)
2023-10-30 21:15:56 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 10000 (50 MB)
2023-10-30 21:15:56 source > 2023-10-30 21:15:15 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-29T00:40:29.889293Z (count 1)
...
2023-10-30 21:23:53 source > 2023-10-30 21:22:13 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:06:57.867758Z (count 1)
2023-10-30 21:24:06 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 15000 (71 MB)
2023-10-30 21:24:06 source > 2023-10-30 21:22:13 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:07:07.076499Z (count 1)
...
2023-10-30 21:29:50 source > 2023-10-30 21:27:32 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:27:43.534512Z (count 1)
2023-10-30 21:30:02 source > 2023-10-30 21:27:32 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 20000
2023-10-30 21:30:02 source > 2023-10-30 21:27:32 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:27:52.271841Z (count 1)
...
2023-10-30 21:31:26 source > 2023-10-30 21:28:06 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:29:08.457694Z (count 1)
2023-10-30 21:31:37 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 20000 (80 MB)
2023-10-30 21:31:37 source > 2023-10-30 21:28:06 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:29:19.045108Z (count 1)
...
2023-10-30 21:38:57 source > 2023-10-30 21:36:22 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:52:06.608261Z (count 1)
2023-10-30 21:39:20 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 25000 (90 MB)
2023-10-30 21:39:20 source > 2023-10-30 21:36:22 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:52:14.116483Z (count 1)
2023-10-30 21:39:32 source > 2023-10-30 21:38:33 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:52:21.882260Z (count 1)
2023-10-30 21:39:43 source > 2023-10-30 21:38:33 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:52:29.713353Z (count 1)
2023-10-30 21:39:44 source > 2023-10-30 21:38:33 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T15:52:37.344430Z (count 1)
...
2023-10-30 21:44:30 source > 2023-10-30 21:40:43 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:02:59.302593Z (count 1)
2023-10-30 21:44:42 source > 2023-10-30 21:40:43 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 30000
2023-10-30 21:44:54 source > 2023-10-30 21:40:43 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:03:26.993520Z (count 1)
...
2023-10-30 21:45:05 source > 2023-10-30 21:42:47 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:04:12.494289Z (count 1)
2023-10-30 21:45:37 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 30000 (96 MB)
2023-10-30 21:45:37 source > 2023-10-30 21:42:47 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:04:17.354037Z (count 2)
...
2023-10-30 21:53:27 source > 2023-10-30 21:50:25 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:07:46.354702Z (count 2)
2023-10-30 21:53:58 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 35000 (99 MB)
2023-10-30 21:53:58 source > 2023-10-30 21:50:25 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:07:51.100088Z (count 2)
...
2023-10-30 22:01:03 source > 2023-10-30 21:57:26 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:10:39.270644Z (count 2)
2023-10-30 22:01:03 source > 2023-10-30 21:57:26 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 40000
2023-10-30 22:01:03 source > 2023-10-30 21:57:26 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:10:43.480687Z (count 1)
2023-10-30 22:01:03 source > 2023-10-30 21:57:26 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:10:48.679958Z (count 2)
2023-10-30 22:02:12 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 40000 (102 MB)
2023-10-30 22:02:55 source > 2023-10-30 21:59:20 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:23:01.630742Z (count 1)
...
2023-10-30 22:09:27 source > 2023-10-30 22:04:06 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:46:47.322341Z (count 1)
2023-10-30 22:09:49 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 45000 (105 MB)
2023-10-30 22:09:49 source > 2023-10-30 22:04:06 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:46:56.872338Z (count 1)
...
2023-10-30 22:15:54 source > 2023-10-30 22:11:32 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:57:33.594463Z (count 1)
2023-10-30 22:16:28 source > 2023-10-30 22:14:00 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 50000
2023-10-30 22:16:28 source > 2023-10-30 22:14:00 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:57:50.685326Z (count 1)
...
2023-10-30 22:17:14 source > 2023-10-30 22:14:00 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:59:43.145324Z (count 1)
2023-10-30 22:17:49 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 50000 (108 MB)
2023-10-30 22:17:49 source > 2023-10-30 22:14:00 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T16:59:52.155164Z (count 1)
...
2023-10-30 22:24:26 source > 2023-10-30 22:20:20 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T17:04:48.493598Z (count 3)
2023-10-30 22:24:48 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 55000 (110 MB)
2023-10-30 22:24:48 source > 2023-10-30 22:20:20 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-11-30T17:07:02.281291Z (count 1)
...
2023-10-30 22:30:42 source > 2023-10-30 22:29:56 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-12-01T15:39:34.733005Z (count 4)
2023-10-30 22:31:14 source > 2023-10-30 22:30:42 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 60000
2023-10-30 22:31:14 source > 2023-10-30 22:30:42 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-12-02T21:37:12.023326Z (count 1)
...
2023-10-30 22:33:46 source > 2023-10-30 22:33:08 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-12-17T12:38:11.455679Z (count 1)
2023-10-30 22:33:46 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 60000 (120 MB)
2023-10-30 22:33:46 source > 2023-10-30 22:33:08 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-12-19T19:15:13.258501Z (count 2)
2023-10-30 22:34:00 source > 2023-10-30 22:33:21 INFO i.a.c.i.s.r.StateDecoratingIterator(createStateMessage):207 - State report for stream www_query_article - original: null = null (count 0) -> latest: updated_at = 2022-12-23T11:12:42.222164Z (count 1)
2023-10-30 22:41:34 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 65000 (130 MB)
2023-10-30 22:46:23 source > 2023-10-30 22:44:50 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 70000
2023-10-30 22:49:17 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 70000 (138 MB)
2023-10-30 22:56:31 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 75000 (147 MB)
2023-10-30 23:03:50 source > 2023-10-30 23:03:24 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 80000
2023-10-30 23:05:24 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 80000 (173 MB)
2023-10-30 23:13:10 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 85000 (190 MB)
2023-10-30 23:19:51 source > 2023-10-30 23:19:27 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 90000
2023-10-30 23:21:48 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 90000 (208 MB)
2023-10-30 23:29:49 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 95000 (222 MB)
2023-10-30 23:34:29 source > 2023-10-30 23:30:02 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 100000
2023-10-30 23:35:34 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 100000 (224 MB)
2023-10-30 23:43:06 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 105000 (226 MB)
2023-10-30 23:50:28 source > 2023-10-30 23:47:59 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 110000
2023-10-30 23:52:08 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 110000 (238 MB)
2023-10-30 23:59:32 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 115000 (254 MB)
2023-10-31 00:06:24 source > 2023-10-31 00:03:53 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 120000
2023-10-31 00:07:48 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 120000 (268 MB)
2023-10-31 00:14:39 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 125000 (273 MB)
2023-10-31 00:20:47 source > 2023-10-31 00:16:21 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 130000
2023-10-31 00:22:15 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 130000 (281 MB)
2023-10-31 00:30:34 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 135000 (291 MB)
2023-10-31 00:37:07 source > 2023-10-31 00:35:46 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 140000
2023-10-31 00:39:06 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 140000 (307 MB)
2023-10-31 00:46:43 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 145000 (326 MB)
2023-10-31 00:53:14 source > 2023-10-31 00:52:25 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 150000
2023-10-31 00:54:49 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 150000 (340 MB)
2023-10-31 01:02:54 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 155000 (361 MB)
2023-10-31 01:09:05 source > 2023-10-31 01:08:40 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 160000
2023-10-31 01:10:40 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 160000 (375 MB)
2023-10-31 01:18:43 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 165000 (389 MB)
2023-10-31 01:24:49 source > 2023-10-31 01:24:02 INFO i.a.c.i.s.r.AbstractDbSource(lambda$createReadIterator$5):421 - Reading stream article. Records read: 170000
2023-10-31 01:26:25 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 170000 (407 MB)
2023-10-31 01:35:15 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 175000 (432 MB)
2023-10-31 01:35:27 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 INFO i.a.w.i.s.SyncPersistenceImpl(startBackgroundFlushStateTask):160 - starting state flush thread for connectionId 0d54c3fe-c338-4f05-a6d7-38473af6f108
2023-10-31 01:35:28 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 INFO i.a.w.i.b.StreamStatsTracker(trackStateFromDestination):213 - Unexpected state from destination for stream www_query:article
2023-10-31 01:35:28 destination > {'error': [{'message': 'update vector: connection to: OpenAI API failed with status: 500 error: The server had an error processing your request. Sorry about that! You can retry your request, or contact us through our help center at help.openai.com if you keep seeing this error. (Please include the request ID 8ad58bfbef27bc2fdb8818bb4e561834 in your email.)'}]}
2023-10-31 01:35:28 destination > {'error': [{'message': 'update vector: connection to: OpenAI API failed with status: 500 error: The server had an error processing your request. Sorry about that! You can retry your request, or contact us through our help center at help.openai.com if you keep seeing this error. (Please include the request ID f59b0d5e8f4e1db49973042e35c89081 in your email.)'}]}
2023-10-31 01:35:28 destination > Errors while loading: error, error
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 11, in <module>
    DestinationWeaviate().run(sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 119, in run
    for message in output_messages:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 113, in run_cmd
    yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 49, in _run_write
    yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
  File "/airbyte/integration_code/destination_weaviate/destination.py", line 38, in write
    yield from writer.write(configured_catalog, input_messages)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/vector_db_based/writer.py", line 68, in write
    self._process_batch()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/vector_db_based/writer.py", line 48, in _process_batch
    self.indexer.index(documents, namespace, stream)
  File "/airbyte/integration_code/destination_weaviate/indexer.py", line 124, in index
    self._flush()
  File "/airbyte/integration_code/destination_weaviate/indexer.py", line 163, in _flush
    raise WeaviatePartialBatchError(error_msg)
destination_weaviate.indexer.WeaviatePartialBatchError: Errors while loading: error, error
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):504 - readFromDestination: exception caught
java.lang.IllegalStateException: Destination process is still alive, cannot retrieve exit value.
	at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteDestination.getExitValue(DefaultAirbyteDestination.java:184) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:497) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$3(BufferedReplicationWorker.java:253) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):507 - readFromDestination: done. (writeToDestFailed:true, dest.isFinished:false)
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):468 - writeToDestination: exception caught
java.io.IOException: Broken pipe
	at java.io.FileOutputStream.writeBytes(Native Method) ~[?:?]
	at java.io.FileOutputStream.write(FileOutputStream.java:372) ~[?:?]
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:125) ~[?:?]
	at java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:221) ~[?:?]
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:205) ~[?:?]
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:313) ~[?:?]
	at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:409) ~[?:?]
	at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:414) ~[?:?]
	at sun.nio.cs.StreamEncoder.lockedFlush(StreamEncoder.java:218) ~[?:?]
	at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:205) ~[?:?]
	at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:263) ~[?:?]
	at java.io.BufferedWriter.implFlush(BufferedWriter.java:372) ~[?:?]
	at java.io.BufferedWriter.flush(BufferedWriter.java:359) ~[?:?]
	at io.airbyte.workers.internal.DefaultAirbyteMessageBufferedWriter.flush(DefaultAirbyteMessageBufferedWriter.java:31) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInputWithNoTimeoutMonitor(DefaultAirbyteDestination.java:133) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:126) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:463) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$3(BufferedReplicationWorker.java:253) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):471 - writeToDestination: done. (forDest.isDone:false, isDestRunning:false)
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(processMessage):439 - processMessage: done. (fromSource.isDone:false, forDest.isClosed:true)
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):396 - readFromSource: exception caught
java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.
	at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.1-jre.jar:?]
	at io.airbyte.workers.internal.DefaultAirbyteSource.getExitValue(DefaultAirbyteSource.java:126) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:383) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$4(BufferedReplicationWorker.java:260) ~[io.airbyte-airbyte-commons-worker-0.50.33.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-10-31 01:35:28 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):399 - readFromSource: done. (source.isFinished:false, fromSource.isClosed:true)
2023-10-31 01:36:28 WARN i.a.c.i.LineGobbler(voidCall):154 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2023-10-31 01:36:28 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):290 - Attempt 0 to Flush States from SyncPersistenceImpl
2023-10-31 01:36:29 INFO i.a.w.t.s.a.AppendToAttemptLogActivityImpl(log):56 - Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=10, successiveCompleteFailures=1, totalCompleteFailures=1, successivePartialFailures=0, totalPartialFailures=0)
 Backoff before next attempt: 10 seconds

Contribute

  • Yes, I want to contribute
@flash1293
Contributor

Thanks for opening this issue @valebedu - am I understanding correctly that the OpenAI vectorizer within Weaviate is erroring, which in turn breaks the sync?

If yes, it looks like something Weaviate should ideally handle internally.

Is it possible the specific text from your records is causing the error on OpenAI side?

That being said we can add a retry mechanism on the Airbyte side as well to retry to load objects that failed in the current batch, I'm going to work on this.

@valebedu
Author

Hi @flash1293 ,

Yes, that's right. In my configuration Airbyte doesn't do the vectorization; it simply syncs data from source to destination. It's Weaviate's job to vectorize the content with OpenAI.

It's not possible that a specific text caused the error, because in that run the job failed after 175,000 records, while a previous run got to 300,000 records without any issue. It's just a random unavailable-server error.

Retry logic on the current batch of 128 elements would be really nice!
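
For context, a minimal sketch of the kind of class definition where Weaviate handles the vectorization itself via its text2vec-openai module (class and property names are illustrative, not my actual schema, and the exact module options depend on the Weaviate version):

```python
import weaviate

# Illustrative only: the OpenAI key is passed to Weaviate, which calls OpenAI itself;
# Airbyte just loads the raw text properties.
client = weaviate.Client(
    "http://localhost:8080",
    additional_headers={"X-OpenAI-Api-Key": "sk-..."},
)

article_class = {
    "class": "Article",                # hypothetical class name
    "vectorizer": "text2vec-openai",   # Weaviate does the embedding, not Airbyte
    "moduleConfig": {
        "text2vec-openai": {"model": "ada", "modelVersion": "002", "type": "text"}
    },
    "properties": [
        {"name": "title", "dataType": ["text"]},
        {"name": "body", "dataType": ["text"]},
    ],
}

client.schema.create_class(article_class)
```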

@flash1293
Contributor

Weaviate already retries in case of timeouts and the like, but not in case of a 500 error. I added retries on the Airbyte side for all objects in a batch that didn't succeed (it will retry twice): #32038

If the error on the OpenAI side is transient, this should catch the problem.
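
For the curious, a minimal sketch of the idea behind that retry (illustrative only, not the actual code in #32038): resend only the objects whose per-object result reports an error, up to a fixed number of attempts.

```python
from typing import Callable, List


def flush_with_retries(
    objects: List[dict],
    send_batch: Callable[[List[dict]], List[dict]],
    max_retries: int = 2,
) -> None:
    """Send a batch and retry only the objects that failed.

    `send_batch` is a hypothetical callable returning the per-object results,
    where a failed object carries an "errors" entry (as the Weaviate batch API does).
    """
    pending = objects
    for _ in range(max_retries + 1):
        results = send_batch(pending)
        # Keep only the objects whose result reports an error.
        pending = [
            obj
            for obj, result in zip(pending, results)
            if result.get("result", {}).get("errors")
        ]
        if not pending:
            return  # everything succeeded, nothing left to retry
    raise RuntimeError(f"{len(pending)} objects still failing after {max_retries} retries")
```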

@valebedu
Author

valebedu commented Nov 1, 2023

Awesome! I'll ask the Weaviate team why this case isn't handled on the Weaviate side.

@flash1293
Contributor

@valebedu I just realized I might have misunderstood the Weaviate code and simply configured the client incorrectly for automatic retries. Let me check again, please.

@valebedu
Author

valebedu commented Nov 1, 2023

No problem. Also note that this is not a critical issue, as it works most of the time; I just need luck for the sync to finish successfully without a 5XX error.

Additionally, Weaviate released 1.22 yesterday with async vector indexing. I hope that will allow non-blocking and faster syncs; I'll try syncing with this version.

@flash1293
Contributor

That sounds great, thanks. It's not enabled by default, but the Weaviate client already has retries on all errors built in. I switched the PR linked above to enable that; this, along with the async vector indexing, should help a lot.
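
For reference, with the v3 Python client that configuration should look roughly like this (a sketch assuming the v3 client's `WeaviateErrorRetryConf` option; exact names may differ between client versions and from what the PR actually does):

```python
import weaviate
from weaviate import WeaviateErrorRetryConf  # exported by the v3 client

client = weaviate.Client("http://localhost:8080")

# Setting batch_size turns on the client's automatic batching, and
# weaviate_error_retries makes it resend failed objects on errors.
client.batch.configure(
    batch_size=128,
    weaviate_error_retries=WeaviateErrorRetryConf(number_retries=3),
)
```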

@aaronsteers
Collaborator

@valebedu - Thanks very much for logging this. Even if it's not a critical bug, the feedback helps us improve these connectors and is very much appreciated. 👍

@flash1293
Contributor

@valebedu Were you able to run your sync successfully?

@valebedu
Author

valebedu commented Nov 3, 2023

Hi @flash1293 ,

Unfortunately no 😞. I'm facing a new error, but I think this one is on the Weaviate client side. I asked on the Weaviate #support Slack channel to get more info on the error.

FYI the error is:

...
# same setup as other logs
...

2023-11-03 02:38:01 destination > /usr/local/lib/python3.9/site-packages/weaviate/warnings.py:80: DeprecationWarning: Dep002: You are batching manually. This means you are NOT using the client's built-in
2023-11-03 02:38:01 destination >             multi-threading. Setting `batch_size` in `client.batch.configure()`  to an int value will enabled automatic
2023-11-03 02:38:01 destination >             batching. See:
2023-11-03 02:38:01 destination >             https://weaviate.io/developers/weaviate/current/restful-api-references/batch.html#example-request-1
2023-11-03 02:38:01 destination >   warnings.warn(
2023-11-03 02:38:01 source > 2023-11-03 02:38:01 INFO i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):40 - Set new fetch size: 39624 rows
2023-11-03 02:44:28 INFO i.a.w.g.ReplicationWorkerHelper(internalProcessMessageFromSource):266 - Records read: 5000 (20 MB)
2023-11-03 02:46:24 destination > string indices must be integers
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 11, in <module>
    DestinationWeaviate().run(sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 119, in run
    for message in output_messages:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 113, in run_cmd
    yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/destination.py", line 49, in _run_write
    yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
  File "/airbyte/integration_code/destination_weaviate/destination.py", line 38, in write
    yield from writer.write(configured_catalog, input_messages)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/vector_db_based/writer.py", line 68, in write
    self._process_batch()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/destinations/vector_db_based/writer.py", line 48, in _process_batch
    self.indexer.index(documents, namespace, stream)
  File "/airbyte/integration_code/destination_weaviate/indexer.py", line 126, in index
    self._flush()
  File "/airbyte/integration_code/destination_weaviate/indexer.py", line 155, in _flush
    results = self.client.batch.create_objects()
  File "/usr/local/lib/python3.9/site-packages/weaviate/batch/crud_batch.py", line 921, in create_objects
    response = self._create_data( 
  File "/usr/local/lib/python3.9/site-packages/weaviate/batch/crud_batch.py", line 696, in _create_data
    batch_to_retry, response_json_successful = self._retry_on_error(
  File "/usr/local/lib/python3.9/site-packages/weaviate/batch/crud_batch.py", line 1716, in _retry_on_error
    successful_responses = new_batch.add_failed_objects_from_response(
  File "/usr/local/lib/python3.9/site-packages/weaviate/batch/requests.py", line 324, in add_failed_objects_from_response
    if self._skip_objects_retry(obj, errors_to_exclude, errors_to_include):
  File "/usr/local/lib/python3.9/site-packages/weaviate/batch/requests.py", line 104, in _skip_objects_retry
    len(entry["result"]) == 0
TypeError: string indices must be integers

...
# same failure as other logs: readFromDestination: exception caught
...

I also looked at the diff between Weaviate client v3.23.2 and v3.25.2 (weaviate/weaviate-python-client@v3.23.2...v3.25.2), but I didn't find anything relevant; it's the same logic.

@flash1293
Contributor

@valebedu Are you using async vector indexing and version 1.22? If yes, it might return a new result shape that the older client library can't deal with. Judging from the error, the client tries to read `entry` as a dictionary, but it's actually a string. In any case, I will create a PR to upgrade the client version; that certainly can't hurt.
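
For what it's worth, that matches the traceback exactly: indexing a string with a key in Python 3.9 raises precisely that message. A tiny illustration (the string value is made up; only the error shape matters):

```python
# What the client code expects: a per-object result dict.
entry = {"result": {"errors": {"error": [{"message": "..."}]}}}
len(entry["result"])  # works

# What it apparently received instead: a plain string (illustrative value).
entry = "unexpected response shape"
len(entry["result"])  # TypeError: string indices must be integers
```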

@valebedu
Author

valebedu commented Nov 3, 2023

Yes, I do. I'll try again without async indexing.

@valebedu
Author

valebedu commented Nov 4, 2023

Hi @flash1293, FYI I downgraded Weaviate to 1.21.9 and the job finally ran without any issue. Thanks a lot for your help :)

@flash1293
Contributor

That's great to hear @valebedu, keep me updated in case any other problems or feature gaps come up.
