Skip to content

NIFI-12614: Create record reader service for Protobuf messages#8250

Closed
mark-bathori wants to merge 6 commits intoapache:mainfrom
mark-bathori:NIFI-12614
Closed

NIFI-12614: Create record reader service for Protobuf messages#8250
mark-bathori wants to merge 6 commits intoapache:mainfrom
mark-bathori:NIFI-12614

Conversation

@mark-bathori
Copy link
Contributor

Summary

The implementation does not generate message specific code with builders, instead it reads the schema from the provided proto file and maps it with the Unknown field list parsed from encoded proto data.

NIFI-12614

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@dan-s1
Copy link
Contributor

dan-s1 commented Jan 17, 2024

@mark-bathori With a big addition such as this, I would recommend some more documentation for end users so they are clear on how this works and for those who need to review the code. :)
This would go in your case under
nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html

To get ideas on how to format and what to include, take a look at the other readers and the documentation they have.

@mark-bathori
Copy link
Contributor Author

Thanks @dan-s1 for the comment. I've added additionDeatils page to the Reader in my latest commit.

@dan-s1
Copy link
Contributor

dan-s1 commented Jan 31, 2024

@exceptionfactory Can you please restart the dependency-check? The exception does not seem to relate to the changes. Thanks!

Comment on lines +29 to +30
Further information about Protocol Buffers can be found here:
<a href="https://protobuf.dev/">protobuf.dev</a>
Copy link
Contributor

@dan-s1 dan-s1 Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mark-bathori Thanks for adding these additional details. I did have one question as I see on this referenced site there is Protobuf versions 2 and 3. Does the reader you created support both versions or only one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Reader supports both proto2 and proto3 versions, only exception is the GROUPS feature since the schema reader doesn't support it. This feature was already deprecated in proto2 and not included in proto3.

Comment on lines +65 to +66
private String message;
private Schema protoSchema;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private String message;
private Schema protoSchema;
private volatile String message;
private volatile Schema protoSchema;

@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
message = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The designation message is consistently used throughout this module when it's really about message type. It can be confusing.

Suggested change
message = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
messageType = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();

