Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -956,16 +956,16 @@ timestamp_precision=ms
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
# pipe_sink_timeout_ms=900000

# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
# The maximum number of selectors that can be used in the sink.
# pipe_sink_selector_number=1

# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8
# The core number of clients that can be used in the sink.
# pipe_sink_core_client_number=8

# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16

# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;

import java.io.File;
import java.util.Optional;
import java.util.Properties;

public class CommonDescriptor {
Expand Down Expand Up @@ -294,67 +295,96 @@ private void loadPipeProps(Properties properties) {

config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_size",
String.valueOf(config.getPipeExtractorAssignerDisruptorRingBufferSize()))));
Optional.ofNullable(
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
.orElse(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_size",
String.valueOf(
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
Integer.parseInt(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
String.valueOf(
config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()))));
Optional.ofNullable(
properties.getProperty(
"pipe_source_assigner_disruptor_ring_buffer_entry_size_in_bytes"))
.orElse(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
String.valueOf(
config
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
config.setPipeExtractorMatcherCacheSize(
Integer.parseInt(
properties.getProperty(
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
.orElse(
properties.getProperty(
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));

config.setPipeConnectorHandshakeTimeoutMs(
Long.parseLong(
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs()))));
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
.orElse(
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
config.setPipeConnectorTransferTimeoutMs(
Long.parseLong(
properties.getProperty(
"pipe_connector_timeout_ms",
String.valueOf(config.getPipeConnectorTransferTimeoutMs()))));
Optional.ofNullable(properties.getProperty("pipe_sink_timeout_ms"))
.orElse(
properties.getProperty(
"pipe_connector_timeout_ms",
String.valueOf(config.getPipeConnectorTransferTimeoutMs())))));
config.setPipeConnectorReadFileBufferSize(
Integer.parseInt(
properties.getProperty(
"pipe_connector_read_file_buffer_size",
String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_size",
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
config.setPipeConnectorRetryIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_connector_retry_interval_ms",
String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
.orElse(
properties.getProperty(
"pipe_connector_retry_interval_ms",
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
config.setPipeConnectorPendingQueueSize(
Integer.parseInt(
properties.getProperty(
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize()))));
Optional.ofNullable(properties.getProperty("pipe_sink_pending_queue_size"))
.orElse(
properties.getProperty(
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize())))));
config.setPipeConnectorRPCThriftCompressionEnabled(
Boolean.parseBoolean(
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled()))));
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
.orElse(
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));

config.setPipeAsyncConnectorSelectorNumber(
Integer.parseInt(
properties.getProperty(
"pipe_async_connector_selector_number",
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_selector_number",
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
config.setPipeAsyncConnectorCoreClientNumber(
Integer.parseInt(
properties.getProperty(
"pipe_async_connector_core_client_number",
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber()))));
Optional.ofNullable(properties.getProperty("pipe_sink_core_client_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_core_client_number",
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber())))));
config.setPipeAsyncConnectorMaxClientNumber(
Integer.parseInt(
properties.getProperty(
"pipe_async_connector_max_client_number",
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_max_client_number",
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));

config.setSeperatedPipeHeartbeatEnabled(
Boolean.parseBoolean(
Expand Down