Skip to content

Commit

Permalink
Added comments to the new functions
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Oct 16, 2012
1 parent 20261b9 commit 5aacdc7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
Expand Up @@ -210,7 +210,6 @@ public void configure(JobConf conf) {
routingStrategy = new ConsistentRoutingStrategy(getCluster().getNodes(),
getStoreDef().getReplicationFactor());

// /
Props props = HadoopUtils.getPropsFromJob(conf);

}
Expand Down
Expand Up @@ -181,13 +181,9 @@ public void run() throws Exception {
log.info("Working on " + url);

try {
/*
* if(isAvroJob && !isAvroVersioned) verifyAvroSchema(url); else
* if(isAvroJob && isAvroVersioned)
* verifyAvroSchemaandVersions(url, isAvroVersioned);
*/

if(isAvroJob)
verifyAvroSchemaandVersions(url, isAvroVersioned);
verifyAvroSchemaAndVersions(url, isAvroVersioned);
else
verifySchema(url);

Expand Down Expand Up @@ -574,7 +570,7 @@ public String getValueSchema() throws IOException {
// version present on the server
// supports schema evolution

public void verifyAvroSchemaandVersions(String url, boolean isVersioned) throws Exception {
public void verifyAvroSchemaAndVersions(String url, boolean isVersioned) throws Exception {
// create new n store def with schema from the metadata in the input
// path
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());
Expand Down
Expand Up @@ -40,6 +40,7 @@
* representation is best for applications which deal with dynamic data, whose
* schemas are not known until runtime.
*
* This serializer supports schema versioning
*/
public class AvroVersionedGenericSerializer implements Serializer<Object> {

Expand Down Expand Up @@ -81,6 +82,12 @@ public byte[] toBytes(Object object) {
datumWriter.write(object, encoder);
encoder.flush();
} catch(ArrayIndexOutOfBoundsException aIOBE) {

// probably the object sent to us was not created using the latest
// schema
// We simply check the old version number and serialize it using the
// old schema version

Schema writer = ((GenericContainer) object).getSchema();
Integer writerVersion = getSchemaVersion(writer);
return toBytes(object, writer, writerVersion);
Expand All @@ -96,6 +103,12 @@ public byte[] toBytes(Object object) {
return output.toByteArray();
}

/*
* Serialize a given object using a non latest schema With auto rebootstrap
* the client gets the latest schema updated on the server However an
* application may still create objects using an old schema this lets us
* serialize those objects without an exception
*/
private byte[] toBytes(Object object, Schema writer, Integer writerVersion) {

ByteArrayOutputStream output = new ByteArrayOutputStream();
Expand Down

0 comments on commit 5aacdc7

Please sign in to comment.