Skip to content
This repository has been archived by the owner on Apr 2, 2023. It is now read-only.

Fix duplicate host issue for aurora scheduler, DTCPIOPS-5221 #445

Merged
merged 4 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ public Iterable<HostOffer> getOrdered(TaskGroupKey groupKey, ResourceRequest res
ScheduleRequest scheduleRequest = createRequest(goodOffers, resourceRequest, startTime);
LOG.info("Sending request {}", scheduleRequest.jobKey);
String responseStr = sendRequest(scheduleRequest);
orderedOffers = processResponse(goodOffers, responseStr);
} catch (IOException e) {
orderedOffers = processResponse(goodOffers, responseStr, badOffers.size());
} catch (Exception e) {
LOG.error("Failed to schedule the task of {} using {} ",
resourceRequest.getTask().getJob().toString(), endpoint, e);
HttpOfferSetImpl.incrementFailureCount();
Expand Down Expand Up @@ -325,36 +325,33 @@ private String sendRequest(ScheduleRequest scheduleRequest) throws IOException {
}
}

List<HostOffer> processResponse(List<HostOffer> mOffers, String responseStr)
List<HostOffer> processResponse(List<HostOffer> mOffers, String responseStr, int badOfferSize)
throws IOException {
ScheduleResponse response = jsonMapper.readValue(responseStr, ScheduleResponse.class);
LOG.info("Received {} offers", response.hosts.size());

Map<String, HostOffer> offerMap = mOffers.stream()
.collect(Collectors.toMap(offer -> offer.getAttributes().getHost(), offer -> offer));
if (!response.error.trim().isEmpty()) {
LOG.error("Unable to receive offers from {} due to {}", endpoint, response.error);
throw new IOException(response.error);
}

List<HostOffer> orderedOffers = response.hosts.stream()
.map(host -> offerMap.get(host))
.filter(offer -> offer != null)
.collect(Collectors.toList());
List<String> extraOffers = response.hosts.stream()
.filter(host -> offerMap.get(host) == null)
.collect(Collectors.toList());

//offSetDiff is the total number of missing offers and the extra offers
long offSetDiff = mOffers.size() - (response.hosts.size() - extraOffers.size())
+ extraOffers.size();
// Group offers by host name so that multiple offers from the same host are all kept
// (a host-to-single-offer map cannot represent duplicate host names).
Map<String, List<HostOffer>> offerMap = mOffers.stream().
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved
collect(Collectors.groupingBy(offer -> offer.getOffer().getHostname(),
Collectors.mapping(offer -> offer, Collectors.toList())));
List<HostOffer> orderedOffers = response.hosts.stream().map(host -> offerMap.get(host))
.filter(offerList -> offerList != null && !offerList.isEmpty()).
flatMap(e -> e.stream()).collect(Collectors.toList());
List<String> extraOffers = response.hosts.stream().filter(host -> offerMap.get(host) == null).
collect(Collectors.toList());

// offSetDiff = (good offers + bad offers) - offers matched by the response
//              + hosts in the response for which Aurora has no offer
long offSetDiff = mOffers.size() + badOfferSize - orderedOffers.size() + extraOffers.size();
offerSetDiffList.add(offSetDiff);
if (offSetDiff > 0) {
LOG.warn("The number of different offers between the original and received offer sets is {}",
offSetDiff);
if (LOG.isDebugEnabled()) {
List<String> missedOffers = mOffers.stream()
.map(offer -> offer.getAttributes().getHost())
List<String> missedOffers = offerMap.keySet().stream()
.filter(host -> !response.hosts.contains(host))
lawwong1 marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
LOG.debug("missed offers: {}", missedOffers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ public class HttpOfferSetImplTest extends EasyMockTest {
private static final HostOffer OFFER_C = new HostOffer(
Offers.makeOffer("OFFER_C", HOST_C),
IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C)));
private static final HostOffer OFFER_C1 = new HostOffer(
Offers.makeOffer("OFFER_C1", HOST_C),
IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C)));
private static final String HOST_D = "HOST_D";

private final Storage storage = MemStorageModule.newEmptyStorage();

private HttpOfferSetImpl httpOfferSet;
private Set<HostOffer> offers;
private HttpOfferSetImpl duplicateHostsHttpOfferSet;

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -112,6 +116,20 @@ public void setUp() throws IOException {
0,
0,
false);

// duplicate host offers
Set<HostOffer> duplicateHostOffers = new HashSet<>();
duplicateHostOffers.add(OFFER_A);
duplicateHostOffers.add(OFFER_B);
duplicateHostOffers.add(OFFER_C);
duplicateHostOffers.add(OFFER_C1);

duplicateHostsHttpOfferSet = new HttpOfferSetImpl(duplicateHostOffers,
0,
new URL("http://localhost:9090/v1/offerset"),
0,
0,
false);
}

