
Commit

Merge remote-tracking branch 'apache/master' into catalog-validation-control
zachjsh committed May 21, 2024
2 parents ec66928 + 80db8cd commit ed4bee7
Showing 10 changed files with 191 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/standard-its.yml
@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [batch-index, input-format, input-source, perfect-rollup-parallel-batch-index, kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, ldap-security, realtime-index, append-ingestion, compaction, cds-task-schema-publish-disabled, cds-coordinator-smq-disabled]
testing_group: [batch-index, input-format, input-source, perfect-rollup-parallel-batch-index, kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, ldap-security, realtime-index, append-ingestion, compaction, cds-task-schema-publish-disabled, cds-coordinator-metadata-query-disabled]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
@@ -196,6 +196,6 @@ jobs:
with:
build_jdk: 8
runtime_jdk: 8
testing_groups: -DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties,centralized-datasource-schema,cds-task-schema-publish-disabled,cds-coordinator-smq-disabled
testing_groups: -DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties,centralized-datasource-schema,cds-task-schema-publish-disabled,cds-coordinator-metadata-query-disabled
use_indexer: ${{ matrix.indexer }}
group: other
compressed-big-decimal.md
@@ -34,7 +34,7 @@ Compressed big decimal is an absolute number based complex type based on big decimal
2. Accuracy: Provides a greater level of accuracy in decimal arithmetic (see the sketch below)
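
As a minimal sketch of the accuracy point, using only the standard `java.math.BigDecimal` that the complex type builds on:

```java
import java.math.BigDecimal;

public class DecimalAccuracyDemo
{
  public static void main(String[] args)
  {
    // Binary floating point cannot represent 0.1 or 0.2 exactly,
    // so the error is visible after a single addition.
    System.out.println(0.1 + 0.2); // 0.30000000000000004

    // BigDecimal arithmetic on decimal strings is exact.
    System.out.println(new BigDecimal("0.1").add(new BigDecimal("0.2"))); // 0.3
  }
}
```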

## Operations
To use this extension, make sure to [load](../../configuration/extensions.md#loading-extensions) `compressed-big-decimal` to your config file.
To use this extension, make sure to [load](../../configuration/extensions.md#loading-extensions) `druid-compressed-bigdecimal` to your config file.
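
For example, a minimal sketch of the load list in `common.runtime.properties` (any other extensions your deployment needs would be listed alongside it):

```properties
druid.extensions.loadList=["druid-compressed-bigdecimal"]
```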

## Configuration
There are currently no configuration properties specific to Compressed Big Decimal.
2 changes: 1 addition & 1 deletion docs/querying/sql-scalar.md
@@ -277,6 +277,6 @@ The [DataSketches extension](../development/extensions-core/datasketches-extensi
|`CASE WHEN boolean_expr1 THEN result1 \[ WHEN boolean_expr2 THEN result2 ... \] \[ ELSE resultN \] END`|Searched CASE.|
|`CAST(value AS TYPE)`|Cast value to another type. See [Data types](sql-data-types.md) for details about how Druid SQL handles CAST.|
|`COALESCE(value1, value2, ...)`|Returns the first value that is neither NULL nor empty string.|
|`DECODE_BASE64_COMPLEX(dataType, expr)`| Decodes a Base64-encoded string into a complex data type, where `dataType` is the complex data type and `expr` is the Base64-encoded string to decode. The `hyperUnique` and `serializablePairLongString` data types are supported by default. You can enable support for the following complex data types by loading their extensions:<br/><ul><li>`druid-bloom-filter`: `bloom`</li><li>`druid-datasketches`: `arrayOfDoublesSketch`, `HLLSketch`, `KllDoublesSketch`, `KllFloatsSketch`, `quantilesDoublesSketch`, `thetaSketch`</li><li>`druid-histogram`: `approximateHistogram`, `fixedBucketsHistogram`</li><li>`druid-stats`: `variance`</li><li>`druid-compressed-big-decimal`: `compressedBigDecimal`</li><li>`druid-momentsketch`: `momentSketch`</li><li>`druid-tdigestsketch`: `tDigestSketch`</li></ul>|
|`DECODE_BASE64_COMPLEX(dataType, expr)`| Decodes a Base64-encoded string into a complex data type, where `dataType` is the complex data type and `expr` is the Base64-encoded string to decode. The `hyperUnique` and `serializablePairLongString` data types are supported by default. You can enable support for the following complex data types by loading their extensions:<br/><ul><li>`druid-bloom-filter`: `bloom`</li><li>`druid-datasketches`: `arrayOfDoublesSketch`, `HLLSketch`, `KllDoublesSketch`, `KllFloatsSketch`, `quantilesDoublesSketch`, `thetaSketch`</li><li>`druid-histogram`: `approximateHistogram`, `fixedBucketsHistogram`</li><li>`druid-stats`: `variance`</li><li>`druid-compressed-bigdecimal`: `compressedBigDecimal`</li><li>`druid-momentsketch`: `momentSketch`</li><li>`druid-tdigestsketch`: `tDigestSketch`</li></ul> See the example following this table.|
|`NULLIF(value1, value2)`|Returns NULL if `value1` and `value2` match, else returns `value1`.|
|`NVL(value1, value2)`|Returns `value1` if `value1` is not null, otherwise `value2`.|
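
As a hedged illustration of `DECODE_BASE64_COMPLEX` (the table and column names here are hypothetical, and the `druid-datasketches` extension must be loaded for the `thetaSketch` type):

```sql
-- Decode a Base64-encoded theta sketch stored as a string column.
SELECT DECODE_BASE64_COMPLEX('thetaSketch', theta_b64)
FROM events
```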
@@ -63,6 +63,7 @@ services:
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- AWS_REGION=us-west-2
depends_on:
- druid-zookeeper-kafka

2 changes: 1 addition & 1 deletion pom.xml
@@ -1371,7 +1371,7 @@
<plugin>
<groupId>org.openrewrite.maven</groupId>
<artifactId>rewrite-maven-plugin</artifactId>
<version>5.27.0</version>
<version>5.31.0</version>
<configuration>
<activeRecipes>
<recipe>org.apache.druid.RewriteRules</recipe>
IndexerSQLMetadataStorageCoordinator.java
@@ -129,6 +129,7 @@ public IndexerSQLMetadataStorageCoordinator(
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{

this.jsonMapper = jsonMapper;
this.dbTables = dbTables;
this.connector = connector;
SpecificTaskServiceLocator.java
@@ -23,14 +23,13 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
@@ -57,7 +56,6 @@ public class SpecificTaskServiceLocator implements ServiceLocator

private final String taskId;
private final OverlordClient overlordClient;
private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
private final Object lock = new Object();

@GuardedBy("lock")
@@ -125,60 +123,26 @@ public void onSuccess(final Map<String, TaskStatus> taskStatusMap)
lastUpdateTime = System.currentTimeMillis();

final TaskStatus status = taskStatusMap.get(taskId);

if (status == null) {
// If the task status is unknown, we'll treat it as closed.
lastKnownState = null;
lastKnownLocation = null;
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
// Do not resolve the future just yet, try the fallback API instead
fetchFallbackTaskLocation();
} else {
lastKnownState = status.getStatusCode();
final TaskLocation location;
if (TaskLocation.unknown().equals(status.getLocation())) {
location = locationFetcher.getLocation();
} else {
location = status.getLocation();
}

if (TaskLocation.unknown().equals(location)) {
lastKnownLocation = null;
} else {
lastKnownLocation = new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
}
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}

if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}

@Override
public void onFailure(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
resolvePendingFutureOnException(t);
}
},
MoreExecutors.directExecutor()
Execs.directExecutor()
);

return Futures.nonCancellationPropagating(retVal);
@@ -209,18 +173,104 @@ public void close()
}
}

private class TaskLocationFetcher
private void resolvePendingFuture(TaskState state, TaskLocation location)
{
TaskLocation getLocation()
{
final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
overlordClient.taskStatus(taskId),
true
);
if (statusResponse == null || statusResponse.getStatus() == null) {
return TaskLocation.unknown();
} else {
return statusResponse.getStatus().getLocation();
synchronized (lock) {
if (pendingFuture != null) {
lastKnownState = state;
lastKnownLocation = location == null ? null : new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);

if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}

private void resolvePendingFutureOnException(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}

/**
* Invokes the single task status API {@link OverlordClient#taskStatus} if the
* multi-task status API returns an unknown location (this can happen if the
* Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
*/
private void fetchFallbackTaskLocation()
{
synchronized (lock) {
if (pendingFuture != null) {
final ListenableFuture<TaskStatusResponse> taskStatusFuture;
try {
taskStatusFuture = overlordClient.taskStatus(taskId);
}
catch (Exception e) {
resolvePendingFutureOnException(e);
return;
}

pendingFuture.addListener(
() -> {
if (!taskStatusFuture.isDone()) {
// pendingFuture may resolve without taskStatusFuture due to close().
taskStatusFuture.cancel(true);
}
},
Execs.directExecutor()
);

Futures.addCallback(
taskStatusFuture,
new FutureCallback<TaskStatusResponse>()
{
@Override
public void onSuccess(final TaskStatusResponse taskStatusResponse)
{
synchronized (lock) {
if (pendingFuture != null) {
lastUpdateTime = System.currentTimeMillis();

final TaskStatusPlus status = taskStatusResponse.getStatus();
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
resolvePendingFuture(status.getStatusCode(), null);
} else {
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
}
}
}

@Override
public void onFailure(Throwable t)
{
resolvePendingFutureOnException(t);
}
},
Execs.directExecutor()
);
}
}
}
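
A hedged usage sketch of the locator (the package paths and `OverlordClient` wiring are assumptions; the fallback added above is transparent to callers):

```java
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator;

public class TaskLocationExample
{
  public static ListenableFuture<ServiceLocations> locateTask(OverlordClient client, String taskId)
  {
    ServiceLocator locator = new SpecificTaskServiceLocator(taskId, client);
    // If the bulk task-status API reports an unknown location (pre-Druid 30
    // Overlord), the locator now falls back to the single-task status API
    // before resolving this future.
    return locator.locate();
  }
}
```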
DivOperatorConversion.java (new file)
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.expression.builtin;

import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.OperatorConversions;

public class DivOperatorConversion extends DirectOperatorConversion
{
private static final SqlOperator SQL_OPERATOR =
OperatorConversions
.operatorBuilder("DIV")
.operandTypeChecker(OperandTypes.DIVISION_OPERATOR)
.operandTypeInference(InferTypes.FIRST_KNOWN)
.returnTypeCascadeNullable(SqlTypeName.BIGINT)
.functionCategory(SqlFunctionCategory.NUMERIC)
.build();

public DivOperatorConversion()
{
super(SQL_OPERATOR, "div");
}
}
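
A hedged sketch of what this conversion enables in Druid SQL (the operator is registered in `DruidOperatorTable` below; `DIV` maps to the native `div` expression, i.e. integer division returning BIGINT):

```sql
-- Expected to return 3: the integer quotient of 10 and 3.
SELECT DIV(10, 3)
```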
DruidOperatorTable.java
@@ -81,6 +81,7 @@
import org.apache.druid.sql.calcite.expression.builtin.ContainsOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.DateTruncOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.DecodeBase64UTFOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.DivOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ExtractOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.FloorOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.GreatestOperatorConversion;
@@ -369,6 +370,7 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new AliasedOperatorConversion(CHARACTER_LENGTH_CONVERSION, "STRLEN"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.CONCAT, "concat"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.EXP, "exp"))
.add(new DivOperatorConversion())
.add(new DirectOperatorConversion(SqlStdOperatorTable.DIVIDE_INTEGER, "div"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LN, "log"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LOWER, "lower"))
CalciteQueryTest.java
@@ -602,6 +602,38 @@ public void testSafeDivide()
);
}

@Test
public void testDiv()
{
cannotVectorize();
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);

testQuery(
"select cnt, m1, div(m1, 2), div(cnt+2, cnt+1) from foo",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "div(\"m1\",2)", ColumnType.LONG),
expressionVirtualColumn("v1", "div((\"cnt\" + 2),(\"cnt\" + 1))", ColumnType.LONG)
)
.columns(ImmutableList.of("cnt", "m1", "v0", "v1"))
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{1L, 1.0f, 0L, 1L},
new Object[]{1L, 2.0f, 1L, 1L},
new Object[]{1L, 3.0f, 1L, 1L},
new Object[]{1L, 4.0f, 2L, 1L},
new Object[]{1L, 5.0f, 2L, 1L},
new Object[]{1L, 6.0f, 3L, 1L}
)
);
}

@Test
public void testGroupByLimitWrappingOrderByAgg()
{
