Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cpp/csp/adapters/utils/JSONMessageWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ class JSONMessageWriter : public MessageWriter
}

private:
void processTickImpl( const OutputDataMapper & dataMapper, const TimeSeriesProvider * sourcets )
void processTickImpl( const OutputDataMapper & dataMapper, const TimeSeriesProvider * sourcets ) override
{
dataMapper.apply( *this, sourcets );
}

template<typename T>
inline auto convertValue( const T & value )
{
{
return value;
}

Expand All @@ -92,13 +92,13 @@ class JSONMessageWriter : public MessageWriter
template<>
inline auto JSONMessageWriter::convertValue( const std::string & value )
{
return rapidjson::StringRef( value.c_str() );
return rapidjson::StringRef( value.c_str() );
}

template<>
inline auto JSONMessageWriter::convertValue( const csp::Date & value )
{
return rapidjson::Value( value.asYYYYMMDD().c_str(), m_doc.GetAllocator() );
return rapidjson::Value( value.asYYYYMMDD().c_str(), m_doc.GetAllocator() );
}

template<>
Expand All @@ -123,7 +123,7 @@ inline auto JSONMessageWriter::convertValue( const csp::DateTime & value )
template<>
inline auto JSONMessageWriter::convertValue( const csp::TimeDelta & value )
{
return rapidjson::Value( value.asNanoseconds() );
return rapidjson::Value( value.asNanoseconds() );
}

template<>
Expand Down Expand Up @@ -159,7 +159,7 @@ inline auto JSONMessageWriter::convertValue( const StructPtr & struct_, const Cs
if( !nestedEntry.sField -> isSet( struct_.get() ) )
continue;


SupportedCspTypeSwitch::template invoke<SupportedArrayCspTypeSwitch>(
nestedEntry.sField -> type().get(),
[ & ]( auto tag )
Expand Down
1 change: 0 additions & 1 deletion cpp/csp/engine/CspType.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ template<> template<> struct CspType::Type::toCType<CspType::Type::STRING,void>
template<> template<> struct CspType::Type::toCType<CspType::Type::STRUCT,void> { using type = StructPtr; };
template<> template<> struct CspType::Type::toCType<CspType::Type::DIALECT_GENERIC,void> { using type = DialectGenericType; };
template<> template<typename T> struct CspType::Type::toCType<CspType::Type::ARRAY,T> { using type = std::vector<T>; };
template<> template<> struct CspType::Type::toCType<CspType::Type::ARRAY,bool> { using type = std::vector<uint8_t>; };

template<> struct CspType::fromCType<bool> { static CspTypePtr & type() { return CspType::BOOL(); } };
template<> struct CspType::fromCType<int8_t> { static CspTypePtr & type() { return CspType::INT8(); } };
Expand Down
49 changes: 49 additions & 0 deletions cpp/csp/engine/InputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,53 @@ InputAdapter::InputAdapter( Engine *engine, const CspTypePtr &type, PushMode pus
init( type );
}


template<>
bool InputAdapter::consumeTick( const bool & value )
{
switch( pushMode() )
{
case PushMode::LAST_VALUE:
{
if( unlikely( rootEngine() -> cycleCount() == lastCycleCount() ) )
m_timeseries -> lastValueTyped<bool>() = value;
else
this -> outputTickTyped<bool>( rootEngine() -> now(), value );
return true;
}

case PushMode::BURST:
{
CSP_ASSERT( type() -> type() == CspType::Type::ARRAY );
CSP_ASSERT( static_cast<const CspArrayType * >( type() ) -> elemType() -> type() == CspType::Type::fromCType<bool>::type );

using ArrayT = typename CspType::Type::toCType<CspType::Type::ARRAY, bool>::type;
if( likely( rootEngine() -> cycleCount() != lastCycleCount() ) )
{
//ensure we reuse vector memory in our buffer by using reserve api and
//clearing existing value if any
reserveTickTyped<ArrayT>( rootEngine() -> cycleCount(), rootEngine() -> now() ).clear();
}

m_timeseries -> lastValueTyped<ArrayT>().push_back( value );
return true;
}

case PushMode::NON_COLLAPSING:
{
if( unlikely( rootEngine() -> cycleCount() == lastCycleCount() ) )
return false;

this -> outputTickTyped<bool>( rootEngine() -> now(), value );
return true;
}

default:
CSP_THROW( NotImplemented, pushMode() << " mode is not yet supported" );
break;
}

return true;
}

}
3 changes: 1 addition & 2 deletions cpp/csp/engine/InputAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ bool InputAdapter::consumeTick( const T & value )
using ArrayT = typename CspType::Type::toCType<CspType::Type::ARRAY,T>::type;
if( likely( rootEngine() -> cycleCount() != lastCycleCount() ) )
{
//ensure we reuse vector memory in our buffer by using reserve api and
//ensure we reuse vector memory in our buffer by using reserve api and
//clearing existing value if any
reserveTickTyped<ArrayT>( rootEngine() -> cycleCount(), rootEngine() -> now() ).clear();
}
Expand All @@ -97,5 +97,4 @@ bool InputAdapter::consumeTick( const T & value )
}

};

#endif