Skip to content
Permalink
Browse files
[FLINK-27255] [flink-avro] flink-avro does not support ser/de of larg…
…e avro schema (#19645)

Co-authored-by: Haizhou Zhao <haizhou_zhao@apple.com>
  • Loading branch information
haizhou-zhao and Haizhou Zhao committed May 10, 2022
1 parent 1dcad22 commit 610164d57dd13eab029a1ac0ecc0776bf6db342d
Showing 6 changed files with 153 additions and 10 deletions.
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,10 +105,15 @@ public boolean canEqual(Object obj) {
}

/**
 * Java-serialization hook that writes the Avro schema held in {@code schema} to the stream as
 * text.
 *
 * <p>NOTE(review): this listing is a diff whose +/- markers were lost. The hunk header (10 old
 * lines, 15 new lines) suggests that the {@code writeUTF} call is the removed statement and the
 * three statements after it are its replacement - confirm against the committed file before
 * treating both as live code.
 *
 * @param oos stream the schema text is written to
 * @throws IOException if writing to the stream fails
 */
private void writeObject(ObjectOutputStream oos) throws IOException {
// Apparently the pre-change form: the whole schema string in a single writeUTF call.
oos.writeUTF(schema.toString());
// Apparently the post-change form: the schema string as UTF-8 bytes, written as a 4-byte
// length followed by the raw bytes (presumably toString(false) is the non-pretty form - verify).
byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8);
oos.writeInt(schemaStrInBytes.length);
oos.write(schemaStrInBytes);
}

/**
 * Java-serialization hook that restores {@code schema} by parsing schema text read from the
 * stream.
 *
 * <p>NOTE(review): this listing is a diff whose +/- markers were lost. The hunk header (10 old
 * lines, 15 new lines) suggests that the {@code readUTF} statement is the removed line and the
 * four statements after it are its replacement - confirm against the committed file before
 * treating both as live code.
 *
 * @param ois stream the schema text is read from
 * @throws ClassNotFoundException declared by the serialization hook signature
 * @throws IOException if reading from the stream fails
 */
private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
// Apparently the pre-change form: parse the string returned by a single readUTF call.
this.schema = new Schema.Parser().parse(ois.readUTF());
// Apparently the post-change form: read a 4-byte length, then exactly that many bytes,
// decode them as UTF-8 and parse the resulting text.
int len = ois.readInt();
byte[] content = new byte[len];
ois.readFully(content);
this.schema = new Schema.Parser().parse(new String(content, StandardCharsets.UTF_8));
}
}
@@ -28,6 +28,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** A wrapper for Avro {@link Schema}, that is Java serializable. */
@Internal
@@ -52,14 +53,18 @@ private void writeObject(ObjectOutputStream oos) throws IOException {
oos.writeBoolean(false);
} else {
oos.writeBoolean(true);
oos.writeUTF(schema.toString(false));
byte[] schemaStrInBytes = schema.toString(false).getBytes(StandardCharsets.UTF_8);
oos.writeInt(schemaStrInBytes.length);
oos.write(schemaStrInBytes);
}
}

