-
Notifications
You must be signed in to change notification settings - Fork 152
APEXMALHAR-2417 Adding Pojo Outer join accumulation #568
APEXMALHAR-2417 Adding Pojo Outer join accumulation #568
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make all the files evolving.
* Join Accumulation for Pojo Streams. | ||
* | ||
*/ | ||
public abstract class AbstractPojoJoin<InputT1, InputT2> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this @evolving
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the classes except PojoInnerJoin needs to be marked as Evolving. This is not done completely yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take care of comments. Otherwise, looks good.
|
||
public AbstractPojoJoin(Class<?> outClass, String... keys) | ||
{ | ||
if (keys.length % 2 != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why %2? How should user know in which orders the keys should be mentioned... Suggesting to follow either of the 2 approaches:
- 2 parameters to contructor with 2 different keyExpressions.
- 2 parameters to constructor with 2 string[].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you Chinmay for this input, I will create a separate Jira under APEXMALHAR-2413 for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Jira does not capture it. Moreover, please close this Jira and create a seperate Jira for performance improvement of PojoJoins..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened Jira https://issues.apache.org/jira/browse/APEXMALHAR-2429 for this
{ | ||
protected final String[] keys; | ||
protected final Class<?> outClass; | ||
private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use Map instead of List? for all the 3 variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream1; | ||
private transient List<KeyValPair<String,PojoUtils.Getter>> gettersStream2; | ||
private transient List<KeyValPair<String,PojoUtils.Setter>> setters; | ||
public transient Set<String> keySetStream2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can keySetStream1 & keySetStream2 be private variables?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually you can make it protected..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
{ | ||
// TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected. | ||
|
||
input.getClass().getDeclaredField(keys[index]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the purpose of this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
public List<?> getOutput(List<List<Map<String, Object>>> accumulatedValue) | ||
{ | ||
// List<Map<String, Object>> result = new ArrayList<>(); | ||
if (setters == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A minor knit... Just to be modular...
if (seters == null) {
// create setters
}
if (keySetStream1 == null) {
// populate set.
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will call additional if condition everytime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
*/ | ||
public class PojoInnerJoin<InputT1, InputT2> | ||
implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>> | ||
extends AbstractPojoJoin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AbstractPojoJoin<InputT1, InputT2>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly for other classes as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* | ||
*/ | ||
public abstract class AbstractPojoJoin<InputT1, InputT2> | ||
implements MergeAccumulation<InputT1, InputT2, List<List<Map<String, Object>>>, List<?>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a suggest here in terms of how accumulation data structure is created... Instead of having a list of map... we can have List<MultiMap<Object, Object>>.. The key to multimap is a unique object generated for set of keys configured... and value of multimap can contain more than one object for the same key.
This way, while making the comparison, it becomes straightforward how to get the matching pairs.
And I believe it'll reduce the cost of iterating over the various maps in place.
Just something to consider later on for improvsation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you Chinmay for this input, I will create a separate Jira under APEXMALHAR-2413 for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Jira does not capture it. Moreover, please close this Jira and create a seperate Jira for performance improvement of PojoJoins..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
@Override | ||
public Map<String, Object> joinTwoMapsWithKeys(Map map1, Map map2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method seems to be doing the same thing as the one present in super class. Remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@chinmaykolhatkar have made the changes please review and merge. |
@Hitesh-Scorpio Some changes like creation of right Jira and Making the classes Evolving not done. I'll merge once this is done. |
98e13a8
to
a634e64
Compare
@chinmaykolhatkar have created the suggested Jira's have squashed the comments and rebased. |
6d9ae0a
to
4b36bf3
Compare
@chinmaykolhatkar @tushargosavi please review