From 1ba1779eedd4aa8bd9f4b75e2d8162cbdfc77bd3 Mon Sep 17 00:00:00 2001 From: Abhinay Nagpal Date: Sat, 13 Oct 2012 12:14:41 -0700 Subject: [PATCH] Added support for Avro schema evolution in RO Stores --- .../readonly/mr/AvroStoreBuilderMapper.java | 46 +++- .../mr/azkaban/VoldemortBuildAndPushJob.java | 245 +++++++++++++++++- 2 files changed, 283 insertions(+), 8 deletions(-) diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java index 726dc8a29f..83909f0145 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java @@ -20,7 +20,9 @@ import java.io.StringReader; import java.nio.ByteBuffer; import java.security.MessageDigest; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.avro.generic.GenericData; import org.apache.avro.mapred.AvroCollector; @@ -35,9 +37,11 @@ import voldemort.cluster.Cluster; import voldemort.routing.ConsistentRoutingStrategy; import voldemort.serialization.DefaultSerializerFactory; +import voldemort.serialization.Serializer; import voldemort.serialization.SerializerDefinition; import voldemort.serialization.SerializerFactory; import voldemort.serialization.avro.AvroGenericSerializer; +import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer; import voldemort.store.StoreDefinition; import voldemort.store.compress.CompressionStrategy; import voldemort.store.compress.CompressionStrategyFactory; @@ -57,8 +61,8 @@ public class AvroStoreBuilderMapper extends protected MessageDigest md5er; protected ConsistentRoutingStrategy routingStrategy; - protected AvroGenericSerializer keySerializer; - protected AvroGenericSerializer valueSerializer; + protected Serializer keySerializer; + protected Serializer 
valueSerializer; private String keySchema; private String valSchema; @@ -138,6 +142,7 @@ public void configure(JobConf conf) { this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml"))); List storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml"))); + if(storeDefs.size() != 1) throw new IllegalStateException("Expected to find only a single store, but found multiple!"); this.storeDef = storeDefs.get(0); @@ -149,8 +154,9 @@ public void configure(JobConf conf) { this.saveKeys = conf.getBoolean("save.keys", true); this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false); - keySerializerDefinition = getStoreDef().getKeySerializer(); - valueSerializerDefinition = getStoreDef().getValueSerializer(); + // TODO this breaks but why? + // keySerializerDefinition = getStoreDef().getKeySerializer(); + // valueSerializerDefinition = getStoreDef().getValueSerializer(); try { SerializerFactory factory = new DefaultSerializerFactory(); @@ -160,15 +166,41 @@ public void configure(JobConf conf) { .newInstance(); } + keySerializer = factory.getSerializer(keySerializerDefinition); + valueSerializer = factory.getSerializer(valueSerializerDefinition); + keyField = conf.get("avro.key.field"); + valField = conf.get("avro.value.field"); keySchema = conf.get("avro.key.schema"); valSchema = conf.get("avro.val.schema"); - // hadoop.job.valueSchema - keySerializer = new AvroGenericSerializer(keySchema); - valueSerializer = new AvroGenericSerializer(valSchema); + if(keySerializerDefinition.getName().equals("avro-generic")) { + keySerializer = new AvroGenericSerializer(keySchema); + valueSerializer = new AvroGenericSerializer(valSchema); + } else { + + if(keySerializerDefinition.hasVersion()) { + Map versions = new HashMap(); + for(Map.Entry entry: keySerializerDefinition.getAllSchemaInfoVersions() + .entrySet()) + versions.put(entry.getKey(), entry.getValue()); + keySerializer = new 
AvroVersionedGenericSerializer(versions); + } else + keySerializer = new AvroVersionedGenericSerializer(keySerializerDefinition.getCurrentSchemaInfo()); + + if(valueSerializerDefinition.hasVersion()) { + Map versions = new HashMap(); + for(Map.Entry entry: valueSerializerDefinition.getAllSchemaInfoVersions() + .entrySet()) + versions.put(entry.getKey(), entry.getValue()); + valueSerializer = new AvroVersionedGenericSerializer(versions); + } else + valueSerializer = new AvroVersionedGenericSerializer(valueSerializerDefinition.getCurrentSchemaInfo()); + + } + } catch(Exception e) { throw new RuntimeException(e); } diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java index a147a518b5..e50562ffa8 100644 --- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java @@ -29,6 +29,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -87,6 +88,8 @@ public class VoldemortBuildAndPushJob extends AbstractJob { private final String valueField; + private final boolean isAvroVersioned; + /* Informed stuff */ private final String informedURL = "http://informed.corp.linkedin.com/_post"; private final List informedResults; @@ -125,6 +128,11 @@ public VoldemortBuildAndPushJob(String name, Props props) { isAvroJob = props.getBoolean("build.type.avro", false); + // Set default to false + // this ensures existing clients who are not aware of the new serializer + // type dont bail out + isAvroVersioned = props.getBoolean("avro.serializer.versioned", false); + keyField = props.getString("avro.key.field", null); 
valueField = props.getString("avro.value.field", null);
 
@@ -165,8 +173,10 @@ public void run() throws Exception {
             log.info("Working on " + url);
 
             try {
-                if(isAvroJob)
+                if(isAvroJob && !isAvroVersioned)
                     verifyAvroSchema(url);
+                else if(isAvroJob && isAvroVersioned)
+                    verifyAvroSchemaandVersions(url);
                 else
                     verifySchema(url);
 
@@ -730,6 +740,226 @@ public void verifyAvroSchema(String url) throws Exception {
         }
     }
 
+    /**
+     * Verifies the Avro schema being pushed against the store definition on
+     * the cluster for stores using the "avro-generic-versioned" serializer
+     * (schema evolution), and adds the store when the cluster does not have
+     * it yet.
+     * <p>
+     * A store definition is built from the key and value fields of the Avro
+     * schema found at the input path and compared with the definition
+     * fetched from node {@code nodeId}. When they differ but the current key
+     * and value schemas parse to equal Avro schemas, the definition is
+     * rebuilt from the remote serializer definitions and compared once more.
+     * On success {@code storeDefs} and {@code cluster} are updated.
+     * <p>
+     * TODO: can this be merged with {@code verifyAvroSchema(String)}?
+     *
+     * @param url cluster URL handed to the {@code AdminClient}
+     * @throws RuntimeException if the remote store definition cannot be
+     *         reconciled with the local one, or if a new store would be
+     *         created without a description or owners
+     * @throws Exception propagated from the schema lookup and admin client
+     *         calls
+     */
+    public void verifyAvroSchemaandVersions(String url) throws Exception {
+        // create the new store def with the schema from the metadata in the
+        // input path
+        Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());
+        int replicationFactor = props.getInt("build.replication.factor", 2);
+        int requiredReads = props.getInt("build.required.reads", 1);
+        int requiredWrites = props.getInt("build.required.writes", 1);
+        String description = props.getString("push.store.description", "");
+        String owners = props.getString("push.store.owners", "");
+        Integer preferredReads = props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
+                                                                            : null;
+        Integer preferredWrites = props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
+                                                                              : null;
+
+        // NOTE(review): the serializer literals in this method look as if
+        // their XML markup was lost; confirm them against the stores.xml
+        // format before relying on this patch.
+        String keySchema = "\n\t\tavro-generic-versioned\n\t\t"
+                           + schema.getField(keyField).schema() + "\n\t";
+        String valSchema = "\n\t\tavro-generic-versioned\n\t\t"
+                           + schema.getField(valueField).schema() + "\n\t";
+
+        boolean hasCompression = props.containsKey("build.compress.value");
+        if(hasCompression) {
+            valSchema += "\tgzip\n\t";
+        }
+
+        if(props.containsKey("build.force.schema.key")) {
+            keySchema = props.get("build.force.schema.key");
+        }
+
+        if(props.containsKey("build.force.schema.value")) {
+            valSchema = props.get("build.force.schema.value");
+        }
+
+        String newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
+                                                              replicationFactor,
+                                                              requiredReads,
+                                                              requiredWrites,
+                                                              preferredReads,
+                                                              preferredWrites,
+                                                              props.containsKey("push.force.schema.key") ? props.getString("push.force.schema.key")
+                                                                                                        : keySchema,
+                                                              props.containsKey("push.force.schema.value") ? props.getString("push.force.schema.value")
+                                                                                                          : valSchema,
+                                                              description,
+                                                              owners);
+
+        log.info("Verifying store: \n" + newStoreDefXml);
+
+        StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
+
+        // get store def from cluster
+        log.info("Getting store definition from: " + url + " (node id " + this.nodeId + ")");
+
+        AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
+        try {
+            List<StoreDefinition> remoteStoreDefs = adminClient.getRemoteStoreDefList(this.nodeId)
+                                                               .getValue();
+            boolean foundStore = false;
+
+            // look for a remote store with the same name as the one being
+            // built
+            for(StoreDefinition remoteStoreDef: remoteStoreDefs) {
+                if(!remoteStoreDef.getName().equals(storeName)) {
+                    continue;
+                }
+                foundStore = true;
+
+                // the store already exists; if it does not match what we
+                // want to push, try to reconcile the two definitions
+                if(!remoteStoreDef.equals(newStoreDef)) {
+                    SerializerDefinition localKeySerializerDef = newStoreDef.getKeySerializer();
+                    SerializerDefinition localValueSerializerDef = newStoreDef.getValueSerializer();
+                    SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
+                    SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();
+
+                    // A mismatching remote store that is not versioned Avro
+                    // for both key and value cannot be reconciled; fail
+                    // instead of silently pushing against it.
+                    if(!remoteKeySerializerDef.getName().equals("avro-generic-versioned")
+                       || !remoteValueSerializerDef.getName().equals("avro-generic-versioned")) {
+                        throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Have: "
+                                                   + newStoreDef
+                                                   + "\nBut expected: "
+                                                   + remoteStoreDef);
+                    }
+
+                    // let's check to see if the key/value serializers are
+                    // REALLY equal.
+                    Schema remoteKeyDef = Schema.parse(remoteKeySerializerDef.getCurrentSchemaInfo());
+                    Schema remoteValDef = Schema.parse(remoteValueSerializerDef.getCurrentSchemaInfo());
+                    Schema localKeyDef = Schema.parse(localKeySerializerDef.getCurrentSchemaInfo());
+                    Schema localValDef = Schema.parse(localValueSerializerDef.getCurrentSchemaInfo());
+
+                    if(!remoteKeyDef.equals(localKeyDef) || !remoteValDef.equals(localValDef)) {
+                        // if the key/value serializers are not equal (even in
+                        // java, not just json strings), then fail
+                        throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Tried to resolve identical schemas between local and remote, but failed. Have: "
+                                                   + newStoreDef
+                                                   + "\nBut expected: "
+                                                   + remoteStoreDef);
+                    }
+
+                    String compressionPolicy = "";
+                    if(hasCompression) {
+                        compressionPolicy = "\n\t\tgzip";
+                    }
+
+                    // the key/value serializers are REALLY equal (even though
+                    // the strings may not match), so use the remote
+                    // serializer definitions to GUARANTEE that they match,
+                    // and try again.
+                    String keySerializerStr = "\n\t\t" + remoteKeySerializerDef.getName() + "";
+                    if(remoteKeySerializerDef.hasVersion()) {
+                        for(Map.Entry<Integer, String> entry: remoteKeySerializerDef.getAllSchemaInfoVersions()
+                                                                                    .entrySet()) {
+                            keySerializerStr += "\n\t\t " + entry.getValue() + "\n\t";
+                        }
+                    } else {
+                        keySerializerStr = "\n\t\tavro-generic-versioned\n\t\t"
+                                           + remoteKeySerializerDef.getCurrentSchemaInfo()
+                                           + "\n\t";
+                    }
+                    keySchema = keySerializerStr;
+
+                    String valueSerializerStr = "\n\t\t" + remoteValueSerializerDef.getName() + "";
+                    if(remoteValueSerializerDef.hasVersion()) {
+                        for(Map.Entry<Integer, String> entry: remoteValueSerializerDef.getAllSchemaInfoVersions()
+                                                                                      .entrySet()) {
+                            valueSerializerStr += "\n\t\t " + entry.getValue() + "\n\t";
+                        }
+                        valueSerializerStr += compressionPolicy + "\n\t";
+                    } else {
+                        valueSerializerStr = "\n\t\tavro-generic-versioned\n\t\t"
+                                             + remoteValueSerializerDef.getCurrentSchemaInfo()
+                                             + ""
+                                             + compressionPolicy
+                                             + "\n\t";
+                    }
+                    valSchema = valueSerializerStr;
+
+                    newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
+                                                                   replicationFactor,
+                                                                   requiredReads,
+                                                                   requiredWrites,
+                                                                   preferredReads,
+                                                                   preferredWrites,
+                                                                   keySerializerStr,
+                                                                   valueSerializerStr);
+                    newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
+
+                    if(!remoteStoreDef.equals(newStoreDef)) {
+                        // if we still get a fail, then the store defs don't
+                        // match for reasons OTHER than the key/value serializer
+                        throw new RuntimeException("Your store schema is identical, but the store definition does not match. Have: "
+                                                   + newStoreDef
+                                                   + "\nBut expected: "
+                                                   + remoteStoreDef);
+                    }
+                }
+
+                break;
+            }
+
+            // if the store doesn't exist yet, create it
+            if(!foundStore) {
+                // New requirement - Make sure the user had description and
+                // owner specified
+                if(description.length() == 0) {
+                    throw new RuntimeException("Description field missing in store definition. "
+                                               + "Please add \"push.store.description\" with a line describing your store");
+                }
+
+                if(owners.length() == 0) {
+                    throw new RuntimeException("Owner field missing in store definition. "
+                                               + "Please add \"push.store.owners\" with value being comma-separated list of LinkedIn email ids");
+                }
+
+                log.info("Could not find store " + storeName
+                         + " on Voldemort. Adding it to all nodes ");
+                adminClient.addStore(newStoreDef);
+            }
+
+            storeDefs = ImmutableList.of(VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(storeName,
+                                                                                                  replicationFactor,
+                                                                                                  requiredReads,
+                                                                                                  requiredWrites,
+                                                                                                  preferredReads,
+                                                                                                  preferredWrites,
+                                                                                                  keySchema,
+                                                                                                  valSchema)));
+            cluster = adminClient.getAdminClientCluster();
+        } finally {
+            adminClient.stop();
+        }
+    }
+
     private class InformedClient implements Runnable {
 
         private Props props;