Skip to content

Commit

Permalink
Merge branch 'candidate-6.0.4'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>

Conflicts:
	thorlcr/activities/selectnth/thselectnthslave.cpp
	version.cmake
  • Loading branch information
ghalliday committed Jul 21, 2016
2 parents 88336e1 + 8d5ec35 commit 9b0eb2d
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 56 deletions.
26 changes: 20 additions & 6 deletions common/thorhelper/thorcommon.cpp
Expand Up @@ -1788,7 +1788,16 @@ void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char
if (optNodes)
throw makeStringException(1, "Numa node list not yet supported");

unsigned numNumaNodes = numa_max_node()+1;
unsigned numaMap[NUMA_NUM_NODES];
unsigned numNumaNodes = 0;
for (unsigned i=0; i<=numa_max_node(); i++)
{
if (numa_bitmask_isbitset(numa_all_nodes_ptr, i))
{
numaMap[numNumaNodes] = i;
numNumaNodes++;
}
}
if (numNumaNodes <= 1)
return;

Expand All @@ -1797,20 +1806,20 @@ void setAutoAffinity(unsigned curProcess, unsigned processPerMachine, const char

#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
struct bitmask * cpus = numa_allocate_cpumask();
numa_node_to_cpus(curNode, cpus);
numa_node_to_cpus(numaMap[curNode], cpus);
bool ok = (numa_sched_setaffinity(0, cpus) == 0);
numa_bitmask_free(cpus);
#else
cpu_set_t cpus;
CPU_ZERO(&cpus);
numa_node_to_cpus(curNode, (unsigned long *) &cpus, sizeof (cpus));
numa_node_to_cpus(numaMap[curNode], (unsigned long *) &cpus, sizeof (cpus));
bool ok = sched_setaffinity (0, sizeof(cpus), &cpus) != 0;
#endif

if (!ok)
throw makeStringExceptionV(1, "Failed to set affinity for node %u", curNode);
throw makeStringExceptionV(1, "Failed to set affinity to numa node %u (id:%u)", curNode, numaMap[curNode]);

DBGLOG("Process bound to numa node %u of %u", curNode, numNumaNodes);
DBGLOG("Process bound to numa node %u (id:%u) of %u", curNode, numaMap[curNode], numNumaNodes);
#endif
clearAffinityCache();
}
Expand All @@ -1820,7 +1829,12 @@ void bindMemoryToLocalNodes()
#if defined(LIBNUMA_API_VERSION) && (LIBNUMA_API_VERSION>=2)
numa_set_bind_policy(1);

