Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-19082: Catalog. Cleanup dead code #3669

Merged
merged 48 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f30cfd5
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
2b951c0
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
ce9ff2f
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
840ad91
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
d223298
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
9b7d653
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
b327218
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
abad141
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
576ceab
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
70eb425
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
7a1013d
[IGNITE-19082] Catalog. Cleanup dead code
lowka Apr 30, 2024
5004b43
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 1, 2024
d77002f
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 1, 2024
0c56f78
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 2, 2024
a890286
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 2, 2024
bad54f4
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 2, 2024
d1a0d5e
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 2, 2024
c38524b
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 2, 2024
847170c
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 2, 2024
70f99cd
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 7, 2024
62cd5bb
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 7, 2024
7e09e3c
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 7, 2024
a5ab378
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 7, 2024
eabb3b7
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 7, 2024
13da6c5
Update modules/catalog/src/main/java/org/apache/ignite/internal/catal…
lowka May 8, 2024
5a283c4
Update modules/catalog/src/main/java/org/apache/ignite/internal/catal…
lowka May 8, 2024
028e40f
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 8, 2024
a9f2aeb
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 8, 2024
0ef98b9
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 8, 2024
4add4da
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 8, 2024
3ffb542
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 8, 2024
1e5c8dc
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 8, 2024
b6d3848
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 9, 2024
66c793d
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 9, 2024
98fd973
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
4067a34
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
888b2de
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
ceed415
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
05e80ed
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
f5dc047
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
8248371
IGNITE-19082: Catalog. Cleanup dead code
lowka May 9, 2024
29a7236
IGNITE-19082: Catalog. Cleanup dead code
lowka May 10, 2024
2830695
IGNITE-19082: Catalog. Cleanup dead code
lowka May 10, 2024
483a4c1
Merge branch 'main' into ignite-19082
lowka May 10, 2024
c79b540
IGNITE-19082: Catalog. Cleanup dead code
lowka May 10, 2024
5831091
Merge branch 'main' into ignite-19082
lowka May 10, 2024
b3ab12e
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 10, 2024
9551be9
[IGNITE-19082] Catalog. Cleanup dead code
lowka May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.Flow.Publisher;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCatalogCommand;
import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
Expand Down Expand Up @@ -119,6 +120,9 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
/** Versioned catalog descriptors sorted in chronological order. */
private final NavigableMap<Long, Catalog> catalogByTs = new ConcurrentSkipListMap<>();

/** A future that completes when an empty catalog is initialised. If catalog is not empty this future when this completes starts. */
private final CompletableFuture<Void> catalogInitializationFuture = new CompletableFuture<>();

private final UpdateLog updateLog;

private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
Expand Down Expand Up @@ -158,27 +162,7 @@ public CatalogManagerImpl(
public CompletableFuture<Void> startAsync() {
int objectIdGen = 0;

// TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure.
CatalogSchemaDescriptor publicSchema = new CatalogSchemaDescriptor(
objectIdGen++,
DEFAULT_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
INITIAL_CAUSALITY_TOKEN
);

// TODO: IGNITE-19082 Move system schema objects initialization to cluster init procedure.
CatalogSchemaDescriptor systemSchema = new CatalogSchemaDescriptor(
objectIdGen++,
SYSTEM_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
INITIAL_CAUSALITY_TOKEN
);

Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(publicSchema, systemSchema), null);
Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(), null);

registerCatalog(emptyCatalog);

Expand All @@ -187,12 +171,17 @@ public CompletableFuture<Void> startAsync() {
return updateLog.startAsync()
.thenCompose(none -> {
if (latestCatalogVersion() == emptyCatalog.version()) {
// node has not seen any updates yet, let's try to initialise
// catalog with default zone
return createDefaultZone(emptyCatalog);
}
int initializedCatalogVersion = emptyCatalog.version() + 1;

this.catalogReadyFuture(initializedCatalogVersion)
.thenCompose(ignored -> awaitVersionActivation(initializedCatalogVersion))
.handle((r, e) -> catalogInitializationFuture.complete(null));

return nullCompletedFuture();
return initCatalog(emptyCatalog);
} else {
catalogInitializationFuture.complete(null);
return nullCompletedFuture();
}
});
}

Expand All @@ -205,7 +194,11 @@ public CompletableFuture<Void> stopAsync() {

@Override
public @Nullable CatalogTableDescriptor table(String tableName, long timestamp) {
return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).table(tableName);
CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
if (schema == null) {
return null;
}
return schema.table(tableName);
}

@Override
Expand All @@ -225,7 +218,11 @@ public Collection<CatalogTableDescriptor> tables(int catalogVersion) {

@Override
public @Nullable CatalogIndexDescriptor aliveIndex(String indexName, long timestamp) {
return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).aliveIndex(indexName);
CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
if (schema == null) {
return null;
}
return schema.aliveIndex(indexName);
}

@Override
Expand Down Expand Up @@ -321,6 +318,11 @@ public CompletableFuture<Void> catalogReadyFuture(int version) {
return versionTracker.waitFor(version);
}

@Override
public CompletableFuture<Void> catalogInitializationFuture() {
return catalogInitializationFuture;
}

@Override
public @Nullable Catalog catalog(int catalogVersion) {
return catalogByVer.get(catalogVersion);
Expand Down Expand Up @@ -363,8 +365,9 @@ public CompletableFuture<Boolean> compactCatalog(long timestamp) {
return updateLog.saveSnapshot(new SnapshotEntry(catalog));
}

private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
List<UpdateEntry> createZoneEntries = new BulkUpdateProducer(List.of(
private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
List<CatalogCommand> initCommands = List.of(
// Init default zone
CreateZoneCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
.partitions(DEFAULT_PARTITION_COUNT)
Expand All @@ -378,10 +381,15 @@ private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
.build(),
AlterZoneSetDefaultCatalogCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
.build()
)).get(emptyCatalog);
.build(),
// Add schemas
CreateSchemaCommand.builder().name(DEFAULT_SCHEMA_NAME).build(),
CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
);

List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog);

