Skip to content

Commit

Permalink
[AMORO-2259] Support collecting the number of records into the metric…
Browse files Browse the repository at this point in the history
…s of Optimizing (#2262)

* [AMORO-2259] Support collecting the number of records into the metrics of Optimizing

---------

Co-authored-by: wangtaohz <103108928+wangtaohz@users.noreply.github.com>
  • Loading branch information
zhongqishang and wangtaohz committed Nov 20, 2023
1 parent d48c0e0 commit caec8bc
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,16 @@ public static OptimizingProcessInfo build(
MetricsSummary summary = meta.getSummary();
if (summary != null) {
inputBuilder.addFiles(summary.getEqualityDeleteSize(), summary.getEqDeleteFileCnt());
inputBuilder.addFiles(summary.getPositionalDeleteSize(), summary.getPosDeleteFileCnt());
inputBuilder.addFiles(
summary.getPositionDeleteSize() + summary.getPositionalDeleteSize(),
summary.getPosDeleteFileCnt());
inputBuilder.addFiles(summary.getRewriteDataSize(), summary.getRewriteDataFileCnt());
inputBuilder.addFiles(summary.getRewritePosDataSize(), summary.getReRowDeletedDataFileCnt());
inputBuilder.addFiles(
summary.getRewritePosDataSize(),
summary.getReRowDeletedDataFileCnt() + summary.getRewritePosDataFileCnt());
outputBuilder.addFiles(summary.getNewFileSize(), summary.getNewFileCnt());
outputBuilder.addFiles(summary.getNewDataSize(), summary.getNewDataFileCnt());
outputBuilder.addFiles(summary.getNewDeleteSize(), summary.getNewDeleteFileCnt());
}
result.setInputFiles(inputBuilder.build());
result.setOutputFiles(outputBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.netease.arctic.server.dashboard.utils;

import com.netease.arctic.optimizing.RewriteFilesOutput;
import com.netease.arctic.server.dashboard.model.FilesStatistics;
import com.netease.arctic.server.dashboard.model.TableOptimizingInfo;
import com.netease.arctic.server.optimizing.MetricsSummary;
import com.netease.arctic.server.optimizing.OptimizingProcess;
import com.netease.arctic.server.optimizing.OptimizingStatus;
import com.netease.arctic.server.optimizing.plan.OptimizingEvaluator;
import com.netease.arctic.server.table.TableRuntime;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ContentFile;

public class OptimizingUtil {

Expand Down Expand Up @@ -63,34 +61,34 @@ private static FilesStatistics collectOptimizingFileInfo(MetricsSummary metricsS
}
return FilesStatistics.builder()
.addFiles(metricsSummary.getEqualityDeleteSize(), metricsSummary.getEqDeleteFileCnt())
.addFiles(metricsSummary.getPositionalDeleteSize(), metricsSummary.getPosDeleteFileCnt())
.addFiles(
metricsSummary.getPositionalDeleteSize() + metricsSummary.getPositionDeleteSize(),
metricsSummary.getPosDeleteFileCnt())
.addFiles(metricsSummary.getRewriteDataSize(), metricsSummary.getRewriteDataFileCnt())
.build();
}

public static long getFileSize(RewriteFilesOutput output) {
public static long getFileSize(ContentFile<?>[] contentFiles) {
long size = 0;
if (output.getDataFiles() != null) {
for (DataFile dataFile : output.getDataFiles()) {
size += dataFile.fileSizeInBytes();
}
}
if (output.getDeleteFiles() != null) {
for (DeleteFile dataFile : output.getDeleteFiles()) {
size += dataFile.fileSizeInBytes();
if (contentFiles != null) {
for (ContentFile<?> contentFile : contentFiles) {
size += contentFile.fileSizeInBytes();
}
}
return size;
}

public static int getFileCount(RewriteFilesOutput output) {
int length = 0;
if (output.getDataFiles() != null) {
length += output.getDataFiles().length;
}
if (output.getDeleteFiles() != null) {
length += output.getDeleteFiles().length;
/**
 * Counts the entries of a content-file array.
 *
 * @param contentFiles files to count; may be {@code null}
 * @return the array length, or 0 when {@code contentFiles} is {@code null}
 */
public static int getFileCount(ContentFile<?>[] contentFiles) {
  if (contentFiles == null) {
    return 0;
  }
  return contentFiles.length;
}

/**
 * Sums the record counts of the given content files.
 *
 * @param contentFiles files whose records are counted; may be {@code null}
 * @return the total of {@code recordCount()} over all files, or 0 when {@code contentFiles} is
 *     {@code null}
 */
public static long getRecordCnt(ContentFile<?>[] contentFiles) {
  // Accumulate in a long: with an int accumulator the compound `+=` silently narrows every
  // addition, so the total would wrap once it exceeds Integer.MAX_VALUE even though the
  // method is declared to return long.
  long recordCnt = 0;
  if (contentFiles != null) {
    for (ContentFile<?> contentFile : contentFiles) {
      recordCnt += contentFile.recordCount();
    }
  }
  return recordCnt;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netease.arctic.server.optimizing;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.netease.arctic.optimizing.RewriteFilesInput;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
Expand All @@ -8,35 +9,60 @@

import java.util.Collection;

@JsonIgnoreProperties(ignoreUnknown = true)
public class MetricsSummary {
private long newFileSize = 0;
private int newFileCnt = 0;
/** @deprecated since 0.7.0, will be removed in 0.8.0 */
@Deprecated long newFileSize = 0;

@Deprecated int newFileCnt = 0;
private long newDataSize = 0;
private int newDataFileCnt = 0;
private long newDataRecordCnt = 0;
/** Only position delete files will be generated */
private long newDeleteSize = 0;

private int newDeleteFileCnt = 0;
private long newDeleteRecordCnt = 0;
private long rewriteDataSize = 0;
private long rewritePosDataSize = 0;
private long equalityDeleteSize = 0;
private long positionalDeleteSize = 0;
/** @deprecated since 0.7.0, will be removed in 0.8.0 */
@Deprecated private long positionalDeleteSize = 0;

private long positionDeleteSize = 0;
private int rewriteDataFileCnt = 0;
private int reRowDeletedDataFileCnt = 0;
/** @deprecated since 0.7.0, will be removed in 0.8.0 */
@Deprecated private int reRowDeletedDataFileCnt = 0;

private int rewritePosDataFileCnt = 0;
private int eqDeleteFileCnt = 0;
private int posDeleteFileCnt = 0;
private int rewriteDataRecordCnt = 0;
private int rewritePosDataRecordCnt = 0;
private int eqDeleteRecordCnt = 0;
private int posDeleteRecordCnt = 0;

public MetricsSummary() {}

protected MetricsSummary(RewriteFilesInput input) {
rewriteDataFileCnt = input.rewrittenDataFiles().length;
reRowDeletedDataFileCnt = input.rePosDeletedDataFiles().length;
rewritePosDataFileCnt = input.rePosDeletedDataFiles().length;
for (DataFile rewriteFile : input.rewrittenDataFiles()) {
rewriteDataSize += rewriteFile.fileSizeInBytes();
rewriteDataRecordCnt += rewriteFile.recordCount();
}
for (DataFile rewritePosDataFile : input.rePosDeletedDataFiles()) {
rewritePosDataSize += rewritePosDataFile.fileSizeInBytes();
rewritePosDataRecordCnt += rewritePosDataFile.recordCount();
}
for (ContentFile<?> delete : input.deleteFiles()) {
if (delete.content() == FileContent.POSITION_DELETES) {
positionalDeleteSize += delete.fileSizeInBytes();
positionDeleteSize += delete.fileSizeInBytes();
posDeleteRecordCnt += delete.recordCount();
posDeleteFileCnt++;
} else {
equalityDeleteSize += delete.fileSizeInBytes();
eqDeleteRecordCnt += delete.recordCount();
eqDeleteFileCnt++;
}
}
Expand All @@ -47,14 +73,26 @@ public MetricsSummary(Collection<TaskRuntime> taskRuntimes) {
.map(TaskRuntime::getMetricsSummary)
.forEach(
metrics -> {
newDataFileCnt += metrics.getNewDataFileCnt();
newDataSize += metrics.getNewDataSize();
newDataRecordCnt += metrics.getNewDataRecordCnt();
newDeleteSize += metrics.getNewDeleteSize();
newDeleteFileCnt += metrics.getNewDeleteFileCnt();
newDeleteRecordCnt += metrics.getNewDeleteRecordCnt();
rewriteDataFileCnt += metrics.getRewriteDataFileCnt();
reRowDeletedDataFileCnt += metrics.getReRowDeletedDataFileCnt();
rewritePosDataFileCnt += metrics.getRewritePosDataFileCnt();
rewriteDataSize += metrics.getRewriteDataSize();
rewritePosDataSize += metrics.getRewritePosDataSize();
posDeleteFileCnt += metrics.getPosDeleteFileCnt();
positionalDeleteSize += metrics.getPositionalDeleteSize();
positionDeleteSize += metrics.getPositionDeleteSize();
eqDeleteFileCnt += metrics.getEqDeleteFileCnt();
equalityDeleteSize += metrics.getEqualityDeleteSize();
rewriteDataRecordCnt += metrics.getRewriteDataRecordCnt();
rewritePosDataRecordCnt += metrics.getRewritePosDataRecordCnt();
eqDeleteRecordCnt += metrics.getEqDeleteRecordCnt();
posDeleteRecordCnt += metrics.getPosDeleteRecordCnt();
newFileCnt += metrics.getNewFileCnt();
newFileSize += metrics.getNewFileSize();
});
Expand All @@ -68,6 +106,54 @@ public int getNewFileCnt() {
return newFileCnt;
}

/** Returns the value of {@code newDataSize} (size of newly written data files). */
public long getNewDataSize() {
return newDataSize;
}

/**
 * Sets {@code newDataSize}.
 *
 * @param newDataSize size of the newly written data files
 */
protected void setNewDataSize(long newDataSize) {
this.newDataSize = newDataSize;
}

/** Returns the value of {@code newDataFileCnt} (number of newly written data files). */
public int getNewDataFileCnt() {
return newDataFileCnt;
}

/**
 * Sets {@code newDataFileCnt}.
 *
 * @param newDataFileCnt number of newly written data files
 */
protected void setNewDataFileCnt(int newDataFileCnt) {
this.newDataFileCnt = newDataFileCnt;
}

/** Returns the value of {@code newDataRecordCnt} (records in newly written data files). */
public long getNewDataRecordCnt() {
return newDataRecordCnt;
}

/**
 * Sets {@code newDataRecordCnt}.
 *
 * @param newDataRecordCnt number of records in the newly written data files
 */
protected void setNewDataRecordCnt(long newDataRecordCnt) {
this.newDataRecordCnt = newDataRecordCnt;
}

/**
 * Sets {@code newDeleteSize}.
 *
 * <p>NOTE(review): this setter is {@code public} while {@code setNewDeleteRecordCnt} is
 * {@code protected} — confirm whether the wider visibility is intended.
 *
 * @param newDeleteSize size of the newly written delete files
 */
public void setNewDeleteSize(long newDeleteSize) {
this.newDeleteSize = newDeleteSize;
}

/**
 * Sets {@code newDeleteFileCnt}.
 *
 * <p>NOTE(review): this setter is {@code public} while {@code setNewDeleteRecordCnt} is
 * {@code protected} — confirm whether the wider visibility is intended.
 *
 * @param newDeleteFileCnt number of newly written delete files
 */
public void setNewDeleteFileCnt(int newDeleteFileCnt) {
this.newDeleteFileCnt = newDeleteFileCnt;
}

/** Returns the value of {@code newDeleteSize} (size of newly written delete files). */
public long getNewDeleteSize() {
return newDeleteSize;
}

/** Returns the value of {@code newDeleteFileCnt} (number of newly written delete files). */
public int getNewDeleteFileCnt() {
return newDeleteFileCnt;
}

/** Returns the value of {@code newDeleteRecordCnt} (records in newly written delete files). */
public long getNewDeleteRecordCnt() {
return newDeleteRecordCnt;
}

/**
 * Sets {@code newDeleteRecordCnt}.
 *
 * @param newDeleteRecordCnt number of records in the newly written delete files
 */
protected void setNewDeleteRecordCnt(long newDeleteRecordCnt) {
this.newDeleteRecordCnt = newDeleteRecordCnt;
}

public long getRewriteDataSize() {
return rewriteDataSize;
}
Expand All @@ -84,6 +170,10 @@ public long getPositionalDeleteSize() {
return positionalDeleteSize;
}

/**
 * Returns the value of {@code positionDeleteSize}.
 *
 * <p>NOTE(review): this appears to be the replacement for the deprecated
 * {@code positionalDeleteSize} value — confirm against the field declarations.
 */
public long getPositionDeleteSize() {
return positionDeleteSize;
}

public int getRewriteDataFileCnt() {
return rewriteDataFileCnt;
}
Expand All @@ -92,6 +182,10 @@ public int getReRowDeletedDataFileCnt() {
return reRowDeletedDataFileCnt;
}

/**
 * Returns the value of {@code rewritePosDataFileCnt}.
 *
 * <p>NOTE(review): this appears to be the replacement for the deprecated
 * {@code reRowDeletedDataFileCnt} value — confirm against the field declarations.
 */
public int getRewritePosDataFileCnt() {
return rewritePosDataFileCnt;
}

public int getEqDeleteFileCnt() {
return eqDeleteFileCnt;
}
Expand All @@ -100,27 +194,47 @@ public int getPosDeleteFileCnt() {
return posDeleteFileCnt;
}

protected void setNewFileSize(long newFileSize) {
this.newFileSize = newFileSize;
/**
 * Returns the value of {@code rewriteDataRecordCnt} (records in rewritten data files).
 *
 * <p>NOTE(review): the counter is an {@code int} although the new-file record counters are
 * {@code long}; if it is summed from {@code long} record counts via {@code +=}, large totals
 * would be narrowed silently — confirm whether it should be {@code long}.
 */
public int getRewriteDataRecordCnt() {
return rewriteDataRecordCnt;
}

/**
 * Returns the value of {@code rewritePosDataRecordCnt}.
 *
 * <p>NOTE(review): same {@code int} narrowing concern as the other record counters — confirm.
 */
public int getRewritePosDataRecordCnt() {
return rewritePosDataRecordCnt;
}

/**
 * Returns the value of {@code eqDeleteRecordCnt} (records in equality-delete files).
 *
 * <p>NOTE(review): same {@code int} narrowing concern as the other record counters — confirm.
 */
public int getEqDeleteRecordCnt() {
return eqDeleteRecordCnt;
}

protected void setNewFileCnt(int newFileCnt) {
this.newFileCnt = newFileCnt;
/**
 * Returns the value of {@code posDeleteRecordCnt} (records in position-delete files).
 *
 * <p>NOTE(review): the counter is an {@code int}; if it is summed from {@code long} record
 * counts, large totals would be narrowed silently — confirm whether it should be {@code long}.
 */
public int getPosDeleteRecordCnt() {
return posDeleteRecordCnt;
}

/** Builds a string that lists every metric field by name, using {@code MoreObjects.toStringHelper}. */
@Override
public String toString() {
  MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);

  // Newly written files.
  helper.add("newFileSize", newFileSize);
  helper.add("newFileCnt", newFileCnt);
  helper.add("newDataSize", newDataSize);
  helper.add("newDataFileCnt", newDataFileCnt);
  helper.add("newDataRecordCnt", newDataRecordCnt);
  helper.add("newDeleteSize", newDeleteSize);
  helper.add("newDeleteFileCnt", newDeleteFileCnt);
  helper.add("newDeleteRecordCnt", newDeleteRecordCnt);

  // Input sizes.
  helper.add("rewriteDataSize", rewriteDataSize);
  helper.add("rewritePosDataSize", rewritePosDataSize);
  helper.add("equalityDeleteSize", equalityDeleteSize);
  helper.add("positionalDeleteSize", positionalDeleteSize);
  helper.add("positionDeleteSize", positionDeleteSize);

  // Input file counts.
  helper.add("rewriteDataFileCnt", rewriteDataFileCnt);
  helper.add("reRowDeletedDataFileCnt", reRowDeletedDataFileCnt);
  helper.add("rewritePosDataFileCnt", rewritePosDataFileCnt);
  helper.add("eqDeleteFileCnt", eqDeleteFileCnt);
  helper.add("posDeleteFileCnt", posDeleteFileCnt);

  // Input record counts.
  helper.add("rewriteDataRecordCnt", rewriteDataRecordCnt);
  helper.add("rewritePosDataRecordCnt", rewritePosDataRecordCnt);
  helper.add("eqDeleteRecordCnt", eqDeleteRecordCnt);
  helper.add("posDeleteRecordCnt", posDeleteRecordCnt);

  return helper.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ private void finish(RewriteFilesOutput filesOutput) {
invokeConsisitency(
() -> {
statusMachine.accept(Status.SUCCESS);
summary.setNewFileCnt(OptimizingUtil.getFileCount(filesOutput));
summary.setNewFileSize(OptimizingUtil.getFileSize(filesOutput));
summary.setNewDataFileCnt(OptimizingUtil.getFileCount(filesOutput.getDataFiles()));
summary.setNewDataSize(OptimizingUtil.getFileSize(filesOutput.getDataFiles()));
summary.setNewDataRecordCnt(OptimizingUtil.getRecordCnt(filesOutput.getDataFiles()));
summary.setNewDeleteFileCnt(OptimizingUtil.getFileCount(filesOutput.getDeleteFiles()));
summary.setNewDeleteSize(OptimizingUtil.getFileSize(filesOutput.getDeleteFiles()));
summary.setNewDeleteRecordCnt(OptimizingUtil.getRecordCnt(filesOutput.getDeleteFiles()));
endTime = System.currentTimeMillis();
costTime += endTime - startTime;
output = filesOutput;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netease.arctic.server.persistence.converter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
Expand All @@ -22,6 +23,7 @@ public JsonObjectConverter(Class<T> clazz) {
throw new IllegalArgumentException("Type argument cannot be null");
}
this.clazz = clazz;
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ protected void assertOptimizingProcess(
Assert.assertEquals(optimizeType, optimizingProcess.getOptimizingType());
Assert.assertEquals(
fileCntBefore,
optimizingProcess.getSummary().getReRowDeletedDataFileCnt()
optimizingProcess.getSummary().getRewritePosDataFileCnt()
+ optimizingProcess.getSummary().getRewriteDataFileCnt()
+ optimizingProcess.getSummary().getEqDeleteFileCnt()
+ optimizingProcess.getSummary().getPosDeleteFileCnt());
Assert.assertEquals(fileCntAfter, optimizingProcess.getSummary().getNewFileCnt());
Assert.assertEquals(
fileCntAfter,
optimizingProcess.getSummary().getNewDataFileCnt()
+ optimizingProcess.getSummary().getNewDeleteFileCnt());
}

protected OptimizingProcessMeta waitOptimizeResult() {
Expand Down

0 comments on commit caec8bc

Please sign in to comment.