unsigned numNumaNodes = numa_max_node() + 1;
unsigned numNumaNodes = 0;
for (unsigned i=0; i<=numa_max_node(); i++)
{
if (numa_bitmask_isbitset(numa_all_nodes_ptr, i))
numNumaNodes++;
}
if (numNumaNodes <= 1)
return;
struct bitmask *nodes = numa_get_run_node_mask();
Expand Down
2 changes: 1 addition & 1 deletion esp/services/ws_machine/metrics.xslt
Expand Up @@ -66,13 +66,13 @@
<script type="text/javascript">
var autoRefreshVal=<xsl:value-of select="$autoRefresh"/>;
var autoUpdateChecked=<xsl:value-of select="$autoupdatechecked"/>;
var viewColumnsStr = '<xsl:value-of select="$hpccStrings/st[@id='ViewColumns']"/>';
<xsl:text disable-output-escaping="yes"><![CDATA[
//############ begin auto reload script ######################
var reloadTimer = null;
var reloadTimeout = 0;
var idCount = 0;
var checkboxIDs = new Array();
var viewColumnsStr = '<xsl:value-of select="$hpccStrings/st[@id='ViewColumns']"/>';
// This function gets called when the window has completely loaded.
// It starts the reload timer with a default time value.
Expand Down
3 changes: 3 additions & 0 deletions plugins/mysql/mysqlembed.cpp
Expand Up @@ -1521,6 +1521,9 @@ class MySQLEmbedFunctionContext : public CInterfaceOf<IEmbedFunctionContext>
if (nextParam != stmtInfo->queryInputBindings().numColumns())
failx("Not enough parameters supplied (%d parameters supplied, but statement has %d bound columns)", nextParam, stmtInfo->queryInputBindings().numColumns());
// We actually do the execute later, when the result is fetched
// Unless, there is no expected result, in that case execute query now
if (stmtInfo->queryResultBindings().numColumns() == 0)
lazyExecute();
}
protected:
void lazyExecute()
Expand Down
10 changes: 5 additions & 5 deletions system/security/LdapSecurity/ldapconnection.cpp
Expand Up @@ -1348,11 +1348,11 @@ class CLdapClient : public CInterface, implements ILdapClient
virtual void init(IPermissionProcessor* pp)
{
m_pp = pp;
if(m_ldapconfig->getServerType() == OPEN_LDAP)
static bool createdOU = false;
CriticalBlock block(lcCrit);
if (!createdOU)
{
static bool createdOU = false;
CriticalBlock block(lcCrit);
if (!createdOU)
if(m_ldapconfig->getServerType() == OPEN_LDAP)
{
try
{
Expand All @@ -1363,7 +1363,7 @@ class CLdapClient : public CInterface, implements ILdapClient
}
try
{
addGroup("Directory Administrators", NULL, NULL, m_ldapconfig->getBasedn());
addGroup("Directory Administrators", NULL, NULL, m_ldapconfig->getBasedn());
}
catch(...)
{
Expand Down
2 changes: 1 addition & 1 deletion system/security/LdapSecurity/ldapsecurity.cpp
Expand Up @@ -528,8 +528,8 @@ void CLdapSecManager::init(const char *serviceName, IPropertyTree* cfg)
else
throwUnexpected();

ldap_client->init(pp);
pp->setLdapClient(ldap_client);
ldap_client->init(pp);

m_ldap_client.setown(ldap_client);
m_pp.setown(pp);
Expand Down
28 changes: 5 additions & 23 deletions thorlcr/activities/firstn/thfirstnslave.cpp
Expand Up @@ -34,11 +34,6 @@ class CFirstNSlaveBase : public CSlaveActivity
bool stopped;
IHThorFirstNArg *helper;

virtual void doStop()
{
PARENT::stop();
}

public:
CFirstNSlaveBase(CGraphElementBase *_container) : CSlaveActivity(_container)
{
Expand All @@ -58,7 +53,7 @@ class CFirstNSlaveBase : public CSlaveActivity
{
abortSoon = true;
stopped = true;
doStop();
PARENT::stop();
}
}
virtual void getMetaInfo(ThorDataLinkMetaInfo &info)
Expand Down Expand Up @@ -215,14 +210,6 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
ThorDataLinkMetaInfo inputMeta;
Owned<IEngineRowStream> originalInputStream;

protected:
virtual void doStop()
{
limitgot.signal(); // JIC not previously signalled by lookahead
onInputFinished(getDataLinkCount()+skipped);
PARENT::doStop();
}

public:
CFirstNSlaveGlobal(CGraphElementBase *container) : CFirstNSlaveBase(container)
{
Expand Down Expand Up @@ -268,7 +255,7 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
{
CMessageBuffer msgMb;
if (!receiveMsg(msgMb, 0, mpTag))
return false; // NB: can be triggered by onInputFinished with count==0, or abort()
return false; // NB: can be triggered by abort()
msgMb.read(limit);
msgMb.read(skipCount);
skipped = 0;
Expand All @@ -295,7 +282,7 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
void sendCount()
{
limitgot.wait();

rowcount_t read = 0;
rowcount_t skip = skipCount;
if (limit > 0)
Expand Down Expand Up @@ -329,7 +316,8 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
CATCH_NEXTROW()
{
ActivityTimer t(totalCycles, timeActivities);
if (!abortSoon) {
if (!abortSoon)
{
if (firstget)
{
firstget = false;
Expand Down Expand Up @@ -377,12 +365,6 @@ class CFirstNSlaveGlobal : public CFirstNSlaveBase, implements ILookAheadStopNot
sendCount();
}
ActPrintLog("FIRSTN: maximum row count %" RCPF "d", count);
if (0 == count)
{
limit = 0;
CriticalBlock b(crit);
cancelReceiveMsg(RANK_ALL, mpTag);
}
}
};

Expand Down
17 changes: 10 additions & 7 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Expand Up @@ -3219,16 +3219,19 @@ class CAllJoinSlaveActivity : public CInMemJoinBase<CAllTable, IHThorAllJoinArg>
}
virtual void stop()
{
if (isGlobal())
if (hasStarted())
{
if (gotRHS)
if (isGlobal())
{
// Other channels sharing HT. So do not reset until all here
if (queryJob().queryJobChannels()>1)
InterChannelBarrier();
if (gotRHS)
{
// Other channels sharing HT. So do not reset until all here
if (queryJob().queryJobChannels()>1)
InterChannelBarrier();
}
else
getRHS(true); // If global, need to handle RHS until all are slaves stop
}
else
getRHS(true); // If global, need to handle RHS until all are slaves stop
}
PARENT::stop();
}
Expand Down
37 changes: 24 additions & 13 deletions thorlcr/activities/selectnth/thselectnthslave.cpp
Expand Up @@ -28,6 +28,7 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
bool createDefaultIfFail;
IHThorSelectNArg *helper;
SpinLock spin; // MORE: Remove this and use an atomic variable for lookaheadN
Owned<IEngineRowStream> originalInputStream;

void initN()
{
Expand Down Expand Up @@ -72,19 +73,12 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
}

// IThorSlaveActivity overloaded methods
virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData)
virtual void init(MemoryBuffer & data, MemoryBuffer &slaveData) override
{
if (!container.queryLocalOrGrouped())
mpTag = container.queryJobChannel().deserializeMPTag(data);
}
virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
{
PARENT::setInputStream(index, _input, consumerOrdered);
rowcount_t rowN = (rowcount_t)helper->getRowToSelect();
if (!isLocal && rowN)
setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage()));
}
virtual void start()
virtual void start() override
{
ActivityTimer s(totalCycles, timeActivities);

Expand All @@ -102,8 +96,17 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
throw;
}

rowcount_t rowN = (rowcount_t)helper->getRowToSelect();
IStartableEngineRowStream *lookAhead = nullptr;
if (!isLocal && rowN)
{
lookAhead = createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), SELECTN_SMART_BUFFER_SIZE, isSmartBufferSpillNeeded(this), false, rowN, this, &container.queryJob().queryIDiskUsage());
originalInputStream.setown(replaceInputStream(0, lookAhead));
lookAhead->start();
}

seenNth = false;
if (0==helper->getRowToSelect())
if (0 == rowN)
{
ThorDataLinkMetaInfo info;
queryInput(0)->getMetaInfo(info);
Expand All @@ -119,7 +122,15 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
}
first = true;
}
virtual void abort()
virtual void stop() override
{
PARENT::stop();
if (originalInputStream)
{
Owned<IEngineRowStream> lookAhead = replaceInputStream(0, originalInputStream.getClear());
}
}
virtual void abort() override
{
CSlaveActivity::abort();
if (!firstNode())
Expand Down Expand Up @@ -188,15 +199,15 @@ class CSelectNthSlaveActivity : public CSlaveActivity, implements ILookAheadStop
return ret.getClear();
}
virtual bool isGrouped() const override { return false; }
void getMetaInfo(ThorDataLinkMetaInfo &info)
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) override
{
initMetaInfo(info);
info.isSequential = true;
info.canReduceNumRows = true; // not sure what selectNth is doing
calcMetaInfoSize(info, queryInput(0));
}
// IStartableEngineRowStream methods used for global selectn only
virtual void onInputFinished(rowcount_t count)
virtual void onInputFinished(rowcount_t count) override
{
SpinBlock b(spin);
lookaheadN = count;
Expand Down
1 change: 1 addition & 0 deletions thorlcr/graph/thgraphmaster.cpp
Expand Up @@ -233,6 +233,7 @@ void CSlaveMessageHandler::main()
assertex(element);
try
{
element->reset();
element->doCreateActivity(parentExtractSz, parentExtract);
}
catch (IException *e)
Expand Down

0 comments on commit 9b0eb2d

Please sign in to comment.