Skip to content

Commit

Permalink
Support for Consumption Policies and Negotiator Resource Consumption …
Browse files Browse the repository at this point in the history
…===GT=== #3435
  • Loading branch information
erikerlandson committed Mar 7, 2013
1 parent c913797 commit 7fb2996
Show file tree
Hide file tree
Showing 18 changed files with 1,428 additions and 138 deletions.
5 changes: 5 additions & 0 deletions src/condor_includes/condor_attributes.h
Expand Up @@ -880,6 +880,11 @@ extern const char ATTR_SEC_AUTHENTICATED_USER [];
#define ATTR_TOTAL_SLOT_PREFIX "TotalSlot"
#define ATTR_MACHINE_RESOURCES "MachineResources"

// multi-claim / negotiator-side resource consumption
#define ATTR_NUM_CLAIMS "NumClaims"
#define ATTR_CLAIM_ID_LIST "ClaimIdList"
#define ATTR_CONSUMPTION_PREFIX "Consumption"

// This is a record of the job exit status from a standard universe job exit
// via waitpid. It is in the job ad to implement the terminate_pending
// feature. It has to be here because of rampant global variable usage in the
Expand Down
172 changes: 114 additions & 58 deletions src/condor_negotiator.V6/matchmaker.cpp
Expand Up @@ -41,6 +41,8 @@
#include "MyString.h"
#include "condor_daemon_core.h"

#include "consumption_policy.h"

