Skip to content

Loading…

Pushdown index column predicate to Cql token range query #1103

Closed
wants to merge 1 commit into from

2 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
20 presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraColumnHandle.java
@@ -40,6 +40,7 @@
private final List<CassandraType> typeArguments;
private final boolean partitionKey;
private final boolean clusteringKey;
+ private final boolean indexed;
@JsonCreator
public CassandraColumnHandle(
@@ -49,7 +50,8 @@ public CassandraColumnHandle(
@JsonProperty("cassandraType") CassandraType cassandraType,
@Nullable @JsonProperty("typeArguments") List<CassandraType> typeArguments,
@JsonProperty("partitionKey") boolean partitionKey,
- @JsonProperty("clusteringKey") boolean clusteringKey)
+ @JsonProperty("clusteringKey") boolean clusteringKey,
+ @JsonProperty("indexed") boolean indexed)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.name = checkNotNull(name, "name is null");
@@ -67,6 +69,7 @@ public CassandraColumnHandle(
}
this.partitionKey = partitionKey;
this.clusteringKey = clusteringKey;
+ this.indexed = indexed;
}
@JsonProperty
@@ -111,6 +114,12 @@ public boolean isClusteringKey()
return clusteringKey;
}
+ @JsonProperty
+ public boolean isIndexed()
+ {
+ return indexed;
+ }
+
public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(CassandraCqlUtils.cqlNameToSqlName(name), cassandraType.getNativeType(), ordinalPosition, partitionKey);
@@ -131,7 +140,8 @@ public int hashCode()
cassandraType,
typeArguments,
partitionKey,
- clusteringKey);
+ clusteringKey,
+ indexed);
}
@Override
@@ -150,7 +160,8 @@ public boolean equals(Object obj)
&& Objects.equal(this.cassandraType, other.cassandraType)
&& Objects.equal(this.typeArguments, other.typeArguments)
&& Objects.equal(this.partitionKey, other.partitionKey)
- && Objects.equal(this.clusteringKey, other.clusteringKey);
+ && Objects.equal(this.clusteringKey, other.clusteringKey)
+ && Objects.equal(this.indexed, other.indexed);
}
@Override
@@ -167,7 +178,8 @@ public String toString()
}
helper.add("partitionKey", partitionKey)
- .add("clusteringKey", clusteringKey);
+ .add("clusteringKey", clusteringKey)
+ .add("indexed", indexed);
return helper.toString();
}
View
15 presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraPartition.java
@@ -27,19 +27,22 @@
private final String partitionId;
private final byte[] key;
private final TupleDomain tupleDomain;
+ private final boolean indexedColumnPredicatePushdown;
private CassandraPartition()
{
partitionId = UNPARTITIONED_ID;
tupleDomain = TupleDomain.all();
key = null;
+ indexedColumnPredicatePushdown = false;
}
- public CassandraPartition(byte[] key, String partitionId, TupleDomain tupleDomain)
+ public CassandraPartition(byte[] key, String partitionId, TupleDomain tupleDomain, boolean indexedColumnPredicatePushdown)
{
this.key = key;
this.partitionId = partitionId;
this.tupleDomain = tupleDomain;
+ this.indexedColumnPredicatePushdown = indexedColumnPredicatePushdown;
}
public boolean isUnpartitioned()
@@ -47,6 +50,11 @@ public boolean isUnpartitioned()
return partitionId.equals(UNPARTITIONED_ID);
}
+ public boolean isIndexedColumnPredicatePushdown()
+ {
+ return indexedColumnPredicatePushdown;
+ }
+
@Override
public TupleDomain getTupleDomain()
{
@@ -68,4 +76,9 @@ public ByteBuffer getKeyAsByteBuffer()
{
return ByteBuffer.wrap(key);
}
+
+ public byte[] getKey()
+ {
+ return key;
+ }
}
View
5 presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraSession.java
@@ -179,7 +179,8 @@ private CassandraColumnHandle buildColumnHandle(ColumnMetadata columnMeta, boole
throw new IllegalArgumentException("Invalid type arguments: " + typeArgs);
}
}
- return new CassandraColumnHandle(connectorId, columnMeta.getName(), index, cassandraType, typeArguments, partitionKey, clusteringKey);
+ boolean indexed = columnMeta.getIndex() != null;
+ return new CassandraColumnHandle(connectorId, columnMeta.getName(), index, cassandraType, typeArguments, partitionKey, clusteringKey, indexed);
}
public List<CassandraPartition> getPartitions(CassandraTable table, List<Comparable<?>> filterPrefix)
@@ -232,7 +233,7 @@ private CassandraColumnHandle buildColumnHandle(ColumnMetadata columnMeta, boole
TupleDomain tupleDomain = TupleDomain.withFixedValues(map);
String partitionId = stringBuilder.toString();
if (uniquePartitionIds.add(partitionId)) {
- partitions.add(new CassandraPartition(key, partitionId, tupleDomain));
+ partitions.add(new CassandraPartition(key, partitionId, tupleDomain, false));
}
}
return partitions.build();
View
31 presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraSplitManager.java
@@ -14,6 +14,7 @@
package com.facebook.presto.cassandra;
import com.datastax.driver.core.Host;
+import com.facebook.presto.cassandra.util.CassandraCqlUtils;
import com.facebook.presto.cassandra.util.HostAddressFactory;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplitManager;
@@ -32,6 +33,7 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.airlift.log.Logger;
@@ -39,6 +41,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
@@ -128,6 +131,32 @@ public PartitionResult getPartitions(TableHandle tableHandle, TupleDomain tupleD
remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(tupleDomain.getDomains(), not(in(partitionColumns))));
}
+ // push down indexed column fixed value predicates only for unpartitioned partition which uses token range query
+ if (partitions.size() == 1 && ((CassandraPartition) partitions.get(0)).isUnpartitioned()) {
+ Map<ColumnHandle, Domain> domains = tupleDomain.getDomains();
+ List<ColumnHandle> indexedColumns = Lists.newArrayList();
+ // compose partitionId by using indexed column
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) {
+ CassandraColumnHandle column = (CassandraColumnHandle) entry.getKey();
+ Domain domain = entry.getValue();
+ if (column.isIndexed() && domain.isSingleValue()) {
+ sb.append(CassandraCqlUtils.validColumnName(column.getName()))
+ .append(" = ")
+ .append(CassandraCqlUtils.cqlValue(entry.getValue().getSingleValue().toString(), column.getCassandraType()));
+ indexedColumns.add(column);
+ // Only onde indexed column predicate can be pushed down.
+ break;
+ }
+ }
+ if (sb.length() > 0) {
+ CassandraPartition partition = (CassandraPartition) partitions.get(0);
+ TupleDomain filterIndexedColumn = TupleDomain.withColumnDomains(Maps.filterKeys(remainingTupleDomain.getDomains(), not(in(indexedColumns))));
+ partitions = Lists.newArrayList();
+ partitions.add(new CassandraPartition(partition.getKey(), sb.toString(), filterIndexedColumn, true));
+ return new PartitionResult(partitions, filterIndexedColumn);
+ }
+ }
return new PartitionResult(partitions, remainingTupleDomain);
}
@@ -149,7 +178,7 @@ public SplitSource getPartitionSplits(TableHandle tableHandle, List<Partition> p
checkArgument(partition instanceof CassandraPartition, "partitions are no CassandraPartitions");
CassandraPartition cassandraPartition = (CassandraPartition) partition;
- if (cassandraPartition.isUnpartitioned()) {
+ if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) {
CassandraTable table = schemaProvider.getTable(cassandraTableHandle);
List<Split> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId());
return new FixedSplitSource(connectorId, splits);
View
16 presto-cassandra/src/main/java/com/facebook/presto/cassandra/util/CassandraCqlUtils.java
@@ -18,6 +18,7 @@
import com.datastax.driver.core.querybuilder.Select.Selection;
import com.facebook.presto.cassandra.CassandraColumnHandle;
import com.facebook.presto.cassandra.CassandraTableHandle;
+import com.facebook.presto.cassandra.CassandraType;
import com.facebook.presto.spi.ColumnHandle;
import java.util.Arrays;
@@ -153,4 +154,19 @@ public static Select selectCountAllFrom(CassandraTableHandle tableHandle)
String table = validTableName(tableHandle.getTableName());
return QueryBuilder.select().countAll().from(schema, table);
}
+
+ public static String cqlValue(String value, CassandraType cassandraType)
+ {
+ switch (cassandraType) {
+ case ASCII:
+ case TEXT:
+ case VARCHAR:
+ return quoteStringLiteral(value);
+ case INET:
+ // remove '/' in the string. e.g. /127.0.0.1
+ return quoteStringLiteral(value.substring(1));
+ default:
+ return value;
+ }
+ }
}
View
4 presto-cassandra/src/test/java/com/facebook/presto/cassandra/MockCassandraSession.java
@@ -113,8 +113,8 @@ public CassandraTable getTable(SchemaTableName tableName)
return new CassandraTable(
new CassandraTableHandle(connectorId, TEST_SCHEMA, TEST_TABLE),
ImmutableList.of(
- new CassandraColumnHandle(connectorId, TEST_COLUMN1, 0, CassandraType.VARCHAR, null, true, false),
- new CassandraColumnHandle(connectorId, TEST_COLUMN2, 0, CassandraType.INT, null, false, false)));
+ new CassandraColumnHandle(connectorId, TEST_COLUMN1, 0, CassandraType.VARCHAR, null, true, false, false),
+ new CassandraColumnHandle(connectorId, TEST_COLUMN2, 0, CassandraType.INT, null, false, false, false)));
}
throw new TableNotFoundException(tableName);
}
View
5 presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraColumnHandle.java
@@ -27,7 +27,7 @@
@Test
public void testRoundTrip()
{
- CassandraColumnHandle expected = new CassandraColumnHandle("connector", "name", 42, CassandraType.FLOAT, null, true, false);
+ CassandraColumnHandle expected = new CassandraColumnHandle("connector", "name", 42, CassandraType.FLOAT, null, true, false, false);
String json = codec.toJson(expected);
CassandraColumnHandle actual = codec.fromJson(json);
@@ -50,7 +50,8 @@ public void testRoundTrip2()
CassandraType.MAP,
ImmutableList.of(CassandraType.VARCHAR, CassandraType.UUID),
false,
- true);
+ true,
+ false);
String json = codec.toJson(expected);
CassandraColumnHandle actual = codec.fromJson(json);
View
3 presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestJsonCassandraHandles.java
@@ -85,7 +85,7 @@ public void testTableHandleDeserialize()
public void testColumnHandleSerialize()
throws Exception
{
- CassandraColumnHandle columnHandle = new CassandraColumnHandle("cassandra", "column", 42, CassandraType.BIGINT, null, false, true);
+ CassandraColumnHandle columnHandle = new CassandraColumnHandle("cassandra", "column", 42, CassandraType.BIGINT, null, false, true, false);
assertTrue(objectMapper.canSerialize(CassandraColumnHandle.class));
String json = objectMapper.writeValueAsString(columnHandle);
@@ -103,6 +103,7 @@ public void testColumn2HandleSerialize()
CassandraType.SET,
ImmutableList.of(CassandraType.INT),
false,
+ false,
false);
assertTrue(objectMapper.canSerialize(CassandraColumnHandle.class));
View
6 presto-cassandra/src/test/java/com/facebook/presto/cassandra/util/TestCassandraCqlUtils.java
@@ -63,9 +63,9 @@ public void testQuote()
public void testAppendSelectColumns()
{
List<CassandraColumnHandle> columns = ImmutableList.of(
- new CassandraColumnHandle("", "foo", 0, CassandraType.VARCHAR, null, false, false),
- new CassandraColumnHandle("", "bar", 0, CassandraType.VARCHAR, null, false, false),
- new CassandraColumnHandle("", "table", 0, CassandraType.VARCHAR, null, false, false));
+ new CassandraColumnHandle("", "foo", 0, CassandraType.VARCHAR, null, false, false, false),
+ new CassandraColumnHandle("", "bar", 0, CassandraType.VARCHAR, null, false, false, false),
+ new CassandraColumnHandle("", "table", 0, CassandraType.VARCHAR, null, false, false, false));
StringBuilder sb = new StringBuilder();
CassandraCqlUtils.appendSelectColumns(sb, columns);
Something went wrong with that request. Please try again.