Skip to content

Commit

Permalink
- Refactored schema check method
Browse files Browse the repository at this point in the history
- added fix to the verioned serializer to support writing of objects
  created using the old schema
  • Loading branch information
abh1nay committed Oct 15, 2012
1 parent f2bfbc0 commit 20261b9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 198 deletions.
Expand Up @@ -154,9 +154,8 @@ public void configure(JobConf conf) {
this.saveKeys = conf.getBoolean("save.keys", true);
this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);

// TODO this breaks but why?
// keySerializerDefinition = getStoreDef().getKeySerializer();
// valueSerializerDefinition = getStoreDef().getValueSerializer();
keySerializerDefinition = getStoreDef().getKeySerializer();
valueSerializerDefinition = getStoreDef().getValueSerializer();

try {
SerializerFactory factory = new DefaultSerializerFactory();
Expand Down
Expand Up @@ -90,6 +90,14 @@ public class VoldemortBuildAndPushJob extends AbstractJob {

private final boolean isAvroVersioned;

private static final String AVRO_GENERIC_TYPE_NAME = "avro-generic";

// New serialization types for avro versioning support
// We cannot change existing serializer classes since
// this will break existing clients while looking for the version byte

private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";

/* Informed stuff */
private final String informedURL = "http://informed.corp.linkedin.com/_post";
private final List<Future> informedResults;
Expand Down Expand Up @@ -173,10 +181,13 @@ public void run() throws Exception {
log.info("Working on " + url);

try {
if(isAvroJob && !isAvroVersioned)
verifyAvroSchema(url);
else if(isAvroJob && isAvroVersioned)
verifyAvroSchemaandVersions(url);
/*
* if(isAvroJob && !isAvroVersioned) verifyAvroSchema(url); else
* if(isAvroJob && isAvroVersioned)
* verifyAvroSchemaandVersions(url, isAvroVersioned);
*/
if(isAvroJob)
verifyAvroSchemaandVersions(url, isAvroVersioned);
else
verifySchema(url);

Expand Down Expand Up @@ -559,193 +570,11 @@ public String getValueSchema() throws IOException {

}

// Verify if the new avro schema being pushed is the same one as the old one
// Does not have logic to check for Avro schema evolution yet
public void verifyAvroSchema(String url) throws Exception {
// create new n store def with schema from the metadata in the input
// path
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());
int replicationFactor = props.getInt("build.replication.factor", 2);
int requiredReads = props.getInt("build.required.reads", 1);
int requiredWrites = props.getInt("build.required.writes", 1);
String description = props.getString("push.store.description", "");
String owners = props.getString("push.store.owners", "");

String keySchema = "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(keyField).schema() + "</schema-info>\n\t";
String valSchema = "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(valueField).schema() + "</schema-info>\n\t";

boolean hasCompression = false;
if(props.containsKey("build.compress.value"))
hasCompression = true;

if(hasCompression) {
valSchema += "\t<compression><type>gzip</type></compression>\n\t";
}

if(props.containsKey("build.force.schema.key")) {
keySchema = props.get("build.force.schema.key");
}

if(props.containsKey("build.force.schema.value")) {
valSchema = props.get("build.force.schema.value");
}

String newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
replicationFactor,
requiredReads,
requiredWrites,
props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
: null,
props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
: null,
(props.containsKey("push.force.schema.key")) ? props.getString("push.force.schema.key")
: keySchema,
(props.containsKey("push.force.schema.value")) ? props.getString("push.force.schema.value")
: valSchema,
description,
owners);

log.info("Verifying store: \n" + newStoreDefXml.toString());

StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);

// get store def from cluster
log.info("Getting store definition from: " + url + " (node id " + this.nodeId + ")");

AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
try {
List<StoreDefinition> remoteStoreDefs = adminClient.getRemoteStoreDefList(this.nodeId)
.getValue();
boolean foundStore = false;

// go over all store defs and see if one has the same name as the
// store we're trying
// to build
for(StoreDefinition remoteStoreDef: remoteStoreDefs) {
if(remoteStoreDef.getName().equals(storeName)) {
// if the store already exists, but doesn't match what we
// want to push, we need
// to worry
if(!remoteStoreDef.equals(newStoreDef)) {

// let's check to see if the key/value serializers are
// REALLY equal.
SerializerDefinition localKeySerializerDef = newStoreDef.getKeySerializer();
SerializerDefinition localValueSerializerDef = newStoreDef.getValueSerializer();
SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();

if(remoteKeySerializerDef.getName().equals("avro-generic")
&& remoteValueSerializerDef.getName().equals("avro-generic")
&& remoteKeySerializerDef.getAllSchemaInfoVersions().size() == 1
&& remoteValueSerializerDef.getAllSchemaInfoVersions().size() == 1) {
Schema remoteKeyDef = Schema.parse(remoteKeySerializerDef.getCurrentSchemaInfo());
Schema remoteValDef = Schema.parse(remoteValueSerializerDef.getCurrentSchemaInfo());
Schema localKeyDef = Schema.parse(localKeySerializerDef.getCurrentSchemaInfo());
Schema localValDef = Schema.parse(localValueSerializerDef.getCurrentSchemaInfo());

if(remoteKeyDef.equals(localKeyDef) && remoteValDef.equals(localValDef)) {
String compressionPolicy = "";
if(hasCompression) {
compressionPolicy = "\n\t\t<compression><type>gzip</type></compression>";
}

// if the key/value serializers are REALLY equal
// (even though the strings may not match), then
// just use the remote stores to GUARANTEE that
// they
// match, and try again.
newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
replicationFactor,
requiredReads,
requiredWrites,
props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
: null,
props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
: null,
"\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
+ remoteKeySerializerDef.getCurrentSchemaInfo()
+ "</schema-info>\n\t",
"\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
+ remoteValueSerializerDef.getCurrentSchemaInfo()
+ "</schema-info>"
+ compressionPolicy
+ "\n\t");

newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);

if(!remoteStoreDef.equals(newStoreDef)) {
// if we still get a fail, then we know that
// the
// store defs don't match for reasons OTHER
// than
// the key/value serializer
throw new RuntimeException("Your store schema is identical, but the store definition does not match. Have: "
+ newStoreDef
+ "\nBut expected: "
+ remoteStoreDef);
}
} else {
// if the key/value serializers are not equal
// (even
// in java, not just json strings), then fail
throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Tried to resolve identical schemas between local and remote, but failed. Have: "
+ newStoreDef
+ "\nBut expected: "
+ remoteStoreDef);
}
}
}

foundStore = true;
break;
}
}

