Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for insertion into Kafka tables #6012

Merged
merged 20 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \
M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite.") \
M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \
M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from streaming storages.") \
M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.") \
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
Expand Down
45 changes: 0 additions & 45 deletions dbms/src/Formats/BlockOutputStreamFromRowOutputStream.cpp

This file was deleted.

38 changes: 0 additions & 38 deletions dbms/src/Formats/BlockOutputStreamFromRowOutputStream.h

This file was deleted.

14 changes: 8 additions & 6 deletions dbms/src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ BlockInputStreamPtr FormatFactory::getInput(
}


BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
if (name == "PrettyCompactMonoBlock")
{
Expand All @@ -124,14 +125,14 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings);

/** Materialization is needed, because formats can use the functions `IDataType`,
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, context, format_settings), sample);
output_getter(buf, sample, context, callback, format_settings), sample);
}

auto format = getOutputFormat(name, buf, sample, context);
auto format = getOutputFormat(name, buf, sample, context, callback);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}

Expand Down Expand Up @@ -165,7 +166,8 @@ InputFormatPtr FormatFactory::getInputFormat(
}


OutputFormatPtr FormatFactory::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
Expand All @@ -177,7 +179,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(const String & name, WriteBuffer
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return output_getter(buf, sample, context, format_settings);
return output_getter(buf, sample, context, callback, format_settings);
}


Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class FormatFactory final : public ext::singleton<FormatFactory>
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;

/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void()>;

private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
Expand All @@ -55,6 +59,7 @@ class FormatFactory final : public ext::singleton<FormatFactory>
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;

using InputProcessorCreator = std::function<InputFormatPtr(
Expand All @@ -68,6 +73,7 @@ class FormatFactory final : public ext::singleton<FormatFactory>
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;

struct Creators
Expand All @@ -91,7 +97,7 @@ class FormatFactory final : public ext::singleton<FormatFactory>
ReadCallback callback = {}) const;

BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;
const Block & sample, const Context & context, WriteCallback callback = {}) const;

InputFormatPtr getInputFormat(
const String & name,
Expand All @@ -102,8 +108,8 @@ class FormatFactory final : public ext::singleton<FormatFactory>
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;

OutputFormatPtr getOutputFormat(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;

/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Formats/NativeFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void registerOutputFormatNative(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Formats/NullFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ void registerOutputFormatNull(FormatFactory & factory)
WriteBuffer &,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
return std::make_shared<NullBlockOutputStream>(sample);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Formats/tests/block_row_transforms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>

#include <DataStreams/copyData.h>
#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
Expand Down Expand Up @@ -47,7 +46,7 @@ try

RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, format_settings));
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, []{}, format_settings));

copyData(block_input, *block_output);
}
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Formats/tests/tab_separated_streams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>

#include <DataStreams/copyData.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
Expand Down Expand Up @@ -44,7 +43,7 @@ try
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);

BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, format_settings));
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [] {}, format_settings));

copyData(block_input, *block_output);
return 0;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/IO/MemoryReadWriteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace DB
{

/// Stores data in memory chunks, size of cunks are exponentially increasing during write
/// Stores data in memory chunks, size of chunks are exponentially increasing during write
/// Written data could be reread after write
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/IO/WriteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class WriteBuffer : public BufferBase
*/
inline void next()
{
if (!offset())
if (!offset() && available())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abyss7 can you elaborate this hunk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest that I wanted to prevent miscalling next() if there is still available space in buffer - it's permissive, so no penalties for logical error in the code. But maybe there should be assert(!available()). Does this break anything?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this break anything?

I have't any example, but is this strictly correct?
AFAICS right now destructor tries to send the data, so after this change it will send only if the buffer is full, sounds icky

I came to this while working on #19451
And quite frankly to me sending some data from destructor looks not safe anyway, and the goal is to replace destructor final flush with assert that will ensure that there is no bytes pending in buffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!offset() && available())
    return;

This is strange.

The following code means: if there is no data to write - ignore the next call:

if (!offset())
    return;

This is how it was and it is 100% correct.

But available() means - if the buffer is not filled up completely.
Adding this condition with "and" is bogus - because if the buffer size is greater than zero, and !offset() - at least one byte is available.

Even if you meant this:

if (!offset() || available())
    return;

it also does not make sense, because flushing half-filled buffer is completely legit and is used extensively.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let's try to revert this hunk - #19886, and see, maybe CI will find something interesting.

return;
bytes += offset();

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Processors/Formats/IRowOutputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
first_row = false;

write(columns, row);

if (write_single_row_callback)
write_single_row_callback();
}
}

Expand Down Expand Up @@ -96,6 +99,3 @@ void IRowOutputFormat::writeTotals(const DB::Columns & columns, size_t row_num)
}

}



13 changes: 8 additions & 5 deletions dbms/src/Processors/Formats/IRowOutputFormat.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once

#include <string>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>

#include <string>


namespace DB
{
Expand All @@ -22,8 +24,8 @@ class IRowOutputFormat : public IOutputFormat
void finalize() override;

public:
IRowOutputFormat(const Block & header, WriteBuffer & out_)
: IOutputFormat(header, out_), types(header.getDataTypes())
IRowOutputFormat(const Block & header, WriteBuffer & out_, FormatFactory::WriteCallback callback)
: IOutputFormat(header, out_), types(header.getDataTypes()), write_single_row_callback(callback)
{
}

Expand Down Expand Up @@ -57,6 +59,9 @@ class IRowOutputFormat : public IOutputFormat
bool prefix_written = false;
bool suffix_written = false;

// Callback used to indicate that another row is written.
FormatFactory::WriteCallback write_single_row_callback;

void writePrefixIfNot()
{
if (!prefix_written)
Expand All @@ -76,5 +81,3 @@ class IRowOutputFormat : public IOutputFormat
};

}


10 changes: 6 additions & 4 deletions dbms/src/Processors/Formats/Impl/BinaryRowOutputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
namespace DB
{

BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_)
: IRowOutputFormat(header, out_), with_names(with_names_), with_types(with_types_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header, out_, callback), with_names(with_names_), with_types(with_types_)
{
}

Expand Down Expand Up @@ -53,18 +53,20 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, callback);
});

factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, callback);
});
}

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Processors/Formats/Impl/BinaryRowOutputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback);

String getName() const override { return "BinaryRowOutputFormat"; }

Expand All @@ -32,4 +32,3 @@ class BinaryRowOutputFormat: public IRowOutputFormat
};

}

7 changes: 4 additions & 3 deletions dbms/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ namespace DB
{


CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), format_settings(format_settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
Expand Down Expand Up @@ -77,9 +77,10 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, callback, format_settings);
});
}
}
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Processors/Formats/Impl/CSVRowOutputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CSVRowOutputFormat : public IRowOutputFormat
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);

String getName() const override { return "CSVRowOutputFormat"; }

Expand All @@ -45,4 +45,3 @@ class CSVRowOutputFormat : public IRowOutputFormat
};

}