diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index 3c2e677cfdf..707988ad14e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -300,7 +300,7 @@ private boolean schemaChanged(List columns, Table tab incomingLength); return true; } - final int localScale = column.scale().get(); + final int localScale = column.scale().orElseGet(() -> 0); final int incomingScale = message.getTypeMetadata().getScale(); if (localScale != incomingScale) { logger.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 4174b1a2bdd..9a8e20c28ec 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -297,7 +297,7 @@ private SchemaBuilder numericSchema(Column column) { if (decimalMode == DecimalMode.PRECISE && isVariableScaleDecimal(column)) { return VariableScaleDecimal.builder(); } - return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get()); + return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().orElseGet(() -> 0)); } private SchemaBuilder hstoreSchema() { @@ -840,8 +840,8 @@ else if (data instanceof PgArray) { } private boolean isVariableScaleDecimal(final Column column) { - return (column.scale().isPresent() && column.scale().get() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH) - || (!column.scale().isPresent() && column.length() == -1); + return column.length() == VARIABLE_SCALE_DECIMAL_LENGTH && + column.scale().orElseGet(() -> 0) == 0; } public static Optional toSpecialValue(String value) { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index 98f29710ac8..6dfc4b638a8 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -112,6 +112,7 @@ public void converterFor(RelationalColumn column, ConverterRegistration schemasAndValuesForCustomConverterTypes() { return Arrays.asList(new SchemaAndValueField("i", SchemaBuilder.string().name("io.debezium.postgresql.type.Isbn").build(), "0-393-04002-X")); @@ -408,7 +409,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception { // then start the producer and validate all records are there buildNoStreamProducer(TestHelper.defaultConfig()); - TestConsumer consumer = testConsumer(1 + 2 * 30); // Every record comes once from partitioned table and from partition + TestConsumer consumer = testConsumer(1 + 30); consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); Set ids = new HashSet<>(); @@ -434,7 +435,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception { // verify each topic contains exactly the number of input records assertEquals(1, topicCounts.get("test_server.public.first_table").intValue()); - assertEquals(30, topicCounts.get("test_server.public.partitioned").intValue()); + assertEquals(0, topicCounts.get("test_server.public.partitioned").intValue()); assertEquals(10, topicCounts.get("test_server.public.partitioned_1_100").intValue()); assertEquals(20, topicCounts.get("test_server.public.partitioned_101_200").intValue());