Skip to content

Commit

Permalink
Merge pull request #7377 from azat/INSERT-Distributed-MATERIALIZED-cols
Browse files Browse the repository at this point in the history
* Fix INSERT into Distributed non local node with MATERIALIZED columns

The previous patch e527def ("Fix INSERT
into Distributed() table with MATERIALIZED column") fixed the problem only
for the case when the node is local, i.e. a direct insert.

This patch addresses the problem when the node is not local
(`is_local == false`), by erasing materialized columns on INSERT into
Distributed.

This patch fixes two cases, depending on the `insert_distributed_sync`
setting:

- `insert_distributed_sync=0`

    ```
    Not found column value in block. There are only columns: date. Stack trace:

    2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27
    3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187
    4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159
    5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61
    6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971
    7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855
    8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406
    9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437
    10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464
    11. 0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257
    ```

- `insert_distributed_sync=1`

    ```
    2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns:
    date Date UInt16(size = 1), value Date UInt16(size = 1)
    date Date UInt16(size = 0): Insertion status:
    Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block)
    Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block)
     (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace:

    2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27
    3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460
    4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467
    5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515
    6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68
    7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280
    <snip>
    ```

Fixes: #7365
Fixes: #5429
Refs: #6891

* Cover INSERT into Distributed with MATERIALIZED columns and !is_local node

Adding the new cluster to server-test.xml is probably not required,
but it does no harm.

* Update DistributedBlockOutputStream.cpp

(cherry picked from commit 29052b6)
  • Loading branch information
akuzm committed Oct 29, 2019
1 parent 5e43161 commit da115dd
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 6 deletions.
16 changes: 15 additions & 1 deletion dbms/programs/server/config.xml
Expand Up @@ -180,7 +180,21 @@
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards_localhost>
</test_cluster_two_shards_localhost>
<test_cluster_two_shards>
<shard>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards>
<test_shard_localhost_secure>
<shard>
<replica>
Expand Down
21 changes: 18 additions & 3 deletions dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
Expand Up @@ -81,13 +81,28 @@ void DistributedBlockOutputStream::writePrefix()

/** Writes one block into the Distributed table.
  * A copy of the incoming block is made, every MATERIALIZED column of the
  * storage that is present in the copy is erased (with a debug log line per
  * erased column), and the copy is then handed to writeSync() or writeAsync()
  * depending on insert_sync.
  *
  * NOTE(review): this listing looks like a rendered diff without +/- markers:
  * the writeSync(block) / writeAsync(block) calls appear to be the removed
  * pre-patch lines and the *(ordinary_block) calls their replacements.
  * Confirm against the actual file before relying on the if/else below.
  */
void DistributedBlockOutputStream::write(const Block & block)
{
/* Work on a copy: the argument is const and columns are erased below. */
Block ordinary_block{ block };

/* MATERIALIZED columns are added by the AddingDefaultBlockOutputStream.
 * If they were kept here, we would eventually get a different number of
 * columns (see the "Block structure mismatch" and "Not found column"
 * errors quoted in the commit message). */
for (const auto & col : storage.getColumns().getMaterialized())
{
/* Erase only columns that are actually present in this block. */
if (ordinary_block.has(col.name))
{
ordinary_block.erase(col.name);
LOG_DEBUG(log, storage.getTableName()
<< ": column " + col.name + " will be removed, "
<< "because it is MATERIALIZED");
}
}


/* Dispatch the stripped block according to the insert mode. */
if (insert_sync)
writeSync(block);
writeSync(ordinary_block);
else
writeAsync(block);
writeAsync(ordinary_block);
}


void DistributedBlockOutputStream::writeAsync(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/StorageDistributed.cpp
@@ -1,7 +1,6 @@
#include <Storages/StorageDistributed.h>

#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/materializeBlock.h>

#include <Databases/IDatabase.h>

Expand Down
@@ -1,3 +1,14 @@
insert_distributed_sync=0
2018-08-01
2018-08-01
2018-08-01 2017-08-01
2018-08-01 2017-08-01
2018-08-01
2018-08-01 2017-08-01
insert_distributed_sync=1
2018-08-01
2018-08-01
2018-08-01 2017-08-01
2018-08-01 2017-08-01
2018-08-01
2018-08-01 2017-08-01
@@ -1,15 +1,42 @@
-- Test: INSERT into a Distributed table whose underlying table has a
-- MATERIALIZED column, for both values of insert_distributed_sync.
-- NOTE(review): this listing looks like a rendered diff without +/- markers;
-- the 'test_shard_localhost' CREATE TABLE below appears to be the removed
-- pre-patch line and the 'test_cluster_two_shards' one its replacement.
DROP TABLE IF EXISTS local_00952;
DROP TABLE IF EXISTS distributed_00952;

--
-- insert_distributed_sync=0
--
SELECT 'insert_distributed_sync=0';
SET insert_distributed_sync=0;

-- `value` is MATERIALIZED, so it is not part of the INSERT column list.
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_shard_localhost', currentDatabase(), local_00952, rand());
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());

INSERT INTO distributed_00952 VALUES ('2018-08-01');
-- With insert_distributed_sync=0 the data is presumably still queued when the
-- INSERT returns; flush it so the SELECTs below can see it.
SYSTEM FLUSH DISTRIBUTED distributed_00952;

-- Each table is read twice: `*` and an explicit `date, value` column list.
SELECT * FROM distributed_00952;
SELECT date, value FROM distributed_00952;
SELECT * FROM local_00952;
SELECT date, value FROM local_00952;

DROP TABLE distributed_00952;
DROP TABLE local_00952;

--
-- insert_distributed_sync=1
--
SELECT 'insert_distributed_sync=1';
SET insert_distributed_sync=1;

-- Same schema as above; the tables were dropped, so they are recreated.
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());

-- No SYSTEM FLUSH DISTRIBUTED is issued in this section.
INSERT INTO distributed_00952 VALUES ('2018-08-01');

SELECT * FROM distributed_00952;
SELECT date, value FROM distributed_00952;
SELECT * FROM local_00952;
SELECT date, value FROM local_00952;

DROP TABLE distributed_00952;
DROP TABLE local_00952;

14 changes: 14 additions & 0 deletions dbms/tests/server-test.xml
Expand Up @@ -75,6 +75,20 @@
</replica>
</shard>
</test_shard_localhost>
<test_cluster_two_shards>
<shard>
<replica>
<host>127.0.0.1</host>
<port>59000</port>
</replica>
</shard>
<shard>
<replica>
<host>127.0.0.2</host>
<port>59000</port>
</replica>
</shard>
</test_cluster_two_shards>
<test_cluster_two_shards_localhost>
<shard>
<replica>
Expand Down

0 comments on commit da115dd

Please sign in to comment.