Skip to content

Commit

Permalink
IGNITE-17748 Enrich InternalTable.scan API in order to support index …
Browse files Browse the repository at this point in the history
…scans. (#1205)
  • Loading branch information
AMashenkov committed Nov 4, 2022
1 parent de6ee07 commit 018f66e
Show file tree
Hide file tree
Showing 60 changed files with 2,132 additions and 620 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
Expand All @@ -30,6 +31,8 @@
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
Expand Down Expand Up @@ -319,20 +322,67 @@ public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<Binary

/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> scan(int p, @Nullable InternalTransaction tx) {
public Publisher<BinaryRow> scan(
int partId,
@Nullable InternalTransaction tx,
UUID indexId,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
BitSet columnsToInclude
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}

/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> scan(
int p,
int partId,
@NotNull HybridTimestamp readTimestamp,
@NotNull ClusterNode recipientNode,
@NotNull UUID indexId,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
@Nullable BitSet columnsToInclude) {
throw new IgniteInternalException(new OperationNotSupportedException());
}

/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> scan(
int partId,
@NotNull HybridTimestamp readTimestamp,
@NotNull ClusterNode recipientNode
) {
return null;
}

/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(
int partId,
@Nullable InternalTransaction tx,
@NotNull UUID indexId,
BinaryTuple key,
@Nullable BitSet columnsToInclude
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}

/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(
int partId,
@NotNull HybridTimestamp readTimestamp,
@NotNull ClusterNode recipientNode,
@NotNull UUID indexId,
BinaryTuple key,
@Nullable BitSet columnsToInclude
) {
throw new IgniteInternalException(new OperationNotSupportedException());
}

/** {@inheritDoc} */
@Override
public List<String> assignments() {
Expand Down
23 changes: 23 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/lang/IgniteUuid.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,36 @@
package org.apache.ignite.lang;

import java.io.Serializable;
import java.util.Comparator;
import java.util.UUID;

/**
* This is a faster performing version of {@link UUID}. On basic tests this version is at least 10x time faster for ID creation. It uses
* extra memory for 8-byte counter additionally to internal UUID.
*/
public final class IgniteUuid implements Comparable<IgniteUuid>, Cloneable, Serializable {
/** Returns global order comparator for IgniteUUID type, which orders global UUID first then local id. */
public static Comparator<IgniteUuid> globalOrderComparator() {
return (uuid1, uuid2) -> {
if (uuid1 == uuid2) {
return 0;
}

int res = uuid1.globalId().compareTo(uuid2.globalId());

if (res == 0) {
res = Long.compare(uuid1.localId(), uuid2.localId());
}

return res;
};
}

/** Returns natural order comparator for IgniteUUID type, which orders local id first then global UUID. */
public static Comparator<IgniteUuid> naturalOrderComparator() {
return IgniteUuid::compareTo;
}

/** Serial version uid. */
private static final long serialVersionUID = 0L;

Expand Down
1 change: 1 addition & 0 deletions modules/index/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-configuration')
implementation project(':ignite-schema')
implementation project(':ignite-table')
implementation project(':ignite-transactions')
implementation project(':ignite-table')
implementation project(':ignite-configuration')
Expand Down
4 changes: 2 additions & 2 deletions modules/index/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-transactions</artifactId>
<artifactId>ignite-table</artifactId>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-table</artifactId>
<artifactId>ignite-transactions</artifactId>
</dependency>

<!-- Test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,32 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.network.ClusterNode;

/**
* An object that represents a hash index.
*/
public class HashIndex implements Index<IndexDescriptor> {
private final UUID id;
private final UUID tableId;
private final InternalTable table;
private final IndexDescriptor descriptor;

/**
* Constructs the index.
*
* @param id An identifier of the index.
* @param tableId An identifier of the table this index relates to.
* @param table A table this index relates to.
* @param descriptor A descriptor of the index.
*/
public HashIndex(UUID id, UUID tableId, IndexDescriptor descriptor) {
public HashIndex(UUID id, TableImpl table, IndexDescriptor descriptor) {
this.id = Objects.requireNonNull(id, "id");
this.tableId = Objects.requireNonNull(tableId, "tableId");
this.table = Objects.requireNonNull(table.internalTable(), "table");
this.descriptor = Objects.requireNonNull(descriptor, "descriptor");
}

Expand All @@ -54,7 +59,7 @@ public UUID id() {
/** {@inheritDoc} */
@Override
public UUID tableId() {
return tableId;
return table.tableId();
}

/** {@inheritDoc} */
Expand All @@ -71,7 +76,13 @@ public IndexDescriptor descriptor() {

/** {@inheritDoc} */
@Override
public Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
throw new UnsupportedOperationException("Index scan is not implemented yet");
public Publisher<BinaryRow> lookup(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns) {
return table.lookup(partId, tx, id, key, columns);
}

/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(int partId, HybridTimestamp timestamp, ClusterNode recipientNode, BinaryTuple key, BitSet columns) {
return table.lookup(partId, timestamp, recipientNode, id, key, columns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.network.ClusterNode;

/**
* An object describing an abstract index.
Expand All @@ -35,10 +38,32 @@ public interface Index<DescriptorT extends IndexDescriptor> {
/** Returns name of the index. */
String name();

/** Returns table id index belong to. */
UUID tableId();

/** Returns index dewscriptor. */
DescriptorT descriptor();

/** Returns cursor for the values corresponding to the given key. */
Publisher<BinaryTuple> scan(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns);
/**
* Returns cursor for the values corresponding to the given key.
*
* @param partId Partition id.
* @param tx Transaction.
* @param key Key to lookup.
* @param columns Columns to include.
* @return A cursor from resulting rows.
*/
Publisher<BinaryRow> lookup(int partId, InternalTransaction tx, BinaryTuple key, BitSet columns);

/**
* Returns cursor for the values corresponding to the given key.
*
* @param partId Partition id.
* @param readTimestamp Read timestamp.
* @param recipientNode Cluster node that will handle given get request.
* @param key Key to search.
* @param columns Columns to include.
* @return A cursor from resulting rows.
*/
Publisher<BinaryRow> lookup(int partId, HybridTimestamp readTimestamp, ClusterNode recipientNode, BinaryTuple key, BitSet columns);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.ignite.internal.schema.configuration.index.TableIndexChange;
import org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
Expand All @@ -71,16 +72,18 @@

/**
* An Ignite component that is responsible for handling index-related commands like CREATE or DROP
* as well as managing indexes lifecycle.
* as well as managing indexes' lifecycle.
*/
public class IndexManager extends Producer<IndexEvent, IndexEventParameters> implements IgniteComponent {
private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);

/** Common tables and indexes configuration. */
private final TablesConfiguration tablesCfg;

/** Schema manager. */
private final SchemaManager schemaManager;

/** Table manager. */
private final TableManager tableManager;

/** Busy lock to stop synchronously. */
Expand Down Expand Up @@ -376,15 +379,15 @@ private CompletableFuture<?> createIndexLocally(long causalityToken, UUID tableI
LOG.trace("Creating local index: name={}, id={}, tableId={}, token={}",
tableIndexView.name(), tableIndexView.id(), tableId, causalityToken);

Index<?> index = newIndex(tableId, tableIndexView);

TableRowToIndexKeyConverter tableRowConverter = new TableRowToIndexKeyConverter(
schemaManager.schemaRegistry(tableId),
index.descriptor().columns().toArray(STRING_EMPTY_ARRAY)
);

return tableManager.tableAsync(causalityToken, tableId)
.thenAccept(table -> {
Index<?> index = newIndex(table, tableIndexView);

TableRowToIndexKeyConverter tableRowConverter = new TableRowToIndexKeyConverter(
schemaManager.schemaRegistry(tableId),
index.descriptor().columns().toArray(STRING_EMPTY_ARRAY)
);

if (index instanceof HashIndex) {
table.registerHashIndex(tableIndexView.id(), tableIndexView.uniq(), tableRowConverter::convert);

Expand All @@ -401,17 +404,17 @@ private CompletableFuture<?> createIndexLocally(long causalityToken, UUID tableI
});
}

private Index<?> newIndex(UUID tableId, TableIndexView indexView) {
private Index<?> newIndex(TableImpl table, TableIndexView indexView) {
if (indexView instanceof SortedIndexView) {
return new SortedIndexImpl(
indexView.id(),
tableId,
table,
convert((SortedIndexView) indexView)
);
} else if (indexView instanceof HashIndexView) {
return new HashIndex(
indexView.id(),
tableId,
table,
convert((HashIndexView) indexView)
);
}
Expand All @@ -434,8 +437,12 @@ private SortedIndexDescriptor convert(SortedIndexView indexView) {
for (var columnName : indexView.columns().namedListKeys()) {
IndexColumnView columnView = indexView.columns().get(columnName);

//TODO IGNITE-15141: Make null-order configurable.
// NULLS FIRST for DESC, NULLS LAST for ASC by default.
boolean nullsFirst = !columnView.asc();

indexedColumns.add(columnName);
collations.add(ColumnCollation.get(columnView.asc(), false));
collations.add(ColumnCollation.get(columnView.asc(), nullsFirst));
}

return new SortedIndexDescriptor(
Expand Down Expand Up @@ -495,8 +502,7 @@ private VersionedConverter createConverter(int schemaVersion) {

var rowConverter = new BinaryConverter(descriptor, tupleSchema, false);

return new VersionedConverter(descriptor.version(),
row -> new BinaryTuple(tupleSchema, rowConverter.toTuple(row)));
return new VersionedConverter(descriptor.version(), rowConverter::toTuple);
}

private int[] resolveColumnIndexes(SchemaDescriptor descriptor) {
Expand Down
Loading

0 comments on commit 018f66e

Please sign in to comment.