Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions pj_proto_app/src/data_source_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace proto {

namespace {

const char* rhGetLastError(void*) {
return nullptr;
const char* rhGetLastError(void* ctx) {
auto* s = static_cast<RuntimeHostState*>(ctx);
return s->last_error.empty() ? nullptr : s->last_error.c_str();
}

void rhReportMessage(void* ctx, PJ_data_source_message_level_t level, PJ_string_view_t msg) {
Expand Down Expand Up @@ -68,22 +69,25 @@ bool rhEnsureParserBinding(void* ctx, const PJ_parser_binding_request_t* request

auto* parser_entry = state->registry->findParserByEncoding(encoding);
if (parser_entry == nullptr) {
std::cerr << "[bridge] no parser found for encoding '" << encoding << "'\n";
state->last_error = "no parser found for encoding '" + std::string(encoding) + "'";
std::cerr << "[bridge] " << state->last_error << "\n";
return false;
}

// Create parser instance
auto parser = std::make_unique<PJ::MessageParserHandle>(parser_entry->library.createHandle());
if (!parser->valid()) {
std::cerr << "[bridge] failed to create parser instance for '" << encoding << "'\n";
state->last_error = "failed to create parser instance for '" + std::string(encoding) + "'";
std::cerr << "[bridge] " << state->last_error << "\n";
return false;
}

// Create a topic in the datastore for this channel
auto topic_result =
state->engine->createTopic(state->dataset_id, PJ::TopicDescriptor{.name = std::string(topic_name)});
if (!topic_result) {
std::cerr << "[bridge] failed to create topic '" << topic_name << "': " << topic_result.error() << "\n";
state->last_error = "failed to create topic '" + std::string(topic_name) + "': " + topic_result.error();
std::cerr << "[bridge] " << state->last_error << "\n";
return false;
}

Expand All @@ -94,14 +98,16 @@ bool rhEnsureParserBinding(void* ctx, const PJ_parser_binding_request_t* request

// Bind write host to parser
if (!parser->bindWriteHost(write_host->raw())) {
std::cerr << "[bridge] failed to bind write host to parser\n";
state->last_error = "failed to bind write host to parser";
std::cerr << "[bridge] " << state->last_error << "\n";
return false;
}

// Bind schema if provided
if (request->schema.size > 0) {
PJ::Span<const uint8_t> schema_span(request->schema.data, request->schema.size);
if (!parser->bindSchema(type_name, schema_span)) {
state->last_error = "failed to parse " + std::string(type_name) + ": " + parser->lastError();
std::cerr << "[bridge] parser schema binding failed for type '" << type_name << "': " << parser->lastError()
<< "\n";
return false;
Expand Down
1 change: 1 addition & 0 deletions pj_proto_app/src/data_source_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct RuntimeHostState {
int progress_finishes = 0;
std::atomic<bool> stop_requested{false};
std::unordered_map<std::string, DedupMessage> messages;
std::string last_error;

// Delegated ingest bridge state
PJ::DataEngine* engine = nullptr;
Expand Down
Loading