Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/primary-key-table/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ By default, when Paimon appends records to the LSM tree, it will also perform co

During compaction, you can expire records by configuring a record-level expire time. To do so, you should configure:
1. `'record-level.expire-time'`: the retention time for records.
2. `'record-level.time-field'`: time field for record level expire, it should be a seconds INT.
2. `'record-level.time-field'`: time field for record level expire.
3. `'record-level.time-field.type'`: time field type for record level expire, it can be second or millisecond.

Expiration happens in compaction, and there is no strong guarantee to expire records in time.
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,13 @@
<td><h5>record-level.time-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Time field for record level expire, it should be a seconds INT.</td>
<td>Time field for record level expire.</td>
</tr>
<tr>
<td><h5>record-level.time-field.type</h5></td>
<td style="word-wrap: break-word;">second</td>
<td><p>Enum</p></td>
<td>Time field type for record level expire, it can be second or millisecond.</td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
Expand Down
2 changes: 1 addition & 1 deletion paimon-benchmark/paimon-cluster-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-benchmark</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-cluster-benchmark</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-benchmark/paimon-micro-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-benchmark</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-micro-benchmarks</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-benchmark</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-bundle</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-codegen-loader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-codegen-loader</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-codegen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-codegen</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-common</artifactId>
Expand Down
42 changes: 41 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1144,8 +1144,14 @@ public class CoreOptions implements Serializable {
key("record-level.time-field")
.stringType()
.noDefaultValue()
.withDescription("Time field for record level expire.");

public static final ConfigOption<TimeFieldType> RECORD_LEVEL_TIME_FIELD_TYPE =
key("record-level.time-field.type")
.enumType(TimeFieldType.class)
.defaultValue(TimeFieldType.SECOND)
.withDescription(
"Time field for record level expire, it should be a seconds INT.");
"Time field type for record level expire, it can be second or millisecond.");

private final Options options;

Expand Down Expand Up @@ -1792,6 +1798,11 @@ public String recordLevelTimeField() {
return options.get(RECORD_LEVEL_TIME_FIELD);
}

/**
 * Returns the configured type of the time field used for record level expire.
 *
 * <p>The {@code record-level.time-field.type} option is declared with a default value, so this
 * getter is not annotated as nullable.
 *
 * @return the value of {@link #RECORD_LEVEL_TIME_FIELD_TYPE}
 */
public TimeFieldType recordLevelTimeFieldType() {
    return options.get(RECORD_LEVEL_TIME_FIELD_TYPE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down Expand Up @@ -2380,4 +2391,33 @@ public InlineElement getDescription() {
return text(description);
}
}

/**
 * Time field type for record level expire.
 *
 * <p>Each constant carries the string form used in configuration (returned by {@link
 * #toString()}) and a human readable description (returned by {@link #getDescription()}).
 */
public enum TimeFieldType implements DescribedEnum {
    /** The time field holds epoch seconds. */
    SECOND(
            "second",
            "10-digit timestamp which indicates the number of seconds from 00:00:00 UTC on January 1, 1970 to the present, excluding milliseconds."),

    /** The time field holds epoch milliseconds. */
    MILLISECOND(
            "millisecond",
            "13-digit timestamp which indicates the number of milliseconds from 00:00:00 UTC on January 1, 1970 to the present, including millisecond information.");

    /** String form of the constant. */
    private final String value;

    /** Description text for the constant. */
    private final String description;

    TimeFieldType(String value, String description) {
        this.value = value;
        this.description = description;
    }

    /** Returns the string form of this constant, e.g. {@code "second"}. */
    @Override
    public String toString() {
        return value;
    }

    /** Returns the description of this constant wrapped as an inline text element. */
    @Override
    public InlineElement getDescription() {
        return text(description);
    }
}
}
2 changes: 1 addition & 1 deletion paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
Expand All @@ -36,6 +37,7 @@ public class RecordLevelExpire {

private final int timeField;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;

@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
Expand All @@ -58,34 +60,43 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
"Can not find time field %s for record level expire.", timeField));
}

CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();

DataField field = rowType.getField(timeField);
if (!(field.type() instanceof IntType)) {
if (!((timeFieldType == CoreOptions.TimeFieldType.SECOND && field.type() instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.MILLISECOND
&& field.type() instanceof BigIntType))) {
throw new IllegalArgumentException(
String.format(
"Record level time field should be INT type, but is %s.",
field.type()));
}

return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds(), timeFieldType);
}