private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
if (ois.readBoolean()) {
String schema = ois.readUTF();
this.schema = new Parser().parse(schema);
int len = ois.readInt();
byte[] content = new byte[len];
ois.readFully(content);
this.schema = new Parser().parse(new String(content, StandardCharsets.UTF_8));
} else {
this.schema = null;
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.typeutils;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.formats.avro.utils.AvroTestUtils;

import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Runs the shared {@link TypeInformationTestBase} checks against {@link GenericRecordAvroTypeInfo},
 * once with the small test schema and once with the large one.
 */
public class AvroGenericRecordTypeInfoTest
        extends TypeInformationTestBase<GenericRecordAvroTypeInfo> {

    /** Supplies one type info per schema size, small schema first. */
    @Override
    protected GenericRecordAvroTypeInfo[] getTestData() {
        final GenericRecordAvroTypeInfo smallSchemaInfo =
                new GenericRecordAvroTypeInfo(AvroTestUtils.getSmallSchema());
        final GenericRecordAvroTypeInfo largeSchemaInfo =
                new GenericRecordAvroTypeInfo(AvroTestUtils.getLargeSchema());
        return new GenericRecordAvroTypeInfo[] {smallSchemaInfo, largeSchemaInfo};
    }

    /** The serializer created for the large-schema type info must be an {@link AvroSerializer}. */
    @Test
    void testAvroByDefault() {
        final GenericRecordAvroTypeInfo largeSchemaInfo =
                new GenericRecordAvroTypeInfo(AvroTestUtils.getLargeSchema());
        final TypeSerializer<GenericRecord> serializer =
                largeSchemaInfo.createSerializer(new ExecutionConfig());

        assertThat(serializer).isInstanceOf(AvroSerializer.class);
    }
}
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.formats.avro.utils.AvroTestUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -28,11 +29,7 @@
/** Test for {@link AvroSerializer} that tests GenericRecord. */
public class AvroSerializerGenericRecordTest extends SerializerTestBase<GenericRecord> {

private static final Schema SCHEMA =
new org.apache.avro.Schema.Parser()
.parse(
"{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": "
+ "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
private static final Schema SCHEMA = AvroTestUtils.getSmallSchema();

@Override
protected TypeSerializer<GenericRecord> createSerializer() {
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro.typeutils;

import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.formats.avro.utils.AvroTestUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

/**
 * Test for {@link AvroSerializer} that serializes a {@link GenericRecord} built on the large
 * schema from {@link AvroTestUtils#getLargeSchema()}.
 */
public class AvroSerializerLargeGenericRecordTest extends SerializerTestBase<GenericRecord> {

    /** Large schema shared by the serializer under test and by the test records. */
    private static final Schema SCHEMA = AvroTestUtils.getLargeSchema();

    /** Creates the serializer under test, bound to the large schema. */
    @Override
    protected TypeSerializer<GenericRecord> createSerializer() {
        final TypeSerializer<GenericRecord> serializer =
                new AvroSerializer<>(GenericRecord.class, SCHEMA);
        return serializer;
    }

    /** Reports -1 as the serialized length (presumably "variable length" - verify in the base class). */
    @Override
    protected int getLength() {
        return -1;
    }

    /** The type handled by the serializer under test. */
    @Override
    protected Class<GenericRecord> getTypeClass() {
        return GenericRecord.class;
    }

    /** Supplies a single record that sets only {@code field1}; all other fields keep their defaults. */
    @Override
    protected GenericRecord[] getTestData() {
        final GenericRecordBuilder builder = new GenericRecordBuilder(SCHEMA);
        builder.set("field1", "foo bar");
        final GenericRecord record = builder.build();
        return new GenericRecord[] {record};
    }
}
@@ -24,9 +24,11 @@
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest;
import org.apache.flink.types.Row;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
@@ -259,6 +261,34 @@ public static Tuple3<GenericRecord, Row, Schema> getGenericTestData() {
return t;
}

/**
 * Crafts a large Avro record schema whose JSON string is longer than 0xFFFF (65535) characters.
 *
 * <p>{@link java.io.DataOutput#writeUTF(String)} can encode at most 65535 bytes. Every character
 * encodes to at least one byte, so a string longer than 0xFFFF characters always exceeds that
 * limit. Code that serializes a schema string therefore needs a different scheme for a schema
 * of this size.
 *
 * @return a record schema named {@code LargeAvroSchema} with 10000 optional string fields named
 *     {@code field0} to {@code field9999}
 * @throws IllegalStateException if the generated schema string is not longer than 0xFFFF
 *     characters, which would make the schema useless as a "large" fixture
 */
public static Schema getLargeSchema() {
    final int fieldCount = 10000;
    final int writeUtfLimit = 0xFFFF;

    SchemaBuilder.FieldAssembler<Schema> fields =
            SchemaBuilder.record("LargeAvroSchema")
                    .namespace(AvroSerializerLargeGenericRecordTest.class.getName())
                    .fields();
    for (int i = 0; i < fieldCount; ++i) {
        fields = fields.optionalString("field" + i);
    }
    Schema schema = fields.endRecord();

    // Checked explicitly instead of with `assert`, which is skipped when the JVM runs without
    // -ea and would then let a too-small schema through unnoticed.
    int schemaLength = schema.toString().length();
    if (schemaLength <= writeUtfLimit) {
        throw new IllegalStateException(
                "Large test schema has only "
                        + schemaLength
                        + " characters; expected more than "
                        + writeUtfLimit);
    }
    return schema;
}

/**
 * Crafts a small Avro schema: a record {@code dummy.Dummy} with a single string field
 * {@code afield}. Its JSON string is far shorter than 0xFFFF characters.
 */
public static Schema getSmallSchema() {
    final String schemaJson =
            "{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": "
                    + "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}";
    final org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
    return parser.parse(schemaJson);
}

/**
* Writes given record using specified schema.
*

0 comments on commit 610164d

Please sign in to comment.