diff --git a/src/main/java/com/odiago/flumebase/io/AvroEventParser.java b/src/main/java/com/odiago/flumebase/io/AvroEventParser.java index 6d0cc51..fa74d76 100644 --- a/src/main/java/com/odiago/flumebase/io/AvroEventParser.java +++ b/src/main/java/com/odiago/flumebase/io/AvroEventParser.java @@ -141,30 +141,33 @@ public boolean validate(StreamSymbol streamSym) { schemaFields = mSchema.getFields(); } catch (AvroRuntimeException are) { // This wasn't a record schema, it was a single field or something. - LOG.error("Schemas for events must be of record type."); + LOG.error("Schemas for events must be of record type. Each column must " + + "represent a field of the same name."); return false; } List columnFields = streamSym.getFields(); - if (schemaFields.size() != columnFields.size()) { - LOG.error("The schema specified for this stream has a different number " - + "of fields than are specified in the stream definition."); - return false; - } - - for (int i = 0; i < schemaFields.size(); i++) { + for (int i = 0; i < columnFields.size(); i++) { TypedField col = columnFields.get(i); Type colType = col.getType(); - Schema.Field schemaField = schemaFields.get(i); - - if (!schemaField.name().equals(col.getUserAlias())) { - // TODO -- does this matter? We address these fields by index anyway, not by name.. - // Just warn them. - LOG.warn("Column " + col.getUserAlias() + " is aligned with an Avro field " - + "with the name: " + schemaField.name()); + + // Get the schema field that matches this column name. + Schema.Field schemaField = null; + for (Schema.Field testSchemaField : schemaFields) { + if (testSchemaField.name().equals(col.getUserAlias())) { + schemaField = testSchemaField; + break; + } + } + + if (null == schemaField) { + // Can't find a field in the schema to match the current column. + LOG.error("The Avro schema does not contain a field with the same name " + + "as column '" + col.getUserAlias() + "'."); + return false; } - // More important: are the schemas compatible? + // Are the schemas compatible? if (!schemaField.schema().equals(colType.getAvroSchema())) { boolean warned = false;