Skip to content

Commit

Permalink
[FLINK-3304] Making the Avro Schema serializable.
Browse files Browse the repository at this point in the history
This closes #1635
  • Loading branch information
kl0u authored and rmetzger committed Feb 16, 2016
1 parent 65d0805 commit c658763
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
Expand Up @@ -27,15 +27,16 @@
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.io.Serializable;

public class AvroOutputFormat<E> extends FileOutputFormat<E> {
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {

private static final long serialVersionUID = 1L;

private final Class<E> avroValueType;

private Schema userDefinedSchema = null;

private transient Schema userDefinedSchema = null;
private transient DataFileWriter<E> dataFileWriter;

public AvroOutputFormat(Path filePath, Class<E> type) {
Expand Down Expand Up @@ -66,7 +67,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);

DatumWriter<E> datumWriter;
Schema schema = null;
Schema schema;
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
datumWriter = new SpecificDatumWriter<E>(avroValueType);
try {
Expand All @@ -88,6 +89,31 @@ public void open(int taskNumber, int numTasks) throws IOException {
}
}

private void writeObject(java.io.ObjectOutputStream out) throws IOException {
out.defaultWriteObject();

if(userDefinedSchema != null) {
byte[] json = userDefinedSchema.toString().getBytes();
out.writeInt(json.length);
out.write(json);
} else {
out.writeInt(0);
}
}

private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();

int length = in.readInt();
if(length != 0) {
byte[] json = new byte[length];
in.read(json);

Schema schema = new Schema.Parser().parse(new String(json));
setSchema(schema);
}
}

@Override
public void close() throws IOException {
dataFileWriter.flush();
Expand Down
Expand Up @@ -68,7 +68,9 @@ protected void testProgram() throws Exception {

//output the data with AvroOutputFormat for specific user type
DataSet<User> specificUser = input.map(new ConvertToUser());
specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
specificUser.write(avroOutputFormat, outputPath1);

//output the data with AvroOutputFormat for reflect user type
DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
Expand Down

0 comments on commit c658763

Please sign in to comment.