Skip to content

Commit

Permalink
Add HashJoinSegment, a virtual segment for joins. (#9111)
Browse files Browse the repository at this point in the history
* Add HashJoinSegment, a virtual segment for joins.

An initial step towards #8728. This patch adds enough functionality to implement a joining
cursor on top of a normal datasource. It does not include enough to actually do a query. For
that, future patches will need to wire this low-level functionality into the query language.

* Fixups.

* Fix missing format argument.

* Various tests and minor improvements.

* Changes.

* Remove or add tests for unused stuff.

* Fix up package locations.
  • Loading branch information
gianm committed Jan 16, 2020
1 parent 09efd20 commit a87db7f
Show file tree
Hide file tree
Showing 79 changed files with 6,804 additions and 145 deletions.
Expand Up @@ -118,6 +118,29 @@ public static Double defaultDoubleValue()
return replaceWithDefault() ? ZERO_DOUBLE : null;
}

/**
 * Looks up the default value associated with a class.
 *
 * Float, Double, Long and String map to their type-specific defaults; Number is treated like Double. Every other
 * class (including Integer and Object) has no default and yields null. The type-specific defaults are themselves
 * null in SQL-compatible null handling mode, so in that mode this method always returns null.
 *
 * @param clazz class to look up a default value for
 *
 * @return the default value for the class, or null if there is none
 */
@Nullable
@SuppressWarnings("unchecked")
public static <T> T defaultValueForClass(final Class<T> clazz)
{
  final Object defaultValue;

  if (clazz == String.class) {
    defaultValue = defaultStringValue();
  } else if (clazz == Long.class) {
    defaultValue = defaultLongValue();
  } else if (clazz == Float.class) {
    defaultValue = defaultFloatValue();
  } else if (clazz == Double.class || clazz == Number.class) {
    // A bare Number carries no width information, so it shares the double default.
    defaultValue = defaultDoubleValue();
  } else {
    defaultValue = null;
  }

  return (T) defaultValue;
}

