Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ suite("test_file_cache_query_limit", "external_docker,hive,external_docker_hive,
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
int queryCacheCapacity

// Poll a file_cache_statistics metric until predicate holds, or until timeout.
// file_cache_statistics is refreshed by the background monitor on its own cadence,
// so waiting a single fixed interval (the previous behavior) races the refresh and
// makes assertions flaky. On timeout we swallow the exception so the caller's
// assertFalse below can surface its own metric-specific message.
def pollFileCacheMetric = { String metricName, Closure predicate, long timeoutSeconds ->
try {
Awaitility.await()
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until {
def r = sql """select METRIC_VALUE from information_schema.file_cache_statistics
where METRIC_NAME = '${metricName}' limit 1;"""
return r.size() > 0 && predicate(Double.valueOf(r[0][0]))
}
} catch (org.awaitility.core.ConditionTimeoutException ignored) {
// fall through; the caller's assert will surface the precise failure
}
}

sql """drop catalog if exists ${catalog_name} """

sql """CREATE CATALOG ${catalog_name} PROPERTIES (
Expand Down Expand Up @@ -147,22 +167,20 @@ suite("test_file_cache_query_limit", "external_docker,hive,external_docker_hive,
def totalWaitTime = (fileCacheBackgroundMonitorIntervalMsResult[0][3].toLong() / 1000) as int
def interval = 1
def iterations = totalWaitTime / interval
long pollTimeoutSeconds = Math.max(30L, (long) totalWaitTime * 6L)

// Waiting for file cache clearing
(1..iterations).each { count ->
Thread.sleep(interval * 1000)
def elapsedSeconds = count * interval
def remainingSeconds = totalWaitTime - elapsedSeconds
logger.info("Waited for file cache clearing ${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
}
// Poll until the cache clear has drained the LRU queue. The HTTP clear endpoint with sync=true
// deletes blocks synchronously, but the queue counters are republished by the background monitor
// thread on its own cadence — so a single fixed-time wait can race the refresh.
pollFileCacheMetric('normal_queue_curr_size', { it == 0.0 }, pollTimeoutSeconds)
pollFileCacheMetric('normal_queue_curr_elements', { it == 0.0 }, pollTimeoutSeconds)

def initialNormalQueueCurrSizeResult = sql """select METRIC_VALUE from information_schema.file_cache_statistics
where METRIC_NAME = 'normal_queue_curr_size' limit 1;"""
logger.info("normal_queue_curr_size result: " + initialNormalQueueCurrSizeResult)
assertFalse(initialNormalQueueCurrSizeResult.size() == 0 || Double.valueOf(initialNormalQueueCurrSizeResult[0][0]) != 0.0,
INITIAL_NORMAL_QUEUE_CURR_SIZE_NOT_ZERO_MSG)

// Check normal queue current elements
def initialNormalQueueCurrElementsResult = sql """select METRIC_VALUE from information_schema.file_cache_statistics
where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
logger.info("normal_queue_curr_elements result: " + initialNormalQueueCurrElementsResult)
Expand Down Expand Up @@ -199,13 +217,9 @@ suite("test_file_cache_query_limit", "external_docker,hive,external_docker_hive,
// load the table into file cache
sql query_sql

// Waiting for file cache statistics update
(1..iterations).each { count ->
Thread.sleep(interval * 1000)
def elapsedSeconds = count * interval
def remainingSeconds = totalWaitTime - elapsedSeconds
logger.info("Waited for file cache statistics update ${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
}
// Poll until the query has populated the cache.
pollFileCacheMetric('normal_queue_curr_elements', { it > 0.0 }, pollTimeoutSeconds)
pollFileCacheMetric('normal_queue_curr_size', { it > 0.0 }, pollTimeoutSeconds)

def baseNormalQueueCurrElementsResult = sql """select METRIC_VALUE from information_schema.file_cache_statistics
where METRIC_NAME = 'normal_queue_curr_elements' limit 1;"""
Expand Down Expand Up @@ -247,13 +261,9 @@ suite("test_file_cache_query_limit", "external_docker,hive,external_docker_hive,
logger.info("File cache clear command output: ${output.toString()}")
assertTrue(exitCode == 0, "File cache clear failed with exit code ${exitCode}. Error: ${errorOutput.toString()}")

// Waiting for file cache clearing
(1..iterations).each { count ->
Thread.sleep(interval * 1000)
def elapsedSeconds = count * interval
def remainingSeconds = totalWaitTime - elapsedSeconds
logger.info("Waited for file cache clearing ${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
}
// Poll until the file cache is fully cleared again.
pollFileCacheMetric('normal_queue_curr_size', { it == 0.0 }, pollTimeoutSeconds)
pollFileCacheMetric('normal_queue_curr_elements', { it == 0.0 }, pollTimeoutSeconds)

// ===== Normal Queue Metrics Check =====
// Check normal queue current size
Expand Down Expand Up @@ -337,13 +347,9 @@ suite("test_file_cache_query_limit", "external_docker,hive,external_docker_hive,
// load the table into file cache
sql query_sql

// Waiting for file cache statistics update
(1..iterations).each { count ->
Thread.sleep(interval * 1000)
def elapsedSeconds = count * interval
def remainingSeconds = totalWaitTime - elapsedSeconds
logger.info("Waited for file cache statistics update ${elapsedSeconds} seconds, ${remainingSeconds} seconds remaining")
}
// Poll until the query has populated the cache under the new file_cache_query_limit.
pollFileCacheMetric('normal_queue_curr_size', { it > 0.0 }, pollTimeoutSeconds)
pollFileCacheMetric('normal_queue_curr_elements', { it > 0.0 }, pollTimeoutSeconds)

// Get updated value of normal queue current elements and max elements after cache operations
def updatedNormalQueueCurrSizeResult = sql """select METRIC_VALUE from information_schema.file_cache_statistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,12 @@ suite("test_hive_query_cache", "p0,external,hive,external_docker,external_docker
sql """use `tpch1_parquet`"""
qt_tpch_1sf_q09 "${tpch_1sf_q09}"
sql "${tpch_1sf_q09}"

test {
sql "${tpch_1sf_q09}"
time 20000
}
// NOTE: enable_sql_cache=false is set above, so a `test { ... time 20000 }` block here is
// NOT testing SQL cache — it is timing a cold TPC-H Q9 over containerized hive parquet,
// which routinely exceeds 20s under load. Run the query without the time guard; the qt_
// above already validates correctness. Cache behavior is verified in the blocks below
// that explicitly set enable_sql_cache=true.
sql "${tpch_1sf_q09}"

// test sql cache with empty result
try {
Expand Down
Loading