Skip to content

Commit

Permalink
[IGNITE-19082] Catalog. Cleanup dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
lowka committed Apr 26, 2024
1 parent a3bd2e5 commit 600d20a
Show file tree
Hide file tree
Showing 21 changed files with 90 additions and 904 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Flow.Publisher;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.commands.CreateSystemViewCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schemaName),
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schema.id()),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new NewSystemViewEntry(descriptor, systemSchema.name()),
new NewSystemViewEntry(descriptor, systemSchema.id()),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ public List<UpdateEntry> get(Catalog catalog) {
int id = catalog.objectIdGenState();
int tableId = id++;
int pkIndexId = id++;
int schemaId = schema.id();

CatalogTableDescriptor table = new CatalogTableDescriptor(
tableId,
schema.id(),
schemaId,
pkIndexId,
tableName,
zone.id(),
Expand All @@ -143,8 +144,8 @@ public List<UpdateEntry> get(Catalog catalog) {
CatalogIndexDescriptor pkIndex = createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId);

return List.of(
new NewTableEntry(table, schemaName),
new NewIndexEntry(pkIndex, schemaName),
new NewTableEntry(table, schemaId),
new NewIndexEntry(pkIndex, schemaId),
new MakeIndexAvailableEntry(pkIndexId),
new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ public class NewIndexEntry implements UpdateEntry, Fireable {

private final CatalogIndexDescriptor descriptor;

private final String schemaName;
private final int schemaId;

/**
* Constructs the object.
*
* @param descriptor A descriptor of an index to add.
* @param schemaName Schema name.
* @param schemaId Schema id.
*/
public NewIndexEntry(CatalogIndexDescriptor descriptor, String schemaName) {
public NewIndexEntry(CatalogIndexDescriptor descriptor, int schemaId) {
this.descriptor = descriptor;
this.schemaName = schemaName;
this.schemaId = schemaId;
}

/** Gets descriptor of an index to add. */
Expand All @@ -77,7 +77,7 @@ public CatalogEventParameters createEventParameters(long causalityToken, int cat

@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaId));

descriptor.updateToken(causalityToken);

Expand Down Expand Up @@ -109,15 +109,15 @@ public String toString() {
private static class NewIndexEntrySerializer implements CatalogObjectSerializer<NewIndexEntry> {
@Override
public NewIndexEntry readFrom(IgniteDataInput input) throws IOException {
String schemaName = input.readUTF();
int schemaId = input.readInt();
CatalogIndexDescriptor descriptor = CatalogSerializationUtils.IDX_SERIALIZER.readFrom(input);

return new NewIndexEntry(descriptor, schemaName);
return new NewIndexEntry(descriptor, schemaId);
}

@Override
public void writeTo(NewIndexEntry entry, IgniteDataOutput output) throws IOException {
output.writeUTF(entry.schemaName);
output.writeInt(entry.schemaId);
CatalogSerializationUtils.IDX_SERIALIZER.writeTo(entry.descriptor(), output);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ public class NewSystemViewEntry implements UpdateEntry, Fireable {

private final CatalogSystemViewDescriptor descriptor;

private final String schemaName;
private final int schemaId;

/**
* Constructor.
*
* @param descriptor System view descriptor.
* @param schemaName A schema name.
* @param schemaId A schema name.
*/
public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor, String schemaName) {
public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor, int schemaId) {
this.descriptor = descriptor;
this.schemaName = schemaName;
this.schemaId = schemaId;
}

@Override
Expand All @@ -76,7 +76,7 @@ public CatalogEventParameters createEventParameters(long causalityToken, int cat
/** {@inheritDoc} */
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor systemSchema = catalog.schema(schemaName);
CatalogSchemaDescriptor systemSchema = catalog.schema(schemaId);

descriptor.updateToken(causalityToken);

Expand Down Expand Up @@ -117,15 +117,15 @@ private static class NewSystemViewEntrySerializer implements CatalogObjectSerial
@Override
public NewSystemViewEntry readFrom(IgniteDataInput input) throws IOException {
CatalogSystemViewDescriptor descriptor = CatalogSystemViewDescriptor.SERIALIZER.readFrom(input);
String schema = input.readUTF();
int schemaId= input.readInt();

return new NewSystemViewEntry(descriptor, schema);
return new NewSystemViewEntry(descriptor, schemaId);
}

@Override
public void writeTo(NewSystemViewEntry entry, IgniteDataOutput output) throws IOException {
CatalogSystemViewDescriptor.SERIALIZER.writeTo(entry.descriptor, output);
output.writeUTF(entry.schemaName);
output.writeInt(entry.schemaId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ public class NewTableEntry implements UpdateEntry, Fireable {

private final CatalogTableDescriptor descriptor;

private final String schemaName;
private final int schemaId;

/**
* Constructs the object.
*
* @param descriptor A descriptor of a table to add.
* @param schemaName A schema name.
* @param schemaId Schema id.
*/
public NewTableEntry(CatalogTableDescriptor descriptor, String schemaName) {
public NewTableEntry(CatalogTableDescriptor descriptor, int schemaId) {
this.descriptor = descriptor;
this.schemaName = schemaName;
this.schemaId = schemaId;
}

/** Returns descriptor of a table to add. */
Expand All @@ -77,7 +77,7 @@ public CatalogEventParameters createEventParameters(long causalityToken, int cat

@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName));
CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaId));

descriptor.updateToken(causalityToken);

Expand Down Expand Up @@ -112,15 +112,15 @@ private static class NewTableEntrySerializer implements CatalogObjectSerializer<
@Override
public NewTableEntry readFrom(IgniteDataInput input) throws IOException {
CatalogTableDescriptor descriptor = CatalogTableDescriptor.SERIALIZER.readFrom(input);
String schemaName = input.readUTF();
int schemaId = input.readInt();

return new NewTableEntry(descriptor, schemaName);
return new NewTableEntry(descriptor, schemaId);
}

@Override
public void writeTo(NewTableEntry entry, IgniteDataOutput output) throws IOException {
CatalogTableDescriptor.SERIALIZER.writeTo(entry.descriptor(), output);
output.writeUTF(entry.schemaName);
output.writeInt(entry.schemaId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

import java.io.IOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -307,8 +306,8 @@ private void checkNewIndexEntry() {
CatalogSortedIndexDescriptor sortedIndexDescriptor = newSortedIndexDescriptor("idx1");
CatalogHashIndexDescriptor hashIndexDescriptor = newHashIndexDescriptor("idx2");

NewIndexEntry sortedIdxEntry = new NewIndexEntry(sortedIndexDescriptor, "PUBLIC");
NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor, "PUBLIC");
NewIndexEntry sortedIdxEntry = new NewIndexEntry(sortedIndexDescriptor, 1);
NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor, 1);

VersionedUpdate update = newVersionedUpdate(sortedIdxEntry, hashIdxEntry);

Expand All @@ -323,18 +322,15 @@ private void checkNewTableEntry() {

List<CatalogTableColumnDescriptor> columns = List.of(col1, col2, col3, col4);

NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), null), "PUBLIC");
NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of()), "PUBLIC");
NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c2")), "PUBLIC");
NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c1")), "PUBLIC");
NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), null), 1);
NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of()), 1);
NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c2")), 1);
NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1", columns, List.of("c1", "c2"), List.of("c1")), 1);

VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3, entry4);
VersionedUpdate deserialized = serialize(update);