return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, createZoneEntries))
return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
.handle((result, error) -> {
if (error != null) {
LOG.warn("Unable to create default zone.", error);
Expand All @@ -407,13 +415,7 @@ private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer
CompletableFuture<Integer> resultFuture = new CompletableFuture<>();

saveUpdate(updateProducer, 0)
.thenCompose(newVersion -> {
Catalog catalog = catalogByVer.get(newVersion);

HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);

return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> newVersion);
})
.thenCompose(this::awaitVersionActivation)
.whenComplete((newVersion, err) -> {
if (err != null) {
Throwable errUnwrapped = ExceptionUtils.unwrapCause(err);
Expand Down Expand Up @@ -448,6 +450,14 @@ private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer
return resultFuture;
}

private CompletableFuture<Integer> awaitVersionActivation(int version) {
Catalog catalog = catalogByVer.get(version);

HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);

return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> version);
}

private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) {
return clusterWideEnsuredActivationTsSafeForRoReads(
catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
* <p>TBD: events
*/
public interface CatalogService extends EventProducer<CatalogEvent, CatalogEventParameters> {
/** Default schema name. */
String DEFAULT_SCHEMA_NAME = "PUBLIC";

/** System schema name. */
String SYSTEM_SCHEMA_NAME = "SYSTEM";

/** Default storage profile. */
Expand Down Expand Up @@ -110,4 +112,9 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent
* @param version Catalog version to wait for.
*/
CompletableFuture<Void> catalogReadyFuture(int version);

/**
* Returns a future, which completes when empty catalog is initialised. Otherwise this future completes upon startup.
*/
CompletableFuture<Void> catalogInitializationFuture();
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schemaName),
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schema.id()),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new NewColumnsEntry(table.id(), columnDescriptors, schemaName)
new NewColumnsEntry(table.id(), columnDescriptors)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new AlterColumnEntry(table.id(), target, schemaName)
new AlterColumnEntry(table.id(), target)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public List<UpdateEntry> get(Catalog catalog) {
});

return List.of(
new DropColumnsEntry(table.id(), columns, schemaName)
new DropColumnsEntry(table.id(), columns)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog.commands;

import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;

import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;

/**
* Command to create a new schema.
*/
public class CreateSchemaCommand implements CatalogCommand {

private final String schemaName;

private CreateSchemaCommand(String schemaName) {
validateIdentifier(schemaName, "Name of the schema");

this.schemaName = schemaName;
}

/** {@inheritDoc} */
@Override
public List<UpdateEntry> get(Catalog catalog) {
int id = catalog.objectIdGenState();

if (catalog.schema(schemaName) != null) {
throw new CatalogValidationException(format("Schema with name '{}' already exists", schemaName));
}

lowka marked this conversation as resolved.
Show resolved Hide resolved
CatalogSchemaDescriptor schema = new CatalogSchemaDescriptor(
id,
schemaName,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
0
INITIAL_CAUSALITY_TOKEN

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

);

return List.of(
new NewSchemaEntry(schema),
new ObjectIdGenUpdateEntry(1)
);
}

/** Returns builder to create a command to create a new schema. */
public static Builder builder() {
return new Builder();
}

/** Implementation of {@link CreateSchemaCommandBuilder}. */
public static class Builder implements CreateSchemaCommandBuilder {

private String name;

/** {@inheritDoc} */
@Override
public CreateSchemaCommandBuilder name(String name) {
this.name = name;
return this;
}

/** {@inheritDoc} */
@Override
public CatalogCommand build() {
return new CreateSchemaCommand(name);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog.commands;

import org.apache.ignite.internal.catalog.CatalogCommand;

/**
* Builder for a {@link CreateSchemaCommand}.
*/
public interface CreateSchemaCommandBuilder {

/** Sets schema name. Should not be null or blank. */
CreateSchemaCommandBuilder name(String name);

/** Creates new schema command. */
CatalogCommand build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public List<UpdateEntry> get(Catalog catalog) {
CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog, CatalogManager.SYSTEM_SCHEMA_NAME);

List<CatalogTableColumnDescriptor> viewColumns = columns.stream().map(CatalogUtils::fromParams).collect(toList());
CatalogSystemViewDescriptor descriptor = new CatalogSystemViewDescriptor(id, name, viewColumns, systemViewType);
CatalogSystemViewDescriptor descriptor = new CatalogSystemViewDescriptor(id, systemSchema.id(), name, viewColumns, systemViewType);

CatalogSystemViewDescriptor existingSystemView = systemSchema.systemView(name);

Expand All @@ -121,7 +121,7 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new NewSystemViewEntry(descriptor, systemSchema.name()),
new NewSystemViewEntry(descriptor),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public List<UpdateEntry> get(Catalog catalog) {
CatalogIndexDescriptor pkIndex = createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId);

return List.of(
new NewTableEntry(table, schemaName),
new NewIndexEntry(pkIndex, schemaName),
new NewTableEntry(table),
new NewIndexEntry(pkIndex, schema.id()),
new MakeIndexAvailableEntry(pkIndexId),
new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public List<UpdateEntry> get(Catalog catalog) {
updateEntries.add(new RemoveIndexEntry(index.id()));
});

updateEntries.add(new DropTableEntry(table.id(), schemaName));
updateEntries.add(new DropTableEntry(table.id()));

return updateEntries;
}
Expand Down