[HUDI-4729] Fix file groups in pending compaction not being queryable when querying a ro table with spark-sql
zhanshaoxiong committed Sep 14, 2022
1 parent 797e7a6 commit 573fb0e
Showing 4 changed files with 104 additions and 6 deletions.
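For context, the failure this commit addresses shows up when issuing a read-optimized (RO) query over a Merge-On-Read table through Spark while a compaction is pending. A minimal sketch of that query path, mirroring the test added below, is shown here; it assumes an active SparkSession (spark), and the basePath value is purely illustrative, not part of the change.

// Hedged sketch: read-optimized (RO) query over a MOR table, the query path
// exercised by the new test in this commit. `basePath` is illustrative only.
val basePath = "file:///tmp/hudi/mor_demo"
val roDf = spark.read
  .format("hudi")
  // scan only compacted base (parquet) files; log files are skipped
  .option("hoodie.datasource.query.type", "read_optimized")
  .load(basePath)
// Before this fix, rows in file groups touched by a pending compaction could be
// missing from this result when queried via spark-sql.
roDf.count()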
@@ -63,7 +63,7 @@
  * <li>Query instant/range</li>
  * </ul>
  */
-public abstract class BaseHoodieTableFileIndex {
+public abstract class BaseHoodieTableFileIndex {

   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);

@@ -153,6 +153,10 @@ public Stream<FileSlice> getAllFileSlices() {
     return Stream.empty();
   }

+  public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
+    return fileSlices.values().stream().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime));
+  }
+
   /**
    * Gets the latest slice - this can contain either.
    * <p>
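The new getAllFileSlicesBeforeOn added above keeps every file slice of a file group whose base instant time is at or before the requested instant; Hudi's LESSER_THAN_OR_EQUALS predicate is effectively a string comparison of the fixed-format instant timestamps, so lexicographic order matches chronological order. A simplified sketch of that filter, using a stand-in type rather than Hudi's FileSlice, looks like this:

// Simplified sketch with a stand-in type (not Hudi's API): keep the slices whose
// base instant is at or before maxInstantTime, using plain string comparison as
// Hudi's LESSER_THAN_OR_EQUALS predicate does for instant timestamps.
case class SimpleFileSlice(baseInstantTime: String, fileId: String)

def allFileSlicesBeforeOn(slices: Seq[SimpleFileSlice], maxInstantTime: String): Seq[SimpleFileSlice] =
  slices.filter(_.baseInstantTime.compareTo(maxInstantTime) <= 0)

// Example: only the slice created at or before instant "20220914120000" survives.
val slices = Seq(
  SimpleFileSlice("20220914100000", "fg-1"),
  SimpleFileSlice("20220914130000", "fg-1"))
allFileSlicesBeforeOn(slices, "20220914120000")  // keeps only the 10:00:00 slice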
@@ -664,13 +664,25 @@ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
-          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
+      Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
+          .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime).filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime)));
       if (includeFileSlicesInPendingCompaction) {
-        return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .map(this::filterBaseFileAfterPendingCompaction)
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       } else {
-        return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()))
-            .map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            .filter(Option::isPresent)
+            .map(Option::get);
       }
     } finally {
       readLock.unlock();
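With the change above, getLatestFileSlicesBeforeOrOn no longer inspects only the single latest slice per file group: it walks all slices at or before maxCommitTime and, per file group, returns the first slice that is still non-empty after pending-compaction filtering, so a file group whose newest slice has no readable base file yet falls back to an older, complete slice. A simplified per-file-group sketch of that selection, again with stand-in types rather than Hudi's view classes, follows.

// Simplified sketch (stand-in types, not Hudi's API) of the selection performed
// per file group: among the slices at or before maxCommitTime, pick the first
// one that still has data after pending-compaction filtering.
case class SliceSketch(baseInstantTime: String, hasBaseFile: Boolean, logFileCount: Int) {
  // a slice counts as empty when it has neither a base file nor log files
  def isEmpty: Boolean = !hasBaseFile && logFileCount == 0
}

def latestCompleteSliceBeforeOrOn(
    slicesNewestFirst: Seq[SliceSketch],
    maxCommitTime: String,
    dropBaseFileUnderPendingCompaction: SliceSketch => SliceSketch): Option[SliceSketch] =
  slicesNewestFirst
    .filter(_.baseInstantTime.compareTo(maxCommitTime) <= 0) // same bound as getAllFileSlicesBeforeOn
    .map(dropBaseFileUnderPendingCompaction)                 // mirrors filterBaseFileAfterPendingCompaction
    .find(s => !s.isEmpty)                                   // fall back to an older, complete slice

Roughly, the previous code could hand back only the latest slice with its base file filtered away, so a read-optimized query, which skips log files, saw no rows for that file group.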
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi

class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
  test("Test Query Merge_On_Read Read_Optimized table") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | partitioned by (ts)
           | location '$tablePath'
           | tblproperties (
           |  type = 'mor',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table
      spark.sql("set hoodie.parquet.max.file.size = 10000")
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
      spark.sql(s"update $tableName set price = 11 where id = 1")
      spark.sql(s"update $tableName set price = 21 where id = 2")
      spark.sql(s"update $tableName set price = 31 where id = 3")
      spark.sql(s"update $tableName set price = 41 where id = 4")

      // expect that all complete parquet files can be scanned
      assertQueryResult(4, tablePath)

      // async schedule compaction job
      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
        .collect()

      // expect that all complete parquet files can be scanned with a pending compaction job
      assertQueryResult(4, tablePath)

      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")

      // expect that all complete parquet files can be scanned with a pending compaction job
      assertQueryResult(5, tablePath)

      // async run compaction job
      spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
        .collect()

      // assert that all complete parquet files can be scanned after compaction
      assertQueryResult(5, tablePath)
    }
  }

  def assertQueryResult(expected: Any,
                        tablePath: String): Unit = {
    val actual = spark.read
      .format("org.apache.hudi")
      .option("hoodie.datasource.query.type", "read_optimized")
      .load(tablePath)
      .count()
    assertResult(expected)(actual)
  }
}
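When reproducing this scenario interactively, it helps to confirm that a compaction really is pending between the 'schedule' and 'run' calls above. Assuming the show_compaction procedure that ships alongside run_compaction in this Hudi version, a quick check could look like this (tableName as generated in the test):

// Assumed check, not part of this commit: list compaction instants and their
// states for the table, to verify one is pending after the 'schedule' call.
spark.sql(s"call show_compaction(table => '$tableName')").show(false)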
