Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple Avro schema versions in Pulsar SQL #4847

Merged
merged 23 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0e0e9a0
Pulsar sql support schema version
congbobo Jul 30, 2019
5ba4030
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Sep 5, 2019
277ecfb
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Oct 10, 2019
b87e142
Add the decode byteBuf and modify test
congbobo Oct 10, 2019
244d22f
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Oct 11, 2019
1771681
modify the code style
congbobo Oct 11, 2019
137e1be
Modify the code check style
congbobo Oct 11, 2019
235048a
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Dec 4, 2019
123f5d8
Fix schema version provider key is byte[]
congbobo Dec 4, 2019
e06f6be
Fix some comments
congbobo Dec 9, 2019
1779434
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Dec 9, 2019
f8c11f0
Modify the codeStyle
congbobo Dec 9, 2019
78e86b5
Modify the check style
congbobo Dec 10, 2019
78df48c
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Feb 15, 2020
910b127
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Mar 10, 2020
57132c4
Key value add multiVersionSchema
congbobo Mar 10, 2020
51239bd
Fix some comment
congbobo Mar 17, 2020
bfce092
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Mar 19, 2020
59e2c40
no message
congbobo Mar 28, 2020
1cac019
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Mar 29, 2020
13ecb9a
Fix some comments
congbobo Mar 29, 2020
c9bd00f
Fix some comments
congbobo Mar 30, 2020
47aaccd
no message
congbobo Mar 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schema
}
}

protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
public static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
final Parser parser = new Parser();
return parser.parse(schemaJson);
}

