Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,13 +573,17 @@ public void queryRetry(TUniqueId queryId) throws Exception {
TUniqueId firstQueryId = queryId;
int retryTime = Config.max_query_retry_time;
retryTime = retryTime <= 0 ? 1 : retryTime + 1;
boolean disableCloudVersionCacheOnRetry = false;
// If the query is an `outfile` statement,
// we execute it only once to avoid exporting redundant data.
if (parsedStmt instanceof Queriable) {
retryTime = ((Queriable) parsedStmt).hasOutFileClause() ? 1 : retryTime;
}
for (int i = 1; i <= retryTime; i++) {
try {
if (disableCloudVersionCacheOnRetry) {
setCloudVersionCacheTtlZeroForNextAttempt();
}
Comment thread
mymeiyi marked this conversation as resolved.
execute(queryId);
return;
} catch (UserException e) {
Expand All @@ -592,6 +596,7 @@ public void queryRetry(TUniqueId queryId) throws Exception {
if (this.coord != null && this.coord.isQueryCancelled()) {
throw e;
}
disableCloudVersionCacheOnRetry = shouldDisableCloudVersionCacheOnRetry(e.getMessage());
TUniqueId lastQueryId = queryId;
queryId = UniqueIdUtils.fastUniqueId();
int randomMillis = 10 + (int) (Math.random() * 10);
Expand All @@ -612,6 +617,35 @@ public void queryRetry(TUniqueId queryId) throws Exception {
}
}

boolean shouldDisableCloudVersionCacheOnRetry(String errorMessage) {
return Config.isCloudMode()
&& errorMessage != null
&& errorMessage.contains(SystemInfoService.ERROR_E230)
&& (context.getSessionVariable().cloudPartitionVersionCacheTtlMs != 0
|| context.getSessionVariable().cloudTableVersionCacheTtlMs != 0);
}

void setCloudVersionCacheTtlZeroForNextAttempt() {
SessionVariable sessionVariable = context.getSessionVariable();
long partitionTtl = sessionVariable.cloudPartitionVersionCacheTtlMs;
long tableTtl = sessionVariable.cloudTableVersionCacheTtlMs;

if (partitionTtl != 0
&& !sessionVariable.setVarOnce(SessionVariable.CLOUD_PARTITION_VERSION_CACHE_TTL_MS, "0")) {
LOG.warn("failed to set {}=0 before retry. {}",
SessionVariable.CLOUD_PARTITION_VERSION_CACHE_TTL_MS, context.getQueryIdentifier());
}
if (tableTtl != 0
&& !sessionVariable.setVarOnce(SessionVariable.CLOUD_TABLE_VERSION_CACHE_TTL_MS, "0")) {
LOG.warn("failed to set {}=0 before retry. {}",
SessionVariable.CLOUD_TABLE_VERSION_CACHE_TTL_MS, context.getQueryIdentifier());
}
LOG.info("temporarily set {} from {} to 0 and {} from {} to 0 before retry. {}",
SessionVariable.CLOUD_PARTITION_VERSION_CACHE_TTL_MS, partitionTtl,
SessionVariable.CLOUD_TABLE_VERSION_CACHE_TTL_MS, tableTtl,
context.getQueryIdentifier());
Comment thread
mymeiyi marked this conversation as resolved.
}

public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
Expand Down
66 changes: 66 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.planner.PlanFragment;
Expand Down Expand Up @@ -350,4 +351,69 @@ public void testClearDeleteExistingFilesInPlan() throws Exception {

Mockito.verify(resultFileSink).setDeleteExistingFiles(false);
}

@Test
public void testShouldDisableCloudVersionCacheOnRetryForE230() throws UserException {
String originalCloudUniqueId = Config.cloud_unique_id;
long originalPartitionTtl = connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs;
long originalTableTtl = connectContext.getSessionVariable().cloudTableVersionCacheTtlMs;
try {
Config.cloud_unique_id = "test-cloud-id";
StmtExecutor executor = new StmtExecutor(connectContext, "select 1");

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 1000L;
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = some other error"));

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 1000L;
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 0L;
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 0L;
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));
} finally {
Config.cloud_unique_id = originalCloudUniqueId;
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = originalPartitionTtl;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = originalTableTtl;
}
}

@Test
public void testSetCloudVersionCacheTtlZeroForNextAttemptAndRevert() throws Exception {
SessionVariable sessionVariable = connectContext.getSessionVariable();
long originalPartitionTtl = sessionVariable.cloudPartitionVersionCacheTtlMs;
long originalTableTtl = sessionVariable.cloudTableVersionCacheTtlMs;
try {
sessionVariable.cloudPartitionVersionCacheTtlMs = 1000L;
sessionVariable.cloudTableVersionCacheTtlMs = 2000L;
StmtExecutor executor = new StmtExecutor(connectContext, "select 1");

executor.setCloudVersionCacheTtlZeroForNextAttempt();
Assertions.assertEquals(0L, sessionVariable.cloudPartitionVersionCacheTtlMs);
Assertions.assertEquals(0L, sessionVariable.cloudTableVersionCacheTtlMs);

VariableMgr.revertSessionValue(sessionVariable);
sessionVariable.setIsSingleSetVar(false);
sessionVariable.clearSessionOriginValue();

Assertions.assertEquals(1000L, sessionVariable.cloudPartitionVersionCacheTtlMs);
Assertions.assertEquals(2000L, sessionVariable.cloudTableVersionCacheTtlMs);
} finally {
sessionVariable.cloudPartitionVersionCacheTtlMs = originalPartitionTtl;
sessionVariable.cloudTableVersionCacheTtlMs = originalTableTtl;
sessionVariable.setIsSingleSetVar(false);
sessionVariable.clearSessionOriginValue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,62 @@ suite("test_retry_e-230", 'docker') {
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)

def restoreTbl = 'test_retry_e_230_restore_tbl'
sql """ DROP TABLE IF EXISTS ${restoreTbl} """
sql """
CREATE TABLE ${restoreTbl} (
`k1` int(11) NULL,
`k2` int(11) NULL
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
for (def i = 1; i <= 3; i++) {
insert_sql """INSERT INTO ${restoreTbl} VALUES (${i}, ${100 * i})""", 1
}

sql """ set cloud_partition_version_cache_ttl_ms = 3600000 """
sql """ set cloud_table_version_cache_ttl_ms = 3600000 """
def row_cnt = sql """select count() from ${restoreTbl}"""
assertEquals(row_cnt[0][0], 3)

cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
insert_sql """INSERT INTO ${restoreTbl} VALUES (4, 400)""", 1

def futrue5 = thread {
Thread.sleep(4000)
cluster.clearBackendDebugPoints()
}

begin = System.currentTimeMillis();
def futrue6 = thread {
def result = sql """select * from ${restoreTbl} order by k1"""
log.info("select result: {}", result)
}

futrue6.get()
cost = System.currentTimeMillis() - begin;
log.info("time cost restore check : {}", cost)
futrue5.get()
assertTrue(cost > 4000 && cost < 100000)
Comment thread
mymeiyi marked this conversation as resolved.

def ttlRows = sql_return_maparray """
select
@@session.cloud_partition_version_cache_ttl_ms as partition_ttl,
@@session.cloud_table_version_cache_ttl_ms as table_ttl
"""
assertEquals("3600000", ttlRows[0].partition_ttl.toString())
assertEquals("3600000", ttlRows[0].table_ttl.toString())
row_cnt = sql """select count() from ${restoreTbl}"""
assertEquals(row_cnt[0][0], 4)
} finally {
sql """ set cloud_partition_version_cache_ttl_ms = DEFAULT """
sql """ set cloud_table_version_cache_ttl_ms = DEFAULT """
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
}
Expand Down
Loading