Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions opencensus/trace/internal/span_exporter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ SpanExporterImpl* SpanExporterImpl::Get() {
// Create detached worker thread
SpanExporterImpl::SpanExporterImpl(uint32_t buffer_size,
absl::Duration interval)
: buffer_size_(buffer_size), interval_(interval), size_(0) {}
: buffer_size_(buffer_size), interval_(interval) {}

void SpanExporterImpl::RegisterHandler(
std::unique_ptr<SpanExporter::Handler> handler) {
Expand All @@ -50,7 +50,6 @@ void SpanExporterImpl::AddSpan(
const std::shared_ptr<opencensus::trace::SpanImpl>& span_impl) {
absl::MutexLock l(&span_mu_);
spans_.emplace_back(span_impl);
size_.fetch_add(1, std::memory_order_acq_rel);
}

void SpanExporterImpl::StartExportThread() {
Expand All @@ -68,20 +67,19 @@ void SpanExporterImpl::RunWorkerLoop() {
{
absl::MutexLock l(&span_mu_);
// Wait until batch is full or interval time has been exceeded.
span_mu_.AwaitWithDeadline(
absl::Condition(
+[](SpanExporterImpl* ptr) {
return (ptr->size_.load(std::memory_order_acquire) >=
ptr->buffer_size_);
},
this),
next_forced_export_time);
span_mu_.AwaitWithDeadline(absl::Condition(
+[](SpanExporterImpl* ptr) {
ptr->span_mu_.AssertHeld();
return ptr->spans_.size() >=
ptr->buffer_size_;
},
this),
next_forced_export_time);
next_forced_export_time = absl::Now() + interval_;
if (spans_.empty()) {
continue;
}
std::swap(spans_copy_, spans_);
size_.store(0, std::memory_order_release);
}
for (const auto& span : spans_copy_) {
span_data_.emplace_back(span->ToSpanData());
Expand Down
7 changes: 1 addition & 6 deletions opencensus/trace/internal/span_exporter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#ifndef OPENCENSUS_TRACE_INTERNAL_SPAN_EXPORTER_IMPL_H_
#define OPENCENSUS_TRACE_INTERNAL_SPAN_EXPORTER_IMPL_H_

#include <atomic>
#include <functional>
#include <memory>
#include <string>
Expand Down Expand Up @@ -74,18 +73,14 @@ class SpanExporterImpl {
static SpanExporterImpl* span_exporter_;
const uint32_t buffer_size_;
const absl::Duration interval_;
// This is intentionally not guarded by span_mu_. You cannot lock the waiting
// mutex within an AwaitWithTimeout, so we need to store the size in another
// variable.
std::atomic<size_t> size_;
mutable absl::Mutex span_mu_;
mutable absl::Mutex handler_mu_;
std::vector<std::shared_ptr<opencensus::trace::SpanImpl>> spans_
GUARDED_BY(span_mu_);
std::vector<std::unique_ptr<SpanExporter::Handler>> handlers_
GUARDED_BY(handler_mu_);
bool thread_started_ GUARDED_BY(handler_mu_) = false;
std::thread t_;
std::thread t_ GUARDED_BY(handler_mu_);
};

} // namespace exporter
Expand Down