CreateStream/Table will now create topics if they do not exist #2771

Merged
merged 9 commits into from May 6, 2019

@agavra
Contributor

commented May 2, 2019

Description

This PR completes the developer testing dream: you may now optionally create topics via CREATE STREAM/TABLE!

Minor Changes:

  • Rename AbstractCreateStreamStatement to CreateSource to reduce verbosity
  • Use the passed-in replicationFactor, not the resolved one, to validate a topic in KafkaTopicClientImpl
  • Added full documentation for INSERT INTO suite (more to come with examples soon!)
  • Added config to enable/disable INSERT INTO functionality

Testing done

  • Unit Tests
  • The test below (the topic fubar did not exist before the first command):
ksql> CREATE STREAM fubar (FOO VARCHAR, BAR INT) WITH(kafka_topic='fubar', partitions=1, value_format='json');
ksql> INSERT INTO fubar VALUES ('key', 'foo', 123);
ksql> SELECT * FROM fubar;
1556753838733 | key | foo | 123

// test the experience of supplying the wrong partition count
ksql> CREATE STREAM fubar (FOO VARCHAR, BAR INT) WITH(kafka_topic='fubar', partitions=10, value_format='json');
A Kafka topic with the name 'fubar' already exists, with different partition/replica configuration than required. KSQL expects 10 partitions (topic has 1), and 1 replication factor (topic has 1).

// test the experience of creating a stream without specifying partitions
ksql> CREATE STREAM fubar (FOO VARCHAR, BAR INT) WITH(kafka_topic='fubar2', value_format='json');
Topic 'fubar2' does not exist. If you want to create a new topic for the stream/table please re-run the statement providing the required 'PARTITIONS' configuration in the WITH clause (and optionally 'REPLICAS'). For example: CREATE STREAM FUBAR (FOO STRING, BAR INTEGER) WITH (REPLICAS=1, PARTITIONS=2, KAFKA_TOPIC='fubar2', VALUE_FORMAT='json');
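
The three outcomes in the session above (topic created, configuration mismatch, missing PARTITIONS) can be read as one decision. The following is a minimal sketch of that decision, not the code in this PR: the class, enum, and method names are hypothetical, and it assumes that only values explicitly supplied in the WITH clause are checked against an existing topic.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical sketch: class, enum, and method names are illustrative, not KSQL APIs.
final class TopicResolutionSketch {

  enum Outcome { USE_EXISTING, CREATE_TOPIC, MISSING_PARTITIONS, CONFIG_MISMATCH }

  /** Partition count and replication factor of a topic that already exists. */
  static final class TopicInfo {
    final int partitions;
    final int replicas;

    TopicInfo(final int partitions, final int replicas) {
      this.partitions = partitions;
      this.replicas = replicas;
    }
  }

  /**
   * Decides what a CREATE STREAM/TABLE statement should do about its topic.
   * An empty optional means the property was absent from the WITH clause.
   */
  static Outcome resolve(
      final Map<String, TopicInfo> existingTopics,
      final String topic,
      final OptionalInt partitions,
      final OptionalInt replicas) {
    final TopicInfo info = existingTopics.get(topic);
    if (info == null) {
      // A new topic can only be created when PARTITIONS is given.
      return partitions.isPresent() ? Outcome.CREATE_TOPIC : Outcome.MISSING_PARTITIONS;
    }
    // Assumption: only explicitly supplied values are compared with the topic.
    final boolean partitionsOk =
        !partitions.isPresent() || partitions.getAsInt() == info.partitions;
    final boolean replicasOk =
        !replicas.isPresent() || replicas.getAsInt() == info.replicas;
    return partitionsOk && replicasOk ? Outcome.USE_EXISTING : Outcome.CONFIG_MISMATCH;
  }
}
```

With a map containing `fubar` at 1 partition and 1 replica, asking for 10 partitions yields `CONFIG_MISMATCH`, and asking for `fubar2` without partitions yields `MISSING_PARTITIONS`, mirroring the two error cases in the session.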

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra added this to the 5.3 milestone May 2, 2019

@agavra agavra requested review from JimGalasyn and confluentinc/ksql as code owners May 2, 2019

explicitly specified values. The first ``column_name`` of every schema is ``ROWKEY``, which
defines the corresponding kafka key - if the source specifies a ``key`` and that column is present
in the column names for this insert statement, the values are expected to match, otherwise the
value will be duplicated into the value (or conversely from the value into the ``ROWKEY``).

@JimGalasyn

JimGalasyn May 2, 2019

Member

"the ROWKEY value will be duplicated"?

@agavra

agavra May 2, 2019

Author Contributor

I meant that the contents of the ROWKEY will be copied into the value for the key column. Any suggestions in phrasing?

@agavra

agavra May 2, 2019

Author Contributor

Maybe:

If the source specifies a key and that column is present in the column names for this INSERT statement then the values are expected to match, otherwise the value from ROWKEY will be copied into the value of the key column (or conversely from the key column into the ROWKEY column).
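
The rule in the proposed wording can be sketched as follows. This is an illustration of the described behaviour, not the implementation in this PR: the class and method names are hypothetical, rows are modelled as plain maps, and type coercion between the string ROWKEY and a non-string key column is ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the ROWKEY / key-column rule; not a KSQL API.
final class RowKeySketch {

  static final String ROWKEY = "ROWKEY";

  /**
   * Returns a copy of the inserted columns in which ROWKEY and the source's
   * key column agree. If both were supplied they must already match; if only
   * one was supplied its value is copied into the other.
   */
  static Map<String, Object> reconcile(
      final String keyColumn,
      final Map<String, Object> insertedColumns) {
    final Map<String, Object> row = new HashMap<>(insertedColumns);
    final boolean hasRowKey = row.containsKey(ROWKEY);
    final boolean hasKeyColumn = row.containsKey(keyColumn);

    if (hasRowKey && hasKeyColumn) {
      if (!Objects.equals(row.get(ROWKEY), row.get(keyColumn))) {
        throw new IllegalArgumentException(
            "Expected ROWKEY and " + keyColumn + " to match");
      }
    } else if (hasRowKey) {
      // Copy the ROWKEY value into the key column.
      row.put(keyColumn, row.get(ROWKEY));
    } else if (hasKeyColumn) {
      // Copy the key column value into ROWKEY.
      row.put(ROWKEY, row.get(keyColumn));
    }
    return row;
  }
}
```

For a source whose key column is a hypothetical `ID`, inserting only `ROWKEY='k'` gives a row in which `ID` is also `'k'`, while inserting `ROWKEY='k'` together with `ID='other'` is rejected.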

@JimGalasyn
Member

left a comment

LGTM, great feature! Just a few suggestions.

@hjafarpour
Member

left a comment

LGTM with a few comments.

docs/developer-guide/syntax-reference.rst (two outdated, resolved threads)
final Map<String, Literal> exampleProps = new HashMap<>(createSource.getProperties());
exampleProps.put(KsqlConstants.SINK_NUMBER_OF_PARTITIONS, new IntegerLiteral(2));
exampleProps.putIfAbsent(KsqlConstants.SINK_NUMBER_OF_REPLICAS, new IntegerLiteral(1));
final CreateSource example = createSource.copyWith(createSource.getElements(), exampleProps);

@hjafarpour

hjafarpour May 2, 2019

Member

Hmm, interesting way of creating error message!

@agavra

agavra May 3, 2019

Author Contributor

@big-andy-coates' suggestion on the KLIP :)
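
The snippet under review builds the example statement in the error message from the user's own statement. A minimal sketch of the property handling, using plain maps instead of the KSQL `Literal` types: the class name and the `PARTITIONS`/`REPLICAS` keys are stand-ins chosen for illustration, not the constants used in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for the statement's WITH properties.
final class ExamplePropsSketch {

  /** Returns a copy of the user's properties with example topic settings added. */
  static Map<String, Object> withExampleTopicProps(final Map<String, Object> userProps) {
    final Map<String, Object> example = new LinkedHashMap<>(userProps);
    // put: the example always shows an explicit partition count.
    example.put("PARTITIONS", 2);
    // putIfAbsent: a replica count the user already supplied is kept as written.
    example.putIfAbsent("REPLICAS", 1);
    return example;
  }
}
```

Copying first means the user's original properties are never modified, and the suggested statement differs from what the user typed only in the properties that were missing.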

@agavra agavra requested a review from confluentinc/ksql May 3, 2019

agavra added some commits May 2, 2019

@@ -395,6 +397,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
ConfigDef.Importance.LOW,
"Enable the security manager for UDFs. Default is true and will stop UDFs from"
+ " calling System.exit or executing processes"
).define(

@rodesai

rodesai May 6, 2019

Contributor

Why do we want a config for this? Sort of a coarse access control mechanism?

@agavra

agavra May 6, 2019

Author Contributor

Yes. If you are deploying this in production in interactive mode, you may want to disable the ability for users to arbitrarily insert into topics. The security story around this isn't great as it stands, though it might get better with the changes being made to support custom security plugins.
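
As a rough illustration of this kind of coarse switch, the sketch below rejects inserts when a boolean setting is turned off. The config key, class name, and the default of "enabled" are all assumptions made for the example; the thread does not show the real key or its default.

```java
import java.util.Map;

// Hypothetical sketch of a config-based gate; the key name is invented.
final class InsertValuesGateSketch {

  // Assumption: placeholder key, not the name defined in this PR.
  static final String INSERT_VALUES_ENABLED = "example.insert.values.enabled";

  /** Throws if the server configuration disables INSERT INTO ... VALUES. */
  static void checkInsertAllowed(final Map<String, ?> serverConfig) {
    final Object raw = serverConfig.get(INSERT_VALUES_ENABLED);
    // Assumption: the feature is enabled when the key is not set.
    final boolean enabled = raw == null || Boolean.parseBoolean(raw.toString());
    if (!enabled) {
      throw new UnsupportedOperationException(
          "INSERT INTO ... VALUES is disabled by server configuration");
    }
  }
}
```

A gate like this applies to every user of the server at once, which is why it is only a stopgap compared with per-user authorization.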

@rodesai

rodesai approved these changes May 6, 2019

Contributor

left a comment

LGTM

@agavra agavra merged commit 59c083a into confluentinc:master May 6, 2019

1 check passed

continuous-integration/jenkins/pr-merge This commit looks good

@agavra agavra deleted the agavra:create_topics branch May 6, 2019
