Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gzip read/write to file/s3/url/hdfs #7840

Merged
merged 10 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbms/programs/server/HTTPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,16 +407,16 @@ void HTTPHandler::processQuery(
{
if (http_request_compression_method_str == "gzip")
{
in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, CompressionMethod::Gzip);
in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Gzip);
}
else if (http_request_compression_method_str == "deflate")
{
in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, CompressionMethod::Zlib);
in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Zlib);
}
#if USE_BROTLI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use std::move for BrotliReadBuffer? And is it really necessary to move in_post_raw inside the constructor?

Copy link
Contributor Author

@apbodrov apbodrov Nov 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use std::move for BrotliReadBuffer?

It was a mistake; fixed.

And is it really necessary to move in_post_raw inside the constructor?

Yes, we need it because in_post_raw is a local pointer.

else if (http_request_compression_method_str == "br")
{
in_post = std::make_unique<BrotliReadBuffer>(*in_post_raw);
in_post = std::make_unique<BrotliReadBuffer>(std::move(in_post_raw));
}
#endif
else
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/IO/BrotliReadBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class BrotliReadBuffer::BrotliStateWrapper
BrotliDecoderResult result;
};

BrotliReadBuffer::BrotliReadBuffer(ReadBuffer &in_, size_t buf_size, char *existing_memory, size_t alignment)
BrotliReadBuffer::BrotliReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t buf_size, char *existing_memory, size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(in_)
, in(std::move(in_))
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
Expand All @@ -55,12 +55,12 @@ bool BrotliReadBuffer::nextImpl()

if (!in_available)
{
in.nextIfAtEnd();
in_available = in.buffer().end() - in.position();
in_data = reinterpret_cast<uint8_t *>(in.position());
in->nextIfAtEnd();
in_available = in->buffer().end() - in->position();
in_data = reinterpret_cast<uint8_t *>(in->position());
}

if (brotli->result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && (!in_available || in.eof()))
if (brotli->result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && (!in_available || in->eof()))
{
throw Exception("brotli decode error", ErrorCodes::BROTLI_READ_FAILED);
}
Expand All @@ -70,12 +70,12 @@ bool BrotliReadBuffer::nextImpl()

brotli->result = BrotliDecoderDecompressStream(brotli->state, &in_available, &in_data, &out_capacity, &out_data, nullptr);

in.position() = in.buffer().end() - in_available;
in->position() = in->buffer().end() - in_available;
working_buffer.resize(internal_buffer.size() - out_capacity);

if (brotli->result == BROTLI_DECODER_RESULT_SUCCESS)
{
if (in.eof())
if (in->eof())
{
eof = true;
return working_buffer.size() != 0;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/BrotliReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class BrotliReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
BrotliReadBuffer(
ReadBuffer & in_,
std::unique_ptr<ReadBuffer> in_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
Expand All @@ -21,7 +21,7 @@ class BrotliReadBuffer : public BufferWithOwnMemory<ReadBuffer>
private:
bool nextImpl() override;

ReadBuffer & in;
std::unique_ptr<ReadBuffer> in;

class BrotliStateWrapper;
std::unique_ptr<BrotliStateWrapper> brotli;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/IO/CompressionMethod.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum class CompressionMethod
/// This option corresponds to HTTP Content-Encoding: deflate.
Zlib,
Brotli,
None
};

}
13 changes: 13 additions & 0 deletions dbms/src/IO/ReadHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

#include <Formats/FormatSettings.h>

#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/VarInt.h>
#include <IO/ZlibInflatingReadBuffer.h>

#ifdef __clang__
#pragma clang diagnostic push
Expand Down Expand Up @@ -911,4 +913,15 @@ void skipToNextLineOrEOF(ReadBuffer & buf);
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);

