APEXCORE-756: Fix ConcurrentModificationException in GroupingManager and code refactoring #554
Conversation
@vrozov @tweise @bhupeshchawda addressed review comments. Can you please review and merge? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refer to ConcurrentModificationException in the JIRA and the commit. Note that the exception affects not only tests, the fix is required in production code.
@@ -16,16 +16,17 @@ | |||
* specific language governing permissions and limitations | |||
* under the License. | |||
*/ | |||
package org.apache.apex.stram; | |||
package org.apache.apex.engine; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be better to introduce a package (like org.apache.apex.engine.events.grouping) for classes related to the functionality.
@@ -39,7 +40,7 @@ | |||
public class GroupingManager | |||
{ | |||
private static final GroupingManager groupingManager = new GroupingManager(); | |||
private Map<String, GroupingRequest> groupingRequests = Maps.newHashMap(); | |||
private Map<String, GroupingRequest> groupingRequests = Maps.newConcurrentMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is concurrent necessary? If yes, the pattern used in addOrModifyGroupingRequest is subject to a data loss.
for (Entry<String, GroupingRequest> request : groupingRequests.entrySet()) { | ||
Iterator<Entry<String, GroupingRequest>> itr = groupingRequests.entrySet().iterator(); | ||
while (itr.hasNext()) { | ||
Entry<String, GroupingRequest> request = itr.next(); | ||
if (request.getValue().getOperatorsToDeploy().size() == 0 | ||
&& request.getValue().getOperatorsToUndeploy().size() == 0) { | ||
LOG.info("Removing for :" + request.getKey()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use sl4j logging. Does it need to be info level?
for (Entry<String, GroupingRequest> request : groupingRequests.entrySet()) { | ||
Iterator<Entry<String, GroupingRequest>> itr = groupingRequests.entrySet().iterator(); | ||
while (itr.hasNext()) { | ||
Entry<String, GroupingRequest> request = itr.next(); | ||
if (request.getValue().getOperatorsToDeploy().size() == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Introduce isDone() or isComplete() into the GroupingRequest.
Has any testing be done after these changes to verify that concurrent container failures are handled? |
53174db
to
11f0793
Compare
Addressed review comments as well as did testing for recursive/concurrent failures. |
*/ | ||
public boolean isProcessed() | ||
{ | ||
if (getOperatorsToDeploy().size() == 0 && getOperatorsToUndeploy().size() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return operatorsToDeploy.isEmpty() && operatorsToUndeploy.isEmpty();
while (itr.hasNext()) { | ||
Entry<String, GroupingRequest> request = itr.next(); | ||
if (request.getValue().isProcessed()) { | ||
LOG.debug("Removing Grouping request for :" + request.getKey()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use sl4j
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vrozov can you please help me understand what you mean by sl4j. I have used slf4j logger here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use {} parameters instead of string concat. It is a basic thing that all contributors should be aware of.
groupingRequests.remove(request.getKey()); | ||
Iterator<Entry<String, GroupingRequest>> itr = groupingRequests.entrySet().iterator(); | ||
while (itr.hasNext()) { | ||
Entry<String, GroupingRequest> request = itr.next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please rename to entry. The actual request is itr.next().getValue();
@DT-Priyanka please document on the original JIRA how testing for concurrent container failures was done. IMO #479 and APEXCORE-602 should have not been closed without it. We cannot merge PRs before elementary things are covered. In this case, since changes affect the core aspects of the system, there is a need for more extensive testing (and documentation of the same). |
@tweise thanks for feedback. I have updated Jira. We did extensive testing, but didn't update it in jira. Will make a point to update jira with testing from now on. |
11f0793
to
f1ba9d1
Compare
@DT-Priyanka Is this PR ready for review or more changes are coming? |
f1ba9d1
to
0cbd4c9
Compare
@vrozov PR is already ready for review. I am addressing review comments as I am getting. Just pushed the last commit and have resolved all comments till now. |
*/ | ||
public boolean isProcessed() | ||
{ | ||
if (getOperatorsToDeploy().isEmpty() && getOperatorsToUndeploy().isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see my original comment. The result of isProcessed() is the value of operatorsToDeploy.isEmpty() && operatorsToUndeploy.isEmpty(). There is no need to extra if
…and code refactoring
0cbd4c9
to
4d1cad2
Compare
No description provided.