Skip to content
Permalink
Browse files
GEODE-4421: Convert Execution factory to value type (#236)
* Uses pImpl pattern

Signed-off-by: Ivan Godwin <igodwin@pivotal.io>
Signed-off-by: Mike Martell <mmartell@pivotal.io>
  • Loading branch information
igodwin authored and pivotal-jbarrett committed Mar 14, 2018
1 parent 7b32fed commit ec8c908addb79cfcb92e20ccc40f3e50f61f4a7c
Show file tree
Hide file tree
Showing 18 changed files with 341 additions and 364 deletions.
@@ -23,7 +23,7 @@
#include <geode/Execution.hpp>
#include "end_native.hpp"

#include "native_shared_ptr.hpp"
#include "native_unique_ptr.hpp"

using namespace System;

@@ -91,25 +91,25 @@ namespace Apache
/// <returns>
/// The managed wrapper object; null if the native pointer is null.
/// </returns>
inline static Execution<TResult>^ Create( std::shared_ptr<native::Execution> nativeptr, IResultCollector<TResult>^ rc )
inline static Execution<TResult>^ Create(native::Execution&& nativeExecution, IResultCollector<TResult>^ rc )
{
return __nullptr == nativeptr ? nullptr :
gcnew Execution<TResult>( nativeptr, rc );
}
return gcnew Execution<TResult>( std::move(nativeExecution), rc );
}

/// <summary>
/// Private constructor to wrap a native object pointer.
/// </summary>
/// <param name="nativeptr">The native object pointer</param>
inline Execution( std::shared_ptr<native::Execution> nativeptr, IResultCollector<TResult>^ rc )
inline Execution( native::Execution&& nativeptr, IResultCollector<TResult>^ rc )
{
m_rc = rc;
m_nativeptr = gcnew native_shared_ptr<native::Execution>(nativeptr);
m_nativeptr = gcnew native_unique_ptr<native::Execution>(
std::unique_ptr<native::Execution>(new native::Execution(std::move(nativeptr))));
}
private:
IResultCollector<TResult>^ m_rc;

native_shared_ptr<native::Execution>^ m_nativeptr;
native_unique_ptr<native::Execution>^ m_nativeptr;
};
} // namespace Client
} // namespace Geode
@@ -45,7 +45,7 @@ namespace Apache

auto nativeRegion = ((Region<TKey, TValue>^)rg)->GetNative();
auto execution = native::FunctionService::onRegion(nativeRegion);
return Execution<TResult>::Create( execution, nullptr );
return Execution<TResult>::Create( std::move(execution), nullptr );

_GF_MG_EXCEPTION_CATCH_ALL2/* due to auto replace */
}
@@ -56,7 +56,7 @@ namespace Apache
_GF_MG_EXCEPTION_TRY2/* due to auto replace */

auto nativeptr = native::FunctionService::onServer(pl->GetNative());
return Execution<TResult>::Create( nativeptr , nullptr);
return Execution<TResult>::Create(std::move(nativeptr) , nullptr);

_GF_MG_EXCEPTION_CATCH_ALL2/* due to auto replace */
}
@@ -67,7 +67,7 @@ namespace Apache
_GF_MG_EXCEPTION_TRY2/* due to auto replace */

auto nativeptr = native::FunctionService::onServers(pl->GetNative());
return Execution<TResult>::Create( nativeptr , nullptr);
return Execution<TResult>::Create(std::move(nativeptr) , nullptr);

_GF_MG_EXCEPTION_CATCH_ALL2/* due to auto replace */
}
@@ -80,13 +80,13 @@ namespace Apache
if(auto realCache = dynamic_cast<Cache^>(cache))
{
auto nativeptr = native::FunctionService::onServer(realCache->GetNative());
return Execution<TResult>::Create( nativeptr, nullptr );
return Execution<TResult>::Create(std::move(nativeptr), nullptr );
}
else
{
auto authCache = dynamic_cast<AuthenticatedCache^>(cache);
auto nativeptr = native::FunctionService::onServer(authCache->GetNative());
return Execution<TResult>::Create( nativeptr, nullptr );
return Execution<TResult>::Create(std::move(nativeptr), nullptr );
}

