CC-311: support for Decimal logical type as incrementing column #129

Merged: 1 commit, Sep 14, 2016
TimestampIncrementingTableQuerier.java

@@ -16,12 +16,15 @@

package io.confluent.connect.jdbc.source;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -55,6 +58,7 @@ public class TimestampIncrementingTableQuerier extends TableQuerier {
private static final Logger log = LoggerFactory.getLogger(TimestampIncrementingTableQuerier.class);

private static final Calendar UTC_CALENDAR = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
private static final BigDecimal LONG_MAX_VALUE_AS_BIGDEC = new BigDecimal(Long.MAX_VALUE);

private String timestampColumn;
private String incrementingColumn;
@@ -145,8 +149,6 @@ protected void createPreparedStatement(Connection db) throws SQLException {
stmt = db.prepareStatement(queryString);
}



@Override
protected ResultSet executeQuery() throws SQLException {
if (incrementingColumn != null && timestampColumn != null) {
@@ -179,37 +181,8 @@ protected ResultSet executeQuery() throws SQLException {

@Override
public SourceRecord extractRecord() throws SQLException {
Struct record = DataConverter.convertRecord(schema, resultSet);
Long id = null;
Timestamp latest = null;
if (incrementingColumn != null) {
switch (schema.field(incrementingColumn).schema().type()) {
case INT8:
id = (long) (Byte) record.get(incrementingColumn);
break;
case INT32:
id = (long) (Integer) record.get(incrementingColumn);
break;
case INT64:
id = (Long) record.get(incrementingColumn);
break;
default:
throw new ConnectException("Invalid type for incrementing column: "
+ schema.field(incrementingColumn).schema().type());
}

// If we are only using an incrementing column, then this must be incrementing. If we are also
// using a timestamp, then we may see updates to older rows.
long incrementingOffset = offset.getIncrementingOffset();
assert (incrementingOffset == -1 || id > incrementingOffset) || timestampColumn != null;
}
if (timestampColumn != null) {
latest = (Timestamp) record.get(timestampColumn);
Timestamp timestampOffset = offset.getTimestampOffset();
assert timestampOffset != null && timestampOffset.compareTo(latest) <= 0;
}
offset = new TimestampIncrementingOffset(latest, id);

final Struct record = DataConverter.convertRecord(schema, resultSet);
offset = extractOffset(schema, record);
// TODO: Key?
final String topic;
final Map<String, String> partition;
@@ -229,6 +202,56 @@ public SourceRecord extractRecord() throws SQLException {
return new SourceRecord(partition, offset.toMap(), topic, record.schema(), record);
}

// Visible for testing
TimestampIncrementingOffset extractOffset(Schema schema, Struct record) {
final Timestamp extractedTimestamp;
if (timestampColumn != null) {
extractedTimestamp = (Timestamp) record.get(timestampColumn);
Timestamp timestampOffset = offset.getTimestampOffset();
assert timestampOffset != null && timestampOffset.compareTo(extractedTimestamp) <= 0;
} else {
extractedTimestamp = null;
}

final Long extractedId;
if (incrementingColumn != null) {
final Schema incrementingColumnSchema = schema.field(incrementingColumn).schema();
final Object incrementingColumnValue = record.get(incrementingColumn);
if (incrementingColumnValue == null) {
throw new ConnectException("Null value for incrementing column of type: " + incrementingColumnSchema.type());
} else if (isIntegralPrimitiveType(incrementingColumnValue)) {
extractedId = ((Number) incrementingColumnValue).longValue();
} else if (incrementingColumnSchema.name() != null && incrementingColumnSchema.name().equals(Decimal.LOGICAL_NAME)) {
final BigDecimal decimal = ((BigDecimal) incrementingColumnValue);
if (decimal.compareTo(LONG_MAX_VALUE_AS_BIGDEC) > 0) {
throw new ConnectException("Decimal value for incrementing column exceeded Long.MAX_VALUE");
}
if (decimal.scale() != 0) {
Contributor:

An alternative to these restrictions would be to just make TimestampIncrementingOffset use a BigDecimal and convert the other types (or even make it a composite where you can choose the long or the BigDecimal). Any reason not to be a bit more liberal by doing that? Performance is the only reason I can think of.
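For illustration, a minimal sketch of the kind of composite offset floated in the comment above; the class and field names are hypothetical, and the merged PR keeps a plain Long in TimestampIncrementingOffset:

```java
import java.math.BigDecimal;

// Hypothetical sketch of the composite-offset idea from the thread above; not part of this PR.
final class CompositeIncrementingOffset {
  private final Long longOffset;          // set for INT8/INT16/INT32/INT64 columns
  private final BigDecimal decimalOffset; // set for Decimal logical-type columns

  CompositeIncrementingOffset(Long longOffset, BigDecimal decimalOffset) {
    this.longOffset = longOffset;
    this.decimalOffset = decimalOffset;
  }

  // Normalize to a single comparable representation when needed (e.g. for query binding).
  BigDecimal asBigDecimal() {
    if (decimalOffset != null) {
      return decimalOffset;
    }
    return longOffset != null ? BigDecimal.valueOf(longOffset) : null;
  }
}
```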

Contributor Author:

I was hesitant to store a BigDecimal when it's some other int type because I wasn't sure whether JDBC drivers would handle statement.setBigDecimal() well for such columns.

BTW, I tried to be more general in a local iteration by storing a Comparable, in fact even including support for String types like in #97, but abandoned that after running into complexity around default values. I have an idea now of how to handle that, though, so I'll give this another go.

Contributor Author:

Backwards compatibility is another concern to take care of... so we should probably always map to Long for any of the currently supported int types.

Contributor:

Wait, this isn't going to get stored in the database, right? This is in the source so we are extracting it. I think the major difference would be that the offsets would change from their integer type to Decimal.

Contributor Author (@shikhar, Sep 14, 2016):

> so I'll give this another go.

Er, re-abandoned string support. The issue is that if there is no existing stored offset, you have a null but no type. While a sensible default value can be determined ("" for strings, -1 for numbers...), you need type info at bind time.

I think I'm happy with the solution for now. We could expand support to all Number types rather than just up to Long.MAX_VALUE if there is a request for it.
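A rough sketch of the bind-time problem described above, using a hypothetical helper (the connector's real binding code differs): when no offset has been stored yet, the value is null and nothing says which SQL type to bind, so a typed default has to be chosen up front.

```java
import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical illustration only: binding an offset whose runtime type may be unknown.
final class OffsetBindingSketch {
  static void bindIncrementingOffset(PreparedStatement stmt, int index, Object offsetValue)
      throws SQLException {
    if (offsetValue == null) {
      // No stored offset yet: the null carries no type information, so a typed
      // default (-1 for numeric columns, "" for string columns) must be picked here.
      stmt.setLong(index, -1L);
    } else if (offsetValue instanceof BigDecimal) {
      stmt.setBigDecimal(index, (BigDecimal) offsetValue);
    } else if (offsetValue instanceof Number) {
      stmt.setLong(index, ((Number) offsetValue).longValue());
    } else {
      stmt.setString(index, offsetValue.toString());
    }
  }
}
```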

Contributor Author:

> Wait, this isn't going to get stored in the database, right? This is in the source so we are extracting it.

It gets stored in the offset storage, and used for range queries.

Contributor:

Ok, makes sense.
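As noted above, the stored offset feeds the next range query. A rough, purely illustrative sketch of that mechanism; the table and column names are made up, and the connector builds its actual query differently:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative only: the incrementing offset recovered from offset storage is
// bound into a range predicate so the next poll only sees newer rows.
final class RangeQuerySketch {
  static ResultSet pollNewRows(Connection connection, long lastIncrementingOffset)
      throws SQLException {
    PreparedStatement stmt = connection.prepareStatement(
        "SELECT * FROM some_table WHERE id > ? ORDER BY id ASC"); // hypothetical names
    stmt.setLong(1, lastIncrementingOffset);
    return stmt.executeQuery();
  }
}
```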

throw new ConnectException("Scale of Decimal value for incrementing column must be 0");
}
extractedId = decimal.longValue();
} else {
throw new ConnectException("Invalid type for incrementing column: " + incrementingColumnSchema.type());
}

// If we are only using an incrementing column, then this must be incrementing.
// If we are also using a timestamp, then we may see updates to older rows.
Long incrementingOffset = offset.getIncrementingOffset();
assert incrementingOffset == -1L || extractedId > incrementingOffset || timestampColumn != null;
} else {
extractedId = null;
}

return new TimestampIncrementingOffset(extractedTimestamp, extractedId);
}

private boolean isIntegralPrimitiveType(Object incrementingColumnValue) {
return incrementingColumnValue instanceof Long
Contributor:

Probably doesn't matter much in practice, but any reason we switched to checking the value rather than the schema?

Contributor Author:

No good reason :-) Can't recall how I ended up switching to instanceof... it shouldn't matter, as you said.

|| incrementingColumnValue instanceof Integer
|| incrementingColumnValue instanceof Short
|| incrementingColumnValue instanceof Byte;
}
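For reference, a minimal sketch of the schema-based variant discussed in the thread above, i.e. switching on the Connect schema type instead of using instanceof on the value; it covers the same integer types and is purely illustrative, not part of this PR:

```java
// Hypothetical schema-based equivalent of isIntegralPrimitiveType.
// Assumes org.apache.kafka.connect.data.Schema is imported, as it is in this file.
private boolean isIntegralSchemaType(Schema incrementingColumnSchema) {
  switch (incrementingColumnSchema.type()) {
    case INT8:
    case INT16:
    case INT32:
    case INT64:
      return true;
    default:
      return false;
  }
}
```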

@Override
public String toString() {
return "TimestampIncrementingTableQuerier{" +
TimestampIncrementingTableQuerierTest.java (new file)

@@ -0,0 +1,76 @@
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.connect.jdbc.source;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Test;

import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Collections;

import static org.junit.Assert.assertEquals;

public class TimestampIncrementingTableQuerierTest {

@Test
public void extractIntOffset() throws SQLException {
final Schema schema = SchemaBuilder.struct().field("id", SchemaBuilder.INT32_SCHEMA).build();
final Struct record = new Struct(schema).put("id", 42);
assertEquals(42L, newQuerier().extractOffset(schema, record).getIncrementingOffset());
}

@Test
public void extractLongOffset() throws SQLException {
final Schema schema = SchemaBuilder.struct().field("id", SchemaBuilder.INT64_SCHEMA).build();
final Struct record = new Struct(schema).put("id", 42L);
assertEquals(42L, newQuerier().extractOffset(schema, record).getIncrementingOffset());
}

@Test
public void extractDecimalOffset() throws SQLException {
final Schema decimalSchema = Decimal.schema(0);
final Schema schema = SchemaBuilder.struct().field("id", decimalSchema).build();
final Struct record = new Struct(schema).put("id", new BigDecimal(42));
assertEquals(42L, newQuerier().extractOffset(schema, record).getIncrementingOffset());
}

@Test(expected = ConnectException.class)
public void extractTooLargeDecimalOffset() throws SQLException {
final Schema decimalSchema = Decimal.schema(0);
final Schema schema = SchemaBuilder.struct().field("id", decimalSchema).build();
final Struct record = new Struct(schema).put("id", new BigDecimal(Long.MAX_VALUE).add(new BigDecimal(1)));
newQuerier().extractOffset(schema, record).getIncrementingOffset();
}

@Test(expected = ConnectException.class)
public void extractFractionalDecimalOffset() throws SQLException {
final Schema decimalSchema = Decimal.schema(2);
final Schema schema = SchemaBuilder.struct().field("id", decimalSchema).build();
final Struct record = new Struct(schema).put("id", new BigDecimal("42.42"));
newQuerier().extractOffset(schema, record).getIncrementingOffset();
}

private TimestampIncrementingTableQuerier newQuerier() {
return new TimestampIncrementingTableQuerier(TableQuerier.QueryMode.TABLE, null, "", null, "id", Collections.<String, Object>emptyMap(), 0L, null);
}

}