Skip to content

Commit

Permalink
Merge branch 'candidate-5.4.4'
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Sep 21, 2015
2 parents 6a127f7 + 2eabb35 commit dcb9a3a
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 14 deletions.
13 changes: 9 additions & 4 deletions cmake_modules/commonSetup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,15 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
endif()
if (CMAKE_COMPILER_IS_CLANGXX)
execute_process( COMMAND ${CMAKE_CXX_COMPILER} --version OUTPUT_VARIABLE clang_full_version_string )
if (${clang_full_version_string} MATCHES "Apple LLVM version ([0-9]+\\.[0-9]+\\.[0-9]+).*")
string (REGEX REPLACE "Apple LLVM version ([0-9]+\\.[0-9]+\\.[0-9]+).*" "\\1" APPLE_CLANG_VERSION ${clang_full_version_string})
endif()
if (${clang_full_version_string} MATCHES ".*based on LLVM ([0-9]+\\.[0-9]+).*")
string (REGEX REPLACE ".*based on LLVM ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION ${clang_full_version_string})
else ()
string (REGEX REPLACE ".*clang version ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION ${clang_full_version_string})
else()
if (${clang_full_version_string} MATCHES ".*clang version ([0-9]+\\.[0-9]+).*")
string (REGEX REPLACE ".*clang version ([0-9]+\\.[0-9]+).*" "\\1" CLANG_VERSION ${clang_full_version_string})
endif()
endif()
endif ()

Expand Down Expand Up @@ -239,7 +244,7 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
set ( LIBRARY_OUTPUT_PATH "${CMAKE_BINARY_DIR}/${CMAKE_BUILD_TYPE}/libs" )

if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX)
message ("Using compiler: ${CMAKE_CXX_COMPILER_ID} ${CMAKE_CXX_COMPILER_VERSION} ${CLANG_VERSION}")
message ("Using compiler: ${CMAKE_CXX_COMPILER_ID} :: ${CMAKE_CXX_COMPILER_VERSION} :: ${CLANG_VERSION} :: ${APPLE_CLANG_VERSION}")
SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -frtti -fPIC -fmessage-length=0 -Wformat -Wformat-security -Wformat-nonliteral -pthread -Wuninitialized")
SET (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -rdynamic")
SET (CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -g -fno-inline-functions")
Expand Down Expand Up @@ -277,7 +282,7 @@ IF ("${COMMONSETUP_DONE}" STREQUAL "")
SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror=logical-op-parentheses -Werror=bool-conversions -Werror=return-type -Werror=comment")
SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror=bitwise-op-parentheses -Werror=tautological-compare")
SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-switch-enum -Wno-format-zero-length -Wno-switch")
if (CLANG_VERSION VERSION_GREATER 3.6 OR CLANG_VERSION VERSION_EQUAL 3.6)
if (CLANG_VERSION VERSION_GREATER 3.6 OR CLANG_VERSION VERSION_EQUAL 3.6 OR APPLE_CLANG_VERSION VERSION_GREATER 6.0)
SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-pointer-bool-conversion")
endif()
endif()
Expand Down
20 changes: 13 additions & 7 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2707,7 +2707,7 @@ class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscr
CCovenSDSManager &owner;
OwningSimpleHashTableOf<CNodeSubscriberContainer, SubscriptionId> subscribersById;
OwningSimpleHashTableOf<CNodeSubscriberContainerList, CServerRemoteTree *> subscriberListByNode;
CriticalSection lock;
mutable CriticalSection subscriberListCrit;

void _notify(CServerRemoteTree *node, PDState state, IArrayOf<CNodeSubscriberContainer> &subscribers)
{
Expand Down Expand Up @@ -2778,13 +2778,13 @@ class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscr
void notify(CServerRemoteTree &node, PDState state)
{
// shouldn't be here, unless node is in subscribers table
CriticalBlock b(lock);
CriticalBlock b(subscriberListCrit);
_notify(&node, state);
}
void notifyDelete(CServerRemoteTree *node)
{
// shouldn't be here, unless node is in subscribers table
CriticalBlock b(lock);
CriticalBlock b(subscriberListCrit);
/* Need to be careful not to release subscribers here (on this thread)
* 1) gather subscribers(linked)
* 2) remove nodes and lists, so no longer in use by SDS
Expand All @@ -2803,7 +2803,9 @@ class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscr
// ISubscriptionManager impl.
virtual void add(ISubscription *sub, SubscriptionId id)
{
CriticalBlock b(lock);
CHECKEDDALIREADLOCKBLOCK(owner.dataRWLock, readWriteTimeout);
CHECKEDCRITICALBLOCK(owner.treeRegCrit, fakeCritTimeout);
CriticalBlock b(subscriberListCrit);
/* calls back out to owner to scan for match, so that SDSManager can protect root/treereg.
* It calls back (associateSubscriber) in this class to add subscribers based on matches.
*/
Expand All @@ -2816,7 +2818,7 @@ class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscr
*/
CHECKEDDALIREADLOCKBLOCK(owner.dataRWLock, readWriteTimeout);
CHECKEDCRITICALBLOCK(owner.treeRegCrit, fakeCritTimeout);
CriticalBlock b(lock);
CriticalBlock b(subscriberListCrit);
/* calls back out to owner to protect root/treereg.
* It calls back into removeSubscriberAssociation.
*/
Expand Down Expand Up @@ -2856,6 +2858,12 @@ class CNodeSubscriptionManager : public CSimpleInterface, implements INodeSubscr
}
MemoryBuffer &collectSubscribers(MemoryBuffer &out) const
{
/* important to ensure have exclusive data lock on removal, ahead of subscriber lock
* as can get notifications whilst holding data lock, e.g. notifyDelete on node destruction.
*/
CHECKEDDALIREADLOCKBLOCK(owner.dataRWLock, readWriteTimeout);
CHECKEDCRITICALBLOCK(owner.treeRegCrit, fakeCritTimeout);
CriticalBlock b(subscriberListCrit);
out.append(subscribersById.count());
SuperHashIteratorOf<CNodeSubscriberContainer> sdsNodeIter(subscribersById);
ForEach(sdsNodeIter)
Expand Down Expand Up @@ -8596,8 +8604,6 @@ void CCovenSDSManager::addNodeSubscriber(ISubscription *sub, SubscriptionId id)
mb.read(xpath);
mb.read(sendValue);

CHECKEDDALIREADLOCKBLOCK(dataRWLock, readWriteTimeout);
CHECKEDCRITICALBLOCK(treeRegCrit, fakeCritTimeout);
Owned<IPropertyTreeIterator> iter = root->getElements(xpath+1);
if (!iter->first())
throw MakeSDSException(SDSExcpt_SubscriptionNoMatch, "Failed to match any nodes: %s", xpath.get());
Expand Down
130 changes: 128 additions & 2 deletions dali/datest/datest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2365,7 +2365,7 @@ class TestSDS3TestThread : public CInterface, implements IPooledThread
};

void TestSDS3(IGroup *group)
{
{
class TSDS1 : public CInterface, implements IThreadFactory
{
public:
Expand All @@ -2389,6 +2389,129 @@ void TestSDS3(IGroup *group)
pool.clear();
}

void TestNodeSubs()
{
class CNodeSubPool : public CSimpleInterfaceOf<IThreadFactory>
{
class CNodeSubscriber : public CSimpleInterfaceOf<ISDSNodeSubscription>
{
public:
virtual void notify(SubscriptionId id, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
{
PROGLOG("CNodeSubscriber notified");
}
};
SubscriptionId sid;
CriticalSection sidCrit;
Owned<ISDSNodeSubscription> subscriber;

void test()
{
try
{
unsigned t = getRandom()%5;
switch (t)
{
case 0:
{
// connect thread
PROGLOG("Creating SDS node");
Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_CREATE|RTM_LOCK_WRITE, INFINITE);
MilliSleep(5+getRandom()%50);
conn.clear();
break;
}
case 1:
{
// node sub test
CriticalBlock b(sidCrit);
if (!sid)
{
PROGLOG("Subscribing to node");
sid = querySDS().subscribeExact("/nodesubtest", *subscriber, false);
}
break;
}
case 2:
{
// node sub test
CriticalBlock b(sidCrit);
if (sid)
{
PROGLOG("Unsubscribing to node");
querySDS().unsubscribeExact(sid);
sid = 0;
}
break;
}
case 3:
{
PROGLOG("Deleting node");
Owned<IRemoteConnection> conn = querySDS().connect("/nodesubtest", myProcessSession(), RTM_LOCK_WRITE, INFINITE);
if (conn)
conn->close(true);
break;
}
case 4:
{
PROGLOG("Gathering subscriber info");
StringBuffer info;
querySDS().getSubscribers(info);
if (info.length())
PROGLOG("Subscribers: \n%s", info.str());
break;
}
}
}
catch (IException *e)
{
PrintExceptionLog(e, NULL);
e->Release();
}
}
class CNodeSubThread : public CInterface, implements IPooledThread
{
CNodeSubPool &owner;
public:
IMPLEMENT_IINTERFACE;

CNodeSubThread(CNodeSubPool &_owner) : owner(_owner) { }
virtual void init(void *param)
{
}
virtual void main()
{
owner.test();
}
virtual bool stop() { return true; }
virtual bool canReuse() { return true; }
};
public:
CNodeSubPool()
{
sid = 0;
subscriber.setown(new CNodeSubscriber());
}
virtual IPooledThread *createNew()
{
return new CNodeSubThread(*this);
}
} poolFactory;

Owned<IThreadPool> pool = createThreadPool("TSDSTest", &poolFactory, NULL, 100, 100000);

unsigned tests = testParams.ordinality() ? atoi(testParams.item(0)) : 10;
for (unsigned t=0; t<tests; t++)
{
pool->start(NULL);
}

PrintLog("Joining all TSDSThread running threads");
pool->joinAll();
pool.clear();

}

void TestSDSXPaths()
{
const char *testXML =
Expand Down Expand Up @@ -2901,7 +3024,7 @@ void usage(const char *error=NULL)
{
if (error) printf("%s\n", error);
printf("usage: DATEST <server_ip:port>* [/test <name> [<test params...>] [/NITER <iterations>]\n");
printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE\n");
printf("where name = RANDTEST | DFS | QTEST | QTEST2 | SESSION | LOCKS | SDS1 | SDS2 | XPATHS| STRESS | STRESS2 | SHUTDOWN | EXTERNAL | SUBLOCKS | SUBSCRIPTION | CONNECTIONSUBS | MULTIFILE | NODESUBS\n");
printf("eg: datest . /test QTEST put -- one coven server running locally, running qtest with param \"put\"\n");
printf(" datest eq0001016 eq0001017 -- two coven servers, use default test %s\n", DEFAULT_TEST);
}
Expand Down Expand Up @@ -3048,6 +3171,7 @@ int main(int argc, char* argv[])
case 9: TestExternal(); break;
case 10: TestSubLocks(); break;
case 11: TestSDS3(group); break;
case 12: TestNodeSubs(); break;
}
}
else if (TEST("DFS"))
Expand All @@ -3074,6 +3198,8 @@ int main(int argc, char* argv[])
TestSDS2();
else if (TEST("SDS3"))
TestSDS3(group);
else if (TEST("NODESUBS"))
TestNodeSubs();
else if (TEST("XPATHS"))
TestSDSXPaths();
else if (TEST("STRESS"))
Expand Down
2 changes: 1 addition & 1 deletion plugins/cassandra/cpp-driver
Submodule cpp-driver updated 1 files
+17 −0 CMakeLists.txt

0 comments on commit dcb9a3a

Please sign in to comment.