[destination-mssql] Airbyte can't create an index for _ab_cdc_* columns in mongo-source-tables #60814

@almihor

Description

Connector Name

destination-mssql

Connector Version

2.2.9

What step the error happened?

During the sync

Relevant information

The connector creates the _ab_cdc_updated_at and _ab_cdc_deleted_at columns in the destination table with the varchar(max) type. However, MS SQL Server does not allow columns of this type to be used as key columns in an index, so replication fails when the connector tries to create indexes on them.

As a temporary workaround, manually changing the column types to varchar(250) and re-running replication allows it to proceed successfully. Given that the _id column is already created with the varchar(200) type, it would be reasonable and consistent to use a similar, index-compatible type for these CDC-related fields as well.
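
For anyone who needs to apply the same workaround to several tables, here is a minimal sketch that generates the ALTER TABLE statements. It is not part of the connector. The helper names (`quote_ident`, `build_workaround_sql`) are hypothetical, the schema and table names are taken from the log below, and declaring the columns as NULL is an assumption about how the connector defines them, so check the existing column definition before running the output.

```python
# Sketch: build T-SQL statements that narrow the CDC columns to an
# index-compatible type. Nothing here talks to a database; it only
# produces strings to review and run manually.

# Column names as reported in this issue.
CDC_COLUMNS = ("_ab_cdc_updated_at", "_ab_cdc_deleted_at")


def quote_ident(name: str) -> str:
    """Bracket-quote a SQL Server identifier, escaping any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def build_workaround_sql(schema, table, length=250, columns=CDC_COLUMNS):
    """Return one ALTER TABLE ... ALTER COLUMN statement per CDC column."""
    # varchar(n) accepts n from 1 to 8000; SQL Server additionally caps the
    # total size of an index key, so keep the length small.
    if not 1 <= length <= 8000:
        raise ValueError("length must be between 1 and 8000")
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    return [
        # NULL is an assumption: the columns are presumed nullable.
        f"ALTER TABLE {target} ALTER COLUMN {quote_ident(col)} varchar({length}) NULL;"
        for col in columns
    ]


if __name__ == "__main__":
    for statement in build_workaround_sql("catalogServiceDb", "newRestaurants"):
        print(statement)
```

Existing values longer than the new length would make the ALTER fail, so it may be worth checking the data first.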

Relevant log output

2025-05-20 17:22:27 replication-orchestrator INFO failures: [ {
  "failureOrigin" : "destination",
  "failureType" : "system_error",
  "internalMessage" : "com.microsoft.sqlserver.jdbc.SQLServerException: Column '_ab_cdc_deleted_at' in table 'catalogServiceDb.newRestaurants' is of a type that is invalid for use as a key column in an index.",
  "externalMessage" : "Column '_ab_cdc_deleted_at' in table 'catalogServiceDb.newRestaurants' is of a type that is invalid for use as a key column in an index.",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 14,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "com.microsoft.sqlserver.jdbc.SQLServerException: Column '_ab_cdc_deleted_at' in table 'catalogServiceDb.newRestaurants' is of a type that is invalid for use as a key column in an index.\n\tat com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:270)\n\tat com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1735)\n\tat com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:675)\n\tat com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:594)\n\tat com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7745)\n\tat com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:4391)\n\tat com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:276)\n\tat com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:246)\n\tat com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:537)\n\tat com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61)\n\tat com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)\n\tat io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilderKt$executeUpdate$2.invoke(MSSQLQueryBuilder.kt:76)\n\tat io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilderKt$executeUpdate$2.invoke(MSSQLQueryBuilder.kt:74)\n\tat io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilderKt.executeUpdate(MSSQLQueryBuilder.kt:70)\n\tat io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilderKt.executeUpdate(MSSQLQueryBuilder.kt:74)\n\tat io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder.createTableIfNotExists(MSSQLQueryBuilder.kt:316)\n\tat 
io.airbyte.integrations.destination.mssql.v2.AbstractMSSQLStreamLoader.ensureTableExists(AbstractMSSQLStreamLoader.kt:47)\n\tat io.airbyte.integrations.destination.mssql.v2.AbstractMSSQLStreamLoader.start$suspendImpl(AbstractMSSQLStreamLoader.kt:28)\n\tat io.airbyte.integrations.destination.mssql.v2.AbstractMSSQLStreamLoader.start(AbstractMSSQLStreamLoader.kt)\n\tat io.airbyte.integrations.destination.mssql.v2.MSSQLStreamLoader.start(MSSQLStreamLoader.kt:19)\n\tat io.airbyte.cdk.load.task.implementor.OpenStreamTask$execute$$inlined$map$1$2.emit(Emitters.kt:222)\n\tat kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt(Channels.kt:33)\n\tat kotlinx.coroutines.flow.FlowKt__ChannelsKt.access$emitAllImpl$FlowKt__ChannelsKt(Channels.kt:1)\n\tat kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend(Channels.kt)\n\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)\n\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)\n\tat kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:111)\n\tat kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:99)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:811)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:715)\n\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:702)\n",
  "timestamp" : 1747754486863
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process message delivery failed",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 14,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process message delivery failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:454)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:244)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.io.IOException: Broken pipe\n\tat java.base/sun.nio.ch.UnixFileDispatcherImpl.write0(Native Method)\n\tat java.base/sun.nio.ch.UnixFileDispatcherImpl.write(UnixFileDispatcherImpl.java:65)\n\tat java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:137)\n\tat java.base/sun.nio.ch.IOUtil.write(IOUtil.java:102)\n\tat java.base/sun.nio.ch.IOUtil.write(IOUtil.java:72)\n\tat java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:300)\n\tat java.base/sun.nio.ch.ChannelOutputStream.writeFully(ChannelOutputStream.java:68)\n\tat java.base/sun.nio.ch.ChannelOutputStream.write(ChannelOutputStream.java:105)\n\tat java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:309)\n\tat java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:381)\n\tat java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:357)\n\tat java.base/sun.nio.cs.StreamEncoder.lockedWrite(StreamEncoder.java:158)\n\tat java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:139)\n\tat java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:219)\n\tat java.base/java.io.BufferedWriter.implFlushBuffer(BufferedWriter.java:178)\n\tat java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:163)\n\tat 
java.base/java.io.BufferedWriter.implWrite(BufferedWriter.java:334)\n\tat java.base/java.io.BufferedWriter.write(BufferedWriter.java:313)\n\tat java.base/java.io.Writer.write(Writer.java:278)\n\tat io.airbyte.workers.internal.VersionedAirbyteMessageBufferedWriter.write(VersionedAirbyteMessageBufferedWriter.java:39)\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.acceptWithNoTimeoutMonitor(LocalContainerAirbyteDestination.kt:137)\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.accept(LocalContainerAirbyteDestination.kt:96)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:439)\n\t... 5 more\n",
  "timestamp" : 1747754486991
}, {
  "failureOrigin" : "source",
  "internalMessage" : "Source process read attempt failed",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 14,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process read attempt failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:376)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:223)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.lang.IllegalStateException: No exit code found.\n\tat io.airbyte.workers.internal.ContainerIOHandle.getExitCode(ContainerIOHandle.kt:101)\n\tat io.airbyte.workers.internal.LocalContainerAirbyteSource.getExitValue(LocalContainerAirbyteSource.kt:89)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:356)\n\t... 5 more\n",
  "timestamp" : 1747754486991
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 14,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:503)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:216)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1747754486991
}, {
  "failureOrigin" : "replication",
  "internalMessage" : "io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.",
  "externalMessage" : "Something went wrong during replication",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 14
  },
  "stacktrace" : "java.lang.RuntimeException: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:548)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:244)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.close(LocalContainerAirbyteDestination.kt:61)\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:546)\n\t... 5 more\n",
  "timestamp" : 1747754487001
} ]

Contribute

  • Yes, I want to contribute
