Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ This section introduce all available spark procedures about paimon.
</td>
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet')</td>
</tr>
<tr>
<td>migrate_file</td>
<td>
Migrate from hive table to a paimon table. Arguments:
<li>source_type: the origin table's type to be migrated, such as hive. Cannot be empty.</li>
<li>source_table: name of the origin table to migrate. Cannot be empty.</li>
<li>target_table: name of the target table to be migrated. Cannot be empty.</li>
<li>delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true</li>
</td>
<td>CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true)</td>
</tr>
<tr>
<td>remove_orphan_files</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.migrate.Migrator;

import org.apache.flink.table.procedure.ProcedureContext;

Expand All @@ -40,6 +41,27 @@ public String[] call(
String sourceTablePath,
String targetPaimonTablePath)
throws Exception {
call(procedureContext, connector, sourceTablePath, targetPaimonTablePath, true);
return new String[] {"Success"};
}

public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceTablePath,
String targetPaimonTablePath,
boolean deleteOrigin)
throws Exception {
migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
return new String[] {"Success"};
}

public void migrateHandle(
String connector,
String sourceTablePath,
String targetPaimonTablePath,
boolean deleteOrigin)
throws Exception {
if (!(catalog instanceof HiveCatalog)) {
throw new IllegalArgumentException("Only support Hive Catalog");
}
Expand All @@ -51,15 +73,16 @@ public String[] call(
"Target paimon table does not exist: " + targetPaimonTablePath);
}

TableMigrationUtils.getImporter(
Migrator importer =
TableMigrationUtils.getImporter(
connector,
(HiveCatalog) catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
Collections.emptyMap())
.executeMigrate();
return new String[] {"Success"};
Collections.emptyMap());
importer.deleteOriginTable(deleteOrigin);
importer.executeMigrate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ public void afterEach() throws Exception {
@Test
public void testOrc() throws Exception {
test("orc");
testWithDeleteOrigin("orc");
}

@Test
public void testAvro() throws Exception {
test("avro");
testWithDeleteOrigin("avro");
}

@Test
public void testParquet() throws Exception {
test("parquet");
testWithDeleteOrigin("parquet");
}

public void test(String format) throws Exception {
Expand Down Expand Up @@ -100,6 +103,38 @@ public void test(String format) throws Exception {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}

public void testWithDeleteOrigin(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql(
"CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS "
+ format);
tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await();
tEnv.executeSql("SHOW CREATE TABLE hivetable01");

tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");

tEnv.executeSql(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:"
+ PORT
+ "' , 'warehouse' = '"
+ System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ "')");
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
"CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
tEnv.executeSql(
"CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)")
.await();
tEnv.useCatalog("HIVE");
List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect());
Assertions.assertThat(r1.size() == 0);
}

private String data(int i) {
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;

import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
Expand All @@ -48,13 +48,14 @@ public class MigrateFileProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("source_type", StringType),
ProcedureParameter.required("source_table", StringType),
ProcedureParameter.required("target_table", StringType)
ProcedureParameter.required("target_table", StringType),
ProcedureParameter.optional("delete_origin", BooleanType)
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
new StructField("result", BooleanType, true, Metadata.empty())
});

protected MigrateFileProcedure(TableCatalog tableCatalog) {
Expand All @@ -76,6 +77,7 @@ public InternalRow[] call(InternalRow args) {
String format = args.getString(0);
String sourceTable = args.getString(1);
String targetTable = args.getString(2);
boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);

Identifier sourceTableId = Identifier.fromString(sourceTable);
Identifier targetTableId = Identifier.fromString(targetTable);
Expand All @@ -97,6 +99,8 @@ public InternalRow[] call(InternalRow args) {
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
Collections.emptyMap());

migrator.deleteOriginTable(deleteNeed);
migrator.executeMigrate();
} catch (Exception e) {
throw new RuntimeException("Call migrate_file error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
}
})

Seq("parquet", "orc", "avro").foreach(
format => {
test(
s"Paimon migrate file procedure: migrate $format non-partitioned table with delete source table") {
withTable("hive_tbl", "paimon_tbl") {
// create hive table
spark.sql(s"""
|CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
|USING $format
|""".stripMargin)

spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")

// create paimon table
spark.sql(s"""
|CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING)
|USING PAIMON
|TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
|""".stripMargin)

spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')")

spark.sql(
s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin => false)")

checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)

checkAnswer(
spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row(
"4",
"d",
"p2") :: Nil)
}
}
})

Seq("parquet", "orc", "avro").foreach(
format => {
test(s"Paimon migrate file procedure: migrate $format partitioned table") {
Expand Down Expand Up @@ -92,4 +129,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
}
}
})

Seq("parquet", "orc", "avro").foreach(
format => {
test(
s"Paimon migrate file procedure: migrate $format partitioned table with delete source table") {
withTable("hive_tbl", "paimon_tbl") {
// create hive table
spark.sql(s"""
|CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
|USING $format
|PARTITIONED BY (pt)
|""".stripMargin)

spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")

// create paimon table
spark.sql(s"""
|CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING)
|USING PAIMON
|TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
|PARTITIONED BY (pt)
|""".stripMargin)

spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')")

spark.sql(
s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin => false)")

checkAnswer(
spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row(
"4",
"d",
"p2") :: Nil)

checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
}
}
})
}