/// Construct a TReadBuffer from `args` and, if requested, wrap it into a decompressing buffer.
///
/// When `method` is CompressionMethod::Gzip, the freshly created TReadBuffer is handed over
/// (by unique_ptr) to a ZlibInflatingReadBuffer, which becomes its owner, and the wrapper is returned.
/// For every other value of `method` the plain TReadBuffer is returned and no decompression is applied.
///
/// `args` are perfect-forwarded to the TReadBuffer constructor in both branches, so rvalue and
/// move-only arguments work regardless of the compression method. Only one branch is executed,
/// therefore the arguments are forwarded at most once.
template <class TReadBuffer, class... Types>
std::unique_ptr<ReadBuffer> getReadBuffer(const DB::CompressionMethod method, Types&&... args)
{
    if (method == DB::CompressionMethod::Gzip)
    {
        auto read_buf = std::make_unique<TReadBuffer>(std::forward<Types>(args)...);
        return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
    }

    /// Forward here as well: passing `args...` as lvalues would copy rvalue arguments
    /// and fail to compile for move-only ones.
    return std::make_unique<TReadBuffer>(std::forward<Types>(args)...);
}

}
3 changes: 3 additions & 0 deletions dbms/src/IO/WriteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class WriteBuffer : public BufferBase
++pos;
}

virtual void sync() {}
virtual void finalize() {}

private:
/** Write the data in the buffer (from the beginning of the buffer to the current position).
* Throw an exception if something is wrong.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/IO/WriteBufferAIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class WriteBufferAIO : public WriteBufferFromFileBase
/// Prepare an asynchronous request.
void prepare();
///
void finalize();
void finalize() override;

private:
/// Buffer for asynchronous data writes.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/IO/WriteBufferFromHDFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class WriteBufferFromHDFS : public BufferWithOwnMemory<WriteBuffer>

~WriteBufferFromHDFS() override;

void sync();
void sync() override;
};
}
#endif
12 changes: 6 additions & 6 deletions dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response.set("Content-Encoding", "gzip");
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr);
deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
else if (compression_method == CompressionMethod::Zlib)
Expand All @@ -125,8 +125,8 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response.set("Content-Encoding", "deflate");
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr);
deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
#if USE_BROTLI
Expand All @@ -138,7 +138,7 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response.set("Content-Encoding", "br");
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr);
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
out = &*brotli_buf;
}
Expand All @@ -155,7 +155,7 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response_body_ostr = &(response.send());
#endif

out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &*out_raw;
}
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/WriteBufferFromHTTPServerResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer
std::ostream * response_header_ostr = nullptr;
#endif

std::optional<WriteBufferFromOStream> out_raw;
std::unique_ptr<WriteBufferFromOStream> out_raw;
std::optional<ZlibDeflatingWriteBuffer> deflating_buf;
#if USE_BROTLI
std::optional<BrotliWriteBuffer> brotli_buf;
Expand Down Expand Up @@ -109,7 +109,7 @@ class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
/// to change response HTTP code.
/// This method is idempotent.
void finalize();
void finalize() override;

/// Turn compression on or off.
/// The setting has any effect only if HTTP headers haven't been sent yet.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/IO/WriteBufferFromS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
void nextImpl() override;

/// Receives response from the server after sending all data.
void finalize();
void finalize() override;

~WriteBufferFromS3() override;

Expand Down
14 changes: 14 additions & 0 deletions dbms/src/IO/WriteHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
#include <Common/UInt128.h>
#include <Common/intExp.h>

#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteIntText.h>
#include <IO/VarInt.h>
#include <IO/DoubleConverter.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ZlibDeflatingWriteBuffer.h>

#include <Formats/FormatSettings.h>

Expand Down Expand Up @@ -905,4 +907,16 @@ inline String toString(const T & x)
writeText(x, buf);
return buf.str();
}

/// Construct a TWriteBuffer from `args` and, if requested, wrap it into a compressing buffer.
///
/// When `method` is CompressionMethod::Gzip, the freshly created TWriteBuffer is handed over
/// (by unique_ptr) to a ZlibDeflatingWriteBuffer, which becomes its owner, and the wrapper is returned.
/// The compression level is fixed to 1 in that case.
/// For every other value of `method` the plain TWriteBuffer is returned and no compression is applied.
///
/// `args` are perfect-forwarded to the TWriteBuffer constructor in both branches, so rvalue and
/// move-only arguments work regardless of the compression method. Only one branch is executed,
/// therefore the arguments are forwarded at most once.
template <class TWriteBuffer, class... Types>
std::unique_ptr<WriteBuffer> getWriteBuffer(const DB::CompressionMethod method, Types&&... args)
{
    if (method == DB::CompressionMethod::Gzip)
    {
        auto write_buf = std::make_unique<TWriteBuffer>(std::forward<Types>(args)...);
        return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(write_buf), method, 1 /* compression level */);
    }

    /// Forward here as well: passing `args...` as lvalues would copy rvalue arguments
    /// and fail to compile for move-only ones.
    return std::make_unique<TWriteBuffer>(std::forward<Types>(args)...);
}

}
24 changes: 12 additions & 12 deletions dbms/src/IO/ZlibDeflatingWriteBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ namespace DB
{

ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
WriteBuffer & out_,
std::unique_ptr<WriteBuffer> out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, out(out_)
, out(std::move(out_))
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
Expand Down Expand Up @@ -64,18 +64,18 @@ void ZlibDeflatingWriteBuffer::nextImpl()

do
{
out.nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out.position());
zstr.avail_out = out.buffer().end() - out.position();
out->nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out->position());
zstr.avail_out = out->buffer().end() - out->position();

