forked from voldemort/voldemort
/
AvroVersionedGenericSerializer.java
171 lines (140 loc) · 5.97 KB
/
AvroVersionedGenericSerializer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Copyright 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.serialization.avro.versioned;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import voldemort.serialization.SerializationException;
import voldemort.serialization.SerializationUtils;
import voldemort.serialization.Serializer;
/**
* Avro serializer that uses the generic representation for Avro data. This
* representation is best for applications which deal with dynamic data, whose
* schemas are not known until runtime.
*
* This serializer supports schema versioning
*/
public class AvroVersionedGenericSerializer implements Serializer<Object> {
private final SortedMap<Integer, String> typeDefVersions;
private final Integer newestVersion;
// reader's schema
private final Schema typeDef;
/**
* Constructor accepting the schema definition as a JSON string.
*
* @param schema a serialized JSON object representing a Avro schema.
*/
public AvroVersionedGenericSerializer(String schema) {
this.typeDefVersions = new TreeMap<Integer, String>();
this.typeDefVersions.put(0, schema);
newestVersion = typeDefVersions.lastKey();
typeDef = Schema.parse(typeDefVersions.get(newestVersion));
}
public AvroVersionedGenericSerializer(Map<Integer, String> typeDefVersions) {
this.typeDefVersions = new TreeMap<Integer, String>(typeDefVersions);
newestVersion = this.typeDefVersions.lastKey();
typeDef = Schema.parse(typeDefVersions.get(newestVersion));
}
public byte[] toBytes(Object object) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
Encoder encoder = new BinaryEncoder(output);
GenericDatumWriter<Object> datumWriter = null;
output.write(newestVersion.byteValue());
try {
datumWriter = new GenericDatumWriter<Object>(typeDef);
datumWriter.write(object, encoder);
encoder.flush();
} catch(ArrayIndexOutOfBoundsException aIOBE) {
// probably the object sent to us was not created using the latest
// schema
// We simply check the old version number and serialize it using the
// old schema version
Schema writer = ((GenericContainer) object).getSchema();
Integer writerVersion = getSchemaVersion(writer);
return toBytes(object, writer, writerVersion);
} catch(IOException e) {
throw new SerializationException(e);
} catch(SerializationException sE) {
throw sE;
} finally {
SerializationUtils.close(output);
}
return output.toByteArray();
}
/*
* Serialize a given object using a non latest schema With auto rebootstrap
* the client gets the latest schema updated on the server However an
* application may still create objects using an old schema this lets us
* serialize those objects without an exception
*/
private byte[] toBytes(Object object, Schema writer, Integer writerVersion) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
Encoder encoder = new BinaryEncoder(output);
GenericDatumWriter<Object> datumWriter = null;
output.write(writerVersion.byteValue());
try {
datumWriter = new GenericDatumWriter<Object>(writer);
datumWriter.write(object, encoder);
encoder.flush();
} catch(IOException e) {
throw new SerializationException(e);
} catch(SerializationException sE) {
throw sE;
} finally {
SerializationUtils.close(output);
}
return output.toByteArray();
}
private Integer getSchemaVersion(Schema s) throws SerializationException {
for(Entry<Integer, String> entry: typeDefVersions.entrySet()) {
Schema version = Schema.parse(entry.getValue());
if(s.equals(version))
return entry.getKey();
}
throw new SerializationException("Writer's schema invalid!");
}
public Object toObject(byte[] bytes) {
Integer version = Integer.valueOf(bytes[0]);
if(version > newestVersion)
throw new SerializationException("Client needs to rebootstrap! \n Writer's schema version greater than Reader");
Schema typeDefWriter = Schema.parse(typeDefVersions.get(version));
byte[] dataBytes = new byte[bytes.length - 1];
System.arraycopy(bytes, 1, dataBytes, 0, bytes.length - 1);
Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(dataBytes, null);
GenericDatumReader<Object> reader = null;
try {
reader = new GenericDatumReader<Object>(typeDefWriter, typeDef);
// writer's schema
reader.setSchema(typeDefWriter);
// Reader's schema
reader.setExpected(typeDef);
return reader.read(null, decoder);
} catch(IOException e) {
throw new SerializationException(e);
}
}
}