Skip to content

Commit

Permalink
Merge pull request #1533 from Deep1998:count
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 633714633
  • Loading branch information
cloud-teleport committed May 14, 2024
2 parents fd0cec6 + 3030f70 commit 24409db
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.constants;

/** Counter names for metrics reported by the bulk migration template. */
public class MetricCounters {

  /** Counter for errors in the transformer (source row to Spanner mutation). */
  public static final String TRANSFORMER_ERRORS = "transformer_errors";

  /**
   * Counter for errors encountered by the reader when trying to map a JDBC ResultSet to a
   * SourceRow.
   */
  public static final String READER_MAPPING_ERRORS = "reader_mapping_errors";

  /**
   * Counter for errors encountered by the reader while discovering schema. This counts all sorts of
   * errors including SQLTransientConnectionException, SQLNonTransientConnectionException,
   * SQLExceptions etc.
   */
  public static final String READER_SCHEMA_DISCOVERY_ERRORS = "reader_schema_discovery_errors";

  /** Non-instantiable: this class only holds constant counter names. */
  private MetricCounters() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.constants;
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql;

import com.google.cloud.teleport.v2.constants.MetricCounters;
import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException;
import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.DialectAdapter;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.JdbcSourceRowMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.common.collect.ImmutableList;
Expand All @@ -29,6 +31,8 @@
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTransientConnectionException;
import javax.sql.DataSource;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,6 +42,9 @@ public final class MysqlDialectAdapter implements DialectAdapter {

private static final Logger logger = LoggerFactory.getLogger(MysqlDialectAdapter.class);

private final Counter schemaDiscoveryErrors =
Metrics.counter(JdbcSourceRowMapper.class, MetricCounters.READER_SCHEMA_DISCOVERY_ERRORS);

  /**
   * Constructs the adapter.
   *
   * @param mySqlVersion the MySQL version this adapter targets
   */
  public MysqlDialectAdapter(MySqlVersion mySqlVersion) {
    this.mySqlVersion = mySqlVersion;
  }
Expand Down Expand Up @@ -79,24 +86,25 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
String.format(
"Transient connection error while discovering table schema for datasource=%s db=%s tables=%s, cause=%s",
dataSource, sourceSchemaReference, tables, e));
// TODO: Add metrics for transient connection errors.
schemaDiscoveryErrors.inc();
throw new RetriableSchemaDiscoveryException(e);
} catch (SQLNonTransientConnectionException e) {
logger.error(
String.format(
"Non Transient connection error while discovering table schema for datasource=%s, db=%s tables=%s, cause=%s",
dataSource, sourceSchemaReference, tables, e));
// TODO: Add metrics for non-transient connection errors.
schemaDiscoveryErrors.inc();
throw new SchemaDiscoveryException(e);
} catch (SQLException e) {
logger.error(
String.format(
"Sql exception while discovering table schema for datasource=%s db=%s tables=%s, cause=%s",
dataSource, sourceSchemaReference, tables, e));
// TODO: Add metrics for SQL exceptions.
schemaDiscoveryErrors.inc();
throw new SchemaDiscoveryException(e);
} catch (SchemaDiscoveryException e) {
// Already logged.
schemaDiscoveryErrors.inc();
throw e;
}
return tablesBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper;