_GF_MG_EXCEPTION_CATCH_ALL2/* due to auto replace */
@@ -100,13 +100,13 @@ namespace Apache
if(auto realCache = dynamic_cast<Cache^>(cache))
{
auto nativeptr = native::FunctionService::onServers(realCache->GetNative());
return Execution<TResult>::Create( nativeptr, nullptr );
return Execution<TResult>::Create(std::move(nativeptr), nullptr );
}
else
{
auto authCache = dynamic_cast<AuthenticatedCache^>(cache);
auto nativeptr = native::FunctionService::onServers(authCache->GetNative());
return Execution<TResult>::Create( nativeptr, nullptr );
return Execution<TResult>::Create(std::move(nativeptr), nullptr );
}

_GF_MG_EXCEPTION_CATCH_ALL2/* due to auto replace */
@@ -35,14 +35,22 @@
namespace apache {
namespace geode {
namespace client {

class ExecutionImpl;
class FunctionService;

/**
* @class Execution Execution.hpp
* gathers results from function execution
* @see FunctionService
*/

class _GEODE_EXPORT Execution {
public:
Execution();
~Execution() noexcept;
Execution(Execution&& move) noexcept;
Execution& operator=(Execution&& move) noexcept;

/**
* Specifies a data filter of routing objects for selecting the Geode
* members
@@ -57,23 +65,23 @@ class _GEODE_EXPORT Execution {
* @throws UnsupportedOperationException if not called after
* FunctionService::onRegion(Region).
*/
virtual std::shared_ptr<Execution> withFilter(std::shared_ptr<CacheableVector> routingObj) = 0;
Execution withFilter(std::shared_ptr<CacheableVector> routingObj);
/**
* Specifies the user data passed to the function when it is executed.
* @param args user data passed to the function execution
* @return an Execution with args
* @throws IllegalArgumentException if the input parameter is nullptr
*
*/
virtual std::shared_ptr<Execution> withArgs(std::shared_ptr<Cacheable> args) = 0;
Execution withArgs(std::shared_ptr<Cacheable> args);
/**
* Specifies the {@link ResultCollector} that will receive the results after
* the function has been executed.
* @return an Execution with a collector
* @throws IllegalArgumentException if {@link ResultCollector} is nullptr
* @see ResultCollector
*/
virtual std::shared_ptr<Execution> withCollector(std::shared_ptr<ResultCollector> rs) = 0;
Execution withCollector(std::shared_ptr<ResultCollector> rs);
/**
* Executes the function using its name
* <p>
@@ -83,9 +91,9 @@ class _GEODE_EXPORT Execution {
* @return either a default result collector or one specified by {@link
* #withCollector(ResultCollector)}
*/
virtual std::shared_ptr<ResultCollector> execute(
std::shared_ptr<ResultCollector> execute(
const std::string& func,
std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT) = 0;
std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);

/**
* Executes the function using its name
@@ -102,11 +110,19 @@ class _GEODE_EXPORT Execution {
* @return either a default result collector or one specified by {@link
* #withCollector(ResultCollector)}
*/
virtual std::shared_ptr<ResultCollector> execute(
std::shared_ptr<ResultCollector> execute(
const std::shared_ptr<CacheableVector>& routingObj,
const std::shared_ptr<Cacheable>& args,
const std::shared_ptr<ResultCollector>& rs, const std::string& func,
std::chrono::milliseconds timeout) = 0;
std::chrono::milliseconds timeout);

private:
std::unique_ptr<ExecutionImpl> impl_;

Execution(std::unique_ptr<ExecutionImpl> impl);

friend ExecutionImpl;
friend FunctionService;
};

} // namespace client
@@ -62,7 +62,7 @@ class _GEODE_EXPORT FunctionService {
* @throws NullPointerException
* if the region passed in is nullptr
*/
static std::shared_ptr<Execution> onRegion(const std::shared_ptr<Region>& region);
static Execution onRegion(const std::shared_ptr<Region>& region);

/**
* Returns a {@link Execution} object that can be used to execute a data
@@ -77,7 +77,7 @@ class _GEODE_EXPORT FunctionService {
* @throws UnsupportedOperationException
* if Pool is in multiusersecure Mode
*/
inline static std::shared_ptr<Execution> onServer(const std::shared_ptr<Pool>& pool) {
inline static Execution onServer(const std::shared_ptr<Pool>& pool) {
return onServerWithPool(pool);
}

@@ -95,7 +95,7 @@ class _GEODE_EXPORT FunctionService {
* @throws UnsupportedOperationException
* if Pool is in multiusersecure Mode
*/
inline static std::shared_ptr<Execution> onServer(const std::shared_ptr<RegionService>& cache) {
inline static Execution onServer(const std::shared_ptr<RegionService>& cache) {
return onServerWithCache(cache);
}

@@ -112,7 +112,7 @@ class _GEODE_EXPORT FunctionService {
* @throws UnsupportedOperationException
* if Pool is in multiusersecure Mode
*/
inline static std::shared_ptr<Execution> onServers(const std::shared_ptr<Pool>& pool) {
inline static Execution onServers(const std::shared_ptr<Pool>& pool) {
return onServersWithPool(pool);
}

@@ -130,23 +130,23 @@ class _GEODE_EXPORT FunctionService {
* @throws UnsupportedOperationException
* if Pool is in multiusersecure Mode
*/
inline static std::shared_ptr<Execution> onServers(const std::shared_ptr<RegionService>& cache) {
inline static Execution onServers(const std::shared_ptr<RegionService>& cache) {
return onServersWithCache(cache);
}

virtual ~FunctionService() {}

private:
static std::shared_ptr<Execution> onServerWithPool(
static Execution onServerWithPool(
const std::shared_ptr<Pool>& pool);

static std::shared_ptr<Execution> onServerWithCache(
static Execution onServerWithCache(
const std::shared_ptr<RegionService>& cache);

static std::shared_ptr<Execution> onServersWithPool(
static Execution onServersWithPool(
const std::shared_ptr<Pool>& pool);

static std::shared_ptr<Execution> onServersWithCache(const std::shared_ptr<RegionService>& cache);
static Execution onServersWithCache(const std::shared_ptr<RegionService>& cache);
};
} // namespace client
} // namespace geode
@@ -50,9 +50,9 @@ namespace client {
* <br>
* <pre>
* auto rc = FunctionService::onRegion(region)
* ->withArgs(args)
* ->withFilter(keySet)
* ->withCollector(new
* .withArgs(args)
* .withFilter(keySet)
* .withCollector(new
* MyCustomResultCollector())
* .execute(Function);
* //Application can do something else here before retrieving the result
@@ -2422,9 +2422,9 @@ DUNIT_TASK_DEFINITION(CLIENT1, generateJavaPdxType)

auto funcExec = FunctionService::onRegion(regPtr0);

auto collector = funcExec->withArgs(args)
->withFilter(routingObj)
->execute("ComparePdxTypes");
auto collector = funcExec.withArgs(args)
.withFilter(routingObj)
.execute("ComparePdxTypes");
ASSERT(collector != nullptr, "onRegion collector nullptr");

auto result = collector->getResult();
@@ -2591,9 +2591,9 @@ DUNIT_TASK_DEFINITION(CLIENT1, verifyDotNetPdxTypes)

auto funcExec = FunctionService::onRegion(regPtr0);

auto collector = funcExec->withArgs(args)
->withFilter(routingObj)
->execute("ComparePdxTypes");
auto collector = funcExec.withArgs(args)
.withFilter(routingObj)
.execute("ComparePdxTypes");
ASSERT(collector != nullptr, "onRegion collector nullptr");

auto result = collector->getResult();
@@ -2705,7 +2705,7 @@ DUNIT_TASK_DEFINITION(CLIENT3, client3GetsV2Object)

auto funcExec = FunctionService::onRegion(regPtr0);

auto collector = funcExec->execute("IterateRegion");
auto collector = funcExec.execute("IterateRegion");
ASSERT(collector != nullptr, "onRegion collector nullptr");

auto result = collector->getResult();
@@ -223,8 +223,8 @@ class putThread : public ACE_Task_Base {
auto routingObj = CacheableVector::create();
routingObj->push_back(key);
auto exc = FunctionService::onRegion(regPtr0);
exc->execute(routingObj, args, rPtr, getFuncName2,
std::chrono::seconds(300))
exc.execute(routingObj, args, rPtr, getFuncName2,
std::chrono::seconds(300))
->getResult();
} catch (const TimeoutException& te) {
LOGINFO("Timeout exception occurred %s", te.what());
@@ -266,8 +266,7 @@ void executeFunction() {
auto routingObj = CacheableVector::create();
routingObj->push_back(key);
auto exc = FunctionService::onRegion(regPtr0);
exc->execute(routingObj, args, rPtr, getFuncName2,
std::chrono::seconds(300))
exc.execute(routingObj, args, rPtr, getFuncName2, std::chrono::seconds(300))
->getResult();
}
LOGINFO("executeFunction failureCount %d", failureCount);
@@ -229,31 +229,29 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
LOG("Put for execKey's on region complete.");

LOG("Adding filter");
auto arrList = CacheableArrayList::create();
for (int i = 100; i < 120; i++) {
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
arrList->push_back(key);
}
auto arrList = CacheableArrayList::create();
for (int i = 100; i < 120; i++) {
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
arrList->push_back(key);
}

auto filter = CacheableVector::create();
for (int i = 100; i < 120; i++) {
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
filter->push_back(key);
}
LOG("Adding filter done.");
auto filter = CacheableVector::create();
for (int i = 100; i < 120; i++) {
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
filter->push_back(key);
}
LOG("Adding filter done.");

auto args = CacheableBoolean::create(1);
auto args = CacheableBoolean::create(1);

auto funcExec = FunctionService::onRegion(regPtr0);
ASSERT(funcExec != nullptr, "onRegion Returned nullptr");
auto funcExec = FunctionService::onRegion(regPtr0);

auto collector = funcExec->withArgs(args)->withFilter(filter)->execute(
exFuncNameSendException, std::chrono::seconds(15));
ASSERT(collector != nullptr, "onRegion collector nullptr");
auto collector = funcExec.withArgs(args).withFilter(filter).execute(
exFuncNameSendException, std::chrono::seconds(15));

auto result = collector->getResult();
auto result = collector->getResult();

if (result == nullptr) {
ASSERT(false, "echo String : result is nullptr");
@@ -275,7 +273,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)

LOG("exFuncNameSendException done for bool argument.");

collector = funcExec->withArgs(arrList)->withFilter(filter)->execute(
collector = funcExec.withArgs(arrList).withFilter(filter).execute(
exFuncNameSendException, std::chrono::seconds(15));
ASSERT(collector != nullptr, "onRegion collector for arrList nullptr");
std::this_thread::sleep_for(std::chrono::seconds(2));
@@ -287,10 +285,10 @@ DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)

LOGINFO("Executing the exception test it is expected to throw.");
auto executeFunctionResult3 =
funcExec->withArgs(arrList)
->withFilter(filter)
->execute("ThinClientRegionExceptionTest",
std::chrono::seconds(15))
funcExec.withArgs(arrList)
.withFilter(filter)
.execute("ThinClientRegionExceptionTest",
std::chrono::seconds(15))
->getResult();
FAIL("Failed to throw expected exception.");
} catch (...) {

0 comments on commit ec8c908

Please sign in to comment.