Comment on lines +179 to +190
private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException {
final ProtoType protoType = protoField.getProtoType();
if (protoType.isScalar()) {
switch (FieldType.findValue(protoType.getSimpleName())) {
case STRING:
return resolveFieldValue(protoField, values, ByteString::toStringUtf8);
case BYTES:
return resolveFieldValue(protoField, values, ByteString::toByteArray);
default:
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be easer to maintain and understand if we got rid of the FieldType enum end used predefined maps for the type-specific converter functions.

Suggested change
private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException {
final ProtoType protoType = protoField.getProtoType();
if (protoType.isScalar()) {
switch (FieldType.findValue(protoType.getSimpleName())) {
case STRING:
return resolveFieldValue(protoField, values, ByteString::toStringUtf8);
case BYTES:
return resolveFieldValue(protoField, values, ByteString::toByteArray);
default:
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName()));
}
private Map<String, Function<ByteString, Object>> scalarLengthDelimitedValueProviders = Map.of(
"string", ByteString::toStringUtf8,
"bytes", ByteString::toByteArray
);
private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException {
final Object convertedValue;
final ProtoType protoType = protoField.getProtoType();
if (protoType.isScalar()) {
final Function<ByteString, Object> valueConverter = scalarLengthDelimitedValueProviders.get(protoType.getSimpleName());
if (valueConverter == null) {
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName()));
} else {
convertedValue = resolveFieldValue(protoField, values, valueConverter);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FieldType enum cannot be removed since it is being used in the ProtoSchemaParser for DataType matching. If I remove it I need to define the scalar proto field types in two different place which is not efficient. Instead I changed the Enum matching to assign only the 'valueConverter' function so the multiple 'return' were removed.

}
}

private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> getValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> getValue) {
private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> valueConverter) {

* @param unknownField field's value
* @param values Map of values
*/
private void getField(ProtoField protoField, UnknownFieldSet.Field unknownField, Map<String, Object> values) throws InvalidProtocolBufferException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of methods with in-out parameters but if we think it's worth it to make the code more readable a more appropriate signature would be welcome.

Suggested change
private void getField(ProtoField protoField, UnknownFieldSet.Field unknownField, Map<String, Object> values) throws InvalidProtocolBufferException {
private void collectFieldValue(Map<String, Object> fieldNameToConvertedValue, ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException {

}

@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This called from the @OnEnabled SchemaRegistryService.storeSchemaAccessStrategy. At the same time it depends on ProtobufReader's own onEnabled method. My Java runtime gathers the @OnEnabled methods in an alphanumeric order of the name class so it works but we probably shouldn't rely on that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested this by creating a Controller Service with name starting with letter 'Z' and the @onEnabled methods were called in proper order. It seems like they are being called in class inheritance order.

private RecordSchema recordSchema;
private boolean inputProcessed;

public ProtobufRecordReader(InputStream inputStream, String message, Schema protoSchema, RecordSchema recordSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public ProtobufRecordReader(InputStream inputStream, String message, Schema protoSchema, RecordSchema recordSchema) {
I feel the ordering of the fields and parameters could be more consistent.

Comment on lines +28 to +38
public ProtoField(String fieldName, ProtoType protoType, boolean repeatable) {
this.fieldName = fieldName;
this.protoType = protoType;
this.repeatable = repeatable;
}

public ProtoField(Field field) {
this.fieldName = field.getName();
this.protoType = field.getType();
this.repeatable = field.isRepeated();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public ProtoField(String fieldName, ProtoType protoType, boolean repeatable) {
this.fieldName = fieldName;
this.protoType = protoType;
this.repeatable = repeatable;
}
public ProtoField(Field field) {
this.fieldName = field.getName();
this.protoType = field.getType();
this.repeatable = field.isRepeated();
}
public ProtoField(Field field) {
this(field.getName(), field.getType(),field.isRepeated());
}
public ProtoField(String fieldName, ProtoType protoType, boolean repeatable) {
this.fieldName = fieldName;
this.protoType = protoType;
this.repeatable = repeatable;
}

Comment on lines +19 to +47
message Proto3Message {
bool field1 = 1;
string field2 = 2;
int32 field3 = 3;
uint32 field4 = 4;
sint32 field5 = 5;
fixed32 field6 = 6;
sfixed32 field7 = 7;
double field8 = 8;
float field9 = 9;
bytes field10 = 10;
int64 field11 = 11;
uint64 field12 = 12;
sint64 field13 = 13;
fixed64 field14 = 14;
sfixed64 field15 = 15;
NestedMessage nestedMessage = 16;
}

message NestedMessage {
TestEnum testEnum = 20;
repeated string repeatedField = 21;
oneof oneOfField {
string option1 = 22;
bool option2 = 23;
int32 option3 = 24;
}
map<string, int32> testMap = 25;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More descriptive field names would help reading the test code much easier.
(Similar changes would be good in test_proto2.proto as well.)

Suggested change
message Proto3Message {
bool field1 = 1;
string field2 = 2;
int32 field3 = 3;
uint32 field4 = 4;
sint32 field5 = 5;
fixed32 field6 = 6;
sfixed32 field7 = 7;
double field8 = 8;
float field9 = 9;
bytes field10 = 10;
int64 field11 = 11;
uint64 field12 = 12;
sint64 field13 = 13;
fixed64 field14 = 14;
sfixed64 field15 = 15;
NestedMessage nestedMessage = 16;
}
message NestedMessage {
TestEnum testEnum = 20;
repeated string repeatedField = 21;
oneof oneOfField {
string option1 = 22;
bool option2 = 23;
int32 option3 = 24;
}
map<string, int32> testMap = 25;
}
message Proto3Message {
bool booleanField = 1;
string stringField = 2;
int32 int32Field = 3;
uint32 uint32Field = 4;
sint32 sint32Field = 5;
fixed32 fixed32Field = 6;
sfixed32 sfixed32Field = 7;
double doubleField = 8;
float floatField = 9;
bytes bytesField = 10;
int64 int64Field = 11;
uint64 uint64Field = 12;
sint64 sint64Field = 13;
fixed64 fixed64Field = 14;
sfixed64 sfixed64Field = 15;
NestedMessage nestedMessage = 16;
}
message NestedMessage {
TestEnum testEnum = 20;
repeated string repeatedField = 21;
oneof oneOfField {
string stringOption = 22;
bool booleanOption = 23;
int32 int32Option = 24;
}
map<string, int32> testMap = 25;
}


assertArrayEquals(new Object[] {"Repeated 1", "Repeated 2", "Repeated 3"}, (Object[]) nestedRecord.getValue("repeatedField"));

assertEquals(3, nestedRecord.getValue("option1"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test kind of hides the fact that the though originally "optionX" is set, when I write the record to JSON for example the result will contain the name of the oneof field instead. The original field name should be there in my opinion.

public class TestProtoSchemaParser {

@Test
public void testSchemaParserForProto3() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking a prebuilt expected schema can help reduce boiler-plate:

Suggested change
public void testSchemaParserForProto3() {
public void testSchemaParserForProto3() {
final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + "test_proto3.proto")), Collections.emptyList());
final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schemaLoader.loadSchema());
final SimpleRecordSchema expected = new SimpleRecordSchema(Arrays.asList(
new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType()),
new RecordField("stringField", RecordFieldType.STRING.getDataType()),
new RecordField("int32Field", RecordFieldType.INT.getDataType()),
new RecordField("uint32Field", RecordFieldType.LONG.getDataType()),
new RecordField("sint32Field", RecordFieldType.LONG.getDataType()),
new RecordField("fixed32Field", RecordFieldType.LONG.getDataType()),
new RecordField("sfixed32Field", RecordFieldType.INT.getDataType()),
new RecordField("doubleField", RecordFieldType.DOUBLE.getDataType()),
new RecordField("floatField", RecordFieldType.FLOAT.getDataType()),
new RecordField("bytesField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
new RecordField("int64Field", RecordFieldType.LONG.getDataType()),
new RecordField("uint64Field", RecordFieldType.BIGINT.getDataType()),
new RecordField("sint64Field", RecordFieldType.LONG.getDataType()),
new RecordField("fixed64Field", RecordFieldType.BIGINT.getDataType()),
new RecordField("sfixed64Field", RecordFieldType.LONG.getDataType()),
new RecordField("nestedMessage", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
new RecordField("testEnum", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("ENUM_VALUE_1", "ENUM_VALUE_2", "ENUM_VALUE_3"))),
new RecordField("repeatedField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())),
new RecordField("testMap", RecordFieldType.MAP.getMapDataType(RecordFieldType.INT.getDataType())),
new RecordField(
"oneOfField",
RecordFieldType.CHOICE.getChoiceDataType(
RecordFieldType.STRING.getDataType(),
RecordFieldType.BOOLEAN.getDataType(),
RecordFieldType.INT.getDataType()
),
new HashSet<>(Arrays.asList("stringOption", "booleanOption", "int32Option"))
)
))))
));
final RecordSchema actual = schemaParser.createSchema("Proto3Message");
assertEquals(expected, actual);
}

Comment on lines +51 to +53
final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + "test_proto3.proto")), Collections.emptyList());
protoSchema = schemaLoader.loadSchema();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are creating the schema around 9 times across 3 test classes in very similar if not outright the same way. We could probably move this logic into ProtoTestUtil.

Comment on lines +371 to +370
/**
* Gets the fully qualified name of the message type.
*
* @param typeName name of the message
* @return fully qualified name of the message type
*/
private String normalizeTypeName(String typeName) {
return typeName.substring(typeName.lastIndexOf('/') + 1);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing. Fully qualified = normalized = part after last '/' ?

* @return created record
* @throws IOException failed to read input stream
*/
public MapRecord createRecord(InputStream data) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing. The method states that it needs the room message. It relies on the constructor to get the root message type.
If we consider this to be the case I suggest adding a javadoc to the constructor and rename message to not merely messageType but to rootMessageType.

However in all fairness this class seems to be able to handle any message type. The only constraint is that the message type and the nifi record schema are consistent with each other.

return handleAnyField(unknownFieldSet);
}

return new MapRecord(recordSchema, processMessageFields(messageType, unknownFieldSet), false, dropUnknownFields);
Copy link
Contributor

@tpalfy tpalfy Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to not have a single expression do 2 things that could be considered business logic.

Suggested change
return new MapRecord(recordSchema, processMessageFields(messageType, unknownFieldSet), false, dropUnknownFields);
final Map<String, Object> fieldValues = processMessageFields(messageType, unknownFieldSet);
return new MapRecord(recordSchema, fieldValues, false, dropUnknownFields);

if (coerceTypes) {
Optional<RecordField> recordField = rootRecordSchema.getField(protoField.getFieldName());
if (recordField.isPresent()) {
resultValues = resultValues.stream().map(o -> DataTypeUtils.convertType(o, recordField.get().getDataType(), recordField.get().getFieldName())).toList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resultValues = resultValues.stream().map(o -> DataTypeUtils.convertType(o, recordField.get().getDataType(), recordField.get().getFieldName())).toList();
resultValues = resultValues.stream().map(value -> DataTypeUtils.convertType(value, recordField.get().getDataType(), recordField.get().getFieldName())).toList();

@tpalfy
Copy link
Contributor

tpalfy commented Apr 10, 2024

LGTM
Thank your for work @mark-bathori and for the review @dan-s1 !
Merged into main.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants