Skip to content

Commit

Permalink
Merge pull request #14703 from nikitamikhaylov/format-line-as-string
Browse files Browse the repository at this point in the history
Merging #13846 (Format LineAsString)
  • Loading branch information
nikitamikhaylov committed Sep 11, 2020
2 parents 468089a + d0c2599 commit 26742a3
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -390,6 +390,7 @@ void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);

void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);

FormatFactory::FormatFactory()
Expand Down Expand Up @@ -454,6 +455,7 @@ FormatFactory::FormatFactory()

registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorJSONAsString(*this);
registerInputFormatProcessorLineAsString(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(*this);
#endif
Expand Down
85 changes: 85 additions & 0 deletions src/Processors/Formats/Impl/LineAsStringRowInputFormat.cpp
@@ -0,0 +1,85 @@
#include <Processors/Formats/Impl/LineAsStringRowInputFormat.h>
#include <Formats/JSONEachRowUtils.h>
#include <common/find_symbols.h>
#include <IO/ReadHelpers.h>

namespace DB
{

/// Error codes referenced in this file. Only declared here (`extern`); the value comes from another translation unit.
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}

/// Constructs the format and initialises the peekable buffer `buf` from the input buffer.
/// Throws INCORRECT_QUERY unless the header consists of exactly one column of type String.
LineAsStringRowInputFormat::LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) :
    IRowInputFormat(header_, in_, std::move(params_)), buf(in)
{
    /// `!= 1` rather than `> 1`: a header without columns has to be rejected before
    /// getDataTypes()[0] is evaluated, otherwise the type check indexes an empty list.
    if (header_.columns() != 1 || header_.getDataTypes()[0]->getTypeId() != TypeIndex::String)
    {
        throw Exception("This input format is only suitable for tables with a single column of type String.", ErrorCodes::INCORRECT_QUERY);
    }
}

/// Resets the parser: first the state kept by the base class, then the peekable buffer `buf`.
void LineAsStringRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
buf.reset();
}

/// Reads one line from `buf` and appends it to `column` as a single value.
///
/// The line ends at the first unescaped '\n' or at the end of the input. A backslash makes
/// the byte after it part of the line, so a backslash followed by '\n' does not end the row.
/// The terminating '\n' is consumed but is not part of the inserted value.
void LineAsStringRowInputFormat::readLineObject(IColumn & column)
{
    /// Data read after the checkpoint is kept, so once the end of the line is known
    /// the whole line can be handed to the column as one contiguous range.
    PeekableReadBufferCheckpoint checkpoint{buf};
    bool found_newline = false;

    while (!found_newline)
    {
        /// eof() loads the next portion of data when the working buffer is used up.
        /// Hitting the end of the working buffer alone must not end the line
        /// (that would split a long line into several rows); only real end of input does.
        if (buf.eof())
            break;

        buf.position() = find_first_symbols<'\n', '\\'>(buf.position(), buf.buffer().end());
        if (buf.position() == buf.buffer().end())
            continue;

        if (*buf.position() == '\n')
        {
            found_newline = true;
        }
        else if (*buf.position() == '\\')
        {
            /// Skip the backslash and, if the input has not ended, the byte it escapes.
            ++buf.position();
            if (!buf.eof())
                ++buf.position();
        }
    }

    buf.makeContinuousMemoryFromCheckpointToPos();
    /// If a newline was found the position points at it: step over it so the next row starts after it.
    char * end = found_newline ? ++buf.position() : buf.position();
    buf.rollbackToCheckpoint();
    /// After the rollback the position is the start of the line; leave the trailing '\n' out of the value.
    column.insertData(buf.position(), end - (found_newline ? 1 : 0) - buf.position());
    buf.position() = end;
}

/// Reads the next line of input into the first (and only) column.
/// Returns true if a value was appended to `columns`, false if the input was already exhausted.
bool LineAsStringRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
    if (buf.eof())
        return false;

    readLineObject(*columns[0]);

    /// A value has just been appended, so it must be reported as a row even when it was
    /// the last line of the input. The next call sees eof() above and returns false.
    return true;
}

