[Pulsar SQL] Add max split entry queue size bytes limitation #9628
public class CacheSizeAllocator {

    private final long maxCacheSize;
    private final AtomicLong availableCacheSize;
Use LongAdder?
Ok
@@ -136,6 +137,10 @@ public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pu
                pulsarConnectorConfig),
                new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
        this.decoderFactory = decoderFactory;
        if (pulsarConnectorConfig.getMaxSplitEntryQueueSizeBytes() >= 0) {
            this.entryCacheSizeAllocator = new CacheSizeAllocator(
I think you can make it an interface. Then have a no-op implementation and a CacheSizeAllocator implementation. Then you don't need to add `if (entryCacheSizeAllocator != null)` everywhere else.
Yes, that seems better.
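A minimal sketch of the suggestion above, assuming a hypothetical `SizeAllocator` interface. The interface, the method names, and the `SizeAllocators.create` factory are illustrative and not taken from the PR; only the `>= 0` check mirrors the hunk under review.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical interface; names are illustrative, not the PR's actual API.
interface SizeAllocator {
    long getAvailableCacheSize();
    void allocate(long size);
    void release(long size);
}

// No-op variant for the "limit disabled" case: it always reports free space.
final class NoOpSizeAllocator implements SizeAllocator {
    @Override public long getAvailableCacheSize() { return Long.MAX_VALUE; }
    @Override public void allocate(long size) { }
    @Override public void release(long size) { }
}

// Bounded variant that tracks the remaining byte budget.
final class BoundedSizeAllocator implements SizeAllocator {
    private final long maxCacheSize;
    private final AtomicLong availableCacheSize;

    BoundedSizeAllocator(long maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
        this.availableCacheSize = new AtomicLong(maxCacheSize);
    }

    @Override public long getAvailableCacheSize() {
        return availableCacheSize.get();
    }

    @Override public void allocate(long size) {
        // Clamp at zero so the budget never goes negative.
        availableCacheSize.updateAndGet(cur -> Math.max(0, cur - size));
    }

    @Override public void release(long size) {
        // Clamp at the configured maximum.
        availableCacheSize.updateAndGet(cur -> Math.min(maxCacheSize, cur + size));
    }
}

// Hypothetical factory: callers pick the implementation once and
// never need a null check afterwards.
final class SizeAllocators {
    static SizeAllocator create(long maxBytes) {
        return maxBytes >= 0 ? new BoundedSizeAllocator(maxBytes) : new NoOpSizeAllocator();
    }
}
```

With this shape, the cursor can hold a `SizeAllocator` field unconditionally and call `allocate` and `release` without guarding each call site.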
Please add unit tests for the changes to make sure the cache size allocator is initialized as expected.
@@ -136,6 +137,10 @@ public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pu
                pulsarConnectorConfig),
                new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
        this.decoderFactory = decoderFactory;
        if (pulsarConnectorConfig.getMaxSplitEntryQueueSizeBytes() >= 0) {
If the size == 0, does this mean it is disabled?
If the config value is 0, there will be only one entry in the entry queue.
@gaoran10 Could you please check the comments?
39dc7e6
to
1c4051d
Compare
/pulsarbot run-failure-checks
@@ -222,7 +228,7 @@ public void setPulsarSqlSchemaInfoProvider(PulsarSqlSchemaInfoProvider schemaInf
    @VisibleForTesting
    class DeserializeEntries implements Runnable {

        protected boolean isRunning = false;
        protected boolean isRunning = false;
Is this introduced by mistake?
This is an indent fix.
@@ -654,4 +671,15 @@ private void checkFieldType(int field, Class<?> expected) {
        checkArgument(actual == expected, "Expected field %s to be type %s but is %s", field, expected, actual);
    }

    private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig) {
        log.info("Init entry cache size allocator with max split entry queue size bytes {}.",
Move this log message into the `if` block?

    public long getAvailableCacheSize() {
        try {
            lock.readLock().lock();
Why do we need a lock?
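To make the question concrete: a single read of an `AtomicLong` is already atomic, and a check-then-reserve step can be done with a compare-and-set retry loop instead of a read/write lock. The class below is a hypothetical sketch under that assumption, not the PR's `CacheSizeAllocator`; `LockFreeByteBudget` and its method names are invented for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock-free byte budget; illustrative only.
final class LockFreeByteBudget {
    private final AtomicLong available;

    LockFreeByteBudget(long maxBytes) {
        this.available = new AtomicLong(maxBytes);
    }

    // A plain atomic read; no lock is required for visibility.
    long getAvailable() {
        return available.get();
    }

    // Reserve `size` bytes only if they fit, retrying on contention.
    boolean tryReserve(long size) {
        while (true) {
            long cur = available.get();
            if (cur < size) {
                return false; // not enough budget left
            }
            if (available.compareAndSet(cur, cur - size)) {
                return true;
            }
            // Another thread changed the value; re-read and retry.
        }
    }

    // Return previously reserved bytes to the budget.
    void release(long size) {
        available.addAndGet(size);
    }
}
```

A lock may still be justified if several fields must change together; with a single counter, the atomic operations are sufficient.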
            cursor.asyncReadEntries(batchSize, maxSizeBytes,
                    this, System.nanoTime(), PositionImpl.latest);
        } else {
            metricsTracker.incr_READ_ATTEMPTS_FAIL();
If people don't configure this setting, we initialize a `NullCacheSizeAllocator`, so `metricsTracker` will increment `READ_ATTEMPTS_FAIL` again and again.
Oh yes, I'll fix this.
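The failure mode described in this thread can be reproduced with a small stand-in for the read loop. This is a sketch under assumptions: `ReadGate` and its counters are hypothetical, and the `> 0` condition is a guess at the shape of the check guarding `asyncReadEntries`, which the hunk does not show.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the read loop's gate; not code from the PR.
final class ReadGate {
    long readsIssued;
    long readAttemptsFailed;
    private final LongSupplier availableBytes;

    ReadGate(LongSupplier availableBytes) {
        this.availableBytes = availableBytes;
    }

    // Issue a read only when the allocator reports free space;
    // otherwise count a failed attempt.
    void attemptRead() {
        if (availableBytes.getAsLong() > 0) {
            readsIssued++;
        } else {
            readAttemptsFailed++;
        }
    }
}
```

If the allocator used for the unconfigured case reports zero available bytes, every attempt lands in the failure branch. Having it report an effectively unlimited size such as `Long.MAX_VALUE` is one way to keep the gate open.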
@gaoran10 Could you please check sijie's comments?
/pulsarbot run-failure-checks
…zeAllocator` for invalid value of the configuration `maxSplitEntryQueueSizeBytes`.
94d62ac
to
8e19763
Compare
/pulsarbot run-failure-checks
Motivation
In Pulsar SQL, two configurations, `pulsar.max-split-entry-queue-size` and `pulsar.max-split-message-queue-size`, control the capacity of the entry queue and the message queue. Because some entries are large and others are small, these settings make it hard to control how many bytes the entry queue and the message queue hold.
Modifications
Add a new configuration, `pulsar.max-split-queue-cache-size`, to limit the size in bytes of the entry queue and the message queue. Half of the configured value is assigned to the entry queue, and the remaining quota is assigned to the message queue.
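The half-and-half split described under Modifications can be sketched as follows. `QueueCacheSplit` and its field names are hypothetical. Giving the remainder to the message queue follows the description's "left quota" wording, and the behaviour for odd values is an assumption.

```java
// Hypothetical helper illustrating the split; not code from the PR.
final class QueueCacheSplit {
    final long entryQueueBytes;
    final long messageQueueBytes;

    QueueCacheSplit(long maxSplitQueueCacheSizeBytes) {
        // Half of the budget goes to the entry queue ...
        this.entryQueueBytes = maxSplitQueueCacheSizeBytes / 2;
        // ... and whatever is left goes to the message queue, so the
        // two parts always add up to the configured total.
        this.messageQueueBytes = maxSplitQueueCacheSizeBytes - entryQueueBytes;
    }
}
```

Computing the second share by subtraction rather than a second division avoids losing a byte when the configured value is odd.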