// if the store doesn't exist yet, create it
if(!foundStore) {
// New requirement - Make sure the user had description and
// owner specified
if(description.length() == 0) {
throw new RuntimeException("Description field missing in store definition. "
+ "Please add \"push.store.description\" with a line describing your store");
}

if(owners.length() == 0) {
throw new RuntimeException("Owner field missing in store definition. "
+ "Please add \"push.store.owners\" with value being comma-separated list of LinkedIn email ids");

}

log.info("Could not find store " + storeName
+ " on Voldemort. Adding it to all nodes ");
adminClient.addStore(newStoreDef);
}

storeDefs = ImmutableList.of(VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(storeName,
replicationFactor,
requiredReads,
requiredWrites,
props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
: null,
props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
: null,
keySchema,
valSchema)));
cluster = adminClient.getAdminClientCluster();
} finally {
adminClient.stop();
}
}

// Verify if the new avro schema being pushed is the same one as the last
// version present on the server
// supports schema evolution

// can refactor 2 methods into one?
public void verifyAvroSchemaandVersions(String url) throws Exception {
public void verifyAvroSchemaandVersions(String url, boolean isVersioned) throws Exception {
// create new n store def with schema from the metadata in the input
// path
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());
Expand All @@ -755,9 +584,18 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
String description = props.getString("push.store.description", "");
String owners = props.getString("push.store.owners", "");

String keySchema = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
String serializerName;

if(isVersioned)
serializerName = AVRO_GENERIC_VERSIONED_TYPE_NAME;
else
serializerName = AVRO_GENERIC_TYPE_NAME;

String keySchema = "\n\t\t<type>" + serializerName
+ "</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(keyField).schema() + "</schema-info>\n\t";
String valSchema = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
String valSchema = "\n\t\t<type>" + serializerName
+ "</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(valueField).schema() + "</schema-info>\n\t";

boolean hasCompression = false;
Expand Down Expand Up @@ -823,8 +661,8 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();

if(remoteKeySerializerDef.getName().equals("avro-generic-versioned")
&& remoteValueSerializerDef.getName().equals("avro-generic-versioned")) {
if(remoteKeySerializerDef.getName().equals(serializerName)
&& remoteValueSerializerDef.getName().equals(serializerName)) {

Schema remoteKeyDef = Schema.parse(remoteKeySerializerDef.getCurrentSchemaInfo());
Schema remoteValDef = Schema.parse(remoteValueSerializerDef.getCurrentSchemaInfo());
Expand Down Expand Up @@ -860,7 +698,9 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
}

} else {
keySerializerStr = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
keySerializerStr = "\n\t\t<type>"
+ serializerName
+ "</type>\n\t\t<schema-info version=\"0\">"
+ remoteKeySerializerDef.getCurrentSchemaInfo()
+ "</schema-info>\n\t";
}
Expand All @@ -884,10 +724,11 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {

} else {

valueSerializerStr = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
valueSerializerStr = "\n\t\t<type>"
+ serializerName
+ "</type>\n\t\t<schema-info version=\"0\">"
+ remoteValueSerializerDef.getCurrentSchemaInfo()
+ "</schema-info>"
+ compressionPolicy
+ "</schema-info>" + compressionPolicy
+ "\n\t";

}
Expand Down

0 comments on commit 20261b9

Please sign in to comment.