Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
wrapperVersion=3.3.2
wrapperVersion=3.3.4
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.6/apache-maven-3.8.6-bin.zip
distributionSha256Sum=ccf20a80e75a17ffc34d47c5c95c98c39d426ca17d670f09cd91e877072a9309
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.11/apache-maven-3.9.11-bin.zip
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,14 @@ protected Catalog getPaimonCatalog() {
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
throws TableAlreadyExistException {
// then, create the table
Identifier paimonPath = toPaimon(tablePath);
Schema paimonSchema = toPaimonSchema(tableDescriptor);
try {
createTable(paimonPath, paimonSchema, context.isCreatingFlussTable());
createTable(tablePath, paimonSchema, context.isCreatingFlussTable());
} catch (Catalog.DatabaseNotExistException e) {
// create database
createDatabase(tablePath.getDatabaseName());
try {
createTable(paimonPath, paimonSchema, context.isCreatingFlussTable());
createTable(tablePath, paimonSchema, context.isCreatingFlussTable());
} catch (Catalog.DatabaseNotExistException t) {
// shouldn't happen in normal cases
throw new RuntimeException(
Expand All @@ -109,26 +108,26 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
try {
Identifier paimonPath = toPaimon(tablePath);
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
alterTable(paimonPath, paimonSchemaChanges);
alterTable(tablePath, paimonSchemaChanges);
} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
// shouldn't happen before we support schema change
throw new RuntimeException(e);
}
}

private void createTable(Identifier tablePath, Schema schema, boolean isCreatingFlussTable)
private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
Identifier paimonPath = toPaimon(tablePath);
try {
// not ignore if table exists
paimonCatalog.createTable(tablePath, schema, false);
paimonCatalog.createTable(paimonPath, schema, false);
} catch (Catalog.TableAlreadyExistException e) {
try {
Table table = paimonCatalog.getTable(tablePath);
Table table = paimonCatalog.getTable(paimonPath);
FileStoreTable fileStoreTable = (FileStoreTable) table;
validatePaimonSchemaCompatible(
tablePath, fileStoreTable.schema().toSchema(), schema);
paimonPath, fileStoreTable.schema().toSchema(), schema);
// if creating a new fluss table, we should ensure the lake table is empty
if (isCreatingFlussTable) {
checkTableIsEmpty(tablePath, fileStoreTable);
Expand All @@ -155,12 +154,12 @@ private void createDatabase(String databaseName) {
}
}

private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
private void alterTable(TablePath tablePath, List<SchemaChange> tableChanges)
throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {
try {
paimonCatalog.alterTable(tablePath, tableChanges, false);
paimonCatalog.alterTable(toPaimon(tablePath), tableChanges, false);
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " not exists.");
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.lake.paimon.utils;

import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.metadata.TablePath;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
Expand Down Expand Up @@ -79,13 +80,13 @@ private static void removeChangeableOptions(Map<String, String> options) {
&& !entry.getKey().startsWith(FLUSS_CONF_PREFIX));
}

public static void checkTableIsEmpty(Identifier tablePath, FileStoreTable table) {
public static void checkTableIsEmpty(TablePath tablePath, FileStoreTable table) {
if (table.latestSnapshot().isPresent()) {
throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog, and the table is not empty. "
+ "Please first drop the table in Paimon catalog or use a new table name.",
tablePath.getEscapedFullName()));
tablePath));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ void testCreateLakeEnableTableWithExistNonEmptyLakeTable() throws Exception {
.cause()
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_non_empty_lake_table` already exists in Paimon catalog, and the table is not empty. Please first drop the table in Paimon catalog or use a new table name.");
"The table fluss.log_table_with_non_empty_lake_table already exists in Paimon catalog, and the table is not empty. Please first drop the table in Paimon catalog or use a new table name.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.lake.paimon;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
Expand All @@ -35,6 +36,7 @@
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Unit test for {@link PaimonLakeCatalog}. */
class PaimonLakeCatalogTest {
Expand Down Expand Up @@ -83,6 +85,34 @@ void testAlterTableProperties() throws Exception {
assertThat(table.options().get("fluss.key")).isEqualTo(null);
}

@Test
void alterTablePropertiesWithNonExistentTable() {
TestingLakeCatalogContext context = new TestingLakeCatalogContext();
// db & table don't exist
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
TablePath.of("non_existing_db", "non_existing_table"),
Collections.singletonList(TableChange.set("key", "value")),
context))
.isInstanceOf(TableNotExistException.class)
.hasMessage("Table non_existing_db.non_existing_table does not exist.");

String database = "alter_props_db";
String tableName = "alter_props_table";
createTable(database, tableName);

// database exists but table doesn't
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
TablePath.of(database, "non_existing_table"),
Collections.singletonList(TableChange.set("key", "value")),
context))
.isInstanceOf(TableNotExistException.class)
.hasMessage("Table alter_props_db.non_existing_table does not exist.");
}

private void createTable(String database, String tableName) {
Schema flussSchema =
Schema.newBuilder()
Expand Down
50 changes: 43 additions & 7 deletions mvnw

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 48 additions & 8 deletions mvnw.cmd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.