Skip to content

Commit

Permalink
Add Joda time logical type conversion. (#6704)
Browse files Browse the repository at this point in the history
After upgrade to Apache Avro 1.9.x, the default time conversion changed to JSR-310. For forwarding compatibility, we'd better add the Joda time conversion.

related to #5938

Add joda time conversions

New integration test added

(cherry picked from commit 854716f)

Handle conflict
  • Loading branch information
codelipenghui authored and tuteng committed Apr 13, 2020
1 parent b234eff commit 56b92d1
Show file tree
Hide file tree
Showing 20 changed files with 355 additions and 71 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/ci-integration-process.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ jobs:
- name: checkout
uses: actions/checkout@v1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
- name: run install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: build pulsar image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build pulsar-all image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build artifacts and docker image
run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests

Expand Down
21 changes: 21 additions & 0 deletions .github/workflows/ci-integration-thread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ jobs:
- name: checkout
uses: actions/checkout@v1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
- name: run install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: build pulsar image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build pulsar-all image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build artifacts and docker image
run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests

Expand Down
17 changes: 8 additions & 9 deletions .github/workflows/ci-unit-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,20 @@ jobs:
with:
java-version: 1.8

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
free -h
- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
Expand Down
13 changes: 13 additions & 0 deletions .github/workflows/ci-unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ jobs:
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: run unit tests
run: mvn -B -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR install '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!ProxyTest' -DfailIfNoTests=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ static <T> SchemaDefinitionBuilder<T> builder() {
*/
boolean getAlwaysAllowNull();

/**
* Get JSR310 conversion enabled.
*
* @return return true if enable JSR310 conversion. false means use Joda time conversion.
*/
boolean isJsr310ConversionEnabled();

/**
* Get schema class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ public interface SchemaDefinitionBuilder<T> {
*/
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);

/**
* Set schema use JRS310 conversion or not.
*
* <p>Before Avro 1.9 the Joda time library was used for handling the logical date(time) values.
* But since the introduction of Java8 the Java Specification Request (JSR) 310 has been included,
* which greatly improves the handling of date and time natively. To keep forwarding compatibility,
* default is use Joda time conversion.
*
* <p>JSR310 conversion is recommended here. Joda time conversion is has been marked deprecated.
* In future versions, joda time conversion may be removed
*
* @param jsr310ConversionEnabled use JRS310 conversion or not, default is false for keep forwarding compatibility
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);

/**
* Set schema info properties.
*
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-messagecrypto-bc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@
<artifactId>bouncy-castle-bc-shaded</artifactId>
<version>${project.parent.version}</version>
</dependency>

</dependencies>
</project>
6 changes: 6 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.JodaTimeConversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -42,36 +43,14 @@
public class AvroSchema<T> extends StructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();

reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

ReflectData reflectDataNotAllowNull = ReflectData.get();

reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private ClassLoader pojoClassLoader;

private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
this.pojoClassLoader = pojoClassLoader;
setReader(new AvroReader<>(schema, pojoClassLoader));
setWriter(new AvroWriter<>(schema));
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled));
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
}

@Override
Expand Down Expand Up @@ -116,7 +95,8 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition(), schemaInfo.toString());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
Expand All @@ -125,4 +105,30 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
}
}

private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
if (schemaInfo != null) {
return Boolean.parseBoolean(schemaInfo.getProperties()
.getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
}
return false;
}

public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
if (jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
} else {
try {
Class.forName("org.joda.time.DateTime");
reflectData.addLogicalTypeConversion(new JodaTimeConversions.TimestampConversion());
} catch (ClassNotFoundException e) {
// Skip if have not provide joda-time dependency.
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {

public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
public static final String JSR310_CONVERSION_ENABLED = "__jsr310ConversionEnabled";

/**
* the schema definition class
*/
private Class<T> clazz;

/**
* The flag of schema type always allow null
*
Expand All @@ -48,6 +50,13 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private boolean alwaysAllowNull = true;

/**
* The flag of use JSR310 conversion or Joda time conversion.
*
* If value is true, use JSR310 conversion in the Avro schema. Otherwise, use Joda time conversion.
*/
private boolean jsr310ConversionEnabled = false;

/**
* The schema info properties
*/
Expand All @@ -69,6 +78,12 @@ public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
return this;
}

@Override
public SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled) {
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
return this;
}

@Override
public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
this.properties.put(key, value);
Expand Down Expand Up @@ -107,8 +122,10 @@ public SchemaDefinition<T> build() {
checkArgument(!(StringUtils.isNotBlank(jsonDef) && clazz != null),
"Not allowed to set pojo and jsonDef both for the schema definition.");

properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
jsr310ConversionEnabled);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,24 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
* false you can define the field by yourself by the annotation@Nullable
*
*/
private boolean alwaysAllowNull;
private final boolean alwaysAllowNull;

private Map<String, String> properties;
private final Map<String, String> properties;

private String jsonDef;
private final String jsonDef;

private boolean supportSchemaVersioning;
private final boolean supportSchemaVersioning;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties, boolean supportSchemaVersioning) {
private final boolean jsr310ConversionEnabled;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties,
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
this.supportSchemaVersioning = supportSchemaVersioning;
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
}
/**
* get schema whether always allow null or not
Expand All @@ -66,6 +70,11 @@ public boolean getAlwaysAllowNull() {
return alwaysAllowNull;
}

@Override
public boolean isJsr310ConversionEnabled() {
return jsr310ConversionEnabled;
}

/**
* Get json schema definition
*
Expand Down
Loading

0 comments on commit 56b92d1

Please sign in to comment.