diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 48c730f..8beffd7 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -187,7 +187,7 @@ run-regressiontests:
     - init-repo
   variables:
     #SCHEDULER_PARAMETERS: "-A hpc-prf-ekiapp -p normal -t 0:30:00 -N 1 -n 1 --cpus-per-task=2 --mem-per-cpu=2G"
-    SCHEDULER_PARAMETERS: "-A hpc-prf-ekiapp -p hacc -t 0:30:00"
+    SCHEDULER_PARAMETERS: "-A hpc-prf-ekiapp -p hacc -t 10:00:00"
   extends: .load-modules
   script:
     - export LD_LIBRARY_PATH="$(pwd)/build/libs:$LD_LIBRARY_PATH"
@@ -202,7 +202,7 @@ run-regressiontests:
     - cd build/benchmarks
     - cp ../../example_networks/jet-structure-classification-with-host-mem/* .
-    - ../_deps/googlebenchmark-src/tools/compare.py -a -d results.json benchmarks ../../benchmarks/expectedPerformance.json ./SynchronousInferenceBenchmark | tee benchmark_output.txt
+    - ../_deps/googlebenchmark-src/tools/compare.py -a -d results.json benchmarks ../../benchmarks/expectedPerformance.json ./RegressionTest | tee benchmark_output.txt
    # Extract the OVERALL_GEOMEAN p-value and check if it exceeds the threshold
    - THRESHOLD=0.05
    - GEOMEAN=$(grep "OVERALL_GEOMEAN" benchmark_output.txt)
diff --git a/benchmarks/AsynchronousInferenceBenchmark.cpp b/benchmarks/AsynchronousInferenceBenchmark.cpp
new file mode 100644
index 0000000..1068492
--- /dev/null
+++ b/benchmarks/AsynchronousInferenceBenchmark.cpp
@@ -0,0 +1,155 @@
+/**
+ * @file AsynchronousInferenceBenchmark.cpp
+ * @author Linus Jungemann (linus.jungemann@uni-paderborn.de)
+ * @brief Benchmarks the AsynchronousInference performance of the Driver
+ * @version 0.1
+ * @date 2025-03-21
+ *
+ * @copyright Copyright (c) 2025
+ * @license All rights reserved. This program and the accompanying materials are made available under the terms of the MIT license.
+ *
+ */
+
+#include <benchmark/benchmark.h>
+
+#include <FINNCppDriver/core/BaseDriver.hpp>
+#include <FINNCppDriver/utils/FinnDatatypes.hpp>
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <filesystem>
+#include <iostream>
+#include <random>
+#include <string>
+#include <thread>
+#include <vector>
+
+template <typename T>
+using destribution_t = typename std::conditional_t<std::is_floating_point_v<T>, std::uniform_real_distribution<T>, std::uniform_int_distribution<T>>;
+
+using InputFinnType = Finn::DatatypeInt<8>;
+using OutputFinnType = Finn::DatatypeInt<16>;
+
+namespace Finn {
+    template
+    using Driver = Finn::BaseDriver;
+}  // namespace Finn
+
+template
+Finn::Driver createDriverFromConfig(const std::filesystem::path& configFilePath, unsigned int batchSize) {
+    return Finn::Driver(configFilePath, batchSize);
+}
+
+static void BM_AsynchronousInferenceSingleThread(benchmark::State& state) {
+    const std::string exampleNetworkConfig = "jetConfig.json";
+    const uint batchSize = static_cast<uint>(state.range(0));
+    std::cout << "Running single-threaded benchmark with batch size: " << batchSize << std::endl;
+    auto driver = createDriverFromConfig(exampleNetworkConfig, batchSize);
+    using dtype = int8_t;
+
+    // Create buffers for pipelining
+    std::vector<dtype> inputBuffer(24 * batchSize);
+
+    std::random_device rndDevice;
+    std::mt19937 mersenneEngine{rndDevice()};
+    destribution_t<dtype> dist{static_cast<dtype>(InputFinnType().min()), static_cast<dtype>(InputFinnType().max())};
+
+    // Fill all buffers with random data
+    std::generate(inputBuffer.begin(), inputBuffer.end(), [&dist, &mersenneEngine]() { return dist(mersenneEngine); });
+
+    // Warmup
+    driver.input(inputBuffer.begin(), inputBuffer.end());
+    auto warmup = driver.getResults();
+    benchmark::DoNotOptimize(warmup);
+    std::chrono::duration runtime = std::chrono::seconds(90);  // Fixed runtime for the benchmark
+
+    for (auto _ : state) {
+        size_t processedCount = 0;
+
+        // Set a fixed time for the benchmark
+        const auto start = std::chrono::high_resolution_clock::now();
+
+        while (std::chrono::high_resolution_clock::now() - start < std::chrono::duration(runtime)) {
+            // Submit as many inputs as we have available buffers
+            driver.input(inputBuffer.begin(), inputBuffer.end());
+
+            // Retrieve results (this makes it single-threaded - we wait for results)
+            auto results = driver.getResults();
+            benchmark::DoNotOptimize(results);
+            ++processedCount;
+        }
+        std::size_t inferred = processedCount * batchSize;
+
+        // Report items processed in this iteration
+        state.SetItemsProcessed(static_cast<int64_t>(inferred));
+    }
+}
+
+// Register the function as a benchmark
+BENCHMARK(BM_AsynchronousInferenceSingleThread)->RangeMultiplier(2)->Range(1, 4096)->Repetitions(5);
+
+static void BM_AsynchronousInferenceMultiThread(benchmark::State& state) {
+    const std::string exampleNetworkConfig = "jetConfig.json";
+    const uint batchSize = static_cast<uint>(state.range(0));
+    std::cout << "Running multi-threaded benchmark with batch size: " << batchSize << std::endl;
+    auto driver = createDriverFromConfig(exampleNetworkConfig, batchSize);
+    using dtype = int8_t;
+
+    // Create buffers for pipelining
+    std::vector<dtype> inputBuffer(24 * batchSize);
+
+    std::random_device rndDevice;
+    std::mt19937 mersenneEngine{rndDevice()};
+    destribution_t<dtype> dist{static_cast<dtype>(InputFinnType().min()), static_cast<dtype>(InputFinnType().max())};
+
+    // Fill all buffers with random data
+    std::generate(inputBuffer.begin(), inputBuffer.end(), [&dist, &mersenneEngine]() { return dist(mersenneEngine); });
+
+    // Warmup
+    driver.input(inputBuffer.begin(), inputBuffer.end());
+    auto warmup = driver.getResults();
+    benchmark::DoNotOptimize(warmup);
+    std::chrono::duration runtime = std::chrono::seconds(90);  // Fixed runtime for the benchmark
+
+    for (auto _ : state) {
+        std::atomic<std::size_t> processedCount = 0;
+
+        // Start input thread that continuously submits new inputs
+        std::jthread inputThread([&](std::stop_token stoken) {
+            while (!stoken.stop_requested()) {
+                driver.input(inputBuffer.begin(), inputBuffer.end());
+            }
+        });
+
+        // Start output thread that retrieves results
+        std::jthread outputThread([&](std::stop_token stoken) {
+            while (!stoken.stop_requested()) {
+                auto results = driver.getResults();
+                benchmark::DoNotOptimize(results);
+                ++processedCount;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));  // Make sure the input thread has already exited
+            driver.drain();  // Drain any remaining results; might need to be accounted for in the runtime for the inf/s calculation
+        });
+
+        const auto start = std::chrono::high_resolution_clock::now();
+        while (std::chrono::high_resolution_clock::now() - start < std::chrono::duration(runtime)) {}  // Busy-waiting looks wasteful, but is empirically more reliable here than sleeping
+        inputThread.request_stop();   // Stop input thread
+        outputThread.request_stop();  // Stop output thread
+
+        inputThread.join();
+        outputThread.join();
+        std::size_t inferred = processedCount * batchSize;
+
+        // Report items processed in this iteration
+        state.SetItemsProcessed(static_cast<int64_t>(inferred));
+    }
+}
+
+// Register the multi-threaded benchmark
+BENCHMARK(BM_AsynchronousInferenceMultiThread)->RangeMultiplier(2)->Range(1, 4096)->Repetitions(5);
+
+BENCHMARK_MAIN();
diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt
index e1fdde4..6a57b05 100644
--- a/benchmarks/CMakeLists.txt
+++ b/benchmarks/CMakeLists.txt
@@ -6,4 +6,6 @@ set(FINN_BENCHMARK_DIR ${CMAKE_CURRENT_BINARY_DIR})
 add_benchmark(DataPackingBenchmark.cpp)
 add_benchmark(DynamicMdSpanBenchmark.cpp)
 add_benchmark(SynchronousInferenceBenchmark.cpp)
-add_benchmark(SPSCQueueBenchmark.cpp)
\ No newline at end of file
+add_benchmark(SPSCQueueBenchmark.cpp)
+add_benchmark(AsynchronousInferenceBenchmark.cpp)
+add_benchmark(RegressionTest.cpp)
\ No newline at end of file
diff --git a/benchmarks/RegressionTest.cpp b/benchmarks/RegressionTest.cpp
new file mode 100644
index 0000000..10f5556
--- /dev/null
+++ b/benchmarks/RegressionTest.cpp
@@ -0,0 +1,67 @@
+/**
+ * @file RegressionTest.cpp
+ * @author Linus Jungemann (linus.jungemann@uni-paderborn.de)
+ * @brief Benchmarks the SynchronousInference performance of the Driver as a regression test
+ * @version 0.2
+ * @date 2025-03-21
+ *
+ * @copyright Copyright (c) 2025
+ * @license All rights reserved. This program and the accompanying materials are made available under the terms of the MIT license.
+ *
+ */
+
+#include <benchmark/benchmark.h>
+
+#include <FINNCppDriver/core/BaseDriver.hpp>
+#include <FINNCppDriver/utils/FinnDatatypes.hpp>
+#include <algorithm>
+#include <cstdint>
+#include <filesystem>
+#include <random>
+
+template <typename T>
+using destribution_t = typename std::conditional_t<std::is_floating_point_v<T>, std::uniform_real_distribution<T>, std::uniform_int_distribution<T>>;
+
+using InputFinnType = Finn::DatatypeInt<8>;
+using OutputFinnType = Finn::DatatypeInt<16>;
+
+namespace Finn {
+    template
+    using Driver = Finn::BaseDriver;
+}  // namespace Finn
+
+template
+Finn::Driver createDriverFromConfig(const std::filesystem::path& configFilePath, unsigned int batchSize) {
+    return Finn::Driver(configFilePath, batchSize);
+}
+
+static void BM_SynchronousInference(benchmark::State& state) {
+    const std::string exampleNetworkConfig = "jetConfig.json";
+    const uint batchSize = static_cast<uint>(state.range(0));
+    auto driver = createDriverFromConfig(exampleNetworkConfig, batchSize);
+    using dtype = int8_t;
+    Finn::vector<dtype> testInputs(24 * batchSize);
+
+    std::random_device rndDevice;
+    std::mt19937 mersenneEngine{rndDevice()};  // Generates random integers
+
+    destribution_t<dtype> dist{static_cast<dtype>(InputFinnType().min()), static_cast<dtype>(InputFinnType().max())};
+
+    auto gen = [&dist, &mersenneEngine]() { return dist(mersenneEngine); };
+
+    // Warmup
+    std::fill(testInputs.begin(), testInputs.end(), 1);
+    auto warmup = driver.inferSynchronous(testInputs.begin(), testInputs.end());
+    benchmark::DoNotOptimize(warmup);
+
+    std::generate(testInputs.begin(), testInputs.end(), gen);
+    for (auto _ : state) {
+        auto ret = driver.inferSynchronous(testInputs.begin(), testInputs.end());
+        benchmark::DoNotOptimize(ret);
+        benchmark::ClobberMemory();
+    }
+}
+// Register the function as a benchmark
+BENCHMARK(BM_SynchronousInference)->Iterations(1000000)->RangeMultiplier(2)->Range(1, 4 << 10)->Repetitions(10);
+
+BENCHMARK_MAIN();
\ No newline at end of file
diff --git a/benchmarks/SPSCQueueBenchmark.cpp b/benchmarks/SPSCQueueBenchmark.cpp
index a881f56..6ec080a 100644
--- a/benchmarks/SPSCQueueBenchmark.cpp
+++ b/benchmarks/SPSCQueueBenchmark.cpp
@@ -39,8 +39,8 @@ static void BM_TrivialEnqueueDequeue(benchmark::State& state) {
         }
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * operations_per_iteration * 2));  // enqueue + dequeue
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * operations_per_iteration * sizeof(int) * 2));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration) * 2);  // enqueue + dequeue
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration * sizeof(int) * 2));
 }
 
 // Benchmark for non-trivial type enqueue/dequeue operations
@@ -76,8 +76,8 @@ static void BM_NonTrivialEnqueueDequeue(benchmark::State& state) {
         }
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * operations_per_iteration * 2));
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * operations_per_iteration * testString.size() * 2));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration) * 2);
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration * testString.size() * 2));
 }
 
 // Benchmark for multi-threaded producer-consumer pattern
@@ -99,7 +99,7 @@ static void BM_ProducerConsumer(benchmark::State& state) {
         }
 
         // Use a smaller maximum to avoid potential deadlocks
-        const size_t num_items = std::min(static_cast<size_t>(state.range(0)), queue.capacity() * 5);
+        const size_t num_items = std::min(static_cast<size_t>(state.range(0)), static_cast<size_t>(state.range(1)) * 5);
 
         // Start timing again before creating threads
         state.ResumeTiming();
@@ -145,12 +145,14 @@ static void BM_ProducerConsumer(benchmark::State& state) {
         }
     }
 
-    const size_t num_items = std::min(static_cast<size_t>(state.range(0)), QueueSize * 5);
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * num_items * 2));
+    const size_t num_items = std::min(static_cast<size_t>(state.range(0)), static_cast<size_t>(state.range(1)) * 5);
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(num_items) * 2);
     if constexpr (std::is_same_v<T, std::string>) {
-        state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * num_items * sizeof("test-string") * 2));
+        constexpr size_t string_size = sizeof("test-string");
+        state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(num_items * string_size * 2));
     } else {
-        state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * num_items * sizeof(T) * 2));
+        state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(num_items * sizeof(T) * 2));
     }
 }
 
@@ -191,8 +193,8 @@ static void BM_BulkDequeue(benchmark::State& state) {
         }
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * num_items));
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * num_items * sizeof(int)));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(num_items));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(num_items * sizeof(int)));
 }
 
 // Benchmark comparing individual vs bulk dequeue
@@ -204,7 +206,7 @@ static void BM_IndividualVsBulkDequeue(benchmark::State& state) {
     // Make sure we don't exceed queue capacity
     const int total_items = std::min(10000, static_cast<int>(queue.capacity()));
     const int bulk_size = std::min(100, total_items);
-    std::vector<int> items(bulk_size);
+    std::vector<int> items(static_cast<size_t>(bulk_size));
 
     for (auto _ : state) {
         state.PauseTiming();
@@ -245,8 +247,8 @@ static void BM_IndividualVsBulkDequeue(benchmark::State& state) {
         }
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * total_items));
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * total_items * sizeof(int)));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(total_items));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(total_items) * static_cast<int64_t>(sizeof(int)));
 }
 
 // Benchmark for latency measurement using std::chrono instead of cycleclock
@@ -269,7 +271,7 @@ static void BM_EnqueueDequeueLatency(benchmark::State& state) {
         }
 
         auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
-        state.SetIterationTime(duration.count() / 1e9);
+        state.SetIterationTime(static_cast<double>(duration.count()) / 1e9);
     }
 }
 
@@ -317,7 +319,7 @@ static void BM_EmplaceVsEnqueue(benchmark::State& state) {
         }
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * num_items * 2));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(num_items) * 2);
 }
 
 // Benchmark for bulk enqueue operations
@@ -362,8 +364,8 @@ static void BM_BulkEnqueue(benchmark::State& state) {
         benchmark::DoNotOptimize(total_enqueued);
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * bulk_size * num_operations));
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * bulk_size * num_operations * sizeof(int)));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(bulk_size * num_operations));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(bulk_size * num_operations * sizeof(int)));
 }
 
 // Benchmark comparing individual vs bulk enqueue
@@ -375,11 +377,11 @@ static void BM_IndividualVsBulkEnqueue(benchmark::State& state) {
     // Make sure we don't exceed queue capacity
     const int total_items = std::min(10000, static_cast<int>(queue.capacity()));
     const int bulk_size = std::min(100, total_items);
-    std::vector<int> items(bulk_size);
+    std::vector<int> items(static_cast<size_t>(bulk_size));
 
     // Fill the items vector with test data
     for (int i = 0; i < bulk_size; ++i) {
-        items[i] = i;
+        items[static_cast<size_t>(i)] = i;
     }
 
     for (auto _ : state) {
@@ -392,8 +394,8 @@ static void BM_IndividualVsBulkEnqueue(benchmark::State& state) {
         size_t enqueued = 0;
         if (use_bulk) {
             // Bulk enqueue
-            for (size_t i = 0; i < total_items; i += bulk_size) {
-                size_t batch_size = std::min(static_cast<size_t>(bulk_size), static_cast<size_t>(total_items) - i);
+            for (int i = 0; i < total_items; i += bulk_size) {
+                size_t batch_size = std::min(static_cast<size_t>(bulk_size), static_cast<size_t>(total_items - i));
                 size_t batch_enqueued = queue.try_enqueue_bulk(items.begin(), batch_size);
                 if (batch_enqueued < batch_size)
                     break;  // Stop if queue is full
@@ -418,8 +420,8 @@ static void BM_IndividualVsBulkEnqueue(benchmark::State& state) {
         benchmark::DoNotOptimize(enqueued);
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * total_items));
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * total_items * sizeof(int)));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(total_items));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(total_items) * static_cast<int64_t>(sizeof(int)));
 }
 
 // Benchmark for blocking bulk enqueue with varying queue fullness
@@ -428,8 +430,8 @@ static void BM_BlockingBulkEnqueue(benchmark::State& state) {
     SPSCQueue<int, QueueSize> queue;
 
     // Pre-fill the queue to a certain percentage of capacity
-    const double fill_percentage = state.range(0) / 100.0;
-    const size_t fill_count = static_cast<size_t>(queue.capacity() * fill_percentage);
+    const double fill_percentage = static_cast<double>(state.range(0)) / 100.0;
+    const size_t fill_count = static_cast<size_t>(static_cast<double>(queue.capacity()) * fill_percentage);
 
     // Calculate how many more items we can safely enqueue
     // Add 1 to ensure we have at least one item to enqueue
@@ -467,8 +469,8 @@ static void BM_BlockingBulkEnqueue(benchmark::State& state) {
         benchmark::DoNotOptimize(enqueued);
     }
 
-    state.SetItemsProcessed(static_cast<int64_t>(state.iterations() * batch_size));
-    state.SetBytesProcessed(static_cast<int64_t>(state.iterations() * batch_size * sizeof(int)));
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(batch_size));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(batch_size * sizeof(int)));
 }
 
 // Register the benchmarks
@@ -525,4 +527,502 @@ BENCHMARK(BM_BlockingBulkEnqueue<1024>)
     ->Arg(75)   // Queue 75% full
     ->Arg(95);  // Queue 95% full
 
+// Benchmark for trivial type enqueue/dequeue operations with DynamicSPSCQueue
+static void BM_DynamicTrivialEnqueueDequeue(benchmark::State& state) {
+    DynamicSPSCQueue<int> queue(static_cast<size_t>(state.range(1)));  // Use second range value for capacity
+    // Ensure we don't exceed queue capacity
+    const size_t operations_per_iteration = std::min(static_cast<size_t>(state.range(0)), queue.capacity());
+
+    for (auto _ : state) {
+        // Enqueue phase
+        size_t enqueued = 0;
+        for (size_t i = 0; i < operations_per_iteration; ++i) {
+            if (queue.try_enqueue(static_cast<int>(i))) {
+                enqueued++;
+            }
+        }
+
+        // Dequeue phase
+        int item;
+        size_t dequeued = 0;
+        for (size_t i = 0; i < enqueued; ++i) {
+            if (queue.try_dequeue(item)) {
+                dequeued++;
+            }
+        }
+
+        // Make sure we didn't lose any items
+        if (enqueued != dequeued) {
+            state.SkipWithError("Enqueue/dequeue count mismatch");
+            break;
+        }
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration) * 2);  // enqueue + dequeue
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration * sizeof(int) * 2));
+}
+
+// Benchmark for non-trivial type enqueue/dequeue operations with DynamicSPSCQueue
+static void BM_DynamicNonTrivialEnqueueDequeue(benchmark::State& state) {
+    DynamicSPSCQueue<std::string> queue(static_cast<size_t>(state.range(1)));  // Use second range value for capacity
+    std::string testString = "benchmark-test-string";
+    // Ensure we don't exceed queue capacity
+    const size_t operations_per_iteration = std::min(static_cast<size_t>(state.range(0)), queue.capacity());
+
+    for (auto _ : state) {
+        // Enqueue phase
+        size_t enqueued = 0;
+        for (size_t i = 0; i < operations_per_iteration; ++i) {
+            if (queue.try_enqueue(testString)) {
+                enqueued++;
+            }
+        }
+
+        // Dequeue phase
+        std::string item;
+        size_t dequeued = 0;
+        for (size_t i = 0; i < enqueued; ++i) {
+            if (queue.try_dequeue(item)) {
+                dequeued++;
+            }
+        }
+
+        // Make sure we didn't lose any items
+        if (enqueued != dequeued) {
+            state.SkipWithError("Enqueue/dequeue count mismatch");
+            break;
+        }
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration) * 2);
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(operations_per_iteration * testString.size() * 2));
+}
+
+// Benchmark for multi-threaded producer-consumer pattern with DynamicSPSCQueue
+template <typename T>
+static void BM_DynamicProducerConsumer(benchmark::State& state) {
+    for (auto _ : state) {
+        state.PauseTiming();
+        DynamicSPSCQueue<T> queue(static_cast<size_t>(state.range(1)));  // Use second range value for capacity
+        std::atomic<bool> producer_done{false};
+        std::atomic<size_t> items_produced{0};
+        std::atomic<size_t> items_consumed{0};
+
+        // Initialize value based on type
+        T value;
+        if constexpr (std::is_same_v<T, int>) {
+            value = 42;  // For int type
+        } else if constexpr (std::is_same_v<T, std::string>) {
+            value = "test-string";  // For string type
+        }
+
+        // Use a smaller maximum to avoid potential deadlocks
+        const size_t num_items = std::min(static_cast<size_t>(state.range(0)), static_cast<size_t>(state.range(1)) * 5);
+
+        // Start timing again before creating threads
+        state.ResumeTiming();
+
+        // Producer thread - uses non-blocking enqueue to avoid deadlocks
+        std::thread producer([&queue, &producer_done, &items_produced, &num_items, value]() {
+            while (items_produced.load(std::memory_order_relaxed) < num_items) {
+                if (queue.try_enqueue(value)) {
+                    items_produced.fetch_add(1, std::memory_order_relaxed);
+                } else {
+                    // Small yield to prevent busy waiting
+                    std::this_thread::yield();
+                }
+            }
+            producer_done.store(true, std::memory_order_release);
+        });
+
+        // Consumer thread
+        std::thread consumer([&queue, &producer_done, &items_consumed, &num_items, &items_produced]() {
+            T item;
+            while (items_consumed.load(std::memory_order_relaxed) < num_items) {
+                if (queue.try_dequeue(item)) {
+                    items_consumed.fetch_add(1, std::memory_order_relaxed);
+                } else if (producer_done.load(std::memory_order_acquire) && items_consumed.load(std::memory_order_relaxed) >= items_produced.load(std::memory_order_relaxed)) {
+                    // All items have been produced and consumed
+                    break;
+                } else {
+                    // Small yield to prevent busy waiting
+                    std::this_thread::yield();
+                }
+            }
+        });
+
+        producer.join();
+        consumer.join();
+
+        // Verify all items were processed
+        benchmark::DoNotOptimize(items_consumed.load(std::memory_order_relaxed));
+
+        if (items_consumed.load(std::memory_order_relaxed) != num_items) {
+            state.SkipWithError("Not all items were processed");
+            break;
+        }
+    }
+
+    const size_t num_items = std::min(static_cast<size_t>(state.range(0)), static_cast<size_t>(state.range(1)) * 5);
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(num_items) * 2);
+    if constexpr (std::is_same_v<T, std::string>) {
+        constexpr size_t string_size = sizeof("test-string");
+        state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(num_items * string_size * 2));
+    } else {
+        state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(num_items * sizeof(T) * 2));
+    }
+}
+
+// Benchmark for bulk dequeue operations with DynamicSPSCQueue
+static void BM_DynamicBulkDequeue(benchmark::State& state) {
+    DynamicSPSCQueue<int> queue(static_cast<size_t>(state.range(2)));  // Use third range value for capacity
+    const auto bulk_size = static_cast<size_t>(state.range(1));
+    std::vector<int> items(bulk_size);
+
+    // Ensure we don't exceed queue capacity
+    const size_t num_items = std::min(static_cast<size_t>(state.range(0)), queue.capacity());
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        // Fill the queue
+        size_t enqueued = 0;
+        for (size_t i = 0; i < num_items; ++i) {
+            if (queue.try_enqueue(static_cast<int>(i))) {
+                enqueued++;
+            }
+        }
+        state.ResumeTiming();
+
+        // Dequeue in bulk
+        size_t total_dequeued = 0;
+        while (total_dequeued < enqueued) {
+            size_t batch_size = std::min(bulk_size, enqueued - total_dequeued);
+            size_t dequeued = queue.try_dequeue_bulk(items.begin(), batch_size);
+            if (dequeued == 0)
+                break;  // Avoid infinite loop if dequeue fails
+            total_dequeued += dequeued;
+        }
+
+        if (total_dequeued != enqueued) {
+            state.SkipWithError("Not all items were dequeued");
+            break;
+        }
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(num_items));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(num_items * sizeof(int)));
+}
+
+// Benchmark comparing individual vs bulk dequeue with DynamicSPSCQueue
+static void BM_DynamicIndividualVsBulkDequeue(benchmark::State& state) {
+    DynamicSPSCQueue<int> queue(1024);  // Fixed capacity for this test
+    const bool use_bulk = state.range(0) == 1;
+
+    // Make sure we don't exceed queue capacity
+    const int total_items = std::min(10000, static_cast<int>(queue.capacity()));
+    const int bulk_size = std::min(100, total_items);
+    std::vector<int> items(static_cast<size_t>(bulk_size));
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        // Fill the queue
+        size_t enqueued = 0;
+        for (int i = 0; i < total_items; ++i) {
+            if (queue.try_enqueue(i)) {
+                enqueued++;
+            }
+        }
+        state.ResumeTiming();
+
+        size_t dequeued = 0;
+        if (use_bulk) {
+            // Bulk dequeue
+            while (dequeued < enqueued) {
+                size_t batch_size = std::min(static_cast<size_t>(bulk_size), enqueued - dequeued);
+                size_t batch_dequeued = queue.try_dequeue_bulk(items.begin(), batch_size);
+                if (batch_dequeued == 0)
+                    break;  // Avoid infinite loop
+                dequeued += batch_dequeued;
+            }
+        } else {
+            // Individual dequeue
+            int item;
+            for (size_t i = 0; i < enqueued; ++i) {
+                if (queue.try_dequeue(item)) {
+                    dequeued++;
+                } else {
+                    break;  // Stop if dequeue fails
+                }
+            }
+        }
+
+        if (dequeued != enqueued) {
+            state.SkipWithError("Not all items were dequeued");
+            break;
+        }
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(total_items));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(total_items) * static_cast<int64_t>(sizeof(int)));
+}
+
+// Benchmark for latency measurement with DynamicSPSCQueue
+static void BM_DynamicEnqueueDequeueLatency(benchmark::State& state) {
+    DynamicSPSCQueue<int64_t> queue(static_cast<size_t>(state.range(0)));  // Use range value for capacity
+
+    for (auto _ : state) {
+        auto start = std::chrono::high_resolution_clock::now();
+        bool enq_success = queue.try_enqueue(0);
+
+        int64_t item = 0;
+        bool deq_success = queue.try_dequeue(item);
+
+        auto end = std::chrono::high_resolution_clock::now();
+
+        if (!enq_success || !deq_success) {
+            state.SkipWithError("Enqueue or dequeue failed");
+            break;
+        }
+
+        auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
+        state.SetIterationTime(static_cast<double>(duration.count()) / 1e9);
+    }
+}
+
+// Benchmark for emplace performance with DynamicSPSCQueue
+static void BM_DynamicEmplaceVsEnqueue(benchmark::State& state) {
+    DynamicSPSCQueue<std::pair<int, std::string>> queue(static_cast<size_t>(state.range(1)));  // Use second range value for capacity
+    const bool use_emplace = state.range(0) == 1;
+
+    // Make sure we don't exceed queue capacity
+    const auto num_items = static_cast<int>(std::min(static_cast<size_t>(state.range(2)), queue.capacity()));
+
+    for (auto _ : state) {
+        // Track successful operations
+        size_t enqueued = 0;
+
+        if (use_emplace) {
+            // Use emplace
+            for (int i = 0; i < num_items; ++i) {
+                if (queue.try_emplace(i, "test-string")) {
+                    enqueued++;
+                }
+            }
+        } else {
+            // Use regular enqueue with constructor
+            for (int i = 0; i < num_items; ++i) {
+                if (queue.try_enqueue(std::make_pair(i, "test-string"))) {
+                    enqueued++;
+                }
+            }
+        }
+
+        // Dequeue all items
+        std::pair<int, std::string> item;
+        size_t dequeued = 0;
+        for (size_t i = 0; i < enqueued; ++i) {
+            if (queue.try_dequeue(item)) {
+                dequeued++;
+            }
+        }
+
+        if (dequeued != enqueued) {
+            state.SkipWithError("Not all items were dequeued");
+            break;
+        }
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(num_items) * 2);
+}
+
+// Benchmark for bulk enqueue operations with DynamicSPSCQueue
+static void BM_DynamicBulkEnqueue(benchmark::State& state) {
+    DynamicSPSCQueue<int> queue(static_cast<size_t>(state.range(2)));  // Use third range value for capacity
+    const auto bulk_size = static_cast<size_t>(state.range(1));
+    std::vector<int> items(bulk_size);
+
+    // Fill the items vector with test data
+    for (size_t i = 0; i < bulk_size; ++i) {
+        items[i] = static_cast<int>(i);
+    }
+
+    // Ensure we don't exceed queue capacity
+    const size_t num_operations = std::min(static_cast<size_t>(state.range(0)), queue.capacity() / bulk_size);
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        // Clear the queue before each measurement
+        int temp;
+        while (queue.try_dequeue(temp)) {}
+        state.ResumeTiming();
+
+        // Enqueue in bulk
+        size_t total_enqueued = 0;
+        for (size_t i = 0; i < num_operations; ++i) {
+            size_t enqueued = queue.try_enqueue_bulk(items.begin(), bulk_size);
+            total_enqueued += enqueued;
+
+            // If we couldn't enqueue the full batch, break to avoid infinite loop
+            if (enqueued < bulk_size)
+                break;
+        }
+
+        // Make sure we dequeue everything for the next iteration
+        state.PauseTiming();
+        while (queue.try_dequeue(temp)) {}
+        state.ResumeTiming();
+
+        // Record how many items we processed
+        benchmark::DoNotOptimize(total_enqueued);
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(bulk_size * num_operations));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(bulk_size * num_operations * sizeof(int)));
+}
+
+// Benchmark comparing individual vs bulk enqueue with DynamicSPSCQueue
+static void BM_DynamicIndividualVsBulkEnqueue(benchmark::State& state) {
+    DynamicSPSCQueue<int> queue(1024);  // Fixed capacity for this test
+    const bool use_bulk = state.range(0) == 1;
+
+    // Make sure we don't exceed queue capacity
+    const int total_items = std::min(10000, static_cast<int>(queue.capacity()));
+    const int bulk_size = std::min(100, total_items);
+    std::vector<int> items(static_cast<size_t>(bulk_size));
+
+    // Fill the items vector with test data
+    for (int i = 0; i < bulk_size; ++i) {
+        items[static_cast<size_t>(i)] = i;
+    }
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        // Clear the queue before each measurement
+        int temp;
+        while (queue.try_dequeue(temp)) {}
+        state.ResumeTiming();
+
+        size_t enqueued = 0;
+        if (use_bulk) {
+            // Bulk enqueue
+            for (int i = 0; i < total_items; i += bulk_size) {
+                size_t batch_size = std::min(static_cast<size_t>(bulk_size), static_cast<size_t>(total_items - i));
+                size_t batch_enqueued = queue.try_enqueue_bulk(items.begin(), batch_size);
+                if (batch_enqueued < batch_size)
+                    break;  // Stop if queue is full
+                enqueued += batch_enqueued;
+            }
+        } else {
+            // Individual enqueue
+            for (int i = 0; i < total_items; ++i) {
+                if (queue.try_enqueue(i)) {
+                    enqueued++;
+                } else {
+                    break;  // Stop if queue is full
+                }
+            }
+        }
+
+        // Empty the queue for the next iteration
+        state.PauseTiming();
+        while (queue.try_dequeue(temp)) {}
+        state.ResumeTiming();
+
+        benchmark::DoNotOptimize(enqueued);
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(total_items));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(total_items) * static_cast<int64_t>(sizeof(int)));
+}
+
+// Benchmark for blocking bulk enqueue with varying queue fullness with DynamicSPSCQueue
+static void BM_DynamicBlockingBulkEnqueue(benchmark::State& state) {
+    DynamicSPSCQueue<int> queue(static_cast<size_t>(state.range(1)));  // Use second range value for capacity
+
+    // Pre-fill the queue to a certain percentage of capacity
+    const double fill_percentage = static_cast<double>(state.range(0)) / 100.0;
+    const size_t fill_count = static_cast<size_t>(static_cast<double>(queue.capacity()) * fill_percentage);
+
+    // Calculate how many more items we can safely enqueue
+    // Add 1 to ensure we have at least one item to enqueue
+    const size_t max_safe_to_enqueue = std::max<size_t>(1, queue.capacity() - fill_count);
+    // Limit batch size to avoid deadlocks
+    const size_t batch_size = std::min<size_t>(50, max_safe_to_enqueue);
+
+    std::vector<int> items(batch_size);
+
+    // Fill the items vector with test data
+    for (size_t i = 0; i < items.size(); ++i) {
+        items[i] = static_cast<int>(i);
+    }
+
+    for (auto _ : state) {
+        state.PauseTiming();
+        // Clear the queue
+        int temp;
+        while (queue.try_dequeue(temp)) {}
+
+        // Pre-fill the queue to the specified percentage
+        for (size_t i = 0; i < fill_count; ++i) {
+            queue.try_enqueue(static_cast<int>(i));
+        }
+        state.ResumeTiming();
+
+        // Perform blocking bulk enqueue operation with a timeout to prevent deadlocks
+        size_t enqueued = queue.enqueue_bulk_for(items.begin(), items.size(), std::chrono::milliseconds(100));
+
+        // Empty the queue for the next iteration
+        state.PauseTiming();
+        while (queue.try_dequeue(temp)) {}
+        state.ResumeTiming();
+
+        benchmark::DoNotOptimize(enqueued);
+    }
+
+    state.SetItemsProcessed(state.iterations() * static_cast<int64_t>(batch_size));
+    state.SetBytesProcessed(state.iterations() * static_cast<int64_t>(batch_size * sizeof(int)));
+}
+
+// Register dynamic benchmark variants
+BENCHMARK(BM_DynamicTrivialEnqueueDequeue)->Args({10, 16})->Args({100, 128})->Args({1000, 1024});
+
+BENCHMARK(BM_DynamicNonTrivialEnqueueDequeue)->Args({10, 16})->Args({100, 128})->Args({1000, 1024});
+
+BENCHMARK(BM_DynamicProducerConsumer<int>)->Args({1000, 128})->Args({10000, 128})->Args({100000, 1024});
+
+BENCHMARK(BM_DynamicProducerConsumer<std::string>)->Args({1000, 128})->Args({10000, 128})->Args({100000, 1024});
+
+BENCHMARK(BM_DynamicBulkDequeue)->Args({10000, 1, 1024})->Args({10000, 10, 1024})->Args({10000, 50, 1024})->Args({10000, 100, 1024});
+
+BENCHMARK(BM_DynamicIndividualVsBulkDequeue)
+    ->Arg(0)   // Use individual dequeue
+    ->Arg(1);  // Use bulk dequeue
+
+BENCHMARK(BM_DynamicEnqueueDequeueLatency)->Arg(16)->Arg(128)->Arg(1024)->UseRealTime();
+
+BENCHMARK(BM_DynamicEmplaceVsEnqueue)
+    ->Args({0, 128, 1000})   // Regular enqueue, capacity 128, 1000 items
+    ->Args({1, 128, 1000});  // Emplace, capacity 128, 1000 items
+
+BENCHMARK(BM_DynamicBulkEnqueue)->Args({100, 1, 1024})->Args({100, 10, 1024})->Args({100, 50, 1024})->Args({100, 100, 1024});
+
+BENCHMARK(BM_DynamicIndividualVsBulkEnqueue)
+    ->Arg(0)   // Use individual enqueue
+    ->Arg(1);  // Use bulk enqueue
+
+BENCHMARK(BM_DynamicBlockingBulkEnqueue)
+    ->Args({0, 128})    // Queue 0% full, capacity 128
+    ->Args({25, 128})   // Queue 25% full, capacity 128
+    ->Args({50, 128})   // Queue 50% full, capacity 128
+    ->Args({75, 128})   // Queue 75% full, capacity 128
+    ->Args({95, 128});  // Queue 95% full, capacity 128
+
+BENCHMARK(BM_DynamicBlockingBulkEnqueue)
+    ->Args({0, 1024})    // Queue 0% full, capacity 1024
+    ->Args({25, 1024})   // Queue 25% full, capacity 1024
+    ->Args({50, 1024})   // Queue 50% full, capacity 1024
+    ->Args({75, 1024})   // Queue 75% full, capacity 1024
+    ->Args({95, 1024});  // Queue 95% full, capacity 1024
+
 BENCHMARK_MAIN();
\ No newline at end of file
diff --git a/benchmarks/SynchronousInferenceBenchmark.cpp b/benchmarks/SynchronousInferenceBenchmark.cpp
index 62ca772..48f9eae 100644
--- a/benchmarks/SynchronousInferenceBenchmark.cpp
+++ b/benchmarks/SynchronousInferenceBenchmark.cpp
@@ -35,34 +35,48 @@ Finn::Driver createDriverFromConfig(const std::filesystem:
     return Finn::Driver(configFilePath, batchSize);
 }
 
-static void BM_SynchronousInference(benchmark::State& state) {
+static void BM_SynchronousInferenceSingleThread(benchmark::State& state) {
     const std::string exampleNetworkConfig = "jetConfig.json";
     const uint batchSize = static_cast<uint>(state.range(0));
+    std::cout << "Running single-threaded benchmark with batch size: " << batchSize << std::endl;
     auto driver = createDriverFromConfig(exampleNetworkConfig, batchSize);
     using dtype = int8_t;
-    Finn::vector<dtype> testInputs(24 * batchSize);
 
-    std::random_device rndDevice;
-    std::mt19937 mersenneEngine{rndDevice()};  // Generates random integers
+    // Create buffers for pipelining
+    std::vector<dtype> inputBuffer(24 * batchSize);
 
+    std::random_device rndDevice;
+    std::mt19937 mersenneEngine{rndDevice()};
     destribution_t<dtype> dist{static_cast<dtype>(InputFinnType().min()), static_cast<dtype>(InputFinnType().max())};
 
-    auto gen = [&dist, &mersenneEngine]() { return dist(mersenneEngine); };
+    // Fill all buffers with random data
+    std::generate(inputBuffer.begin(), inputBuffer.end(), [&dist, &mersenneEngine]() { return dist(mersenneEngine); });
 
     // Warmup
-    std::fill(testInputs.begin(), testInputs.end(), 1);
-    auto warmup = driver.inferSynchronous(testInputs.begin(), testInputs.end());
+    auto warmup = driver.inferSynchronous(inputBuffer.begin(), inputBuffer.end());
     benchmark::DoNotOptimize(warmup);
 
-    std::generate(testInputs.begin(), testInputs.end(), gen);
+    std::chrono::duration runtime = std::chrono::seconds(90);  // Fixed runtime for the benchmark
+
     for (auto _ : state) {
-        auto ret = driver.inferSynchronous(testInputs.begin(), testInputs.end());
-        benchmark::DoNotOptimize(ret);
-        benchmark::ClobberMemory();
+        std::size_t processedCount = 0;
+
+        // Set a fixed time for the benchmark
+        const auto start = std::chrono::high_resolution_clock::now();
+
+        while (std::chrono::high_resolution_clock::now() - start < std::chrono::duration(runtime)) {
+            auto results = driver.inferSynchronous(inputBuffer.begin(), inputBuffer.end());
+            benchmark::DoNotOptimize(results);
+            ++processedCount;
+        }
+        std::size_t inferred = processedCount * batchSize;
+
+        // Report items processed in this iteration
+        state.SetItemsProcessed(static_cast<int64_t>(inferred));
     }
 }
 
-// Register the function as a benchmark
-BENCHMARK(BM_SynchronousInference)->Iterations(1000000)->RangeMultiplier(2)->Range(1, 4 << 10)->Repetitions(10);
+// Register the function as a benchmark
+BENCHMARK(BM_SynchronousInferenceSingleThread)->RangeMultiplier(2)->Range(1, 4096)->Repetitions(5);
 
 BENCHMARK_MAIN();
diff --git a/cmake/AddBenchmark.cmake b/cmake/AddBenchmark.cmake
index b93ab65..4c16c5d 100644
--- a/cmake/AddBenchmark.cmake
+++ b/cmake/AddBenchmark.cmake
@@ -14,8 +14,8 @@ function(add_benchmark benchmark_name)
     target_link_libraries(${benchmark}
         PUBLIC
         finnc_options
-        finnc_core_test
-        xrt_mock
+        finnc_core
+        xrt_coreutil
         benchmark::benchmark
         OpenMP::OpenMP_CXX
     )
diff --git a/integrationtest/AsyncInference.cpp b/integrationtest/AsyncInference.cpp
index 6d92570..aa49c65 100644
--- a/integrationtest/AsyncInference.cpp
+++ b/integrationtest/AsyncInference.cpp
@@ -14,6 +14,9 @@
 #include
 #include
 #include
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
 #include
 #include
 
@@ -38,10 +41,9 @@ TEST(AsyncInference, asyncInferenceTest) {
 
     // Run inference
     driver.input(data.begin(), data.end());
-    std::this_thread::sleep_for(200ms);
-    auto results = driver.getResults();
+    auto results = driver.getResults();  // This should block until the results are available
 
-    Finn::vector expectedResults = { 98, 50, 65476, 65493, 27 };
+    Finn::vector expectedResults = {98, 50, 65476, 65493, 27};
 
     EXPECT_EQ(results, expectedResults);
 }
@@ -50,22 +52,27 @@ TEST(AsyncInference, asyncBatchInferenceTest) {
     std::string exampleNetworkConfig = "jetConfig.json";
     Finn::Config conf = Finn::createConfigFromPath(exampleNetworkConfig);
     std::size_t batchLength = 10;
+    std::atomic<std::size_t> availableData(0);
+    std::condition_variable cv;
+    std::mutex m;
 
-    auto driver = Finn::Driver(conf, 0, conf.deviceWrappers[0].idmas[0]->kernelName, 0, conf.deviceWrappers[0].odmas[0]->kernelName, batchLength);
+
+    // BUG SOMEWHERE AROUND HERE THAT MAKES FEATUREMAPSIZE AND TOTALDATASIZE EQUAL
+    auto driver = Finn::Driver(conf, 0, conf.deviceWrappers[0].idmas[0]->kernelName, 0, conf.deviceWrappers[0].odmas[0]->kernelName, static_cast<uint>(batchLength));
 
     Finn::vector data(driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName) * batchLength, 1);
 
     for (std::size_t i = 0; i < batchLength; ++i) {
         std::iota(data.begin() + static_cast(i * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)),
-                  data.begin() + static_cast((i + 1) * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)), -127+static_cast(i));
+                  data.begin() + static_cast((i + 1) * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)), -127 + static_cast(i));
     }
 
     // Run inference
     driver.input(data.begin(), data.end());
-    std::this_thread::sleep_for(1000ms);
-    auto results = driver.getResults();
+    auto results = driver.getResults();  // This should block until the results are available
 
-    Finn::vector expectedResults = { 98,50,65476,65493,27,98,50,65476,65493,27,98,50,65476,65493,27,98,50,65476,65493,27,98,50,65476,65493,27,95,61,65483,65491,12,98,50,65476,65493,27,92,53,65483,65498,15,92,53,65483,65498,15,86,53,65489,65498,9 };
+    Finn::vector expectedResults = {98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27,
+                                    95, 61, 65483, 65491, 12, 98, 50, 65476, 65493, 27, 92, 53, 65483, 65498, 15, 92, 53, 65483, 65498, 15, 86, 53, 65489, 65498, 9};
 
     EXPECT_EQ(results.size(), expectedResults.size());
 
@@ -73,13 +80,22 @@ TEST(AsyncInference, asyncBatchInferenceTest) {
 
     for (std::size_t i = 0; i < batchLength; ++i) {
         std::iota(data.begin() + static_cast(i * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)),
-                  data.begin() + static_cast((i + 1) * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)), -127+static_cast(i));
+                  data.begin() + static_cast((i + 1) * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)), -127 + static_cast(i));
     }
 
+    driver.registerCallback(0, conf.deviceWrappers[0].odmas[0]->kernelName, [&availableData, &cv](std::size_t numItems) {
+        availableData += numItems;
+        cv.notify_all();
+    });
+
     // Run inference
     driver.input(data.begin(), data.end());
-    std::this_thread::sleep_for(1000ms);
 
     results.clear();
+
+    // wait until output thread notifies
+    std::unique_lock lk(m);
+    cv.wait(lk, [&availableData, &driver, &conf] { return availableData >= driver.getTotalDataSize(0, conf.deviceWrappers[0].odmas[0]->kernelName); });
+
     results = driver.getResults();
 
     EXPECT_EQ(results.size(), expectedResults.size());
diff --git a/integrationtest/SyncInference.cpp b/integrationtest/SyncInference.cpp
index 7cb8eb9..0779762 100644
--- a/integrationtest/SyncInference.cpp
+++ b/integrationtest/SyncInference.cpp
@@ -51,13 +51,14 @@ TEST(SyncInference, syncBatchInferenceTest) {
 
     for (std::size_t i = 0; i < batchLength; ++i) {
         std::iota(data.begin() + static_cast(i * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)),
-                  data.begin() + static_cast((i + 1) * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)), -127 + static_cast(i));
+                  data.begin() + static_cast((i + 1) * driver.getFeatureMapSize(0, conf.deviceWrappers[0].idmas[0]->kernelName)), -127 + static_cast(i));
     }
 
     // Run inference
     auto results = driver.inferSynchronous(data.begin(), data.end());
 
-    Finn::vector expectedResults = { 98,50,65476,65493,27,98,50,65476,65493,27,98,50,65476,65493,27,98,50,65476,65493,27,98,50,65476,65493,27,95,61,65483,65491,12,98,50,65476,65493,27,92,53,65483,65498,15,92,53,65483,65498,15,86,53,65489,65498,9};
+    Finn::vector expectedResults = {98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27, 98, 50, 65476, 65493, 27,
+                                    95, 61, 65483, 65491, 12, 98, 50, 65476, 65493, 27, 92, 53, 65483, 65498, 15, 92, 53, 65483, 65498, 15, 86, 53, 65489, 65498, 9};
 
     EXPECT_EQ(results, expectedResults);
 }
diff --git a/src/FINNCppDriver/FINNDriver.cpp b/src/FINNCppDriver/FINNDriver.cpp
index 9540595..7998ff3 100644
--- a/src/FINNCppDriver/FINNDriver.cpp
+++ b/src/FINNCppDriver/FINNDriver.cpp
@@ -130,6 +130,14 @@ Finn::Driver createDriverFromConfig(const std::filesystem:
 template
 using destribution_t = typename std::conditional_t<std::is_floating_point_v<T>, std::uniform_real_distribution<T>, std::uniform_int_distribution<T>>;
 
+/**
+ * @brief Implementation function for running throughput tests
+ *
+ * @tparam T Data type for the test inputs
+ * @param baseDriver Reference to the FINN driver
+ * @param elementCount Number of elements in test data
+ * @param batchSize Batch size for inference
+ */
 template
 void runThroughputTestImpl(Finn::Driver& baseDriver, std::size_t elementCount, uint batchSize) {
     using dtype = T;
@@ -231,6 +239,14 @@ void runThroughputTest(Finn::Driver& baseDriver) {
     }
 }
 
+/**
+ * @brief Load data from numpy file, run inference, and dump results
+ *
+ * @tparam T Data type for the loaded data
+ * @param baseDriver Reference to the FINN driver
+ * @param loadedNpyFile Loaded numpy file containing input data
+ * @param outputFile Path to output file for results
+ */
 template
 void loadInferDump(Finn::Driver& baseDriver, xt::detail::npy_file& loadedNpyFile, const std::string& outputFile) {
     auto xtensorArray = std::move(loadedNpyFile).cast<T>();
diff --git a/src/FINNCppDriver/core/Accelerator.cpp b/src/FINNCppDriver/core/Accelerator.cpp
index 552fccf..03b2778 100644
--- a/src/FINNCppDriver/core/Accelerator.cpp
+++ b/src/FINNCppDriver/core/Accelerator.cpp
@@ -101,14 +101,15 @@ namespace Finn {
         return ret;
     }
 
-    Finn::vector<uint8_t> Accelerator::getOutputData(const unsigned int deviceIndex, const std::string& outputBufferKernelName) {
+    // cppcheck-suppress unusedFunction
+    [[maybe_unused]] Finn::vector<uint8_t> Accelerator::getOutputData(const unsigned int deviceIndex, const std::string& outputBufferKernelName, const std::size_t& numItems) {
         if (containsDevice(deviceIndex)) {
-            FINN_LOG_DEBUG(loglevel::info) << "Retrieving results from the specified device index! [accelerator.retrieveResults()]";
-            return getDeviceHandler(deviceIndex).retrieveResults(outputBufferKernelName);
+            FINN_LOG_DEBUG(loglevel::info) << "Retrieving results from the specified device index!";
+            return getDeviceHandler(deviceIndex).retrieveResults(outputBufferKernelName, numItems);
         } else {
             if (containsDevice(0)) {
-                FINN_LOG_DEBUG(loglevel::info) << "Retrieving results from 0 device index! [accelerator.retrieveResults()]";
-                return getDeviceHandler(0).retrieveResults(outputBufferKernelName);
+                FINN_LOG_DEBUG(loglevel::info) << "Retrieving results from 0 device index!";
+                return getDeviceHandler(0).retrieveResults(outputBufferKernelName, numItems);
             } else {
                 // cppcheck-suppress missingReturn
                 Finn::logAndError("Tried receiving data in a devicehandler with an invalid deviceIndex!");
@@ -116,6 +117,11 @@ namespace Finn {
         }
     }
 
+    Finn::vector<uint8_t> Accelerator::getOutputData(const unsigned int deviceIndex, const std::string& outputBufferKernelName) {
+        std::size_t numItems = getDeviceHandler(deviceIndex).getTotalDataSize(outputBufferKernelName);
+        return getDeviceHandler(deviceIndex).retrieveResults(outputBufferKernelName, numItems);
+    }
+
     size_t Accelerator::getSizeInBytes(unsigned int deviceIndex, const std::string& bufferName) {
         if (containsDevice(deviceIndex)) {
             return getDeviceHandler(deviceIndex).getSizeInBytes(bufferName);
@@ -143,4 +149,20 @@ namespace Finn {
         }
         return 0;
     }
+
+    void Accelerator::registerCallback(unsigned int deviceIndex, const std::string& bufferName, std::function<void(std::size_t)> callback) {
+        if (containsDevice(deviceIndex)) {
+            getDeviceHandler(deviceIndex).registerCallback(bufferName, callback);
+        } else {
+            Finn::logAndError("Tried registering a callback on a deviceIndex which does not exist! Queried index: " + std::to_string(deviceIndex) + ", KernelBufferName: " + bufferName);
+        }
+    }
+
+    void Accelerator::drain(unsigned int deviceIndex, const std::string& bufferName) {
+        if (containsDevice(deviceIndex)) {
+            getDeviceHandler(deviceIndex).drain(bufferName);
+        } else {
+            Finn::logAndError("Tried draining a buffer on a deviceIndex which does not exist! Queried index: " + std::to_string(deviceIndex) + ", KernelBufferName: " + bufferName);
+        }
+    }
 }  // namespace Finn
diff --git a/src/FINNCppDriver/core/Accelerator.h b/src/FINNCppDriver/core/Accelerator.h
index b51bda6..e91a303 100644
--- a/src/FINNCppDriver/core/Accelerator.h
+++ b/src/FINNCppDriver/core/Accelerator.h
@@ -10,16 +10,18 @@
  *
  */
 
-#ifndef ACCELERATOR_H
-#define ACCELERATOR_H
+#ifndef ACCELERATOR
+#define ACCELERATOR
 
 #include   // for DeviceHandler, Uncheck...
 #include   // for vector, SIZE_SPECIFIER
 
-#include <cstdint>  // for uint8_t
-#include <cstddef>  // for size_t
-#include <string>   // for string
-#include <vector>   // for vector, vector<>::iter...
+#include   // for Severity, loglevel
+#include <cstdint>     // for uint8_t
+#include <cstddef>     // for size_t
+#include <functional>  // for function
+#include <string>      // for string
+#include <vector>      // for vector, vector<>::iter...
 
 namespace Finn {
     struct DeviceWrapper;
@@ -82,7 +84,7 @@ namespace Finn {
          * @brief Destroy the Accelerator object
          *
          */
-        ~Accelerator() = default;
+        ~Accelerator() { FINN_LOG(loglevel::info) << "Destructing Accelerator"; }
 
 
         /**
@@ -162,21 +164,75 @@ namespace Finn {
          *
         * @param deviceIndex
         * @param outputBufferKernelName
-        * @param forceArchival Whether or not to force a readout into archive. Necessary to get new data. Will be done automatically if a whole multiple of the buffer size is produced
+        * @param numItems Number of items to read from the output buffer
         * @return std::vector<std::vector<uint8_t>>
         */
+        Finn::vector<uint8_t> getOutputData(unsigned int deviceIndex, const std::string& outputBufferKernelName, const std::size_t& numItems);
+
+        /**
+         * @brief Return a vector of output samples.
+         *
+         * @param deviceIndex The index of the device to read from
+         * @param outputBufferKernelName The name of the output buffer kernel
+         * @return Finn::vector<uint8_t> Vector containing output data
+         */
         Finn::vector<uint8_t> getOutputData(unsigned int deviceIndex, const std::string& outputBufferKernelName);
 
+        /**
+         * @brief Get the size in bytes of a buffer
+         *
+         * @param deviceIndex The index of the device
+         * @param bufferName The name of the buffer
+         * @return size_t Size in bytes
+         */
         size_t getSizeInBytes(unsigned int deviceIndex, const std::string& bufferName);
 
+        /**
+         * @brief Get the feature map size of a buffer
+         *
+         * @param deviceIndex The index of the device
+         * @param bufferName The name of the buffer
+         * @return size_t Feature map size
+         */
         size_t getFeatureMapSize(unsigned int deviceIndex, const std::string& bufferName);
 
+        /**
+         * @brief Get the batch size of a buffer
+         *
+         * @param deviceIndex The index of the device
+         * @param bufferName The name of the buffer
+         * @return size_t Batch size
+         */
         size_t getBatchSize(unsigned int deviceIndex, const std::string& bufferName);
 
+        /**
+         * @brief Get the total data size of a buffer
+         *
+         * @param deviceIndex The index of the device
+         * @param bufferName The name of the buffer
+         * @return size_t Total data size
+         */
         size_t getTotalDataSize(unsigned int deviceIndex, const std::string& bufferName);
+
+        /**
+         * @brief Register a callback function for a buffer
+         *
+         * @param deviceIndex The index of the device
+         * @param bufferName The name of the buffer
+         * @param callback Callback function to register
+         */
+        void registerCallback(unsigned int deviceIndex, const std::string& bufferName, std::function<void(std::size_t)> callback);
+
+        /**
+         * @brief Drain a buffer on the specified device
+         *
+         * @param deviceIndex The index of the device
+         * @param bufferName The name of the buffer to drain
+         */
+        void drain(unsigned int deviceIndex, const std::string& bufferName);
     };
 }  // namespace Finn
 
-#endif  // ACCELERATOR_H
+#endif  // ACCELERATOR
diff --git a/src/FINNCppDriver/core/BaseDriver.hpp b/src/FINNCppDriver/core/BaseDriver.hpp
index 8236233..4cb6f9c 100644
--- a/src/FINNCppDriver/core/BaseDriver.hpp
+++ b/src/FINNCppDriver/core/BaseDriver.hpp
@@ -236,6 +236,14 @@ namespace Finn {
         size_t getTotalDataSize(unsigned int deviceIndex, const std::string& bufferName) { return accelerator.getTotalDataSize(deviceIndex, bufferName); }
 
+        /**
+         * @brief Register a callback function to be called when the inference of a batch is finished
+         */
+        template>
+        void registerCallback(unsigned int deviceIndex, const std::string& bufferName, std::function<void(std::size_t)> callback) {
+            accelerator.registerCallback(deviceIndex, bufferName, callback);
+        }
+
         /**
          * @brief Store input into the driver for asynchronous inference
          *
          * @param inputBufferKernelName Identifier of the input kernel
         * @param batchSize Batch size contained in the input
         */
-        template>
+        template>
        void input(IteratorType first, IteratorType last, uint inputDeviceIndex, const std::string& inputBufferKernelName, uint batchSize) {
            FINN_LOG_DEBUG(loglevel::info) << loggerPrefix() << "Store data for asynchronous inference.";
            auto packed = Finn::pack(first, last);
 
            if (std::abs(std::distance(packed.begin(), packed.end())) != getFeatureMapSize(inputDeviceIndex, inputBufferKernelName) * batchSize) {
                Finn::logAndError("Input length (" + std::to_string(std::abs(std::distance(packed.begin(), packed.end()))) + ") does not match up with batches*inputsize_per_batch (" +
-                                 std::to_string(getFeatureMapSize(inputDeviceIndex, inputBufferKernelName)) + "*" + std::to_string(batchSize) + "=" +
-                                 std::to_string(getFeatureMapSize(inputDeviceIndex, inputBufferKernelName) * batchSize) + ")");
+                                  std::to_string(getFeatureMapSize(inputDeviceIndex, inputBufferKernelName)) + "*" + std::to_string(batchSize) + "=" +
+                                  std::to_string(getFeatureMapSize(inputDeviceIndex, inputBufferKernelName) * batchSize) + ")");
            }
 
            storeFunc(packed.begin(), packed.end());
@@ -269,7 +277,7 @@ namespace Finn {
         * @param first
         * @param last
         */
-        template>
+        template>
        void input(IteratorType first, IteratorType last) { input(first, last, defaultInputDeviceIndex, defaultInputKernelName, batchElements); }
 
@@ -282,11 +290,11 @@ namespace Finn {
         * @param outputBufferKernelName Identifier of the output kernel
         * @return Finn::vector
         */
-        template, typename = std::enable_if>
+        template, bool Sync = SynchronousInference, typename = std::enable_if_t>
        [[nodiscard]] Finn::vector getResults(uint outputDeviceIndex, const std::string& outputBufferKernelName) {
            // TODO(linusjun): maybe this method should block until data is available?
            auto result = accelerator.getOutputData(outputDeviceIndex, outputBufferKernelName);
-
+
            static auto packedOutput = configuration.deviceWrappers[outputDeviceIndex].odmas[0]->packedShape;
            packedOutput[0] = batchElements;
            static auto foldedOutput = static_cast(configuration.deviceWrappers[outputDeviceIndex].odmas[0].get())->foldedShape;
            foldedOutput[0] = batchElements;
            const Finn::DynamicMdSpan reshapedOutput(result.begin(), result.end(), packedOutput);
            auto unpacked = Finn::unpackMultiDimensionalOutputs::iterator, false, V>(result.begin(), result.end(), reshapedOutput, foldedOutput);
 
            return unpacked;
        }
 
@@ -304,21 +312,39 @@ namespace Finn {
         * @tparam typename
         * @return Finn::vector
         */
-        template, typename = std::enable_if>
+        template, bool Sync = SynchronousInference, typename = std::enable_if_t>
        [[nodiscard]] Finn::vector getResults() {
            // TODO(linusjun): maybe this method should block until data is available?
            auto result = accelerator.getOutputData(defaultOutputDeviceIndex, defaultOutputKernelName);
-
+
            static auto packedOutput = configuration.deviceWrappers[defaultOutputDeviceIndex].odmas[0]->packedShape;
            packedOutput[0] = batchElements;
            static auto foldedOutput = static_cast(configuration.deviceWrappers[defaultOutputDeviceIndex].odmas[0].get())->foldedShape;
            foldedOutput[0] = batchElements;
            const Finn::DynamicMdSpan reshapedOutput(result.begin(), result.end(), packedOutput);
            auto unpacked = Finn::unpackMultiDimensionalOutputs::iterator, false, V>(result.begin(), result.end(), reshapedOutput, foldedOutput);
-
+
            return unpacked;
        }
 
+        /**
+         * @brief Drains the output buffer of the specified device. This is only available in asynchronous inference mode.
+         */
+        template>
+        void drain(uint outputDeviceIndex, const std::string& outputBufferKernelName) {
+            accelerator.drain(outputDeviceIndex, outputBufferKernelName);
+        }
+
+        /**
+         * @brief Drains the output buffer of the default device. This is only available in asynchronous inference mode.
+         *
+         * @tparam typename
+         */
+        template>
+        void drain() {
+            accelerator.drain(defaultOutputDeviceIndex, defaultOutputKernelName);
+        }
+
        /**
         * @brief Implements the synchronous inference operation
         *
         * @param outputBufferKernelName name of output kernel
         * @return Finn::vector
         */
-        template, typename = std::enable_if>
+        template, bool Sync = SynchronousInference, typename = std::enable_if_t>
        [[nodiscard]] Finn::vector inferSynchronous(IteratorType first, IteratorType last, uint inputDeviceIndex, const std::string& inputBufferKernelName, uint outputDeviceIndex, const std::string& outputBufferKernelName) {
            using IterValueType = typename std::iterator_traits<IteratorType>::value_type;
            static auto foldedShape = static_cast(configuration.deviceWrappers[inputDeviceIndex].idmas[0].get())->foldedShape;
@@ -364,7 +390,7 @@ namespace Finn {
         * @param first
         * @param last
         * @return Finn::vector
         */
-        template, typename = std::enable_if>
+        template, bool Sync = SynchronousInference, typename = std::enable_if_t>
        [[nodiscard]] Finn::vector inferSynchronous(IteratorType first, IteratorType last) { return inferSynchronous(first, last, defaultInputDeviceIndex, defaultInputKernelName, defaultOutputDeviceIndex, defaultOutputKernelName); }
 
@@ -382,7 +408,7 @@ namespace Finn {
         * @param outputBufferKernelName
         * @return Finn::vector
         */
-        template, typename = std::enable_if>
+        template, bool Sync = SynchronousInference, typename = std::enable_if_t>
        [[nodiscard]] Finn::vector inferSynchronous(const Finn::vector& data, uint inputDeviceIndex, const std::string& inputBufferKernelName, uint outputDeviceIndex, const std::string& outputBufferKernelName) {
            return inferSynchronous(data.begin(), data.end(), inputDeviceIndex, inputBufferKernelName, outputDeviceIndex, outputBufferKernelName, batchElements);
        }
 
@@ -396,12 +422,12 @@ namespace Finn {
         * @param data
         * @return Finn::vector
         */
-        template, typename = std::enable_if>
+        template, bool Sync = SynchronousInference, typename = std::enable_if_t>
        [[nodiscard]] Finn::vector inferSynchronous(const Finn::vector& data) { return inferSynchronous(data, defaultInputDeviceIndex, defaultInputKernelName, defaultOutputDeviceIndex, defaultOutputKernelName, batchElements); }
 
-
+       protected:
        /**
         *
         * @brief Do an inference with the given data. This assumes already flattened data in uint8_t's. Specify inputs and outputs.
@@ -415,15 +441,15 @@ namespace Finn { * @param batchSize * @return Finn::vector */ - template + template> [[nodiscard]] Finn::vector infer(IteratorType first, IteratorType last, uint inputDeviceIndex, const std::string& inputBufferKernelName, uint outputDeviceIndex, const std::string& outputBufferKernelName, uint batchSize) { FINN_LOG_DEBUG(loglevel::info) << loggerPrefix() << "Starting inference (raw data)"; auto storeFunc = accelerator.storeFactory(inputDeviceIndex, inputBufferKernelName); if (std::abs(std::distance(first, last)) != getTotalDataSize(inputDeviceIndex, inputBufferKernelName)) { Finn::logAndError(loggerPrefix() + " Input length (" + std::to_string(std::abs(std::distance(first, last))) + ") does not match up with batches*inputsize_per_batch (" + - std::to_string(getFeatureMapSize(inputDeviceIndex, inputBufferKernelName)) + "*" + std::to_string(batchSize) + "=" + - std::to_string(getTotalDataSize(inputDeviceIndex, inputBufferKernelName)) + ")"); + std::to_string(getFeatureMapSize(inputDeviceIndex, inputBufferKernelName)) + "*" + std::to_string(batchSize) + "=" + + std::to_string(getTotalDataSize(inputDeviceIndex, inputBufferKernelName)) + ")"); } bool stored = storeFunc(first, last); @@ -438,7 +464,7 @@ namespace Finn { FINN_LOG_DEBUG(loglevel::info) << "Reading out buffers"; accelerator.read(); - return accelerator.getOutputData(outputDeviceIndex, outputBufferKernelName); + return accelerator.getOutputData(outputDeviceIndex, outputBufferKernelName, getTotalDataSize(outputDeviceIndex, outputBufferKernelName)); } /** @@ -453,11 +479,11 @@ namespace Finn { * @param batchSize * @return Finn::vector */ + template> [[nodiscard]] Finn::vector infer(const Finn::vector& data, uint inputDeviceIndex, const std::string& inputBufferKernelName, uint outputDeviceIndex, const std::string& outputBufferKernelName, uint batchSize) { return infer(data.begin(), data.end(), inputDeviceIndex, inputBufferKernelName, outputDeviceIndex, outputBufferKernelName, batchSize); } - protected: #ifdef UNITTEST /** * @brief Return whether the data that is currently held on the FPGA is equivalent to the passed data diff --git a/src/FINNCppDriver/core/DeviceBuffer/AsyncDeviceBuffers.hpp b/src/FINNCppDriver/core/DeviceBuffer/AsyncDeviceBuffers.hpp index 134bb53..2b3e789 100644 --- a/src/FINNCppDriver/core/DeviceBuffer/AsyncDeviceBuffers.hpp +++ b/src/FINNCppDriver/core/DeviceBuffer/AsyncDeviceBuffers.hpp @@ -33,25 +33,19 @@ namespace Finn { template class AsyncBufferWrapper { protected: - constexpr static size_t queueSize = 1024; ///< Default size of the internal queue + constexpr static size_t featureMapCount = 5; //< Number of feature maps that can be buffered in the queue /** * @brief Internal queue used by all asynchronous buffers * */ - SPSCQueue queue; + DynamicSPSCQueue queue; /** * @brief Construct a new Async Buffer Wrapper object * * @param expectedMaxQueueSize Expected maximum size of the queue */ - AsyncBufferWrapper(std::size_t expectedMaxQueueSize) { - if (expectedMaxQueueSize > queueSize) { - FINN_LOG(loglevel::warning) << "[AsyncDeviceBuffer] Expected maximum queue size (" << expectedMaxQueueSize << ") is larger than the async buffer queue size (" << queueSize - << "). This might lead to problems or performance issues. 
Consider increasing the queue size in the SPSCQueue template parameter.\n"; - } - FINN_LOG(loglevel::info) << "[AsyncDeviceBuffer] Max buffer size:" << queueSize << "\n"; - } + AsyncBufferWrapper(std::size_t expectedMaxQueueSize) : queue(expectedMaxQueueSize * featureMapCount) { FINN_LOG(loglevel::info) << "[AsyncDeviceBuffer] Max buffer size:" << queue.size() << "\n"; } /** * @brief Destroy the Async Buffer Wrapper object @@ -86,7 +80,7 @@ namespace Finn { AsyncBufferWrapper& operator=(const AsyncBufferWrapper& buf) = delete; #ifdef UNITTEST public: - SPSCQueue::queueSize>& testGetQueue() { return this->queue; } + DynamicSPSCQueue& testGetQueue() { return this->queue; } #endif public: @@ -137,7 +131,7 @@ namespace Finn { * @param batchSize size of ringbuffer in input elements (batch elements) */ AsyncDeviceInputBuffer(const std::string& pCUName, xrt::device& device, xrt::uuid& pDevUUID, const shapePacked_t& pShapePacked, unsigned int batchSize) - : DeviceInputBuffer(pCUName, device, pDevUUID, pShapePacked), + : DeviceInputBuffer(pCUName, device, pDevUUID, pShapePacked, batchSize), detail::AsyncBufferWrapper(batchSize * FinnUtils::shapeToElements(pShapePacked)), workerThread(std::jthread(std::bind_front(&AsyncDeviceInputBuffer::runInternal, this))) {} @@ -157,10 +151,37 @@ namespace Finn { * @brief Destroy the Async Device Input Buffer object * */ - ~AsyncDeviceInputBuffer() override { - FINN_LOG(loglevel::info) << "Destructing Asynchronous input buffer"; - workerThread.request_stop(); // Joining will be handled automatically by destruction - }; + ~AsyncDeviceInputBuffer() override { FINN_LOG(loglevel::info) << "Destructing Asynchronous input buffer" << std::endl; }; + + /** + * @brief Prepare the buffer for shutdown + * + * This method will signal the worker thread to stop and wait for it to finish. 
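
The constructor above replaces the fixed 1024-entry queue with a dynamically sized one. A self-contained sketch of the sizing rule, with hypothetical shape and batch values:

#include <bit>
#include <cstddef>
#include <cstdio>

int main() {
    // Hypothetical packed shape with 24 elements per feature map, batch size 8.
    const std::size_t elementsPerBatch = 8 * 24;
    const std::size_t featureMapCount = 5;  // constant introduced in the patch
    const std::size_t requested = elementsPerBatch * featureMapCount;  // ctor argument
    // DynamicSPSCQueue rounds the requested capacity up to a power of two.
    std::printf("requested %zu -> actual %zu\n", requested, std::bit_ceil(requested));
    return 0;
}
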
+ */ + void prepareForShutdown() override { + FINN_LOG(loglevel::info) << "Stopping Asynchronous input buffer" << std::endl; + DeviceInputBuffer::prepareForShutdown(); + + // Signal worker thread to stop + this->queue.shutdown(); + workerThread.request_stop(); + + // Attempt to join with timeout + auto joinFuture = std::async(std::launch::async, [this]() { + if (workerThread.joinable()) { + workerThread.join(); + } + }); + + if (joinFuture.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) { + FINN_LOG(loglevel::warning) << "Worker thread for " << this->name << " did not exit cleanly" << std::endl; + // The jthread destructor will still request a stop and join, so the thread is not leaked + throw std::runtime_error("Worker thread did not exit cleanly within timeout period"); + } else { + FINN_LOG(loglevel::info) << "Worker thread for " << this->name << " exited cleanly" << std::endl; + } + } + /** * @brief Deleted move assignment * @@ -202,6 +223,7 @@ */ size_t loadMap(std::stop_token stoken) { FINN_LOG_DEBUG(loglevel::info) << "Data transfer of input data to FPGA!\n"; + FINN_LOG_DEBUG(loglevel::info) << "Queue size: " << this->queue.size() << "\n"; return this->queue.dequeue_bulk(this->map, this->totalDataSize, stoken); } @@ -224,19 +246,20 @@ class AsyncDeviceOutputBuffer : public DeviceOutputBuffer, public detail::AsyncBufferWrapper { std::mutex ltsMutex; std::jthread workerThread; + std::function<void(std::size_t)> callback = [](std::size_t numItems) {}; ///< Callback that is called when data is available in the queue + private: void readInternal(std::stop_token stoken) { FINN_LOG_DEBUG(loglevel::info) << "Starting to read from the device"; while (!stoken.stop_requested()) { this->execute(this->shapePacked[0]); - FINN_LOG_DEBUG(loglevel::info) << "PRE WAIT"; bool success = this->wait(stoken); // Wait until the kernel is done executing - FINN_LOG_DEBUG(loglevel::info) << "POST WAIT"; if (!success) { FINN_LOG_DEBUG(loglevel::error) << "Kernel execution failed"; } this->sync(this->totalDataSize); - saveMap(); // TODO: Maybe the queue should have a callback that is called when the queue is full/data is avaible? + saveMap(); + callback(this->queue.size() - (this->queue.size() % this->totalDataSize)); // Notify that data is available in the queue } } @@ -252,7 +275,7 @@ */ AsyncDeviceOutputBuffer(const std::string& pCUName, xrt::device& device, xrt::uuid& pDevUUID, const shapePacked_t& pShapePacked, unsigned int batchSize) : DeviceOutputBuffer(pCUName, device, pDevUUID, pShapePacked, batchSize), - detail::AsyncBufferWrapper(batchSize * FinnUtils::shapeToElements(pShapePacked)), + detail::AsyncBufferWrapper(2 * batchSize * FinnUtils::shapeToElements(pShapePacked)), // Make the output buffer queue twice as large to circumvent a very rare deadlock in the case where one thread handles IO alone.
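
The shutdown sequence above bounds the join with a timeout by delegating it to a helper task. A self-contained reproduction of the same pattern (the one-second budget mirrors the patch; the caveat noted in the comment applies to both):

#include <chrono>
#include <future>
#include <stdexcept>
#include <thread>

// Request a stop, then perform the join on a std::async task so the join
// itself can be bounded by wait_for. Caveat: on timeout, the future's and the
// jthread's destructors will still block until the thread really exits.
int main() {
    std::jthread worker([](std::stop_token st) {
        while (!st.stop_requested()) std::this_thread::sleep_for(std::chrono::milliseconds(5));
    });
    worker.request_stop();
    auto joinFuture = std::async(std::launch::async, [&worker] {
        if (worker.joinable()) worker.join();
    });
    if (joinFuture.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) {
        throw std::runtime_error("worker did not exit cleanly within the timeout");
    }
    return 0;
}
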
workerThread(std::jthread(std::bind_front(&AsyncDeviceOutputBuffer::readInternal, this))){}; /** @@ -271,14 +294,35 @@ * @brief Destroy the Async Device Output Buffer object * */ - ~AsyncDeviceOutputBuffer() override { - FINN_LOG(loglevel::info) << "Stopping Asynchronous output buffer"; - this->queue.shutdown(); // Shutdown the queue to prevent further enqueues - FINN_LOG(loglevel::info) << "Waiting for Asynchronous output buffer to finish"; - workerThread.request_stop(); // Joining will be handled automatically by destruction - workerThread.join(); // Wait for the worker thread to finish - FINN_LOG(loglevel::info) << "Destruction Asynchronous output buffer"; - }; + ~AsyncDeviceOutputBuffer() override { FINN_LOG(loglevel::info) << "Destructing Asynchronous output buffer" << std::endl; }; + + /** + * @brief Prepare the buffer for shutdown + * + * This method will signal the worker thread to stop and wait for it to finish. + */ + void prepareForShutdown() override { + DeviceOutputBuffer::prepareForShutdown(); + + // Signal worker thread to stop + this->queue.shutdown(); + workerThread.request_stop(); + + // Attempt to join with timeout + auto joinFuture = std::async(std::launch::async, [this]() { + if (workerThread.joinable()) { + workerThread.join(); + } + }); + + if (joinFuture.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) { + FINN_LOG(loglevel::warning) << "Worker thread for " << this->name << " did not exit cleanly" << std::endl; + // The jthread destructor will still request a stop and join, so the thread is not leaked + throw std::runtime_error("Worker thread did not exit cleanly within timeout period"); + } else { + FINN_LOG(loglevel::info) << "Worker thread for " << this->name << " exited cleanly" << std::endl; + } + } /** * @brief Deleted move assignment operator @@ -316,12 +360,24 @@ * * @return Finn::vector */ - Finn::vector getData() override { // TODO(linusjun): Replace with a variant that takes an output iterator - Finn::vector tmp(this->totalDataSize); - this->queue.dequeue_bulk(tmp.begin(), this->totalDataSize); + Finn::vector getData(const std::size_t& numItems) override { + Finn::vector tmp(numItems); + this->queue.dequeue_bulk(tmp.begin(), numItems); return tmp; } + void registerCallback(std::function<void(std::size_t)> callback) override { + // Register a callback that is called when data is available in the queue + this->callback = callback; + } + + void drain() override { + // Drain the queue by reading all available items + T item; + while (this->queue.try_dequeue(item)) {} + FINN_LOG_DEBUG(loglevel::info) << "Drained the AsyncDeviceOutputBuffer queue."; + } + protected: /** * @brief Store the contents of the memory map into the ring buffer.
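
The reader thread above only ever reports complete feature maps to the callback: the current queue fill is rounded down to a multiple of totalDataSize. The arithmetic in isolation, with hypothetical sizes:

#include <cstddef>
#include <cstdio>

int main() {
    const std::size_t totalDataSize = 48;  // hypothetical elements per output map
    const std::size_t queued = 130;        // elements currently in the FIFO
    const std::size_t completeElements = queued - (queued % totalDataSize);  // -> 96
    std::printf("%zu of %zu queued elements form complete maps\n", completeElements, queued);
    return 0;
}
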
@@ -330,10 +386,10 @@ namespace Finn { bool saveMap() { FINN_LOG_DEBUG(loglevel::info) << "Data transfer of output from FPGA!\n"; if (this->queue.enqueue_bulk(this->map, this->totalDataSize) == this->totalDataSize) { - FINN_LOG_DEBUG(loglevel::info) << this->loggerPrefix() << "Stored " << this->totalDataSize << " elements in the ring buffer"; + FINN_LOG_DEBUG(loglevel::info) << this->loggerPrefix() << "Stored " << this->totalDataSize << " elements in the FIFO"; return true; } else { - FINN_LOG_DEBUG(loglevel::error) << this->loggerPrefix() << "Failed to store data in the ring buffer."; + FINN_LOG_DEBUG(loglevel::error) << this->loggerPrefix() << "Failed to store data in the FIFO."; return false; } } diff --git a/src/FINNCppDriver/core/DeviceBuffer/DeviceBuffer.hpp b/src/FINNCppDriver/core/DeviceBuffer/DeviceBuffer.hpp index fdc338e..5a388cf 100644 --- a/src/FINNCppDriver/core/DeviceBuffer/DeviceBuffer.hpp +++ b/src/FINNCppDriver/core/DeviceBuffer/DeviceBuffer.hpp @@ -87,14 +87,27 @@ namespace Finn { */ T* map; /** - * @brief 64 bit adress of the buffer located on the FPGA card + * @brief 64 bit address of the buffer located on the FPGA card * */ const long long bufAdr; + /** + * @brief Total size of data in elements + * + */ std::size_t totalDataSize; + /** + * @brief Size of feature map + * + */ std::size_t featureMapSize; + /** + * @brief Busy wait until the IP core is done executing + * + * @param stopToken Token to request stopping the wait + */ void busyWait(std::stop_token stopToken = {}) { // Wait until the IP is DONE uint32_t axi_ctrl = 0; @@ -104,6 +117,14 @@ namespace Finn { } private: + /** + * @brief Get the group ID for a compute unit + * + * @param device XRT device + * @param uuid Device UUID + * @param computeUnit Name of the compute unit + * @return unsigned int Group ID + */ unsigned int getGroupId(const xrt::device& device, const xrt::uuid& uuid, const std::string& computeUnit) { return xrt::kernel(device, uuid, computeUnit).group_id(0); } /** @@ -112,6 +133,12 @@ namespace Finn { */ uint32_t oldRepetitions = 0; + /** + * @brief Get flags for buffer object creation based on host memory access + * + * @param hostMemoryAccess Whether host memory access is enabled + * @return consteval static xrt::bo::flags Buffer object flags + */ consteval static xrt::bo::flags getFlags(bool hostMemoryAccess) { if (hostMemoryAccess) { return xrt::bo::flags::host_only; @@ -143,6 +170,7 @@ namespace Finn { std::fill(map, map + mapSize, 0); totalDataSize = FinnUtils::shapeToElements(pShapePacked) * batchSize; featureMapSize = totalDataSize / shapePacked[0]; + FINN_LOG(loglevel::info) << "Map has totalSize " << totalDataSize << " and featureMapSize " << featureMapSize << "\n"; } /** @@ -159,11 +187,23 @@ namespace Finn { */ DeviceBuffer(const DeviceBuffer& buf) noexcept = delete; + /** + * @brief Prepare the DeviceBuffer for shutdown + * + * This function is called before the application is shutting down. + * It can be used to release resources or perform cleanup tasks. + * The default implementation does nothing. 
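
A self-contained sketch of the size bookkeeping logged by the constructor above, assuming shapeToElements is a plain product over the packed shape (shape and batch values are hypothetical):

#include <cstddef>
#include <cstdio>
#include <functional>
#include <numeric>
#include <vector>

int main() {
    const std::vector<std::size_t> shapePacked{2, 24};  // shapePacked[0] is the batch dimension
    const std::size_t batchSize = 4;
    const std::size_t elements = std::accumulate(shapePacked.begin(), shapePacked.end(), std::size_t{1}, std::multiplies<>{});
    const std::size_t totalDataSize = elements * batchSize;            // 192
    const std::size_t featureMapSize = totalDataSize / shapePacked[0]; // 96
    std::printf("totalSize %zu, featureMapSize %zu\n", totalDataSize, featureMapSize);
    return 0;
}
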
+ */ + virtual void prepareForShutdown() { + // The base implementation only logs; buffers with worker threads override this + FINN_LOG(loglevel::info) << "Preparing " << name << " for shutdown"; + } + /** * @brief Destroy the Device Buffer object * */ - virtual ~DeviceBuffer() { FINN_LOG(loglevel::info) << "Destructing DeviceBuffer " << name << "\n"; }; + virtual ~DeviceBuffer() { FINN_LOG(loglevel::info) << "Destructing DeviceBuffer " << name << std::endl; }; /** * @brief Deleted move assignment operator @@ -181,12 +221,32 @@ */ DeviceBuffer& operator=(const DeviceBuffer& buf) = delete; + /** + * @brief Get the size in bytes of the buffer + * + * @return size_t Size in bytes + */ virtual size_t getSizeInBytes() { return totalDataSize * sizeof(T); } + /** + * @brief Get the feature map size + * + * @return size_t Feature map size + */ virtual size_t getFeatureMapSize() { return featureMapSize; } + /** + * @brief Get the batch size + * + * @return size_t Batch size + */ virtual size_t getBatchSize() { return this->shapePacked[0]; } + /** + * @brief Get the total data size + * + * @return size_t Total data size + */ virtual size_t getTotalDataSize() { return totalDataSize; } /** @@ -211,7 +271,14 @@ */ virtual bool run() = 0; - virtual bool wait(std::stop_token stopToken={}) { + /** + * @brief Wait for the kernel to complete execution + * + * @param stopToken Token to request stopping the wait + * @return true Success + * @return false Failure + */ + virtual bool wait(std::stop_token stopToken = {}) { busyWait(stopToken); return true; }; @@ -231,6 +298,11 @@ */ virtual void sync(std::size_t bytes) = 0; + /** + * @brief Execute the kernel with specified repetitions + * + * @param repetitions Number of repetitions to execute (default: 1) + */ void execute(const uint32_t repetitions = 1) { // writes the buffer address constexpr uint32_t offset_buf = 0x10; @@ -364,17 +436,35 @@ DeviceOutputBuffer(const std::string& pCUName, xrt::device& device, xrt::uuid& pDevUUID, const shapePacked_t& pShapePacked, unsigned int batchSize = 1) : DeviceBuffer(pCUName, device, pDevUUID, pShapePacked, batchSize){}; /** - * @brief Return stored data from storage + * @brief Get the data from the buffer and return it as a vector * + * @param numItems Number of items to read from the buffer * @return Finn::vector */ - virtual Finn::vector getData() = 0; + virtual Finn::vector getData(const std::size_t& numItems) = 0; + /** * @brief Sync data from the FPGA back to the host * */ virtual bool read() = 0; + /** + * @brief Register a callback that is called when data is available in the queue (Only for AsyncDeviceOutputBuffer) + * + * @param callback Callback function that takes the number of items available in the queue + */ + virtual void registerCallback(std::function<void(std::size_t)> callback) { + // Synchronous buffers do not support callbacks; the default implementation raises an error + // Asynchronous buffer implementations override this + Finn::logAndError("Callback not supported by this DeviceOutputBuffer implementation."); + } + + virtual void drain() { + // Synchronous buffers have nothing to drain; the default implementation raises an error + // Asynchronous buffer implementations override this + Finn::logAndError("Drain not supported by this DeviceOutputBuffer implementation."); + } + protected: /** * @brief Sync data from the FPGA into the memory map
namespace Finn { * * @return Finn::vector */ - Finn::vector getData() override { - Finn::vector tmp(this->map, this->map + this->totalDataSize); + Finn::vector getData(const std::size_t& numItems) override { + Finn::vector tmp(this->map, this->map + numItems); return tmp; } diff --git a/src/FINNCppDriver/core/DeviceHandler.cpp b/src/FINNCppDriver/core/DeviceHandler.cpp index 4f57a02..924e544 100644 --- a/src/FINNCppDriver/core/DeviceHandler.cpp +++ b/src/FINNCppDriver/core/DeviceHandler.cpp @@ -16,8 +16,8 @@ #include #include #include -#include #include +#include #include // for copy #include #include @@ -82,25 +82,24 @@ namespace Finn { void DeviceHandler::initializeDevice() { FINN_LOG(loglevel::info) << "(" << xrtDeviceIndex << ") " - << "Initializing xrt::device, loading xclbin and assigning IP\n"; - resetFPGAS(); + << "Initializing xrt::device, loading xclbin and assigning IP\n"; + resetFPGAS(static_cast<int>(xrtDeviceIndex)); device = xrt::device(xrtDeviceIndex); } void DeviceHandler::loadXclbinSetUUID() { FINN_LOG(loglevel::info) << "(" << xrtDeviceIndex << ") " - << "Loading XCLBIN and setting uuid\n"; + << "Loading XCLBIN and setting uuid\n"; uuid = device.load_xclbin(xclbinPath); } void DeviceHandler::initializeBufferObjects(const DeviceWrapper& devWrap, unsigned int hostBufferSize, bool pSynchronousInference) { FINN_LOG(loglevel::info) << "(" << xrtDeviceIndex << ") " - << "Initializing buffer objects with buffer size " << hostBufferSize << "\n"; + << "Initializing buffer objects with buffer size " << hostBufferSize << "\n"; for (auto&& ebdptr : devWrap.idmas) { if (pSynchronousInference) { inputBufferMap.emplace(std::make_pair(ebdptr->kernelName, std::make_shared<SyncDeviceInputBuffer<uint8_t>>(ebdptr->kernelName, device, uuid, ebdptr->packedShape, hostBufferSize))); - } - else { + } else { inputBufferMap.emplace(std::make_pair(ebdptr->kernelName, std::make_shared<AsyncDeviceInputBuffer<uint8_t>>(ebdptr->kernelName, device, uuid, ebdptr->packedShape, hostBufferSize))); } } @@ -108,8 +107,7 @@ namespace Finn { if (pSynchronousInference) { auto ptr = std::make_shared<SyncDeviceOutputBuffer<uint8_t>>(ebdptr->kernelName, device, uuid, ebdptr->packedShape, hostBufferSize); outputBufferMap.emplace(std::make_pair(ebdptr->kernelName, ptr)); - } - else { + } else { auto ptr = std::make_shared<AsyncDeviceOutputBuffer<uint8_t>>(ebdptr->kernelName, device, uuid, ebdptr->packedShape, hostBufferSize); outputBufferMap.emplace(std::make_pair(ebdptr->kernelName, ptr)); } @@ -126,10 +124,9 @@ namespace Finn { void DeviceHandler::setBatchSize(uint pBatchsize) { if (this->batchsize == pBatchsize) { return; - } - else { + } else { FINN_LOG(loglevel::info) << "(" << xrtDeviceIndex << ") " - << "Change batch size to " << pBatchsize << "\n"; + << "Change batch size to " << pBatchsize << "\n"; this->batchsize = pBatchsize; inputBufferMap.clear(); outputBufferMap.clear(); @@ -144,8 +141,7 @@ namespace Finn { [[maybe_unused]] bool DeviceHandler::containsBuffer(const std::string& kernelBufferName, IO ioMode) { if (ioMode == IO::INPUT) { return inputBufferMap.contains(kernelBufferName); - } - else if (ioMode == IO::OUTPUT) { + } else if (ioMode == IO::OUTPUT) { return outputBufferMap.contains(kernelBufferName); } return false; @@ -196,21 +192,20 @@ namespace Finn { } - [[maybe_unused]] Finn::vector DeviceHandler::retrieveResults(const std::string& outputBufferKernelName) { + [[maybe_unused]] Finn::vector DeviceHandler::retrieveResults(const std::string& outputBufferKernelName, const std::size_t& numItems) { if (!outputBufferMap.contains(outputBufferKernelName)) { auto newlineFold = [](std::string a, const auto& b) { return std::move(a) + '\n' + std::move(b.first); }; std::string existingNames = "Existing buffer names:"; std::accumulate(inputBufferMap.begin(), inputBufferMap.end(), existingNames, newlineFold); Finn::logAndError("Tried accessing kernel/buffer with name " + outputBufferKernelName + " but this kernel / buffer does not exist! " + existingNames); } - return outputBufferMap.at(outputBufferKernelName)->getData(); + return outputBufferMap.at(outputBufferKernelName)->getData(numItems); } size_t DeviceHandler::getSizeInBytes(const std::string& bufferName) { if (inputBufferMap.contains(bufferName)) { return inputBufferMap.at(bufferName)->getSizeInBytes(); - } - else if (outputBufferMap.contains(bufferName)) { + } else if (outputBufferMap.contains(bufferName)) { return outputBufferMap.at(bufferName)->getSizeInBytes(); } return 0; @@ -219,8 +214,7 @@ namespace Finn { size_t DeviceHandler::getFeatureMapSize(const std::string& bufferName) { if (inputBufferMap.contains(bufferName)) { return inputBufferMap.at(bufferName)->getFeatureMapSize(); - } - else if (outputBufferMap.contains(bufferName)) { + } else if (outputBufferMap.contains(bufferName)) { return outputBufferMap.at(bufferName)->getFeatureMapSize(); } return 0; @@ -229,8 +223,7 @@ namespace Finn { size_t DeviceHandler::getBatchSize(const std::string& bufferName) { if (inputBufferMap.contains(bufferName)) { return inputBufferMap.at(bufferName)->getBatchSize(); - } - else if (outputBufferMap.contains(bufferName)) { + } else if (outputBufferMap.contains(bufferName)) { return outputBufferMap.at(bufferName)->getBatchSize(); } return 0; @@ -239,13 +232,31 @@ namespace Finn { size_t DeviceHandler::getTotalDataSize(const std::string& bufferName) { if (inputBufferMap.contains(bufferName)) { return inputBufferMap.at(bufferName)->getTotalDataSize(); - } - else if (outputBufferMap.contains(bufferName)) { + } else if (outputBufferMap.contains(bufferName)) { return outputBufferMap.at(bufferName)->getTotalDataSize(); } return 0; } + void DeviceHandler::registerCallback(const std::string& bufferName, std::function<void(std::size_t)> callback) { + if (inputBufferMap.contains(bufferName)) { + Finn::logAndError("Tried registering a callback on an input buffer! This is not allowed! Queried KernelBufferName: " + bufferName); + } else if (outputBufferMap.contains(bufferName)) { + outputBufferMap.at(bufferName)->registerCallback(callback); + } else { + Finn::logAndError("Tried registering a callback on a buffer which does not exist! Queried KernelBufferName: " + bufferName); + } + } + + void DeviceHandler::drain(const std::string& bufferName) { + if (inputBufferMap.contains(bufferName)) { + Finn::logAndError("Tried draining an input buffer! This is not allowed! Queried KernelBufferName: " + bufferName); + } else if (outputBufferMap.contains(bufferName)) { + outputBufferMap.at(bufferName)->drain(); + } else { + Finn::logAndError("Tried draining a buffer which does not exist! Queried KernelBufferName: " + bufferName); + } + } #ifndef NDEBUG bool DeviceHandler::isBufferMapCollisionFree() { @@ -253,14 +264,14 @@ for (size_t index = 0; index < inputBufferMap.bucket_count(); ++index) { if (inputBufferMap.bucket_size(index) > 1) { FINN_LOG_DEBUG(loglevel::error) << "(" << xrtDeviceIndex << ") " - << "Hash collision in inputBufferMap.
This access to the inputBufferMap is no longer constant time!"; collisionFound = true; } } for (size_t index = 0; index < outputBufferMap.bucket_count(); ++index) { if (outputBufferMap.bucket_size(index) > 1) { FINN_LOG_DEBUG(loglevel::error) << "(" << xrtDeviceIndex << ") " - << "Hash collision in outputBufferMap. This access to the outputBufferMap is no longer constant time!"; + << "Hash collision in outputBufferMap. This access to the outputBufferMap is no longer constant time!"; collisionFound = true; } } diff --git a/src/FINNCppDriver/core/DeviceHandler.h b/src/FINNCppDriver/core/DeviceHandler.h index 9ca579f..fe0e536 100644 --- a/src/FINNCppDriver/core/DeviceHandler.h +++ b/src/FINNCppDriver/core/DeviceHandler.h @@ -127,7 +127,22 @@ namespace Finn { * @brief Destroy the Device Handler object * */ - ~DeviceHandler() { FINN_LOG(loglevel::info) << "Tearing down DeviceHandler\n"; }; + ~DeviceHandler() { + FINN_LOG(loglevel::info) << "Tearing down DeviceHandler" << std::endl; + + // First call prepareForShutdown on all buffers + for (auto& [_, buffer] : inputBufferMap) { + buffer->prepareForShutdown(); + } + for (auto& [_, buffer] : outputBufferMap) { + buffer->prepareForShutdown(); + } + + // Now safe to destroy buffers + inputBufferMap.clear(); + outputBufferMap.clear(); + FINN_LOG(loglevel::info) << "Destructed Buffers" << std::endl; + }; /** * @brief Sets the input batch size. Needs to reinitialize all buffers! @@ -198,19 +213,59 @@ namespace Finn { /** * @brief Read from the output buffer on the host. This does NOT execute the output kernel * - * @param outputBufferKernelName + * @param outputBufferKernelName identifier of the output buffer kernel + * @param numItems Number of items to read from the output buffer * @return Finn::vector */ - Finn::vector retrieveResults(const std::string& outputBufferKernelName); + Finn::vector retrieveResults(const std::string& outputBufferKernelName, const std::size_t& numItems); + /** + * @brief Get the size in bytes of a buffer + * + * @param bufferName The name of the buffer + * @return size_t Size in bytes + */ size_t getSizeInBytes(const std::string& bufferName); + /** + * @brief Get the feature map size of a buffer + * + * @param bufferName The name of the buffer + * @return size_t Feature map size + */ size_t getFeatureMapSize(const std::string& bufferName); + /** + * @brief Get the batch size of a buffer + * + * @param bufferName The name of the buffer + * @return size_t Batch size + */ size_t getBatchSize(const std::string& bufferName); + /** + * @brief Get the total data size of a buffer + * + * @param bufferName The name of the buffer + * @return size_t Total data size + */ size_t getTotalDataSize(const std::string& bufferName); + /** + * @brief Register a callback function for a buffer + * + * @param bufferName The name of the buffer + * @param callback Callback function to register + */ + void registerCallback(const std::string& bufferName, std::function callback); + + /** + * @brief Drain a buffer + * + * @param bufferName The name of the buffer to drain + */ + void drain(const std::string& bufferName); + /** * @brief Return whether there is a kernel with the given name in this device * @@ -279,7 +334,6 @@ namespace Finn { void initializeBufferObjects(const DeviceWrapper& devWrap, unsigned int hostBufferSize, bool pSynchronousInference); private: - /** * @brief Store the provided data into the DeviceBuffer * diff --git a/src/FINNCppDriver/utils/DoNotOptimize.h b/src/FINNCppDriver/utils/DoNotOptimize.h index fd50238..b7f343f 100644 --- 
a/src/FINNCppDriver/utils/DoNotOptimize.h +++ b/src/FINNCppDriver/utils/DoNotOptimize.h @@ -20,9 +20,10 @@ namespace Finn { /** - * @brief Disables compiler optimization of unused variables for specific variable + * @brief Disables compiler optimization of unused variables for specific variable (trivially copyable types <= pointer size) * - * @tparam Tp + * @tparam Tp Type of the value (must be trivially copyable and size <= pointer size) + * @param value Reference to the value to prevent optimization */ template inline __attribute__((always_inline)) typename std::enable_if::value && (sizeof(Tp) <= sizeof(Tp*))>::type DoNotOptimize(Tp& value) { @@ -30,9 +31,10 @@ namespace Finn { } /** - * @brief Disables compiler optimization of unused variables for specific variable + * @brief Disables compiler optimization of unused variables for specific variable (non-trivially copyable or large types) * - * @tparam Tp + * @tparam Tp Type of the value (non-trivially copyable or size > pointer size) + * @param value Reference to the value to prevent optimization */ template inline __attribute__((always_inline)) typename std::enable_if::value || (sizeof(Tp) > sizeof(Tp*))>::type DoNotOptimize(Tp& value) { @@ -40,9 +42,10 @@ namespace Finn { } /** - * @brief Disables compiler optimization of unused variables for specific variable + * @brief Disables compiler optimization of unused variables for specific variable (rvalue, trivially copyable <= pointer size) * - * @tparam Tp + * @tparam Tp Type of the value (must be trivially copyable and size <= pointer size) + * @param value Rvalue reference to the value to prevent optimization */ template inline __attribute__((always_inline)) typename std::enable_if::value && (sizeof(Tp) <= sizeof(Tp*))>::type DoNotOptimize(Tp&& value) { @@ -50,9 +53,10 @@ namespace Finn { } /** - * @brief Disables compiler optimization of unused variables for specific variable + * @brief Disables compiler optimization of unused variables for specific variable (rvalue, non-trivially copyable or large types) * - * @tparam Tp + * @tparam Tp Type of the value (non-trivially copyable or size > pointer size) + * @param value Rvalue reference to the value to prevent optimization */ template inline __attribute__((always_inline)) typename std::enable_if::value || (sizeof(Tp) > sizeof(Tp*))>::type DoNotOptimize(Tp&& value) { diff --git a/src/FINNCppDriver/utils/FPGAReset.hpp b/src/FINNCppDriver/utils/FPGAReset.hpp index e0cc188..1df0bf7 100644 --- a/src/FINNCppDriver/utils/FPGAReset.hpp +++ b/src/FINNCppDriver/utils/FPGAReset.hpp @@ -1,34 +1,50 @@ +/** + * @file FPGAReset.hpp + * @author Linus Jungemann (linus.jungemann@uni-paderborn.de) and others + * @brief Provides functionality to reset FPGA devices + * @version 0.1 + * @date 2025-07-12 + * + * @copyright Copyright (c) 2025 + * @license All rights reserved. This program and the accompanying materials are made available under the terms of the MIT license. 
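
The DoNotOptimize overloads documented above follow the escape/clobber idiom familiar from Google Benchmark. A self-contained sketch of the underlying technique (GCC/Clang inline assembly; the helper name is ours, not the repo's):

#include <cstdint>

// Tells the optimizer that `value` is read and may be modified through memory,
// so the computation producing it cannot be elided.
template <typename T>
inline __attribute__((always_inline)) void doNotOptimizeSketch(T& value) {
    asm volatile("" : "+m"(value) : : "memory");
}

int main() {
    std::uint64_t checksum = 0;
    for (std::uint64_t i = 0; i < 1000; ++i) {
        checksum += i * 2654435761u;
        doNotOptimizeSketch(checksum);  // keeps the loop from being folded away
    }
    return 0;
}
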
+ * + */ + #ifndef FPGARESET_HPP #define FPGARESET_HPP -#include -#include +#include // For open() #include -#include +#include + +#include +#include #include -#include -#include +#include #include -#include // For open() +#include +#include +#include + +using namespace std::chrono_literals; namespace Finn { /** - * Helper function to execute a command and optionally capture its output + * @brief Helper function to execute a command and optionally capture its output + * * @param args Vector of command arguments (first element is the command) * @param captureOutput Whether to capture and return the command output * @param silenceOutput Whether to silence command's stdout and stderr - * @return Pair of (success status, command output if requested) + * @return std::pair Pair of (success status, command output if requested) */ - std::pair executeCommand(const std::vector& args, - bool captureOutput = false, - bool silenceOutput = false) { + std::pair executeCommand(const std::vector& args, bool captureOutput = false, bool silenceOutput = false) { int pipefd[2]; std::string output; if (captureOutput && pipe(pipefd) == -1) { std::cerr << "Pipe creation failed: " << strerror(errno) << std::endl; - return { false, output }; + return {false, output}; } pid_t pid = fork(); @@ -39,16 +55,14 @@ namespace Finn { close(pipefd[0]); close(pipefd[1]); } - return { false, output }; - } - else if (pid == 0) { + return {false, output}; + } else if (pid == 0) { // Child process if (captureOutput) { - close(pipefd[0]); // Close read end + close(pipefd[0]); // Close read end dup2(pipefd[1], STDOUT_FILENO); close(pipefd[1]); - } - else if (silenceOutput) { + } else if (silenceOutput) { // Redirect stdout and stderr to /dev/null int devnull = open("/dev/null", O_WRONLY); if (devnull >= 0) { @@ -70,11 +84,10 @@ namespace Finn { // If we get here, exec failed std::cerr << "Exec failed for " << args[0] << ": " << strerror(errno) << std::endl; _exit(EXIT_FAILURE); - } - else { + } else { // Parent process if (captureOutput) { - close(pipefd[1]); // Close write end + close(pipefd[1]); // Close write end // Read output from pipe char buffer[4096]; @@ -92,7 +105,7 @@ namespace Finn { int status; waitpid(pid, &status, 0); - return { WIFEXITED(status) && WEXITSTATUS(status) == 0, output }; + return {WIFEXITED(status) && WEXITSTATUS(status) == 0, output}; } } @@ -102,11 +115,10 @@ namespace Finn { * @return Whether the reset was successful */ bool resetFPGA(const std::string& deviceId) { - auto [success, _] = executeCommand({ "xbutil", "reset", "--force", "-d", deviceId }, false, true); + auto [success, _] = executeCommand({"xbutil", "reset", "--force", "-d", deviceId}, false, true); if (success) { FINN_LOG(loglevel::info) << "Successfully reset FPGA device: " << deviceId << std::endl; - } - else { + } else { std::cerr << "Failed to reset FPGA device: " << deviceId << std::endl; } return success; @@ -118,7 +130,7 @@ namespace Finn { */ std::vector getDevices() { std::vector devices; - auto [success, output] = executeCommand({ "xbutil", "examine" }, true); + auto [success, output] = executeCommand({"xbutil", "examine"}, true); if (!success) { std::cerr << "Failed to execute 'xbutil examine'" << std::endl; @@ -146,31 +158,49 @@ namespace Finn { * Reset all available FPGA devices * Throws runtime_error if no devices are found or if any reset fails */ - void resetFPGAS() { + void resetFPGAS(const int index = -1) { +#ifdef UNITTEST + // In unit tests, we might want to mock this function or skip it + FINN_LOG(loglevel::info) << 
"Skipping FPGA reset in unit tests." << std::endl; + return; +#endif + std::vector devices = getDevices(); if (devices.empty()) { logAndError("No FPGA devices found. Cannot reset."); } + if (index != -1) { + if (index >= devices.size()) { + logAndError("Invalid device index: " + std::to_string(index) + ". Available devices: " + std::to_string(devices.size())); + } + if (!resetFPGA(devices[index])) { + logAndError("Failed to reset FPGA device at index: " + std::to_string(index)); + } + std::this_thread::sleep_for(1000ms); // Wait for the reset to complete + return; + } + bool allSuccessful = true; std::string failedDevices; for (const auto& device : devices) { if (!resetFPGA(device)) { allSuccessful = false; - if (!failedDevices.empty()) failedDevices += ", "; + if (!failedDevices.empty()) + failedDevices += ", "; failedDevices += device; } } + std::this_thread::sleep_for(1000ms); // Wait for the reset to complete if (!allSuccessful) { logAndError("Failed to reset FPGA device(s): " + failedDevices); - } - else { + } else { FINN_LOG(loglevel::info) << "Successfully reset all " << devices.size() << " FPGA device(s)" << std::endl; } } -} // namespace Finn +} // namespace Finn -#endif // !FPGARESET_HPP \ No newline at end of file +#endif // !FPGARESET_HPP \ No newline at end of file diff --git a/src/FINNCppDriver/utils/FinnUtils.h b/src/FINNCppDriver/utils/FinnUtils.h index 5df9ee3..804a008 100644 --- a/src/FINNCppDriver/utils/FinnUtils.h +++ b/src/FINNCppDriver/utils/FinnUtils.h @@ -48,20 +48,20 @@ namespace FinnUtils { BufferFiller(uint8_t min, uint8_t max) : sampler(std::uniform_int_distribution(min, max)) {} /** - * @brief + * @brief Create a BufferFiller with specified range * - * @param min - * @param max - * @return BufferFiller + * @param min Minimum value for random generation + * @param max Maximum value for random generation + * @return BufferFiller New BufferFiller instance */ static BufferFiller create(uint8_t min, uint8_t max) { return {min, max}; } /** - * @brief + * @brief Fill a range with random values * - * @tparam IteratorType - * @param first - * @param last + * @tparam IteratorType Type of iterator + * @param first Iterator to first element + * @param last Iterator to end of range */ template void fillRandom(IteratorType first, IteratorType last) { @@ -69,9 +69,9 @@ namespace FinnUtils { } /** - * @brief + * @brief Fill a vector with random values * - * @param vec + * @param vec Vector to fill with random values */ void fillRandom(std::vector& vec) { fillRandom(vec.begin(), vec.end()); } }; @@ -161,16 +161,38 @@ namespace FinnUtils { // NOLINTEND } + /** + * @brief Fast implementation of log2 using bit operations + * + * @tparam T Integral type + * @param value Value to compute log2 of + * @return constexpr T Floor of log2(value) + */ template inline constexpr T fastLog2(T value) { return (value == 0) ? 0 : std::bit_width(value) - 1; } + /** + * @brief Fast implementation of ceiling of log2 + * + * @tparam T Integral type + * @param value Value to compute ceil(log2) of + * @return constexpr T Ceiling of log2(value) + */ template inline constexpr T fastLog2Ceil(T value) { return (value == 0) ? 0 : fastLog2(value - 1) + 1; } + /** + * @brief Fast implementation of ceiling division + * + * @tparam T Arithmetic type + * @param value Dividend + * @param value2 Divisor + * @return constexpr T Ceiling of value/value2 + */ template inline constexpr T fastDivCeil(T value, T value2) { return value == 0 ? 
0 : 1 + ((value - 1) / value2); @@ -252,7 +274,6 @@ namespace FinnUtils { } - } // namespace FinnUtils #endif diff --git a/src/FINNCppDriver/utils/Logger.hpp b/src/FINNCppDriver/utils/Logger.hpp index f1534fd..e4c7437 100644 --- a/src/FINNCppDriver/utils/Logger.hpp +++ b/src/FINNCppDriver/utils/Logger.hpp @@ -58,6 +58,11 @@ DevNull& operator<<(DevNull& dest, [[maybe_unused]] T) { */ class Logger { public: + /** + * @brief Initialize the logger with optional console output + * + * @param console Enable console output in addition to file logging + */ void static initLogger(bool console = false) { static Logger log(console); } /** @@ -102,18 +107,18 @@ class Logger { }; namespace Finn { -/** - * @brief First log the message as an error into the logger, then throw the passed error! - * - * @tparam E - * @param msg - */ -template -[[noreturn]] void logAndError(const std::string& msg) { - FINN_LOG(loglevel::error) << msg; - throw E(msg); -} -} // namespace Finn + /** + * @brief First log the message as an error into the logger, then throw the passed error! + * + * @tparam E + * @param msg + */ + template + [[noreturn]] void logAndError(const std::string& msg) { + FINN_LOG(loglevel::error) << msg; + throw E(msg); + } +} // namespace Finn #endif // !LOGGING_H \ No newline at end of file diff --git a/src/FINNCppDriver/utils/SPSCQueue.hpp b/src/FINNCppDriver/utils/SPSCQueue.hpp index ee5750f..4b43779 100644 --- a/src/FINNCppDriver/utils/SPSCQueue.hpp +++ b/src/FINNCppDriver/utils/SPSCQueue.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,9 @@ using namespace std::literals::chrono_literals; * functions that support the main SPSCQueue implementation. */ namespace detail { + // Make CACHE_LINE_SIZE accessible throughout the namespace + static constexpr size_t CACHE_LINE_SIZE = 64; + /** * @brief Enumeration of supported SIMD instruction sets * @@ -428,25 +432,23 @@ namespace detail { }; /** - * @brief Base class for SPSCQueue implementations + * @brief Common base template for SPSC queue implementations * - * Contains common implementation details shared by all specializations - * of the SPSCQueue. + * Provides the core functionality for both static and dynamic queue variants. 
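
The CACHE_LINE_SIZE constant hoisted into the detail namespace above drives the padding of the queue's head and tail indices. A self-contained illustration of the same padding idea (64 bytes is a common cache-line size, assumed rather than queried at runtime):

#include <atomic>
#include <cstddef>

constexpr std::size_t kCacheLineSize = 64;

// Each counter occupies a full cache line, so a producer updating one and a
// consumer updating the other never invalidate each other's line.
struct alignas(kCacheLineSize) PaddedAtomic {
    std::atomic<std::size_t> value{0};
    char padding[kCacheLineSize - sizeof(std::atomic<std::size_t>)];
};

static_assert(sizeof(PaddedAtomic) == kCacheLineSize);

int main() {
    PaddedAtomic head, tail;  // never share a cache line
    head.value.store(1);
    tail.value.store(2);
    return (head.value.load() + tail.value.load() == 3) ? 0 : 1;
}
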
* * @tparam T Element type - * @tparam ActualCapacity Actual capacity of the queue (power of 2) + * @tparam IndexMask Type of mask for index wrapping (size_t for dynamic, integral constant for static) + * @tparam BufferAccessor Type that provides access to the underlying buffer * @tparam IsTrivial Whether T is a trivially copyable type */ - template + template class SPSCQueueBase { protected: - static constexpr size_t CACHE_LINE_SIZE = 64; ///< Size of a cache line in bytes - static constexpr size_t INDEX_MASK = ActualCapacity - 1; ///< Mask for index wrapping + // Buffer access through composition + BufferAccessor buffer_; - /** - * @brief Cache-aligned atomic size_t with padding to prevent false sharing - */ - struct alignas(CACHE_LINE_SIZE) AlignedAtomicSize { + // Cache-aligned elements to prevent false sharing + struct alignas(CACHE_LINE_SIZE) AlignedAtomicSize { std::atomic<size_t> value{0}; ///< The atomic value /// Padding to fill a complete cache line char padding[CACHE_LINE_SIZE - sizeof(std::atomic<size_t>)]; @@ -468,14 +470,11 @@ namespace detail { void store(size_t desired, std::memory_order order = std::memory_order_seq_cst) noexcept { value.store(desired, order); } }; - // Cache-aligned elements to prevent false sharing - alignas(CACHE_LINE_SIZE) std::array buffer_; ///< Element storage buffer - - alignas(CACHE_LINE_SIZE) AlignedAtomicSize head_; ///< Consumer position - char head_padding_[CACHE_LINE_SIZE]; ///< Extra padding between head and tail + AlignedAtomicSize head_; ///< Consumer position + char head_padding_[CACHE_LINE_SIZE]; ///< Extra padding between head and tail - alignas(CACHE_LINE_SIZE) AlignedAtomicSize tail_; ///< Producer position - char tail_padding_[CACHE_LINE_SIZE]; ///< Extra padding after tail + AlignedAtomicSize tail_; ///< Producer position + char tail_padding_[CACHE_LINE_SIZE]; ///< Extra padding after tail /** * @brief State for blocking operations @@ -491,6 +490,9 @@ static constexpr int SPIN_ATTEMPTS = 1000; ///< Number of spin attempts before blocking static constexpr int YIELD_ATTEMPTS = 50; ///< Number of yield attempts during spinning + // Provide access to the index mask - store by value, not by reference + IndexMask index_mask_; /** * @brief Calculates the number of items available for consumption * @@ -499,7 +501,7 @@ size_t available_items() const noexcept { const size_t head = head_.load(std::memory_order_relaxed); const size_t tail = tail_.load(std::memory_order_acquire); - return (tail - head) & INDEX_MASK; + return (tail - head) & static_cast<size_t>(index_mask_); } /** @@ -510,13 +512,79 @@ size_t available_space() const noexcept { const size_t head = head_.load(std::memory_order_acquire); const size_t tail = tail_.load(std::memory_order_relaxed); - return ((head - tail - 1) & INDEX_MASK); + return ((head - tail - 1) & static_cast<size_t>(index_mask_)); + } + + /** + * @brief Constructs the base queue + * + * @param index_mask Mask for index wrapping + * @param buffer Buffer accessor (using move semantics to handle non-copyable element types) + */ + SPSCQueueBase(IndexMask index_mask, BufferAccessor&& buffer) : buffer_(std::move(buffer)), index_mask_(index_mask) {} + }; + + /** + * @brief Static buffer accessor for fixed-size queues + * + * @tparam T Element type + * @tparam Capacity Capacity of the buffer + */ + template + class StaticBufferAccessor { + private: + alignas(CACHE_LINE_SIZE) std::array<T, Capacity> buffer_; + + public: + T& operator[](size_t index) noexcept { return buffer_[index]; } + + const T&
operator[](size_t index) const noexcept { return buffer_[index]; } + }; + + /** + * @brief Dynamic buffer accessor for runtime-sized queues + * + * @tparam T Element type + */ + template + class DynamicBufferAccessor { + private: + std::unique_ptr> buffer_; + + public: + explicit DynamicBufferAccessor(size_t capacity) { + // Calculate total size needed + size_t size_bytes = capacity * sizeof(T); + + // Round up to the next multiple of CACHE_LINE_SIZE + size_t aligned_size = (size_bytes + CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE - 1); + + void* raw_memory = std::aligned_alloc(CACHE_LINE_SIZE, aligned_size); + if (!raw_memory) + throw std::bad_alloc(); + + buffer_ = std::unique_ptr>(static_cast(raw_memory), [](T* ptr) { + // Cleanup will be handled by the queue class + std::free(ptr); + }); } + + T& operator[](size_t index) noexcept { return buffer_.get()[index]; } + + const T& operator[](size_t index) const noexcept { return buffer_.get()[index]; } + }; + + /** + * @brief Constant integral wrapper for static index masks + */ + template + struct StaticIndexMask { + constexpr operator size_t() const noexcept { return Value; } }; } // namespace detail /** - * @brief Single-Producer Single-Consumer lock-free queue + * @brief Single-Producer Single-Consumer lock-free queue with static storage * * A high-performance queue designed for the single-producer, * single-consumer scenario. Features include: @@ -532,21 +600,23 @@ namespace detail { */ template requires std::movable -class SPSCQueue : private detail::SPSCQueueBase> { +class SPSCQueue : private detail::SPSCQueueBase, detail::StaticBufferAccessor, std::is_trivially_copyable_v> { private: + using ActualCapacityValue = std::integral_constant; + static constexpr size_t ActualCapacity = ActualCapacityValue::value; + using IndexMask = detail::StaticIndexMask; + using BufferAccessor = detail::StaticBufferAccessor; + using Base = detail::SPSCQueueBase>; + // Import base members into this scope - using Base = detail::SPSCQueueBase>; using Base::blocking_; using Base::buffer_; using Base::head_; - using Base::INDEX_MASK; + using Base::index_mask_; using Base::SPIN_ATTEMPTS; using Base::tail_; using Base::YIELD_ATTEMPTS; - /// Actual capacity rounded up to the next power of 2 - static constexpr size_t ActualCapacity = std::bit_ceil(RequestedCapacity); - public: /** * @brief Constructs an empty queue @@ -555,7 +625,7 @@ class SPSCQueue : private detail::SPSCQueueBase= 2, "Queue capacity must be at least 2"); } + constexpr SPSCQueue() noexcept : Base(IndexMask{}, BufferAccessor{}) { static_assert(ActualCapacity >= 2, "Queue capacity must be at least 2"); } /** * @brief Destructor @@ -566,7 +636,7 @@ class SPSCQueue : private detail::SPSCQueueBase(index_mask_); } } } @@ -593,7 +663,7 @@ class SPSCQueue : private detail::SPSCQueueBase) { const size_t current_tail = tail_.load(std::memory_order_relaxed); - const size_t next_tail = (current_tail + 1) & INDEX_MASK; + const size_t next_tail = (current_tail + 1) & index_mask_; // Relaxed load followed by acquire if needed (optimization) if (next_tail == head_.load(std::memory_order_relaxed)) { @@ -632,7 +702,7 @@ class SPSCQueue : private detail::SPSCQueueBase) { const size_t current_tail = tail_.load(std::memory_order_relaxed); - const size_t next_tail = (current_tail + 1) & INDEX_MASK; + const size_t next_tail = (current_tail + 1) & index_mask_; // Optimization: Relaxed load first, then acquire if needed if (next_tail == head_.load(std::memory_order_relaxed)) { @@ -761,7 +831,7 @@ class SPSCQueue : private 
detail::SPSCQueueBase ActualCapacity / 4) { @@ -1515,7 +1585,7 @@ class SPSCQueue : private detail::SPSCQueueBase bool try_emplace(Args&&... args) noexcept(std::is_nothrow_constructible_v) { const size_t current_tail = tail_.load(std::memory_order_relaxed); - const size_t next_tail = (current_tail + 1) & INDEX_MASK; + const size_t next_tail = (current_tail + 1) & index_mask_; // Optimization: Relaxed load first, then acquire if needed if (next_tail == head_.load(std::memory_order_relaxed)) { @@ -1598,7 +1668,7 @@ class SPSCQueue : private detail::SPSCQueueBase + requires std::movable +class DynamicSPSCQueue : private detail::SPSCQueueBase, std::is_trivially_copyable_v> { + private: + using Base = detail::SPSCQueueBase, std::is_trivially_copyable_v>; + + // Import base members into this scope + using Base::blocking_; + using Base::buffer_; + using Base::head_; + using Base::index_mask_; + using Base::SPIN_ATTEMPTS; + using Base::tail_; + using Base::YIELD_ATTEMPTS; + + size_t buffer_size_; // Actual size of the buffer (power of 2) + size_t requested_capacity_; // Original requested capacity + + public: + /** + * @brief Constructs an empty queue with dynamic size + * + * @param requested_capacity Desired minimum capacity (will be rounded up to next power of 2) + */ + explicit DynamicSPSCQueue(size_t requested_capacity) + : Base( + // Round up to the next power of 2 and create mask + std::bit_ceil(requested_capacity) - 1, + // Create buffer with the calculated size + detail::DynamicBufferAccessor(std::bit_ceil(requested_capacity))), + buffer_size_(std::bit_ceil(requested_capacity)), + requested_capacity_(requested_capacity) { + // Ensure minimum size + if (buffer_size_ < 2) { + throw std::invalid_argument("Queue capacity must be at least 2"); + } + } + + /** + * @brief Destructor + * + * Wakes up any waiting threads and properly destroys + * any remaining elements in the queue. + */ + ~DynamicSPSCQueue() { + // Wake up any waiting threads + blocking_.is_active_.store(false, std::memory_order_release); + blocking_.not_empty_.notify_all(); + blocking_.not_full_.notify_all(); + + // Clean up any remaining elements if not trivially destructible + if constexpr (!std::is_trivially_destructible_v) { + size_t head = head_.load(std::memory_order_relaxed); + size_t tail = tail_.load(std::memory_order_relaxed); + + while (head != tail) { + buffer_[head].~T(); + head = (head + 1) & index_mask_; + } + } + } + + // Prevent copying and moving + DynamicSPSCQueue(const DynamicSPSCQueue&) = delete; + DynamicSPSCQueue& operator=(const DynamicSPSCQueue&) = delete; + DynamicSPSCQueue(DynamicSPSCQueue&&) = delete; + DynamicSPSCQueue& operator=(DynamicSPSCQueue&&) = delete; + + /** + * @brief Attempts to enqueue an element (copy version) + * + * Non-blocking operation that attempts to add an item to the queue. 
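
The DynamicSPSCQueue constructor above derives its index mask as bit_ceil(requested) - 1, so every enqueue and dequeue wraps indices with a single AND instead of a modulo. The arithmetic in isolation:

#include <bit>
#include <cstddef>
#include <cstdio>

int main() {
    const std::size_t requested = 300;
    const std::size_t capacity = std::bit_ceil(requested);  // 512
    const std::size_t mask = capacity - 1;                  // 511
    for (std::size_t i = 510; i < 515; ++i) {
        std::printf("index %zu wraps to %zu\n", i, i & mask);
    }
    return 0;
}
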
+ * + * @param item Element to enqueue + * @return true if successful, false if the queue was full + */ + bool try_enqueue(const T& item) noexcept(std::is_nothrow_copy_constructible_v) { + const size_t current_tail = tail_.load(std::memory_order_relaxed); + const size_t next_tail = (current_tail + 1) & index_mask_; + + // Relaxed load followed by acquire if needed (optimization) + if (next_tail == head_.load(std::memory_order_relaxed)) { + if (next_tail == head_.load(std::memory_order_acquire)) + return false; + } + + // Prefetch with locality hint for next operation + detail::prefetch_write(&buffer_[current_tail], 3); + + // For trivially copyable small types, direct assignment is faster than placement new + if constexpr (std::is_trivially_copyable_v && sizeof(T) <= 16) { + buffer_[current_tail] = item; + } else { + new (&buffer_[current_tail]) T(item); + } + + // Release memory ordering ensures visibility to consumer + tail_.store(next_tail, std::memory_order_release); + + // Only notify if queue was empty (reduces contention) + if (current_tail == head_.load(std::memory_order_relaxed)) + blocking_.not_empty_.notify_one(); + + return true; + } + + /** + * @brief Attempts to enqueue an element (move version) + * + * Non-blocking operation that attempts to add an item to the queue + * using move semantics for better performance. + * + * @param item Element to enqueue + * @return true if successful, false if the queue was full + */ + bool try_enqueue(T&& item) noexcept(std::is_nothrow_move_constructible_v) { + const size_t current_tail = tail_.load(std::memory_order_relaxed); + const size_t next_tail = (current_tail + 1) & index_mask_; + + // Optimization: Relaxed load first, then acquire if needed + if (next_tail == head_.load(std::memory_order_relaxed)) { + // Double-check with acquire semantics + if (next_tail == head_.load(std::memory_order_acquire)) + return false; // Queue is full + } + + // Optimization: Prefetch for write to reduce cache misses + detail::prefetch_write(&buffer_[current_tail]); + + new (&buffer_[current_tail]) T(std::move(item)); + tail_.store(next_tail, std::memory_order_release); + + // Notify consumer if queue was empty + if (current_tail == head_.load(std::memory_order_relaxed)) + + blocking_.not_empty_.notify_one(); + + return true; + } + + /** + * @brief Enqueues an element, blocking if necessary (copy version) + * + * Blocks the calling thread until space is available in the queue. + * + * @param item Element to enqueue + */ + void enqueue(const T& item) { + // Try fast path first + if (try_enqueue(item)) + return; + + // Slow path with blocking + std::unique_lock lock(blocking_.mutex_); + blocking_.not_full_.wait(lock, [this, &item] { return try_enqueue(item) || !blocking_.is_active_.load(std::memory_order_acquire); }); + } + + /** + * @brief Enqueues an element, blocking if necessary (move version) + * + * Blocks the calling thread until space is available in the queue. + * Uses move semantics for better performance. + * + * @param item Element to enqueue + */ + void enqueue(T&& item) { + // Try fast path first + if (try_enqueue(std::move(item))) + return; + + // Slow path with blocking + std::unique_lock lock(blocking_.mutex_); + blocking_.not_full_.wait(lock, [this, &item] { return try_enqueue(std::move(item)) || !blocking_.is_active_.load(std::memory_order_acquire); }); + } + + /** + * @brief Enqueues an element with cancellation support (copy version) + * + * Blocks until space is available or the operation is cancelled. 
+ * + * @tparam StopToken Type meeting the StopToken concept + * @param item Element to enqueue + * @param stop_token Token that can be used to cancel the operation + * @return true if the element was enqueued, false if cancelled + */ + template + bool enqueue(const T& item, StopToken&& stop_token) { + // Try fast path first + if (try_enqueue(item)) + return true; + + // Slow path with blocking and cancellation support + std::unique_lock lock(blocking_.mutex_); + + // Wait until space available, queue inactive, or stop requested + std::condition_variable_any{}.wait(lock, stop_token, [this, &item] { return try_enqueue(item) || !blocking_.is_active_.load(std::memory_order_acquire); }); + + // Check if enqueue succeeded or stopped + return !stop_token.stop_requested() && blocking_.is_active_.load(std::memory_order_acquire); + } + + /** + * @brief Enqueues an element with cancellation support (move version) + * + * Blocks until space is available or the operation is cancelled. + * Uses move semantics for better performance. + * + * @tparam StopToken Type meeting the StopToken concept + * @param item Element to enqueue + * @param stop_token Token that can be used to cancel the operation + * @return true if the element was enqueued, false if cancelled + */ + template + bool enqueue(T&& item, StopToken&& stop_token) { + // Try fast path first + if (try_enqueue(std::move(item))) + return true; + + // Slow path with blocking and cancellation support + std::unique_lock lock(blocking_.mutex_); + blocking_.not_full_.wait(lock, [this, &item] { return try_enqueue(std::move(item)) || !blocking_.is_active_.load(std::memory_order_acquire); }); + + // Check if enqueue succeeded or stopped + return !stop_token.stop_requested() && blocking_.is_active_.load(std::memory_order_acquire); + } + + /** + * @brief Attempts to enqueue multiple elements in a single operation + * + * Non-blocking operation that attempts to add multiple items to the queue. 
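
The cancellable enqueue above relies on the C++20 overload of condition_variable_any::wait that takes a stop_token. A self-contained demonstration of that mechanism: the wait ends as soon as a stop is requested, with no separate notify call.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

int main() {
    std::mutex m;
    std::condition_variable_any cv;
    std::jthread waiter([&](std::stop_token st) {
        std::unique_lock lock(m);
        // Predicate is never true, so only the stop request can end the wait;
        // wait() returns the predicate's final value (false here).
        const bool satisfied = cv.wait(lock, st, [] { return false; });
        std::printf("wait ended, predicate satisfied: %s\n", satisfied ? "yes" : "no");
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    waiter.request_stop();  // wakes the waiter through its stop callback
    return 0;
}
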
+ * + * @tparam InputIt Iterator type pointing to elements + * @param first Iterator to the first element to enqueue + * @param count Number of elements to enqueue + * @return Number of elements successfully enqueued + */ + template + size_t try_enqueue_bulk(InputIt first, size_t count) noexcept { + if (count == 0) + return 0; + + // Fast path with relaxed ordering first + const size_t current_tail = tail_.load(std::memory_order_relaxed); + + // Calculate available space (optimized) + const size_t head = head_.load(std::memory_order_acquire); + const size_t capacity = buffer_size_; // Use buffer_size_ instead of ActualCapacity + const size_t available_space = (head + capacity - current_tail - 1) & index_mask_; + + if (available_space == 0) + return 0; + + // Calculate actual amount to copy + const size_t to_copy = std::min(available_space, count); + const bool was_empty = (current_tail == head); + + // Optimize based on whether the enqueue wraps around the buffer + const size_t first_chunk = std::min(to_copy, capacity - current_tail); + const size_t second_chunk = to_copy - first_chunk; + + // Use the fastest copy method based on type + if constexpr (std::is_trivially_copyable_v) { + if constexpr (std::is_pointer_v && std::is_same_v, T>) { + // Pointer to same type - use SIMD-optimized memory transfer + // Use SIMD for first chunk + detail::simd_memcpy(&buffer_[current_tail], first, first_chunk); + + // Handle wrap-around if needed with SIMD + if (second_chunk > 0) { + detail::simd_memcpy(&buffer_[0], first + first_chunk, second_chunk); + } + } else { + // Process first chunk + auto it = first; + for (size_t i = 0; i < first_chunk; i++) { + buffer_[current_tail + i] = *it++; + } + + // Process second chunk if needed + for (size_t i = 0; i < second_chunk; i++) { + buffer_[i] = *it++; + } + } + } else { + // Non-trivially copyable type - use placement new with iterator + auto it = first; + + // Process first chunk + for (size_t i = 0; i < first_chunk; i++) { + new (&buffer_[current_tail + i]) T(*it++); + } + + // Process second chunk if needed + for (size_t i = 0; i < second_chunk; i++) { + new (&buffer_[i]) T(*it++); + } + } + + // Update tail position with a single atomic operation + tail_.store((current_tail + to_copy) & index_mask_, std::memory_order_release); + + // Only notify if queue was empty before + if (was_empty) { + blocking_.not_empty_.notify_one(); + } + + return to_copy; + } + + /** + * @brief Enqueues multiple elements, blocking if necessary + * + * Blocks until all elements are enqueued or the queue is shut down. 
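
try_enqueue_bulk above splits any write that crosses the end of the ring into a tail chunk and a wrapped chunk. The same two-chunk copy on a plain vector, with hand-picked values:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
    const std::size_t capacity = 8, tail = 6, count = 5;
    std::vector<int> ring(capacity, 0);
    const std::vector<int> src{1, 2, 3, 4, 5};
    const std::size_t firstChunk = std::min(count, capacity - tail);  // 2 elements fit before the end
    const std::size_t secondChunk = count - firstChunk;               // 3 wrap to the front
    std::copy_n(src.begin(), firstChunk, ring.begin() + static_cast<std::ptrdiff_t>(tail));
    std::copy_n(src.begin() + static_cast<std::ptrdiff_t>(firstChunk), secondChunk, ring.begin());
    for (const int v : ring) std::printf("%d ", v);  // prints: 3 4 5 0 0 0 1 2
    return 0;
}
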
+ * + * @tparam InputIt Iterator type pointing to elements + * @param first Iterator to the first element to enqueue + * @param count Number of elements to enqueue + * @return Number of elements successfully enqueued + */ + template + size_t enqueue_bulk(InputIt first, size_t count) { + if (count == 0) + return 0; + + // Try non-blocking fast path first + size_t items_enqueued = try_enqueue_bulk(first, count); + if (items_enqueued == count) { + return items_enqueued; + } + + // Advance iterator by items already enqueued + std::advance(first, items_enqueued); + size_t remaining = count - items_enqueued; + + // Exponential backoff spinning before falling back to mutex + detail::exponential_backoff backoff; + for (int i = 0; i < SPIN_ATTEMPTS; i++) { // Try spinning a few times first + size_t batch_enqueued = try_enqueue_bulk(first, remaining); + if (batch_enqueued > 0) { + std::advance(first, batch_enqueued); + items_enqueued += batch_enqueued; + remaining -= batch_enqueued; + + if (items_enqueued == count) { + return items_enqueued; + } + } + backoff(); + } + + // Fall back to mutex-based waiting + std::unique_lock lock(blocking_.mutex_); + + while (items_enqueued < count && blocking_.is_active_.load(std::memory_order_acquire)) { + // Wait until space is available + blocking_.not_full_.wait(lock, [this] { return !is_full() || !blocking_.is_active_.load(std::memory_order_acquire); }); + + if (!blocking_.is_active_.load(std::memory_order_acquire)) { + break; // Queue was shut down + } + + // Critical section - minimize time with lock held + lock.unlock(); + + // Try to enqueue multiple items in one go + size_t batch_enqueued = try_enqueue_bulk(first, remaining); + + lock.lock(); + + if (batch_enqueued > 0) { + std::advance(first, batch_enqueued); + items_enqueued += batch_enqueued; + remaining -= batch_enqueued; + + if (items_enqueued == count) { + break; + } + } + } + + return items_enqueued; + } + + /** + * @brief Enqueues multiple elements with a timeout + * + * Attempts to enqueue elements until the specified timeout expires. 
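
enqueue_bulk above spins with detail::exponential_backoff before falling back to the mutex. A simplified stand-in for that helper (the real one also issues CPU pause hints; the doubling and cap values here are illustrative):

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <thread>

struct BackoffSketch {
    std::chrono::microseconds delay{1};
    void operator()() {
        // Each failed attempt roughly doubles the sleep, up to a cap.
        std::this_thread::sleep_for(delay);
        delay = std::min(delay * 2, std::chrono::microseconds{128});
    }
    void reset() { delay = std::chrono::microseconds{1}; }  // called on progress
};

int main() {
    BackoffSketch backoff;
    for (int attempt = 0; attempt < 5; ++attempt) {
        std::printf("attempt %d, next delay %lldus\n", attempt, static_cast<long long>(backoff.delay.count()));
        backoff();
    }
    return 0;
}
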
+ * + * @tparam InputIt Iterator type pointing to elements + * @tparam Rep Duration representation type + * @tparam Period Duration period type + * @param first Iterator to the first element to enqueue + * @param count Number of elements to enqueue + * @param timeout Maximum time to wait + * @return Number of elements successfully enqueued + */ + template + size_t enqueue_bulk_for(InputIt first, size_t count, const std::chrono::duration& timeout) { + if (count == 0) + return 0; + + // Track start time for timeout + auto start_time = std::chrono::steady_clock::now(); + auto end_time = start_time + timeout; + + // Try non-blocking fast path first + size_t items_enqueued = try_enqueue_bulk(first, count); + if (items_enqueued == count) { + return items_enqueued; + } + + // Advance iterator by items already enqueued + std::advance(first, items_enqueued); + size_t remaining = count - items_enqueued; + + // Adaptive spinning phase - use up to 20% of timeout for spinning + // Fix: convert both durations to microseconds for comparison + auto timeout_us = std::chrono::duration_cast(timeout); + auto spin_time = std::min(timeout_us / 5, std::chrono::microseconds(200)); + auto spin_end_time = start_time + spin_time; + + // Spin with exponential backoff + detail::exponential_backoff backoff; + while (items_enqueued < count && std::chrono::steady_clock::now() < spin_end_time) { + size_t batch_enqueued = try_enqueue_bulk(first, remaining); + if (batch_enqueued > 0) { + std::advance(first, batch_enqueued); + items_enqueued += batch_enqueued; + remaining -= batch_enqueued; + + if (items_enqueued == count) { + return items_enqueued; + } + + // Reset backoff on progress + backoff.reset(); + } + backoff(); + } + + // Check if timeout expired during spinning + if (std::chrono::steady_clock::now() >= end_time) { + return items_enqueued; + } + + // Fall back to condition variable waiting + std::unique_lock lock(blocking_.mutex_); + + do { + // Wait until space is available or timeout + if (!blocking_.not_full_.wait_until(lock, end_time, [this] { return !is_full() || !blocking_.is_active_.load(std::memory_order_acquire); })) { + break; // Timeout occurred + } + + if (!blocking_.is_active_.load(std::memory_order_acquire)) { + break; // Queue was shut down + } + + // Release lock during actual enqueue operation + lock.unlock(); + size_t batch_enqueued = try_enqueue_bulk(first, remaining); + lock.lock(); + + if (batch_enqueued > 0) { + std::advance(first, batch_enqueued); + items_enqueued += batch_enqueued; + remaining -= batch_enqueued; + + if (items_enqueued == count) { + break; // All items enqueued + } + } + + } while (items_enqueued < count && std::chrono::steady_clock::now() < end_time && blocking_.is_active_.load(std::memory_order_acquire)); + + return items_enqueued; + } + + + //////////DEQUEUE OPERATIONS////////// + + /** + * @brief Attempts to dequeue an element + * + * Non-blocking operation that attempts to remove an item from the queue. 
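
The timed bulk enqueue above spends at most 20% of its budget (capped at 200us) spinning before it falls back to a condition-variable wait; casting to microseconds keeps the min() comparison between like duration types. The budget split in isolation:

#include <algorithm>
#include <chrono>
#include <cstdio>

int main() {
    using namespace std::chrono;
    const auto timeout = milliseconds(5);
    const auto timeout_us = duration_cast<microseconds>(timeout);
    const auto spin_time = std::min(timeout_us / 5, microseconds(200));
    std::printf("spin for %lldus of a %lldus budget\n",
                static_cast<long long>(spin_time.count()),
                static_cast<long long>(timeout_us.count()));
    return 0;
}
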
+     *
+     * @param item Reference to store the dequeued element
+     * @return true if successful, false if the queue was empty
+     */
+    bool try_dequeue(T& item) noexcept(std::is_nothrow_move_assignable_v<T>) {
+        const size_t current_head = head_.load(std::memory_order_relaxed);
+
+        // Early relaxed check before acquiring
+        if (current_head == tail_.load(std::memory_order_relaxed)) {
+            if (current_head == tail_.load(std::memory_order_acquire))
+                return false;
+        }
+
+        // Prefetch the next items in the queue for better throughput
+        const size_t next_head = (current_head + 1) & index_mask_;
+        if (next_head != tail_.load(std::memory_order_relaxed)) {
+            detail::prefetch_read(&buffer_[next_head], 3);
+
+            const size_t next_next_head = (next_head + 1) & index_mask_;
+            if (next_next_head != tail_.load(std::memory_order_relaxed)) {
+                detail::prefetch_read(&buffer_[next_next_head], 2); // Lower locality hint
+            }
+        }
+
+        // Move the item out, with an optimization for small trivial types
+        if constexpr (std::is_trivially_copyable_v<T> && sizeof(T) <= 16) {
+            item = buffer_[current_head];
+        } else {
+            item = std::move(buffer_[current_head]);
+
+            // Only call the destructor if the type still needs it after a move
+            if constexpr (!detail::unsafe_to_destroy_after_move_v<T>) {
+                buffer_[current_head].~T();
+            }
+        }
+
+        // Release memory ordering ensures visibility to the producer
+        head_.store(next_head, std::memory_order_release);
+
+        // Selective notification strategy: only wake producers once the queue has drained substantially
+        const size_t used_capacity = ((tail_.load(std::memory_order_relaxed) - next_head) & index_mask_);
+        if (used_capacity < buffer_size_ / 4) {
+            blocking_.not_full_.notify_one();
+        }
+
+        return true;
+    }
+
+    /**
+     * @brief Dequeues an element, blocking if necessary
+     *
+     * Blocks the calling thread until an item is available in the queue.
+     *
+     * @param item Reference to store the dequeued element
+     * @return true if an element was dequeued, false if the queue was shut down
+     */
+    bool dequeue(T& item) {
+        // Try the optimistic fast path first
+        if (try_dequeue(item))
+            return true;
+
+        // Use exponential backoff with CPU hints
+        detail::exponential_backoff backoff;
+        for (int i = 0; i < SPIN_ATTEMPTS; ++i) {
+            if (try_dequeue(item))
+                return true;
+            backoff();
+        }
+
+        // Fall back to a blocking wait; track whether the predicate actually
+        // dequeued an item so a concurrent shutdown cannot hide a success
+        std::unique_lock lock(blocking_.mutex_);
+        bool got = false;
+        blocking_.not_empty_.wait(lock, [this, &item, &got] { got = try_dequeue(item); return got || !blocking_.is_active_.load(std::memory_order_acquire); });
+        return got;
+    }
+
+    /**
+     * @brief Dequeues an element, waiting up to a timeout, with adaptive spinning
+     *
+     * @tparam Rep Duration representation type
+     * @tparam Period Duration period type
+     * @param item Reference to store the dequeued element
+     * @param timeout Maximum time to wait
+     * @return true if an element was dequeued, false if the timeout expired
+     */
+    template <typename Rep, typename Period>
+    bool dequeue_for(T& item, const std::chrono::duration<Rep, Period>& timeout) {
+        // Try the fast path first
+        if (try_dequeue(item))
+            return true;
+
+        // Split the budget between spinning and blocking; convert to
+        // microseconds so std::min compares like duration types
+        auto start_time = std::chrono::steady_clock::now();
+        auto timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
+        auto spin_duration = std::min(timeout_us / 2, std::chrono::microseconds(1000));
+        auto spin_end_time = start_time + spin_duration;
+
+        // Spin with increasing backoff until the spin time has elapsed
+        detail::exponential_backoff backoff;
+        while (std::chrono::steady_clock::now() < spin_end_time) {
+            if (try_dequeue(item))
+                return true;
+
+            backoff();
+        }
+
+        // Calculate the remaining time for the blocking wait
+        auto current_time = std::chrono::steady_clock::now();
+        auto remaining = timeout_us - std::chrono::duration_cast<std::chrono::microseconds>(current_time - start_time);
+        if (remaining <= std::chrono::microseconds::zero())
+            return false; // Timeout already expired during spinning
+
+        // Slow path with timeout
+        std::unique_lock lock(blocking_.mutex_);
+        bool got = false;
+        blocking_.not_empty_.wait_for(lock, remaining, [this, &item, &got] { got = try_dequeue(item); return got || !blocking_.is_active_.load(std::memory_order_acquire); });
+        return got;
+    }
+
+    /**
+     * @brief Dequeues an element with cancellation support
+     *
+     * Blocks until an item is available or the operation is cancelled.
+     *
+     * @tparam StopToken Type meeting the StopToken concept
+     * @param item Reference to store the dequeued element
+     * @param stop_token Token that can be used to cancel the operation
+     * @return true if an element was dequeued, false if cancelled or the queue was shut down
+     */
+    template <typename StopToken>
+    bool dequeue(T& item, StopToken&& stop_token) {
+        // Try the fast path first
+        if (try_dequeue(item))
+            return true;
+
+        // Slow path with blocking and cancellation support. A temporary
+        // std::condition_variable_any is never notified by the producer, so
+        // poll with short timed waits on the queue's own condition variable
+        // and let the stop token break the loop.
+        std::unique_lock lock(blocking_.mutex_);
+        while (!stop_token.stop_requested() && blocking_.is_active_.load(std::memory_order_acquire)) {
+            if (try_dequeue(item))
+                return true;
+            blocking_.not_empty_.wait_for(lock, std::chrono::milliseconds(1));
+        }
+        return false;
+    }
+
+    /**
+     * @brief Dequeues an element with timeout and cancellation support
+     *
+     * Attempts to dequeue an element, waiting up to the specified timeout
+     * or until the operation is cancelled.
+     *
+     * @tparam Rep Duration representation type
+     * @tparam Period Duration period type
+     * @tparam StopToken Type meeting the StopToken concept
+     * @param item Reference to store the dequeued element
+     * @param timeout Maximum time to wait
+     * @param stop_token Token that can be used to cancel the operation
+     * @return true if an element was dequeued, false otherwise
+     */
+    template <typename Rep, typename Period, typename StopToken>
+    bool dequeue_for(T& item, const std::chrono::duration<Rep, Period>& timeout, StopToken&& stop_token) {
+        // Try the fast path first
+        if (try_dequeue(item))
+            return true;
+
+        // Slow path with timeout and cancellation support, using the same
+        // short timed-wait polling as the cancellable dequeue() above
+        auto end_time = std::chrono::steady_clock::now() + timeout;
+        std::unique_lock lock(blocking_.mutex_);
+        while (!stop_token.stop_requested() && blocking_.is_active_.load(std::memory_order_acquire) && std::chrono::steady_clock::now() < end_time) {
+            if (try_dequeue(item))
+                return true;
+            blocking_.not_empty_.wait_for(lock, std::chrono::milliseconds(1));
+        }
+        return false;
+    }
+
+    /**
+     * @brief Attempts to dequeue multiple elements in a single operation
+     *
+     * Non-blocking operation that attempts to remove multiple items from the queue.
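+     * A minimal usage sketch (illustrative; `q` is assumed to be an instance
+     * of this queue specialized for `int`):
+     * @code
+     * std::array<int, 32> out{};
+     * size_t n = q.try_dequeue_bulk(out.data(), out.size()); // never blocks; n may be 0
+     * @endcode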
+     *
+     * @tparam OutputIt Iterator type for the destination
+     * @param dest Iterator to the destination to store dequeued elements
+     * @param max_items Maximum number of elements to dequeue
+     * @return Number of elements successfully dequeued
+     */
+    template <typename OutputIt>
+    size_t try_dequeue_bulk(OutputIt dest, size_t max_items) noexcept {
+        // Quick empty check with relaxed ordering (fastest path)
+        const size_t current_head = head_.load(std::memory_order_relaxed);
+
+        // Use a relaxed load first, then acquire only if needed
+        size_t tail = tail_.load(std::memory_order_relaxed);
+        if (current_head == tail) {
+            tail = tail_.load(std::memory_order_acquire);
+            if (current_head == tail) {
+                return 0;
+            }
+        }
+
+        // Calculate the number of items to dequeue
+        const size_t available = (tail - current_head) & index_mask_;
+        const size_t to_copy = std::min(available, max_items);
+
+        // Split the copy into two chunks if the range wraps around the ring buffer
+        const size_t first_chunk = std::min(to_copy, buffer_size_ - current_head);
+        const size_t second_chunk = to_copy - first_chunk;
+
+        // Prefetch the first few cache lines ahead of time to hide memory latency
+        if (first_chunk > 1) {
+            for (size_t i = 0; i < std::min(first_chunk, size_t(4)); i++) {
+                detail::prefetch_read(&buffer_[current_head + i], 3);
+            }
+        }
+
+        // Use the fastest copy method for the given type and iterator
+        if constexpr (std::is_trivially_copyable_v<T>) {
+            if constexpr (std::is_pointer_v<OutputIt> && std::is_same_v<std::remove_cv_t<std::remove_pointer_t<OutputIt>>, T>) {
+                // Raw pointer to the same type - use the SIMD-optimized memory transfer
+                detail::simd_memcpy(dest, &buffer_[current_head], first_chunk);
+
+                // Handle wrap-around, if needed, with SIMD as well
+                if (second_chunk > 0) {
+                    detail::simd_memcpy(dest + first_chunk, &buffer_[0], second_chunk);
+                }
+            } else {
+                // Other iterator types - use iterator operations
+                std::copy_n(&buffer_[current_head], first_chunk, dest);
+
+                if (second_chunk > 0) {
+                    auto advanced_dest = dest;
+                    std::advance(advanced_dest, first_chunk);
+                    std::copy_n(&buffer_[0], second_chunk, advanced_dest);
+                }
+            }
+        } else {
+            // Non-trivial type - use move semantics
+            for (size_t i = 0; i < first_chunk; i++) {
+                *dest = std::move(buffer_[current_head + i]);
+                ++dest;
+
+                if constexpr (!detail::unsafe_to_destroy_after_move_v<T>) {
+                    buffer_[current_head + i].~T();
+                }
+            }
+
+            for (size_t i = 0; i < second_chunk; i++) {
+                *dest = std::move(buffer_[i]);
+                ++dest;
+
+                if constexpr (!detail::unsafe_to_destroy_after_move_v<T>) {
+                    buffer_[i].~T();
+                }
+            }
+        }
+
+        // Update the head position with a single atomic operation
+        head_.store((current_head + to_copy) & index_mask_, std::memory_order_release);
+
+        // Only notify if we drained the queue or freed substantial space
+        if (available == to_copy || to_copy > buffer_size_ / 4) {
+            blocking_.not_full_.notify_one();
+        }
+
+        return to_copy;
+    }
+
+    /**
+     * @brief Dequeues multiple elements
+     *
+     * Attempts to dequeue multiple elements.
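+     * A minimal usage sketch (illustrative; `q` is assumed to be an instance of
+     * this queue specialized for `int`, drained from a std::jthread whose stop
+     * token cancels the wait):
+     * @code
+     * std::jthread consumer([&q](std::stop_token st) {
+     *     std::vector<int> out(64);
+     *     size_t n = q.dequeue_bulk(out.begin(), out.size(), st); // blocks until filled, stop, or shutdown
+     * });
+     * @endcode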
+     *
+     * @tparam OutputIt Iterator type for the destination
+     * @param dest Iterator to the destination to store dequeued elements
+     * @param max_items Maximum number of elements to dequeue
+     * @param stoken Stop token for cancellation
+     * @return Number of elements successfully dequeued
+     */
+    template <typename OutputIt>
+    size_t dequeue_bulk(OutputIt dest, size_t max_items, std::stop_token stoken = {}) {
+        if (max_items == 0)
+            return 0;
+
+        // Try the non-blocking fast path first
+        size_t items_dequeued = try_dequeue_bulk(dest, max_items);
+        if (items_dequeued == max_items) {
+            return items_dequeued;
+        }
+
+        // If we got some items but not all, advance the destination iterator
+        if (items_dequeued > 0) {
+            std::advance(dest, items_dequeued);
+            max_items -= items_dequeued;
+        }
+
+        // Spin with exponential backoff for a short time
+        auto start_time = std::chrono::steady_clock::now();
+        auto spin_time = std::chrono::microseconds(200);
+        auto spin_end_time = start_time + spin_time;
+
+        detail::exponential_backoff backoff;
+        while (max_items > 0 && std::chrono::steady_clock::now() < spin_end_time) {
+            size_t batch_dequeued = try_dequeue_bulk(dest, max_items);
+            if (batch_dequeued > 0) {
+                std::advance(dest, batch_dequeued);
+                items_dequeued += batch_dequeued;
+                max_items -= batch_dequeued;
+
+                if (max_items == 0) {
+                    return items_dequeued;
+                }
+            }
+            backoff();
+        }
+
+        if (stoken.stop_requested()) {
+            return items_dequeued;
+        }
+
+        // Fall back to condition-variable waiting
+        std::unique_lock lock(blocking_.mutex_);
+
+        do {
+            // Wait until items are available, checking the stop token periodically
+            while (!blocking_.not_empty_.wait_for(lock, std::chrono::milliseconds(2000), [this] { return !is_empty() || !blocking_.is_active_.load(std::memory_order_acquire); })) {
+                if (stoken.stop_requested()) {
+                    return items_dequeued; // Cancelled - report what was dequeued so far
+                }
+            }
+
+            if (!blocking_.is_active_.load(std::memory_order_acquire)) {
+                break; // Queue was shut down
+            }
+
+            // Release the lock during the actual dequeue operation
+            lock.unlock();
+            size_t batch_dequeued = try_dequeue_bulk(dest, max_items);
+            lock.lock();
+
+            if (batch_dequeued > 0) {
+                std::advance(dest, batch_dequeued);
+                items_dequeued += batch_dequeued;
+                max_items -= batch_dequeued;
+
+                if (max_items == 0) {
+                    break; // All items dequeued
+                }
+            }
+
+        } while (max_items > 0 && !stoken.stop_requested() && blocking_.is_active_.load(std::memory_order_acquire));
+
+        return items_dequeued;
+    }
+
+    /**
+     * @brief Dequeues multiple elements with a timeout
+     *
+     * Attempts to dequeue multiple elements, waiting up to the specified timeout.
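+     * A minimal usage sketch (illustrative; `q` is assumed to be an instance
+     * of this queue specialized for `int`):
+     * @code
+     * std::vector<int> out(64);
+     * // Returns after at most 10 ms with however many items could be gathered.
+     * size_t n = q.dequeue_bulk_for(out.begin(), out.size(), std::chrono::milliseconds(10));
+     * @endcode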
+     *
+     * @tparam OutputIt Iterator type for the destination
+     * @tparam Rep Duration representation type
+     * @tparam Period Duration period type
+     * @param dest Iterator to the destination to store dequeued elements
+     * @param max_items Maximum number of elements to dequeue
+     * @param timeout Maximum time to wait
+     * @return Number of elements successfully dequeued
+     */
+    template <typename OutputIt, typename Rep, typename Period>
+    size_t dequeue_bulk_for(OutputIt dest, size_t max_items, const std::chrono::duration<Rep, Period>& timeout) {
+        if (max_items == 0)
+            return 0;
+
+        // Track the start time for the timeout
+        auto start_time = std::chrono::steady_clock::now();
+        auto end_time = start_time + timeout;
+
+        // Try the non-blocking fast path first
+        size_t items_dequeued = try_dequeue_bulk(dest, max_items);
+        if (items_dequeued == max_items) {
+            return items_dequeued;
+        }
+
+        // If we got some items but not all, advance the destination iterator
+        if (items_dequeued > 0) {
+            std::advance(dest, items_dequeued);
+            max_items -= items_dequeued;
+        }
+
+        // Spin with exponential backoff for a short time; convert to
+        // microseconds so std::min compares like duration types
+        auto timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
+        auto spin_time = std::min(timeout_us / 5, std::chrono::microseconds(200));
+        auto spin_end_time = start_time + spin_time;
+
+        detail::exponential_backoff backoff;
+        while (max_items > 0 && std::chrono::steady_clock::now() < spin_end_time) {
+            size_t batch_dequeued = try_dequeue_bulk(dest, max_items);
+            if (batch_dequeued > 0) {
+                std::advance(dest, batch_dequeued);
+                items_dequeued += batch_dequeued;
+                max_items -= batch_dequeued;
+
+                if (max_items == 0) {
+                    return items_dequeued;
+                }
+            }
+            backoff();
+        }
+
+        // Check whether the timeout expired during spinning
+        if (std::chrono::steady_clock::now() >= end_time) {
+            return items_dequeued;
+        }
+
+        // Fall back to condition-variable waiting
+        std::unique_lock lock(blocking_.mutex_);
+
+        do {
+            // Wait until items are available or the deadline passes
+            if (!blocking_.not_empty_.wait_until(lock, end_time, [this] { return !is_empty() || !blocking_.is_active_.load(std::memory_order_acquire); })) {
+                break; // Timeout occurred
+            }
+
+            if (!blocking_.is_active_.load(std::memory_order_acquire)) {
+                break; // Queue was shut down
+            }
+
+            // Release the lock during the actual dequeue operation
+            lock.unlock();
+            size_t batch_dequeued = try_dequeue_bulk(dest, max_items);
+            lock.lock();
+
+            if (batch_dequeued > 0) {
+                std::advance(dest, batch_dequeued);
+                items_dequeued += batch_dequeued;
+                max_items -= batch_dequeued;
+
+                if (max_items == 0) {
+                    break; // All items dequeued
+                }
+            }
+
+        } while (max_items > 0 && std::chrono::steady_clock::now() < end_time && blocking_.is_active_.load(std::memory_order_acquire));
+
+        return items_dequeued;
+    }
+
+    /**
+     * @brief Dequeues any available elements with a timeout
+     *
+     * Attempts to dequeue elements, returning as soon as any are available
+     * or the timeout expires.
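+     * A minimal usage sketch (illustrative; `q` is assumed to be an instance
+     * of this queue specialized for `int`):
+     * @code
+     * std::vector<int> out(64);
+     * // Returns as soon as *any* items arrive, or with 0 after 10 ms.
+     * size_t n = q.dequeue_bulk_for_any(out.begin(), out.size(), std::chrono::milliseconds(10));
+     * @endcode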
+     *
+     * @tparam OutputIt Iterator type for the destination
+     * @tparam Rep Duration representation type
+     * @tparam Period Duration period type
+     * @param dest Iterator to the destination to store dequeued elements
+     * @param max_items Maximum number of elements to dequeue
+     * @param timeout Maximum time to wait
+     * @return Number of elements successfully dequeued
+     */
+    template <typename OutputIt, typename Rep, typename Period>
+    size_t dequeue_bulk_for_any(OutputIt dest, size_t max_items, const std::chrono::duration<Rep, Period>& timeout) {
+        if (max_items == 0)
+            return 0;
+
+        // Try the non-blocking fast path first
+        size_t items_dequeued = try_dequeue_bulk(dest, max_items);
+        if (items_dequeued > 0) {
+            return items_dequeued; // Return immediately if any items were dequeued
+        }
+
+        // Track time for the timeout
+        auto start_time = std::chrono::steady_clock::now();
+        auto end_time = start_time + timeout;
+
+        // Spin with exponential backoff for a short time; convert to
+        // microseconds so std::min compares like duration types
+        auto timeout_us = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
+        auto spin_time = std::min(timeout_us / 5, std::chrono::microseconds(100));
+        auto spin_end_time = start_time + spin_time;
+
+        detail::exponential_backoff backoff;
+        while (std::chrono::steady_clock::now() < spin_end_time) {
+            size_t batch_dequeued = try_dequeue_bulk(dest, max_items);
+            if (batch_dequeued > 0) {
+                return batch_dequeued; // Return immediately with any items
+            }
+            backoff();
+        }
+
+        // Fall back to condition-variable waiting
+        std::unique_lock lock(blocking_.mutex_);
+
+        // Wait until any items are available, the deadline passes, or the queue shuts down
+        bool has_items = blocking_.not_empty_.wait_until(lock, end_time, [this] { return !is_empty() || !blocking_.is_active_.load(std::memory_order_acquire); });
+
+        // If there are no items or the queue was shut down, return 0
+        if (!has_items || !blocking_.is_active_.load(std::memory_order_acquire)) {
+            return 0;
+        }
+
+        // Dequeue with the lock released
+        lock.unlock();
+        return try_dequeue_bulk(dest, max_items);
+    }
+
+    //////////EMPLACE OPERATIONS//////////
+
+    /**
+     * @brief Attempts to construct an element in-place in the queue
+     *
+     * Non-blocking operation that attempts to construct an element
+     * directly in the queue's buffer.
+     *
+     * @tparam Args Types of arguments to forward to the constructor
+     * @param args Arguments to forward to the constructor
+     * @return true if successful, false if the queue was full
+     */
+    template <typename... Args>
+    bool try_emplace(Args&&... args) noexcept(std::is_nothrow_constructible_v<T, Args...>) {
+        const size_t current_tail = tail_.load(std::memory_order_relaxed);
+        const size_t next_tail = (current_tail + 1) & index_mask_;
+
+        // Optimization: relaxed load first, then acquire only if needed
+        if (next_tail == head_.load(std::memory_order_relaxed)) {
+            // Double-check with acquire semantics
+            if (next_tail == head_.load(std::memory_order_acquire))
+                return false; // Queue is full
+        }
+
+        // Optimization: prefetch for write to reduce cache misses
+        detail::prefetch_write(&buffer_[current_tail]);
+
+        new (&buffer_[current_tail]) T(std::forward<Args>(args)...);
+        tail_.store(next_tail, std::memory_order_release);
+
+        // Notify if the queue was empty before this enqueue
+        if (current_tail == head_.load(std::memory_order_relaxed))
+            blocking_.not_empty_.notify_one();
+
+        return true;
+    }
+
+    /**
+     * @brief Constructs an element in-place in the queue, blocking if necessary
+     *
+     * Blocks the calling thread until space is available in the queue.
+     *
+     * @tparam Args Types of arguments to forward to the constructor
+     * @param args Arguments to forward to the constructor
+     */
+    template <typename... Args>
+    void emplace(Args&&... args) {
+        // Try the fast path first
+        if (try_emplace(std::forward<Args>(args)...))
+            return;
+
+        // Slow path with blocking. The arguments are only consumed by the
+        // attempt that actually succeeds, so forwarding them repeatedly from
+        // the predicate is safe.
+        std::unique_lock lock(blocking_.mutex_);
+        blocking_.not_full_.wait(lock, [this, &args...] { return try_emplace(std::forward<Args>(args)...) || !blocking_.is_active_.load(std::memory_order_acquire); });
+    }
+
+    /**
+     * @brief Constructs an element in-place with cancellation support
+     *
+     * Blocks until space is available or the operation is cancelled.
+     *
+     * @tparam StopToken Type meeting the StopToken concept
+     * @tparam Args Types of arguments to forward to the constructor
+     * @param stop_token Token that can be used to cancel the operation
+     * @param args Arguments to forward to the constructor
+     * @return true if the element was emplaced, false if cancelled
+     */
+    template <typename StopToken, typename... Args>
+    bool emplace(StopToken&& stop_token, Args&&... args) {
+        // Try the fast path first
+        if (try_emplace(std::forward<Args>(args)...))
+            return true;
+
+        // Slow path with blocking and cancellation support. A temporary
+        // std::condition_variable_any is never notified by the consumer, so
+        // poll with short timed waits on the queue's own condition variable
+        // and let the stop token break the loop.
+        std::unique_lock lock(blocking_.mutex_);
+        while (!stop_token.stop_requested() && blocking_.is_active_.load(std::memory_order_acquire)) {
+            if (try_emplace(std::forward<Args>(args)...))
+                return true;
+            blocking_.not_full_.wait_for(lock, std::chrono::milliseconds(1));
+        }
+        return false;
+    }
+
+    //////////UTILITY METHODS//////////
+
+    /**
+     * @brief Checks if the queue is empty
+     *
+     * @return true if the queue is empty, false otherwise
+     */
+    bool is_empty() const noexcept { return head_.load(std::memory_order_relaxed) == tail_.load(std::memory_order_relaxed); }
+
+    /**
+     * @brief Checks if the queue is full
+     *
+     * @return true if the queue is full, false otherwise
+     */
+    bool is_full() const noexcept {
+        const size_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) & index_mask_;
+        return next_tail == head_.load(std::memory_order_relaxed);
+    }
+
+    /**
+     * @brief Gets the current number of elements in the queue
+     *
+     * @return Current size of the queue
+     */
+    size_t size() const noexcept { return (tail_.load(std::memory_order_relaxed) - head_.load(std::memory_order_relaxed)) & index_mask_; }
+
+    /**
+     * @brief Gets the capacity of the queue
+     *
+     * Returns the actual usable capacity, which is one less than the
+     * internal buffer size because one slot must stay empty to
+     * distinguish the empty state from the full state.
+     *
+     * @return Maximum number of elements the queue can hold
+     */
+    constexpr size_t capacity() const noexcept {
+        return buffer_size_ - 1; // One slot is always kept empty
+    }
+
+    /**
+     * @brief Gets the capacity requested at construction
+     *
+     * @return The minimum capacity requested when the queue was created
+     */
+    constexpr size_t requested_capacity() const noexcept { return requested_capacity_; }
+
+    /**
+     * @brief Gets the actual capacity after power-of-2 rounding
+     *
+     * @return The actual capacity of the queue
+     */
+    constexpr size_t actual_capacity() const noexcept { return buffer_size_ - 1; }
+
+
+    /**
+     * @brief Shuts down the queue
+     *
+     * Wakes up all waiting threads and marks the queue as inactive.
+     * No new blocking operations will succeed after shutdown.
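+     * A minimal usage sketch (illustrative; `q` is assumed to be an instance
+     * of this queue with a consumer blocked in dequeue()):
+     * @code
+     * q.shutdown(); // the blocked consumer wakes up and dequeue() returns false
+     * @endcode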
+     */
+    void shutdown() noexcept {
+        blocking_.is_active_.store(false, std::memory_order_release);
+        blocking_.not_empty_.notify_all();
+        blocking_.not_full_.notify_all();
+    }
+
+    // Additional methods would be implemented similarly to the original SPSCQueue
+};
+
+#endif // SPSC_QUEUE_HPP
\ No newline at end of file
diff --git a/unittests/core/DeviceBufferTest.cpp b/unittests/core/DeviceBufferTest.cpp
index 78d0510..534966d 100644
--- a/unittests/core/DeviceBufferTest.cpp
+++ b/unittests/core/DeviceBufferTest.cpp
@@ -49,7 +49,7 @@ TEST_F(DBTest, DBOutputTest) {
     FinnUtils::BufferFiller(0, 255).fillRandom(data.begin(), data.end());
     buffer.testSetMap(data);
     buffer.read();
-    auto vec = buffer.getData();
+    auto vec = buffer.getData(buffer.getTotalDataSize());
     EXPECT_EQ(data, vec);
 }
diff --git a/unittests/utils/SPSCQueueTest.cpp b/unittests/utils/SPSCQueueTest.cpp
index 2844d44..45ae6cf 100644
--- a/unittests/utils/SPSCQueueTest.cpp
+++ b/unittests/utils/SPSCQueueTest.cpp
@@ -511,7 +511,9 @@ TEST(SPSCQueueTest, BlockingBulkEnqueue) {
     while (!producer_done || !queue.is_empty()) {
         size_t dequeued = queue.try_dequeue_bulk(results.begin(), results.size());
         if (dequeued > 0) {
-            all_consumed.insert(all_consumed.end(), results.begin(), results.begin() + dequeued);
+            for (size_t i = 0; i < dequeued; i++) {
+                all_consumed.push_back(results[i]);
+            }
         } else {
             std::this_thread::yield(); // Give producer time to produce
         }
@@ -565,6 +567,296 @@ TEST(SPSCQueueTest, TimedBulkEnqueue) {
     EXPECT_EQ(results[2], 4); // The first item from the timed bulk enqueue
 }
 
+// DYNAMIC QUEUE TESTS
+// These tests verify the same functionality for the dynamic queue variant
+
+// Basic tests for the dynamic queue with a non-trivial type
+TEST(DynamicSPSCQueueTest, BasicOperations) {
+    DynamicSPSCQueue<std::string> queue(16);
+
+    // Test the empty state
+    EXPECT_TRUE(queue.is_empty());
+    EXPECT_FALSE(queue.is_full());
+
+    // Test enqueue
+    EXPECT_TRUE(queue.try_enqueue("test1"));
+    EXPECT_FALSE(queue.is_empty());
+
+    // Test dequeue
+    std::string item;
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item, "test1");
+    EXPECT_TRUE(queue.is_empty());
+
+    // Test multiple items
+    EXPECT_TRUE(queue.try_enqueue("test2"));
+    EXPECT_TRUE(queue.try_enqueue("test3"));
+
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item, "test2");
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item, "test3");
+    EXPECT_TRUE(queue.is_empty());
+}
+
+// Test for the dynamic queue with a trivially copyable type
+TEST(DynamicSPSCQueueTest, TrivialTypeOperations) {
+    DynamicSPSCQueue<int> queue(16);
+
+    EXPECT_TRUE(queue.try_enqueue(42));
+
+    int item = 0;
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item, 42);
+    EXPECT_TRUE(queue.is_empty());
+}
+
+// Test filling the dynamic queue to capacity
+TEST(DynamicSPSCQueueTest, FullQueue) {
+    DynamicSPSCQueue<int> queue(4); // Actual capacity: 3
+
+    EXPECT_TRUE(queue.try_enqueue(1));
+    EXPECT_TRUE(queue.try_enqueue(2));
+    EXPECT_TRUE(queue.try_enqueue(3));
+    EXPECT_TRUE(queue.is_full());       // Should be full now
+    EXPECT_FALSE(queue.try_enqueue(4)); // Should fail
+
+    int item;
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item, 1);
+    EXPECT_FALSE(queue.is_full()); // No longer full
+
+    // Can enqueue again
+    EXPECT_TRUE(queue.try_enqueue(4));
+}
+
+// Test wrap-around behavior with the dynamic queue
+TEST(DynamicSPSCQueueTest, WrapAround) {
+    DynamicSPSCQueue<int> queue(4); // Actual capacity: 3
+    std::vector<int> results;
+
+    // Fill and drain multiple times to force wrap-around
+    for (int cycle = 0; cycle < 3; cycle++) {
+        EXPECT_TRUE(queue.try_enqueue(cycle * 3 + 1));
+        EXPECT_TRUE(queue.try_enqueue(cycle * 3 + 2));
+        EXPECT_TRUE(queue.try_enqueue(cycle * 3 + 3));
+
+        int item;
+        EXPECT_TRUE(queue.try_dequeue(item));
+        results.push_back(item);
+        EXPECT_TRUE(queue.try_dequeue(item));
+        results.push_back(item);
+        EXPECT_TRUE(queue.try_dequeue(item));
+        results.push_back(item);
+    }
+
+    // Verify the correct order
+    std::vector<int> expected = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+    EXPECT_EQ(results, expected);
+}
+
+// Test move semantics with the dynamic queue
+TEST(DynamicSPSCQueueTest, MoveSemantics) {
+    DynamicSPSCQueue<std::unique_ptr<int>> queue(4);
+
+    auto ptr1 = std::make_unique<int>(42);
+    auto ptr2 = std::make_unique<int>(43);
+
+    EXPECT_TRUE(queue.try_enqueue(std::move(ptr1)));
+    EXPECT_TRUE(queue.try_enqueue(std::move(ptr2)));
+
+    // The original pointers should be null after the move
+    EXPECT_EQ(ptr1, nullptr);
+    EXPECT_EQ(ptr2, nullptr);
+
+    std::unique_ptr<int> result;
+    EXPECT_TRUE(queue.try_dequeue(result));
+    EXPECT_EQ(*result, 42);
+
+    EXPECT_TRUE(queue.try_dequeue(result));
+    EXPECT_EQ(*result, 43);
+}
+
+// Test emplace functionality with the dynamic queue
+TEST(DynamicSPSCQueueTest, Emplace) {
+    DynamicSPSCQueue<std::pair<int, std::string>> queue(4);
+
+    EXPECT_TRUE(queue.try_emplace(1, "one"));
+    EXPECT_TRUE(queue.try_emplace(2, "two"));
+
+    std::pair<int, std::string> item;
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item.first, 1);
+    EXPECT_EQ(item.second, "one");
+
+    EXPECT_TRUE(queue.try_dequeue(item));
+    EXPECT_EQ(item.first, 2);
+    EXPECT_EQ(item.second, "two");
+}
+
+// Test timed bulk operations with the dynamic queue
+TEST(DynamicSPSCQueueTest, BulkTimedOperations) {
+    DynamicSPSCQueue<int> queue(16);
+
+    // Test the timeout on an empty queue
+    std::vector<int> results(5);
+    auto start = std::chrono::steady_clock::now();
+    size_t count = queue.dequeue_bulk_for(results.begin(), 5, std::chrono::milliseconds(100));
+    auto end = std::chrono::steady_clock::now();
+
+    EXPECT_EQ(count, 0);
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+    EXPECT_GE(duration, 90); // Allow for some timing variation
+
+    // Test with some items
+    for (int i = 0; i < 3; i++) {
+        queue.try_enqueue(i);
+    }
+
+    count = queue.dequeue_bulk_for(results.begin(), 5, std::chrono::milliseconds(100));
+    EXPECT_EQ(count, 3);
+    for (size_t i = 0; i < count; i++) {
+        EXPECT_EQ(results[i], static_cast<int>(i));
+    }
+}
+
+// Test dequeue_bulk_for_any with the dynamic queue
+TEST(DynamicSPSCQueueTest, BulkTimedAnyOperations) {
+    DynamicSPSCQueue<int> queue(16);
+
+    // Test the timeout on an empty queue
+    std::vector<int> results(5);
+    auto start = std::chrono::steady_clock::now();
+    size_t count = queue.dequeue_bulk_for_any(results.begin(), 5, std::chrono::milliseconds(100));
+    auto end = std::chrono::steady_clock::now();
+
+    EXPECT_EQ(count, 0);
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+    EXPECT_GE(duration, 90); // Allow for some timing variation
+
+    // Test with some items, added gradually
+    std::thread producer([&queue]() {
+        std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        queue.try_enqueue(42);
+        std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        queue.try_enqueue(43);
+        queue.try_enqueue(44);
+    });
+
+    results.assign(5, 0);
+    count = queue.dequeue_bulk_for_any(results.begin(), 5, std::chrono::milliseconds(200));
+
+    EXPECT_GE(count, 1); // Should get at least the first item
+    EXPECT_EQ(results[0], 42);
+
+    if (count > 1) {
+        EXPECT_EQ(results[1], 43);
+    }
+
+    producer.join();
+
+    // Clean up any remaining items
+    queue.try_dequeue_bulk(results.begin(), 5);
+}
+
+// Test shutdown behavior with the dynamic queue
+TEST(DynamicSPSCQueueTest, Shutdown) {
+    DynamicSPSCQueue<int> queue(4);
+    std::atomic<bool> consumer_unblocked{false};
+
+    // Start a consumer thread that will block
+    std::thread consumer([&queue, &consumer_unblocked]() {
+        int item;
+        bool result = queue.dequeue(item); // This should block
+        EXPECT_FALSE(result);              // After shutdown, it should return false
+        consumer_unblocked = true;
+    });
+
+    // Give the consumer time to block
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    // Shut down the queue
+    queue.shutdown();
+
+    // The consumer should unblock
+    consumer.join();
+    EXPECT_TRUE(consumer_unblocked);
+
+    // After shutdown, blocking operations should fail
+    int item;
+    EXPECT_FALSE(queue.dequeue(item));
+    EXPECT_FALSE(queue.dequeue_for(item, std::chrono::milliseconds(1)));
+}
+
+// Test concurrent enqueue/dequeue with high throughput for the dynamic queue
+TEST(DynamicSPSCQueueTest, ConcurrentThroughput) {
+    // Use fewer iterations for the dynamic test to reduce the test time
+    for (size_t runs = 0; runs < 50; ++runs) {
+        constexpr size_t ITEM_COUNT = 100000;
+        DynamicSPSCQueue<uint64_t> queue(1024);
+        std::atomic<bool> error{false};
+        std::atomic<bool> producer_done{false};
+
+        std::thread producer([&queue, &error, &producer_done]() {
+            try {
+                for (uint64_t i = 0; i < ITEM_COUNT; i++) {
+                    // Use a non-blocking enqueue with retry
+                    while (!queue.try_enqueue(i) && !error.load(std::memory_order_relaxed)) {
+                        std::this_thread::yield(); // Give the consumer time to catch up
+                    }
+
+                    // Exit early if the consumer detected an error
+                    if (error.load(std::memory_order_relaxed)) {
+                        break;
+                    }
+                }
+            } catch (...) { error = true; }
+            producer_done.store(true, std::memory_order_release);
+        });
+
+        std::thread consumer([&queue, &error, &producer_done]() {
+            try {
+                uint64_t expected = 0;
+                while (expected < ITEM_COUNT && !error.load(std::memory_order_relaxed)) {
+                    uint64_t item;
+
+                    // Use a non-blocking dequeue with yield
+                    if (queue.try_dequeue(item)) {
+                        if (item != expected) {
+                            error = true;
+                            break;
+                        }
+                        expected++;
+                    } else if (producer_done.load(std::memory_order_acquire)) {
+                        // If the producer is done and no more items arrive, items are missing
+                        if (expected < ITEM_COUNT) {
+                            error = true;
+                        }
+                        break;
+                    } else {
+                        std::this_thread::yield(); // Give the producer time to produce
+                    }
+                }
+            } catch (...) { error = true; }
+        });
+
+        // std::thread::join() cannot time out, so a join-with-timeout loop would
+        // be misleading; join both threads and rely on the test runner's own
+        // timeout to catch deadlocks.
+        producer.join();
+        consumer.join();
+
+        EXPECT_FALSE(error) << "Data corruption or missing items detected";
+    }
+}
+
 int main(int argc, char** argv) {
     ::testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();