int rc = deflate(&zstr, Z_NO_FLUSH);
out.position() = out.buffer().end() - zstr.avail_out;
out->position() = out->buffer().end() - zstr.avail_out;

// Unpoison the result of deflate explicitly. It uses some custom SSE algo
// for computing CRC32, and it looks like msan is unable to comprehend
// it fully, so it complains about the resulting value depending on the
// uninitialized padding of the input buffer.
__msan_unpoison(out.position(), zstr.avail_out);
__msan_unpoison(out->position(), zstr.avail_out);

if (rc != Z_OK)
throw Exception(std::string("deflate failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
Expand All @@ -92,18 +92,18 @@ void ZlibDeflatingWriteBuffer::finish()

while (true)
{
out.nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out.position());
zstr.avail_out = out.buffer().end() - out.position();
out->nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out->position());
zstr.avail_out = out->buffer().end() - out->position();

int rc = deflate(&zstr, Z_FINISH);
out.position() = out.buffer().end() - zstr.avail_out;
out->position() = out->buffer().end() - zstr.avail_out;

// Unpoison the result of deflate explicitly. It uses some custom SSE algo
// for computing CRC32, and it looks like msan is unable to comprehend
// it fully, so it complains about the resulting value depending on the
// uninitialized padding of the input buffer.
__msan_unpoison(out.position(), zstr.avail_out);
__msan_unpoison(out->position(), zstr.avail_out);

if (rc == Z_STREAM_END)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/ZlibDeflatingWriteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ZlibDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
ZlibDeflatingWriteBuffer(
WriteBuffer & out_,
std::unique_ptr<WriteBuffer> out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
Expand All @@ -37,7 +37,7 @@ class ZlibDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
private:
void nextImpl() override;

WriteBuffer & out;
std::unique_ptr<WriteBuffer> out;
z_stream zstr;
bool finished = false;
};
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/IO/ZlibInflatingReadBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ namespace DB
{

ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
ReadBuffer & in_,
std::unique_ptr<ReadBuffer> in_,
CompressionMethod compression_method,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(in_)
, in(std::move(in_))
, eof(false)
{
zstr.zalloc = nullptr;
Expand Down Expand Up @@ -49,21 +49,21 @@ bool ZlibInflatingReadBuffer::nextImpl()

if (!zstr.avail_in)
{
in.nextIfAtEnd();
zstr.next_in = reinterpret_cast<unsigned char *>(in.position());
zstr.avail_in = in.buffer().end() - in.position();
in->nextIfAtEnd();
zstr.next_in = reinterpret_cast<unsigned char *>(in->position());
zstr.avail_in = in->buffer().end() - in->position();
}
zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
zstr.avail_out = internal_buffer.size();

int rc = inflate(&zstr, Z_NO_FLUSH);

in.position() = in.buffer().end() - zstr.avail_in;
in->position() = in->buffer().end() - zstr.avail_in;
working_buffer.resize(internal_buffer.size() - zstr.avail_out);

if (rc == Z_STREAM_END)
{
if (in.eof())
if (in->eof())
{
eof = true;
return working_buffer.size() != 0;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/ZlibInflatingReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ZlibInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
ZlibInflatingReadBuffer(
ReadBuffer & in_,
std::unique_ptr<ReadBuffer> in_,
CompressionMethod compression_method,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
Expand All @@ -32,7 +32,7 @@ class ZlibInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
private:
bool nextImpl() override;

ReadBuffer & in;
std::unique_ptr<ReadBuffer> in;
z_stream zstr;
bool eof;
};
Expand Down