diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/constants/MetricCounters.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/constants/MetricCounters.java new file mode 100644 index 0000000000..3b79f6bb54 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/constants/MetricCounters.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.constants; + +// Counters for bulk template. +public class MetricCounters { + + // Counter for errors in the transformer. + public static final String TRANSFORMER_ERRORS = "transformer_errors"; + + // Counter for errors encountered by the reader when trying to map JDBC ResultSet to a SourceRow. + public static final String READER_MAPPING_ERRORS = "reader_mapping_errors"; + + // Counter for errors encountered by the reader while discovering schema. This counts all sorts of + // errors including SQLTransientConnectionException, SQLNonTransientConnectionException, + // SQLExceptions etc. + public static final String READER_SCHEMA_DISCOVERY_ERRORS = "reader_schema_discovery_errors"; +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/constants/package-info.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/constants/package-info.java new file mode 100644 index 0000000000..3e1124dc05 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/constants/package-info.java @@ -0,0 +1,16 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.constants; diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java index 8c714bf7ae..17494e442e 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java @@ -15,9 +15,11 @@ */ package com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql; +import com.google.cloud.teleport.v2.constants.MetricCounters; import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException; import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.DialectAdapter; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.JdbcSourceRowMapper; import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference; import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType; import com.google.common.collect.ImmutableList; @@ -29,6 +31,8 @@ import java.sql.SQLNonTransientConnectionException; import java.sql.SQLTransientConnectionException; import javax.sql.DataSource; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +42,9 @@ public final class MysqlDialectAdapter implements DialectAdapter { private static final Logger logger = LoggerFactory.getLogger(MysqlDialectAdapter.class); + private final Counter schemaDiscoveryErrors = + Metrics.counter(JdbcSourceRowMapper.class, MetricCounters.READER_SCHEMA_DISCOVERY_ERRORS); + public MysqlDialectAdapter(MySqlVersion mySqlVersion) { this.mySqlVersion = mySqlVersion; } @@ -79,24 +86,25 @@ public ImmutableMap> discoverTabl String.format( "Transient connection error while discovering table schema for datasource=%s db=%s tables=%s, cause=%s", dataSource, sourceSchemaReference, tables, e)); - // TODO: Add metrics for transient connection errors. + schemaDiscoveryErrors.inc(); throw new RetriableSchemaDiscoveryException(e); } catch (SQLNonTransientConnectionException e) { logger.error( String.format( "Non Transient connection error while discovering table schema for datasource=%s, db=%s tables=%s, cause=%s", dataSource, sourceSchemaReference, tables, e)); - // TODO: Add metrics for non-transient connection errors. + schemaDiscoveryErrors.inc(); throw new SchemaDiscoveryException(e); } catch (SQLException e) { logger.error( String.format( "Sql exception while discovering table schema for datasource=%s db=%s tables=%s, cause=%s", dataSource, sourceSchemaReference, tables, e)); - // TODO: Add metrics for SQL exceptions. + schemaDiscoveryErrors.inc(); throw new SchemaDiscoveryException(e); } catch (SchemaDiscoveryException e) { // Already logged. + schemaDiscoveryErrors.inc(); throw e; } return tablesBuilder.build(); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapper.java index 3e920e252b..76354273c7 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/rowmapper/JdbcSourceRowMapper.java @@ -15,6 +15,7 @@ */ package com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper; +import com.google.cloud.teleport.v2.constants.MetricCounters; import com.google.cloud.teleport.v2.source.reader.io.exception.ValueMappingException; import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow; import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema; @@ -24,6 +25,8 @@ import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -39,6 +42,9 @@ public final class JdbcSourceRowMapper implements JdbcIO.RowMapper { private static final Logger logger = LoggerFactory.getLogger(JdbcSourceRowMapper.class); + private final Counter mapperErrors = + Metrics.counter(JdbcSourceRowMapper.class, MetricCounters.READER_MAPPING_ERRORS); + /** * Construct {@link JdbcSourceRowMapper}. * @@ -87,6 +93,7 @@ long getCurrentTimeMicros() { .getOrDefault(entry.getValue().getName(), JdbcValueMapper.UNSUPPORTED) .mapValue(resultSet, entry.getKey(), schema)); } catch (SQLException e) { + mapperErrors.inc(); logger.error( "Exception while mapping jdbc ResultSet to avro. Check for potential schema changes. Exception: " + e); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/transformer/SourceRowToMutationDoFn.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/transformer/SourceRowToMutationDoFn.java index 1f9375c15c..b68d38ae27 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/transformer/SourceRowToMutationDoFn.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/transformer/SourceRowToMutationDoFn.java @@ -18,6 +18,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Value; +import com.google.cloud.teleport.v2.constants.MetricCounters; import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow; import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableReference; import com.google.cloud.teleport.v2.spanner.migrations.avro.GenericRecordTypeConvertor; @@ -25,6 +26,8 @@ import java.io.Serializable; import java.util.Map; import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +42,9 @@ public abstract class SourceRowToMutationDoFn extends DoFn private static final Logger LOG = LoggerFactory.getLogger(SourceRowToMutationDoFn.class); + private final Counter transformerErrors = + Metrics.counter(SourceRowToMutationDoFn.class, MetricCounters.TRANSFORMER_ERRORS); + public abstract ISchemaMapper iSchemaMapper(); public abstract Map tableIdMapper(); @@ -58,6 +64,7 @@ public void processElement(ProcessContext c) { LOG.error( "cannot find valid sourceTable for tableId: {} in tableIdMapper", sourceRow.tableSchemaUUID()); + transformerErrors.inc(); return; } try { @@ -73,7 +80,8 @@ public void processElement(ProcessContext c) { c.output(mutation); } catch (Exception e) { // TODO: Add DLQ integration once supported. - LOG.error("Unable to transform source row to spanner mutation: {}", e.getMessage()); + LOG.error("Unable to transform source row to spanner mutation", e); + transformerErrors.inc(); } } diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapper.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapper.java index b76eb7c46a..2d4aa97d96 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapper.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapper.java @@ -89,9 +89,9 @@ static Map getGsqlMap() { Type.float64(), (recordValue, fieldSchema) -> Value.float64(avroFieldToDouble(recordValue, fieldSchema))); gsqlFunctions.put( - Type.string(), (recordValue, fieldSchema) -> Value.string(recordValue.toString())); + Type.string(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue))); gsqlFunctions.put( - Type.json(), (recordValue, fieldSchema) -> Value.string(recordValue.toString())); + Type.json(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue))); gsqlFunctions.put( Type.numeric(), (recordValue, fieldSchema) -> @@ -121,11 +121,12 @@ static Map getPgMap() { Type.pgFloat8(), (recordValue, fieldSchema) -> Value.float64(avroFieldToDouble(recordValue, fieldSchema))); pgFunctions.put( - Type.pgVarchar(), (recordValue, fieldSchema) -> Value.string(recordValue.toString())); + Type.pgVarchar(), + (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue))); pgFunctions.put( - Type.pgText(), (recordValue, fieldSchema) -> Value.string(recordValue.toString())); + Type.pgText(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue))); pgFunctions.put( - Type.pgJsonb(), (recordValue, fieldSchema) -> Value.string(recordValue.toString())); + Type.pgJsonb(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue))); pgFunctions.put( Type.pgNumeric(), (recordValue, fieldSchema) -> @@ -189,6 +190,13 @@ static Double avroFieldToDouble(Object recordValue, Schema fieldSchema) { } } + static String avroFieldToString(Object recordValue) { + if (recordValue == null) { + return null; + } + return recordValue.toString(); + } + static BigDecimal avroFieldToNumericBigDecimal(Object recordValue, Schema fieldSchema) { try { if (recordValue == null) { diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java index 9822f23276..1d986ef35e 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java @@ -88,21 +88,18 @@ public Map transformChangeEvent(GenericRecord record, String srcT spannerColumnType); result.put(spannerColName, value); } catch (NullPointerException e) { - LOG.info("Unable to transform change event: {}", e.getMessage()); + LOG.error("Unable to transform change event", e); throw e; } catch (IllegalArgumentException e) { - LOG.info("Unable to transform change event: {}", e.getMessage()); + LOG.error("Unable to transform change event", e); throw e; } catch (Exception e) { - LOG.info( - "Unable to convert spanner value for spanner col: {}. {}", - spannerColName, - e.getMessage()); + LOG.error( + String.format("Unable to convert spanner value for spanner col: {}", spannerColName), + e); throw new RuntimeException( - String.format( - "Unable to convert spanner value for spanner col: {}. {}", - spannerColName, - e.getMessage())); + String.format("Unable to convert spanner value for spanner col: {}", spannerColName), + e); } } return result; @@ -123,9 +120,6 @@ public Value getSpannerValue( LOG.debug("found union type: {}", types); // Schema types can only union with Type NULL. Any other UNION is unsupported. if (types.size() == 2 && types.stream().anyMatch(s -> s.getType().equals(Schema.Type.NULL))) { - if (recordValue == null) { - return null; - } fieldSchema = types.stream().filter(s -> !s.getType().equals(Schema.Type.NULL)).findFirst().get(); } else { diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapperTest.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapperTest.java index d002c5b53d..74b69aca14 100644 --- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapperTest.java +++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroToValueMapperTest.java @@ -158,6 +158,29 @@ public void testAvroFieldToDouble_UnsupportedType() { AvroToValueMapper.avroFieldToDouble(inputValue, SchemaBuilder.builder().booleanType()); } + @Test + public void testAvroFieldToString_valid() { + String result = AvroToValueMapper.avroFieldToString("Hello"); + assertEquals("Hello", result); + + result = AvroToValueMapper.avroFieldToString(""); + assertEquals("", result); + + result = AvroToValueMapper.avroFieldToString(14); + assertEquals("14", result); + + result = AvroToValueMapper.avroFieldToString(513148134L); + assertEquals("513148134", result); + + result = AvroToValueMapper.avroFieldToString(325.532); + assertEquals("325.532", result); + } + + @Test + public void testAvroFieldToString_NullInput() { + assertNull(AvroToValueMapper.avroFieldToString(null)); + } + @Test public void testAvroFieldToNumericBigDecimal_StringInput() { Map testCases = new HashMap<>(); diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java index 2b713445dd..67216674bd 100644 --- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java +++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java @@ -378,6 +378,42 @@ public void transformChangeEventTest_identityMapper() { assertEquals(expected, actual); } + @Test + public void transformChangeEventTest_nullValues() { + GenericRecord genericRecord = new GenericData.Record(getAllSpannerTypesSchema()); + genericRecord.put("bool_col", null); + genericRecord.put("int_col", null); + genericRecord.put("float_col", null); + genericRecord.put("string_col", null); + genericRecord.put("numeric_col", null); + genericRecord.put("bytes_col", null); + genericRecord.put("timestamp_col", null); + genericRecord.put("date_col", null); + GenericRecordTypeConvertor genericRecordTypeConvertor = + new GenericRecordTypeConvertor(new IdentityMapper(getIdentityDdl()), ""); + Map actual = + genericRecordTypeConvertor.transformChangeEvent(genericRecord, "all_types"); + Map expected = + Map.of( + "bool_col", + Value.bool(null), + "int_col", + Value.int64(null), + "float_col", + Value.float64(null), + "string_col", + Value.string(null), + "numeric_col", + Value.numeric(null), + "bytes_col", + Value.bytes(null), + "timestamp_col", + Value.timestamp(null), + "date_col", + Value.date(null)); + assertEquals(expected, actual); + } + @Test public void transformChangeEventTest_illegalUnionType() { GenericRecordTypeConvertor genericRecordTypeConvertor = @@ -396,21 +432,6 @@ public void transformChangeEventTest_illegalUnionType() { () -> genericRecordTypeConvertor.getSpannerValue(null, schema, "union_col", Type.string())); } - @Test - public void transformChangeEventTest_nullType() { - GenericRecordTypeConvertor genericRecordTypeConvertor = - new GenericRecordTypeConvertor(new IdentityMapper(getIdentityDdl()), ""); - Schema schema = - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(Schema.create(Schema.Type.BOOLEAN)) - .endUnion(); - assertNull( - genericRecordTypeConvertor.getSpannerValue(null, schema, "union_col", Type.string())); - } - @Test(expected = IllegalArgumentException.class) public void transformChangeEventTest_incorrectSpannerType() {