Skip to content

Commit

Permalink
Close Eyescale/Equalizer#146: Implement byt swapping, buffer knows re…
Browse files Browse the repository at this point in the history
…cv/sender endianness and DataIStram::getNextBuffer will update swap state. Needs testing
  • Loading branch information
eile committed Sep 10, 2012
1 parent f19d69f commit ef13b83
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 52 deletions.
36 changes: 21 additions & 15 deletions co/buffer.cpp
@@ -1,5 +1,6 @@

/* Copyright (c) 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
* 2012, Stefan.Eilemann@epfl.ch
*
* This library is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License version 2.1 as published
Expand Down Expand Up @@ -29,14 +30,14 @@ class Buffer
{
public:
Buffer( lunchbox::a_int32_t& freeCounter )
: _freeCount( freeCounter )
, _node()
, _localNode()
: freeCount( freeCounter )
, node()
, localNode()
{}

lunchbox::a_int32_t& _freeCount;
NodePtr _node; //!< The node sending the command
LocalNodePtr _localNode; //!< The node receiving the command
lunchbox::a_int32_t& freeCount;
NodePtr node; //!< The node sending the command
LocalNodePtr localNode; //!< The node receiving the command
};
}

Expand All @@ -55,12 +56,17 @@ Buffer::~Buffer()

NodePtr Buffer::getNode() const
{
return _impl->_node;
return _impl->node;
}

LocalNodePtr Buffer::getLocalNode() const
{
return _impl->_localNode;
return _impl->localNode;
}

bool Buffer::needsSwapping() const
{
return _impl->node->isBigEndian() != _impl->localNode->isBigEndian();
}

bool Buffer::isValid() const
Expand All @@ -72,11 +78,11 @@ size_t Buffer::alloc( NodePtr node, LocalNodePtr localNode, const uint64_t size)
{
LB_TS_THREAD( _writeThread );
LBASSERT( getRefCount() == 1 ); // caller BufferCache
LBASSERT( _impl->_freeCount > 0 );
LBASSERT( _impl->freeCount > 0 );

--_impl->_freeCount;
_impl->_node = node;
_impl->_localNode = localNode;
--_impl->freeCount;
_impl->node = node;
_impl->localNode = localNode;

reserve( LB_MAX( getMinSize(), size ));
resize( size );
Expand All @@ -90,14 +96,14 @@ void Buffer::free()

clear();

_impl->_node = 0;
_impl->_localNode = 0;
_impl->node = 0;
_impl->localNode = 0;
}

void Buffer::deleteReferenced( const Referenced* object ) const
{
Buffer* buffer = const_cast< Buffer* >( this );
++buffer->_impl->_freeCount;
++buffer->_impl->freeCount;
}

size_t Buffer::getMinSize()
Expand Down
4 changes: 4 additions & 0 deletions co/buffer.h
@@ -1,5 +1,6 @@

/* Copyright (c) 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
* 2012, Stefan.Eilemann@epfl.ch
*
* This library is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License version 2.1 as published
Expand Down Expand Up @@ -52,6 +53,9 @@ class Buffer : public lunchbox::Bufferb, public lunchbox::Referenced
/** @return the receiving node. */
LocalNodePtr getLocalNode() const;

/** @return true if the content needs to be endian-converted. */
bool needsSwapping() const;

/** @return true if the buffer has valid data. */
CO_API bool isValid() const;

Expand Down
13 changes: 7 additions & 6 deletions co/command.cpp
Expand Up @@ -51,7 +51,7 @@ class Command
, cmd( rhs.cmd )
{}

void operator=( const Command& rhs )
void operator = ( const Command& rhs )
{
func = rhs.func;
buffer = rhs.buffer;
Expand Down Expand Up @@ -169,17 +169,18 @@ NodePtr Command::getMaster()
return getNode();
}

