Skip to content

Commit

Permalink
recorder: add block memory manager
Browse files Browse the repository at this point in the history
  • Loading branch information
christoph2 committed Aug 18, 2022
1 parent 5597494 commit d6498a9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pyxcp/recorder/build_clang.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
clang++ -std=c++20 -O0 -fvectorize -Rpass=loop-vectorize -ggdb -Wall -Wextra -Weffc++-lpthread -DLZ4_DEBUG=1 -DSTANDALONE_REKORDER=1 lz4.cpp rekorder.cpp -o rekorder
clang++ -std=c++20 -O0 -fvectorize -Rpass=loop-vectorize -ggdb -Wall -Wextra -Weffc++ -lpthread -DLZ4_DEBUG=1 -DSTANDALONE_REKORDER=1 lz4.cpp rekorder.cpp -o rekorder
63 changes: 57 additions & 6 deletions pyxcp/recorder/rekorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <array>
#include <atomic>
#include <bitset>
#include <exception>
#include <functional>
#include <optional>
Expand Down Expand Up @@ -69,6 +70,7 @@ constexpr auto megabytes(std::size_t value) -> std::size_t
return kilobytes(value) * 1024;
}

constexpr uint16_t XCP_PAYLOAD_MAX = 0xFFFF;

/*
byte-order is, where applicable little ending (LSB first).
Expand Down Expand Up @@ -269,6 +271,58 @@ class Event {
};


/*
*
* Super simplicistic block memory manager.
*
*/
template <typename T, int _IS, int _NB>
class BlockMemory {

public:

using mem_block_t = T[_IS];

explicit BlockMemory() : m_memory{nullptr}, m_allocation_count{0} {
m_memory = new T[_IS * _NB]; // Ts to allocate: itemsize * number of items.
//printf("mem-base : %p\n", m_memory);
}

~BlockMemory() {
if (m_memory) {
delete[] m_memory;
}
}

T * acquire() noexcept {
const std::lock_guard<std::mutex> lock(m_mtx);

if (m_allocation_count >= _NB) {
return nullptr;
}
T * ptr = reinterpret_cast<T *>((m_memory + (m_allocation_count * _IS)));
//printf("acquire() %p\n", ptr);
m_allocation_count++;
return ptr;
}

void release() noexcept {
const std::lock_guard<std::mutex> lock(m_mtx);
//printf("release() %u\n", m_allocation_count);
if (m_allocation_count == 0) {
return;
}
m_allocation_count--;
}

private:

T * m_memory;
std::size_t m_allocation_count;
std::mutex m_mtx;
};


/**
*/
class XcpLogFileWriter
Expand Down Expand Up @@ -326,6 +380,7 @@ class XcpLogFileWriter

void add_frame(uint8_t category, uint16_t counter, double timestamp, uint16_t length, char const * data) {
auto payload= new char[length];
//auto payload = mem.acquire();

_fcopy(payload, data, length);
my_queue.put(
Expand Down Expand Up @@ -420,14 +475,13 @@ class XcpLogFileWriter
{
break;
}

auto values = content->value();
auto [category, counter, timestamp, length, payload] = content->value();
const frame_header_t frame{ category, counter, timestamp, length };

store_im(&frame, sizeof(frame));
store_im(payload, length);
delete[] payload;
//mem.release();
m_container_record_count += 1;
m_container_size_uncompressed += (sizeof(frame) + length);
if (m_container_size_uncompressed > m_chunk_size) {
Expand Down Expand Up @@ -466,11 +520,8 @@ class XcpLogFileWriter
bool m_finalized{false};
std::thread collector_thread{};
std::mutex mtx;
std::queue<FrameVector> data_queue;
std::condition_variable data_cond;

TsQueue<std::optional<FrameTupleWriter>> my_queue;

BlockMemory<char, XCP_PAYLOAD_MAX, 16> mem{};
std::atomic_bool stop_collector_thread_flag{false};
};

Expand Down
5 changes: 3 additions & 2 deletions pyxcp/recorder/test_reko.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
writer = XcpLogFileWriter("test_logger", prealloc=100, chunk_size=1)

# Write some records.
start_time = perf_counter()
for idx in range(512 * 1024):
value = idx % 256
writer.add_frame(FrameCategory.CMD, idx, perf_counter(), bytes([value] * value))
writer.add_frame(FrameCategory.CMD, idx, perf_counter() - start_time, bytes([value] * value))
writer.finalize() # We're done.


Expand All @@ -22,7 +23,7 @@
df = reader.as_dataframe() # Return recordings as Pandas DataFrame.
print(df.info())
print(df.describe())

print(df)

reader.reset_iter() # Non-standard method to restart iteration.

Expand Down

0 comments on commit d6498a9

Please sign in to comment.