Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.MigrateFileProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Migrate from external hive table to paimon table. */
public class MigrateFileAction extends ActionBase {

private final String connector;
private final String sourceTable;
private final String targetTable;
private final String tableProperties;
private boolean deleteOrigin;

public MigrateFileAction(
String connector,
String warehouse,
String sourceTable,
String targetTable,
boolean deleteOrigin,
Map<String, String> catalogConfig,
String tableProperties) {
super(warehouse, catalogConfig);
this.connector = connector;
this.sourceTable = sourceTable;
this.targetTable = targetTable;
this.deleteOrigin = deleteOrigin;
this.tableProperties = tableProperties;
}

@Override
public void run() throws Exception {
MigrateFileProcedure migrateTableProcedure = new MigrateFileProcedure();
migrateTableProcedure.withCatalog(catalog);
migrateTableProcedure.call(
new DefaultProcedureContext(env),
connector,
sourceTable,
targetTable,
deleteOrigin);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Action Factory for {@link MigrateFileAction}. */
public class MigrateFileActionFactory implements ActionFactory {

public static final String IDENTIFIER = "migrate_file";

private static final String SOURCE_TYPE = "source_type";

private static final String SOURCE_TABLE = "source_table";

private static final String TARGET_TABLE = "target_table";

private static final String DELETE_ORIGIN = "delete_origin";

private static final String OPTIONS = "options";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
String connector = params.get(SOURCE_TYPE);
String sourceHiveTable = params.get(SOURCE_TABLE);
String targetTable = params.get(TARGET_TABLE);
boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);

MigrateFileAction migrateFileAction =
new MigrateFileAction(
connector,
warehouse,
sourceHiveTable,
targetTable,
deleteOrigin,
catalogConfig,
tableConf);
return Optional.of(migrateFileAction);
}

@Override
public void printHelp() {
System.out.println("Action \"migrate_file\" runs a migrating job from hive to paimon.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" migrate_file --warehouse <warehouse_path> --source_type hive "
+ "--source_table <database.table_name> "
+ "--target_table <database.table_name> "
+ "--delete_origin true "
+ "[--catalog_conf <key>=<value] "
+ "[--options <key>=<value>,<key>=<value>,...]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
org.apache.paimon.flink.action.MigrateFileActionFactory
org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.hive.procedure;

import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.MigrateFileAction;
import org.apache.paimon.flink.procedure.MigrateFileProcedure;
import org.apache.paimon.hive.TestHiveMetastore;

Expand All @@ -33,7 +34,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Tests for {@link MigrateFileProcedure}. */
Expand All @@ -56,19 +59,19 @@ public void afterEach() throws Exception {
@Test
public void testOrc() throws Exception {
test("orc");
testWithDeleteOrigin("orc");
testMigrateFileAction("orc");
}

@Test
public void testAvro() throws Exception {
test("avro");
testWithDeleteOrigin("avro");
testMigrateFileAction("avro");
}

@Test
public void testParquet() throws Exception {
test("parquet");
testWithDeleteOrigin("parquet");
testMigrateFileAction("parquet");
}

public void test(String format) throws Exception {
Expand Down Expand Up @@ -103,7 +106,7 @@ public void test(String format) throws Exception {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}

public void testWithDeleteOrigin(String format) throws Exception {
public void testMigrateFileAction(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
Expand All @@ -112,7 +115,12 @@ public void testWithDeleteOrigin(String format) throws Exception {
"CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS "
+ format);
tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await();
tEnv.executeSql("SHOW CREATE TABLE hivetable01");

tEnv.executeSql(
"CREATE TABLE hivetable02 (id string) PARTITIONED BY (id2 int, id3 int) STORED AS "
+ format);
tEnv.executeSql("INSERT INTO hivetable02 VALUES" + data(100)).await();
tEnv.executeSql("SHOW CREATE TABLE hivetable02");

tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
Expand All @@ -127,12 +135,32 @@ public void testWithDeleteOrigin(String format) throws Exception {
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
"CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
tEnv.executeSql(
"CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
tEnv.executeSql(
"CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)")
.await();

tEnv.useCatalog("PAIMON_GE");
Map<String, String> catalogConf = new HashMap<>();
catalogConf.put("metastore", "hive");
catalogConf.put("uri", "thrift://localhost:" + PORT);
MigrateFileAction migrateFileAction =
new MigrateFileAction(
"hive",
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"default.hivetable02",
"default.paimontable02",
false,
catalogConf,
"");
migrateFileAction.run();

tEnv.useCatalog("HIVE");
List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable01").collect());
List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable02").collect());
Assertions.assertThat(r1.size() == 0);
Assertions.assertThat(r2.size() == 0);
}

private String data(int i) {
Expand Down