/// Registers the input format under the name "LineAsString".
/// The creator ignores the format settings and only forwards header, buffer and row parameters.
void registerInputFormatProcessorLineAsString(FormatFactory & factory)
{
    auto creator = [](ReadBuffer & in, const Block & header, const RowInputFormatParams & row_params, const FormatSettings &)
    {
        return std::make_shared<LineAsStringRowInputFormat>(header, in, row_params);
    };

    factory.registerInputFormatProcessor("LineAsString", creator);
}

}
31 changes: 31 additions & 0 deletions src/Processors/Formats/Impl/LineAsStringRowInputFormat.h
@@ -0,0 +1,31 @@
#pragma once

#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>

namespace DB
{

class ReadBuffer;

/// This format reads the input line by line and stores every line as one String value.
/// Lines are terminated by '\n'; the terminator itself is not stored. A backslash escapes
/// the byte that follows it, so an escaped newline does not end the line.
/// This format can only parse a table with a single column of type String.

class LineAsStringRowInputFormat : public IRowInputFormat
{
public:
/// Expects a header with a single String column; throws otherwise.
LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);

/// Reads the next line into columns[0].
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "LineAsStringRowInputFormat"; }
/// Resets the base class parser state and `buf`.
void resetParser() override;

private:
/// Reads one line from `buf` and inserts it into `column`.
void readLineObject(IColumn & column);

/// Peekable buffer the lines are read from; a checkpoint is set on it while a line is scanned.
PeekableReadBuffer buf;
};

}
1 change: 1 addition & 0 deletions src/Processors/ya.make
Expand Up @@ -33,6 +33,7 @@ SRCS(
Formats/Impl/JSONEachRowRowOutputFormat.cpp
Formats/Impl/JSONEachRowWithProgressRowOutputFormat.cpp
Formats/Impl/JSONRowOutputFormat.cpp
Formats/Impl/LineAsStringRowInputFormat.cpp
Formats/Impl/MarkdownRowOutputFormat.cpp
Formats/Impl/MsgPackRowInputFormat.cpp
Formats/Impl/MsgPackRowOutputFormat.cpp
Expand Down
@@ -0,0 +1,8 @@
"id" : 1,
"date" : "01.01.2020",
"string" : "123{{{\\"\\\\",
"array" : [1, 2, 3],

Finally implement this new feature.
42 ClickHouse
42 ClickHouse is a `fast` #open-source# (OLAP) database "management" :system:
31 changes: 31 additions & 0 deletions tests/queries/0_stateless/01460_line_as_string_format.sh
@@ -0,0 +1,31 @@
#!/usr/bin/env bash
# Stateless test for the LineAsString input format: every input line must become one String row.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

# Case 1: a table with a single String column receives several lines,
# including one that contains backslash escapes.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS line_as_string1";
$CLICKHOUSE_CLIENT --query="CREATE TABLE line_as_string1(field String) ENGINE = Memory";

echo '"id" : 1,
"date" : "01.01.2020",
"string" : "123{{{\"\\",
"array" : [1, 2, 3],
Finally implement this new feature.' | $CLICKHOUSE_CLIENT --query="INSERT INTO line_as_string1 FORMAT LineAsString";

$CLICKHOUSE_CLIENT --query="SELECT * FROM line_as_string1";
$CLICKHOUSE_CLIENT --query="DROP TABLE line_as_string1"

# Case 2: the table has default and materialized columns; only column `c` is inserted through the format.
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS line_as_string2";
$CLICKHOUSE_CLIENT --query="create table line_as_string2(
a UInt64 default 42,
b String materialized toString(a),
c String
) engine=MergeTree() order by tuple();";

$CLICKHOUSE_CLIENT --query="INSERT INTO line_as_string2(c) values ('ClickHouse')";

# NOTE: the single quotes around `database` close and reopen the shell string,
# so the inserted line contains the bare word database without quotes.
echo 'ClickHouse is a `fast` #open-source# (OLAP) 'database' "management" :system:' | $CLICKHOUSE_CLIENT --query="INSERT INTO line_as_string2(c) FORMAT LineAsString";

$CLICKHOUSE_CLIENT --query="SELECT * FROM line_as_string2 order by c";
$CLICKHOUSE_CLIENT --query="DROP TABLE line_as_string2"

0 comments on commit 26742a3

Please sign in to comment.