NIFI-6295 fix deserialization issues with NiFiRecordSerDe for hive3streaming #3509
gideonkorir wants to merge 1 commit into apache:master from
Conversation
case BYTE:
-    Integer bIntValue = record.getAsInt(fieldName);
-    val = bIntValue == null ? null : bIntValue.byteValue();
+    Integer bIntValue = DataTypeUtils.toInteger(fieldValue, field.getDataType().getFormat());
I see that the direct call to DataTypeUtils is pretty much equivalent to the record.getAs calls (because of the implementation of MapRecord for example), but I personally prefer the use of the Record interface since we're working with Record objects and it's possible (however unlikely) that the underlying implementation will change. Unfortunately that may also mean that getAs returns null (although it apparently doesn't for primitive types) which is why the extra checks are in there. I can appreciate the slight performance increase but I'd prefer we keep the current use of Record methods.
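The null-guarded accessor pattern the reviewer prefers could be sketched like this (a minimal sketch; `Record` below is a hypothetical stand-in for NiFi's Record interface, not the real API):

```java
// Hypothetical stand-in for NiFi's Record interface: only the single
// accessor needed for this example is modeled.
interface Record {
    Integer getAsInt(String fieldName);
}

class ByteFieldExample {
    // getAsInt may return null, so guard before narrowing to byte
    static Byte toByteValue(Record record, String fieldName) {
        Integer bIntValue = record.getAsInt(fieldName);
        return bIntValue == null ? null : bIntValue.byteValue();
    }
}
```

The design point is that coding against the interface keeps the SerDe correct even if the underlying Record implementation changes, at the cost of an explicit null check per field.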
AvroTypeUtil does seem to use DataTypeUtil rather than use the interface directly so was following that and not assuming the MapRecord implementation.
}
Map<String, Integer> fieldPositionMap = null;
try {
    fieldPositionMap = populateFieldPositionMap(record.getSchema(), structTypeInfo, log);
This was previously done in initialize() because it only needed to be done once per flowfile, rather than doing the same work every time a record is deserialized. Is there a reason it needs to be moved into deserialize()?
The problem was that deserialize could call itself if the field value was a nested record, an array/list of records, or a map of key->Record. This bothered me quite a bit, but I didn't know exactly how to handle it. My initial thought was to have a cache (a map, basically) that I populate as I encounter new schema/struct type info. Something to consider: we can have two identical schemas but different positions (imagine an account containing a list of accounts where the column names were re-ordered in the child collection), so the cache will need to handle that.
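The cache idea could be sketched like this (hypothetical names throughout; the `String`/`List` parameters stand in for NiFi's RecordSchema and Hive's StructTypeInfo). The key includes the target struct's column order, so two identical schemas mapped to differently ordered structs get separate entries:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-(schema, struct) field-position cache.
class FieldPositionCache {
    private final Map<String, Map<String, Integer>> cache = new HashMap<>();

    Map<String, Integer> positionsFor(String schemaText, List<String> structColumns) {
        // Key on both the schema and the struct's column order, so identical
        // schemas whose target columns are re-ordered do not collide.
        String key = schemaText + "|" + structColumns;
        return cache.computeIfAbsent(key, k -> {
            Map<String, Integer> positions = new HashMap<>();
            for (int i = 0; i < structColumns.size(); i++) {
                positions.put(structColumns.get(i), i);
            }
            return positions;
        });
    }
}
```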
Looks like I over-simplified the NiFiRecordSerDe code while porting from JsonSerDe. The latter had broken up the tasks of getting field names, extracting field values, and possibly recursively calling one or both of those methods for nested structures. I think we should look at JsonSerDe and structure NiFiRecordSerDe likewise. It can result in the fieldPositionMap being built multiple times per flowfile, but like you said I think we need to do it. However, they have a pretty good modular approach to which methods do which things; all we should have to do is map our Record methods to JsonSerDe's JSON parsing methods (nextToken() and such). Thoughts?
I've checked out their code, will refactor to match what they've done. Looks cleaner than what I've got
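The modular, recursive layout being discussed could look roughly like this (a loose sketch of the JsonSerDe-style breakdown, not NiFi's or Hive's actual code; all names are hypothetical): one method per structural case, with nested structures recursing back into the dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: dispatch on the value's shape, recursing for nested
// structs and lists, loosely mirroring JsonSerDe's task breakdown.
class RecursiveDeserializeSketch {
    static Object deserializeValue(Object value) {
        if (value instanceof Map) {
            return deserializeStruct((Map<?, ?>) value);
        }
        if (value instanceof List) {
            return deserializeList((List<?>) value);
        }
        return value;  // primitive: pass through unchanged
    }

    static List<Object> deserializeStruct(Map<?, ?> struct) {
        List<Object> fields = new ArrayList<>();
        for (Object v : struct.values()) {
            fields.add(deserializeValue(v));   // recurse per field
        }
        return fields;
    }

    static List<Object> deserializeList(List<?> list) {
        List<Object> out = new ArrayList<>();
        for (Object v : list) {
            out.add(deserializeValue(v));      // recurse per element
        }
        return out;
    }
}
```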
if (array == null) {
    return null;
}
Object[] array = DataTypeUtils.toArray(fieldValue, field.getFieldName(), field.getDataType());
Do we need to get the array element type rather than passing in field.getDataType()? The other structured data types (List, e.g.) below do that.
Thanks will fix that
Actually based on Hive Binary Design and Hive Types does it make sense to only support ByteArrayRef, byte[] and String via getBytes()?
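Restricting BINARY coercion along those lines could be sketched as follows (a hedged sketch of the idea only, with hypothetical names; Hive's ByteArrayRef is omitted since it can't be stubbed meaningfully here). Anything other than byte[] or String is rejected rather than guessed at:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: coerce a BINARY field value from byte[] or String only.
class BinaryFieldExample {
    static byte[] toBinary(Object fieldValue) {
        if (fieldValue == null) {
            return null;
        }
        if (fieldValue instanceof byte[]) {
            return (byte[]) fieldValue;
        }
        if (fieldValue instanceof String) {
            // assumption: UTF-8 is the intended encoding for String-typed binary fields
            return ((String) fieldValue).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException(
            "Cannot coerce " + fieldValue.getClass().getName() + " to BINARY");
    }
}
```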
assertEquals("test", fields.get(7));
assertEquals("test2", fields.get(8));
assertEquals("c", fields.get(9));
//assertEquals(AvroTypeUtil.convertByteArray(new Object[]{ (byte)1 }).array(), (byte[])fields.get(10));
Should this be included or removed?
I'm thinking it probably should be included; it would have caught the bug you saw.
break;
case MAP:
    val = record.getValue(fieldName);
    // in NiFi all maps are <String, ?> so use that
Actually when running with a live NiFi instance, the maps are being represented as MapRecords not Map<String,Object> but I think we should handle both just in case. We had to fix this in #3424 for the other utilities, so we may need to check if it's an instance of Record or Map and handle them separately.
I took the liberty of starting a branch using this PR as a base and updating NiFiRecordSerDe to use the JsonSerDe approach to recursion as we discussed. I'll post the branch/PR when I'm finished, but wasn't sure if you are also actively working it or not. If so, perhaps we could collaborate or bring in each other's commits or something?
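The Record-or-Map check described above could be sketched like this (a minimal sketch; `NiFiRecord` is a hypothetical stand-in for NiFi's Record/MapRecord, not the real interface):

```java
import java.util.Map;

class MapFieldExample {
    // Hypothetical stand-in for NiFi's Record, which can expose its fields as a Map.
    interface NiFiRecord {
        Map<String, Object> toMap();
    }

    @SuppressWarnings("unchecked")
    static Map<String, Object> asMap(Object val) {
        if (val == null) {
            return null;
        }
        if (val instanceof NiFiRecord) {
            return ((NiFiRecord) val).toMap();   // MapRecord-style value
        }
        if (val instanceof Map) {
            return (Map<String, Object>) val;    // already a Map<String, ?>
        }
        throw new IllegalArgumentException(
            "Unsupported MAP value: " + val.getClass().getName());
    }
}
```

Handling both shapes mirrors the fix referenced from #3424: live flows may deliver MapRecords where unit tests deliver plain Maps, so the SerDe should accept either.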
We're marking this PR as stale due to lack of updates in the past few months. If after another couple of weeks the stale label has not been removed, this PR will be closed. This stale marker and eventual auto-close does not indicate a judgement of the PR, just lack of reviewer bandwidth, and helps us keep the PR queue more manageable. If you would like this PR re-opened, you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth, which in turn helps yours.
Thank you for submitting a contribution to Apache NiFi.
Please provide a short description of the PR here:
Description of PR
Fixes deserialization of records in NiFiRecordSerDe and adds the capability to: