Add client and server lookup throttling #176

Closed
wants to merge 2 commits into from

rdhabalia
Contributor

Motivation

A broker sometimes needs a way to throttle new incoming traffic by restricting the number of concurrent lookup requests. It is also useful to throttle at the client, to avoid a large number of concurrent lookup requests while creating producers/consumers.

Modifications

  • Add broker-side throttling that caps the number of concurrent lookup requests served
  • Add client-side lookup throttling
  • Update the value of maxConcurrentLookupRequest dynamically at the broker
  • Add an AdminApi command to update maxConcurrentLookupRequest
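
The broker-side items in this list can be illustrated with a counting semaphore. This is a minimal sketch and not the code in this PR: `LookupThrottle` and its methods are hypothetical names, and it assumes that a lookup arriving when no permit is free is refused rather than queued.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical helper (not from the PR): caps concurrent lookups with a resizable semaphore. */
final class LookupThrottle {

    /** Semaphore.reducePermits is protected, so a small subclass exposes it. */
    private static final class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) {
            super(permits);
        }

        void shrink(int reduction) {
            super.reducePermits(reduction);
        }
    }

    private final ResizableSemaphore permits;
    private int maxPermits;

    LookupThrottle(int maxConcurrentLookupRequest) {
        this.permits = new ResizableSemaphore(maxConcurrentLookupRequest);
        this.maxPermits = maxConcurrentLookupRequest;
    }

    /** Returns true if the lookup may proceed; the caller must call release() when it completes. */
    boolean tryAcquire() {
        return permits.tryAcquire();
    }

    void release() {
        permits.release();
    }

    /** Applies a new limit at runtime without disturbing lookups already in flight. */
    synchronized void updateMax(int newMax) {
        if (newMax < 0) {
            throw new IllegalArgumentException("limit must be non-negative");
        }
        int delta = newMax - maxPermits;
        if (delta > 0) {
            permits.release(delta);   // grow: add the extra permits
        } else if (delta < 0) {
            permits.shrink(-delta);   // shrink: available permits may go negative until releases catch up
        }
        maxPermits = newMax;
    }

    int available() {
        return permits.availablePermits();
    }
}
```

When the limit is lowered while lookups are in flight, the available count goes negative and new lookups are refused until enough in-flight lookups have released their permits.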

Result

The broker can throttle new incoming traffic by restricting the number of concurrent lookup requests.

@rdhabalia rdhabalia added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Jan 27, 2017
@rdhabalia rdhabalia added this to the 1.17 milestone Jan 27, 2017
@rdhabalia rdhabalia self-assigned this Jan 27, 2017
@rdhabalia rdhabalia force-pushed the throttle branch 4 times, most recently from 2e4ba2c to 4a3f388 Compare January 27, 2017 19:28
} catch (Exception e) {
log.warn("Got exception when reading ZooKeeper path [{}]:", LOOKUP_THROTTLING_PATH, e);
}
return pendingLookupRequest;
Contributor

Add a debug log with the actual value picked up?

Contributor Author

added debug log.


import io.netty.buffer.ByteBuf;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

@Path("/v2/destination/")
@NoSwaggerDocumentation
Contributor

Should we not throttle http lookup?

Contributor Author

added http-throttling change as well.

Contributor

@merlimat merlimat left a comment

A couple of comments:

  • The implementations of client-side and broker-side throttling are very different, and ideally it would be better to break this down into 2 separate commits.
  • While I agree that having variables dynamically tunable at runtime (without restarting the broker) is a great thing, we shouldn't be doing that case by case, adding REST handlers that would not be possible to remove later. We should address dynamic configuration in a separate PR, using a more general approach.

@ApiOperation(value = "Update allowed concurrent lookup permits. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Lookup permits updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to create the cluster") })
public void createCluster(@PathParam("permits") int permits) {
Contributor

Method should not be called createCluster()

@rdhabalia
Contributor Author

The implementations of client-side and broker-side throttling are very different, and ideally it would be better to break this down into 2 separate commits.

Agreed. I will create separate commits in different PRs to address it.

While I agree that having variables dynamically tunable at runtime (without restarting the broker) is a great thing, we shouldn't be doing that case by case, adding REST handlers that would not be possible to remove later. We should address dynamic configuration in a separate PR, using a more general approach.

Agreed. Does it make sense to create a separate resource (REST controller), Configuration, which allows adding/updating/deleting configuration, with param1: zk_path and body: json_value?

@merlimat
Contributor

Agreed. Does it make sense to create a separate resource (REST controller), Configuration, which allows adding/updating/deleting configuration, with param1: zk_path and body: json_value?

We need to think carefully about how to expose the whole thing in a consistent way. You have the values from the config file plus the values in ZK. There should be a clear way to understand which configuration is:

  • Supposed to be applied to all brokers
  • Actually being applied in a given broker

The other problem is how to clearly identify which values are dynamically tunable (e.g. the ZK servers string will not be dynamic).
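
One way to picture the two concerns raised here (what is effectively applied on a broker, and which keys may change at runtime) is a small overlay of runtime overrides on top of the file values. This is only a sketch under assumptions: `EffectiveConfig`, its methods, and the idea of an explicit set of dynamic keys are hypothetical and are not part of this PR or of any agreed design; the overrides map stands in for values that would be read from ZK.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: file values plus runtime overrides, restricted to keys declared dynamic. */
final class EffectiveConfig {
    private final Map<String, String> fileValues;   // loaded once from the config file
    private final Set<String> dynamicKeys;          // keys that may be overridden at runtime
    private final Map<String, String> overrides = new ConcurrentHashMap<>(); // stand-in for ZK values

    EffectiveConfig(Map<String, String> fileValues, Set<String> dynamicKeys) {
        this.fileValues = new HashMap<>(fileValues);
        this.dynamicKeys = new HashSet<>(dynamicKeys);
    }

    boolean isDynamic(String key) {
        return dynamicKeys.contains(key);
    }

    /** Rejects overrides for keys that are not declared dynamic (for example a ZK servers string). */
    void applyOverride(String key, String value) {
        if (!isDynamic(key)) {
            throw new IllegalArgumentException(key + " is not dynamically tunable");
        }
        overrides.put(key, value);
    }

    /** Value actually in effect on this broker: the override wins over the file value. */
    String get(String key) {
        String override = overrides.get(key);
        return override != null ? override : fileValues.get(key);
    }

    /** Full view of the configuration in effect, suitable for exposing through a read-only endpoint. */
    Map<String, String> snapshot() {
        Map<String, String> merged = new TreeMap<>(fileValues);
        merged.putAll(overrides);
        return merged;
    }
}
```

Keeping the dynamic keys as an explicit allow-list makes it visible which settings can change without a restart, and `snapshot()` answers the question of what a given broker is actually applying.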

@rdhabalia
Contributor Author

@merlimat @saandrews this PR had 3 changes:

  • server throttling
  • client throttling
  • updating throttling-configuration using REST-Api/AdminApi

I have created separate PRs: #181 (server) and #182 (client).
As @merlimat mentioned, for the dynamic-configuration update we can open a separate PR to address generic configuration updates.

@merlimat
Contributor

merlimat commented Feb 7, 2017

Closing in favor of #181

@merlimat merlimat closed this Feb 7, 2017
@merlimat merlimat removed this from the 1.17 milestone Feb 7, 2017
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
* PIP-55: Refresh auth credentials

* Fixed lint issues
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
**Fixes:**
 apache#166 apache#153 apache#99 

**Issue:**
KoP uses [Kafka-2.0.0](https://github.com/streamnative/kop/blob/78d9ba3487d4d7c85a5d667d45d9b38aaa7c824f/pom.xml#L46) which supports [API_VERSION's](https://kafka.apache.org/protocol.html#The_Messages_ApiVersions) **0** --> **2**

When **_Kafka-Clients-2.4.x+_** (using `API_VERSION:  3`) connects to KoP, it fails and the following error stack is observed:
`10:22:23.281 [pulsar-io-22-4] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - error to get Response ByteBuf:
java.lang.IllegalArgumentException: Invalid version for API key API_VERSIONS: 3
    at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:312) ~[?:?]
    at org.apache.kafka.common.protocol.ApiKeys.responseSchema(ApiKeys.java:286) ~[?:?]
    at org.apache.kafka.common.requests.ApiVersionsResponse.toStruct(ApiVersionsResponse.java:129) ~[?:?]
    at org.apache.kafka.common.requests.ResponseUtils.serializeResponse(ResponseUtils.java:40) ~[?:?]`


**Resolved By:**
Returning an `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes), which notifies the **_kafka-client_** to lower its `API_VERSION`. Because no list of `ApiKeys & versions` is available for the **kafka-client** to refer to, it safely falls back to using `API_VERSION:  0`, and KoP continues processing the kafka-messages using `API_VERSION:  0`.
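
The exchange described above can be modeled without any Kafka dependency. This is a minimal sketch, not KoP's or Kafka's actual code: `ApiVersionNegotiation`, `checkVersion`, and `nextVersion` are hypothetical names. The only values taken from the Kafka protocol are the error codes 0 (no error) and 35 (`UNSUPPORTED_VERSION`).

```java
/** Hypothetical, dependency-free model of the version check; not KoP's actual code. */
final class ApiVersionNegotiation {
    static final short NONE = 0;                 // Kafka protocol error code: no error
    static final short UNSUPPORTED_VERSION = 35; // Kafka protocol error code 35

    private ApiVersionNegotiation() {
    }

    /** Broker side: accept only request versions inside the supported range. */
    static short checkVersion(short requested, short minSupported, short maxSupported) {
        boolean supported = requested >= minSupported && requested <= maxSupported;
        return supported ? NONE : UNSUPPORTED_VERSION;
    }

    /** Client side: with no advertised version list to choose from, fall back to version 0. */
    static short nextVersion(short attempted, short errorCode) {
        return errorCode == UNSUPPORTED_VERSION ? (short) 0 : attempted;
    }
}
```

With a supported range of 0 to 2, a request at version 3 yields error code 35, the client retries at version 0, and that retry is accepted.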

**Tested producing/consuming with Kafka-Clients:**

> 2.0.0
2.2.2
2.3.1
2.4.0
2.4.1
2.5.0
2.5.1
2.6.0


**More...**
KoP could have returned the list of supported `ApiKeys & versions` while sending the `UNSUPPORTED_VERSION` error-code, which would have made the **_kafka-client_** select the **_latest_** supported `API_VERSION` and use `API_VERSION:  2` instead of falling all the way back and using `API_VERSION:  0` 


Notes on how **_Kafka-Brokers_** are supposed to handle this scenario:
> 2. On receiving ApiVersionsRequest, a broker returns its full list of supported ApiKeys and versions regardless of current authentication state (e.g., before SASL authentication on an SASL listener, do note that no Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished). If this is considered to leak information about the broker version a workaround is to use SSL with client authentication which is performed at an earlier stage of the connection where the ApiVersionRequest is not available. Also, note that broker versions older than 0.10.0.0 do not support this API and will either ignore the request or close connection in response to the request.
> 
> 3. If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself.


_Reference: [Kafka-Protocol Guide](https://kafka.apache.org/protocol.html#api_versions)_

We analyzed how various **_Kafka-Brokers_** respond to a similar situation where the **_kafka-client's_** `API_VERSION` is higher than what is supported by the **_Kafka-Broker_**.

![Kafka-Broker-Client-Wireshark-Results](https://user-images.githubusercontent.com/63665447/91243701-34d3a500-e710-11ea-9752-f9980333ce1d.png)
_Reference: Wireshark packet captures - [Kafka-Protocol-Study.zip](https://github.com/streamnative/kop/files/5127018/Kafka-Protocol-Study.zip)_

From the study we can infer that, in a similar `API_VERSION` mismatch scenario, the **_Kafka-Brokers_** don't return the list of supported `ApiKeys & versions` when notifying the **_kafka-client_** with the `UNSUPPORTED_VERSION` [error-code: 35](https://kafka.apache.org/protocol.html#protocol_error_codes). This forces the **_kafka-clients_** to fall back to using `API_VERSION:  0`.

To keep KoP's behavior in sync with the **_Kafka-Broker's_** behavior, we decided not to return the list of supported `ApiKeys & versions`.
michaeljmarshall pushed a commit to michaeljmarshall/pulsar that referenced this pull request Apr 19, 2023
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
3 participants