public RecordLevelExpire(int timeField, int expireTime) {
/**
 * Creates a record level expire helper.
 *
 * @param timeField index of the time field inside the value row
 * @param expireTime retention time; it is added to the record time when checking expiration
 * @param timeFieldType whether the time field is read as seconds or as milliseconds
 */
public RecordLevelExpire(
        int timeField, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
    this.timeFieldType = timeFieldType;
    this.expireTime = expireTime;
    this.timeField = timeField;
}

/**
 * Decorates a reader factory so that every reader it creates is passed through {@link
 * #wrap(RecordReader)}.
 *
 * @param readerFactory the factory to delegate reader creation to
 * @return a factory producing wrapped readers
 */
public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
    return file -> {
        RecordReader<KeyValue> delegate = readerFactory.createRecordReader(file);
        return wrap(delegate);
    };
}

/**
 * Wraps a reader so that only records which have not yet expired are returned.
 *
 * <p>The current time is captured once, when this method is called, and is shared by all
 * records read through the returned reader. A record is kept when {@code currentTime <=
 * recordTime + expireTime}, with all three values expressed in seconds.
 *
 * <p>All arithmetic is done in {@code long}: an {@code int} addition of the record time and the
 * expire time could overflow for large time values and wrongly drop a record, and narrowing the
 * current time to {@code int} would truncate it for no benefit.
 *
 * @param reader the reader to filter
 * @return a reader that skips expired records; the time field of every record must not be null
 */
public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
    final long currentTime = System.currentTimeMillis() / 1000;
    return reader.filter(
            kv -> {
                checkArgument(
                        !kv.value().isNullAt(timeField),
                        "Time field for record-level expire should not be null.");
                // Normalize the record time to seconds; the int branch is widened to long.
                long recordTime =
                        timeFieldType == CoreOptions.TimeFieldType.SECOND
                                ? kv.value().getInt(timeField)
                                : kv.value().getLong(timeField) / 1000;
                return currentTime <= recordTime + expireTime;
            });
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.*;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for record level expire when the time field is a BIGINT column holding milliseconds. */
class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {

    @BeforeEach
    public void beforeEachBase() throws Exception {
        Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString());
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(warehouse));

        Identifier tableId = new Identifier("default", "T");
        catalog.createDatabase(tableId.getDatabaseName(), true);

        // col1 is the time field configured in tableOptions(); it is declared as BIGINT.
        Schema tableSchema =
                Schema.newBuilder()
                        .column("pt", DataTypes.INT())
                        .column("pk", DataTypes.INT())
                        .column("col1", DataTypes.BIGINT())
                        .partitionKeys("pt")
                        .primaryKey("pk", "pt")
                        .options(tableOptions().toMap())
                        .build();
        catalog.createTable(tableId, tableSchema, true);

        table = (FileStoreTable) catalog.getTable(tableId);
        commitUser = UUID.randomUUID().toString();
    }

    /** Single bucket, one second of retention, millisecond time field on {@code col1}. */
    @Override
    protected Options tableOptions() {
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, 1);
        conf.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
        conf.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
        conf.set(
                CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.MILLISECOND);
        return conf;
    }

    @Test
    public void test() throws Exception {
        // Two rows whose time field is far in the past.
        writeCommit(GenericRow.of(1, 1, 1L), GenericRow.of(1, 2, 2L));

        // Nothing has been compacted yet, so both rows are still visible.
        assertThat(query())
                .containsExactlyInAnyOrder(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));

        // One row stamped "now" and one stamped an hour ahead, both in milliseconds.
        long nowMillis = System.currentTimeMillis();
        long oneHourMillis = 60 * 60 * 1000;
        writeCommit(GenericRow.of(1, 3, nowMillis));
        writeCommit(GenericRow.of(1, 4, nowMillis + oneHourMillis));

        // Wait longer than the one second retention so that the "now" row is expired.
        Thread.sleep(2000);

        // Still no compaction: all four keys can be queried.
        assertThat(query(new int[] {0, 1}))
                .containsExactlyInAnyOrder(
                        GenericRow.of(1, 1),
                        GenericRow.of(1, 2),
                        GenericRow.of(1, 3),
                        GenericRow.of(1, 4));

        // After compaction only the row stamped in the future remains.
        compact(1);
        assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 4));
    }
}
2 changes: 1 addition & 1 deletion paimon-docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-docs</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-e2e-tests</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/paimon-hadoop-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>paimon-filesystems</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-hadoop-shaded</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/paimon-oss-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>paimon-filesystems</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-oss-impl</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/paimon-oss/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>paimon-filesystems</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-oss</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/paimon-s3-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>paimon-filesystems</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-s3-impl</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/paimon-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>paimon-filesystems</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-s3</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>0.8-SNAPSHOT</version>
<version>0.8.2</version>
</parent>

<artifactId>paimon-filesystems</artifactId>
Expand Down
Loading