bool Command::getNextBuffer( uint32_t* compressor, uint32_t* nChunks,
const void** chunkData, uint64_t* size )
bool Command::getNextBuffer( uint32_t& compressor, uint32_t& nChunks,
const void** chunkData, uint64_t& size )
{
if( !_impl->buffer )
return false;

*chunkData = _impl->buffer->getData();
*size = _impl->buffer->getSize();
*compressor = EQ_COMPRESSOR_NONE;
*nChunks = 1;
size = _impl->buffer->getSize();
compressor = EQ_COMPRESSOR_NONE;
nChunks = 1;

setSwapping( _impl->buffer->needsSwapping( ));
return true;
}

Expand Down
6 changes: 3 additions & 3 deletions co/command.h
Expand Up @@ -106,10 +106,10 @@ namespace detail { class Command; }
CO_API virtual size_t nRemainingBuffers() const;
CO_API virtual uint128_t getVersion() const;
CO_API virtual NodePtr getMaster();
CO_API virtual bool getNextBuffer( uint32_t* compressor,
uint32_t* nChunks,
CO_API virtual bool getNextBuffer( uint32_t& compressor,
uint32_t& nChunks,
const void** chunkData,
uint64_t* size );
uint64_t& size );
//@}

void _skipHeader(); //!< @internal
Expand Down
4 changes: 2 additions & 2 deletions co/dataIStream.cpp
Expand Up @@ -76,7 +76,7 @@ void DataIStream::setSwapping( const bool onOff )
_impl->swap = onOff;
}

bool DataIStream::_isSwapping() const
bool DataIStream::isSwapping() const
{
return _impl->swap;
}
Expand Down Expand Up @@ -144,7 +144,7 @@ bool DataIStream::_checkBuffer()
uint32_t nChunks = 0;
const void* data = 0;

if( !getNextBuffer( &compressor, &nChunks, &data, &_impl->inputSize ))
if( !getNextBuffer( compressor, nChunks, &data, _impl->inputSize ))
return false;

_impl->input = _decompress( data, compressor, nChunks,
Expand Down
11 changes: 5 additions & 6 deletions co/dataIStream.h
Expand Up @@ -41,6 +41,7 @@ namespace detail { class DataIStream; }
virtual uint128_t getVersion() const = 0; //!< @internal
virtual void reset() { _reset(); } //!< @internal
void setSwapping( const bool onOff ); //!< @internal enable endian swap
CO_API bool isSwapping() const; //!< @internal
//@}

/** @name Data input */
Expand Down Expand Up @@ -133,8 +134,8 @@ namespace detail { class DataIStream; }
CO_API virtual ~DataIStream();
//@}

virtual bool getNextBuffer( uint32_t* compressor, uint32_t* nChunks,
const void** chunkData, uint64_t* size )=0;
virtual bool getNextBuffer( uint32_t& compressor, uint32_t& nChunks,
const void** chunkData, uint64_t& size )=0;
private:
detail::DataIStream* const _impl;

Expand Down Expand Up @@ -166,16 +167,14 @@ namespace detail { class DataIStream; }
return *this;
}

CO_API bool _isSwapping() const;

/** Byte-swap a plain data item. @version 1.0 */
template< typename T > void _swap( T& value ) const
{ if( _isSwapping( )) swap( value ); }
{ if( isSwapping( )) swap( value ); }

/** Byte-swap a C array. @version 1.0 */
template< typename T > void _swap( Array< T > array ) const
{
if( !_isSwapping( ))
if( !isSwapping( ))
return;
#pragma omp parallel for
for( ssize_t i = 0; i < ssize_t( array.num ); ++i )
Expand Down
5 changes: 5 additions & 0 deletions co/node.cpp
Expand Up @@ -263,6 +263,11 @@ NodePtr Node::createNode( const uint32_t type )
return new Node;
}

bool Node::isBigEndian() const
{
return _impl->bigEndian;
}

bool Node::isReachable() const
{
return isListening() || isConnected();
Expand Down
2 changes: 2 additions & 0 deletions co/node.h
Expand Up @@ -47,6 +47,8 @@ namespace detail { class Node; }
//@{
bool operator == ( const Node* n ) const;

bool isBigEndian() const; //!< @internal

CO_API bool isReachable() const;
CO_API bool isConnected() const;
CO_API bool isClosed() const;
Expand Down
25 changes: 13 additions & 12 deletions co/objectDataIStream.cpp
Expand Up @@ -128,8 +128,8 @@ uint128_t ObjectDataIStream::getPendingVersion() const
return cmd.getVersion();
}

