diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 2798c60..a44dabd 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -261,10 +261,15 @@ void insert( description = "Create table if not exists") boolean createTableIfNotExists, @CommandLine.Parameters( - arity = "1..*", + arity = "0..*", paramLabel = "", description = "/path/to/file.parquet") String[] dataFiles, + @CommandLine.Option( + names = "--from-file", + description = + "Read list of parquet files from the specified file (one file per line)") + String fromFile, @CommandLine.Option( names = "--no-copy", description = "Add files to catalog without copying them") @@ -351,12 +356,25 @@ void insert( } setAWSRegion(s3Region); try (RESTCatalog catalog = loadCatalog()) { - if (dataFiles.length == 1 && "-".equals(dataFiles[0])) { + // Handle --from-file option + if (!Strings.isNullOrEmpty(fromFile)) { + if (dataFiles != null && dataFiles.length > 0) { + throw new IllegalArgumentException("Cannot specify both --from-file and file arguments"); + } + dataFiles = readInputFromFile(fromFile).toArray(new String[0]); + if (dataFiles.length == 0) { + logger.info("Nothing to insert (file empty)"); + return; + } + } else if (dataFiles != null && dataFiles.length == 1 && "-".equals(dataFiles[0])) { dataFiles = readInput().toArray(new String[0]); if (dataFiles.length == 0) { logger.info("Nothing to insert (stdin empty)"); return; } + } else if (dataFiles == null || dataFiles.length == 0) { + throw new IllegalArgumentException( + "No files specified. Provide files as arguments or use --from-file"); } List partitions = null; @@ -411,7 +429,13 @@ void insert( .build(); if (!watchMode) { - Insert.run(catalog, tableId, dataFiles, options); + Insert.Result result = Insert.run(catalog, tableId, dataFiles, options); + if (result.anyFilesFailed()) { + logger.error( + "{} file(s) failed to insert. Check logs or retry list for details.", + result.failedCount()); + System.exit(1); + } } else { if (!Strings.isNullOrEmpty(watchDebugAddr)) { JvmMetrics.builder().register(); @@ -453,6 +477,19 @@ private static List readInput() { return r; } + private static List readInputFromFile(String filePath) throws IOException { + List r = new ArrayList<>(); + try (Scanner scanner = new Scanner(new java.io.File(filePath))) { + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (!line.isBlank()) { + r.add(line); + } + } + } + return r; + } + @CommandLine.Command(name = "scan", description = "Scan table.") void scanTable( @CommandLine.Parameters( diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 805f41b..2fc0c99 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -92,13 +93,15 @@ public final class Insert { private Insert() {} + public record Result(boolean anyFilesProcessed, boolean anyFilesFailed, int failedCount) {} + // TODO: refactor - public static void run( + public static Result run( RESTCatalog catalog, TableIdentifier nsTable, String[] files, Options options) throws NoSuchTableException, IOException, InterruptedException { if (files.length == 0) { // no work to be done - return; + return new Result(false, false, 0); } Table table = catalog.loadTable(nsTable); @@ -166,6 +169,7 @@ public static void run( ? new RetryLog(options.retryListFile) : null) { boolean atLeastOneFileAppended = false; + final AtomicInteger failedCount = new AtomicInteger(0); int numThreads = Math.min(options.threadCount(), filesExpanded.size()); ExecutorService executor = Executors.newFixedThreadPool(numThreads); @@ -202,6 +206,7 @@ public static void run( logger.error( "{}: error (adding to retry list and continuing)", file, e); retryLog.add(file); + failedCount.incrementAndGet(); return Collections.emptyList(); } else { throw new IOException(String.format("Error processing %s", file), e); @@ -221,6 +226,7 @@ public static void run( if (retryLog == null) { throw new IOException("Error processing file(s)", e.getCause()); } + failedCount.incrementAndGet(); } } } finally { @@ -245,6 +251,9 @@ public static void run( } else { logger.warn("Table commit skipped (--no-commit)"); } + + int finalFailedCount = failedCount.get(); + return new Result(atLeastOneFileAppended, finalFailedCount > 0, finalFailedCount); } } finally { if (s3ClientLazy.hasValue()) { diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java index bfad50e..00aadfb 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java @@ -120,7 +120,11 @@ public static void run( logger.info("Inserting {}", insertBatch); try { - Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options); + Insert.Result result = + Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options); + if (result.anyFilesFailed()) { + logger.warn("{} file(s) failed to insert in this batch", result.failedCount()); + } } catch (NoSuchTableException e) { if (!createTableIfNotExists) { throw e; @@ -145,7 +149,11 @@ public static void run( retryInsert = false; } if (retryInsert) { - Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options); + Insert.Result result = + Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options); + if (result.anyFilesFailed()) { + logger.warn("{} file(s) failed to insert in this batch", result.failedCount()); + } } } }