Skip to content

[lldb] Support non-blocking reads in JSONRPCTransport #144610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

JDevlieghere
Copy link
Member

@JDevlieghere JDevlieghere commented Jun 17, 2025

Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily result in reading partial JSON RPC packets.

Support non-blocking reads for JSONRPCTransport so we can implement a
multiplexed reader using the MainLoop. Pavel pointed out in llvm#143628 that
the implementation there (which was using blocking reads) can easily to
reading partial JSON RPC packets.
@llvmbot
Copy link
Member

llvmbot commented Jun 17, 2025

@llvm/pr-subscribers-lldb

Author: Jonas Devlieghere (JDevlieghere)

Changes

Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily to reading partial JSON RPC packets.


Full diff: https://github.com/llvm/llvm-project/pull/144610.diff

3 Files Affected:

  • (modified) lldb/include/lldb/Host/JSONTransport.h (+15-4)
  • (modified) lldb/source/Host/common/JSONTransport.cpp (+25-17)
  • (modified) lldb/unittests/Host/JSONTransportTest.cpp (+39-4)
diff --git a/lldb/include/lldb/Host/JSONTransport.h b/lldb/include/lldb/Host/JSONTransport.h
index 4087cdf2b42f7..36a67c929a1c6 100644
--- a/lldb/include/lldb/Host/JSONTransport.h
+++ b/lldb/include/lldb/Host/JSONTransport.h
@@ -85,7 +85,8 @@ class JSONTransport {
 
   /// Reads the next message from the input stream.
   template <typename T>
-  llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
+  llvm::Expected<T>
+  Read(std::optional<std::chrono::microseconds> timeout = std::nullopt) {
     llvm::Expected<std::string> message = ReadImpl(timeout);
     if (!message)
       return message.takeError();
@@ -97,10 +98,20 @@ class JSONTransport {
 
   virtual llvm::Error WriteImpl(const std::string &message) = 0;
   virtual llvm::Expected<std::string>
-  ReadImpl(const std::chrono::microseconds &timeout) = 0;
+  ReadImpl(std::optional<std::chrono::microseconds> timeout) = 0;
+
+  llvm::Expected<std::string>
+  ReadFull(IOObject &descriptor, size_t length,
+           std::optional<std::chrono::microseconds> timeout) const;
+
+  llvm::Expected<std::string>
+  ReadUntil(IOObject &descriptor, llvm::StringRef delimiter,
+            std::optional<std::chrono::microseconds> timeout);
 
   lldb::IOObjectSP m_input;
   lldb::IOObjectSP m_output;
+
+  std::string m_buffer;
 };
 
 /// A transport class for JSON with a HTTP header.
@@ -113,7 +124,7 @@ class HTTPDelimitedJSONTransport : public JSONTransport {
 protected:
   virtual llvm::Error WriteImpl(const std::string &message) override;
   virtual llvm::Expected<std::string>
-  ReadImpl(const std::chrono::microseconds &timeout) override;
+  ReadImpl(std::optional<std::chrono::microseconds> timeout) override;
 
   // FIXME: Support any header.
   static constexpr llvm::StringLiteral kHeaderContentLength =
@@ -131,7 +142,7 @@ class JSONRPCTransport : public JSONTransport {
 protected:
   virtual llvm::Error WriteImpl(const std::string &message) override;
   virtual llvm::Expected<std::string>
-  ReadImpl(const std::chrono::microseconds &timeout) override;
+  ReadImpl(std::optional<std::chrono::microseconds> timeout) override;
 
   static constexpr llvm::StringLiteral kMessageSeparator = "\n";
 };
diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp
index 1a0851d5c4365..0fae74fb87b68 100644
--- a/lldb/source/Host/common/JSONTransport.cpp
+++ b/lldb/source/Host/common/JSONTransport.cpp
@@ -27,9 +27,9 @@ using namespace lldb_private;
 
 /// ReadFull attempts to read the specified number of bytes. If EOF is
 /// encountered, an empty string is returned.
-static Expected<std::string>
-ReadFull(IOObject &descriptor, size_t length,
-         std::optional<std::chrono::microseconds> timeout = std::nullopt) {
+Expected<std::string> JSONTransport::ReadFull(
+    IOObject &descriptor, size_t length,
+    std::optional<std::chrono::microseconds> timeout) const {
   if (!descriptor.IsValid())
     return llvm::make_error<TransportInvalidError>();
 
@@ -67,19 +67,22 @@ ReadFull(IOObject &descriptor, size_t length,
   return data.substr(0, length);
 }
 
-static Expected<std::string>
-ReadUntil(IOObject &descriptor, StringRef delimiter,
-          std::optional<std::chrono::microseconds> timeout = std::nullopt) {
-  std::string buffer;
-  buffer.reserve(delimiter.size() + 1);
-  while (!llvm::StringRef(buffer).ends_with(delimiter)) {
+Expected<std::string>
+JSONTransport::ReadUntil(IOObject &descriptor, StringRef delimiter,
+                         std::optional<std::chrono::microseconds> timeout) {
+  if (!timeout || *timeout != std::chrono::microseconds::zero()) {
+    m_buffer.clear();
+    m_buffer.reserve(delimiter.size() + 1);
+  }
+
+  while (!llvm::StringRef(m_buffer).ends_with(delimiter)) {
     Expected<std::string> next =
-        ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
+        ReadFull(descriptor, m_buffer.empty() ? delimiter.size() : 1, timeout);
     if (auto Err = next.takeError())
       return std::move(Err);
-    buffer += *next;
+    m_buffer += *next;
   }
-  return buffer.substr(0, buffer.size() - delimiter.size());
+  return m_buffer.substr(0, m_buffer.size() - delimiter.size());
 }
 
 JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output)
@@ -89,11 +92,15 @@ void JSONTransport::Log(llvm::StringRef message) {
   LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message);
 }
 
-Expected<std::string>
-HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
+Expected<std::string> HTTPDelimitedJSONTransport::ReadImpl(
+    std::optional<std::chrono::microseconds> timeout) {
   if (!m_input || !m_input->IsValid())
     return llvm::make_error<TransportInvalidError>();
 
+  if (timeout && *timeout == std::chrono::microseconds::zero())
+    return llvm::createStringError(
+        "HTTPDelimitedJSONTransport does not support non-blocking reads");
+
   IOObject *input = m_input.get();
   Expected<std::string> message_header =
       ReadFull(*input, kHeaderContentLength.size(), timeout);
@@ -104,7 +111,8 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
                                      kHeaderContentLength, *message_header)
                                  .str());
 
-  Expected<std::string> raw_length = ReadUntil(*input, kHeaderSeparator);
+  Expected<std::string> raw_length =
+      ReadUntil(*input, kHeaderSeparator, timeout);
   if (!raw_length)
     return handleErrors(raw_length.takeError(),
                         [&](const TransportEOFError &E) -> llvm::Error {
@@ -117,7 +125,7 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
     return createStringError(
         formatv("invalid content length {0}", *raw_length).str());
 
-  Expected<std::string> raw_json = ReadFull(*input, length);
+  Expected<std::string> raw_json = ReadFull(*input, length, timeout);
   if (!raw_json)
     return handleErrors(
         raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error {
@@ -143,7 +151,7 @@ Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
 }
 
 Expected<std::string>
-JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) {
+JSONRPCTransport::ReadImpl(std::optional<std::chrono::microseconds> timeout) {
   if (!m_input || !m_input->IsValid())
     return make_error<TransportInvalidError>();
 
diff --git a/lldb/unittests/Host/JSONTransportTest.cpp b/lldb/unittests/Host/JSONTransportTest.cpp
index 4621869887ac8..cc43d7d851cb1 100644
--- a/lldb/unittests/Host/JSONTransportTest.cpp
+++ b/lldb/unittests/Host/JSONTransportTest.cpp
@@ -16,7 +16,7 @@ using namespace lldb_private;
 namespace {
 template <typename T> class JSONTransportTest : public PipeTest {
 protected:
-  std::unique_ptr<JSONTransport> transport;
+  std::unique_ptr<T> transport;
 
   void SetUp() override {
     PipeTest::SetUp();
@@ -36,7 +36,13 @@ class HTTPDelimitedJSONTransportTest
   using JSONTransportTest::JSONTransportTest;
 };
 
-class JSONRPCTransportTest : public JSONTransportTest<JSONRPCTransport> {
+class TestJSONRPCTransport : public JSONRPCTransport {
+public:
+  using JSONRPCTransport::JSONRPCTransport;
+  using JSONRPCTransport::WriteImpl; // For partial writes.
+};
+
+class JSONRPCTransportTest : public JSONTransportTest<TestJSONRPCTransport> {
 public:
   using JSONTransportTest::JSONTransportTest;
 };
@@ -84,7 +90,6 @@ TEST_F(HTTPDelimitedJSONTransportTest, ReadWithEOF) {
       Failed<TransportEOFError>());
 }
 
-
 TEST_F(HTTPDelimitedJSONTransportTest, InvalidTransport) {
   transport = std::make_unique<HTTPDelimitedJSONTransport>(nullptr, nullptr);
   ASSERT_THAT_EXPECTED(
@@ -142,13 +147,43 @@ TEST_F(JSONRPCTransportTest, Write) {
 }
 
 TEST_F(JSONRPCTransportTest, InvalidTransport) {
-  transport = std::make_unique<JSONRPCTransport>(nullptr, nullptr);
+  transport = std::make_unique<TestJSONRPCTransport>(nullptr, nullptr);
   ASSERT_THAT_EXPECTED(
       transport->Read<JSONTestType>(std::chrono::milliseconds(1)),
       Failed<TransportInvalidError>());
 }
 
 #ifndef _WIN32
+TEST_F(HTTPDelimitedJSONTransportTest, NonBlockingRead) {
+  ASSERT_THAT_EXPECTED(
+      transport->Read<JSONTestType>(std::chrono::microseconds::zero()),
+      llvm::FailedWithMessage(
+          "HTTPDelimitedJSONTransport does not support non-blocking reads"));
+}
+
+TEST_F(JSONRPCTransportTest, NonBlockingRead) {
+  llvm::StringRef head = R"({"str")";
+  llvm::StringRef tail = R"(: "foo"})"
+                         "\n";
+
+  ASSERT_THAT_EXPECTED(input.Write(head.data(), head.size()), Succeeded());
+  ASSERT_THAT_EXPECTED(
+      transport->Read<JSONTestType>(std::chrono::microseconds::zero()),
+      Failed<TransportTimeoutError>());
+
+  ASSERT_THAT_EXPECTED(input.Write(tail.data(), tail.size()), Succeeded());
+  while (true) {
+    llvm::Expected<JSONTestType> result =
+        transport->Read<JSONTestType>(std::chrono::microseconds::zero());
+    if (result.errorIsA<TransportTimeoutError>()) {
+      llvm::consumeError(result.takeError());
+      continue;
+    }
+    ASSERT_THAT_EXPECTED(result, HasValue(testing::FieldsAre(/*str=*/"foo")));
+    break;
+  }
+}
+
 TEST_F(HTTPDelimitedJSONTransportTest, ReadWithTimeout) {
   ASSERT_THAT_EXPECTED(
       transport->Read<JSONTestType>(std::chrono::milliseconds(1)),

Comment on lines +73 to +80
if (!timeout || *timeout != std::chrono::microseconds::zero()) {
m_buffer.clear();
m_buffer.reserve(delimiter.size() + 1);
}

while (!llvm::StringRef(m_buffer).ends_with(delimiter)) {
Expected<std::string> next =
ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
ReadFull(descriptor, m_buffer.empty() ? delimiter.size() : 1, timeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a buffer, we could adjust our approach to read in larger than 1 byte chunks when we're reading until a delimiter.

We could read chunks of say 1024 and then split the buffer on the delimited until we run out of data and then do a new read with the next chunk size.

I don't know if this approach would have issues on windows or anything though, so maybe someone with more platform specific knowledge may know how it handles blocking reads if _read is called with no data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport->Read<JSONTestType>(std::chrono::microseconds::zero()),
Failed<TransportTimeoutError>());

ASSERT_THAT_EXPECTED(input.Write(tail.data(), tail.size()), Succeeded());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also test a chunk with the delimiter in the middle of the data?

@@ -85,7 +85,8 @@ class JSONTransport {

/// Reads the next message from the input stream.
template <typename T>
llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
llvm::Expected<T>
Read(std::optional<std::chrono::microseconds> timeout = std::nullopt) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this type has moved into lldb_private now, should we use the Timeout helper? https://github.com/llvm/llvm-project/blob/main/lldb/include/lldb/Utility/Timeout.h#L28

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants