Navigation Menu

Skip to content

Commit

Permalink
Added support for Avro schema evolution in RO Stores
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Oct 15, 2012
1 parent 0c4bef5 commit 1ba1779
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 8 deletions.
Expand Up @@ -20,7 +20,9 @@
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
Expand All @@ -35,9 +37,11 @@
import voldemort.cluster.Cluster;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.serialization.avro.AvroGenericSerializer;
import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer;
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
import voldemort.store.compress.CompressionStrategyFactory;
Expand All @@ -57,8 +61,8 @@ public class AvroStoreBuilderMapper extends

protected MessageDigest md5er;
protected ConsistentRoutingStrategy routingStrategy;
protected AvroGenericSerializer keySerializer;
protected AvroGenericSerializer valueSerializer;
protected Serializer keySerializer;
protected Serializer valueSerializer;

private String keySchema;
private String valSchema;
Expand Down Expand Up @@ -138,6 +142,7 @@ public void configure(JobConf conf) {

this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml")));
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml")));

if(storeDefs.size() != 1)
throw new IllegalStateException("Expected to find only a single store, but found multiple!");
this.storeDef = storeDefs.get(0);
Expand All @@ -149,8 +154,9 @@ public void configure(JobConf conf) {
this.saveKeys = conf.getBoolean("save.keys", true);
this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);

keySerializerDefinition = getStoreDef().getKeySerializer();
valueSerializerDefinition = getStoreDef().getValueSerializer();
// TODO this breaks but why?
// keySerializerDefinition = getStoreDef().getKeySerializer();
// valueSerializerDefinition = getStoreDef().getValueSerializer();

try {
SerializerFactory factory = new DefaultSerializerFactory();
Expand All @@ -160,15 +166,41 @@ public void configure(JobConf conf) {
.newInstance();
}

keySerializer = factory.getSerializer(keySerializerDefinition);
valueSerializer = factory.getSerializer(valueSerializerDefinition);

keyField = conf.get("avro.key.field");

valField = conf.get("avro.value.field");

keySchema = conf.get("avro.key.schema");
valSchema = conf.get("avro.val.schema");

// hadoop.job.valueSchema
keySerializer = new AvroGenericSerializer(keySchema);
valueSerializer = new AvroGenericSerializer(valSchema);
if(keySerializerDefinition.getName().equals("avro-generic")) {
keySerializer = new AvroGenericSerializer(keySchema);
valueSerializer = new AvroGenericSerializer(valSchema);
} else {

if(keySerializerDefinition.hasVersion()) {
Map<Integer, String> versions = new HashMap<Integer, String>();
for(Map.Entry<Integer, String> entry: keySerializerDefinition.getAllSchemaInfoVersions()
.entrySet())
versions.put(entry.getKey(), entry.getValue());
keySerializer = new AvroVersionedGenericSerializer(versions);
} else
keySerializer = new AvroVersionedGenericSerializer(keySerializerDefinition.getCurrentSchemaInfo());

if(valueSerializerDefinition.hasVersion()) {
Map<Integer, String> versions = new HashMap<Integer, String>();
for(Map.Entry<Integer, String> entry: valueSerializerDefinition.getAllSchemaInfoVersions()
.entrySet())
versions.put(entry.getKey(), entry.getValue());
valueSerializer = new AvroVersionedGenericSerializer(versions);
} else
valueSerializer = new AvroVersionedGenericSerializer(valueSerializerDefinition.getCurrentSchemaInfo());

}

} catch(Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 1ba1779

Please sign in to comment.