Skip to content

Commit

Permalink
SWDEV-445711: Workaround for race condition on exit
Browse files Browse the repository at this point in the history
Change-Id: I7391637005ccc7ea89611f28691c02e569197dff
  • Loading branch information
ApoKalipse-V committed Feb 27, 2024
1 parent cae0856 commit bb98838
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 264 deletions.
47 changes: 26 additions & 21 deletions src/core/hsa/queues/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,25 +384,27 @@ void SignalAsyncReadyHandler(const hsa_signal_t& signal, void* data) {
signal, HSA_SIGNAL_CONDITION_EQ, 0, AsyncSignalReadyHandler, data);
if (status != HSA_STATUS_SUCCESS) fatal("hsa_amd_signal_async_handler failed");
}
bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) {
bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data)
{
auto queue_info_session = static_cast<queue_info_session_t*>(data);
if (!queue_info_session) return true;

rocprofiler::ROCProfiler_Singleton& rocprofiler_singleton =
rocprofiler::ROCProfiler_Singleton::GetInstance();
rocprofiler::ROCProfiler_Singleton::GetInstance();
rocprofiler::HSASupport_Singleton& hsasupport_singleton =
rocprofiler::HSASupport_Singleton::GetInstance();
if (!queue_info_session || !rocprofiler_singleton.GetSession(queue_info_session->session_id) ||
!rocprofiler_singleton.GetSession(queue_info_session->session_id)->GetProfiler())
return true;
rocprofiler::HSASupport_Singleton::GetInstance();

rocprofiler::Session* session = rocprofiler_singleton.GetSession(queue_info_session->session_id);
if (!session) return true;

std::lock_guard<std::mutex> lock(session->GetSessionLock());
rocprofiler::profiler::Profiler* profiler = session->GetProfiler();
std::vector<pending_signal_t*> pending_signals = const_cast<std::vector<pending_signal_t*>&>(
profiler->GetPendingSignals(queue_info_session->writer_id));
if (!profiler) return true;

auto pending_signals = profiler->MovePendingSignals(queue_info_session->writer_id);

if (!pending_signals.empty()) {
for (auto it = pending_signals.begin(); it != pending_signals.end();
it = pending_signals.erase(it)) {
auto& pending = *it;
for (auto& pending : pending_signals)
{
if (hsasupport_singleton.GetCoreApiTable().hsa_signal_load_relaxed_fn(pending->new_signal))
return true;
hsa_amd_profiling_dispatch_time_t time;
Expand Down Expand Up @@ -458,7 +460,7 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) {
rocprofiler::metrics::GetMetricsData(pending->context->results_map,
pending->context->metrics_list,
time.end - time.start);
AddRecordCounters(&record, pending);
AddRecordCounters(&record, pending.get());
} else {
if (session->FindBuffer(pending->buffer_id)) {
Memory::GenericBuffer* buffer = session->GetBuffer(pending->buffer_id);
Expand Down Expand Up @@ -503,14 +505,11 @@ bool AsyncSignalHandler(hsa_signal_value_t signal_value, void* data) {

}



if (pending->new_signal.handle)
hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(pending->new_signal);
if (queue_info_session->interrupt_signal.handle)
hsasupport_singleton.GetCoreApiTable().hsa_signal_destroy_fn(
queue_info_session->interrupt_signal);
}
}
delete queue_info_session;
ACTIVE_INTERRUPT_SIGNAL_COUNT.fetch_sub(1, std::memory_order_relaxed);
Expand All @@ -529,7 +528,8 @@ void CreateSignal(uint32_t attribute, hsa_signal_t* signal) {
HSASupport_Singleton::GetInstance().CreateSignal(attribute, signal);
}

rocprofiler_session_id_t session_id = rocprofiler_session_id_t{0};
rocprofiler_session_id_t Queue::session_id = rocprofiler_session_id_t{0};
std::shared_mutex Queue::session_id_mutex;
// Counter Names declaration
std::vector<std::string> session_data;

Expand All @@ -546,9 +546,13 @@ uint32_t replay_mode_count = 0;

rocprofiler::Session* session = nullptr;

void ResetSessionID(rocprofiler_session_id_t id) { session_id = id; }
/**
 * Replaces the session id shared by all Queue instances.
 *
 * The write happens while session_id_mutex is held exclusively, so it cannot
 * overlap with code that holds the same mutex in shared mode (as
 * Queue::WriteInterceptor does for the duration of its call).
 *
 * @param id New value stored into Queue::session_id.
 */
void Queue::ResetSessionID(rocprofiler_session_id_t id)
{
  // Scoped exclusive ownership; released automatically when the function returns.
  std::lock_guard<std::shared_mutex> exclusive_guard(session_id_mutex);
  Queue::session_id = id;
}

void CheckNeededProfileConfigs() {
void Queue::CheckNeededProfileConfigs() {
rocprofiler_session_id_t internal_session_id;
// Getting Session ID
rocprofiler::ROCProfiler_Singleton& rocprofiler_singleton =
Expand Down Expand Up @@ -609,7 +613,9 @@ std::atomic<uint32_t> WRITER_ID{0};
* interceptor by invoking the writer function.
*/
void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t user_pkt_index,
void* data, hsa_amd_queue_intercept_packet_writer writer) {
void* data, hsa_amd_queue_intercept_packet_writer writer)
{
std::shared_lock<std::shared_mutex> session_id_lock(session_id_mutex);
const Packet::packet_t* packets_arr = reinterpret_cast<const Packet::packet_t*>(packets);
std::vector<Packet::packet_t> transformed_packets;

Expand Down Expand Up @@ -669,7 +675,6 @@ void Queue::WriteInterceptor(const void* packets, uint64_t pkt_count, uint64_t u
Packet::CreateBarrierPacket(&transformed_packets, &block_signal, &block_signal);
}


uint32_t writer_id = WRITER_ID.fetch_add(1, std::memory_order_release);

if (session_data_count > 0 && is_counter_collection_mode && profiles.size() > 0 &&
Expand Down
7 changes: 5 additions & 2 deletions src/core/hsa/queues/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <string>
#include <vector>
#include <condition_variable>
#include <shared_mutex>
#include "src/core/session/profiler/profiler.h"

namespace rocprofiler {
Expand Down Expand Up @@ -87,7 +88,11 @@ class Queue {
hsa_signal_t GetReadySignal();
hsa_signal_t GetBlockSignal();

static void ResetSessionID(rocprofiler_session_id_t id = rocprofiler_session_id_t{0});
static void CheckNeededProfileConfigs();
private:
static std::shared_mutex session_id_mutex;
static rocprofiler_session_id_t session_id;

hsa_agent_t cpu_agent_;
hsa_agent_t gpu_agent_;
Expand All @@ -113,8 +118,6 @@ struct queue_info_session_t {

void AddRecordCounters(rocprofiler_record_profiler_t* record, const pending_signal_t& pending);

void ResetSessionID(rocprofiler_session_id_t id = rocprofiler_session_id_t{0});

void CheckPacketReqiurements();

} // namespace queue
Expand Down
135 changes: 72 additions & 63 deletions src/core/session/att/att.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ void AttTracer::AddPendingSignals(
uint32_t thread_id, uint64_t queue_index
) {
std::lock_guard<std::mutex> lock(sessions_pending_signals_lock_);
if (bIsSessionDestroying.load())
return;

auto pending = sessions_pending_signals_.find(writer_id);
if (pending == sessions_pending_signals_.end())
Expand All @@ -64,11 +66,18 @@ void AttTracer::AddPendingSignals(
});
}

const std::vector<att_pending_signal_t>& AttTracer::GetPendingSignals(uint32_t writer_id) {
std::vector<att_pending_signal_t> AttTracer::MovePendingSignals(uint32_t writer_id)
{
std::lock_guard<std::mutex> lock(sessions_pending_signals_lock_);
assert(sessions_pending_signals_.find(writer_id) != sessions_pending_signals_.end() &&
"writer_id is not found in the pending_signals");
return sessions_pending_signals_.at(writer_id);
auto it = sessions_pending_signals_.find(writer_id);
if (it == sessions_pending_signals_.end())
rocprofiler::fatal("writer_id is not found in the pending_signals");

auto move_pending = std::move(it->second);
sessions_pending_signals_.erase(writer_id);
if (bIsSessionDestroying.load() && sessions_pending_signals_.size() == 0)
has_session_pending_cv.notify_all();
return move_pending;
}

#define DEFAULT_ATT_BUFFER_SIZE 0x40000000
Expand Down Expand Up @@ -174,7 +183,8 @@ void AttTracer::signalAsyncHandlerATT(const hsa_signal_t& signal, void* data) {
rocprofiler::fatal("Error: hsa_amd_signal_async_handler for ATT failed");
}

bool AttTracer::AsyncSignalHandlerATT(hsa_signal_value_t /* signal */, void* data) {
bool AttTracer::AsyncSignalHandlerATT(hsa_signal_value_t /* signal */, void* data)
{
auto queue_info_session = static_cast<queue::queue_info_session_t*>(data);
rocprofiler::ROCProfiler_Singleton& rocprofiler_singleton =
rocprofiler::ROCProfiler_Singleton::GetInstance();
Expand All @@ -191,68 +201,55 @@ bool AttTracer::AsyncSignalHandlerATT(hsa_signal_value_t /* signal */, void* dat
rocprofiler::att::AttTracer* att_tracer = session->GetAttTracer();

if (!session->GetAttTracer()) return true;
auto pending_signals = att_tracer->MovePendingSignals(queue_info_session->writer_id);

std::vector<att_pending_signal_t>& pending_signals =
const_cast<std::vector<att_pending_signal_t>&>(
att_tracer->GetPendingSignals(queue_info_session->writer_id));

if (!pending_signals.empty()) {
for (auto it = pending_signals.begin(); it != pending_signals.end();
it = pending_signals.erase(it)) {

auto& pending = *it;
//if (hsasupport_singleton.GetCoreApiTable().hsa_signal_load_relaxed_fn(pending.new_signal))
// return true;
rocprofiler_record_att_tracer_t record{};
record.kernel_id = rocprofiler_kernel_id_t{pending.kernel_descriptor};
record.gpu_id = rocprofiler_agent_id_t{(uint64_t)queue_info_session->gpu_index};
record.kernel_properties = pending.kernel_properties;
record.thread_id = rocprofiler_thread_id_t{pending.thread_id};
record.queue_idx = rocprofiler_queue_index_t{pending.queue_index};
record.queue_id = rocprofiler_queue_id_t{queue_info_session->queue_id};
record.writer_id = queue_info_session->writer_id;

if (/*pending.counters_count > 0 && */ pending.profile) {
AddAttRecord(&record, queue_info_session->agent, pending);
}

// July/01/2023 -> Changed this to queue_info_session->writer_id
// so we can correlate to dispatches. kernel_id already has the descriptor.
record.header = {ROCPROFILER_ATT_TRACER_RECORD,
rocprofiler_record_id_t{pending.kernel_descriptor}};

record.intercept_list = codeobj_record::get_capture(record.header.id);
std::atomic_thread_fence(std::memory_order_release);

if (pending.session_id.handle == 0) {
pending.session_id = rocprofiler_singleton.GetCurrentSessionId();
}

if (session->FindBuffer(pending.buffer_id)) {
Memory::GenericBuffer* buffer = session->GetBuffer(pending.buffer_id);
buffer->AddRecord(record);
buffer->Flush();
}
codeobj_record::free_capture(record.header.id);

hsa_status_t status = hsasupport_singleton.GetAmdExtTable().hsa_amd_memory_pool_free_fn(
(pending.profile->output_buffer.ptr));
if (status != HSA_STATUS_SUCCESS)
rocprofiler::warning("Error: Couldn't free output buffer memory");

status = hsasupport_singleton.GetAmdExtTable().hsa_amd_memory_pool_free_fn(
(pending.profile->command_buffer.ptr));
if (status != HSA_STATUS_SUCCESS)
rocprofiler::warning("Error: Couldn't free command buffer memory");

if (pending.profile->parameters)
delete[] pending.profile->parameters;
delete pending.profile;
for (auto& pending : pending_signals)
{
rocprofiler_record_att_tracer_t record{};
record.kernel_id = rocprofiler_kernel_id_t{pending.kernel_descriptor};
record.gpu_id = rocprofiler_agent_id_t{(uint64_t)queue_info_session->gpu_index};
record.kernel_properties = pending.kernel_properties;
record.thread_id = rocprofiler_thread_id_t{pending.thread_id};
record.queue_idx = rocprofiler_queue_index_t{pending.queue_index};
record.queue_id = rocprofiler_queue_id_t{queue_info_session->queue_id};
record.writer_id = queue_info_session->writer_id;

if (pending.profile)
AddAttRecord(&record, queue_info_session->agent, pending);

// July/01/2023 -> Changed this to queue_info_session->writer_id
// so we can correlate to dispatches. kernel_id already has the descriptor.
record.header = {ROCPROFILER_ATT_TRACER_RECORD,
rocprofiler_record_id_t{pending.kernel_descriptor}};

record.intercept_list = codeobj_record::get_capture(record.header.id);
std::atomic_thread_fence(std::memory_order_release);

if (pending.session_id.handle == 0)
pending.session_id = rocprofiler_singleton.GetCurrentSessionId();

if (session->FindBuffer(pending.buffer_id)) {
Memory::GenericBuffer* buffer = session->GetBuffer(pending.buffer_id);
buffer->AddRecord(record);
buffer->Flush();
}
codeobj_record::free_capture(record.header.id);

hsa_status_t status = hsasupport_singleton.GetAmdExtTable().hsa_amd_memory_pool_free_fn(
(pending.profile->output_buffer.ptr));
if (status != HSA_STATUS_SUCCESS)
rocprofiler::warning("Error: Couldn't free output buffer memory");

status = hsasupport_singleton.GetAmdExtTable().hsa_amd_memory_pool_free_fn(
(pending.profile->command_buffer.ptr));
if (status != HSA_STATUS_SUCCESS)
rocprofiler::warning("Error: Couldn't free command buffer memory");

if (pending.profile->parameters)
delete[] pending.profile->parameters;
delete pending.profile;
}
delete queue_info_session;

std::atomic_thread_fence(std::memory_order_seq_cst);
return false;
}

Expand Down Expand Up @@ -320,6 +317,18 @@ hsa_status_t AttTracer::attTraceDataCallback(
return status;
}

/**
 * Marks this tracer as being torn down and then waits, for a bounded time,
 * until the map of pending signals has been emptied.
 *
 * Setting bIsSessionDestroying first makes AddPendingSignals return without
 * queuing anything new, and makes MovePendingSignals notify
 * has_session_pending_cv once it removes the last entry.
 *
 * The wait gives up after two seconds; the function returns either way and
 * does not report whether the map was actually drained. This function itself
 * only sets the flag and waits.
 */
void AttTracer::WaitForPendingAndDestroy()
{
  // Upper bound on how long teardown is allowed to block on outstanding handlers.
  constexpr std::chrono::seconds kDrainTimeout{2};

  bIsSessionDestroying.store(true);

  std::unique_lock<std::mutex> pending_guard(sessions_pending_signals_lock_);
  // The predicate overload checks the condition before blocking, so an
  // already-empty map returns immediately without waiting.
  has_session_pending_cv.wait_for(pending_guard, kDrainTimeout, [this]() {
    return sessions_pending_signals_.empty();
  });
}

std::unordered_map<uint64_t, ATTRecordSignal> AttTracer::pending_stop_packets;
std::mutex AttTracer::att_enable_disable_mutex;

Expand Down
6 changes: 5 additions & 1 deletion src/core/session/att/att.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AttTracer {
uint64_t agent_handle
);

const std::vector<att_pending_signal_t>& GetPendingSignals(uint32_t writer_id);
std::vector<att_pending_signal_t> MovePendingSignals(uint32_t writer_id);

bool ATTWriteInterceptor(
const void* packets,
Expand Down Expand Up @@ -151,6 +151,8 @@ class AttTracer {
return pending_stop_packets.find(agent_handle) != pending_stop_packets.end();
}

void WaitForPendingAndDestroy();

protected:
using packet_t = hsa_ext_amd_aql_pm4_packet_t;
static std::unordered_map<uint64_t, ATTRecordSignal> pending_stop_packets;
Expand Down Expand Up @@ -208,6 +210,8 @@ class AttTracer {

std::mutex sessions_pending_signals_lock_;
std::map<uint32_t, std::vector<att_pending_signal_t>> sessions_pending_signals_;
std::condition_variable has_session_pending_cv;
std::atomic<bool> bIsSessionDestroying{false};

rocprofiler_record_id_t capture_id;
std::unordered_set<uint32_t> active_capture_event_ids;
Expand Down
Loading

0 comments on commit bb98838

Please sign in to comment.