bool ObjectDataIStream::getNextBuffer( uint32_t* compressor, uint32_t* nChunks,
const void** chunkData, uint64_t* size )
bool ObjectDataIStream::getNextBuffer( uint32_t& compressor, uint32_t& nChunks,
const void** chunkData, uint64_t& size )
{
if( _commands.empty( ))
{
Expand All @@ -138,35 +138,36 @@ bool ObjectDataIStream::getNextBuffer( uint32_t* compressor, uint32_t* nChunks,
}

_usedCommand = _commands.front();
ObjectDataCommand command( _usedCommand );
_commands.pop_front();
if( !command.isValid( ))
if( !_usedCommand.isValid( ))
return false;

LBASSERT( command.getCommand() == CMD_OBJECT_INSTANCE ||
command.getCommand() == CMD_OBJECT_DELTA ||
command.getCommand() == CMD_OBJECT_SLAVE_DELTA );
LBASSERT( _usedCommand.getCommand() == CMD_OBJECT_INSTANCE ||
_usedCommand.getCommand() == CMD_OBJECT_DELTA ||
_usedCommand.getCommand() == CMD_OBJECT_SLAVE_DELTA );

ObjectDataCommand command( _usedCommand );
const uint64_t dataSize = command.getDataSize();

if( dataSize == 0 ) // empty command
return getNextBuffer( compressor, nChunks, chunkData, size );

*size = dataSize;
*compressor = command.getCompressor();
*nChunks = command.getChunks();
size = dataSize;
compressor = command.getCompressor();
nChunks = command.getChunks();
switch( command.getCommand( ))
{
case CMD_OBJECT_INSTANCE:
case CMD_OBJECT_INSTANCE:
command.get< NodeID >(); // nodeID
command.get< uint32_t >(); // instanceID
break;
case CMD_OBJECT_SLAVE_DELTA:
case CMD_OBJECT_SLAVE_DELTA:
command.get< UUID >(); // commit UUID
break;
}
*chunkData = command.getRemainingBuffer( command.getRemainingBufferSize( ));

setSwapping( command.isSwapping( ));
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions co/objectDataIStream.h
Expand Up @@ -54,8 +54,8 @@ namespace co
CO_API virtual NodePtr getMaster();

protected:
virtual bool getNextBuffer( uint32_t* compressor, uint32_t* nChunks,
const void** chunkData, uint64_t* size );
virtual bool getNextBuffer( uint32_t& compressor, uint32_t& nChunks,
const void** chunkData, uint64_t& size );

private:
/** All data commands for this istream. */
Expand Down
12 changes: 6 additions & 6 deletions tests/dataStream.cpp
Expand Up @@ -72,8 +72,8 @@ class DataIStream : public co::DataIStream
virtual co::NodePtr getMaster() { return 0; }

protected:
virtual bool getNextBuffer( uint32_t* compressor, uint32_t* nChunks,
const void** chunkData, uint64_t* size )
virtual bool getNextBuffer( uint32_t& compressor, uint32_t& nChunks,
const void** chunkData, uint64_t& size )
{
co::Command cmd = _commands.tryPop();
if( !cmd.isValid( ))
Expand All @@ -83,10 +83,10 @@ class DataIStream : public co::DataIStream

TEST( command.getCommand() == co::CMD_OBJECT_DELTA );

*size = command.getDataSize();
*compressor = command.getCompressor();
*nChunks = command.getChunks();
*chunkData = command.getRemainingBuffer( *size );
size = command.getDataSize();
compressor = command.getCompressor();
nChunks = command.getChunks();
*chunkData = command.getRemainingBuffer( size );
return true;
}

Expand Down

0 comments on commit ef13b83

Please sign in to comment.