Skip to content

Commit

Permalink
OTF2: Fix bug in communicators view with asynchronous receive
Browse files Browse the repository at this point in the history
When there are asynchronous receive operations, 2 events are written in
the OTF2 trace. First there is an IRecvRequest event, then, when the
request is tested in the MPI program, there is the associated IRecv
event containing the information about the communicator. The
communicator view was bugged for these events and would not display the
first event.

To fix it, the information about the request and all the other events
following the unresolved request are stored until the IRecv event is
encountered. Then, the state system is updated with the informations
on the IRecvRequest, the following events and finally the IRecv event.
A trace with an IRecv event can be found here:
https://github.com/dorsal-lab/OTF2_testcases

Signed-off-by: yoann-heitz <yoann.heitz@polymtl.ca>
Change-Id: I9b8b1b5c1c1b3826aae3a5a1d49c01279a257971
Reviewed-on: https://git.eclipse.org/r/c/tracecompass.incubator/org.eclipse.tracecompass.incubator/+/189385
Tested-by: Trace Compass Bot <tracecompass-bot@eclipse.org>
Reviewed-by: Marco Miller <marco.miller@ericsson.com>
  • Loading branch information
yoann-heitz authored and marco-miller committed Feb 8, 2022
1 parent 78ba17c commit 0d5fa46
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 11 deletions.
Expand Up @@ -56,4 +56,9 @@ public interface IOtf2Events {
* MPI CollectiveEnd event name
*/
String OTF2_MPI_COLLECTIVE_END = "MpiCollectiveEnd"; //$NON-NLS-1$

/**
* MPI IRecv Request event name
*/
String OTF2_MPI_IRECV_REQUEST = "MpiIrecvRequest"; //$NON-NLS-1$
}
Expand Up @@ -14,11 +14,14 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.tracecompass.incubator.internal.otf2.core.analysis.AbstractOtf2StateProvider;
import org.eclipse.tracecompass.incubator.internal.otf2.core.analysis.IOtf2Constants;
import org.eclipse.tracecompass.incubator.internal.otf2.core.analysis.IOtf2Events;
Expand Down Expand Up @@ -51,6 +54,50 @@ public class Otf2CommunicatorsStateProvider extends AbstractOtf2StateProvider {
private static final String RANK = "Rank "; //$NON-NLS-1$
private static int VERSION_NUMBER = 1;

/*
* A class representing an IRecvRequest. For each asynchronous receive
* operation, an instance is partially constructed when the communication is
* launched and completely filled with the communicator informations when
* the request completion is checked.
*/
private class IRecvRequest {
private final String fOperationName;
private final long fRequestBeginTimestamp;
private final long fRequestID;
private Communicator fCommunicator;
private int fRank;
private long fRequestEndTimestamp;
private boolean fRequestResolved;

public IRecvRequest(String operationName, long requestBeginTimestamp, long requestID) {
fOperationName = operationName;
fRequestBeginTimestamp = requestBeginTimestamp;
fRequestID = requestID;
fCommunicator = new Communicator(-1);
fRank = UNKNOWN_RANK;
fRequestEndTimestamp = 0;
fRequestResolved = false;
}

}

/*
* A class implementing a triplet representing a change in the StateSystem.
* It contains the quark on which this change occurs, the new string value
* for this quark and the timestamp for this change
*/
private class StateSystemUpdateTriplet {
private final int fQuark;
private final @Nullable String fValue;
private final long fTimestamp;

public StateSystemUpdateTriplet(int quark, @Nullable String value, long timestamp) {
fQuark = quark;
fValue = value;
fTimestamp = timestamp;
}
}

/**
* A class representing a location and its state in this analysis.
*
Expand All @@ -63,19 +110,106 @@ private class Location {
private String fLatestEnteredRegion;
private long fLatestEnteredTimestamp;
private boolean fInCommunication;
private boolean fInIRecvRequest;
private int fRank;
private final List<MessageIdentifiers> fSentMessages;
private final Map<Long, Long> fRequestIdBeginTimestamp;
private final Deque<IRecvRequest> fIRecvRequests;
private final Deque<StateSystemUpdateTriplet> fPendingStateSystemUpdates;

public Location(long id) {
fId = id;
fCurrentCommunicator = new Communicator(UNKNOWN_RANK);
fLatestEnteredRegion = IOtf2Constants.UNKNOWN_STRING;
fLatestEnteredTimestamp = 0L;
fInCommunication = false;
fInIRecvRequest = false;
fRank = UNKNOWN_RANK;
fSentMessages = new ArrayList<>();
fRequestIdBeginTimestamp = new HashMap<>();
fIRecvRequests = new LinkedList<>();
fPendingStateSystemUpdates = new LinkedList<>();
}

/*
* This method is called when an IRecvRequest was checked and resolved
* (the corresponding IRecv event has been encountered). It updates the
* IRecvRequest object created for this request
*/
private void searchAndUpdateIRecvRequest(long requestID, Communicator communicator, int rank) {
for (IRecvRequest request : fIRecvRequests) {
if (request.fRequestID == requestID) {
request.fCommunicator = communicator;
request.fRank = rank;
request.fRequestResolved = true;
break;
}
}
}

/*
* This method flushes the StateSystem updates when an IRecvRequest has
* been completed. When an asynchronous Receive is encountered, no
* information about the communicator is given at first. First an
* IRecvRequest event is encountered without information about the
* communicator. It is only when the request is resolved that the
* information about the communicator can be processed. That is why it
* is needed to store the StateSystem updates between the begin of the
* IRecvRequest and its resolution and write all these updates in
* monotonic order when the request has been resolved and all
* information is available
*/
private void flushUpdatesUntilUnresolvedRequest() {
ITmfStateSystemBuilder ssb = getStateSystemBuilder();
if (ssb == null) {
return;
}
while (!fIRecvRequests.isEmpty() && fIRecvRequests.getFirst().fRequestResolved) {
IRecvRequest firstRequest = fIRecvRequests.removeFirst();
int requestCommunicatorQuark = firstRequest.fCommunicator.fQuark;
int requestAssociatedLocationQuark = ssb.getQuarkRelativeAndAdd(requestCommunicatorQuark, RANK + String.valueOf(firstRequest.fRank));
ssb.modifyAttribute(firstRequest.fRequestBeginTimestamp, firstRequest.fOperationName, requestAssociatedLocationQuark);
ssb.modifyAttribute(firstRequest.fRequestEndTimestamp, null, requestAssociatedLocationQuark);

long nextRequestTimestamp = fIRecvRequests.isEmpty() ? Long.MAX_VALUE : fIRecvRequests.getFirst().fRequestBeginTimestamp;
while (!fPendingStateSystemUpdates.isEmpty() && fPendingStateSystemUpdates.peekFirst().fTimestamp <= nextRequestTimestamp) {
StateSystemUpdateTriplet triplet = fPendingStateSystemUpdates.removeFirst();
ssb.modifyAttribute(triplet.fTimestamp, triplet.fValue, triplet.fQuark);
}
}
}

/*
* This method is called when all the trace has been read. It flushes
* the StateSystem updates that were stored during an unresolved request
*/
public void flushAllUpdates(ITmfStateSystemBuilder ssb) {
/*
* First, for each request that has been resolved, all other updates
* that happens before these requests are flushed
*/
for (IRecvRequest request : fIRecvRequests) {
if (request.fRequestResolved) {
while (!fPendingStateSystemUpdates.isEmpty() && fPendingStateSystemUpdates.peekFirst().fTimestamp <= request.fRequestBeginTimestamp) {
StateSystemUpdateTriplet triplet = fPendingStateSystemUpdates.removeFirst();
ssb.modifyAttribute(triplet.fTimestamp, triplet.fValue, triplet.fQuark);
}
int requestCommunicatorQuark = request.fCommunicator.fQuark;
int requestAssociatedLocationQuark = ssb.getQuarkRelativeAndAdd(requestCommunicatorQuark, RANK + String.valueOf(request.fRank));
ssb.modifyAttribute(request.fRequestBeginTimestamp, request.fOperationName, requestAssociatedLocationQuark);
ssb.modifyAttribute(request.fRequestEndTimestamp, null, requestAssociatedLocationQuark);
}
}

/*
* When all the updates associated to resolved requests and events
* occuring before them have been flushed, the remaining events are
* flushed.
*/
while (!fPendingStateSystemUpdates.isEmpty()) {
StateSystemUpdateTriplet triplet = fPendingStateSystemUpdates.removeFirst();
ssb.modifyAttribute(triplet.fTimestamp, triplet.fValue, triplet.fQuark);
}
}

/**
Expand All @@ -96,23 +230,62 @@ public void enter(ITmfEvent event) {
fLatestEnteredRegion = regionName;
}

/**
* This method is called when an OTF2 IRecvRequest is encountered. It
* means an asynchronous receive event has been encountered. However,
* this event does not contain any information on the communicator and
* the attributes can not be updated without this information. An
* IRecvRequest object is partially constructed and will be filled with
* the communicator information when the request is resolved.
*
* @param event
* the ITmfEvent associated to the OTF2 IRecvRequest event
*/

public void mpiIRecvRequest(ITmfEvent event) {
ITmfEventField content = event.getContent();
Long requestID = content.getFieldValue(Long.class, IOtf2Fields.OTF2_REQUEST_ID);
if (requestID == null) {
return;
}
fInCommunication = true;
fInIRecvRequest = true;
fRequestIdBeginTimestamp.put(requestID, fLatestEnteredTimestamp);
fIRecvRequests.add(new IRecvRequest(fLatestEnteredRegion, fLatestEnteredTimestamp, requestID));
}

/**
* This method is called when a location leaves a region associated to a
* MPI Communication and updates state of the attribute associated to
* the location
* the location if there is no unresolved IRecvRequest ongoing for this
* communicator, else, it stores the information for this update
*
* @param event
* the ITmfEvent associated to the OTF2 LeaveRegion event
* @param ssb
* the StateSystemBuilder used to update the state of the
* attribute
*/
public void leaveMpiCommunication(ITmfEvent event, ITmfStateSystemBuilder ssb) {
int communicatorQuark = fCurrentCommunicator.fQuark;
int associatedLocationQuark = ssb.getQuarkRelativeAndAdd(communicatorQuark, RANK + String.valueOf(fRank));
ssb.modifyAttribute(fLatestEnteredTimestamp, fLatestEnteredRegion, associatedLocationQuark);
ssb.modifyAttribute(event.getTimestamp().toNanos(), null, associatedLocationQuark);
fInCommunication = false;
public void leave(ITmfEvent event, ITmfStateSystemBuilder ssb) {
if (fInIRecvRequest) {
IRecvRequest iRecvRequest = fIRecvRequests.getLast();
iRecvRequest.fRequestEndTimestamp = event.getTimestamp().toNanos();
fInIRecvRequest = false;
return;
}
if (fInCommunication) {
int communicatorQuark = fCurrentCommunicator.fQuark;
int associatedLocationQuark = ssb.getQuarkRelativeAndAdd(communicatorQuark, RANK + String.valueOf(fRank));
long eventTimestamp = event.getTimestamp().toNanos();
if (fIRecvRequests.isEmpty()) {
ssb.modifyAttribute(fLatestEnteredTimestamp, fLatestEnteredRegion, associatedLocationQuark);
ssb.modifyAttribute(eventTimestamp, null, associatedLocationQuark);
} else {
fPendingStateSystemUpdates.add(new StateSystemUpdateTriplet(associatedLocationQuark, fLatestEnteredRegion, fLatestEnteredTimestamp));
fPendingStateSystemUpdates.add(new StateSystemUpdateTriplet(associatedLocationQuark, null, eventTimestamp));
}
fInCommunication = false;
}
}

public void mpiSend(ITmfEvent srcEvent, Communicator communicator) {
Expand Down Expand Up @@ -168,7 +341,12 @@ public void mpiRecv(ITmfEvent srcEvent, Communicator communicator, boolean isBlo
if (requestId == null) {
return;
}
Long requestBeginTimestamp = fRequestIdBeginTimestamp.get(requestId);

searchAndUpdateIRecvRequest(requestId, communicator, destRank);
if (fIRecvRequests.getFirst().fRequestID == requestId) {
flushUpdatesUntilUnresolvedRequest();
}
Long requestBeginTimestamp = fRequestIdBeginTimestamp.remove(requestId);
if (requestBeginTimestamp == null) {
return;
}
Expand Down Expand Up @@ -360,13 +538,16 @@ protected void eventHandle(ITmfEvent event) {
* If the event is the last one then we need to update the states
* changes for each communicator. This has to be done at the end since
* collective operations may overlap inside a communicator and state
* changes may happen in the past for communicator attributes.
* changes may happen in the past for communicator attributes. In the
* case were there were unresolved IRecv requests, stored updates for
* each location are also flushed.
*/
if (isLastEvent(event)) {
ITmfStateSystemBuilder ssb = getStateSystemBuilder();
if (ssb == null) {
return;
}
processLocationsAttributes(ssb);
processCommunicatorsAttributes(ssb);
}
}
Expand Down Expand Up @@ -427,11 +608,15 @@ protected void processOtf2Event(ITmfEvent event, String name, ITmfStateSystemBui
break;
}
case IOtf2Events.OTF2_LEAVE: {
if (location.fInCommunication) {
location.leaveMpiCommunication(event, ssb);
if (location.fInCommunication || location.fInIRecvRequest) {
location.leave(event, ssb);
}
break;
}
case IOtf2Events.OTF2_MPI_IRECV_REQUEST: {
location.mpiIRecvRequest(event);
break;
}
case IOtf2Events.OTF2_MPI_SEND:
case IOtf2Events.OTF2_MPI_ISEND:
case IOtf2Events.OTF2_MPI_RECV:
Expand Down Expand Up @@ -469,6 +654,17 @@ private void processCommunicatorsAttributes(ITmfStateSystemBuilder ssb) {
}
}

/*
* Updates the pending state changes of each locations in the case where
* some IRecvRequests were not resolved
*/
private void processLocationsAttributes(ITmfStateSystemBuilder ssb) {
for (Map.Entry<Long, Location> locationEntry : fMapLocation.entrySet()) {
Location location = locationEntry.getValue();
location.flushAllUpdates(ssb);
}
}

/*
* Calls the corresponding method from the associated location given the
* type of event
Expand Down

0 comments on commit 0d5fa46

Please sign in to comment.