Skip to content

Commit

Permalink
[IGNITE-19082] Catalog. Cleanup dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
lowka committed Apr 30, 2024
1 parent 4bab4fa commit f30cfd5
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.concurrent.Flow.Publisher;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCatalogCommand;
import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateSchemaCommandBuilder;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
Expand Down Expand Up @@ -159,26 +161,26 @@ public CompletableFuture<Void> startAsync() {
int objectIdGen = 0;

// TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure.
CatalogSchemaDescriptor publicSchema = new CatalogSchemaDescriptor(
objectIdGen++,
DEFAULT_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
INITIAL_CAUSALITY_TOKEN
);

// TODO: IGNITE-19082 Move system schema objects initialization to cluster init procedure.
CatalogSchemaDescriptor systemSchema = new CatalogSchemaDescriptor(
objectIdGen++,
SYSTEM_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
INITIAL_CAUSALITY_TOKEN
);

Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(publicSchema, systemSchema), null);
// CatalogSchemaDescriptor publicSchema = new CatalogSchemaDescriptor(
// objectIdGen++,
// DEFAULT_SCHEMA_NAME,
// new CatalogTableDescriptor[0],
// new CatalogIndexDescriptor[0],
// new CatalogSystemViewDescriptor[0],
// INITIAL_CAUSALITY_TOKEN
// );
//
// // TODO: IGNITE-19082 Move system schema objects initialization to cluster init procedure.
// CatalogSchemaDescriptor systemSchema = new CatalogSchemaDescriptor(
// objectIdGen++,
// SYSTEM_SCHEMA_NAME,
// new CatalogTableDescriptor[0],
// new CatalogIndexDescriptor[0],
// new CatalogSystemViewDescriptor[0],
// INITIAL_CAUSALITY_TOKEN
// );

Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(), List.of(), null);

registerCatalog(emptyCatalog);

Expand Down Expand Up @@ -364,7 +366,7 @@ public CompletableFuture<Boolean> compactCatalog(long timestamp) {
}

private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
List<UpdateEntry> createZoneEntries = new BulkUpdateProducer(List.of(
List<CatalogCommand> commands = List.of(
CreateZoneCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
.partitions(DEFAULT_PARTITION_COUNT)
Expand All @@ -378,10 +380,14 @@ private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
.build(),
AlterZoneSetDefaultCatalogCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
.build()
)).get(emptyCatalog);
.build(),
CreateSchemaCommand.builder().name(DEFAULT_SCHEMA_NAME).build(),
CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
);

List<UpdateEntry> entries = new BulkUpdateProducer(commands).get(emptyCatalog);

return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, createZoneEntries))
return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
.handle((result, error) -> {
if (error != null) {
LOG.warn("Unable to create default zone.", error);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog.commands;

import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;

/**
* Command to create a new schema.
*/
public class CreateSchemaCommand implements CatalogCommand {

private final String schemaName;

private CreateSchemaCommand(String schemaName) {
this.schemaName = schemaName;
}

/** {@inheritDoc} */
@Override
public List<UpdateEntry> get(Catalog catalog) {
int id = catalog.objectIdGenState();

if (schemaName == null) {
throw new CatalogValidationException("Schema name is null");
}

CatalogSchemaDescriptor schema = new CatalogSchemaDescriptor(
id,
schemaName,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
0
);

return List.of(
new NewSchemaEntry(schema),
new ObjectIdGenUpdateEntry(1)
);
}

public static Builder builder() {
return new Builder();
}

public static class Builder implements CreateSchemaCommandBuilder {

private String name;

Builder() {

}

/** {@inheritDoc} */
@Override
public CreateSchemaCommandBuilder name(String name) {
this.name = name;
return this;
}

/** {@inheritDoc} */
@Override
public CatalogCommand build() {
return new CreateSchemaCommand(name);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog.commands;

import org.apache.ignite.internal.catalog.CatalogCommand;

/**
* Builder for a {@link CreateSchemaCommand}.
*/
public interface CreateSchemaCommandBuilder {

/** Sets schema name. */
CreateSchemaCommandBuilder name(String name);

/** Creates new schema command. */
CatalogCommand build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog.storage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntryType;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;

/**
* New schema entry.
*/
public class NewSchemaEntry implements UpdateEntry {

public static CatalogObjectSerializer<NewSchemaEntry> SERIALIZER = new CatalogObjectSerializer<NewSchemaEntry>() {
@Override
public NewSchemaEntry readFrom(IgniteDataInput input) throws IOException {
CatalogSchemaDescriptor schemaDescriptor = CatalogSchemaDescriptor.SERIALIZER.readFrom(input);
return new NewSchemaEntry(schemaDescriptor);
}

@Override
public void writeTo(NewSchemaEntry value, IgniteDataOutput output) throws IOException {
CatalogSchemaDescriptor.SERIALIZER.writeTo(value.descriptor, output);
}
};

private final CatalogSchemaDescriptor descriptor;

public NewSchemaEntry(CatalogSchemaDescriptor descriptor) {
this.descriptor = descriptor;
}

/** {@inheritDoc} */
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema = catalog.schema(descriptor.name());
if (schema != null) {
return catalog;
}

descriptor.updateToken(causalityToken);

List<CatalogSchemaDescriptor> schemas = new ArrayList<>(catalog.schemas().size() + 1);
schemas.addAll(catalog.schemas());
schemas.add(descriptor);

return new Catalog(
catalog.version(),
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
schemas,
catalog.defaultZone().id()
);
}

/** {@inheritDoc} */
@Override
public int typeId() {
return MarshallableEntryType.NEW_SCHEMA.id();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.internal.catalog.storage.MakeIndexAvailableEntry;
import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
import org.apache.ignite.internal.catalog.storage.NewSystemViewEntry;
import org.apache.ignite.internal.catalog.storage.NewTableEntry;
import org.apache.ignite.internal.catalog.storage.NewZoneEntry;
Expand Down Expand Up @@ -81,6 +82,7 @@ public interface CatalogEntrySerializerProvider {
serializers[MarshallableEntryType.SET_DEFAULT_ZONE.id()] = SetDefaultZoneEntry.SERIALIZER;
//noinspection ThisEscapedInObjectConstruction
serializers[MarshallableEntryType.VERSIONED_UPDATE.id()] = new VersionedUpdateSerializer(this);
serializers[MarshallableEntryType.NEW_SCHEMA.id()] = NewSchemaEntry.SERIALIZER;

assert Stream.of(serializers).noneMatch(Objects::isNull);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public enum MarshallableEntryType {
SNAPSHOT(16),
VERSIONED_UPDATE(17),
RENAME_INDEX(18),
SET_DEFAULT_ZONE(19);
SET_DEFAULT_ZONE(19),
NEW_SCHEMA(20);

/** Type ID. */
private final int id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void testCreateTable() {

assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
assertSame(schema, manager.activeSchema(0L));
assertSame(schema, manager.activeSchema(1L));
assertSame(schema, manager.activeSchema(123L));

assertNull(schema.table(TABLE_NAME));
Expand Down

0 comments on commit f30cfd5

Please sign in to comment.