Skip to content

Commit

Permalink
WIP: work removal & interactions between npnp and normal apps in one SU
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewStoneOpenClovis committed Feb 12, 2016
1 parent f4121b5 commit 2981fe7
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 64 deletions.
13 changes: 12 additions & 1 deletion src/amf/amfAppRpcImpl.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,20 @@ namespace amfAppRpc {
if (SAFplusI::amfSession)
{
uint tgt = request->target();
SaAmfHAStateT haState = (SaAmfHAStateT) request->operation();
if ((int)haState == SAFplusI::AMF_HA_OPERATION_REMOVE)
{
if (tgt == SA_AMF_CSI_TARGET_ALL)
{
}
else
{
// TODO
assert(0);
}
}
if (tgt != SAFplusI::AMF_CSI_REMOVE_ONE)
{
SaAmfHAStateT haState = (SaAmfHAStateT) request->operation();
SaAmfCSIDescriptorT csiDescriptor; // TODO
csiDescriptor.csiFlags = tgt;
strcpy((char*)csiDescriptor.csiName.value,"TODO");
Expand Down
109 changes: 101 additions & 8 deletions src/amf/server/amfOperations.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <clNameApi.hxx>
#include <clOsalApi.hxx>
#include <clAmfPolicyPlugin.hxx>
#include <clAmfIpi.hxx>

#include <SAFplusAmf/Component.hxx>
#include <SAFplusAmf/ComponentServiceInstance.hxx>
Expand Down Expand Up @@ -68,7 +69,7 @@ namespace SAFplus
assert(comp);
if (!comp->serviceUnit.value) comp->serviceUnit.updateReference(); // find the pointer if it is not resolved

if ((!comp->serviceUnit.value)||(!comp->serviceUnit.value->node.value)||(!comp->serviceUnit.value->serviceGroup.value)) // This component is not properly hooked up to other entities; is must be off
if ((!comp->serviceUnit.value)||(!comp->serviceUnit.value->node.value)||(!comp->serviceUnit.value->serviceGroup.value)) // This component is not properly hooked up to other entities; it must be off
{
logInfo("N+M","AUDIT","Component [%s] entity group is not properly configured",comp->name.value.c_str());
return SAFplusAmf::AdministrativeState::off;
Expand Down Expand Up @@ -165,6 +166,17 @@ namespace SAFplus
}
else // work removal
{
if (result == SA_AIS_OK) // TODO: actually, I think I need to call into the redundancy model plugin to correctly process the result.
{
assert(wat.comp->pendingOperation == PendingOperation::workRemoval); // No one should be issuing another operation until this one is aborted

wat.comp->haState = SAFplusAmf::HighAvailabilityState::idle;
// if (wat.si->assignmentState = AssignmentState::fullyAssigned; // TODO: for now just make the SI happy to see something work

// pending operation completed. Clear it out
wat.comp->pendingOperationExpiration.value.value = 0;
wat.comp->pendingOperation = PendingOperation::none;
}
}
reportChange();
}
Expand Down Expand Up @@ -212,7 +224,7 @@ namespace SAFplus
}
else // RPC call
{
logInfo("OP","CMP","Request component [%s] state from node [%s]", comp->name.value.c_str(), comp->serviceUnit.value->node.value->name.value.c_str());
//logInfo("OP","CMP","Request component [%s] state from node [%s]", comp->name.value.c_str(), comp->serviceUnit.value->node.value->name.value.c_str());

ProcessInfoRequest req;
req.set_pid(pid);
Expand All @@ -223,6 +235,8 @@ namespace SAFplus
// RPC call is broken, should throw exception
assert(0);
}
logInfo("OP","CMP","Request component [%s] state from node [%s] returned [%s]", comp->name.value.c_str(), comp->serviceUnit.value->node.value->name.value.c_str(),resp.running() ? "running" : "stopped");

if (resp.running())
{
// TODO: I need to compare the process command line with my command line to make sure that my proc didn't die and another reuse the pid
Expand Down Expand Up @@ -318,6 +332,7 @@ namespace SAFplus
{
assert(su);

// Update the Service Instances: decrement assignment statistics, and remove this SU from the SI's assigned list.
SAFplus::MgtProvList<SAFplusAmf::ServiceInstance*>::ContainerType::iterator it;
SAFplus::MgtProvList<SAFplusAmf::ServiceInstance*>::ContainerType::iterator end = su->assignedServiceInstances.value.end();
for (it = su->assignedServiceInstances.value.begin(); it != end; it++)
Expand All @@ -341,29 +356,76 @@ namespace SAFplus
if ((si->numActiveAssignments.current.value == 0)&&(si->numStandbyAssignments.current.value == 0)) si->assignmentState = AssignmentState::unassigned;
}

// Update the Service Unit's statistics and SI list.
su->assignedServiceInstances.clear();
su->numActiveServiceInstances = 0;
su->numStandbyServiceInstances = 0;

// TODO: clear the CSI structures

// Now actually remove the work from the relevant components
SAFplus::MgtProvList<SAFplusAmf::Component*>::ContainerType::iterator itcomp;
SAFplus::MgtProvList<SAFplusAmf::Component*>::ContainerType::iterator endcomp = su->components.value.end();
for (itcomp = su->components.value.begin(); itcomp != endcomp; itcomp++)
{
Component* comp = dynamic_cast<Component*>(*itcomp);
assert(comp);

// TODO: in multi component service units, we need to remove the work from all the other components.
// for now ASSERT if all the components are not already dead
assert((comp->presenceState == PresenceState::uninstantiated) || (comp->presenceState == PresenceState::instantiationFailed)); // remove when multi-comp implemented

// In multi component service units, we need to remove the work from all the other components.
if (!((comp->presenceState == PresenceState::uninstantiated) || (comp->presenceState == PresenceState::instantiationFailed)))
{
if ( comp->capabilityModel == CapabilityModel::not_preinstantiable) // except if it has no work we just kill it
{
abort(comp, w); // TODO, need to consolidate the wakeables
}
else
{
// Nothing to do if already dead or never started
if (!((comp->presenceState == PresenceState::uninstantiated) || (comp->presenceState == PresenceState::instantiationFailed)))
{
SAFplus::Rpc::amfAppRpc::WorkOperationRequest request;
Handle hdl;
try
{
hdl = name.getHandle(comp->name);
}
catch (SAFplus::NameException& n)
{
logCritical("OPS","WRK","Component [%s] is not registered in the name service. Cannot control it.", comp->name.value.c_str());
comp->lastError.value = "Component's name is not registered in the name service so cannot remove work cleanly";
// this one should be killed since it can't be controlled
abort(comp,w); // TODO, need to consolidate the wakeables
// TODO ? anything to set in the comp's status?
continue; // Go to the next component
}

// Mark this component with a pending operation. This will be cleared in the WorkOperationTracker, or by timeout in auditDiscovery
comp->pendingOperationExpiration.value.value = nowMs() + comp->timeouts.workAssignment;
comp->pendingOperation = PendingOperation::workRemoval;

request.set_componentname(comp->name.value.c_str());
request.set_componenthandle((const char*) &hdl, sizeof(Handle)); // [libprotobuf ERROR google/protobuf/wire_format.cc:1053] String field contains invalid UTF-8 data when serializing a protocol buffer. Use the 'bytes' type if you intend to send raw bytes.
request.set_operation((uint32_t)SAFplusI::AMF_HA_OPERATION_REMOVE);
request.set_target(SA_AMF_CSI_TARGET_ALL);
if ((invocation & 0xFFFFFFFF) == 0xFFFFFFFF) invocation &= 0xFFFFFFFF00000000ULL; // Don't let increasing invocation numbers overwrite the node or port... ofc this'll never happen 4 billion invocations? :-)
request.set_invocation(invocation++);

// Now I need to fill up the key/value pairs from the CSI
request.clear_keyvaluepairs();

pendingWorkOperations[request.invocation()] = WorkOperationTracker(comp,nullptr,nullptr,(uint32_t)SAFplusI::AMF_HA_OPERATION_REMOVE,SA_AMF_CSI_TARGET_ALL);

amfAppRpc->workOperation(hdl, &request);
}
}
}
}
reportChange();
}

void AmfOperations::assignWork(ServiceUnit* su, ServiceInstance* si, HighAvailabilityState state,Wakeable& w)
{
SAFplus::Rpc::amfAppRpc::WorkOperationRequest request;
// TODO fill request
Component* comp = nullptr; // TODO: su down to comp
ComponentServiceInstance* csi = nullptr;

assert(su->assignedServiceInstances.contains(si) == false); // We can only assign a particular SI to a particular SU once.
Expand Down Expand Up @@ -490,6 +552,37 @@ namespace SAFplus

}

void AmfOperations::stop(SAFplusAmf::Component* comp,Wakeable& w)
{
assert(0);
}

void AmfOperations::abort(SAFplusAmf::Component* comp,Wakeable& w)
{
Handle nodeHdl;
try
{
nodeHdl = name.getHandle(comp->serviceUnit.value->node.value->name);
}
catch (SAFplus::NameException& n)
{
logCritical("OPS","SRT","AMF Entity [%s] is not registered in the name service. Cannot start processes on it.", comp->serviceUnit.value->node.value->name.value.c_str());
comp->lastError.value = "Component's node is not registered in the name service so address cannot be determined.";
if (&w) w.wake(1,(void*)comp);
return;
}

if (nodeHdl == nodeHandle) // Handle this request locally. This is an optimization. The RPC call will also work locally.
{
Process p(comp->processId.value);
p.signal(SIGTERM);
}
else
{
assert(0); // TODO implement process stop RPC
}
}

void AmfOperations::start(SAFplusAmf::Component* comp,Wakeable& w)
{
assert(comp);
Expand Down
Loading

0 comments on commit 2981fe7

Please sign in to comment.