Skip to content

Commit

Permalink
Backport #21936 to 21.3: Fix issues with HTMLForm::MultipartReadBuffe…
Browse files Browse the repository at this point in the history
…r and PeekableReadBuffer
  • Loading branch information
robot-clickhouse committed Mar 26, 2021
1 parent b1e3015 commit d81d717
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 30 deletions.
27 changes: 22 additions & 5 deletions src/IO/PeekableReadBuffer.cpp
Expand Up @@ -82,6 +82,7 @@ bool PeekableReadBuffer::peekNext()
checkpoint.emplace(memory.data());
checkpoint_in_own_memory = true;
}

if (currentlyReadFromOwnMemory())
{
/// Update buffer size
Expand All @@ -99,7 +100,6 @@ bool PeekableReadBuffer::peekNext()
pos_offset = 0;
}
BufferBase::set(memory.data(), peeked_size + bytes_to_copy, pos_offset);

}

peeked_size += bytes_to_copy;
Expand All @@ -113,12 +113,21 @@ void PeekableReadBuffer::rollbackToCheckpoint(bool drop)
{
checkStateCorrect();

if (!checkpoint)
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
assert(checkpoint);

if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
{
/// Both checkpoint and position are in the same buffer.
pos = *checkpoint;
else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory
}
else
{
/// Checkpoint is in own memory and position is not.
assert(checkpointInOwnMemory());

/// Switch to reading from own memory.
BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data());
}

if (drop)
dropCheckpoint();
Expand All @@ -134,6 +143,7 @@ bool PeekableReadBuffer::nextImpl()

checkStateCorrect();
bool res;
bool checkpoint_at_end = checkpoint && *checkpoint == working_buffer.end() && currentlyReadFromOwnMemory();

if (checkpoint)
{
Expand Down Expand Up @@ -163,6 +173,13 @@ bool PeekableReadBuffer::nextImpl()
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
nextimpl_working_buffer_offset = sub_buf.offset();

if (checkpoint_at_end)
{
checkpoint.emplace(working_buffer.begin());
peeked_size = 0;
checkpoint_in_own_memory = false;
}

checkStateCorrect();
return res;
}
Expand Down
5 changes: 1 addition & 4 deletions src/IO/PeekableReadBuffer.h
Expand Up @@ -43,10 +43,7 @@ class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
/// Forget checkpoint and all data between checkpoint and position
ALWAYS_INLINE inline void dropCheckpoint()
{
#ifndef NDEBUG
if (!checkpoint)
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
#endif
assert(checkpoint);
if (!currentlyReadFromOwnMemory())
{
/// Don't need to store unread data anymore
Expand Down
19 changes: 0 additions & 19 deletions src/IO/tests/gtest_peekable_read_buffer.cpp
Expand Up @@ -6,11 +6,6 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/PeekableReadBuffer.h>

namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}

static void readAndAssert(DB::ReadBuffer & buf, const char * str)
{
size_t n = strlen(str);
Expand Down Expand Up @@ -48,20 +43,6 @@ try
readAndAssert(peekable, "01234");
}

#ifndef ABORT_ON_LOGICAL_ERROR
bool exception = false;
try
{
peekable.rollbackToCheckpoint();
}
catch (DB::Exception & e)
{
if (e.code() != DB::ErrorCodes::LOGICAL_ERROR)
throw;
exception = true;
}
ASSERT_TRUE(exception);
#endif
assertAvailable(peekable, "56789");

readAndAssert(peekable, "56");
Expand Down
2 changes: 0 additions & 2 deletions src/Interpreters/InterserverIOHandler.h
Expand Up @@ -9,8 +9,6 @@
#include <Common/ActionBlocker.h>
#include <common/types.h>

#include <Poco/Net/HTMLForm.h>

#include <atomic>
#include <map>
#include <shared_mutex>
Expand Down
5 changes: 5 additions & 0 deletions src/Server/HTTP/HTMLForm.cpp
Expand Up @@ -369,6 +369,11 @@ bool HTMLForm::MultipartReadBuffer::nextImpl()
else
boundary_hit = startsWith(line, boundary);

if (!line.empty())
/// If we don't make sure that memory is contiguous then situation may happen, when part of the line is inside internal memory
/// and other part is inside sub-buffer, thus we'll be unable to setup our working buffer properly.
in.makeContinuousMemoryFromCheckpointToPos();

in.rollbackToCheckpoint(true);

/// Rolling back to checkpoint may change underlying buffers.
Expand Down

0 comments on commit d81d717

Please sign in to comment.