Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): support (non-streaming) insert into in Java client #5448

Merged
merged 9 commits into from
May 22, 2020

Conversation

vcrfxia
Copy link
Contributor

@vcrfxia vcrfxia commented May 20, 2020

Description

This PR adds support for non-streaming inserts to the Java client.

Also a couple miscellaneous fixes:

  • KsqlArray and KsqlObject now properly handle insertion of BigDecimal and structured types
  • InsertsStreamHandler now returns 400 on bad statements, rather than 500

Testing done

Added unit and integration tests.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@vcrfxia vcrfxia requested a review from a team as a code owner May 20, 2020 23:54
throw new KsqlException("Cannot insert values into an unknown stream: "
+ sourceName);
throw new KsqlApiException(
"Cannot insert values into an unknown stream: " + sourceName, ERROR_CODE_BAD_STATEMENT);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be ERROR_CODE_NOT_FOUND instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think BAD_STATEMENT is ok. NOT _FOUND is more if the uri doesn't exist.

@vcrfxia vcrfxia requested a review from purplefox May 20, 2020 23:58
Copy link
Contributor

@purplefox purplefox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good... One thing - I don't think we should have the insert into that takes a List, if you remove that, it makes things simpler :)

@@ -282,7 +282,7 @@ public KsqlArray add(final Boolean value) {
* @return a reference to this
*/
public KsqlArray add(final BigDecimal value) {
delegate.add(value);
delegate.add(value.doubleValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we converting to double here? Surely this will lose precision in some cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Vert.x JSON types don't accept BigDecimal. For example,

new JsonArray().add(new BigDecimal(11.1));

results in

java.lang.IllegalStateException: Illegal type in JsonObject: class java.math.BigDecimal

	at io.vertx.core.json.JsonObject.checkAndCopy(JsonObject.java:1072)
	at io.vertx.core.json.JsonArray.add(JsonArray.java:446)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the BigDecimal is added as a String? Will the row get properly deserialized on the server before inserting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure the answer is no but I'll answer more definitely in a follow-up PR -- I've got one in the works to add server and client integration tests for inserting types such as decimal, array, map, etc (and fix a bug the new tests exposed) :)

Going to merge this and #5456 first to avoid merge hell with myself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @purplefox I stand corrected! This PR adds integration test coverage for inserting decimals (and also arrays and maps) and works fine if we serialize BigDecimal as a string instead of a double: #5469

As mentioned in the PR description, the drawback is that we now have an inconsistency where the decimal type is treated differently in terms of allowable casts, compared to the other numeric types. For example:

final KsqlObject obj = new KsqlObject().put("f1", new BigDecimal("1.13")).put("f2", 12.23);
System.out.println(obj.getString("f1")); // This succeeds
System.out.println(obj.getString("f2")); // This fails with "java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.CharSequence"

I think the drawback is acceptable so I've made the change.

final String streamName, final Map<String, Object> row) {
return null; // not yet implemented
public CompletableFuture<Void> insertInto(final String streamName, final KsqlObject row) {
final List<KsqlObject> rows = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slight optimisation: Use Collections.singletonList

final int numRows
) {
if (response.statusCode() == OK.code()) {
response.bodyHandler(buffer -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you're waiting for the whole body and parsing it manually here.
When you come to do the streaming insert response this technique won't work and you'll need to use RecordParser - and then you'll end up with two ways of parsing the response.
I think it would be simpler and easier to just use RecordParser for all cases - this will just spit out the response lines so you don't have to do any manual parsing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't support batch inserts, then this becomes a lot simpler too :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to RecordParser. Might have to refactor this again when we support streaming inserts, though. We'll see.

* column values.
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> insertInto(String streamName, List<KsqlObject> rows);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's useful to have an insertInto method that takes a List of Rows. When inserting rows, the actual inserts can occur our of order, and it's possible that some inserts could succeed and some could fail. Returning a single CompletableFuture means that we couldn't tell the user about individual success or failure of inserts. So on failure, it might be that some of the inserts actually succeeded, leaving the system in an inconsistent state and the user not being able to do anything useful to correct it.

At some point I hope we will support transactions on the client. At that point I think it would make sense to support batch inserts like this, but not until then. With transactions we can guarantee that all the inserts succeed or none.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the reason I didn't include a batch insert method like this on the original prototype.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm OK. My reasoning for adding the batch insert was that it seems inefficient to require the users to open a separate HTTP connection for each insert, but I guess once we implement the streaming insert we can encourage them to use that instead. I'll remove the batch insert method in light of your concern that the non-transactional nature will be confusing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a separate HTTP connection for each insert

The client pools connections so it won't use a new one for each insert.

throw new KsqlException("Cannot insert values into an unknown stream: "
+ sourceName);
throw new KsqlApiException(
"Cannot insert values into an unknown stream: " + sourceName, ERROR_CODE_BAD_STATEMENT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think BAD_STATEMENT is ok. NOT _FOUND is more if the uri doesn't exist.

@@ -159,11 +160,17 @@ private void handleArgs(final Buffer buff) {

private Void handleInsertSubscriberException(final Throwable t,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is similar to the failure handler in QueryStreamHandler, perhaps could consider combining it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done -- refactored into ServerUtils.

@vcrfxia vcrfxia merged commit 9e8234a into confluentinc:master May 22, 2020
@vcrfxia vcrfxia deleted the java-client-insert-stream branch May 22, 2020 17:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants