Skip to content

Commit

Permalink
IGNITE-21568 Java thin: Pass client time zone to server (#3737)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptupitsyn committed May 13, 2024
1 parent 26bd72f commit 6871bba
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.ignite.client.handler.requests.sql;

import javax.annotation.Nullable;
import java.time.ZoneId;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
import org.apache.ignite.internal.sql.engine.QueryProperty;
import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.jetbrains.annotations.Nullable;

class ClientSqlProperties {
private final @Nullable String schema;
Expand All @@ -33,11 +34,14 @@ class ClientSqlProperties {

private final long idleTimeout;

private final @Nullable String timeZoneId;

ClientSqlProperties(ClientMessageUnpacker in) {
schema = in.tryUnpackNil() ? null : in.unpackString();
pageSize = in.tryUnpackNil() ? IgniteSqlImpl.DEFAULT_PAGE_SIZE : in.unpackInt();
queryTimeout = in.tryUnpackNil() ? 0 : in.unpackLong();
idleTimeout = in.tryUnpackNil() ? 0 : in.unpackLong();
timeZoneId = in.tryUnpackNil() ? null : in.unpackString();

// Skip properties - not used by SQL engine.
in.unpackInt(); // Number of properties.
Expand Down Expand Up @@ -68,6 +72,10 @@ SqlProperties toSqlProps() {
builder.set(QueryProperty.DEFAULT_SCHEMA, schema);
}

if (timeZoneId != null) {
builder.set(QueryProperty.TIME_ZONE_ID, ZoneId.of(timeZoneId));
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.ignite.internal.client.table.ClientTable.writeTx;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;

import java.time.ZoneId;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
Expand Down Expand Up @@ -75,7 +76,7 @@ public ClientSql(ReliableChannel ch, MarshallersProvider marshallers) {
/** {@inheritDoc} */
@Override
public Statement createStatement(String query) {
return new ClientStatement(query, null, null, null);
return new ClientStatement(query, null, null, null, null);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -177,7 +178,7 @@ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Object... arguments) {
Objects.requireNonNull(query);

ClientStatement statement = new ClientStatement(query, null, null, null);
ClientStatement statement = new ClientStatement(query, null, null, null, null);

return executeAsync(transaction, statement, arguments);
}
Expand All @@ -200,7 +201,7 @@ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Object... arguments) {
Objects.requireNonNull(query);

ClientStatement statement = new ClientStatement(query, null, null, null);
ClientStatement statement = new ClientStatement(query, null, null, null, null);

return executeAsync(transaction, mapper, statement, arguments);
}
Expand Down Expand Up @@ -228,6 +229,7 @@ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
w.out().packLongNullable(clientStatement.queryTimeoutNullable());

w.out().packLongNullable(0L); // defaultSessionTimeout
w.out().packString(clientStatement.timeZoneId().getId());

packProperties(w, null);

Expand Down Expand Up @@ -286,6 +288,7 @@ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object
w.out().packNil(); // pageSize
w.out().packNil(); // queryTimeout
w.out().packNil(); // sessionTimeout
w.out().packString(ZoneId.systemDefault().getId());

packProperties(w, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.ignite.internal.client.sql;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.sql.Statement;
import org.jetbrains.annotations.Nullable;

/**
* Client SQL statement.
Expand All @@ -31,13 +31,16 @@ public class ClientStatement implements Statement {
private final String query;

/** Default schema. */
private final String defaultSchema;
private final @Nullable String defaultSchema;

/** Query timeout. */
private final Long queryTimeoutMs;
private final @Nullable Long queryTimeoutMs;

/** Page size. */
private final Integer pageSize;
private final @Nullable Integer pageSize;

/** Time-zone ID. */
private final ZoneId timeZoneId;

/**
* Constructor.
Expand All @@ -48,17 +51,19 @@ public class ClientStatement implements Statement {
* @param pageSize Page size.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public ClientStatement(
ClientStatement(
String query,
String defaultSchema,
Long queryTimeoutMs,
Integer pageSize) {
@Nullable String defaultSchema,
@Nullable Long queryTimeoutMs,
@Nullable Integer pageSize,
@Nullable ZoneId timeZoneId) {
Objects.requireNonNull(query);

this.query = query;
this.defaultSchema = defaultSchema;
this.queryTimeoutMs = queryTimeoutMs;
this.pageSize = pageSize;
this.timeZoneId = timeZoneId != null ? timeZoneId : ZoneId.systemDefault();
}

/** {@inheritDoc} */
Expand All @@ -80,7 +85,7 @@ public long queryTimeout(TimeUnit timeUnit) {
*
* @return Query timeout.
*/
public Long queryTimeoutNullable() {
@Nullable Long queryTimeoutNullable() {
return queryTimeoutMs;
}

Expand All @@ -93,8 +98,7 @@ public String defaultSchema() {
/** {@inheritDoc} */
@Override
public ZoneId timeZoneId() {
// TODO: https://issues.apache.org/jira/browse/IGNITE-21568
return ZoneOffset.UTC;
return timeZoneId;
}

/** {@inheritDoc} */
Expand All @@ -108,7 +112,7 @@ public int pageSize() {
*
* @return Page size.
*/
public Integer pageSizeNullable() {
@Nullable Integer pageSizeNullable() {
return pageSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class ClientStatementBuilder implements Statement.StatementBuilder {
/** Page size. */
private Integer pageSize;

/** Time-zone ID. */
private ZoneId timeZoneId;

/** {@inheritDoc} */
@Override
public StatementBuilder query(String query) {
Expand Down Expand Up @@ -75,8 +78,9 @@ public StatementBuilder pageSize(int pageSize) {

@Override
public StatementBuilder timeZoneId(ZoneId timeZoneId) {
// TODO: https://issues.apache.org/jira/browse/IGNITE-21568
throw new UnsupportedOperationException("Not implemented yet");
this.timeZoneId = timeZoneId;

return this;
}

/** {@inheritDoc} */
Expand All @@ -86,6 +90,7 @@ public Statement build() {
query,
defaultSchema,
queryTimeoutMs,
pageSize);
pageSize,
timeZoneId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Period;
import java.time.ZoneId;
import java.util.BitSet;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void testStatementPropertiesPropagation() {
.defaultSchema("SCHEMA2")
.queryTimeout(124, TimeUnit.SECONDS)
.pageSize(235)
.timeZoneId(ZoneId.of("Europe/London"))
.build();

AsyncResultSet<SqlRow> resultSet = client.sql().executeAsync(null, statement).join();
Expand All @@ -94,6 +96,7 @@ public void testStatementPropertiesPropagation() {
assertEquals("SCHEMA2", props.get("schema"));
assertEquals("124000", props.get("timeout"));
assertEquals("235", props.get("pageSize"));
assertEquals("Europe/London", props.get("timeZoneId"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.internal.sql.engine.QueryProperty.DEFAULT_SCHEMA;
import static org.apache.ignite.internal.sql.engine.QueryProperty.QUERY_TIMEOUT;
import static org.apache.ignite.internal.sql.engine.QueryProperty.TIME_ZONE_ID;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.math.BigDecimal;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class FakeCursor implements AsyncSqlCursor<InternalSqlRow> {

rows.add(getRow("schema", properties.get(DEFAULT_SCHEMA)));
rows.add(getRow("timeout", String.valueOf(properties.get(QUERY_TIMEOUT))));
rows.add(getRow("timeZoneId", String.valueOf(properties.get(TIME_ZONE_ID))));
} else if ("SELECT META".equals(qry)) {
columns.add(new FakeColumnMetadata("BOOL", ColumnType.BOOLEAN));
columns.add(new FakeColumnMetadata("INT8", ColumnType.INT8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ void write_statement(protocol::writer &writer, const sql_statement &statement) {
writer.write(statement.page_size());
writer.write(std::int64_t(statement.timeout().count()));
writer.write_nil(); // Session timeout (unused, session is closed by the server immediately).
writer.write_nil(); // TODO: IGNITE-21605 Time zone id.

const auto &properties = statement.properties();
auto props_num = std::int32_t(properties.size());
Expand Down
1 change: 1 addition & 0 deletions modules/platforms/cpp/ignite/odbc/query/data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ sql_result data_query::make_request_execute() {
writer.write(m_connection.get_configuration().get_page_size().get_value());
writer.write(std::int64_t(m_connection.get_timeout()) * 1000);
writer.write_nil(); // Session timeout (unused, session is closed by the server immediately).
writer.write_nil(); // TODO: IGNITE-21605 Time zone id.

// Properties are not used for now.
writer.write(0);
Expand Down
4 changes: 3 additions & 1 deletion modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ private void SqlExec(Socket handler, long requestId, MsgPackReader reader)
props["timeoutMs"] = timeoutMs;

props["sessionTimeoutMs"] = reader.TryReadNil() ? (long?)null : reader.ReadInt64();
props["timeZoneId"] = reader.TryReadNil() ? null : reader.ReadString();

// ReSharper restore RedundantCast
var propCount = reader.ReadInt32();
Expand Down Expand Up @@ -550,7 +551,8 @@ private void SqlExecScript(MsgPackReader reader)
["schema"] = reader.TryReadNil() ? null : reader.ReadString(),
["pageSize"] = reader.TryReadNil() ? null : reader.ReadInt32(),
["timeoutMs"] = reader.TryReadNil() ? null : reader.ReadInt64(),
["sessionTimeoutMs"] = reader.TryReadNil() ? null : reader.ReadInt64()
["sessionTimeoutMs"] = reader.TryReadNil() ? null : reader.ReadInt64(),
["timeZoneId"] = reader.TryReadNil() ? null : reader.ReadString()
};

var propCount = reader.ReadInt32();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public async Task TestStatementProperties()
var props = rows.ToDictionary(x => (string)x["NAME"]!, x => (string)x["VAL"]!);

Assert.IsTrue(res.HasRowSet);
Assert.AreEqual(8, props.Count);
Assert.AreEqual(9, props.Count);

Assert.AreEqual("schema-1", props["schema"]);
Assert.AreEqual("987", props["pageSize"]);
Expand Down
2 changes: 2 additions & 0 deletions modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ private static IIgniteTuple ReadTuple(IReadOnlyList<IColumnMetadata> cols, ref B
w.Write(statement.PageSize);
w.Write((long)statement.Timeout.TotalMilliseconds);
w.WriteNil(); // Session timeout (unused, session is closed by the server immediately).
w.WriteNil(); // TODO: IGNITE-21604 Time zone id.

WriteProperties(statement, ref w);
w.Write(statement.Query);
w.WriteObjectCollectionAsBinaryTuple(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -56,6 +58,7 @@
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.Statement.StatementBuilder;
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
Expand Down Expand Up @@ -840,6 +843,28 @@ public void runScriptThatFails() {
assertEquals(1, result.result().get(0).intValue(0));
}

@ParameterizedTest
@ValueSource(strings = {"", "UTC", "Europe/Athens", "America/New_York", "Asia/Tokyo"})
public void testTimeZoneId(String timeZoneId) {
ZoneId zoneId = timeZoneId.isEmpty() ? ZoneId.systemDefault() : ZoneId.of(timeZoneId);

StatementBuilder builder = igniteSql().statementBuilder()
.query("SELECT CURRENT_TIMESTAMP")
.timeZoneId(zoneId);

ResultSet<SqlRow> resultSet = igniteSql().execute(null, builder.build());
SqlRow row = resultSet.next();

LocalDateTime ts = row.value(0);
assertNotNull(ts);

float tsMillis = ts.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
float nowMillis = LocalDateTime.now(zoneId).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
float deltaMillis = 5000;

assertEquals(nowMillis, tsMillis, deltaMillis);
}

protected ResultSet<SqlRow> executeForRead(IgniteSql sql, String query, Object... args) {
return executeForRead(sql, null, query, args);
}
Expand Down

0 comments on commit 6871bba

Please sign in to comment.