From 0cf3e530af9f0cdc3651283fad151a4647fd66da Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Tue, 16 Jul 2024 19:48:18 +0800 Subject: [PATCH 1/8] [spark] Support migrate_file procedure save source table --- docs/content/spark/procedures.md | 11 +++ .../spark/procedure/MigrateFileProcedure.java | 10 ++- .../procedure/MigrateFileProcedureTest.scala | 76 +++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index d9a744cdd66f..3f9b3de852c0 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -126,6 +126,17 @@ This section introduce all available spark procedures about paimon. CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet') + + migrate_file + + Migrate from hive table to a paimon table. Arguments: +
  • source_type: the origin table's type to be migrated, such as hive. Cannot be empty.
  • +
  • source_table: name of the origin table to migrate. Cannot be empty.
  • +
  • target_table: name of the target table to be migrated. Cannot be empty.
  • +
  • delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true
  • + + CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true) + remove_orphan_files diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java index 177ad5c2b7e5..8f4850fef48c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java @@ -26,13 +26,13 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.Collections; +import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; /** @@ -48,13 +48,14 @@ public class MigrateFileProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("source_type", StringType), ProcedureParameter.required("source_table", StringType), - ProcedureParameter.required("target_table", StringType) + ProcedureParameter.required("target_table", StringType), + ProcedureParameter.optional("delete_origin", BooleanType) }; private static final StructType OUTPUT_TYPE = new StructType( new StructField[] { - new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + new StructField("result", BooleanType, true, Metadata.empty()) }); protected MigrateFileProcedure(TableCatalog tableCatalog) { @@ -76,6 +77,7 @@ public InternalRow[] call(InternalRow args) { String format = args.getString(0); String sourceTable = args.getString(1); String targetTable = args.getString(2); + boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3); Identifier sourceTableId = Identifier.fromString(sourceTable); Identifier targetTableId = Identifier.fromString(targetTable); @@ -97,6 +99,8 @@ public InternalRow[] call(InternalRow args) { targetTableId.getDatabaseName(), targetTableId.getObjectName(), Collections.emptyMap()); + + migrator.deleteOriginTable(deleteNeed); migrator.executeMigrate(); } catch (Exception e) { throw new RuntimeException("Call migrate_file error", e); diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala index 90428b33198e..9ddbffb75197 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala @@ -57,6 +57,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase { } }) + Seq("parquet", "orc", "avro").foreach( + format => { + test( + s"Paimon migrate file procedure: migrate $format non-partitioned table with delete source table") { + withTable("hive_tbl", "paimon_tbl") { + // create hive table + spark.sql(s""" + |CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING) + |USING $format + |""".stripMargin) + + spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')") + + // create paimon table + spark.sql(s""" + |CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING) + |USING PAIMON + |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1') + |""".stripMargin) + + spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')") + + spark.sql( + s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin => false)") + + checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil) + + checkAnswer( + spark.sql("SELECT * FROM paimon_tbl ORDER BY id"), + Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row( + "4", + "d", + "p2") :: Nil) + } + } + }) + Seq("parquet", "orc", "avro").foreach( format => { test(s"Paimon migrate file procedure: migrate $format partitioned table") { @@ -92,4 +129,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase { } } }) + + Seq("parquet", "orc", "avro").foreach( + format => { + test( + s"Paimon migrate file procedure: migrate $format partitioned table with delete source table") { + withTable("hive_tbl", "paimon_tbl") { + // create hive table + spark.sql(s""" + |CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING) + |USING $format + |PARTITIONED BY (pt) + |""".stripMargin) + + spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')") + + // create paimon table + spark.sql(s""" + |CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING) + |USING PAIMON + |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1') + |PARTITIONED BY (pt) + |""".stripMargin) + + spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')") + + spark.sql( + s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin => false)") + + checkAnswer( + spark.sql("SELECT * FROM paimon_tbl ORDER BY id"), + Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row( + "4", + "d", + "p2") :: Nil) + + checkAnswer(spark.sql("SELECT * FROM paimon_tbl ORDER BY id"), Nil) + } + } + }) } From ad594d6be0507f9c523acdf787ba7bd76cdd30d7 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Tue, 16 Jul 2024 20:18:45 +0800 Subject: [PATCH 2/8] fix ut --- .../paimon/spark/procedure/MigrateFileProcedureTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala index 9ddbffb75197..ceb0879cb9cf 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala @@ -164,7 +164,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase { "d", "p2") :: Nil) - checkAnswer(spark.sql("SELECT * FROM paimon_tbl ORDER BY id"), Nil) + checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil) } } }) From c3ab9b6abbfa2c1766191ee0527f04fd4903a279 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Wed, 17 Jul 2024 14:14:45 +0800 Subject: [PATCH 3/8] fix --- .../org/apache/paimon/hive/procedure/RepairActionITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java index fdaa243dfa5d..51b1027a887b 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java @@ -42,7 +42,7 @@ public class RepairActionITCase extends ActionITCaseBase { private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); - private static final int PORT = 9082; + private static final int PORT = 9083; @BeforeEach public void beforeEach() { From 302fe0a5e7e6768f239655a8d2395f240fc3e543 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Wed, 17 Jul 2024 16:44:23 +0800 Subject: [PATCH 4/8] fix port --- .../org/apache/paimon/hive/procedure/RepairActionITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java index 51b1027a887b..fdaa243dfa5d 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java @@ -42,7 +42,7 @@ public class RepairActionITCase extends ActionITCaseBase { private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); - private static final int PORT = 9083; + private static final int PORT = 9082; @BeforeEach public void beforeEach() { From 84a6237d3a1da5ec3437ce5c7f95b97dd70e45d4 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 18 Jul 2024 14:37:18 +0800 Subject: [PATCH 5/8] support flink --- .../flink/procedure/MigrateFileProcedure.java | 31 ++++++++++++++--- .../procedure/MigrateFileProcedureITCase.java | 34 +++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java index c3e1972912f0..c9a273336c3f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; import org.apache.paimon.hive.HiveCatalog; +import org.apache.paimon.migrate.Migrator; import org.apache.flink.table.procedure.ProcedureContext; @@ -40,6 +41,27 @@ public String[] call( String sourceTablePath, String targetPaimonTablePath) throws Exception { + call(procedureContext, connector, sourceTablePath, targetPaimonTablePath, true); + return new String[] {"Success"}; + } + + public String[] call( + ProcedureContext procedureContext, + String connector, + String sourceTablePath, + String targetPaimonTablePath, + boolean deleteOrigin) + throws Exception { + migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin); + return new String[] {"Success"}; + } + + public void migrateHandle( + String connector, + String sourceTablePath, + String targetPaimonTablePath, + boolean deleteOrigin) + throws Exception { if (!(catalog instanceof HiveCatalog)) { throw new IllegalArgumentException("Only support Hive Catalog"); } @@ -51,15 +73,16 @@ public String[] call( "Target paimon table does not exist: " + targetPaimonTablePath); } - TableMigrationUtils.getImporter( + Migrator importer = + TableMigrationUtils.getImporter( connector, (HiveCatalog) catalog, sourceTableId.getDatabaseName(), sourceTableId.getObjectName(), targetTableId.getDatabaseName(), targetTableId.getObjectName(), - Collections.emptyMap()) - .executeMigrate(); - return new String[] {"Success"}; + Collections.emptyMap()); + importer.deleteOriginTable(deleteOrigin); + importer.executeMigrate(); } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index b71a38c59cee..c7ed2089d5e2 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -56,16 +56,19 @@ public void afterEach() throws Exception { @Test public void testOrc() throws Exception { test("orc"); + testWithDeleteOrigin("orc"); } @Test public void testAvro() throws Exception { test("avro"); + testWithDeleteOrigin("avro"); } @Test public void testParquet() throws Exception { test("parquet"); + testWithDeleteOrigin("parquet"); } public void test(String format) throws Exception { @@ -100,6 +103,37 @@ public void test(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } + public void testWithDeleteOrigin(String format) throws Exception { + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')"); + tEnv.useCatalog("HIVE"); + tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); + tEnv.executeSql( + "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + + format); + tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await(); + tEnv.executeSql("SHOW CREATE TABLE hivetable"); + + tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); + tEnv.useCatalog("PAIMON_GE"); + + tEnv.executeSql( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:" + + PORT + + "' , 'warehouse' = '" + + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname) + + "')"); + tEnv.useCatalog("PAIMON"); + tEnv.executeSql( + "CREATE TABLE paimontable (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); + tEnv.executeSql( + "CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable', false)") + .await(); + List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect()); + Assertions.assertThat(r1.size() == 0); + } + private String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); From 6bc2e0ce79fb3c8d9cb1b88f5f98615190142f52 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 18 Jul 2024 15:34:42 +0800 Subject: [PATCH 6/8] fix utname --- .../hive/procedure/MigrateFileProcedureITCase.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index c7ed2089d5e2..0181b3af7437 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -109,10 +109,10 @@ public void testWithDeleteOrigin(String format) throws Exception { tEnv.useCatalog("HIVE"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.executeSql( - "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + "CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS " + format); - tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await(); - tEnv.executeSql("SHOW CREATE TABLE hivetable"); + tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await(); + tEnv.executeSql("SHOW CREATE TABLE hivetable01"); tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')"); @@ -126,11 +126,11 @@ public void testWithDeleteOrigin(String format) throws Exception { + "')"); tEnv.useCatalog("PAIMON"); tEnv.executeSql( - "CREATE TABLE paimontable (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); + "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); tEnv.executeSql( - "CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable', false)") + "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); - List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect()); + List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect()); Assertions.assertThat(r1.size() == 0); } From 5fe8364c8d5cdc8ce8345689eab61aa1c04a7053 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 18 Jul 2024 16:03:04 +0800 Subject: [PATCH 7/8] fix sql --- .../paimon/hive/procedure/MigrateFileProcedureITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 0181b3af7437..4764c7bba141 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -128,7 +128,7 @@ public void testWithDeleteOrigin(String format) throws Exception { tEnv.executeSql( "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');"); tEnv.executeSql( - "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") + "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect()); Assertions.assertThat(r1.size() == 0); From d1070863e823d82863858daa5ec1148e701cea49 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 18 Jul 2024 16:36:25 +0800 Subject: [PATCH 8/8] fix catalog --- .../apache/paimon/hive/procedure/MigrateFileProcedureITCase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java index 4764c7bba141..19b5a4f972c6 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java @@ -130,6 +130,7 @@ public void testWithDeleteOrigin(String format) throws Exception { tEnv.executeSql( "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)") .await(); + tEnv.useCatalog("HIVE"); List r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect()); Assertions.assertThat(r1.size() == 0); }