diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index a99b5659a10e6..34552bb0b31a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -25,10 +25,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.Serializable;
-import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import org.apache.avro.Schema;
@@ -64,8 +65,8 @@ public class HDFSParquetImporter implements Serializable {
 
   private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);
-  public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
-  private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
+  private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
+      .withZone(ZoneId.systemDefault());
   private final Config cfg;
   private transient FileSystem fs;
   /**
@@ -73,11 +74,8 @@ public class HDFSParquetImporter implements Serializable {
    */
   private TypedProperties props;
 
-  public HDFSParquetImporter(Config cfg) throws IOException {
+  public HDFSParquetImporter(Config cfg) {
     this.cfg = cfg;
-    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
-    log.info("Creating Cleaner with configs : " + props.toString());
   }
 
   public static void main(String[] args) throws Exception {
@@ -98,8 +96,11 @@ public static void main(String[] args) throws Exception {
   }
 
-  public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
+  public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
+        : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+    log.info("Starting data import with configs : " + props.toString());
     int ret = -1;
     try {
       // Verify that targetPath is not present.
@@ -110,7 +111,7 @@ public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
         ret = dataImport(jsc);
       } while (ret != 0 && retry-- > 0);
     } catch (Throwable t) {
-      logger.error(t);
+      log.error(t);
     }
     return ret;
   }
@@ -141,7 +142,7 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
       JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
       return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
     } catch (Throwable t) {
-      logger.error("Error occurred.", t);
+      log.error("Error occurred.", t);
     }
     return -1;
   }
@@ -159,8 +160,7 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
     return jsc
         .newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
             job.getConfiguration())
-        // To reduce large number of
-        // tasks.
+        // To reduce large number of tasks.
         .coalesce(16 * cfg.parallelism).map(entry -> {
           GenericRecord genericRecord = ((Tuple2) entry)._2();
           Object partitionField = genericRecord.get(cfg.partitionKey);
@@ -172,16 +172,16 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
             throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
           }
           String partitionPath = partitionField.toString();
-          logger.info("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+          log.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
           if (partitionField instanceof Number) {
             try {
               long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
-              partitionPath = PARTITION_FORMATTER.format(new Date(ts));
+              partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
             } catch (NumberFormatException nfe) {
-              logger.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+              log.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
             }
           }
-          return new HoodieRecord<>(new HoodieKey((String) rowField, partitionPath),
+          return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),
               new HoodieJsonPayload(genericRecord.toString()));
         });
   }
@@ -195,11 +195,31 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
    * @param <T> Type
    */
   protected JavaRDD<WriteStatus> load(HoodieWriteClient client, String instantTime,
-      JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords) {
-    if (cfg.command.toLowerCase().equals("insert")) {
-      return client.insert(hoodieRecords, instantTime);
+      JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords) throws Exception {
+    switch (cfg.command.toLowerCase()) {
+      case "upsert": {
+        return client.upsert(hoodieRecords, instantTime);
+      }
+      case "bulkinsert": {
+        return client.bulkInsert(hoodieRecords, instantTime);
+      }
+      default: {
+        return client.insert(hoodieRecords, instantTime);
+      }
+    }
+  }
+
+  public static class CommandValidator implements IValueValidator<String> {
+
+    List<String> validCommands = Arrays.asList("insert", "upsert", "bulkinsert");
+
+    @Override
+    public void validate(String name, String value) throws ParameterException {
+      if (value == null || !validCommands.contains(value.toLowerCase())) {
+        throw new ParameterException(
+            String.format("Invalid command: value:%s: supported commands:%s", value, validCommands));
+      }
     }
-    return client.upsert(hoodieRecords, instantTime);
   }
 
   public static class FormatValidator implements IValueValidator<String> {
@@ -217,8 +237,8 @@ public void validate(String name, String value) throws ParameterException {
 
   public static class Config implements Serializable {
 
-    @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert",
-        required = false)
+    @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert",
+        required = false, validateValueWith = CommandValidator.class)
     public String command = "INSERT";
     @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true)
     public String srcPath = null;
@@ -233,7 +253,7 @@ public static class Config implements Serializable {
     public String rowKey = null;
     @Parameter(names = {"--partition-key-field", "-pk"}, description = "Partition key field name", required = true)
     public String partitionKey = null;
-    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert(default)/upsert/bulkinsert", required = true)
     public int parallelism = 1;
     @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
     public String schemaFile = null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8c334f695cff8..bbb18af8abd7c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -190,7 +190,9 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
         .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath).withParallelism(parallelism, parallelism)
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withParallelism(parallelism, parallelism)
+            .withBulkInsertParallelism(parallelism)
         .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withProps(properties).build();
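
Note on the PARTITION_FORMATTER change above: the patch swaps SimpleDateFormat for an immutable DateTimeFormatter pinned to the system zone, presumably because the formatter is used inside a Spark map and DateTimeFormatter, unlike SimpleDateFormat, is thread-safe. The standalone sketch below mirrors only the partition-path derivation from the patched buildHoodieRecordsForImport (numeric partition field treated as epoch seconds, scaled to millis, formatted as yyyy/MM/dd). The class name PartitionPathSketch and method toPartitionPath are illustrative only, not Hudi APIs.

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionPathSketch {

  // Same pattern and zone handling as the new PARTITION_FORMATTER in the diff;
  // withZone() is required so an Instant can be formatted with date fields.
  private static final DateTimeFormatter PARTITION_FORMATTER =
      DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());

  static String toPartitionPath(Object partitionField) {
    String partitionPath = partitionField.toString();
    if (partitionField instanceof Number) {
      try {
        // Numeric partition values are treated as epoch seconds and scaled to millis,
        // as in the patched importer.
        long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
        partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
      } catch (NumberFormatException nfe) {
        // Fall back to the raw value, matching the diff's behavior.
      }
    }
    return partitionPath;
  }

  public static void main(String[] args) {
    System.out.println(toPartitionPath(1571443200));   // e.g. 2019/10/19 (zone dependent)
    System.out.println(toPartitionPath("2019/10/19")); // non-numeric values pass through unchanged
  }
}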