Skip to content

Commit

Permalink
[LAMBERT-205]: Added logs for small file size debugging (apache#9)
Browse files: browse the repository at this point in the history
* added logs for small file size debugging

* added file name to the log messages

* reverted version.txt
  • Loading branch information
adamyasharma27 authored and GitHub Enterprise committed Jul 19, 2024
1 parent 30279b8 commit cf4ccf8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import org.apache.iceberg.util.StructProjection;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class);
private final List<DataFile> completedDataFiles = Lists.newArrayList();
private final List<DeleteFile> completedDeleteFiles = Lists.newArrayList();
private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
Expand Down Expand Up @@ -278,6 +281,7 @@ public void write(T record) throws IOException {
this.currentRows++;

if (shouldRollToNewFile()) {
LOG.info("Closing file: {} as it went beyond target size: {}", currentFile.encryptingOutputFile().location(), targetFileSize);
closeCurrent();
openCurrent();
}
Expand Down Expand Up @@ -311,6 +315,7 @@ private boolean shouldRollToNewFile() {
private void closeCurrent() throws IOException {
if (currentWriter != null) {
try {
LOG.info("Closing file: {}, File Size: {}, Row Count: {}, targetFileSize: {}", currentFile.encryptingOutputFile().location(), length(currentWriter), currentRows, targetFileSize);
currentWriter.close();

if (currentRows == 0L) {
Expand Down
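5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
Original file line number Diff line number Diff line change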
Expand Up @@ -24,9 +24,12 @@
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
private static final Logger LOG = LoggerFactory.getLogger(PartitionedFanoutWriter.class);

protected PartitionedFanoutWriter(
PartitionSpec spec,
Expand Down Expand Up @@ -66,6 +69,8 @@ public void write(T row) throws IOException {
@Override
public void close() throws IOException {
if (!writers.isEmpty()) {
LOG.info("Partition Writer Size: {}", writers.size());
LOG.info("Partition Writer Keys: {}", writers.keySet());
for (PartitionKey key : writers.keySet()) {
writers.get(key).close();
}
Expand Down

0 comments on commit cf4ccf8

Please sign in to comment.