assertVersionedUpdate(update, deserialized);

NewTableEntry deserializedEntry = (NewTableEntry) deserialized.entries().get(0);
assertSame(deserializedEntry.descriptor().primaryKeyColumns(), deserializedEntry.descriptor().colocationColumns());
}

private void checkNewSystemViewEntry() {
Expand All @@ -346,8 +342,8 @@ private void checkNewSystemViewEntry() {
CatalogSystemViewDescriptor clusterDesc =
new CatalogSystemViewDescriptor(1, "view1", List.of(col1, col2), SystemViewType.CLUSTER);

NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc, "PUBLIC");
NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc, "PUBLIC");
NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc, 1);
NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc, 1);

VersionedUpdate update = newVersionedUpdate(nodeEntry, clusterEntry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/** For {@link IndexManager} testing. */
/** Some index tests */
public class ItIndexManagerTest extends ClusterPerClassIntegrationTest {
private static final String ZONE_NAME = "ZONE_TABLE";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
Expand All @@ -75,7 +76,7 @@
class IndexBuildController implements ManuallyCloseable {
private final IndexBuilder indexBuilder;

private final IndexManager indexManager;
private final MvTableStorageProvider storageProvider;

private final CatalogService catalogService;

Expand All @@ -94,14 +95,14 @@ class IndexBuildController implements ManuallyCloseable {
/** Constructor. */
IndexBuildController(
IndexBuilder indexBuilder,
IndexManager indexManager,
MvTableStorageProvider storageProvider,
CatalogService catalogService,
ClusterService clusterService,
PlacementDriver placementDriver,
ClockService clockService
) {
this.indexBuilder = indexBuilder;
this.indexManager = indexManager;
this.storageProvider = storageProvider;
this.catalogService = catalogService;
this.clusterService = clusterService;
this.placementDriver = placementDriver;
Expand Down Expand Up @@ -251,7 +252,14 @@ private void stopBuildingIndexesIfPrimaryExpired(TablePartitionId replicaId) {
}

private CompletableFuture<MvTableStorage> getMvTableStorageFuture(long causalityToken, TablePartitionId replicaId) {
return indexManager.getMvTableStorage(causalityToken, replicaId.tableId());
// return tableManager.tableAsync(causalityToken, replicaId.tableId()).thenApply(table -> table.internalTable().storage());
return storageProvider.getStorage(causalityToken, replicaId.tableId());
}

@FunctionalInterface
public interface MvTableStorageProvider {

CompletableFuture<MvTableStorage> getStorage(long causalityToken, int tableId);
}

private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(TablePartitionId replicaId, HybridTimestamp timestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.index.IndexBuildController.MvTableStorageProvider;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;

Expand Down Expand Up @@ -75,7 +77,7 @@ public IndexBuildingManager(
ReplicaService replicaService,
CatalogManager catalogManager,
MetaStorageManager metaStorageManager,
IndexManager indexManager,
TableManager tableManager,
PlacementDriver placementDriver,
ClusterService clusterService,
LogicalTopologyService logicalTopologyService,
Expand All @@ -99,10 +101,13 @@ public IndexBuildingManager(
indexBuilder = new IndexBuilder(executor, replicaService);

indexAvailabilityController = new IndexAvailabilityController(catalogManager, metaStorageManager, indexBuilder);
MvTableStorageProvider storageProvider = (causalityToken, tableId) -> {
return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> table.internalTable().storage());
};

indexBuildController = new IndexBuildController(
indexBuilder,
indexManager,
storageProvider,
catalogManager,
clusterService,
placementDriver,
Expand Down

0 comments on commit 600d20a

Please sign in to comment.