protected static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
return SchemaInfo.builder()
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
.properties(schemaDefinition.getProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,12 @@ public interface RawMessage {
* @return the key of the message
*/
Optional<String> getKey();

/**
* Get the schema version of the message
*
* @return the schema version of the message
*/
byte[] getSchemaVersion();

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class RawMessageImpl implements RawMessage {
private ReferenceCountedObject<MessageMetadata> msgMetadata;
private PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata;
private ByteBuf payload;
private MessageMetadata.Builder msgMetadataBuilder;

private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
@Override
Expand Down Expand Up @@ -71,6 +72,7 @@ public static RawMessage get(ReferenceCountedObject<MessageMetadata> msgMetadata
ByteBuf payload,
long ledgerId, long entryId, long batchIndex) {
RawMessageImpl msg = RECYCLER.get();
msg.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata.get());
msg.msgMetadata = msgMetadata;
msg.msgMetadata.retain();
msg.singleMessageMetadata = singleMessageMetadata;
Expand Down Expand Up @@ -140,4 +142,13 @@ public Optional<String> getKey() {
return Optional.empty();
}
}

@Override
public byte[] getSchemaVersion() {
if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get schema version by msgMetadata directly.

return msgMetadataBuilder.getSchemaVersion().toByteArray();
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,86 +18,101 @@
*/
package org.apache.pulsar.sql.presto;

import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.List;

public class AvroSchemaHandler implements SchemaHandler {

private final DatumReader<GenericRecord> datumReader;

private final List<PulsarColumnHandle> columnHandles;

private static final FastThreadLocal<BinaryDecoder> decoders =
new FastThreadLocal<>();
private final GenericAvroSchema genericAvroSchema;

private final SchemaInfo schemaInfo;

private static final FastThreadLocal<byte[]> tmpBuffer = new FastThreadLocal<byte[]>() {
@Override
protected byte[] initialValue() {
return new byte[1024];
}
};

private static final Logger log = Logger.get(AvroSchemaHandler.class);

public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) {
this.datumReader = new GenericDatumReader<>(schema);
public AvroSchemaHandler(TopicName topicName, PulsarConnectorConfig pulsarConnectorConfig,
SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) throws PulsarClientException {
this.schemaInfo = schemaInfo;
this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
this.genericAvroSchema
.setSchemaInfoProvider(new PulsarSqlSchemaInfoProvider(topicName, pulsarConnectorConfig.getPulsarAdmin()));
this.columnHandles = columnHandles;
}

@Override
public Object deserialize(ByteBuf payload) {

ByteBuf heapBuffer = null;
try {
BinaryDecoder decoderFromCache = decoders.get();

// Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
int size = payload.readableBytes();
heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
heapBuffer.writeBytes(payload);
AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider, SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) {
this.schemaInfo = schemaInfo;
this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
this.genericAvroSchema.setSchemaInfoProvider(pulsarSqlSchemaInfoProvider);
this.columnHandles = columnHandles;
}

BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
heapBuffer.readableBytes(), decoderFromCache);
if (decoderFromCache==null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
} catch (IOException e) {
log.error(e);
} finally {
ReferenceCountUtil.safeRelease(heapBuffer);
@Override
public Object deserialize(RawMessage rawMessage) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just add a method deserialize(ByteBuf byteBuf, byte[] schemaVersion) to instead use RawMessage as an input param.

ByteBuf payload = rawMessage.getData();
int size = payload.readableBytes();
byte[] buffer = tmpBuffer.get();
if (buffer.length < size) {
// If the thread-local buffer is not big enough, replace it with
// a bigger one
buffer = new byte[size * 2];
tmpBuffer.set(buffer);
}
payload.readBytes(buffer, 0, size);
if (rawMessage.getSchemaVersion() != null) {
return genericAvroSchema.decode(buffer, rawMessage.getSchemaVersion());
} else {
return genericAvroSchema.decode(buffer);
}
return null;
}

@Override
public Object extractField(int index, Object currentRecord) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand the reason for the change in this method. I think to support multiple schema version decode does not affect extract field from the GenericRecord right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only add a deserialize method, and add a default interface for the keyPayload deserialize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about extractField(int index, Object currentRecord) method. I noticed the new change is read by field names and we use read by position index before, is the previous method not enough to support multiple schema versions?

try {
GenericRecord record = (GenericRecord) currentRecord;
GenericAvroRecord record = (GenericAvroRecord) currentRecord;
PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index);
Integer[] positionIndices = pulsarColumnHandle.getPositionIndices();
Object curr = record.get(positionIndices[0]);
if (curr == null) {
return null;
}
if (positionIndices.length > 0) {
for (int i = 1 ; i < positionIndices.length; i++) {
curr = ((GenericRecord) curr).get(positionIndices[i]);
if (curr == null) {
return null;
}
String[] names = pulsarColumnHandle.getFieldNames();

if (names.length == 1) {
return record.getField(pulsarColumnHandle.getName());
} else {
for (int i = 0 ; i < names.length - 1; i++) {
record = (GenericAvroRecord) record.getField(names[i]);
}
return record.getField(names[names.length - 1]);
}
return curr;
} catch (Exception ex) {
log.debug(ex,"%s", ex);
}
return null;
}

@VisibleForTesting
GenericAvroSchema getSchema() {
return this.genericAvroSchema;
}

@VisibleForTesting
SchemaInfo getSchemaInfo() {
return schemaInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.common.api.raw.RawMessage;

public class JSONSchemaHandler implements SchemaHandler {

Expand All @@ -51,9 +52,10 @@ public JSONSchemaHandler(List<PulsarColumnHandle> columnHandles) {
}

@Override
public Object deserialize(ByteBuf payload) {
public Object deserialize(RawMessage rawMessage) {
// Since JSON deserializer only works on a byte[] we need to convert a direct mem buffer into
// a byte[].
ByteBuf payload = rawMessage.getData();
int size = payload.readableBytes();
byte[] buffer = tmpBuffer.get();
if (buffer.length < size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.facebook.presto.spi.type.Type;

import java.util.Arrays;
import java.util.List;

public class PulsarColumnMetadata extends ColumnMetadata {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.apache.pulsar.sql.presto;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;

import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
Expand All @@ -43,8 +42,8 @@ public PulsarPrimitiveSchemaHandler(SchemaInfo schemaInfo) {
}

@Override
public Object deserialize(ByteBuf byteBuf) {
byte[] data = ByteBufUtil.getBytes(byteBuf);
public Object deserialize(RawMessage rawMessage) {
byte[] data = ByteBufUtil.getBytes(rawMessage.getData());
Object currentRecord = schema.decode(data);
switch (schemaInfo.getType()) {
case DATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,9 @@ private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit puls
this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
this.pulsarConnectorConfig = pulsarConnectorConfig;

this.schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
pulsarSplit.getSchemaInfo(),
columnHandles
);
this.schemaHandler = PulsarSchemaHandlers
.newPulsarSchemaHandler(this.topicName,
this.pulsarConnectorConfig, pulsarSplit.getSchemaInfo(), columnHandles);

log.info("Initializing split with parameters: %s", pulsarSplit);

Expand Down Expand Up @@ -401,7 +400,7 @@ public boolean advanceNextPosition() {
//start time for deseralizing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();

currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
currentRecord = this.schemaHandler.deserialize(this.currentMessage);
metricsTracker.incr_NUM_RECORD_DESERIALIZED();

// stats for time spend deserializing
Expand Down Expand Up @@ -457,7 +456,11 @@ public long getLong(int field) {
} else if (type.equals(TIME)) {
return ((Number) record).longValue();
} else if (type.equals(TIMESTAMP)) {
return ((Number) record).longValue();
if (record instanceof String) {
return Long.parseLong((String) record);
} else {
return ((Number) record).longValue();
}
} else if (type.equals(TIMESTAMP_WITH_TIME_ZONE)) {
return packDateTimeWithZone(((Number) record).longValue(), 0);
} else if (type.equals(TINYINT)) {
Expand Down Expand Up @@ -540,4 +543,9 @@ private void checkFieldType(int field, Class<?> expected) {
Class<?> actual = getType(field).getJavaType();
checkArgument(actual == expected, "Expected field %s to be type %s but is %s", field, expected, actual);
}

@VisibleForTesting
SchemaHandler getSchemaHandler() {
return this.schemaHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,40 @@
package org.apache.pulsar.sql.presto;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.facebook.presto.spi.PrestoException;
import java.util.List;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;

class PulsarSchemaHandlers {

static SchemaHandler newPulsarSchemaHandler(SchemaInfo schemaInfo,
List<PulsarColumnHandle> columnHandles) {
static SchemaHandler newPulsarSchemaHandler(TopicName topicName, PulsarConnectorConfig pulsarConnectorConfig, SchemaInfo schemaInfo,
List<PulsarColumnHandle> columnHandles) throws RuntimeException{
if (schemaInfo.getType().isPrimitive()) {
return new PulsarPrimitiveSchemaHandler(schemaInfo);
} else if (schemaInfo.getType().isStruct()) {
switch (schemaInfo.getType()) {
case JSON:
return new JSONSchemaHandler(columnHandles);
case AVRO:
return new AvroSchemaHandler(PulsarConnectorUtils
.parseSchema(new String(schemaInfo.getSchema(), UTF_8)
), columnHandles);
default:
throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
}

try {
switch (schemaInfo.getType()) {
case JSON:
return new JSONSchemaHandler(columnHandles);
case AVRO:
return new AvroSchemaHandler(topicName, pulsarConnectorConfig, schemaInfo, columnHandles);
default:
throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
}
} catch (PulsarClientException e) {
throw new RuntimeException(new Throwable("PulsarAdmin gets version schema fail, topicName : " + topicName.toString(), e));
}
} else {
throw new PrestoException(
NOT_SUPPORTED,
"Schema `" + schemaInfo.getType() + "` is not supported by presto yet : " + schemaInfo);
}

}

}
Loading