diff --git a/co/buffer.cpp b/co/buffer.cpp index 0008831a2..c4ec9431d 100644 --- a/co/buffer.cpp +++ b/co/buffer.cpp @@ -1,5 +1,6 @@ /* Copyright (c) 2012, Daniel Nachbaur + * 2012, Stefan.Eilemann@epfl.ch * * This library is free software; you can redistribute it and/or modify it under * the terms of the GNU Lesser General Public License version 2.1 as published @@ -29,14 +30,14 @@ class Buffer { public: Buffer( lunchbox::a_int32_t& freeCounter ) - : _freeCount( freeCounter ) - , _node() - , _localNode() + : freeCount( freeCounter ) + , node() + , localNode() {} - lunchbox::a_int32_t& _freeCount; - NodePtr _node; //!< The node sending the command - LocalNodePtr _localNode; //!< The node receiving the command + lunchbox::a_int32_t& freeCount; + NodePtr node; //!< The node sending the command + LocalNodePtr localNode; //!< The node receiving the command }; } @@ -55,12 +56,17 @@ Buffer::~Buffer() NodePtr Buffer::getNode() const { - return _impl->_node; + return _impl->node; } LocalNodePtr Buffer::getLocalNode() const { - return _impl->_localNode; + return _impl->localNode; +} + +bool Buffer::needsSwapping() const +{ + return _impl->node->isBigEndian() != _impl->localNode->isBigEndian(); } bool Buffer::isValid() const @@ -72,11 +78,11 @@ size_t Buffer::alloc( NodePtr node, LocalNodePtr localNode, const uint64_t size) { LB_TS_THREAD( _writeThread ); LBASSERT( getRefCount() == 1 ); // caller BufferCache - LBASSERT( _impl->_freeCount > 0 ); + LBASSERT( _impl->freeCount > 0 ); - --_impl->_freeCount; - _impl->_node = node; - _impl->_localNode = localNode; + --_impl->freeCount; + _impl->node = node; + _impl->localNode = localNode; reserve( LB_MAX( getMinSize(), size )); resize( size ); @@ -90,14 +96,14 @@ void Buffer::free() clear(); - _impl->_node = 0; - _impl->_localNode = 0; + _impl->node = 0; + _impl->localNode = 0; } void Buffer::deleteReferenced( const Referenced* object ) const { Buffer* buffer = const_cast< Buffer* >( this ); - ++buffer->_impl->_freeCount; + ++buffer->_impl->freeCount; } size_t Buffer::getMinSize() diff --git a/co/buffer.h b/co/buffer.h index 9e82e3e09..cdbd905af 100644 --- a/co/buffer.h +++ b/co/buffer.h @@ -1,5 +1,6 @@ /* Copyright (c) 2012, Daniel Nachbaur + * 2012, Stefan.Eilemann@epfl.ch * * This library is free software; you can redistribute it and/or modify it under * the terms of the GNU Lesser General Public License version 2.1 as published @@ -52,6 +53,9 @@ class Buffer : public lunchbox::Bufferb, public lunchbox::Referenced /** @return the receiving node. */ LocalNodePtr getLocalNode() const; + /** @return true if the content needs to be endian-converted. */ + bool needsSwapping() const; + /** @return true if the buffer has valid data. */ CO_API bool isValid() const; diff --git a/co/command.cpp b/co/command.cpp index 37adbb69b..2fcde598e 100644 --- a/co/command.cpp +++ b/co/command.cpp @@ -51,7 +51,7 @@ class Command , cmd( rhs.cmd ) {} - void operator=( const Command& rhs ) + void operator = ( const Command& rhs ) { func = rhs.func; buffer = rhs.buffer; @@ -169,17 +169,18 @@ NodePtr Command::getMaster() return getNode(); } -bool Command::getNextBuffer( uint32_t* compressor, uint32_t* nChunks, - const void** chunkData, uint64_t* size ) +bool Command::getNextBuffer( uint32_t& compressor, uint32_t& nChunks, + const void** chunkData, uint64_t& size ) { if( !_impl->buffer ) return false; *chunkData = _impl->buffer->getData(); - *size = _impl->buffer->getSize(); - *compressor = EQ_COMPRESSOR_NONE; - *nChunks = 1; + size = _impl->buffer->getSize(); + compressor = EQ_COMPRESSOR_NONE; + nChunks = 1; + setSwapping( _impl->buffer->needsSwapping( )); return true; } diff --git a/co/command.h b/co/command.h index 89c89e475..6677b3dbd 100644 --- a/co/command.h +++ b/co/command.h @@ -106,10 +106,10 @@ namespace detail { class Command; } CO_API virtual size_t nRemainingBuffers() const; CO_API virtual uint128_t getVersion() const; CO_API virtual NodePtr getMaster(); - CO_API virtual bool getNextBuffer( uint32_t* compressor, - uint32_t* nChunks, + CO_API virtual bool getNextBuffer( uint32_t& compressor, + uint32_t& nChunks, const void** chunkData, - uint64_t* size ); + uint64_t& size ); //@} void _skipHeader(); //!< @internal diff --git a/co/dataIStream.cpp b/co/dataIStream.cpp index fa8bf692a..f680e3e87 100644 --- a/co/dataIStream.cpp +++ b/co/dataIStream.cpp @@ -76,7 +76,7 @@ void DataIStream::setSwapping( const bool onOff ) _impl->swap = onOff; } -bool DataIStream::_isSwapping() const +bool DataIStream::isSwapping() const { return _impl->swap; } @@ -144,7 +144,7 @@ bool DataIStream::_checkBuffer() uint32_t nChunks = 0; const void* data = 0; - if( !getNextBuffer( &compressor, &nChunks, &data, &_impl->inputSize )) + if( !getNextBuffer( compressor, nChunks, &data, _impl->inputSize )) return false; _impl->input = _decompress( data, compressor, nChunks, diff --git a/co/dataIStream.h b/co/dataIStream.h index a34596359..d8f8c2b97 100644 --- a/co/dataIStream.h +++ b/co/dataIStream.h @@ -41,6 +41,7 @@ namespace detail { class DataIStream; } virtual uint128_t getVersion() const = 0; //!< @internal virtual void reset() { _reset(); } //!< @internal void setSwapping( const bool onOff ); //!< @internal enable endian swap + CO_API bool isSwapping() const; //!< @internal //@} /** @name Data input */ @@ -133,8 +134,8 @@ namespace detail { class DataIStream; } CO_API virtual ~DataIStream(); //@} - virtual bool getNextBuffer( uint32_t* compressor, uint32_t* nChunks, - const void** chunkData, uint64_t* size )=0; + virtual bool getNextBuffer( uint32_t& compressor, uint32_t& nChunks, + const void** chunkData, uint64_t& size )=0; private: detail::DataIStream* const _impl; @@ -166,16 +167,14 @@ namespace detail { class DataIStream; } return *this; } - CO_API bool _isSwapping() const; - /** Byte-swap a plain data item. @version 1.0 */ template< typename T > void _swap( T& value ) const - { if( _isSwapping( )) swap( value ); } + { if( isSwapping( )) swap( value ); } /** Byte-swap a C array. @version 1.0 */ template< typename T > void _swap( Array< T > array ) const { - if( !_isSwapping( )) + if( !isSwapping( )) return; #pragma omp parallel for for( ssize_t i = 0; i < ssize_t( array.num ); ++i ) diff --git a/co/node.cpp b/co/node.cpp index 6b318a185..f2c70716b 100644 --- a/co/node.cpp +++ b/co/node.cpp @@ -263,6 +263,11 @@ NodePtr Node::createNode( const uint32_t type ) return new Node; } +bool Node::isBigEndian() const +{ + return _impl->bigEndian; +} + bool Node::isReachable() const { return isListening() || isConnected(); diff --git a/co/node.h b/co/node.h index 9ae40b594..918f8f6b6 100644 --- a/co/node.h +++ b/co/node.h @@ -47,6 +47,8 @@ namespace detail { class Node; } //@{ bool operator == ( const Node* n ) const; + bool isBigEndian() const; //!< @internal + CO_API bool isReachable() const; CO_API bool isConnected() const; CO_API bool isClosed() const; diff --git a/co/objectDataIStream.cpp b/co/objectDataIStream.cpp index cc4c714fb..100f2b574 100644 --- a/co/objectDataIStream.cpp +++ b/co/objectDataIStream.cpp @@ -128,8 +128,8 @@ uint128_t ObjectDataIStream::getPendingVersion() const return cmd.getVersion(); } -bool ObjectDataIStream::getNextBuffer( uint32_t* compressor, uint32_t* nChunks, - const void** chunkData, uint64_t* size ) +bool ObjectDataIStream::getNextBuffer( uint32_t& compressor, uint32_t& nChunks, + const void** chunkData, uint64_t& size ) { if( _commands.empty( )) { @@ -138,35 +138,36 @@ bool ObjectDataIStream::getNextBuffer( uint32_t* compressor, uint32_t* nChunks, } _usedCommand = _commands.front(); - ObjectDataCommand command( _usedCommand ); _commands.pop_front(); - if( !command.isValid( )) + if( !_usedCommand.isValid( )) return false; - LBASSERT( command.getCommand() == CMD_OBJECT_INSTANCE || - command.getCommand() == CMD_OBJECT_DELTA || - command.getCommand() == CMD_OBJECT_SLAVE_DELTA ); + LBASSERT( _usedCommand.getCommand() == CMD_OBJECT_INSTANCE || + _usedCommand.getCommand() == CMD_OBJECT_DELTA || + _usedCommand.getCommand() == CMD_OBJECT_SLAVE_DELTA ); + ObjectDataCommand command( _usedCommand ); const uint64_t dataSize = command.getDataSize(); if( dataSize == 0 ) // empty command return getNextBuffer( compressor, nChunks, chunkData, size ); - *size = dataSize; - *compressor = command.getCompressor(); - *nChunks = command.getChunks(); + size = dataSize; + compressor = command.getCompressor(); + nChunks = command.getChunks(); switch( command.getCommand( )) { - case CMD_OBJECT_INSTANCE: + case CMD_OBJECT_INSTANCE: command.get< NodeID >(); // nodeID command.get< uint32_t >(); // instanceID break; - case CMD_OBJECT_SLAVE_DELTA: + case CMD_OBJECT_SLAVE_DELTA: command.get< UUID >(); // commit UUID break; } *chunkData = command.getRemainingBuffer( command.getRemainingBufferSize( )); + setSwapping( command.isSwapping( )); return true; } diff --git a/co/objectDataIStream.h b/co/objectDataIStream.h index d75bce8f1..2ef38bfb1 100644 --- a/co/objectDataIStream.h +++ b/co/objectDataIStream.h @@ -54,8 +54,8 @@ namespace co CO_API virtual NodePtr getMaster(); protected: - virtual bool getNextBuffer( uint32_t* compressor, uint32_t* nChunks, - const void** chunkData, uint64_t* size ); + virtual bool getNextBuffer( uint32_t& compressor, uint32_t& nChunks, + const void** chunkData, uint64_t& size ); private: /** All data commands for this istream. */ diff --git a/tests/dataStream.cpp b/tests/dataStream.cpp index 1546d7296..0dade03f7 100644 --- a/tests/dataStream.cpp +++ b/tests/dataStream.cpp @@ -72,8 +72,8 @@ class DataIStream : public co::DataIStream virtual co::NodePtr getMaster() { return 0; } protected: - virtual bool getNextBuffer( uint32_t* compressor, uint32_t* nChunks, - const void** chunkData, uint64_t* size ) + virtual bool getNextBuffer( uint32_t& compressor, uint32_t& nChunks, + const void** chunkData, uint64_t& size ) { co::Command cmd = _commands.tryPop(); if( !cmd.isValid( )) @@ -83,10 +83,10 @@ class DataIStream : public co::DataIStream TEST( command.getCommand() == co::CMD_OBJECT_DELTA ); - *size = command.getDataSize(); - *compressor = command.getCompressor(); - *nChunks = command.getChunks(); - *chunkData = command.getRemainingBuffer( *size ); + size = command.getDataSize(); + compressor = command.getCompressor(); + nChunks = command.getChunks(); + *chunkData = command.getRemainingBuffer( size ); return true; }