Skip to content

Commit

Permalink
Added initial code for offer operation status update in master.
Browse files Browse the repository at this point in the history
  • Loading branch information
jieyu committed Nov 6, 2017
1 parent 8f727c9 commit 2f1efb9
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 21 deletions.
138 changes: 120 additions & 18 deletions src/master/master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5145,16 +5145,19 @@ void Master::_accept(
<< operation.create_volume().source() << " from framework "
<< *framework << " to agent " << *slave;

Owned<OfferOperation> offerOperation(new OfferOperation(
protobuf::createOfferOperation(operation, frameworkId)));
OfferOperation* offerOperation = new OfferOperation(
protobuf::createOfferOperation(
operation,
protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
frameworkId));

addOfferOperation(framework, slave, offerOperation);

ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());

framework->addOfferOperation(std::move(offerOperation));

send(slave->pid, message);
break;
}
Expand Down Expand Up @@ -5185,16 +5188,19 @@ void Master::_accept(
<< operation.destroy_volume().volume() << " from framework "
<< *framework << " to agent " << *slave;

Owned<OfferOperation> offerOperation(new OfferOperation(
protobuf::createOfferOperation(operation, frameworkId)));
OfferOperation* offerOperation = new OfferOperation(
protobuf::createOfferOperation(
operation,
protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
frameworkId));

addOfferOperation(framework, slave, offerOperation);

ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());

framework->addOfferOperation(std::move(offerOperation));

send(slave->pid, message);
break;
}
Expand Down Expand Up @@ -5225,16 +5231,19 @@ void Master::_accept(
<< operation.create_block().source() << " from framework "
<< *framework << " to agent " << *slave;

Owned<OfferOperation> offerOperation(new OfferOperation(
protobuf::createOfferOperation(operation, frameworkId)));
OfferOperation* offerOperation = new OfferOperation(
protobuf::createOfferOperation(
operation,
protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
frameworkId));

addOfferOperation(framework, slave, offerOperation);

ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());

framework->addOfferOperation(std::move(offerOperation));

send(slave->pid, message);
break;
}
Expand Down Expand Up @@ -5265,16 +5274,19 @@ void Master::_accept(
<< operation.destroy_block().block() << " from framework "
<< *framework << " to agent " << *slave;

Owned<OfferOperation> offerOperation(new OfferOperation(
protobuf::createOfferOperation(operation, frameworkId)));
OfferOperation* offerOperation = new OfferOperation(
protobuf::createOfferOperation(
operation,
protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
frameworkId));

addOfferOperation(framework, slave, offerOperation);

ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());

framework->addOfferOperation(std::move(offerOperation));

send(slave->pid, message);
break;
}
Expand Down Expand Up @@ -7263,9 +7275,56 @@ void Master::forward(


void Master::offerOperationStatusUpdate(
const OfferOperationStatusUpdate& message)
const OfferOperationStatusUpdate& update)
{
// TODO(jieyu): Provide implementation here.
CHECK(update.has_slave_id())
<< "External resource provider is not supported yet";

const SlaveID& slaveId = update.slave_id();
const FrameworkID& frameworkId = update.framework_id();

Try<UUID> uuid = UUID::fromString(update.operation_uuid());
if (uuid.isError()) {
LOG(ERROR) << "Failed to parse offer operation UUID for operation "
<< "'" << update.status().operation_id() << "' "
<< "from framework " << frameworkId << ": " << uuid.error();
return;
}

Slave* slave = slaves.registered.get(slaveId);

// This is possible if the agent is marked as unreachable or gone,
// or has initiated a graceful shutdown. In either of those cases,
// ignore the offer operation status update.
//
// TODO(jieyu): If the agent is unreachable or has initiated a
// graceful shutdown, we can still forward the update to the
// framework so that the framework can get notified about the offer
// operation early. However, the acknowledgement of the update won't
// be able to reach the agent in those cases. If the agent is gone,
// we cannot forward the update because the master might already
// tell the framework that the operation is gone.
if (slave == nullptr) {
LOG(WARNING) << "Ignoring status update for offer operation '"
<< update.status().operation_id() << "' (uuid: "
<< uuid->toString() << ") for framework "
<< frameworkId << " because agent "
<< slaveId << " is not registered";
return;
}

OfferOperation* operation = slave->getOfferOperation(uuid.get());
if (operation == nullptr) {
LOG(ERROR) << "Failed to find the offer operation '"
<< update.status().operation_id() << "' (uuid: "
<< uuid->toString() << ") from framework "
<< frameworkId << " on agent " << slaveId;
return;
}

updateOfferOperation(operation, update);

// TODO(jieyu): Forward the status update to the framework.
}


