Merged
48 changes: 42 additions & 6 deletions docs/content/append-table/append-table.md
@@ -43,11 +43,6 @@ CREATE TABLE my_table (
{{< /tab >}}
{{< /tabs >}}

## Data Distribution

By default, append table has no bucket concept. It acts just like a Hive Table. The data files are placed under
partitions where they can be reorganized and reordered to speed up queries.

## Automatic small file merging

In a streaming write job without a bucket definition, there is no compaction in the writer; instead, it will use
@@ -61,4 +56,45 @@ Do not worry about backpressure; compaction never causes backpressure.
If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` will be removed in the topology.

Auto compaction is only supported in Flink engine streaming mode. You can also start a dedicated compaction job in Flink
via the Paimon Flink action, and disable all other compaction by setting `write-only`.
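
A rough Flink SQL sketch of this setup (the table and database names are illustrative; whether you use the Flink action jar or the `sys.compact` procedure, check the exact arguments for your Paimon version):

```sql
-- Disable compaction inside the writers of this table.
ALTER TABLE my_table SET ('write-only' = 'true');

-- Run a dedicated compaction job instead (argument style may differ between versions).
CALL sys.compact('default.my_table');
```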

## Streaming Query

You can stream the append table and use it like a message queue. As with primary key tables, there are two options
for streaming reads:
1. By default, a streaming read produces the latest snapshot of the table on first startup, then continues to read the
latest incremental records.
2. You can specify `scan.mode`, `scan.snapshot-id`, `scan.timestamp-millis`, or `scan.file-creation-time-millis` to
read only the incremental records in streaming mode (see the sketch below).
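
A minimal Flink SQL sketch of both options (the table name is illustrative, and the snapshot id depends on your table):

```sql
-- Option 1 (default): read the latest snapshot first, then follow new changes.
SELECT * FROM my_table /*+ OPTIONS('scan.mode' = 'latest-full') */;

-- Option 2: read incremental records only, skipping the initial snapshot.
SELECT * FROM my_table /*+ OPTIONS('scan.mode' = 'latest') */;

-- Option 2: read incremental records starting from a specific snapshot id.
SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id' = '5') */;
```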

As with the Flink Kafka connector, order is not guaranteed by default. If your data has an ordering requirement, you also
need to consider defining a `bucket-key`; see [Bucketed Append]({{< ref "append-table/bucketed-append" >}}).

## OLAP Query

### Data Skipping By Order

Paimon by default records the maximum and minimum values of each field in the manifest file.

At query time, Paimon uses these statistics to filter data files according to the `WHERE` condition of the query. If the
filtering is effective, a query that would otherwise take minutes can be accelerated to finish in milliseconds.

However, the natural data distribution does not always filter effectively. What if we could sort the data by the fields
in the `WHERE` condition? Take a look at [Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}),
[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}), or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}).
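
As a hedged example (the parameter names follow the compact procedure documentation and may differ between versions), a sort compaction in Flink SQL could look like this:

```sql
-- Rewrite the table data ordered by the columns that appear in WHERE conditions,
-- so that the min/max statistics become effective for file skipping.
CALL sys.compact(
  `table` => 'default.my_table',
  order_strategy => 'zorder',
  order_by => 'c1,c2'
);
```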

### Data Skipping By File Index

You can also use a file index, which filters files by the index on the read side.

```sql
CREATE TABLE <PAIMON_TABLE> (<COLUMN> <COLUMN_TYPE> , ...) WITH (
'file-index.bloom-filter.columns' = 'c1,c2',
'file-index.bloom-filter.c1.items' = '200'
);
```
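
The bloom filter mainly helps equality predicates on the indexed columns. A sketch of a query that can benefit (column names match the snippet above):

```sql
-- Files whose bloom filter proves that c1 cannot contain 'some_value' are skipped at read time.
SELECT * FROM <PAIMON_TABLE> WHERE c1 = 'some_value';
```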

## DELETE & UPDATE

Currently, only Spark SQL supports DELETE & UPDATE; see [Spark Write]({{< ref "spark/sql-write" >}}).
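
A minimal Spark SQL sketch (the table, partition, and column names are illustrative; see the Spark Write page for the supported syntax):

```sql
DELETE FROM my_table WHERE dt = '2024-01-01';

UPDATE my_table SET v = 'new_value' WHERE id > 100;
```
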
@@ -1,9 +1,9 @@
---
title: "Append Queue"
title: "Bucketed Append"
weight: 3
type: docs
aliases:
- /append-table/append-queue.html
- /append-table/bucketed-append.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -24,20 +24,22 @@ specific language governing permissions and limitations
under the License.
-->

# Append Queue
# Bucketed Append

## Definition

In this mode, you can regard append table as a queue separated by bucket. Every record in the same bucket is ordered strictly,
streaming read will transfer the record to down-stream exactly in the order of writing. To use this mode, you do not need
to config special configurations, all the data will go into one bucket as a queue. You can also define the `bucket` and
`bucket-key` to enable larger parallelism and disperse data.
An ordinary append table has no strict ordering guarantees for its streaming writes and reads, but there are some cases
where you need to define a key, similar to Kafka's message key.

You can define `bucket` and `bucket-key` to get a bucketed append table. Every record in the same bucket is ordered
strictly, and streaming reads will transfer the records downstream exactly in the order in which they were written. Each
bucket behaves like a queue, and you can increase the bucket number to get larger parallelism and disperse data.

{{< img src="/img/for-queue.png">}}

Example to create append queue table:
Example to create a bucketed append table:

{{< tabs "create-append-queue" >}}
{{< tabs "create-bucketed-append" >}}
{{< tab "Flink" >}}

```sql
6 changes: 3 additions & 3 deletions docs/content/learn-paimon/understand-files.md
@@ -450,12 +450,12 @@ file belongs to exactly one sorted run.
By default, the number of sorted runs depends on `num-sorted-run.compaction-trigger`, see [Compaction for Primary Key Table]({{< ref "/maintenance/write-performance#compaction" >}}),
which means that there are at least 5 files in a bucket. If you want to reduce this number, you can keep fewer files, but write performance may suffer.
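
For instance, a hedged sketch of lowering the trigger (fewer files per bucket, at the cost of more frequent compaction):

```sql
ALTER TABLE my_table SET ('num-sorted-run.compaction-trigger' = '3');
```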

### Understand Files for Append Queue Table
### Understand Files for Bucketed Append Table

By default, Append also does automatic compaction to reduce the number of small files.

However, for Bucket's Append table, it will only compact the files within the Bucket for sequential
purposes, which may keep more small files. See [Append Queue]({{< ref "append-table/append-queue" >}}).
However, for a bucketed append table, it will only compact the files within each bucket in order to preserve the sequence,
which may keep more small files. See [Bucketed Append]({{< ref "append-table/bucketed-append" >}}).

### Understand Full-Compaction

@@ -66,7 +66,7 @@ public AppendOnlyFileStore(

@Override
public BucketMode bucketMode() {
return options.bucket() == -1 ? BucketMode.UNAWARE : BucketMode.FIXED;
return options.bucket() == -1 ? BucketMode.BUCKET_UNAWARE : BucketMode.HASH_FIXED;
}

@Override
@@ -116,7 +116,7 @@ private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate predicate) {
if (bucketMode() != BucketMode.FIXED) {
if (bucketMode() != BucketMode.HASH_FIXED) {
return;
}

@@ -103,10 +103,10 @@ public KeyValueFileStore(
@Override
public BucketMode bucketMode() {
if (options.bucket() == -1) {
return crossPartitionUpdate ? BucketMode.GLOBAL_DYNAMIC : BucketMode.DYNAMIC;
return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : BucketMode.HASH_DYNAMIC;
} else {
checkArgument(!crossPartitionUpdate);
return BucketMode.FIXED;
return BucketMode.HASH_FIXED;
}
}

@@ -163,7 +163,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser) {
@Override
public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.DYNAMIC) {
if (bucketMode() == BucketMode.HASH_DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
}
DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
@@ -214,7 +214,7 @@ private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate keyFilter) {
if (bucketMode() != BucketMode.FIXED) {
if (bucketMode() != BucketMode.HASH_FIXED) {
return;
}

@@ -79,7 +79,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>

private boolean forceBufferSpill = false;
private boolean skipCompaction;
private BucketMode bucketMode = BucketMode.FIXED;
private BucketMode bucketMode = BucketMode.HASH_FIXED;

public AppendOnlyFileStoreWrite(
FileIO fileIO,
@@ -209,7 +209,7 @@ public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
// AppendOnlyFileStoreWrite is sensitive to the bucket mode. It will act differently in
// unaware-bucket mode (no compaction and forced empty writer).
this.bucketMode = bucketMode;
if (bucketMode == BucketMode.UNAWARE) {
if (bucketMode == BucketMode.BUCKET_UNAWARE) {
super.withIgnorePreviousFiles(true);
skipCompaction = true;
}
@@ -219,7 +219,7 @@ public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
@Override
public void withIgnorePreviousFiles(boolean ignorePrevious) {
// in unaware bucket mode, we need all writers to be empty
super.withIgnorePreviousFiles(ignorePrevious || bucketMode == BucketMode.UNAWARE);
super.withIgnorePreviousFiles(ignorePrevious || bucketMode == BucketMode.BUCKET_UNAWARE);
}

@Override
@@ -519,7 +519,7 @@ private static void validateBucket(TableSchema schema, CoreOptions options) {
if (bucket == -1) {
if (options.toMap().get(BUCKET_KEY.key()) != null) {
throw new RuntimeException(
"Cannot define 'bucket-key' in unaware or dynamic bucket mode.");
"Cannot define 'bucket-key' with bucket -1, please specify a bucket number.");
}

if (schema.primaryKeys().isEmpty()
@@ -537,6 +537,11 @@ private static void validateBucket(TableSchema schema, CoreOptions options) {
+ "(Primary key constraint %s not include all partition fields %s).",
schema.primaryKeys(), schema.partitionKeys()));
}

if (schema.primaryKeys().isEmpty() && schema.bucketKeys().isEmpty()) {
throw new RuntimeException(
"You should define a 'bucket-key' for bucketed append mode.");
}
}
}
}
@@ -172,9 +172,6 @@ public List<String> bucketKeys() {
if (bucketKeys.isEmpty()) {
bucketKeys = trimmedPrimaryKeys();
}
if (bucketKeys.isEmpty()) {
bucketKeys = fieldNames();
}
return bucketKeys;
}

@@ -125,12 +125,12 @@ public CatalogEnvironment catalogEnvironment() {

public RowKeyExtractor createRowKeyExtractor() {
switch (bucketMode()) {
case FIXED:
case HASH_FIXED:
return new FixedBucketRowKeyExtractor(schema());
case DYNAMIC:
case GLOBAL_DYNAMIC:
case HASH_DYNAMIC:
case CROSS_PARTITION:
return new DynamicBucketRowKeyExtractor(schema());
case UNAWARE:
case BUCKET_UNAWARE:
return new UnawareBucketRowKeyExtractor(schema());
default:
throw new UnsupportedOperationException("Unsupported mode: " + bucketMode());
40 changes: 23 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -18,38 +18,44 @@

package org.apache.paimon.table;

import org.apache.paimon.annotation.Experimental;

/**
* Bucket mode of the table, it affects the writing process and also affects the data skipping in
* Bucket mode of the table. It affects the writing process and also affects the bucket skipping in
* reading.
*
* @since 0.9
*/
@Experimental
public enum BucketMode {

/**
* The fixed number of buckets configured by the user can only be modified through offline
* commands. The data is distributed to the corresponding buckets according to bucket key
* (default is primary key), and the reading end can perform data skipping based on the
* filtering conditions of the bucket key.
* commands. The data is distributed to the corresponding buckets according to the hash value of
* the bucket key (which defaults to the primary key), and the reading side can perform bucket
* skipping based on filter conditions on the bucket key.
*/
FIXED,
HASH_FIXED,

/**
* The Dynamic bucket mode records which bucket the key corresponds to through the index files.
* This mode cannot support multiple concurrent writes or data skipping for reading filter
* conditions. This mode only works for changelog table.
* The dynamic bucket mode records which bucket each key corresponds to through index files.
* The index records the correspondence between the hash value of the primary key and the
* bucket. This mode cannot support multiple concurrent writes or bucket skipping for read
* filter conditions. This mode only works for changelog tables.
*/
DYNAMIC,
HASH_DYNAMIC,

/**
* Compared with the DYNAMIC mode, this mode not only dynamically allocates buckets for
* Partition table, but also updates data across partitions. The primary key does not contain
* partition fields.
* The cross-partition mode is for cross-partition upserts (the primary key does not contain all
* partition fields). It directly maintains the mapping of primary keys to partition and bucket,
* uses local disks, and initializes the index by reading all existing keys in the table when
* starting a streaming write job.
*/
GLOBAL_DYNAMIC,
CROSS_PARTITION,

/**
* Ignoring buckets can be equivalent to understanding that all data enters the global bucket,
* and data is randomly written to the table. The data in the bucket has no order relationship
* at all. This mode only works for append-only table.
* The bucket concept is ignored: all data is written to bucket-0, but the parallelism of reads
* and writes is unrestricted. This mode only works for append-only tables.
*/
UNAWARE
BUCKET_UNAWARE
}
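
As a rough illustration of how these modes map to table options (a Flink SQL sketch; table names are illustrative, and the exact rules are enforced by the `validateBucket` check shown above):

```sql
-- HASH_FIXED: a fixed bucket number; append tables additionally need an explicit bucket-key.
CREATE TABLE fixed_t (k INT, v STRING) WITH ('bucket' = '4', 'bucket-key' = 'k');

-- HASH_DYNAMIC: a primary key table with bucket = -1.
CREATE TABLE dynamic_t (
  k INT,
  v STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH ('bucket' = '-1');

-- BUCKET_UNAWARE: an append table (no primary key) with bucket = -1.
CREATE TABLE unaware_t (k INT, v STRING) WITH ('bucket' = '-1');

-- CROSS_PARTITION arises for a partitioned primary key table with bucket = -1
-- whose primary key does not contain all partition fields.
```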
@@ -168,7 +168,7 @@ public SinkRecord toLogRecord(SinkRecord record) {
keyAndBucketExtractor.setRecord(record.row());
return new SinkRecord(
record.partition(),
bucketMode == BucketMode.UNAWARE ? -1 : record.bucket(),
bucketMode == BucketMode.BUCKET_UNAWARE ? -1 : record.bucket(),
keyAndBucketExtractor.logPrimaryKey(),
record.row());
}
@@ -47,7 +47,7 @@ public AppendOnlySplitGenerator(
@Override
public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
List<DataFileMeta> files = new ArrayList<>(input);
files.sort(fileComparator(bucketMode == BucketMode.UNAWARE));
files.sort(fileComparator(bucketMode == BucketMode.BUCKET_UNAWARE));
Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
.map(SplitGroup::rawConvertibleGroup)
@@ -58,7 +58,7 @@ public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
// When the bucket mode is unaware, we split the files as in batch mode, because an
// unaware-bucket table only contains one bucket (bucket 0).
if (bucketMode == BucketMode.UNAWARE) {
if (bucketMode == BucketMode.BUCKET_UNAWARE) {
return splitForBatch(files);
} else {
return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
@@ -156,6 +156,7 @@ protected FileStoreTable createFileStoreTable() throws Exception {
.column("f2", DataTypes.INT())
.partitionKeys("f0")
.option("bucket", "100")
.option("bucket-key", "f1")
.build();
Identifier identifier = Identifier.create("default", "test");
catalog.createDatabase("default", false);
@@ -67,7 +67,10 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
import static org.assertj.core.api.Assertions.assertThat;
@@ -681,7 +684,7 @@ public void testSequentialRead() throws Exception {
serializer
.toBinaryRow(rowData(i, random.nextInt(), random.nextLong()))
.copy();
int bucket = bucket(bucketKeyHashCode(data), numOfBucket);
int bucket = bucket(bucketKeyHashCode(row(data.getInt(1))), numOfBucket);
dataPerBucket.compute(
bucket,
(k, v) -> {
@@ -827,6 +830,9 @@ protected FileStoreTable createFileStoreTable(Consumer<Options> configure) throw
Options conf = new Options();
conf.set(CoreOptions.PATH, tablePath.toString());
configure.accept(conf);
if (!conf.contains(BUCKET_KEY) && conf.get(BUCKET) != -1) {
conf.set(BUCKET_KEY, "a");
}
TableSchema tableSchema =
SchemaUtils.forceCommit(
new SchemaManager(LocalFileIO.create(), tablePath),
@@ -60,7 +60,6 @@ public void testBucket() {
assertThat(bucket(extractor("", "a"), row)).isEqualTo(96);
assertThat(bucket(extractor("", "a,b"), row)).isEqualTo(27);
assertThat(bucket(extractor("a,b", "a,b"), row)).isEqualTo(27);
assertThat(bucket(extractor("", ""), row)).isEqualTo(40);
assertThat(bucket(extractor("a,b,c", ""), row)).isEqualTo(40);
assertThat(bucket(extractor("", "a,b,c"), row)).isEqualTo(40);
}