Skip to content

Commit

Permalink
kafka: add ability to specify Kafka topic name
Browse files Browse the repository at this point in the history
Motivation:

Kafka topic name has been hardcoded to "billing" in dCache code making
it difficult to work with external Kafka installations.

Modification:

Add environment variable

dcache.kafka.topic=billing

and

service.kafka.topic=${dcache.kafka.topic} where service=dcap,ftp,nfs,pool,webdav,xrootd
to be able to specify Kafka receiving topic name

Result:

Added ability to specify Kafka receiving topic name

	Target: master
	Request: 5.0
	Request: 4.2

	Require-book: yes
	Require-notes: yes

	Acked-by: Paul Millar <paul.millar@desy.de>
	Patch: https://rb.dcache.org/r/11610/
        Issue: #4722
  • Loading branch information
DmitryLitvintsev committed Mar 15, 2019
1 parent 83c8b1b commit 806de40
Show file tree
Hide file tree
Showing 17 changed files with 53 additions and 10 deletions.
Expand Up @@ -2573,7 +2573,7 @@ private void postToBilling(DoorRequestInfoMessage info) {

private void sendAsynctoKafka(DoorRequestInfoMessage info) {

ProducerRecord<String, DoorRequestInfoMessage> record = new ProducerRecord<String, DoorRequestInfoMessage>("billing", info);
ProducerRecord<String, DoorRequestInfoMessage> record = new ProducerRecord<String, DoorRequestInfoMessage>(_settings.getKafkaTopic(), info);
_kafkaProducer.send(record, (rm, e) -> {
if (e != null) {
_log.error("Unable to send message to topic {} on partition {}: {}",
Expand Down
Expand Up @@ -88,6 +88,9 @@ public class DcapDoorSettings
@Option(name = "bootstrap-server-kafka")
protected String kafkaBootstrapServer;

@Option(name = "kafka-topic")
protected String kafkaTopic;

@Option(name = "kafka-max-block", required = true)
protected long kafkaMaxBlock;

Expand Down Expand Up @@ -249,6 +252,15 @@ public String getKafkaBootstrapServer() {
return kafkaBootstrapServer;
}

/**
 * Provides the Kafka topic that door request messages are published to.
 *
 * @return the configured Kafka topic name
 */
public String getKafkaTopic() {
    return this.kafkaTopic;
}

/**
* Returns the parameter that controls how long
* the producer will block when calling send().
Expand Down
Expand Up @@ -1019,7 +1019,7 @@ public FtpTransfer(FsPath path,

if (_sendToKafka) {
setKafkaSender(m -> {
_kafkaProducer.send(new ProducerRecord<String, DoorRequestInfoMessage>("billing", m));
_kafkaProducer.send(new ProducerRecord<String, DoorRequestInfoMessage>(_settings.getKafkaTopic(), m));
});
}

Expand Down
Expand Up @@ -53,6 +53,9 @@ public class FtpDoorSettings
@Option(name = "bootstrap-server-kafka")
protected String kafkaBootstrapServer;

@Option(name = "kafka-topic")
protected String kafkaTopic;

@Option(name = "kafka-max-block",
defaultValue = "1")
protected long kafkaMaxBlock;
Expand Down Expand Up @@ -375,6 +378,15 @@ public String getKafkaBootstrapServer() {
return kafkaBootstrapServer;
}

/**
 * Provides the Kafka topic that door request messages are published to.
 *
 * @return the configured Kafka topic name
 */
public String getKafkaTopic() {
    return this.kafkaTopic;
}

/**
* Returns the parameter that controls how long
* the producer will block when calling send(). By default set to 60000.
Expand Down
Expand Up @@ -250,7 +250,7 @@
</bean>
</constructor-arg>
<constructor-arg name="autoFlush" value="false" />
<property name="defaultTopic" value="billing"/>
<property name="defaultTopic" value="${nfs.kafka.topic}"/>
<property name="producerListener" ref="listener"/>
</bean>
</beans>
Expand Down
Expand Up @@ -631,7 +631,7 @@
</constructor-arg>
</bean>
</constructor-arg>
<property name="defaultTopic" value="billing"/>
<property name="defaultTopic" value="${webdav.kafka.topic}"/>
<property name="producerListener" ref="listener"/>

</bean>
Expand Down
Expand Up @@ -226,7 +226,7 @@
</constructor-arg>
</bean>
</constructor-arg>
<property name="defaultTopic" value="billing"/>
<property name="defaultTopic" value="${xrootd.kafka.topic}"/>
<property name="producerListener" ref="listener"/>
</bean>
</beans>
Expand Down
Expand Up @@ -593,7 +593,7 @@
</constructor-arg>
</bean>
</constructor-arg>
<property name="defaultTopic" value="billing"/>
<property name="defaultTopic" value="${pool.kafka.topic}"/>
<property name="producerListener" ref="listener"/>
</bean>

Expand All @@ -616,7 +616,7 @@
</constructor-arg>
</bean>
</constructor-arg>
<property name="defaultTopic" value="billing"/>
<property name="defaultTopic" value="${pool.kafka.topic}"/>
<property name="producerListener" ref="listener"/>
</bean>

Expand All @@ -640,7 +640,7 @@
</bean>
</constructor-arg>
<constructor-arg name="autoFlush" value="false" />
<property name="defaultTopic" value="billing"/>
<property name="defaultTopic" value="${pool.kafka.topic}"/>
<property name="producerListener" ref="listener"/>
</bean>
</beans>
Expand Down
3 changes: 3 additions & 0 deletions skel/share/defaults/dcache.properties
Expand Up @@ -1215,6 +1215,9 @@ dcache.oidc.hostnames =
# host1:port1,host2:port2,....
dcache.kafka.bootstrap-servers = localhost:9092

# Kafka topic name
dcache.kafka.topic = billing

# Maximum time dCache will spend trying to send an event to the Kafka
# service. If dCache is unable to send the event to Kafka within
# this time limit, an error is logged and the event is dropped.
Expand Down
2 changes: 2 additions & 0 deletions skel/share/defaults/dcap.properties
Expand Up @@ -219,3 +219,5 @@ dcap.kafka.maximum-block.unit = ${dcache.kafka.maximum-block.unit}
# A list of host/port pairs (brokers) host1:port1,host2:port2,....
dcap.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
dcap.kafka.topic = ${dcache.kafka.topic}
3 changes: 3 additions & 0 deletions skel/share/defaults/ftp.properties
Expand Up @@ -387,3 +387,6 @@ ftp.kafka.maximum-block.unit = ${dcache.kafka.maximum-block.unit}

# A list of host/port pairs (brokers) host1:port1,host2:port2,....
ftp.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
ftp.kafka.topic = ${dcache.kafka.topic}
3 changes: 3 additions & 0 deletions skel/share/defaults/nfs.properties
Expand Up @@ -265,3 +265,6 @@ nfs.kafka.maximum-block.unit = ${dcache.kafka.maximum-block.unit}

# A list of host/port pairs (brokers) host1:port1,host2:port2,....
nfs.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
nfs.kafka.topic = ${dcache.kafka.topic}
4 changes: 2 additions & 2 deletions skel/share/defaults/pool.properties
Expand Up @@ -584,6 +584,8 @@ pool.kafka.maximum-block.unit = ${dcache.kafka.maximum-block.unit}
# A list of host/port pairs (brokers) host1:port1,host2:port2,....
pool.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
pool.kafka.topic = ${dcache.kafka.topic}

# Support for encrypted transfers.
#
Expand All @@ -609,5 +611,3 @@ pool.mover.https.hostcert.key=${dcache.authn.hostcert.key}
# connections react to this property.
#
pool.mover.https.capath=${dcache.authn.capath}


3 changes: 3 additions & 0 deletions skel/share/defaults/webdav.properties
Expand Up @@ -725,3 +725,6 @@ webdav.kafka.maximum-block.unit = ${dcache.kafka.maximum-block.unit}

# A list of host/port pairs (brokers) host1:port1,host2:port2,....
webdav.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers}

# Kafka topic name
webdav.kafka.topic = ${dcache.kafka.topic}
3 changes: 3 additions & 0 deletions skel/share/defaults/xrootd.properties
Expand Up @@ -275,3 +275,6 @@ xrootd.kafka.maximum-block=${dcache.kafka.maximum-block}

(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS|${dcache.kafka.maximum-block.unit})\
xrootd.kafka.maximum-block.unit=${dcache.kafka.maximum-block.unit}

# Kafka topic name
xrootd.kafka.topic = ${dcache.kafka.topic}
1 change: 1 addition & 0 deletions skel/share/services/dcap.batch
Expand Up @@ -114,6 +114,7 @@ create dmg.cells.services.login.LoginManager ${dcap.cell.name} \
-billing=\"${dcap.service.billing}\" \
-kafka=\"${dcap.enable.kafka}\" \
-bootstrap-server-kafka=\"${dcap.kafka.bootstrap-servers}\" \
-kafka-topic=\"${dcap.kafka.topic}\" \
-kafka-max-block=${dcap.kafka.maximum-block}\
-kafka-max-block-units=${dcap.kafka.maximum-block.unit}\
-retries-kafka=0 \
Expand Down
1 change: 1 addition & 0 deletions skel/share/services/ftp.batch
Expand Up @@ -94,6 +94,7 @@ create dmg.cells.services.login.LoginManager ${ftp.cell.name} \
-billing=\"${ftp.service.billing}\" \
-kafka=\"${ftp.enable.kafka}\" \
-bootstrap-server-kafka=\"${ftp.kafka.bootstrap-servers}\" \
-kafka-topic=\"${ftp.kafka.topic}\" \
-kafka-max-block=${ftp.kafka.maximum-block}\
-kafka-max-block-units=${ftp.kafka.maximum-block.unit}\
-retries-kafka=0 \
Expand Down

0 comments on commit 806de40

Please sign in to comment.