#include <vector>
#include <string>
#include <deque>
Expand Down Expand Up @@ -1144,7 +1146,8 @@ negotiationTime ()
{
ClassAdList allAds; //contains ads from collector
ClassAdListDoesNotDeleteAds startdAds; // ptrs to startd ads in allAds
ClaimIdHash claimIds(MyStringHash);
//ClaimIdHash claimIds(MyStringHash);
ClaimIdHash claimIds;
ClassAdListDoesNotDeleteAds scheddAds; // ptrs to schedd ads in allAds

/**
Expand Down Expand Up @@ -3123,8 +3126,8 @@ obtainAdsFromCollector (

MakeClaimIdHash(startdPvtAdList,claimIds);

dprintf(D_ALWAYS, "Got ads: %d public and %d private\n",
allAds.MyLength(),claimIds.getNumElements());
dprintf(D_ALWAYS, "Got ads: %d public and %lu private\n",
allAds.MyLength(),claimIds.size());

dprintf(D_ALWAYS, "Public ads include %d submitter, %d startd\n",
scheddAds.MyLength(), startdAds.MyLength() );
Expand Down Expand Up @@ -3176,7 +3179,8 @@ Matchmaker::MakeClaimIdHash(ClassAdList &startdPvtAdList, ClaimIdHash &claimIds)
while( (ad = startdPvtAdList.Next()) ) {
MyString name;
MyString ip_addr;
MyString claim_id;
string claim_id;
string claimlist;

if( !ad->LookupString(ATTR_NAME, name) ) {
continue;
Expand All @@ -3188,18 +3192,33 @@ Matchmaker::MakeClaimIdHash(ClassAdList &startdPvtAdList, ClaimIdHash &claimIds)
// As of 7.1.3, we look up CLAIM_ID first and CAPABILITY
// second. Someday CAPABILITY can be phased out.
if( !ad->LookupString(ATTR_CLAIM_ID, claim_id) &&
!ad->LookupString(ATTR_CAPABILITY, claim_id) )
!ad->LookupString(ATTR_CAPABILITY, claim_id) &&
!ad->LookupString(ATTR_CLAIM_ID_LIST, claimlist))
{
continue;
}

// hash key is name + ip_addr
name += ip_addr;
if( claimIds.insert(name,claim_id)!=0 ) {
dprintf(D_ALWAYS,
"WARNING: failed to insert claim id hash table entry "
"for '%s'\n",name.Value());
}
string key = name;
key += ip_addr;
ClaimIdHash::iterator f(claimIds.find(key));
if (f == claimIds.end()) {
claimIds[key];
f = claimIds.find(key);
} else {
dprintf(D_ALWAYS, "Warning: duplicate key %s detected while loading private claim table, overwriting previous entry\n", key.c_str());
f->second.clear();
}

if (ad->LookupString(ATTR_CLAIM_ID_LIST, claimlist)) {
StringList idlist(claimlist.c_str());
idlist.rewind();
while (char* id = idlist.next()) {
f->second.insert(id);
}
} else {
f->second.insert(claim_id);
}
}
startdPvtAdList.Close();
}
Expand Down Expand Up @@ -3645,23 +3664,33 @@ negotiate(char const* groupName, char const *scheddName, const ClassAd *scheddAd

// 2g. Delete ad from list so that it will not be considered again in
// this negotiation cycle
int reevaluate_ad = false;
offer->LookupBool(ATTR_WANT_AD_REVAULATE, reevaluate_ad);
if( reevaluate_ad ) {
reeval(offer);
// Shuffle this resource to the end of the list. This way, if
// two resources with the same RANK match, we'll hand them out
// in a round-robin way
startdAds.Remove (offer);
startdAds.Insert (offer);
} else {
startdAds.Remove (offer);
}

double SlotWeight = accountant.GetSlotWeight(offer);
limitUsed += SlotWeight;
if (remoteUser == "") limitUsedUnclaimed += SlotWeight;
pieLeft -= SlotWeight;
double match_cost = 0;
if (cp_supports_policy(*offer)) {
// we've vetted this match, so actually consume assets off the resource ad:
match_cost = cp_deduct_assets(request, *offer);
// in this mode we don't remove offers here, because the goal is to allow
// other jobs/requests to match against them and consume resources, if possible
} else {
int reevaluate_ad = false;
offer->LookupBool(ATTR_WANT_AD_REVAULATE, reevaluate_ad);
if (reevaluate_ad) {
reeval(offer);
// Shuffle this resource to the end of the list. This way, if
// two resources with the same RANK match, we'll hand them out
// in a round-robin way
startdAds.Remove(offer);
startdAds.Insert(offer);
} else {
startdAds.Remove(offer);
}
// traditional match cost is just slot weight expression
match_cost = accountant.GetSlotWeight(offer);
}

limitUsed += match_cost;
if (remoteUser == "") limitUsedUnclaimed += match_cost;
pieLeft -= match_cost;
negotiation_cycle_stats[0]->matches++;
}

Expand Down Expand Up @@ -3714,13 +3743,20 @@ EvalNegotiatorMatchRank(char const *expr_name,ExprTree *expr,
}

bool Matchmaker::
SubmitterLimitPermits(ClassAd *candidate, double used, double allowed, double pieLeft)
{
double SlotWeight = accountant.GetSlotWeight(candidate);
if ((used + SlotWeight) <= allowed) {
SubmitterLimitPermits(ClassAd* request, ClassAd* candidate, double used, double allowed, double pieLeft) {
double match_cost = 0;

if (cp_supports_policy(*candidate)) {
// deduct assets in test-mode only, for purpose of getting match cost
match_cost = cp_deduct_assets(*request, *candidate, true);
} else {
match_cost = accountant.GetSlotWeight(candidate);
}

if ((used + match_cost) <= allowed) {
return true;
}
if ((used <= 0) && (allowed > 0) && (pieLeft >= 0.99*SlotWeight)) {
if ((used <= 0) && (allowed > 0) && (pieLeft >= 0.99*match_cost)) {

// Allow user to round up once per pie spin in order to avoid
// "crumbs" being left behind that couldn't be taken by anyone
Expand Down Expand Up @@ -3844,9 +3880,9 @@ matchmakingAlgorithm(const char *scheddName, const char *scheddAddr, ClassAd &re
int t = 0;
cached_bestSoFar->LookupInteger(ATTR_PREEMPT_STATE_, t);
PreemptState pstate = PreemptState(t);
if ((pstate != NO_PREEMPTION) && SubmitterLimitPermits(cached_bestSoFar, limitUsed, submitterLimit, pieLeft)) {
if ((pstate != NO_PREEMPTION) && SubmitterLimitPermits(&request, cached_bestSoFar, limitUsed, submitterLimit, pieLeft)) {
break;
} else if (SubmitterLimitPermits(cached_bestSoFar, limitUsedUnclaimed, submitterLimitUnclaimed, pieLeft)) {
} else if (SubmitterLimitPermits(&request, cached_bestSoFar, limitUsedUnclaimed, submitterLimitUnclaimed, pieLeft)) {
break;
}
MatchList->increment_rejForSubmitterLimit();
Expand Down Expand Up @@ -3913,8 +3949,27 @@ matchmakingAlgorithm(const char *scheddName, const char *scheddAddr, ClassAd &re
candidate->dPrint(D_MACHINE);
}

// the candidate offer and request must match
bool is_a_match = IsAMatch(&request, candidate);
map<string, double> consumption;
bool has_cp = cp_supports_policy(*candidate);
bool cp_sufficient = true;
if (has_cp) {
// replace RequestXxx attributes (temporarily) with values derived from
// the consumption policy, so that Requirements expressions evaluate in a
// manner consistent with the check on CP resources
cp_override_requested(request, *candidate, consumption);
cp_sufficient = cp_sufficient_assets(*candidate, consumption);
}

// The candidate offer and request must match.
// When candidate supports a consumption policy, then resources
// requested via consumption policy must also be available from
// the resource
bool is_a_match = cp_sufficient && IsAMatch(&request, candidate);

if (has_cp) {
// put original values back for RequestXxx attributes
cp_restore_requested(request, consumption);
}

int cluster_id=-1,proc_id=-1;
MyString machine_name;
Expand Down Expand Up @@ -4039,10 +4094,10 @@ matchmakingAlgorithm(const char *scheddName, const char *scheddAddr, ClassAd &re
check if we are negotiating only for startd rank, since startd rank
preemptions should be allowed regardless of user priorities.
*/
if ((candidatePreemptState == PRIO_PREEMPTION) && !SubmitterLimitPermits(candidate, limitUsed, submitterLimit, pieLeft)) {
if ((candidatePreemptState == PRIO_PREEMPTION) && !SubmitterLimitPermits(&request, candidate, limitUsed, submitterLimit, pieLeft)) {
rejForSubmitterLimit++;
continue;
} else if ((candidatePreemptState == NO_PREEMPTION) && !SubmitterLimitPermits(candidate, limitUsedUnclaimed, submitterLimitUnclaimed, pieLeft)) {
} else if ((candidatePreemptState == NO_PREEMPTION) && !SubmitterLimitPermits(&request, candidate, limitUsedUnclaimed, submitterLimitUnclaimed, pieLeft)) {
rejForSubmitterLimit++;
continue;
}
Expand Down Expand Up @@ -4304,7 +4359,6 @@ matchmakingProtocol (ClassAd &request, ClassAd *offer,
char accountingGroup[256];
char remoteOwner[256];
MyString startdName;
char const *claim_id;
SafeSock startdSock;
bool send_failed;
int want_claiming = -1;
Expand Down Expand Up @@ -4344,14 +4398,20 @@ matchmakingProtocol (ClassAd &request, ClassAd *offer,
}
}

// find the startd's claim id from the private ad
MyString claim_id_buf;
if ( want_claiming ) {
if (!(claim_id = getClaimId (startdName.Value(), startdAddr.Value(), claimIds, claim_id_buf)))
{
dprintf(D_ALWAYS," %s has no claim id\n", startdName.Value());
return MM_BAD_MATCH;
}
// find the startd's claim id from the private ad
char const *claim_id = NULL;
string claim_id_buf;
ClaimIdHash::iterator claimset = claimIds.end();
if (want_claiming) {
string key = startdName.Value();
key += startdAddr.Value();
claimset = claimIds.find(key);
if ((claimIds.end() == claimset) || (claimset->second.size() < 1)) {
dprintf(D_ALWAYS," %s has no claim id\n", startdName.Value());
return MM_BAD_MATCH;
}
claim_id_buf = *(claimset->second.begin());
claim_id = claim_id_buf.c_str();
} else {
// Claiming is *not* desired
claim_id = "null";
Expand Down Expand Up @@ -4458,6 +4518,13 @@ matchmakingProtocol (ClassAd &request, ClassAd *offer,
startdAddr.Value(), startdName.Value(),
offline ? " (offline)" : "");

// At this point we're offering this match as good.
// We don't offer a claim more than once per cycle, so remove it
// from the set of available claims.
if (claimset != claimIds.end()) {
claimset->second.erase(claim_id_buf);
}

/* CONDORDB Insert into matches table */
insert_into_matches(scheddName, request, *offer);

Expand Down Expand Up @@ -4627,17 +4694,6 @@ calculateNormalizationFactor (ClassAdListDoesNotDeleteAds &scheddAds,
}


char const *
Matchmaker::getClaimId (const char *startdName, const char *startdAddr, ClaimIdHash &claimIds, MyString &claim_id_buf)
{
MyString key = startdName;
key += startdAddr;
if( claimIds.lookup(key,claim_id_buf)!=0 ) {
return NULL;
}
return claim_id_buf.Value();
}

void Matchmaker::
addRemoteUserPrios( ClassAdListDoesNotDeleteAds &cal )
{
Expand Down
5 changes: 3 additions & 2 deletions src/condor_negotiator.V6/matchmaker.h
Expand Up @@ -109,7 +109,8 @@ class Matchmaker : public Service
// reinitialization method (reconfig)
int reinitialize ();

typedef HashTable<MyString, MyString> ClaimIdHash;
//typedef HashTable<MyString, MyString> ClaimIdHash;
typedef std::map<std::string, std::set<std::string> > ClaimIdHash;

// command handlers
int RESCHEDULE_commandHandler (int, Stream*);
Expand Down Expand Up @@ -283,7 +284,7 @@ class Matchmaker : public Service
// trim out startd ads that are not in the Unclaimed state.
int trimStartdAds(ClassAdListDoesNotDeleteAds &startdAds);

bool SubmitterLimitPermits(ClassAd *candidate, double used, double allowed, double pieLeft);
bool SubmitterLimitPermits(ClassAd* request, ClassAd* candidate, double used, double allowed, double pieLeft);
double sumSlotWeights(ClassAdListDoesNotDeleteAds &startdAds,double *minSlotWeight, ExprTree* constraint);

/* ODBC insert functions */
Expand Down
49 changes: 46 additions & 3 deletions src/condor_startd.V6/Reqexp.cpp
Expand Up @@ -27,6 +27,9 @@

#include "condor_common.h"
#include "startd.h"
#include "consumption_policy.h"
#include <set>
using std::set;

Reqexp::Reqexp( Resource* res_ip )
{
Expand Down Expand Up @@ -135,7 +138,45 @@ Reqexp::compute( amask_t how_much )
// In the below, _condor_RequestX attributes may be explicitly set by
// the schedd; if they are not set, go with the RequestX that derived from
// the user's original submission.
tmp = const_cast<char*>(
if (rip->r_has_cp || (rip->get_parent() && rip->get_parent()->r_has_cp)) {
dprintf(D_FULLDEBUG, const_cast<char*>("Using CP variant of WithinResourceLimits\n"));
// a CP-supporting p-slot, or a d-slot derived from one, gets a variation
// that supports zeroed resource assets, and refers to consumption
// policy attributes.

// reconstructing this isn't a big deal, but I'm doing it because I'm
// afraid to randomly perturb the order of the resource initialization
// spaghetti, which makes kittens cry.
set<string> assets;
assets.insert("cpus");
assets.insert("memory");
assets.insert("disk");
for (CpuAttributes::slotres_map_t::const_iterator j(rip->r_attr->get_slotres_map().begin()); j != rip->r_attr->get_slotres_map().end(); ++j) {
assets.insert(j->first);
}

// first subexpression does not need && operator:
bool need_and = false;
string estr = "(";
for (set<string>::iterator j(assets.begin()); j != assets.end(); ++j) {
string rname(*j);
if (rname == "swap") continue;
*(rname.begin()) = toupper(*(rname.begin()));
string te;
// The logic here is that if the target job ad is in a mode where its RequestXxx have
// already been temporarily overridden with the consumption policy values, then we want
// to use RequestXxx (note, this will include any overrides by _condor_RequestXxx).
// Otherwise, we want to refer to ConsumptionXxx.
formatstr(te, "ifThenElse(target._cp_orig_%s%s isnt undefined, target.%s%s <= my.%s, my.%s%s <= my.%s)", ATTR_REQUEST_PREFIX, rname.c_str(), ATTR_REQUEST_PREFIX, rname.c_str(), rname.c_str(), ATTR_CONSUMPTION_PREFIX, rname.c_str(), rname.c_str());
if (need_and) estr += " && ";
estr += te;
need_and = true;
}
estr += ")";

m_within_resource_limits_expr = strdup(const_cast<char*>(estr.c_str()));
} else {
tmp = const_cast<char*>(
"("
"ifThenElse(TARGET._condor_RequestCpus =!= UNDEFINED,"
"MY.Cpus > 0 && TARGET._condor_RequestCpus <= MY.Cpus,"
Expand All @@ -155,8 +196,10 @@ Reqexp::compute( amask_t how_much )
"MY.Disk > 0 && TARGET.RequestDisk <= MY.Disk,"
"FALSE))"
")");
m_within_resource_limits_expr = strdup( tmp );
}
m_within_resource_limits_expr = strdup(tmp);
}
}
dprintf(D_FULLDEBUG, const_cast<char*>("%s = %s\n"), ATTR_WITHIN_RESOURCE_LIMITS, m_within_resource_limits_expr);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/condor_startd.V6/ResAttributes.h
Expand Up @@ -82,6 +82,9 @@ const float AUTO_SHARE = 123;
// for floats.
#define IS_AUTO_SHARE(share) ((int)share == (int)AUTO_SHARE)

// Sentinel share value (-9999), distinct from AUTO_SHARE.
// NOTE(review): presumably marks a share that has not been configured --
// confirm against the code that assigns and tests it.
const float UNSET_SHARE = -9999;
// True when `share`, truncated to int, equals UNSET_SHARE truncated to int;
// the comparison is done on ints in the same way as IS_AUTO_SHARE.
#define IS_UNSET_SHARE(share) (int(share) == int(UNSET_SHARE))

// This is used as a place-holder value when configuring memory share
// for a slot. It is later updated by dividing the remaining resources
// evenly between slots using AUTO_MEM.
Expand Down

0 comments on commit 7fb2996

Please sign in to comment.