public static boolean isNullOrEquivalent(@Nullable String value)
{
return replaceWithDefault() ? Strings.isNullOrEmpty(value) : value == null;
Expand Down
71 changes: 71 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/Exprs.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr;

import org.apache.druid.java.util.common.Pair;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Stack;

/**
 * Static helpers for taking {@link Expr} trees apart.
 */
public class Exprs
{
  /**
   * Flattens nested AND expressions into the list of their non-AND operands. ANDing the returned exprs together is
   * equivalent to the input expr. Operands appear in left-to-right order; an input that is not an AND comes back as
   * a single-element list.
   *
   * @param expr any expr
   *
   * @return list of exprs that, if ANDed together, are equivalent to the input expr
   */
  public static List<Expr> decomposeAnd(final Expr expr)
  {
    final List<Expr> conjuncts = new ArrayList<>();
    final Stack<Expr> pending = new Stack<>();
    pending.push(expr);

    // The stack holds the root on entry, so it is never empty the first time through.
    do {
      final Expr next = pending.pop();

      if (!(next instanceof BinAndExpr)) {
        conjuncts.add(next);
        continue;
      }

      // Push right before left so that the left operand is popped, and therefore emitted, first.
      final BinAndExpr and = (BinAndExpr) next;
      pending.push(and.right);
      pending.push(and.left);
    } while (!pending.isEmpty());

    return conjuncts;
  }

  /**
   * Splits an equality expr into its two operands.
   *
   * @param expr any expr
   *
   * @return pair of (left-hand side, right-hand side), or empty if the input expr was not an equality expr
   */
  public static Optional<Pair<Expr, Expr>> decomposeEquals(final Expr expr)
  {
    if (!(expr instanceof BinEqExpr)) {
      return Optional.empty();
    }

    final BinEqExpr equality = (BinEqExpr) expr;
    return Optional.of(Pair.of(equality.left, equality.right));
  }
}
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.common.config;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link NullHandling#defaultValueForClass}. Expected values are derived from
 * {@link NullHandling#replaceWithDefault()} so that each test holds in either null handling mode.
 */
public class NullHandlingTest
{
  @BeforeClass
  public static void setUpClass()
  {
    NullHandling.initializeForTests();
  }

  @Test
  public void test_defaultValueForClass_float()
  {
    final Float expected = NullHandling.replaceWithDefault() ? Float.valueOf(0f) : null;
    final Float actual = NullHandling.defaultValueForClass(Float.class);
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void test_defaultValueForClass_double()
  {
    final Double expected = NullHandling.replaceWithDefault() ? Double.valueOf(0d) : null;
    final Double actual = NullHandling.defaultValueForClass(Double.class);
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void test_defaultValueForClass_integer()
  {
    // Integer has no default in either mode.
    final Integer actual = NullHandling.defaultValueForClass(Integer.class);
    Assert.assertNull(actual);
  }

  @Test
  public void test_defaultValueForClass_long()
  {
    final Long expected = NullHandling.replaceWithDefault() ? Long.valueOf(0L) : null;
    final Long actual = NullHandling.defaultValueForClass(Long.class);
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void test_defaultValueForClass_number()
  {
    // Number is expected to share the double default.
    final Double expected = NullHandling.replaceWithDefault() ? Double.valueOf(0d) : null;
    final Number actual = NullHandling.defaultValueForClass(Number.class);
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void test_defaultValueForClass_string()
  {
    final String expected = NullHandling.replaceWithDefault() ? "" : null;
    final String actual = NullHandling.defaultValueForClass(String.class);
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void test_defaultValueForClass_object()
  {
    // Classes without a dedicated default yield null in either mode.
    final Object actual = NullHandling.defaultValueForClass(Object.class);
    Assert.assertNull(actual);
  }
}
99 changes: 99 additions & 0 deletions core/src/test/java/org/apache/druid/math/expr/ExprsTest.java
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Pair;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Tests for {@link Exprs}. Expr instances don't, in general, implement value-based equals and hashCode, so the
 * assertions below check the type and fields of each returned expr instead of comparing exprs directly.
 */
public class ExprsTest
{
  @Test
  public void test_decomposeAnd_notAnAnd()
  {
    final Expr input = new IdentifierExpr("foo");
    final List<Expr> parts = Exprs.decomposeAnd(input);

    Assert.assertEquals(1, parts.size());

    final Expr only = parts.get(0);
    Assert.assertThat(only, CoreMatchers.instanceOf(IdentifierExpr.class));
    Assert.assertEquals("foo", ((IdentifierExpr) only).getIdentifier());
  }

  @Test
  public void test_decomposeAnd_basic()
  {
    // (foo && bar) && (baz && qux)
    final Expr leftAnd = new BinAndExpr("&&", new IdentifierExpr("foo"), new IdentifierExpr("bar"));
    final Expr rightAnd = new BinAndExpr("&&", new IdentifierExpr("baz"), new IdentifierExpr("qux"));
    final List<Expr> parts = Exprs.decomposeAnd(new BinAndExpr("&&", leftAnd, rightAnd));

    Assert.assertEquals(4, parts.size());

    for (final Expr part : parts) {
      Assert.assertThat(part, CoreMatchers.instanceOf(IdentifierExpr.class));
    }

    // Operands must come back in left-to-right order.
    final List<String> names = parts.stream()
                                    .map(part -> (IdentifierExpr) part)
                                    .map(identifierExpr -> identifierExpr.getIdentifier())
                                    .collect(Collectors.toList());

    Assert.assertEquals(ImmutableList.of("foo", "bar", "baz", "qux"), names);
  }

  @Test
  public void test_decomposeEquals_notAnEquals()
  {
    final Expr input = new IdentifierExpr("foo");
    final Optional<Pair<Expr, Expr>> sides = Exprs.decomposeEquals(input);

    Assert.assertFalse(sides.isPresent());
  }

  @Test
  public void test_decomposeEquals_basic()
  {
    final Expr input = new BinEqExpr("==", new IdentifierExpr("foo"), new IdentifierExpr("bar"));
    final Optional<Pair<Expr, Expr>> sides = Exprs.decomposeEquals(input);

    Assert.assertTrue(sides.isPresent());

    final Expr lhs = sides.get().lhs;
    final Expr rhs = sides.get().rhs;

    Assert.assertThat(lhs, CoreMatchers.instanceOf(IdentifierExpr.class));
    Assert.assertThat(rhs, CoreMatchers.instanceOf(IdentifierExpr.class));
    Assert.assertEquals("foo", ((IdentifierExpr) lhs).getIdentifier());
    Assert.assertEquals("bar", ((IdentifierExpr) rhs).getIdentifier());
  }
}
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -91,7 +92,7 @@ public List<String> unapply(@Nullable final String value)
// valueEquivalent is null only for SQL Compatible Null Behavior
// otherwise null will be replaced with empty string in nullToEmptyIfNeeded above.
// null value maps to empty list when SQL Compatible
return Collections.EMPTY_LIST;
return Collections.emptyList();
}
final List<String> retList;
try {
Expand All @@ -100,25 +101,20 @@ public List<String> unapply(@Nullable final String value)
}
catch (ExecutionException e) {
LOGGER.debug("list of keys not found for value [%s]", value);
return Collections.EMPTY_LIST;
return Collections.emptyList();
}
}

public synchronized void close()
@Override
public boolean canIterate()
{
if (isOpen.getAndSet(false)) {
LOGGER.info("Closing loading cache [%s]", id);
loadingCache.close();
reverseLoadingCache.close();
} else {
LOGGER.info("Closing already closed lookup");
return;
}
return false;
}

public boolean isOpen()
@Override
public Iterable<Map.Entry<String, String>> iterable()
{
return isOpen.get();
throw new UnsupportedOperationException("Cannot iterate");
}

@Override
Expand All @@ -145,6 +141,22 @@ public String call()
}
}

/**
 * Closes both the forward and the reverse loading cache. Only the call that flips the open flag from true to false
 * closes the caches; any later call just logs that the lookup was already closed.
 */
public synchronized void close()
{
  final boolean wasOpen = isOpen.getAndSet(false);

  if (!wasOpen) {
    LOGGER.info("Closing already closed lookup");
    return;
  }

  LOGGER.info("Closing loading cache [%s]", id);
  loadingCache.close();
  reverseLoadingCache.close();
}

/**
 * Reports the current value of the open flag, which {@code close()} sets to false.
 *
 * @return true if the open flag is still set, false otherwise
 */
public boolean isOpen()
{
return isOpen.get();
}

private class UnapplyCallable implements Callable<List<String>>
{
private final String value;
Expand Down
Expand Up @@ -35,6 +35,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -165,6 +166,18 @@ public List<String> unapply(@Nullable final String value)
}
}

/**
 * Reports that the entries of this lookup cannot be iterated.
 *
 * @return always false
 */
@Override
public boolean canIterate()
{
return false;
}

/**
 * Not supported by this lookup: this method never returns normally.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Iterable<Map.Entry<String, String>> iterable()
{
throw new UnsupportedOperationException("Cannot iterate");
}

@Override
public byte[] getCacheKey()
{
Expand Down

0 comments on commit a87db7f

Please sign in to comment.