Skip to content

Commit

Permalink
Relax AvroEventParser schema validation rules.
Browse files Browse the repository at this point in the history
The schema may define more fields than the stream has columns.
Columns are matched to schema fields by name, so the two do not need to be in the same order.
  • Loading branch information
kimballa committed May 14, 2011
1 parent beaca97 commit a5726c8
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions src/main/java/com/odiago/flumebase/io/AvroEventParser.java
Expand Up @@ -141,30 +141,33 @@ public boolean validate(StreamSymbol streamSym) {
schemaFields = mSchema.getFields();
} catch (AvroRuntimeException are) {
// This wasn't a record schema, it was a single field or something.
LOG.error("Schemas for events must be of record type.");
LOG.error("Schemas for events must be of record type. Each column must "
+ "represent a field of the same name.");
return false;
}
List<TypedField> columnFields = streamSym.getFields();

if (schemaFields.size() != columnFields.size()) {
LOG.error("The schema specified for this stream has a different number "
+ "of fields than are specified in the stream definition.");
return false;
}

for (int i = 0; i < schemaFields.size(); i++) {
for (int i = 0; i < columnFields.size(); i++) {
TypedField col = columnFields.get(i);
Type colType = col.getType();
Schema.Field schemaField = schemaFields.get(i);

if (!schemaField.name().equals(col.getUserAlias())) {
// TODO -- does this matter? We address these fields by index anyway, not by name..
// Just warn them.
LOG.warn("Column " + col.getUserAlias() + " is aligned with an Avro field "
+ "with the name: " + schemaField.name());

// Get the schema field that matches this column name.
Schema.Field schemaField = null;
for (Schema.Field testSchemaField : schemaFields) {
if (testSchemaField.name().equals(col.getUserAlias())) {
schemaField = testSchemaField;
break;
}
}

if (null == schemaField) {
// Can't find a field in the schema to match the current column.
LOG.error("The Avro schema does not contain a field with the same name "
+ "as column '" + col.getUserAlias() + "'.");
return false;
}

// More important: are the schemas compatible?
// Are the schemas compatible?
if (!schemaField.schema().equals(colType.getAvroSchema())) {
boolean warned = false;

Expand Down

0 comments on commit a5726c8

Please sign in to comment.