
Introduce Syntax & Executor for InsertValues #2723

Merged

merged 1 commit into confluentinc:master from agavra:insert_values on May 1, 2019

Conversation

@agavra (Contributor) commented Apr 23, 2019:

Progression

This is the first of a few PRs that implement #2693 (Insert Values), which is split across the following 3 PRs:

  1. Introduce syntax and basic functionality for insert values
  2. Introduce new semantics for CREATE STREAM/CREATE TABLE
  3. Documentation PR (left for the end so that nobody uses the feature until it is more complete)

If those 3 PRs are done before 5.3, we can add more functionality (like non-literal support for INSERT VALUES)

Description

This PR contains the following changes:

  • INSERT INTO ... VALUES syntax in SqlBase.g4 and corresponding parsing/formatting code
  • InsertValuesExecutor to produce data into the underlying topics (see the sketch after this list)
  • Some helpful methods in SchemaUtil
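
For readers new to this code path, the sketch below shows the general shape of the executor's job: build a record from the supplied values and produce it to the source's underlying topic. It is a minimal illustration against the plain Kafka producer API; the topic name, broker address, and JSON payload are hypothetical, and the PR's actual implementation serializes with the source's configured format and obtains its producer from the KSQL client supplier.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class InsertValuesSketch {

  public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    // The real executor serializes the key and row with the source's
    // configured format (JSON, Avro, ...); plain strings stand in here.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer
          .send(new ProducerRecord<>("pageviews", "1",
              "{\"PAGEID\":\"Page_19\",\"USERID\":\"Almog\"}"))
          .get(); // block so a failed produce surfaces as an exception
    }
  }
}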

Testing done

  • Unit testing

Local Testing:

ksql> SELECT * FROM PAGEVIEWS;
1556037848196 | 1 | Page_23 | 1556037847931 | User_9
^CQuery terminated

ksql> INSERT INTO PAGEVIEWS VALUES ('1', 'Page_19', 123, 'Almog');

ksql> INSERT INTO PAGEVIEWS (ROWKEY, USERID) VALUES ('2', 'Almog');

ksql> INSERT INTO PAGEVIEWS (ROWKEY) VALUES ('2', 'Almog');
Failed to prepare statement: Expected number columns and values to match: [ROWKEY], ['2', 'Almog']

ksql> INSERT INTO PAGEVIEWS VALUES ('2', 'Almog');
Expected a value for each column. Expected Columns: [ROWKEY, PAGEID, VIEWTIME, USERID]. Got ['2', 'Almog']

ksql> SELECT * FROM PAGEVIEWS;
1556037848196 | 1 | Page_23 | 1556037847931 | User_9
1556038380373 | 1 | Page_19 | 123 | Almog
1556038448271 | 2 | null | null | Almog
^CQuery terminated

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra added the enhancement label Apr 23, 2019

@agavra agavra added this to the 5.3 milestone Apr 23, 2019

@agavra agavra requested a review from confluentinc/ksql as a code owner Apr 23, 2019

@spena (Member) left a comment:

This looks good Almog! I left just one question, about the SQL standard.

Btw, can we have QTT tests for this new statement?

@agavra agavra requested a review from confluentinc/ksql Apr 24, 2019

@spena (Member) approved these changes Apr 24, 2019 and left a comment:

Good. I just left a suggestion if you want to use it. But overall it looks good.

@rodesai rodesai requested a review from confluentinc/ksql Apr 24, 2019

@hjafarpour (Member) left a comment:

Thanks @agavra, did an initial pass and left a few comments. Will look into it again :)

.fields()
.stream()
.map(Field::name)
.filter(name -> !SchemaUtil.ROWTIME_NAME.equals(name))

@hjafarpour (Member) commented Apr 25, 2019:

I thought we allow setting ROWTIME!

@agavra (Author, Contributor) commented Apr 25, 2019:

We do! We just require you to specify it in the schema (we decided that the default schema does not include ROWTIME):

ksql> INSERT INTO PAGEVIEWS (ROWTIME, ROWKEY, USERID) VALUES (123, '2', 'Almog');
ksql> SELECT * FROM PAGEVIEWS;
...
123 | 2 | null | null | Almog

@agavra agavra requested review from rodesai, hjafarpour and confluentinc/ksql Apr 26, 2019

@hjafarpour (Member) left a comment:

Left a few more comments! :)

.getSource(insertValues.getTarget().getSuffix());

if (dataSource instanceof KsqlStream && ((KsqlStream<?>) dataSource).hasWindowedKey()) {
throw new KsqlException("INSERT VALUES does not support windowed data sources!");

@hjafarpour (Member) commented Apr 26, 2019:

Data source is a bit ambiguous here, since it's actually the sink for the statement. Maybe change the error message to something like: You cannot insert values into a stream/table with a windowed key!

@hjafarpour (Member) left a comment:

LGTM with one comment.
Also, it seems that the order of columns is not strict when the schema is given. It would be good to have a test for this case too (see the sketch below)!
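
To illustrate the non-strict column ordering, here is a minimal sketch (hypothetical names, not the PR's code) of resolving caller-ordered values back into schema order:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ColumnOrderSketch {

  // Re-orders caller-supplied values into schema order; columns the caller
  // did not list fall through as null, matching the output shown earlier.
  static List<Object> reorder(
      final List<String> insertColumns, // e.g. [USERID, ROWKEY]
      final List<Object> insertValues,  // e.g. [Almog, 2]
      final List<String> schemaColumns  // e.g. [ROWKEY, PAGEID, VIEWTIME, USERID]
  ) {
    final Map<String, Object> byName = new HashMap<>();
    for (int i = 0; i < insertColumns.size(); i++) {
      byName.put(insertColumns.get(i), insertValues.get(i));
    }
    return schemaColumns.stream().map(byName::get).collect(Collectors.toList());
  }
}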

.field("ROWKEY", Schema.OPTIONAL_INT64_SCHEMA)
.field("COL0", Schema.OPTIONAL_INT64_SCHEMA)
.field("COL1", Schema.OPTIONAL_STRING_SCHEMA)
.build();

@hjafarpour (Member) commented Apr 26, 2019:

Can we add at least one column for each supported primitive type and have tests for them?
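
For concreteness, a test schema with one column per KSQL primitive type could be built with the Connect SchemaBuilder along these lines (field names are hypothetical):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

final class AllPrimitivesSchema {

  static final Schema SCHEMA = SchemaBuilder.struct()
      .field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA)
      .field("C_BOOL", Schema.OPTIONAL_BOOLEAN_SCHEMA)   // KSQL BOOLEAN
      .field("C_INT", Schema.OPTIONAL_INT32_SCHEMA)      // KSQL INT
      .field("C_BIGINT", Schema.OPTIONAL_INT64_SCHEMA)   // KSQL BIGINT
      .field("C_DOUBLE", Schema.OPTIONAL_FLOAT64_SCHEMA) // KSQL DOUBLE
      .field("C_STRING", Schema.OPTIONAL_STRING_SCHEMA)  // KSQL VARCHAR/STRING
      .build();
}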

@agavra agavra referenced this pull request Apr 29, 2019

@agavra agavra force-pushed the agavra:insert_values branch from 3f92588 to 9af361d Apr 29, 2019

@big-andy-coates (Member) left a comment:

Thanks @agavra, looking good, but also some nits and:

  1. I think the logic for producing is in the wrong place: it should be in the engine.
  2. We need a retry loop on the produce.
  3. We need better error handling on the execution.
return ARITHMETIC_TYPES.contains(type);
}

public static boolean canUpCast(final Schema.Type expected, final Schema.Type actual) {

@big-andy-coates (Member) commented Apr 30, 2019:

Is this being used for some kind of implicit conversion?

If so... conversion from a floating point to integer types? That's a bit lossy! Is this SQL compliant?

(Or is this just existing code refactored?)

@agavra (Author, Contributor) commented Apr 30, 2019:

The upcast isn't lossy - it only allows int -> long -> float -> double. But if it makes you feel more comfortable, I can remove this functionality and add it back another time. I don't know whether it's SQL compliant, so perhaps that's the better option to unblock this without sinking time into it.
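
A minimal sketch of that widening rule, assuming exactly the int -> long -> float -> double order described above (an illustration, not necessarily the PR's implementation):

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.data.Schema;

final class UpCastSketch {

  // Lossless widening order only: INT32 -> INT64 -> FLOAT32 -> FLOAT64.
  private static final List<Schema.Type> ORDER = Arrays.asList(
      Schema.Type.INT32, Schema.Type.INT64, Schema.Type.FLOAT32, Schema.Type.FLOAT64);

  // True if a value of type `actual` can be widened to `expected` without loss.
  static boolean canUpCast(final Schema.Type expected, final Schema.Type actual) {
    final int e = ORDER.indexOf(expected);
    final int a = ORDER.indexOf(actual);
    return e >= 0 && a >= 0 && a <= e;
  }
}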

@big-andy-coates (Member) commented May 1, 2019:

Oh yeah - clue's in the name, up casting :D

Not sure if it's SQL compliant either... but I'm now less worried.

@@ -345,7 +345,7 @@ private Integer findMatchingTimestampField(final String timestampFieldName) {
// 5. Aggregate the KStream into a KTable using a custom aggregator that handles Optional.EMPTY
final KsqlTable ksqlTable = (KsqlTable) getStructuredDataSource();

-    if (ksqlTable.isWindowed()) {
+    if (ksqlTable.hasWindowedKey()) {

@big-andy-coates (Member) commented Apr 30, 2019:

There is a reason why KStream has hasWindowedKey whereas KTable has isWindowed. This was at @hjafarpour's request.

A table can be windowed or not windowed. A stream is itself not windowed or windowed. However, if you re-import the topic of a windowed table as a stream, you can tell KSQL that the key of the record is windowed. Hence the key of the stream can be windowed.

It's not pretty... but it is correct.

@agavra (Author, Contributor) commented Apr 30, 2019:

Yes. That is why I changed both to be hasWindowedKey since that semantic is true for both of them.

For the sake of moving this PR through, though, I'll just change it back.

@big-andy-coates (Member) commented May 1, 2019:

My point was that the semantic is not true for both of them. Talk to @hjafarpour about it.

.getKafkaClientSupplier()
.getProducer(config.getProducerClientConfigProps());

producer.send(

@big-andy-coates (Member) commented Apr 30, 2019:

Kafka Streams will retry certain failure types, and it has config to control the number/delay of retries. We should consider adding retries here too, and potentially picking up the streams config to control it.

@agavra (Author, Contributor) commented Apr 30, 2019:

There's a producer config, retries. If the user wants retries, they should be able to set this in the producer config :)

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

@agavra (Author, Contributor) commented May 1, 2019:

Or better yet, they set delivery.timeout.ms

@big-andy-coates (Member) commented May 1, 2019:

Fairy-snuff :D - we should document this!

@agavra (Author, Contributor) commented May 1, 2019:

I will have a big-*** documentation PR at the end of this INSERT INTO stuff; I will make sure to include that there.
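
For reference, the knobs discussed in this thread are ordinary producer configs, so a client could pass them straight through; a minimal sketch (values illustrative only):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

final class RetryConfigSketch {

  static Properties producerProps() {
    final Properties props = new Properties();
    // Preferred: bound the total time a send may take, including retries.
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000); // delivery.timeout.ms
    // Alternative: cap resend attempts explicitly; note this may reorder
    // records unless max.in.flight.requests.per.connection is 1.
    props.put(ProducerConfig.RETRIES_CONFIG, 3); // retries
    return props;
  }
}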

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;

public class InsertValuesExecutor {

@big-andy-coates (Member) commented Apr 30, 2019:

It feels to me that the logic for producing messages to Kafka should be in the engine, not the rest-api. This executor should call the engine with the command.

This will also mean we get headless for free...

@agavra (Author, Contributor) commented Apr 30, 2019:

This can be done in a follow-up PR, but there isn't a good place in the engine today to put it that I know of. This definitely isn't an ExecutableDdlStatement and it doesn't create a persistent query. Furthermore, I don't want this to be persisted onto the command topic, and after #2712 there won't be any unreplicated statements that the engine executes, so this would be another edge case we'd need to handle.

This will also mean we get headless for free...

I know we disagree on this, but this is something that I actively don't want in V1, so for me that's not a reason to make this change. When we want to move this to headless mode, we need to give it more thought, and if we move it now we can't take it away.

@big-andy-coates (Member) commented May 1, 2019:

Hummm... I still feel this should be in the engine! However, happy for this to be a follow-up task as long as it's not ignored.

@agavra agavra force-pushed the agavra:insert_values branch 2 times, most recently from 150979d to f12d562 Apr 30, 2019

@agavra agavra force-pushed the agavra:insert_values branch from 51189db to 649f0c7 May 1, 2019

@agavra agavra merged commit 97fc956 into confluentinc:master May 1, 2019

1 check passed

continuous-integration/jenkins/pr-merge This commit looks good

@agavra agavra deleted the agavra:insert_values branch May 1, 2019
