Skip to content
Permalink
Browse files
Fix bug when spark on doris run long time (#2485)
  • Loading branch information
Youngwb authored and imay committed Dec 18, 2019
1 parent 51e8ee6 commit b53dc00c2c895b3879c9f2053243a3e92e6c9f29
Showing 4 changed files with 14 additions and 0 deletions.
@@ -98,6 +98,7 @@ dorisSparkRDD.collect()
| doris.request.retries | 3 | 向Doris发送请求的重试次数 |
| doris.request.connect.timeout.ms | 30000 | 向Doris发送请求的连接超时时间 |
| doris.request.read.timeout.ms | 30000 | 向Doris发送请求的读取超时时间 |
| doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 |
| doris.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。<br />此数值设置越小,则会生成越多的Partition。<br />从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 |
| doris.batch.size | 1024 | 一次从BE读取数据的最大行数。<br />增大此数值可减少Spark与Doris之间建立连接的次数。<br />从而减轻网络延迟所带来的的额外时间开销。 |

@@ -40,9 +40,11 @@ public interface ConfigurationOptions {
String DORIS_REQUEST_RETRIES = "doris.request.retries";
String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms";
String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms";
String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s";
int DORIS_REQUEST_RETRIES_DEFAULT = 3;
int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;

String DORIS_TABLET_SIZE = "doris.request.tablet.size";
int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
@@ -64,7 +64,15 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
DORIS_BATCH_SIZE_DEFAULT
}

val queryDorisTimeout = Try {
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
} getOrElse {
logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
}

params.setBatch_size(batchSize)
params.setQuery_timeout(queryDorisTimeout)
params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))

@@ -74,6 +82,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
s"table: ${params.getTable}, " +
s"tabletId: ${params.getTablet_ids}, " +
s"batch size: $batchSize, " +
s"query timeout: $queryDorisTimeout, " +
s"user: ${params.getUser}, " +
s"query plan: ${params.opaqued_query_plan}")

@@ -54,6 +54,8 @@ struct TScanOpenParams {
10: optional string passwd
// max keep alive time min
11: optional i16 keep_alive_min

12: optional i32 query_timeout
}

struct TScanColumnDesc {

0 comments on commit b53dc00

Please sign in to comment.