Expand Down Expand Up @@ -8426,6 +8485,10 @@ void Master::recoverFramework(
framework->addExecutor(slave->id, executor);
}
}

foreachvalue (OfferOperation* operation, slave->offerOperations) {
framework->addOfferOperation(operation);
}
}

addFramework(framework, suppressedRoles);
Expand Down Expand Up @@ -9486,6 +9549,27 @@ void Master::removeExecutor(
}


void Master::addOfferOperation(
Framework* framework,
Slave* slave,
OfferOperation* operation)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(slave);
CHECK_NOTNULL(operation);

slave->addOfferOperation(operation);
framework->addOfferOperation(operation);
}


void Master::updateOfferOperation(
OfferOperation* operation,
OfferOperationStatusUpdate update)
{
}


Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
{
CHECK_NOTNULL(slave);
Expand Down Expand Up @@ -10371,6 +10455,24 @@ void Slave::removeTask(Task* task)
}


void Slave::addOfferOperation(OfferOperation* operation)
{
Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
CHECK_SOME(uuid);

offerOperations.put(uuid.get(), operation);
}


OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
{
if (offerOperations.contains(uuid)) {
return offerOperations.at(uuid);
}
return nullptr;
}


void Slave::addOffer(Offer* offer)
{
CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
Expand Down
25 changes: 22 additions & 3 deletions src/master/master.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ struct Slave

void removeTask(Task* task);

void addOfferOperation(OfferOperation* operation);
OfferOperation* getOfferOperation(const UUID& uuid) const;

void addOffer(Offer* offer);

void removeOffer(Offer* offer);
Expand Down Expand Up @@ -228,6 +231,10 @@ struct Slave
// This is used for reconciliation when the slave re-registers.
multihashmap<FrameworkID, TaskID> killedTasks;

// Pending operations or terminal operations that have
// unacknowledged status updates on this agent.
hashmap<UUID, OfferOperation*> offerOperations;

// Active offers on this slave.
hashset<Offer*> offers;

Expand Down Expand Up @@ -475,7 +482,7 @@ class Master : public ProtobufProcess<Master>
const std::vector<TaskStatus>& statuses);

void offerOperationStatusUpdate(
const OfferOperationStatusUpdate& message);
const OfferOperationStatusUpdate& update);

void exitedExecutor(
const process::UPID& from,
Expand Down Expand Up @@ -864,6 +871,18 @@ class Master : public ProtobufProcess<Master>
const FrameworkID& frameworkId,
const ExecutorID& executorId);

// Adds the given offer operation to the framework and the agent.
void addOfferOperation(
Framework* framework,
Slave* slave,
OfferOperation* operation);

// Transitions the offer operation, and recovers resources if the
// offer operation becomes terminal.
void updateOfferOperation(
OfferOperation* operation,
OfferOperationStatusUpdate update);

// Attempts to update the allocator by applying the given operation.
// If successful, updates the slave's resources, sends a
// 'CheckpointResourcesMessage' to the slave with the updated
Expand Down Expand Up @@ -2731,7 +2750,7 @@ struct Framework
}
}

void addOfferOperation(process::Owned<OfferOperation> operation)
void addOfferOperation(OfferOperation* operation)
{
Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
CHECK_SOME(uuid);
Expand Down Expand Up @@ -3000,7 +3019,7 @@ struct Framework

// Pending operations or terminal operations that have
// unacknowledged status updates.
hashmap<UUID, process::Owned<OfferOperation>> offerOperations;
hashmap<UUID, OfferOperation*> offerOperations;

// The map from the framework-specified operation ID to the
// corresponding internal operation UUID.
Expand Down

0 comments on commit 2f1efb9

Please sign in to comment.