@Test
Expand All @@ -124,7 +142,7 @@ public void testProcessResponse() throws IOException {

List<HostOffer> mOffers = ImmutableList.copyOf(httpOfferSet.values());

List<HostOffer> sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
List<HostOffer> sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 3);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
Expand All @@ -135,7 +153,7 @@ public void testProcessResponse() throws IOException {
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_C + "\"]}";
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
Expand All @@ -147,7 +165,7 @@ public void testProcessResponse() throws IOException {
+ HOST_B + "\",\""
+ HOST_D + "\",\""
+ HOST_C + "\"]}";
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 3);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
Expand All @@ -159,19 +177,26 @@ public void testProcessResponse() throws IOException {
+ HOST_A + "\",\""
+ HOST_D + "\",\""
+ HOST_C + "\"]}";
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr);
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(3), 2);

// Test with 1 bad offer
sortedOffers = httpOfferSet.processResponse(mOffers, responseStr, 1);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(4), 3);

responseStr = "{\"error\": \"Error\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_C + "\"]}";
boolean isException = false;
try {
httpOfferSet.processResponse(mOffers, responseStr);
httpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
Expand All @@ -180,7 +205,7 @@ public void testProcessResponse() throws IOException {
responseStr = "{\"error\": \"error\"}";
isException = false;
try {
httpOfferSet.processResponse(mOffers, responseStr);
httpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
Expand All @@ -189,7 +214,79 @@ public void testProcessResponse() throws IOException {
responseStr = "{\"weird\": \"cannot decode this json string\"}";
isException = false;
try {
httpOfferSet.processResponse(mOffers, responseStr);
httpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
assertTrue(isException);

// Duplicate host test
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_C + "\"]}";

List<HostOffer> mDuplicateHostOffers = ImmutableList.
copyOf(duplicateHostsHttpOfferSet.values());
assertEquals(mDuplicateHostOffers.size(), 4);

sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers,
responseStr, 0);
assertEquals(sortedOffers.size(), 4);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C);
assertEquals(sortedOffers.get(3).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(5), 0);

// plugin returns fewer offers than Aurora has.
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_C + "\"]}";
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 3);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(6), 1);

// plugin returns more offers than Aurora has.
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_D + "\",\""
+ HOST_C + "\"]}";
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 4);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals(sortedOffers.get(2).getAttributes().getHost(), HOST_C);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(7), 1);

// plugin omits 1 host (HOST_C, which has 2 offers) & returns 1 extra host
responseStr = "{\"error\": \"\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_D + "\",\""
+ HOST_B + "\"]}";
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 0);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(8), 3);

// Test with 1 bad offer
sortedOffers = duplicateHostsHttpOfferSet.processResponse(mDuplicateHostOffers, responseStr, 1);
assertEquals(sortedOffers.size(), 2);
assertEquals(sortedOffers.get(0).getAttributes().getHost(), HOST_A);
assertEquals(sortedOffers.get(1).getAttributes().getHost(), HOST_B);
assertEquals((long) HttpOfferSetImpl.offerSetDiffList.get(9), 4);

responseStr = "{\"error\": \"Error\", \"hosts\": [\""
+ HOST_A + "\",\""
+ HOST_B + "\",\""
+ HOST_C + "\"]}";
isException = false;
try {
duplicateHostsHttpOfferSet.processResponse(mOffers, responseStr, 0);
} catch (IOException ioe) {
isException = true;
}
Expand Down