From 88ca20c92e316fe5d8f2c3be9d1a4b556dc127f4 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 14:45:50 +0800 Subject: [PATCH 01/10] [flink] support migrate_file action --- .../flink/action/MigrateFileAction.java | 63 +++++++++++++++ .../action/MigrateFileActionFactory.java | 79 +++++++++++++++++++ .../org.apache.paimon.factories.Factory | 1 + .../MigrateTableProcedureITCase.java | 42 ++++++++++ 4 files changed, 185 insertions(+) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java new file mode 100644 index 000000000000..0f24046853de --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.MigrateFileProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Migrate from external hive table to paimon table. */ +public class MigrateFileAction extends ActionBase { + + private final String connector; + private final String sourceTable; + private final String targetTable; + private final String tableProperties; + private boolean deleteOrigin; + + public MigrateFileAction( + String connector, + String warehouse, + String sourceTable, + String targetTable, + boolean deleteOrigin, + Map catalogConfig, + String tableProperties) { + super(warehouse, catalogConfig); + this.connector = connector; + this.sourceTable = sourceTable; + this.targetTable = targetTable; + this.deleteOrigin = deleteOrigin; + this.tableProperties = tableProperties; + } + + @Override + public void run() throws Exception { + MigrateFileProcedure migrateTableProcedure = new MigrateFileProcedure(); + migrateTableProcedure.withCatalog(catalog); + migrateTableProcedure.call( + new DefaultProcedureContext(env), + connector, + sourceTable, + targetTable, + deleteOrigin); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java new file mode 100644 index 000000000000..9e94ad1779a0 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Action Factory for {@link MigrateFileAction}. */ +public class MigrateFileActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "migrate_file"; + + private static final String SOURCE_TYPE = "source_type"; + + private static final String SOURCE_TABLE = "source_table"; + + private static final String TARGET_TABLE = "target_table"; + + private static final String DELETE_ORIGIN = "delete_origin"; + + private static final String OPTIONS = "options"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + String warehouse = params.get(WAREHOUSE); + String connector = params.get(SOURCE_TYPE); + String sourceHiveTable = params.get(SOURCE_TABLE); + String targetTable = params.get(TARGET_TABLE); + boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN)); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + + MigrateFileAction migrateFileAction = + new MigrateFileAction( + connector, + warehouse, + sourceHiveTable, + targetTable, + deleteOrigin, + catalogConfig, + ""); + return Optional.of(migrateFileAction); + } + + @Override + public void printHelp() { + System.out.println("Action \"migrate_file\" runs a migrating job from hive to paimon."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " migrate_file --warehouse --source_type hive " + + "--source_table " + + "--target_table " + + "--delete_origin true " + + "[--catalog_conf ==,=,...]"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6d3dc249179b..e38d6ebd4c08 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -25,6 +25,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory org.apache.paimon.flink.action.DeleteTagActionFactory org.apache.paimon.flink.action.ResetConsumerActionFactory org.apache.paimon.flink.action.MigrateTableActionFactory +org.apache.paimon.flink.action.MigrateFileActionFactory org.apache.paimon.flink.action.MigrateDatabaseActionFactory org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory org.apache.paimon.flink.action.QueryServiceActionFactory diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 634f10a60be7..1b0c1b7d1eba 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -190,6 +190,48 @@ public void testMigrateAction(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } + @ParameterizedTest + @ValueSource(strings = {"orc", "parquet", "avro"}) + public void testMigrateFileAction(String format) throws Exception { + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); + tEnv.useCatalog("HIVE"); + tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); + tEnv.executeSql( + "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + + format); + tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await(); + tEnv.executeSql("SHOW CREATE TABLE hivetable"); + + tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); + tEnv.useCatalog("PAIMON_GE"); + List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect()); + Map catalogConf = new HashMap<>(); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + MigrateTableAction migrateTableAction = + new MigrateTableAction( + "hive", + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), + "default.hivetable", + catalogConf, + ""); + migrateTableAction.run(); + + tEnv.executeSql( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" + + PORT + + "' , 'warehouse' = '" + + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) + + "')"); + tEnv.useCatalog("PAIMON"); + List r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect()); + + Assertions.assertThat(r1.size() == 100); + Assertions.assertThat(r2.size() == 0); + } + protected static String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); From 12e439be9026ff15259274927036f57a8e5656d0 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 15:13:19 +0800 Subject: [PATCH 02/10] add more ut --- .../procedure/MigrateFileProcedureITCase.java | 58 +++++++++++++++++++ .../MigrateTableProcedureITCase.java | 49 ++-------------- 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 19b5a4f972c6..d4577190ee19 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -19,6 +19,7 @@ package org.apache.paimon.hive.procedure; import org.apache.paimon.flink.action.ActionITCaseBase; +import org.apache.paimon.flink.action.MigrateFileAction; import org.apache.paimon.flink.procedure.MigrateFileProcedure; import org.apache.paimon.hive.TestHiveMetastore; @@ -33,7 +34,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; /** Tests for {@link MigrateFileProcedure}. */ @@ -57,18 +60,21 @@ public void afterEach() throws Exception { public void testOrc() throws Exception { test("orc"); testWithDeleteOrigin("orc"); + testMigrateFileAction("orc"); } @Test public void testAvro() throws Exception { test("avro"); testWithDeleteOrigin("avro"); + testMigrateFileAction("avro"); } @Test public void testParquet() throws Exception { test("parquet"); testWithDeleteOrigin("parquet"); + testMigrateFileAction("parquet"); } public void test(String format) throws Exception { @@ -135,6 +141,58 @@ public void testWithDeleteOrigin(String format) throws Exception { Assertions.assertThat(r1.size() == 0); } + public void testMigrateFileAction(String format) throws Exception { + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); + tEnv.useCatalog("HIVE"); + tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); + tEnv.executeSql( + "CREATE TABLE hivetable02 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + + format); + tEnv.executeSql("INSERT INTO hivetable02 VALUES" + data(100)).await(); + tEnv.executeSql("SHOW CREATE TABLE hivetable02"); + + tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); + tEnv.useCatalog("PAIMON_GE"); + + Map catalogConf = new HashMap<>(); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + + tEnv.executeSql( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" + + PORT + + "' , 'warehouse' = '" + + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) + + "')"); + tEnv.useCatalog("PAIMON"); + tEnv.executeSql( + "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); + tEnv.useCatalog("PAIMON"); + + MigrateFileAction migrateFileAction = + new MigrateFileAction( + "hive", + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), + "default.hivetable02", + "default.paimontable02", + false, + catalogConf, + ""); + migrateFileAction.run(); + + tEnv.useCatalog("HIVE"); + List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable02").collect()); + + tEnv.useCatalog("PAIMON"); + List r2 = + ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM paimontable02").collect()); + + Assertions.assertThat(r1.size() == 0); + Assertions.assertThat(r2.size() == 100); + } + private String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 1b0c1b7d1eba..3bc8bab34333 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -19,7 +19,7 @@ package org.apache.paimon.hive.procedure; import org.apache.paimon.flink.action.ActionITCaseBase; -import org.apache.paimon.flink.action.MigrateTableAction; +import org.apache.paimon.flink.action.MigrateFileAction; import org.apache.paimon.flink.procedure.MigrateFileProcedure; import org.apache.paimon.hive.TestHiveMetastore; @@ -169,11 +169,12 @@ public void testMigrateAction(String format) throws Exception { Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); - MigrateTableAction migrateTableAction = - new MigrateTableAction( + MigrateFileAction migrateTableAction = + new MigrateFileAction( "hive", System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), "default.hivetable", + "", catalogConf, ""); migrateTableAction.run(); @@ -190,48 +191,6 @@ public void testMigrateAction(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } - @ParameterizedTest - @ValueSource(strings = {"orc", "parquet", "avro"}) - public void testMigrateFileAction(String format) throws Exception { - TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); - tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); - tEnv.useCatalog("HIVE"); - tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); - tEnv.executeSql( - "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " - + format); - tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await(); - tEnv.executeSql("SHOW CREATE TABLE hivetable"); - - tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); - tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); - tEnv.useCatalog("PAIMON_GE"); - List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect()); - Map catalogConf = new HashMap<>(); - catalogConf.put("metastore", "hive"); - catalogConf.put("uri", "thrift://localhost:" + PORT); - MigrateTableAction migrateTableAction = - new MigrateTableAction( - "hive", - System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), - "default.hivetable", - catalogConf, - ""); - migrateTableAction.run(); - - tEnv.executeSql( - "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" - + PORT - + "' , 'warehouse' = '" - + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) - + "')"); - tEnv.useCatalog("PAIMON"); - List r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect()); - - Assertions.assertThat(r1.size() == 100); - Assertions.assertThat(r2.size() == 0); - } - protected static String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); From 24295fca793516d7ff8da9e381e53c63d1c05878 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 15:18:55 +0800 Subject: [PATCH 03/10] fix build --- .../paimon/hive/procedure/MigrateTableProcedureITCase.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 3bc8bab34333..634f10a60be7 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -19,7 +19,7 @@ package org.apache.paimon.hive.procedure; import org.apache.paimon.flink.action.ActionITCaseBase; -import org.apache.paimon.flink.action.MigrateFileAction; +import org.apache.paimon.flink.action.MigrateTableAction; import org.apache.paimon.flink.procedure.MigrateFileProcedure; import org.apache.paimon.hive.TestHiveMetastore; @@ -169,12 +169,11 @@ public void testMigrateAction(String format) throws Exception { Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); - MigrateFileAction migrateTableAction = - new MigrateFileAction( + MigrateTableAction migrateTableAction = + new MigrateTableAction( "hive", System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname), "default.hivetable", - "", catalogConf, ""); migrateTableAction.run(); From 64905505865fd22eabaf57ddad9b89960699160d Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 16:03:40 +0800 Subject: [PATCH 04/10] regen --- .../hive/procedure/MigrateFileProcedureITCase.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index d4577190ee19..b33627d66f8d 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -160,16 +160,8 @@ public void testMigrateFileAction(String format) throws Exception { catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); - tEnv.executeSql( - "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" - + PORT - + "' , 'warehouse' = '" - + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) - + "')"); - tEnv.useCatalog("PAIMON"); tEnv.executeSql( "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); - tEnv.useCatalog("PAIMON"); MigrateFileAction migrateFileAction = new MigrateFileAction( @@ -185,12 +177,7 @@ public void testMigrateFileAction(String format) throws Exception { tEnv.useCatalog("HIVE"); List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable02").collect()); - tEnv.useCatalog("PAIMON"); - List r2 = - ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM paimontable02").collect()); - Assertions.assertThat(r1.size() == 0); - Assertions.assertThat(r2.size() == 100); } private String data(int i) { From 0f1d2d4d781d75a4d6d88f30de38afad7eb5fd4d Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 16:36:04 +0800 Subject: [PATCH 05/10] fix u --- .../paimon/hive/procedure/MigrateFileProcedureITCase.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index b33627d66f8d..72f2f02a0d45 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -160,9 +160,17 @@ public void testMigrateFileAction(String format) throws Exception { catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); + tEnv.executeSql( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" + + PORT + + "' , 'warehouse' = '" + + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) + + "')"); + tEnv.useCatalog("PAIMON"); tEnv.executeSql( "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); + tEnv.useCatalog("PAIMON_GE"); MigrateFileAction migrateFileAction = new MigrateFileAction( "hive", From dccd899ac72390c78618d6671eb4387ac85a7e17 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 17:02:56 +0800 Subject: [PATCH 06/10] change catalog --- .../paimon/hive/procedure/MigrateFileProcedureITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 72f2f02a0d45..9d0eacda13d0 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -182,7 +182,7 @@ public void testMigrateFileAction(String format) throws Exception { ""); migrateFileAction.run(); - tEnv.useCatalog("HIVE"); + tEnv.useCatalog("PAIMON"); List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable02").collect()); Assertions.assertThat(r1.size() == 0); From e2ee08bc829e5517dc91fef417b25d7e8be28bf8 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 17:31:55 +0800 Subject: [PATCH 07/10] fix ut --- .../action/MigrateFileActionFactory.java | 3 +- .../procedure/MigrateFileProcedureITCase.java | 49 +++++-------------- 2 files changed, 13 insertions(+), 39 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java index 9e94ad1779a0..3c15b03cf6f8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java @@ -49,6 +49,7 @@ public Optional create(MultipleParameterToolAdapter params) { String targetTable = params.get(TARGET_TABLE); boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN)); Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + String tableConf = params.get(OPTIONS); MigrateFileAction migrateFileAction = new MigrateFileAction( @@ -58,7 +59,7 @@ public Optional create(MultipleParameterToolAdapter params) { targetTable, deleteOrigin, catalogConfig, - ""); + tableConf); return Optional.of(migrateFileAction); } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 9d0eacda13d0..d07bf655c367 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -59,21 +59,18 @@ public void afterEach() throws Exception { @Test public void testOrc() throws Exception { test("orc"); - testWithDeleteOrigin("orc"); testMigrateFileAction("orc"); } @Test public void testAvro() throws Exception { test("avro"); - testWithDeleteOrigin("avro"); testMigrateFileAction("avro"); } @Test public void testParquet() throws Exception { test("parquet"); - testWithDeleteOrigin("parquet"); testMigrateFileAction("parquet"); } @@ -109,7 +106,7 @@ public void test(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } - public void testWithDeleteOrigin(String format) throws Exception { + public void testMigrateFileAction(String format) throws Exception { TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); tEnv.useCatalog("HIVE"); @@ -118,7 +115,12 @@ public void testWithDeleteOrigin(String format) throws Exception { "CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + format); tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await(); - tEnv.executeSql("SHOW CREATE TABLE hivetable01"); + + tEnv.executeSql( + "CREATE TABLE hivetable02 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + + format); + tEnv.executeSql("INSERT INTO hivetable02 VALUES" + data(100)).await(); + tEnv.executeSql("SHOW CREATE TABLE hivetable02"); tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); @@ -136,41 +138,11 @@ public void testWithDeleteOrigin(String format) throws Exception { tEnv.executeSql( "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); - tEnv.useCatalog("HIVE"); - List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect()); - Assertions.assertThat(r1.size() == 0); - } - - public void testMigrateFileAction(String format) throws Exception { - TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); - tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); - tEnv.useCatalog("HIVE"); - tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); - tEnv.executeSql( - "CREATE TABLE hivetable02 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " - + format); - tEnv.executeSql("INSERT INTO hivetable02 VALUES" + data(100)).await(); - tEnv.executeSql("SHOW CREATE TABLE hivetable02"); - tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); - tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); tEnv.useCatalog("PAIMON_GE"); - Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); - - tEnv.executeSql( - "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" - + PORT - + "' , 'warehouse' = '" - + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) - + "')"); - tEnv.useCatalog("PAIMON"); - tEnv.executeSql( - "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); - - tEnv.useCatalog("PAIMON_GE"); MigrateFileAction migrateFileAction = new MigrateFileAction( "hive", @@ -182,10 +154,11 @@ public void testMigrateFileAction(String format) throws Exception { ""); migrateFileAction.run(); - tEnv.useCatalog("PAIMON"); - List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable02").collect()); - + tEnv.useCatalog("HIVE"); + List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect()); + List r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable02").collect()); Assertions.assertThat(r1.size() == 0); + Assertions.assertThat(r2.size() == 0); } private String data(int i) { From 4cca4dd82855207ecf0d8ab4ed5efc030c1438c9 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 17:53:43 +0800 Subject: [PATCH 08/10] fix uts --- .../paimon/hive/procedure/MigrateFileProcedureITCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index d07bf655c367..721578631ddc 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -135,6 +135,8 @@ public void testMigrateFileAction(String format) throws Exception { tEnv.useCatalog("PAIMON"); tEnv.executeSql( "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); + tEnv.executeSql( + "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); tEnv.executeSql( "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); From a4d5a5682db1739047362344c109e839a4c04dd7 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 18:26:57 +0800 Subject: [PATCH 09/10] fix uts --- .../paimon/hive/procedure/MigrateFileProcedureITCase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 721578631ddc..7c0945c1eb54 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -140,8 +140,7 @@ public void testMigrateFileAction(String format) throws Exception { tEnv.executeSql( "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); - - tEnv.useCatalog("PAIMON_GE"); + Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT); From 6a34fd32a8b72e14e122d68b6b27a988459a419e Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 22 Jul 2024 18:32:00 +0800 Subject: [PATCH 10/10] fix --- .../paimon/hive/procedure/MigrateFileProcedureITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 7c0945c1eb54..721578631ddc 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -140,7 +140,8 @@ public void testMigrateFileAction(String format) throws Exception { tEnv.executeSql( "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); - + + tEnv.useCatalog("PAIMON_GE"); Map catalogConf = new HashMap<>(); catalogConf.put("metastore", "hive"); catalogConf.put("uri", "thrift://localhost:" + PORT);