import com.google.cloud.teleport.v2.constants.MetricCounters;
import com.google.cloud.teleport.v2.source.reader.io.exception.ValueMappingException;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema;
Expand All @@ -24,6 +25,8 @@
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -39,6 +42,9 @@ public final class JdbcSourceRowMapper implements JdbcIO.RowMapper<SourceRow> {

private static final Logger logger = LoggerFactory.getLogger(JdbcSourceRowMapper.class);

private final Counter mapperErrors =
Metrics.counter(JdbcSourceRowMapper.class, MetricCounters.READER_MAPPING_ERRORS);

/**
* Construct {@link JdbcSourceRowMapper}.
*
Expand Down Expand Up @@ -87,6 +93,7 @@ long getCurrentTimeMicros() {
.getOrDefault(entry.getValue().getName(), JdbcValueMapper.UNSUPPORTED)
.mapValue(resultSet, entry.getKey(), schema));
} catch (SQLException e) {
mapperErrors.inc();
logger.error(
"Exception while mapping jdbc ResultSet to avro. Check for potential schema changes. Exception: "
+ e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.v2.constants.MetricCounters;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableReference;
import com.google.cloud.teleport.v2.spanner.migrations.avro.GenericRecordTypeConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper;
import java.io.Serializable;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +42,9 @@ public abstract class SourceRowToMutationDoFn extends DoFn<SourceRow, Mutation>

private static final Logger LOG = LoggerFactory.getLogger(SourceRowToMutationDoFn.class);

private final Counter transformerErrors =
Metrics.counter(SourceRowToMutationDoFn.class, MetricCounters.TRANSFORMER_ERRORS);

public abstract ISchemaMapper iSchemaMapper();

public abstract Map<String, SourceTableReference> tableIdMapper();
Expand All @@ -58,6 +64,7 @@ public void processElement(ProcessContext c) {
LOG.error(
"cannot find valid sourceTable for tableId: {} in tableIdMapper",
sourceRow.tableSchemaUUID());
transformerErrors.inc();
return;
}
try {
Expand All @@ -73,7 +80,8 @@ public void processElement(ProcessContext c) {
c.output(mutation);
} catch (Exception e) {
// TODO: Add DLQ integration once supported.
LOG.error("Unable to transform source row to spanner mutation: {}", e.getMessage());
LOG.error("Unable to transform source row to spanner mutation", e);
transformerErrors.inc();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ static Map<Type, AvroToValueFunction> getGsqlMap() {
Type.float64(),
(recordValue, fieldSchema) -> Value.float64(avroFieldToDouble(recordValue, fieldSchema)));
gsqlFunctions.put(
Type.string(), (recordValue, fieldSchema) -> Value.string(recordValue.toString()));
Type.string(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue)));
gsqlFunctions.put(
Type.json(), (recordValue, fieldSchema) -> Value.string(recordValue.toString()));
Type.json(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue)));
gsqlFunctions.put(
Type.numeric(),
(recordValue, fieldSchema) ->
Expand Down Expand Up @@ -121,11 +121,12 @@ static Map<Type, AvroToValueFunction> getPgMap() {
Type.pgFloat8(),
(recordValue, fieldSchema) -> Value.float64(avroFieldToDouble(recordValue, fieldSchema)));
pgFunctions.put(
Type.pgVarchar(), (recordValue, fieldSchema) -> Value.string(recordValue.toString()));
Type.pgVarchar(),
(recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue)));
pgFunctions.put(
Type.pgText(), (recordValue, fieldSchema) -> Value.string(recordValue.toString()));
Type.pgText(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue)));
pgFunctions.put(
Type.pgJsonb(), (recordValue, fieldSchema) -> Value.string(recordValue.toString()));
Type.pgJsonb(), (recordValue, fieldSchema) -> Value.string(avroFieldToString(recordValue)));
pgFunctions.put(
Type.pgNumeric(),
(recordValue, fieldSchema) ->
Expand Down Expand Up @@ -189,6 +190,13 @@ static Double avroFieldToDouble(Object recordValue, Schema fieldSchema) {
}
}

static String avroFieldToString(Object recordValue) {
if (recordValue == null) {
return null;
}
return recordValue.toString();
}

static BigDecimal avroFieldToNumericBigDecimal(Object recordValue, Schema fieldSchema) {
try {
if (recordValue == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,18 @@ public Map<String, Value> transformChangeEvent(GenericRecord record, String srcT
spannerColumnType);
result.put(spannerColName, value);
} catch (NullPointerException e) {
LOG.info("Unable to transform change event: {}", e.getMessage());
LOG.error("Unable to transform change event", e);
throw e;
} catch (IllegalArgumentException e) {
LOG.info("Unable to transform change event: {}", e.getMessage());
LOG.error("Unable to transform change event", e);
throw e;
} catch (Exception e) {
LOG.info(
"Unable to convert spanner value for spanner col: {}. {}",
spannerColName,
e.getMessage());
LOG.error(
String.format("Unable to convert spanner value for spanner col: {}", spannerColName),
e);
throw new RuntimeException(
String.format(
"Unable to convert spanner value for spanner col: {}. {}",
spannerColName,
e.getMessage()));
String.format("Unable to convert spanner value for spanner col: {}", spannerColName),
e);
}
}
return result;
Expand All @@ -123,9 +120,6 @@ public Value getSpannerValue(
LOG.debug("found union type: {}", types);
// Schema types can only union with Type NULL. Any other UNION is unsupported.
if (types.size() == 2 && types.stream().anyMatch(s -> s.getType().equals(Schema.Type.NULL))) {
if (recordValue == null) {
return null;
}
fieldSchema =
types.stream().filter(s -> !s.getType().equals(Schema.Type.NULL)).findFirst().get();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,29 @@ public void testAvroFieldToDouble_UnsupportedType() {
AvroToValueMapper.avroFieldToDouble(inputValue, SchemaBuilder.builder().booleanType());
}

@Test
public void testAvroFieldToString_valid() {
String result = AvroToValueMapper.avroFieldToString("Hello");
assertEquals("Hello", result);

result = AvroToValueMapper.avroFieldToString("");
assertEquals("", result);

result = AvroToValueMapper.avroFieldToString(14);
assertEquals("14", result);

result = AvroToValueMapper.avroFieldToString(513148134L);
assertEquals("513148134", result);

result = AvroToValueMapper.avroFieldToString(325.532);
assertEquals("325.532", result);
}

  // A null field value must map to null rather than throw, mirroring the null
  // check inside avroFieldToString.
  @Test
  public void testAvroFieldToString_NullInput() {
    assertNull(AvroToValueMapper.avroFieldToString(null));
  }

@Test
public void testAvroFieldToNumericBigDecimal_StringInput() {
Map<String, String> testCases = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,42 @@ public void transformChangeEventTest_identityMapper() {
assertEquals(expected, actual);
}

@Test
public void transformChangeEventTest_nullValues() {
GenericRecord genericRecord = new GenericData.Record(getAllSpannerTypesSchema());
genericRecord.put("bool_col", null);
genericRecord.put("int_col", null);
genericRecord.put("float_col", null);
genericRecord.put("string_col", null);
genericRecord.put("numeric_col", null);
genericRecord.put("bytes_col", null);
genericRecord.put("timestamp_col", null);
genericRecord.put("date_col", null);
GenericRecordTypeConvertor genericRecordTypeConvertor =
new GenericRecordTypeConvertor(new IdentityMapper(getIdentityDdl()), "");
Map<String, Value> actual =
genericRecordTypeConvertor.transformChangeEvent(genericRecord, "all_types");
Map<String, Value> expected =
Map.of(
"bool_col",
Value.bool(null),
"int_col",
Value.int64(null),
"float_col",
Value.float64(null),
"string_col",
Value.string(null),
"numeric_col",
Value.numeric(null),
"bytes_col",
Value.bytes(null),
"timestamp_col",
Value.timestamp(null),
"date_col",
Value.date(null));
assertEquals(expected, actual);
}

@Test
public void transformChangeEventTest_illegalUnionType() {
GenericRecordTypeConvertor genericRecordTypeConvertor =
Expand All @@ -396,21 +432,6 @@ public void transformChangeEventTest_illegalUnionType() {
() -> genericRecordTypeConvertor.getSpannerValue(null, schema, "union_col", Type.string()));
}

@Test
public void transformChangeEventTest_nullType() {
GenericRecordTypeConvertor genericRecordTypeConvertor =
new GenericRecordTypeConvertor(new IdentityMapper(getIdentityDdl()), "");
Schema schema =
SchemaBuilder.builder()
.unionOf()
.nullType()
.and()
.type(Schema.create(Schema.Type.BOOLEAN))
.endUnion();
assertNull(
genericRecordTypeConvertor.getSpannerValue(null, schema, "union_col", Type.string()));
}

@Test(expected = IllegalArgumentException.class)
public void transformChangeEventTest_incorrectSpannerType() {

Expand Down

0 comments on commit 24409db

Please sign in to comment.