Skip to content

Commit

Permalink
pgjdbcgh-597 - changing i.r.p.m.b.CommandComplete.rows from Integer t…
Browse files Browse the repository at this point in the history
…o Long
  • Loading branch information
Chris Hall committed Jun 15, 2023
1 parent 7021b67 commit 9375d59
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 31 deletions.
8 changes: 4 additions & 4 deletions src/main/java/io/r2dbc/postgresql/PostgresqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db
public Mono<Long> getRowsUpdated() {

return this.messages
.<Integer>handle((message, sink) -> {
.<Long>handle((message, sink) -> {

if (message instanceof ErrorResponse) {
this.factory.handleErrorResponse(message, (SynchronousSink) sink);
Expand All @@ -77,7 +77,7 @@ public Mono<Long> getRowsUpdated() {

if (message instanceof CommandComplete) {

Integer rowCount = ((CommandComplete) message).getRows();
Long rowCount = ((CommandComplete) message).getRows();
if (rowCount != null) {
sink.next(rowCount);
}
Expand All @@ -91,8 +91,8 @@ public Mono<Long> getRowsUpdated() {

long sum = 0;

for (Integer integer : list) {
sum += integer;
for (Long value : list) {
sum += value;
}

sink.next(sum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private PostgresqlSegmentResult(Flux<Segment> segments) {

if (message instanceof CommandComplete) {

Integer rowCount = ((CommandComplete) message).getRows();
Long rowCount = ((CommandComplete) message).getRows();
if (rowCount != null) {
sink.next(new PostgresqlUpdateCountSegment(rowCount));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final class CommandComplete implements BackendMessage {

private final Integer rowId;

private final Integer rows;
private final Long rows;

/**
* Create a new message.
Expand All @@ -45,7 +45,7 @@ public final class CommandComplete implements BackendMessage {
* @param rows the number of rows affected by the command
* @throws IllegalArgumentException if {@code command} is {@code null}
*/
public CommandComplete(String command, @Nullable Integer rowId, @Nullable Integer rows) {
public CommandComplete(String command, @Nullable Integer rowId, @Nullable Long rows) {
this.command = Assert.requireNonNull(command, "command must not be null");
this.rowId = rowId;
this.rows = rows;
Expand Down Expand Up @@ -90,7 +90,7 @@ public Integer getRowId() {
* @return the number of rows affected by the command
*/
@Nullable
public Integer getRows() {
public Long getRows() {
return this.rows;
}

Expand Down Expand Up @@ -122,15 +122,15 @@ static CommandComplete decode(ByteBuf in) {
String rowId = tag.substring(index1 + 1, index2);
String rows = tag.substring(index2 + 1, index3 != -1 ? index3 : tag.length());

return new CommandComplete(command, Integer.parseInt(rowId), Integer.parseInt(rows));
return new CommandComplete(command, Integer.parseInt(rowId), Long.parseLong(rows));
} else if (isNoRowId(tag)) {

int index1 = tag.indexOf(' ');
int index2 = tag.indexOf(' ', index1 + 1);
String command = tag.substring(0, index1 != -1 ? index1 : tag.length());
String rows = index1 != -1 ? tag.substring(index1 + 1, index2 != -1 ? index2 : tag.length()) : null;

return new CommandComplete(command, null, rows != null ? Integer.parseInt(rows) : null);
return new CommandComplete(command, null, rows != null ? Long.parseLong(rows) : null);
} else {
return new CommandComplete(tag, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ void copyIn() {
.expectRequest(new Query("some-sql"), new CopyData(Unpooled.EMPTY_BUFFER), CopyDone.INSTANCE)
.thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 0),
new CommandComplete("cmd", 1, 0L),
new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void copyIn() {
.expectRequest(new Query("some-sql"), new CopyData(byteBuffer), CopyDone.INSTANCE)
.thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 1),
new CommandComplete("cmd", 1, 1L),
new ReadyForQuery(IDLE)
).build();

Expand Down Expand Up @@ -85,7 +85,7 @@ void copyInEmpty() {
.transactionStatus(TransactionStatus.IDLE)
.expectRequest(new Query("some-sql"), CopyDone.INSTANCE).thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 0),
new CommandComplete("cmd", 1, 0L),
new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE)
)
.build();
Expand Down Expand Up @@ -128,7 +128,7 @@ void copyInCancel() {
new CopyFail("Copy operation failed: Cancelled")
).thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 1),
new CommandComplete("cmd", 1, 1L),
new ReadyForQuery(IDLE)
).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class PostgresqlResultUnitTests {

@Test
void toResultCommandComplete() {
PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE);
PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1L)), ExceptionFactory.INSTANCE);

result.map((row, rowMetadata) -> row)
.as(StepVerifier::create)
Expand All @@ -49,7 +49,7 @@ void toResultCommandComplete() {

@Test
void toResultCommandCompleteUsingSegments() {
io.r2dbc.postgresql.api.PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE).filter(it -> true);
io.r2dbc.postgresql.api.PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1L)), ExceptionFactory.INSTANCE).filter(it -> true);

result.map((row, rowMetadata) -> row)
.as(StepVerifier::create)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void getRowsUpdatedShouldTerminateWithError() {
void shouldConsumeRowsUpdated() {

PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new CommandComplete
("test", null, 42)), ExceptionFactory.INSTANCE);
("test", null, 42L)), ExceptionFactory.INSTANCE);

result.getRowsUpdated()
.as(StepVerifier::create)
Expand All @@ -101,7 +101,7 @@ void shouldConsumeRowsUpdated() {
void filterShouldRetainUpdateCount() {

PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new CommandComplete
("test", null, 42)), ExceptionFactory.INSTANCE);
("test", null, 42L)), ExceptionFactory.INSTANCE);

result.filter(Result.UpdateCount.class::isInstance).getRowsUpdated()
.as(StepVerifier::create)
Expand Down Expand Up @@ -193,7 +193,7 @@ void flatMapShouldNotTerminateWithError() {

PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new ErrorResponse(Collections.emptyList()), new RowDescription(Collections.emptyList()),
new DataRow(), new CommandComplete
("test", null, 42)), ExceptionFactory.INSTANCE);
("test", null, 42L)), ExceptionFactory.INSTANCE);

Flux.from(result.flatMap(Mono::just))
.as(StepVerifier::create)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void exchange() {
.as(StepVerifier::create)
.assertNext(message -> assertThat(message).isInstanceOf(RowDescription.class))
.assertNext(message -> assertThat(message).isInstanceOf(DataRow.class))
.expectNext(new CommandComplete("SELECT", null, 1))
.expectNext(new CommandComplete("SELECT", null, 1L))
.verifyComplete();
}

Expand Down Expand Up @@ -286,8 +286,8 @@ void parallelExchange() {
.assertNext(message -> assertThat(message).isInstanceOf(RowDescription.class))
.assertNext(message -> assertThat(message).isInstanceOf(DataRow.class))
.assertNext(message -> assertThat(message).isInstanceOf(DataRow.class))
.expectNext(new CommandComplete("SELECT", null, 1))
.expectNext(new CommandComplete("SELECT", null, 1))
.expectNext(new CommandComplete("SELECT", null, 1L))
.expectNext(new CommandComplete("SELECT", null, 1L))
.verifyComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void commandComplete() {

return buffer;
});
assertThat(message).isEqualTo(new CommandComplete("COPY", null, 100));
assertThat(message).isEqualTo(new CommandComplete("COPY", null, 100L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class CommandCompleteUnitTests {

@Test
void constructorNoCommand() {
assertThatIllegalArgumentException().isThrownBy(() -> new CommandComplete(null, 100, 200))
assertThatIllegalArgumentException().isThrownBy(() -> new CommandComplete(null, 100, 200L))
.withMessage("command must not be null");
}

Expand All @@ -42,7 +42,15 @@ void decodeCopy() {

return buffer;
})
.isEqualTo(new CommandComplete("COPY", null, 100));
.isEqualTo(new CommandComplete("COPY", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("COPY 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("COPY", null, 4294967294L));
}

@Test
Expand All @@ -54,7 +62,15 @@ void decodeDelete() {

return buffer;
})
.isEqualTo(new CommandComplete("DELETE", null, 100));
.isEqualTo(new CommandComplete("DELETE", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("DELETE 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("DELETE", null, 4294967294L));
}

@Test
Expand All @@ -66,7 +82,15 @@ void decodeFetch() {

return buffer;
})
.isEqualTo(new CommandComplete("FETCH", null, 100));
.isEqualTo(new CommandComplete("FETCH", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("FETCH 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("FETCH", null, 4294967294L));
}

@Test
Expand All @@ -78,7 +102,15 @@ void decodeInsert() {

return buffer;
})
.isEqualTo(new CommandComplete("INSERT", 100, 200));
.isEqualTo(new CommandComplete("INSERT", 100, 200L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("INSERT 100 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("INSERT", 100, 4294967294L));
}

@Test
Expand All @@ -90,7 +122,15 @@ void decodeMove() {

return buffer;
})
.isEqualTo(new CommandComplete("MOVE", null, 100));
.isEqualTo(new CommandComplete("MOVE", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("MOVE 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("MOVE", null, 4294967294L));
}

@Test
Expand All @@ -114,7 +154,15 @@ void decodeSelect() {

return buffer;
})
.isEqualTo(new CommandComplete("SELECT", null, 100));
.isEqualTo(new CommandComplete("SELECT", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("SELECT 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("SELECT", null, 4294967294L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("SELECT", UTF_8);
Expand All @@ -134,7 +182,15 @@ void decodeUpdate() {

return buffer;
})
.isEqualTo(new CommandComplete("UPDATE", null, 100));
.isEqualTo(new CommandComplete("UPDATE", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("UPDATE 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("UPDATE", null, 4294967294L));
}

}

0 comments on commit 9375d59

Please sign in to comment.