INSERT INTO silently fails if targeting a stream not created with CSAS #2146

Closed
rmoff opened this issue Nov 13, 2018 · 5 comments
rmoff commented Nov 13, 2018

Raised by user here: https://stackoverflow.com/questions/53226781/ksql-insert-into-a-stream-yields-no-data

tl;dr: unless the target stream for INSERT INTO was created by CSAS (i.e. if it was instead registered against an existing topic), then despite KSQL reporting the query as running and showing messages as processed, none are written to the target stream/topic.


For INSERT INTO to work, it needs to be targeting a stream created with CREATE STREAM target AS SELECT ("CSAS"), NOT a stream simply registered against an existing topic. This is a bug because it silently fails and does not tell the user (and even reports that it has processed messages).

Let's work it through. Here I'm using this docker-compose for a test setup.

Populate some dummy data:

docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t events -P <<EOF
{"eventType":"1", "eventTime" :"2018-11-13-06:34:57", "sourceHostName":"asgard"}
{"eventType":"2", "eventTime" :"2018-11-13-06:35:57", "sourceHostName":"asgard"}
{"eventType":"MatchingValue", "eventTime" :"2018-11-13-06:35:58", "sourceHostName":"asgard"}
EOF

Register the source topic with KSQL:

CREATE STREAM event_stream (eventType varchar, eventTime varchar, sourceHostName varchar) WITH (kafka_topic='events', value_format='json');

Query the stream:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM event_stream;
1542091084660 | null | 1 | 2018-11-13-06:34:57 | asgard
1542091084660 | null | 2 | 2018-11-13-06:35:57 | asgard
1542091785207 | null | MatchingValue | 2018-11-13-06:35:58 | asgard

Create the target topic:

$ docker-compose exec kafka bash -c "kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 4 --topic output_events"

And then create the stream against this topic:

ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');

Message
----------------
Stream created
----------------

Note that it says Stream created, rather than Stream created and running.

Now let's run the INSERT INTO:

ksql> INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Message
-------------------------------
Insert Into query is running.
-------------------------------

The DESCRIBE EXTENDED output does indeed show, as you have seen, messages being processed:

ksql> DESCRIBE EXTENDED output_stream;

Name                 : OUTPUT_STREAM
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : output_events (partitions: 4, replication: 1)

Field          | Type
--------------------------------------------
ROWTIME        | BIGINT           (system)
ROWKEY         | VARCHAR(STRING)  (system)
EVENTTIME      | VARCHAR(STRING)
EXTRACOLUMN    | VARCHAR(STRING)
SOURCEHOSTNAME | VARCHAR(STRING)
--------------------------------------------

Queries that write into this STREAM
-----------------------------------
InsertQuery_0 : INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.01   total-messages:         1     last-message: 11/13/18 6:49:46 AM UTC
failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)

But the topic itself has no messages:

ksql> print 'output_events' from beginning;
^C

Nor does the KSQL stream:

ksql> SELECT * FROM OUTPUT_STREAM;
^CQuery terminated

So the INSERT INTO command is designed to run against an existing CSAS/CTAS target stream, rather than a source STREAM registered against an existing topic.

Let's try it that way instead. First, we need to drop the existing stream definition, and to do that also terminate the INSERT INTO query:

ksql> DROP STREAM OUTPUT_STREAM;
Cannot drop OUTPUT_STREAM.
The following queries read from this source: [].
The following queries write into this source: [InsertQuery_0].
You need to terminate them before dropping OUTPUT_STREAM.
ksql> TERMINATE InsertQuery_0;

Message
-------------------
Query terminated.
-------------------
ksql> DROP STREAM OUTPUT_STREAM;

Message
------------------------------------
Source OUTPUT_STREAM was dropped.
------------------------------------

Now create the target stream:

ksql> CREATE STREAM output_stream WITH (kafka_topic='output_events') AS SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Message
----------------------------
Stream created and running
----------------------------

Note that in creating the stream it is also running (whereas before it was just created). Now query the stream:

ksql> SELECT * FROM OUTPUT_STREAM;
1542091785207 | null | 2018-11-13-06:35:58 | Extra info | asgard

and check the underlying topic too:

ksql> PRINT 'output_events' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542091785207,"ROWKEY":"null","EVENTTIME":"2018-11-13-06:35:58","EXTRACOLUMN":"Extra info","SOURCEHOSTNAME":"asgard"}

So, you have hit a bug in KSQL, but one that fortunately can be avoided by using a simpler KSQL syntax entirely, combining your CREATE STREAM and INSERT INTO queries into one.

rodesai commented Nov 13, 2018

@rmoff - you're basically saying that INSERT INTO doesn't work at all? In your second example of a working query, you don't issue an INSERT INTO at all - only a CSAS.

miguno commented Nov 13, 2018

  • My first reaction is that, from a UX perspective, INSERT INTO should work the same for both CS- and CSAS-created streams. This would also help to avoid confusion about whether you used CS or CSAS to create a stream.
  • Alternatively, if INSERT INTO should not work for CS created streams for some reason, then at least we should prevent you from doing the INSERT and give you a descriptive error message.
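The second option could be sketched roughly as a validation step before the query is started. This is purely illustrative (the names, structure, and error text are hypothetical, not taken from the KSQL codebase):

```python
# Hypothetical sketch of the proposed check: reject INSERT INTO against a
# CS-created stream with a descriptive error instead of failing silently.

class KsqlError(Exception):
    pass

# Toy metastore: stream name -> how it was created ("CS" or "CSAS")
streams = {
    "OUTPUT_STREAM": "CS",
    "CSAS_STREAM": "CSAS",
}

def validate_insert_into(target: str) -> None:
    """Raise a descriptive error if the sink was registered with plain CS."""
    if streams.get(target) == "CS":
        raise KsqlError(
            f"INSERT INTO is not supported for stream '{target}' because it "
            "was registered against an existing topic (CREATE STREAM). "
            "Create the target with CREATE STREAM ... AS SELECT instead."
        )

validate_insert_into("CSAS_STREAM")  # passes silently
```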

rodesai commented Nov 13, 2018

INSERT INTO should work the same, whether the sink was created via a CSAS or a CS statement.

rodesai commented Nov 13, 2018

This is a bug in how we choose the kafka topic name to write into. Each topic in the metastore has 2 names, a "ksql name", and a "kafka name". The "kafka name" is the name of the topic in Kafka. The "KSQL name" is an internal name for the topic (that happens to be a case-insensitive version of the "kafka name"). For sources created with CS, the KSQL name for the topic backing the data source uses upper-case. For sources created with CSAS, it's lower-case. When telling streams which topic to write into, KSQL is using the KSQL name. So if the sink was created with CS (as with insert-into), we use the wrong topic name. The fix is to use the kafka name.

@rmoff - what happens when you run print 'OUTPUT_STREAM' from beginning;?
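The name mismatch described above can be sketched as follows. This is a toy model, not actual KSQL source; the function names and metastore shape are illustrative:

```python
# Toy model of the bug: the metastore maps a case-insensitive "KSQL name"
# to a case-sensitive physical "Kafka name". For CS-created sources the
# KSQL name is the upper-cased Kafka name; for CSAS they match.

metastore = {
    "OUTPUT_EVENTS": "output_events",  # created with CS: KSQL name upper-cased
    "csas_output": "csas_output",      # created with CSAS: names match
}

def sink_topic_buggy(ksql_name: str) -> str:
    # Bug: the KSQL name itself is handed to Kafka Streams as the write target
    return ksql_name

def sink_topic_fixed(ksql_name: str) -> str:
    # Fix: resolve the KSQL name to the real Kafka topic name first
    return metastore[ksql_name]

# For a CS-created sink, the buggy path writes to "OUTPUT_EVENTS", a
# different topic from the registered "output_events" - so the registered
# topic stays empty, matching the behaviour reported above:
assert sink_topic_buggy("OUTPUT_EVENTS") != metastore["OUTPUT_EVENTS"]
assert sink_topic_fixed("OUTPUT_EVENTS") == "output_events"

# For a CSAS-created sink the two names coincide, so both paths work:
assert sink_topic_buggy("csas_output") == sink_topic_fixed("csas_output")
```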

hjafarpour commented

This was a regression in 5.1 which is fixed by #2149 .
