Skip to content

Commit

Permalink
Tune kafka and grpc maximum message size (#2661)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Apr 27, 2023
1 parent 1f0b8b9 commit 57dba68
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 14 deletions.
2 changes: 2 additions & 0 deletions charts/graphscope-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ Here we give a list of most frequently used parameters.
| javaOpts | Java options | "" |
| auth.username | Username | "" |
| auth.password | Password | "" |
| rpcMaxBytesMb | GRPC maximum message size | 4 |
| kafkaProducerCustomConfigs | Kafka producer max request size | "max.request.size:1048576000" |


### Statefulset parameters
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ data:
## Kafka Config
kafka.servers=KAFKA_SERVERS
kafka.topic={{ .Values.kafkaTopic }}
kafka.producer.custom.configs={{ .Values.kafkaProducerCustomConfigs }}
## Frontend Config
gremlin.server.port=12312
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ storeWriteThreadCount: 1

## Kafka Config
kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""

## Frontend Config
# gremlinServerPort: 12312
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public class KafkaConfig {
public static final Config<String> KAFKA_PRODUCER_CUSTOM_CONFIGS =
Config.stringConfig("kafka.producer.custom.configs", "");

public static final Config<Integer> KAFKA_MAX_MESSEAGE_MB =
public static final Config<Integer> KAFKA_MAX_MESSAGE_MB =
Config.intConfig("kafka.max.message.mb", 20);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void start() {
Map<Integer, ManagedChannel> idxToChannel =
this.roleToChannels.computeIfAbsent(role, k -> new HashMap<>());
int count =
Integer.valueOf(
Integer.parseInt(
this.configs.get(
String.format(CommonConfig.NODE_COUNT_FORMAT, role.getName()),
"0"));
Expand Down Expand Up @@ -105,15 +105,11 @@ public void start() {
for (int i = 0; i < count; i++) {
String host = hostTemplate.replace("{}", String.valueOf(i));
logger.info(
"create channel to role ["
+ role.getName()
+ "] #["
+ i
+ "]. host ["
+ host
+ "], port ["
+ port
+ "]");
"Creating channel to role {} #{}, host {}, port {}",
role.getName(),
i,
host,
port);
ManagedChannel channel =
ManagedChannelBuilder.forAddress(host, port)
.maxInboundMessageSize(this.rpcMaxBytes)
Expand All @@ -123,7 +119,7 @@ public void start() {
}
} else {
for (int i = 0; i < count; i++) {
logger.debug("create channel to role [" + role.getName() + "] #[" + i + "]");
logger.info("Create channel to role {} #{}", role.getName(), i);
String uri = SCHEME + "://" + role.getName() + "/" + i;
ManagedChannel channel =
ManagedChannelBuilder.forTarget(uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public KafkaLogService(Configs configs) {
this.topic = KafkaConfig.KAKFA_TOPIC.get(configs);
this.queueCount = CommonConfig.INGESTOR_QUEUE_COUNT.get(configs);
this.replicationFactor = KafkaConfig.KAFKA_REPLICATION_FACTOR.get(configs);
this.maxMessageMb = KafkaConfig.KAFKA_MAX_MESSEAGE_MB.get(configs);
this.maxMessageMb = KafkaConfig.KAFKA_MAX_MESSAGE_MB.get(configs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public KafkaLogWriter(
}

@Override
public long append(LogEntry logEntry) throws IOException {
public long append(LogEntry logEntry) {
Future<RecordMetadata> future =
producer.send(
new ProducerRecord<>(this.topicName, this.partitionId, null, logEntry));
Expand Down

0 comments on commit 57dba68

Please sign in to comment.