
Question: Memory allocation for very large element sizes? #60

Closed
vectronic opened this issue Mar 3, 2018 · 4 comments

@vectronic

In the case of processing video, buffers for high-res video frames (e.g. uncompressed 4K with 12 bits per channel) can be very large. The way I understand Raft is that all FIFOs would need to allocate space for these frames (elements).

A. Would there be a limit to the element size before efficiencies in caching and locality break down e.g. the size of a cache line?

B. If so, if the element was a smaller wrapper object pointing to a global shared pool of pre-allocated frame buffers, I assume this would just shift the problem of caching and locality to the shared pool? And this could be even harder to coordinate if multiple kernels are making use of it...

@jonathan-beard
Member

jonathan-beard commented Mar 10, 2018

> Would there be a limit to the element size before efficiencies in caching and locality break down, e.g. the size of a cache line?

By definition there is a limit to efficiencies for caching when the element size goes beyond the size of a single cache line. RaftLib by design has three different behaviors depending on the object allocation size.
For a super simple example, take a look at the external allocate test suite file.

Here's an excerpt of the test case:

if( ext_alloc< int[32] >::value != true )
{
   return( EXIT_FAILURE );
}

I'd like to understand the object structure a bit better to give a definitive answer... For small allocations (fundamental types or data structures smaller than a cache line), the allocations are essentially in place within the FIFO. As long as that FIFO structure between kernels exists at its current size, the data is read in place from producer to consumer. When class objects are allocated (in C++, things with constructors vs. P.O.D. types), the constructor is called in place to set up the structure before the user gets to use it.

For items greater than a single cache line, RaftLib uses the system slab allocator to allocate the large objects and then passes around pointers to them. That way the large allocations can exist across an entire topology vs. just at one FIFO. RaftLib handles passing the pointer from one kernel to the next safely (see next post), then discards it when that pointer is no longer in use. Another feature of the large-allocation path is that RaftLib inserts prefetch hints into the instruction stream for those large objects as they near the head of the queue for the consumer. That way, despite the data being behind a pointer, we parallelize the memory access to those large objects as best as possible. We don't do this for things smaller than a cache line, given that most next-line prefetchers handle the FIFO pattern designed into RaftLib very well (i.e., the follow-on data will always be in the cache for the consumer). This does create some pedantic worst-case behavior with false sharing; some architectures handle this better than others.
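To make the size-based dispatch concrete, here's a minimal sketch (assuming the same ext_alloc trait from the test excerpt above is in scope; the frame type is a hypothetical stand-in for an uncompressed 4K video frame):

struct frame
{
   /** hypothetical large frame, far bigger than any cache line **/
   unsigned char pixels[ 3840 * 2160 * 6 ];
};

/** small fundamental types are stored in place within the FIFO **/
static_assert( ext_alloc< int >::value == false, "int stays in place" );
/** anything over a cache line selects the external (pool) path **/
static_assert( ext_alloc< frame >::value == true, "frame goes to the pool" );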

> If so, if the element was a smaller wrapper object pointing to a global shared pool of pre-allocated frame buffers, I assume this would just shift the problem of caching and locality to the shared pool? And this could be even harder to coordinate if multiple kernels are making use of it...

I think I answered this one in the last response. If there's a large frame buffer, RaftLib automatically creates a pool with automated prefetch hints that are transparent to the user. Pointers into those pools are passed around transparently and garbage collected when no longer in use. It's entirely transparent to the user when these pools are created; the only way you'll know is by doing sizeof( x ) on the type you're allocating. This behavior is kicked off when calling allocate on the port. Push should do it as well when copying in a large object; I'll double-check to make sure that's the case, but I'm pretty sure it is. The idea with keeping the pools managed by RaftLib is that there isn't a way for multiple kernels to step on each other by accessing the same data. However, given that glibc uses a nice slab allocator, the pool of memory is essentially managed by the OS.

One thing I need to re-implement is the page management for these pools. Ideally I'd be able to call numa_page_move to create locality for the pool as it moves through the graph; right now, first touch on Linux puts it on the NUMA node of first allocation. To fix the current behavior I'd need to wrap the glibc allocator and use aligned memory... basically, each allocation would need to get a new page, no matter how big or small the allocation.
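For what it's worth, that page-granular wrapper could look something like the sketch below. This is just an illustration of the idea, not RaftLib code; page_alloc is a hypothetical name:

#include <cstdlib>
#include <unistd.h>

void* page_alloc( const std::size_t nbytes )
{
   /** one fresh page (or more) per allocation, so the pages backing an
    *  object could later be migrated between NUMA nodes independently,
    *  e.g. via move_pages(2) **/
   const auto page( static_cast< std::size_t >( sysconf( _SC_PAGESIZE ) ) );
   const auto rounded( ( ( nbytes + page - 1 ) / page ) * page );
   void *ptr( nullptr );
   if( posix_memalign( &ptr, page, rounded ) != 0 )
   {
      return( nullptr );
   }
   return( ptr );
}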

To get a better idea of how this works, it'd be good to start with the allocation traits header file. This gives the definitions that enable the rest of the templates to decide which type of FIFO to instantiate.
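If it helps to picture what such a trait can look like, here's a generic size-based sketch; the names and the hard-coded cache-line constant are illustrative only, and the real header carries more cases (e.g., distinguishing class from non-class types):

#include <cstddef>
#include <type_traits>

/** illustrative only: anything over a (hard-coded) cache line counts as
 *  externally allocated; not RaftLib's actual definition **/
constexpr std::size_t cache_line_size_sketch = 64;

template < class T >
struct ext_alloc_sketch :
   std::integral_constant< bool, ( sizeof( T ) > cache_line_size_sketch ) > {};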

Going from the "outside" to the inside: looking at the fifo.hpp file, we get functions like this one:

   template< class T,
             typename std::enable_if< ext_alloc< T >::value >::type* = nullptr >
   T& peek( raft::signal *signal = nullptr )
   {
      T **ptr( nullptr );
      local_peek( (void**)&ptr, signal );
      return( **ptr );
   }

These functions use the templates to ensure that the correct pointer type is returned; in this case, it's an external object created with the allocate function (objects above a certain size are created in a pool by default). For really big objects, we get this type of construct in the fifo.hpp file, since it's the last layer that actually "understands" what the object type is before we get down to bytes and addresses:

   template < class T,
              class ... Args,
              typename std::enable_if< ext_alloc< T >::value >::type* = nullptr > 
   T& 
   allocate( Args&&... params )
   {
      T **ptr( nullptr );
      /** call blocks till an element is available **/
      local_allocate( (void**) &ptr );
      *ptr = new T( std::forward< Args >( params )... );
      return( **ptr );
   }
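At the call site none of this machinery is visible. A hypothetical use on an output port looks like this (big_t stands in for any over-cache-line type; allocate constructs the object in the pool and send hands it to the stream, mirroring the kernel example later in this thread):

   /** 'output' is a kernel's output port container **/
   auto &obj( output[ "y" ].allocate< big_t >() );
   /** ... fill in obj here ... **/
   output[ "y" ].send();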

Then going to the ringbuffer itself, for the heap implementation, there are actually three distinct variants. You'll see comments like this throughout the file to help you find your way:

/*********************************
 * INLINE CLASS ALLOC STARTS HERE
 *********************************/

template < class T >
class RingBufferBase<
    T,
    Type::Heap,
    typename std::enable_if< inline_class_alloc< T >::value >::type >
/* ... */

If you're curious, for the external pool, the code looks like this:

   virtual void  local_push( void *ptr, const raft::signal &signal )
   {
      for(;;)
      {
         /** data manager is for the ring buffer resizing **/
         (this)->datamanager.enterBuffer( dm::push );
         if( (this)->datamanager.notResizing() )
         {
            if( (this)->space_avail() > 0 )
            {
               break;
            }
         }
         (this)->datamanager.exitBuffer( dm::push );
         if( (this)->write_stats.bec.blocked == 0 )
         {
            (this)->write_stats.bec.blocked = 1;
         }
#if (defined NICE) && (! defined USEQTHREADS)
         std::this_thread::yield();
#if __x86_64
         __asm__ volatile("\
           pause"
           :
           :
           : );
#endif
#endif
#ifdef USEQTHREADS
         qthread_yield();
#endif
      }
      auto * const buff_ptr( (this)->datamanager.get() );
      const size_t write_index( Pointer::val( buff_ptr->write_pt ) );
      if( ptr != nullptr )
      {
         T *item( reinterpret_cast< T* >( ptr ) );
         T *temp( new ( &buff_ptr->store[ write_index ] ) T( *item ) );
         UNUSED( temp );
         (this)->write_stats.bec.count++;
      }
      buff_ptr->signal[ write_index ] = signal;
      Pointer::inc( buff_ptr->write_pt );
      if( signal == raft::quit )
      {
         (this)->write_finished = true;
      }
      (this)->datamanager.exitBuffer( dm::push );
   }
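One note on the for(;;) loop above: the producer spin-waits for space while cooperating with the data manager's resize machinery, yielding the thread (and issuing an x86 pause where available) rather than burning the core while blocked.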

Lastly, when we peek an externally allocated object, there's some code that enables prefetch:

   /**
    * local_peek() - look at a reference to the head of the
    * ring buffer.  This doesn't remove the item, but it
    * does give the user a chance to take a look at it without
    * removing.
    * @return T&
    */
   virtual void local_peek(  void **ptr, raft::signal *signal )
   {
      for(;;)
      {

         (this)->datamanager.enterBuffer( dm::peek );
         if( (this)->datamanager.notResizing() )
         {
            if( (this)->size() > 0  )
            {
               break;
            }
            else if( (this)->is_invalid() && (this)->size() == 0 )
            {
               throw ClosedPortAccessException(
                  "Accessing closed port with local_peek call, exiting!!" );
            }
         }
         (this)->datamanager.exitBuffer( dm::peek );
#if (defined NICE) && (! defined USEQTHREADS)
         std::this_thread::yield();
#if  __x86_64
         __asm__ volatile("\
           pause"
           :
           :
           : );
#endif
#endif
#ifdef USEQTHREADS
         qthread_yield();
#endif
      }
      auto * const buff_ptr( (this)->datamanager.get() );
      const size_t read_index( Pointer::val( buff_ptr->read_pt ) );
      if( signal != nullptr )
      {
         *signal = buff_ptr->signal[ read_index ];
      }
      //actual pointer
      auto ***real_ptr( reinterpret_cast< T*** >( ptr ) );
      *real_ptr = reinterpret_cast< T** >( &( buff_ptr->store[ read_index ] ) );
      /** prefetch first 1024  bytes **/
      //probably need to optimize this for each arch / # of
      //threads
      raft::prefetch< raft::READ,
                      raft::LOTS,
                      sizeof( T ) < sizeof( std::uintptr_t ) << 7 ?
                      sizeof( T ) : sizeof( std::uintptr_t ) << 7
                      >( **real_ptr );
      (this)->in_peek->insert( reinterpret_cast< ptr_t >( **real_ptr ) );
      return;
      /**
       * exitBuffer() called when recycle is called, can't be sure the
       * reference isn't being used until all outside accesses to it are
       * invalidated from the buffer.
       */
   }
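As a quick check on those prefetch template arguments: on a typical 64-bit target, sizeof( std::uintptr_t ) << 7 is 8 << 7 = 1024, so the prefetch covers min( sizeof( T ), 1024 ) bytes, which matches the "prefetch first 1024 bytes" comment.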

There are two really important things going on here. First, the prefetch hints shown above are issued right before the consumer gets the reference. Second, there's a set (in_peek) that operates across each kernel in the run function and implements garbage collection; it's called from the scheduler so that the garbage-collection calls happen at the right times and can be coordinated with execution.

@jonathan-beard
Member

jonathan-beard commented Mar 10, 2018

Ok, in picture form, here's what happens when you allocate a large object in RaftLib on a port (key words: allocate on a port). The example I'm looking at is actually a test case. Based on the suggestion of @mr-j0nes, I'm going to make these all simpler and put them in an examples folder to provide simple template examples to follow (link to code).

Here's the example mechanism in picture form

[figure: raftlib_gc]

Example in code form

The first bit to look at is the allocate itself, along with the defined type of what we're allocating:

using obj_t = foo< 80 >;

class start : public raft::kernel
{
public:
    start() : raft::kernel()
    {
        output.addPort< obj_t >( "y" );
    }

    virtual ~start() = default;

    virtual raft::kstatus run()
    {
        //simply allocating mem and writing, note it's larger than a cache line
        //so based on the template it gets a pool allocate
        auto &mem( output[ "y" ].allocate< obj_t >() );
        for( auto i( 0 ); i < mem.length; i++ )
        {
            mem.pad[ i ] = static_cast< int >( counter );
        }
        output[ "y" ].send();
        counter++;
        if( counter == 200 )
        {
            return( raft::stop );
        }
        return( raft::proceed );
    }
private:
    std::size_t counter = 0;
};
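For reference, the foo template itself isn't shown above. Judging from how mem.length and mem.pad are used, it's roughly a padded P.O.D. along these lines (a reconstruction for illustration, not the test suite's actual definition):

template < int N >
struct foo
{
   int length = N;
   int pad[ N ];
};

With N = 80 the pad alone is 320 bytes, comfortably past a cache line, which is what selects the pool-allocation path.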

There's no outward appearance that any of this is actually occurring, and that's the nice thing. There's no way a programmer can leak memory or hand a dangling pointer to a downstream kernel; the handoff is managed for you with proven methods, and leaks are prevented by refcounting the pointers across the entire application automatically.

Okay, so how does this work? Let's look at the middle kernel:

class middle : public raft::kernel
{
public:
    middle() : raft::kernel()
    {
        input.addPort< obj_t >( "x" );
        output.addPort< obj_t >( "y" );
    }

    virtual raft::kstatus run()
    {
        auto &val( input[ "x" ].peek< obj_t >() );
        output[ "y" ].push( val );
        input[ "x" ].unpeek();
        input[ "x" ].recycle( 1 );
        return( raft::proceed );
    }
};

Just as in the picture, the reference is peeked from the input stream (taking control from the stream), then pushed to the output stream (giving control to the output stream). The only data moved at this point is enough to tell the next stream that it is to take control of the pointer to the memory-pool object at this position in FIFO order. The unpeek call tells the input stream that the place reserved for the element in FIFO order is no longer needed. Technically the runtime takes control of the pointer, and from the point you've pushed it you can't really do anything with it; for symmetry I thought it best to keep the unpeek and recycle. The recycle here actually serves a purpose: it calls the machinery that invalidates that point in the queue and signals to the FIFO sizing monitor that this kernel is no longer touching the stream, so the maintenance monitor can resize or NUMA-move if it desires.

The point is that the data is never copied or deleted outside of the pointer move. If the programmer hadn't called push, the object would have been destructed lazily; the pool memory wouldn't be deallocated for quite a while.

Taking a look at the last kernel

class last : public raft::kernel
{
public:
    last() : raft::kernel()
    {
        input.addPort< obj_t >( "x" );
    }

    virtual ~last() = default;

    virtual raft::kstatus run()
    {
        obj_t mem;
        //copy object
        input[ "x" ].pop( mem );
        //proceed until we have all of them then exit automatically
        return( raft::proceed );
    }
};

The last kernel simply exits once all objects are received.

If you're interested, each scheduler has to implement its own "run" function to manage the pool of kernels (or single kernel) it has control over. This is called from a base schedule function down to the more specialized ones. Here's an excerpt from the simpleschedule file, where it's easy to see how the GC across a single kernel firing works:

void
simple_schedule::simple_run( void * data ) 
{
   assert( data != nullptr );
   auto * const thread_d( reinterpret_cast< thread_data* >( data ) );
   ptr_map_t in;
   ptr_set_t out;
   ptr_set_t peekset;

   //set these for the fifo classes to collect the pointers used 
   //on both stream input and output, then we garbage collect
   //at the end of several firings
   Schedule::setPtrSets( thread_d->k, 
                        &in, 
                        &out,
                        &peekset );
   if( thread_d->loc != -1 )
   {
      /** call does nothing if not available **/
      affinity::set( thread_d->loc );
   }
   else
   {
#ifdef USE_PARTITION
       assert( false );
#endif
   }
   //key to remember is here, the kernel gets to run quite a bit
   //before we GC
   while( ! *(thread_d->finished) )
   {
      Schedule::kernelRun( thread_d->k, *(thread_d->finished) );
      //takes care of peekset clearing too
      //figure out which data needs to be reclaimed
      Schedule::fifo_gc( &in, &out, &peekset );
   }
}

Here's the fun part: the garbage collection in schedule.cpp. It should be noted that there are various levels to the RaftLib runtime. At the upper level, everything is nicely typed. At the middle layer, the ring-buffer template files are basically the last bit to understand type; everything below only cares about moving bytes, alignment, access permissions, and synchronization. It's much easier to build the data-movement bits completely divorced from the C++ type bits; they're just data delivery vehicles, after all. Okay, on to the rest of the fun stuff.

void
Schedule::fifo_gc( ptr_map_t * const in,
                   ptr_set_t * const out,
                   ptr_set_t * const peekset )
{
    auto in_begin( in->begin() ), in_end( in->end() );
    auto out_beg( out->begin() ), out_end( out->end() );
    while( out_beg != out_end )
    {
        if( (*out_beg) != (*in_begin).first )
        {
            void * const ptr = reinterpret_cast< void* >( (*in_begin).first );
            (*in_begin).second( ptr );
            ++in_begin;
        }
        else
        {
            ++out_beg;
            ++in_begin;
        }
    }
    while( in_begin != in_end )
    {
            void * const ptr = reinterpret_cast< void* >( (*in_begin).first );
            (*in_begin).second( ptr );
            ++in_begin;
    }
    in->clear();
    out->clear();
    peekset->clear();
    return;
}
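To unpack that a bit: in maps each input-side pointer to a type-erased cleanup function, so (*in_begin).second( ptr ) invokes the deleter on any pointer that was consumed but not re-emitted on an output stream, while pointers also found in out are skipped because a downstream kernel now owns them.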

Within the FIFO itself (ringbufferheap.tcc), there is code to record the pointers into these maps as they're received at peek/pop/push operations so that the proper action can be taken. As an example, on peek:

      /** prefetch first 1024  bytes **/
      //probably need to optimize this for each arch / # of
      //threads
      raft::prefetch< raft::READ,
                      raft::LOTS,
                      sizeof( T ) < sizeof( std::uintptr_t ) << 7 ?
                      sizeof( T ) : sizeof( std::uintptr_t ) << 7
                      >( **real_ptr );
      (this)->in_peek->insert( reinterpret_cast< ptr_t >( **real_ptr ) );
      return;
      /**
       * exitBuffer() called when recycle is called, can't be sure the
       * reference isn't being used until all outside accesses to it are
       * invalidated from the buffer.
       */

Then, on push of that reference, this code is called:

         T *item( reinterpret_cast< T* >( ptr ) );
         auto **b_ptr( reinterpret_cast< T** >( &buff_ptr->store[ write_index ] ) );

         if( (this)->out_peek->find(
            reinterpret_cast< std::uintptr_t >( item ) ) != (this)->out_peek->cend() )
         {
            /** HERE's THE LINE WE'RE LOOKING FOR **/
            //this was from a previous peek call
            (this)->out->insert( reinterpret_cast< std::uintptr_t >( item ) );
            *b_ptr = item;
         }
         else /** hope we have a move/copy constructor for object **/
         {
            *b_ptr = new T( *item );
         }
         (this)->write_stats.bec.count++;
      }
      buff_ptr->signal[ write_index ] = signal;
      Pointer::inc( buff_ptr->write_pt );
      if( signal == raft::quit )
      {
         (this)->write_finished = true;
      }
      (this)->datamanager.exitBuffer( dm::push );
   }

I've labeled the line that checks whether the pooled object should be garbage collected or not. There are more performant ways, but this is the safest, and there's not that much extra performance to be gained. For a large application, I'd gladly take a 1-2% perf hit for the satisfaction of knowing that the data passed from one port to the next won't segfault because I did something silly (or someone else did). That being said, there are plenty of optimizations that could be made on this loop; I just haven't had time to do them.

Any questions? There's quite a bit of depth to the process so feel free to ask!

-Jonathan

@jonathan-beard
Member

@vectronic, feel free to close if you're satisfied with the answer.

-J

@vectronic
Author

Thank you so much for the in-depth answer. You've given me a lot to digest and I definitely have a better grasp on things now. The diagram was excellent as well.
