From ab50c55c190cd699c137268261584ec72a37580e Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Sun, 15 Mar 2020 11:27:00 -0400 Subject: [PATCH] coord: fix propagation of negative multiplicities error The coordinator was inadvertently converting this error into a PeekResponse::Canceled. Looks to be a missed code path for #1648. Fix #2128. Fix #2314. --- src/coord/coord.rs | 10 ++-- test/testdrive/negative-multiplicities.td | 59 +++++++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 test/testdrive/negative-multiplicities.td diff --git a/src/coord/coord.rs b/src/coord/coord.rs index d83c14dc64f6..b92aaad79ed5 100644 --- a/src/coord/coord.rs +++ b/src/coord/coord.rs @@ -1054,10 +1054,14 @@ where match (memo, resp) { (PeekResponse::Rows(mut memo), PeekResponse::Rows(rows)) => { memo.extend(rows); - let out: Result<_, comm::Error> = Ok(PeekResponse::Rows(memo)); - future::ready(out) + future::ok(PeekResponse::Rows(memo)) + } + (PeekResponse::Error(e), _) | (_, PeekResponse::Error(e)) => { + future::ok(PeekResponse::Error(e)) + } + (PeekResponse::Canceled, _) | (_, PeekResponse::Canceled) => { + future::ok(PeekResponse::Canceled) } - _ => future::ok(PeekResponse::Canceled), } }) .map_ok(move |mut resp| { diff --git a/test/testdrive/negative-multiplicities.td b/test/testdrive/negative-multiplicities.td new file mode 100644 index 000000000000..c30a32a8191d --- /dev/null +++ b/test/testdrive/negative-multiplicities.td @@ -0,0 +1,59 @@ +# Copyright Materialize, Inc. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set schema={ + "type": "record", + "name": "envelope", + "fields": [ + { + "name": "before", + "type": [ + { + "name": "row", + "type": "record", + "fields": [ + {"name": "a", "type": "long"} + ] + }, + "null" + ] + }, + { "name": "after", "type": ["row", "null"] } + ] + } + +$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1 +{"before": null, "after": {"a": 1}} + +> CREATE MATERIALIZED SOURCE data + FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}' + FORMAT AVRO USING SCHEMA '${schema}' + ENVELOPE DEBEZIUM + +> SELECT * FROM data +1 + +$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1 +{"before": {"a": 1}, "after": null} +{"before": {"a": 1}, "after": null} + +> SELECT count(*) FROM data +-1 + +! SELECT * FROM data +Negative multiplicity: -1 + +$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1 +{"before": {"a": 1}, "after": null} + +> SELECT count(*) FROM data +-2 + +! SELECT * FROM data +Negative multiplicity: -2