Skip to content
Permalink
Browse files
IGNITE-16937 MvTableStorage interface introduced (#807)
  • Loading branch information
ibessonov committed May 20, 2022
1 parent 84de2e9 commit f3a7635bd5b646e6983e95649686fe164dcfb487
Showing 16 changed files with 282 additions and 64 deletions.
@@ -26,8 +26,6 @@

/**
* Multi-versioned partition storage.
* POC version, that represents a combination between a replicated TX-aware MV storage and physical MV storage. Their API are very similar,
* although there are very important differences that will be addressed in the future.
*/
public interface MvPartitionStorage extends AutoCloseable {
/**
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.storage.engine;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;

/**
* Table storage that contains meta, partitions and SQL indexes.
*/
public interface MvTableStorage {
/**
* Creates a partition for the current table. Not expected to be called concurrently with the same partition id.
*
* @param partitionId Partition id.
* @return Partition storage.
* @throws IllegalArgumentException If partition id is out of bounds.
* @throws StorageException If an error has occurred during the partition creation.
*/
MvPartitionStorage createPartition(int partitionId) throws StorageException;

/**
* Returns the partition storage or {@code null} if the requested storage doesn't exist.
*
* @param partitionId Partition id.
* @return Partition storage or {@code null}.
* @throws IllegalArgumentException If partition id is out of bounds.
* @throws NullPointerException If partition doesn't exist.
*/
MvPartitionStorage partition(int partitionId);

/**
* Destroys a partition if it exists.
*
* @param partitionId Partition id.
* @throws IllegalArgumentException If partition id is out of bounds.
* @throws StorageException If an error has occurred during the partition destruction.
*/
CompletableFuture<?> destroyPartition(int partitionId) throws StorageException;

/**
* Returns the table configuration.
*/
TableConfiguration configuration();

/**
* Starts the storage.
*
* @throws StorageException If an error has occurred during the start of the storage.
*/
void start() throws StorageException;

/**
* Stops the storage.
*
* @throws StorageException If an error has occurred during the stop of the storage.
*/
void stop() throws StorageException;

/**
* Stops and destroys the storage and cleans all allocated resources.
*
* @throws StorageException If an error has occurred during the destruction of the storage.
*/
void destroy() throws StorageException;
}
@@ -45,4 +45,14 @@ public interface StorageEngine {
* @throws StorageException If an error has occurs while creating the table.
*/
TableStorage createTable(TableConfiguration tableCfg) throws StorageException;

/**
* Creates new table storage.
*
* @param tableCfg Table configuration.
* @throws StorageException If an error has occurs while creating the table.
*/
default MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
throw new UnsupportedOperationException("createMvTable");
}
}
@@ -53,7 +53,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
public void testEmpty() throws Exception {
MvPartitionStorage pk = partitionStorage();

RowId rowId = UuidRowId.randomRowId(0);
RowId rowId = pk.insert(binaryRow(new TestKey(1, "1"), new TestValue(1, "1")), UUID.randomUUID());

pk.abortWrite(rowId);

assertEquals(0, rowId.partitionId());

// Read.
@@ -227,11 +227,10 @@ public void testAbort() throws Exception {

TestKey key = new TestKey(1, "1");
TestValue val = new TestValue(10, "10");
RowId rowId = UuidRowId.randomRowId(0);

UUID txId = UUID.randomUUID();

pk.addWrite(rowId, binaryRow(key, val), txId);
RowId rowId = pk.insert(binaryRow(key, val), txId);

// Using transaction id.
assertEquals(List.of(val), convert(index.scan(null, null, 0, txId, null)));
@@ -257,11 +256,10 @@ public void testAbortRemove() throws Exception {

TestKey key = new TestKey(1, "1");
TestValue val = new TestValue(10, "10");
RowId rowId = UuidRowId.randomRowId(0);

Timestamp insertTs = Timestamp.nextVersion();

pk.addWrite(rowId, binaryRow(key, val), UUID.randomUUID());
RowId rowId = pk.insert(binaryRow(key, val), UUID.randomUUID());

pk.commitWrite(rowId, insertTs);

@@ -296,11 +294,10 @@ public void testCommit() throws Exception {

TestKey key = new TestKey(1, "1");
TestValue val = new TestValue(10, "10");
RowId rowId = UuidRowId.randomRowId(0);

UUID txId = UUID.randomUUID();

pk.addWrite(rowId, binaryRow(key, val), txId);
RowId rowId = pk.insert(binaryRow(key, val), txId);

// Using transaction id.
assertEquals(List.of(val), convert(index.scan(null, null, 0, txId, null)));
@@ -328,11 +325,10 @@ public void testCommitRemove() throws Exception {

TestKey key = new TestKey(1, "1");
TestValue val = new TestValue(10, "10");
RowId rowId = UuidRowId.randomRowId(0);

Timestamp insertTs = Timestamp.nextVersion();

pk.addWrite(rowId, binaryRow(key, val), UUID.randomUUID());
RowId rowId = pk.insert(binaryRow(key, val), UUID.randomUUID());

pk.commitWrite(rowId, insertTs);

@@ -29,7 +29,6 @@
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.storage.UuidRowId;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -42,8 +41,11 @@ public class TestMvPartitionStorage implements MvPartitionStorage {

private final List<TestSortedIndexMvStorage> indexes;

public TestMvPartitionStorage(List<TestSortedIndexMvStorage> indexes) {
private final int partitionId;

public TestMvPartitionStorage(List<TestSortedIndexMvStorage> indexes, int partitionId) {
this.indexes = indexes;
this.partitionId = partitionId;
}

private static class VersionChain {
@@ -71,7 +73,7 @@ public static VersionChain createCommitted(Timestamp timestamp, VersionChain unc
/** {@inheritDoc} */
@Override
public RowId insert(BinaryRow row, UUID txId) throws StorageException {
RowId rowId = UuidRowId.randomRowId(0);
RowId rowId = new TestRowId(partitionId);

addWrite(rowId, row, txId);

@@ -244,4 +246,19 @@ public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, Timestamp timestamp)
public void close() throws Exception {
// No-op.
}

static class TestRowId implements RowId {
final int partitionId;
final UUID uuid;

TestRowId(int partitionId) {
this.partitionId = partitionId;
uuid = UUID.randomUUID();
}

@Override
public int partitionId() {
return partitionId;
}
}
}
@@ -26,7 +26,7 @@
*/
public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest {
/** Test partition storage instance. */
private final TestMvPartitionStorage storage = new TestMvPartitionStorage(List.of());
private final TestMvPartitionStorage storage = new TestMvPartitionStorage(List.of(), 0);

/** {@inheritDoc} */
@Override
@@ -40,7 +40,7 @@
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.storage.UuidRowId;
import org.apache.ignite.internal.storage.basic.TestMvPartitionStorage.TestRowId;
import org.apache.ignite.internal.storage.index.IndexRowPrefix;
import org.apache.ignite.internal.storage.index.PrefixComparator;
import org.apache.ignite.internal.storage.index.SortedIndexMvStorage;
@@ -53,7 +53,7 @@
* Test implementation of MV sorted index storage.
*/
public class TestSortedIndexMvStorage implements SortedIndexMvStorage {
private final NavigableSet<Pair<BinaryRow, UuidRowId>> index;
private final NavigableSet<Pair<BinaryRow, TestRowId>> index;

private final SchemaDescriptor descriptor;

@@ -83,11 +83,10 @@ public TestSortedIndexMvStorage(
partitions = tableCfg.partitions();

index = new ConcurrentSkipListSet<>(
((Comparator<Pair<BinaryRow, UuidRowId>>) (p1, p2) -> {
((Comparator<Pair<BinaryRow, TestRowId>>) (p1, p2) -> {
return compareColumns(p1.getFirst(), p2.getFirst());
})
.thenComparingLong(pair -> pair.getSecond().mostSignificantBits())
.thenComparingLong(pair -> pair.getSecond().leastSignificantBits())
.thenComparing(pair -> pair.getSecond().uuid)
);

// Init columns.
@@ -150,11 +149,11 @@ private int compareColumns(BinaryRow l, BinaryRow r) {
}

public void append(BinaryRow row, RowId rowId) {
index.add(new Pair<>(row, (UuidRowId) rowId));
index.add(new Pair<>(row, (TestRowId) rowId));
}

public void remove(BinaryRow row, RowId rowId) {
index.remove(new Pair<>(row, (UuidRowId) rowId));
index.remove(new Pair<>(row, (TestRowId) rowId));
}

public boolean matches(BinaryRow aborted, BinaryRow existing) {
@@ -198,7 +197,7 @@ private Cursor<IndexRowEx> scan(
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;

NavigableSet<Pair<BinaryRow, UuidRowId>> index = this.index;
NavigableSet<Pair<BinaryRow, TestRowId>> index = this.index;
int direction = 1;

// Swap bounds and flip index for backwards scan.
@@ -31,7 +31,7 @@
public class TestSortedIndexMvStorageTest extends AbstractSortedIndexMvStorageTest {
private List<TestSortedIndexMvStorage> indexes = new CopyOnWriteArrayList<>();

private TestMvPartitionStorage partitionStorage = new TestMvPartitionStorage(indexes);
private TestMvPartitionStorage partitionStorage = new TestMvPartitionStorage(indexes, 0);

@Override
protected MvPartitionStorage partitionStorage() {
@@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.TableStorage;

@@ -49,4 +50,10 @@ public TableStorage createTable(TableConfiguration tableCfg) throws StorageExcep

return new TestConcurrentHashMapTableStorage(tableCfg);
}

/** {@inheritDoc} */
@Override
public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
return new TestMvTableStorage(tableCfg);
}
}
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.storage.chm;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;

/**
* Test table storage implementation.
*/
public class TestMvTableStorage implements MvTableStorage {
private final TableConfiguration tableConfig;

private final Map<Integer, TestMvPartitionStorage> partitions = new ConcurrentHashMap<>();

public TestMvTableStorage(TableConfiguration tableCfg) {
this.tableConfig = tableCfg;
}

@Override
public MvPartitionStorage createPartition(int partitionId) throws StorageException {
partitions.put(partitionId, new TestMvPartitionStorage(List.of(), partitionId));

return partition(partitionId);
}

@Override
public MvPartitionStorage partition(int partitionId) {
return Objects.requireNonNull(partitions.get(partitionId), "Partition doesn't exist");
}

@Override
public CompletableFuture<?> destroyPartition(int partitionId) throws StorageException {
partitions.remove(partitionId);

return CompletableFuture.completedFuture(null);
}

@Override
public TableConfiguration configuration() {
return tableConfig;
}

@Override
public void start() throws StorageException {
}

@Override
public void stop() throws StorageException {
}

@Override
public void destroy() throws StorageException {
}
}

0 comments on commit f3a7635

Please sign in to comment.