Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions ice/src/main/java/com/altinity/ice/cli/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,15 @@ void insert(
description = "Create table if not exists")
boolean createTableIfNotExists,
@CommandLine.Parameters(
arity = "1..*",
arity = "0..*",
paramLabel = "<files>",
description = "/path/to/file.parquet")
String[] dataFiles,
@CommandLine.Option(
names = "--from-file",
description =
"Read list of parquet files from the specified file (one file per line)")
String fromFile,
@CommandLine.Option(
names = "--no-copy",
description = "Add files to catalog without copying them")
Expand Down Expand Up @@ -351,12 +356,25 @@ void insert(
}
setAWSRegion(s3Region);
try (RESTCatalog catalog = loadCatalog()) {
if (dataFiles.length == 1 && "-".equals(dataFiles[0])) {
// Handle --from-file option
if (!Strings.isNullOrEmpty(fromFile)) {
if (dataFiles != null && dataFiles.length > 0) {
throw new IllegalArgumentException("Cannot specify both --from-file and file arguments");
}
dataFiles = readInputFromFile(fromFile).toArray(new String[0]);
if (dataFiles.length == 0) {
logger.info("Nothing to insert (file empty)");
return;
}
} else if (dataFiles != null && dataFiles.length == 1 && "-".equals(dataFiles[0])) {
dataFiles = readInput().toArray(new String[0]);
if (dataFiles.length == 0) {
logger.info("Nothing to insert (stdin empty)");
return;
}
} else if (dataFiles == null || dataFiles.length == 0) {
throw new IllegalArgumentException(
"No files specified. Provide files as arguments or use --from-file");
}

List<IcePartition> partitions = null;
Expand Down Expand Up @@ -411,7 +429,13 @@ void insert(
.build();

if (!watchMode) {
Insert.run(catalog, tableId, dataFiles, options);
Insert.Result result = Insert.run(catalog, tableId, dataFiles, options);
if (result.anyFilesFailed()) {
logger.error(
"{} file(s) failed to insert. Check logs or retry list for details.",
result.failedCount());
System.exit(1);
}
} else {
if (!Strings.isNullOrEmpty(watchDebugAddr)) {
JvmMetrics.builder().register();
Expand Down Expand Up @@ -453,6 +477,19 @@ private static List<String> readInput() {
return r;
}

private static List<String> readInputFromFile(String filePath) throws IOException {
List<String> r = new ArrayList<>();
try (Scanner scanner = new Scanner(new java.io.File(filePath))) {
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (!line.isBlank()) {
r.add(line);
}
}
}
return r;
}

@CommandLine.Command(name = "scan", description = "Scan table.")
void scanTable(
@CommandLine.Parameters(
Expand Down
13 changes: 11 additions & 2 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -92,13 +93,15 @@ public final class Insert {

private Insert() {}

public record Result(boolean anyFilesProcessed, boolean anyFilesFailed, int failedCount) {}

// TODO: refactor
public static void run(
public static Result run(
RESTCatalog catalog, TableIdentifier nsTable, String[] files, Options options)
throws NoSuchTableException, IOException, InterruptedException {
if (files.length == 0) {
// no work to be done
return;
return new Result(false, false, 0);
}

Table table = catalog.loadTable(nsTable);
Expand Down Expand Up @@ -166,6 +169,7 @@ public static void run(
? new RetryLog(options.retryListFile)
: null) {
boolean atLeastOneFileAppended = false;
final AtomicInteger failedCount = new AtomicInteger(0);

int numThreads = Math.min(options.threadCount(), filesExpanded.size());
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
Expand Down Expand Up @@ -202,6 +206,7 @@ public static void run(
logger.error(
"{}: error (adding to retry list and continuing)", file, e);
retryLog.add(file);
failedCount.incrementAndGet();
return Collections.emptyList();
} else {
throw new IOException(String.format("Error processing %s", file), e);
Expand All @@ -221,6 +226,7 @@ public static void run(
if (retryLog == null) {
throw new IOException("Error processing file(s)", e.getCause());
}
failedCount.incrementAndGet();
}
}
} finally {
Expand All @@ -245,6 +251,9 @@ public static void run(
} else {
logger.warn("Table commit skipped (--no-commit)");
}

int finalFailedCount = failedCount.get();
return new Result(atLeastOneFileAppended, finalFailedCount > 0, finalFailedCount);
}
} finally {
if (s3ClientLazy.hasValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ public static void run(
logger.info("Inserting {}", insertBatch);

try {
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
Insert.Result result =
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
if (result.anyFilesFailed()) {
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
}
} catch (NoSuchTableException e) {
if (!createTableIfNotExists) {
throw e;
Expand All @@ -145,7 +149,11 @@ public static void run(
retryInsert = false;
}
if (retryInsert) {
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
Insert.Result result =
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
if (result.anyFilesFailed()) {
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
}
}
}
}
Expand Down