Skip to content

Commit

Permalink
recorder: threaded writer
Browse files Browse the repository at this point in the history
  • Loading branch information
christoph2 committed Aug 15, 2022
1 parent 89253f6 commit 8dff312
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 61 deletions.
3 changes: 1 addition & 2 deletions pyxcp/recorder/rekorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ void some_records(XcpLogFileWriter& writer)
fr.length = 10 + (rand() % 240);
filler = (filler + 1) % 16;
memset(buffer, filler, fr.length);
auto payload = create_payload(fr.length, buffer);
writer.add_frame(fr.category, fr.counter, fr.timestamp, fr.length, payload);
writer.add_frame(fr.category, fr.counter, fr.timestamp, fr.length, reinterpret_cast<blob_t *>(buffer));
}
}

Expand Down
153 changes: 94 additions & 59 deletions pyxcp/recorder/rekorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <cstdio>

Expand All @@ -36,7 +37,6 @@
#include <Windows.h>
#endif /* _WIN32 */

#include <stdlib.h>

#include "lz4.h"
#include "mio.hpp"
Expand Down Expand Up @@ -147,7 +147,7 @@ inline auto file_header_size() -> std::size_t {

using rounding_func_t = std::function<std::size_t(std::size_t)>;

const rounding_func_t create_rounding_func(std::size_t multiple) {
inline const rounding_func_t create_rounding_func(std::size_t multiple) {
return [=](std::size_t value) -> std::size_t {
return (value + (multiple - 1)) & ~(multiple -1 );
};
Expand All @@ -168,14 +168,14 @@ inline void _fcopy(char * dest, char const * src, std::size_t n)
return payload.get();
}

payload_t create_payload(std::size_t size, blob_t const * data) {
auto pl = std::make_unique<blob_t[]>(size);
//auto pl = std::make_shared<blob_t[]>(size);
_fcopy(pl.get(), data, size);
inline payload_t create_payload(std::size_t size, blob_t const * data) {
//auto pl = std::make_unique<char[]>(size);
auto pl = std::make_shared<blob_t[]>(size);
_fcopy(reinterpret_cast<char*>(pl.get()), reinterpret_cast<char const*>(data), size);
return pl;
}
#else
payload_t create_payload(std::size_t size, blob_t const * data) {
inline payload_t create_payload(std::size_t size, blob_t const * data) {
return py::array_t<blob_t>(size, data);
}

Expand Down Expand Up @@ -280,10 +280,10 @@ class XcpLogFileWriter
m_file_name.c_str(),
GENERIC_READ | GENERIC_WRITE,
0,
(LPSECURITY_ATTRIBUTES)NULL,
(LPSECURITY_ATTRIBUTES)nullptr,
CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS,
NULL
nullptr
);
//printf("CreateFile returned: %u\n", GetLastError());
#else
Expand All @@ -296,6 +296,8 @@ class XcpLogFileWriter
m_chunk_size = megabytes(chunk_size);
m_intermediate_storage = new blob_t[m_chunk_size + megabytes(1)];
m_offset = detail::FILE_HEADER_SIZE + detail::MAGIC_SIZE;

start_thread();
}

~XcpLogFileWriter() {
Expand All @@ -305,6 +307,7 @@ class XcpLogFileWriter
void finalize() {
if (!m_finalized) {
m_finalized = true;
stop_thread();
if (m_container_record_count) {
compress_frames();
}
Expand All @@ -321,8 +324,17 @@ class XcpLogFileWriter
}
}

void add_frame(uint8_t category, uint16_t counter, double timestamp, uint16_t length, const payload_t& payload) {
frame_header_t frame {category, counter, timestamp, length};
void add_frame(uint8_t category, uint16_t counter, double timestamp, uint16_t length, blob_t * payload/*payload_t& payload*/) {
auto pl = std::make_shared<blob_t[]>(length);
_fcopy(reinterpret_cast<char*>(pl.get()), reinterpret_cast<char const *>(payload), length);
//#if 0
my_queue.put(
std::make_tuple(category, counter, timestamp, length, pl)
);
//#endif

#if 0
const frame_header_t frame {category, counter, timestamp, length};

store_im(&frame, sizeof(frame));
store_im(get_payload_ptr(payload), length);
Expand All @@ -331,6 +343,7 @@ class XcpLogFileWriter
if (m_container_size_uncompressed > m_chunk_size) {
compress_frames();
}
#endif
}

protected:
Expand All @@ -340,10 +353,11 @@ class XcpLogFileWriter
#if defined(_WIN32)

DWORD result;
result = SetFilePointer(m_fd, size, NULL, FILE_BEGIN);
//printf("SetFilePointer returned: %ld\n", result);

result = SetFilePointer(m_fd, size, nullptr, FILE_BEGIN);

result = SetEndOfFile(m_fd);
//printf("SetEndOfFile returned: %ld\n", result);

#else
ftruncate(m_fd, size);
#endif
Expand Down Expand Up @@ -385,7 +399,7 @@ class XcpLogFileWriter
m_num_containers += 1;
}

void write_bytes(std::size_t pos, std::size_t count, char const * buf)
void write_bytes(std::size_t pos, std::size_t count, char const * buf) const
{
auto addr = reinterpret_cast<char *>(ptr(pos));

Expand All @@ -406,22 +420,71 @@ class XcpLogFileWriter
write_bytes(0x00000000UL + detail::MAGIC_SIZE, detail::FILE_HEADER_SIZE, reinterpret_cast<char const*>(&header));
}

bool start_thread() {
if (collector_thread.joinable()) {
return false;
}
stop_collector_thread_flag = false;
collector_thread = std::thread([this]() {
while (!stop_collector_thread_flag) {
auto item = my_queue.get();
const auto content = item.get();
if (stop_collector_thread_flag == true)
{
break;
}
auto [category, counter, timestamp, length, payload] = content->value();
const frame_header_t frame{ category, counter, timestamp, length };
// /*


store_im(&frame, sizeof(frame));
store_im(get_payload_ptr(payload), length);
m_container_record_count += 1;
m_container_size_uncompressed += (sizeof(frame) + length);
if (m_container_size_uncompressed > m_chunk_size) {
compress_frames();
}
// */
}
});
return true;
}

bool stop_thread() {
if (!collector_thread.joinable()) {
return false;
}
stop_collector_thread_flag = true;
my_queue.put(std::nullopt);
collector_thread.join();
return true;
}

private:
std::string m_file_name;
std::size_t m_offset{0};
std::size_t m_chunk_size{0};
std::size_t m_num_containers{0};
std::size_t m_record_count{0};
std::size_t m_container_record_count{0};
std::size_t m_total_size_uncompressed{0};
std::size_t m_total_size_compressed{0};
std::size_t m_container_size_uncompressed{0};
std::size_t m_container_size_compressed{0};
std::uint32_t m_chunk_size{0};
std::uint32_t m_num_containers{0};
std::uint32_t m_record_count{0UL};
std::uint32_t m_container_record_count{0UL};
std::uint32_t m_total_size_uncompressed{0UL};
std::uint32_t m_total_size_compressed{0UL};
std::uint32_t m_container_size_uncompressed{0UL};
std::uint32_t m_container_size_compressed{0UL};
__ALIGN blob_t * m_intermediate_storage{nullptr};
std::size_t m_intermediate_storage_offset{0};
std::uint32_t m_intermediate_storage_offset{0};
mio::file_handle_type m_fd{INVALID_HANDLE_VALUE};
mio::mmap_sink * m_mmap{nullptr};
bool m_finalized{false};
std::thread collector_thread{};
std::mutex mtx;
std::queue<FrameVector> data_queue;
std::condition_variable data_cond;

TsQueue<std::optional<FrameTuple>> my_queue;

std::atomic_bool stop_collector_thread_flag{false};
};


Expand All @@ -437,8 +500,7 @@ class XcpLogFileReader
blob_t magic[detail::MAGIC_SIZE + 1];

read_bytes(0ul, detail::MAGIC_SIZE, magic);
if (memcmp(detail::MAGIC.c_str(), magic, detail::MAGIC_SIZE))
{
if (memcmp(detail::MAGIC.c_str(), magic, detail::MAGIC_SIZE) != 0) {
throw std::runtime_error("Invalid file magic.");
}
m_offset = detail::MAGIC_SIZE;
Expand All @@ -463,11 +525,13 @@ class XcpLogFileReader
m_offset += detail::FILE_HEADER_SIZE;
}

const FileHeaderType get_header() const noexcept {
[[nodiscard]]
FileHeaderType get_header() const noexcept {
return m_header;
}

auto get_header_as_tuple() -> HeaderTuple {
[[nodiscard]]
auto get_header_as_tuple() const -> HeaderTuple {
auto hdr = get_header();

return std::make_tuple(
Expand Down Expand Up @@ -509,7 +573,7 @@ class XcpLogFileReader
for (std::uint32_t idx = 0; idx < container.record_count; ++idx) {
_fcopy(reinterpret_cast<char *>(&frame), reinterpret_cast<char const*>(&(buffer[boffs])), sizeof(frame_header_t));
boffs += sizeof(frame_header_t);
result.push_back(std::make_tuple(frame.category, frame.counter, frame.timestamp, frame.length, create_payload(frame.length, &buffer[boffs])));
result.emplace_back(std::make_tuple(frame.category, frame.counter, frame.timestamp, frame.length, create_payload(frame.length, &buffer[boffs])));
boffs += frame.length;
}
m_offset += container.size_compressed;
Expand All @@ -525,6 +589,7 @@ class XcpLogFileReader
}

protected:
[[nodiscard]]
blob_t const *ptr(std::size_t pos = 0) const
{
return reinterpret_cast<blob_t const*>(m_mmap->data() + pos);
Expand All @@ -536,42 +601,12 @@ class XcpLogFileReader
_fcopy(reinterpret_cast<char *>(buf), addr, count);
}

bool start_thread() {
if (decomp_thrd.joinable()) {
return false;
}
stop_thread_flag = false;
decomp_thrd = std::thread([this]() {
while (!stop_thread_flag) {
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
// if (cb_) cb_(1234);
}
});
return true;
}

bool stop_thread() {
if (!decomp_thrd.joinable()) {
return false;
}
stop_thread_flag = true;
decomp_thrd.join();
return true;
}

private:
std::string m_file_name;
std::size_t m_offset{0};
std::size_t m_current_container{0};
mio::mmap_source * m_mmap{nullptr};
FileHeaderType m_header;
std::thread decomp_thrd{};
std::mutex mtx;
std::queue<FrameVector> data_queue;
std::condition_variable data_cond;
std::atomic_bool stop_thread_flag{false};
std::atomic_bool request_decompression{false};
std::atomic_bool final_block{false};
};

#endif // __REKORDER_HPP

0 comments on commit 8dff312

Please sign in to comment.