Skip to content

Commit

Permalink
dcap: global kafka producer
Browse files Browse the repository at this point in the history
Motivation

Currently the code is implemented in a way that each new transfer creates a new dcap client, and for each of these clients a unique Kafka producer is created.
This is not optimal; instead, a single (global) Kafka producer should be created when the door is started.

Modifications

The producer is now created in the `init()` method and is later accessed by each new instance of `DCapDoorInterpreterV3`.

Result

A global producer is now created when the dcap door is started and is closed when the door is stopped.

Target: master
Requires-notes: no
Requires-book: no
Acked-by: Tigran
  • Loading branch information
mksahakyan committed Mar 6, 2021
1 parent d536bd0 commit f1a8f78
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 41 deletions.
Expand Up @@ -233,12 +233,8 @@ public DCapDoorInterpreterV3(CellEndpoint cell, CellAddressCore address, DcapDoo
_loginStrategy = settings.createLoginStrategy(cell);

if ( _settings.isKafkaEnabled() ){
_kafkaProducer = settings.createKafkaProducer(_settings.getKafkaBootstrapServer(),
address.toString(),
_settings.getKafkaMaxBlockMs(),
_settings.getKafkaRetries());
_kafkaProducer = settings.getKafkaProducer();
}

_startedTS = new Date();
}

Expand Down Expand Up @@ -2471,18 +2467,6 @@ public void close() {
}
}

/**
 * Closes the per-door Kafka producer, if Kafka publishing is enabled.
 *
 * The producer consists of a pool of buffer space that holds records that
 * have not yet been transmitted to the server, as well as a background I/O
 * thread that is responsible for turning these records into requests and
 * transmitting them to the cluster. Failure to close the producer after use
 * leaks these resources, hence the explicit close here.
 */
protected void shutdownKafka() {
if (_settings.isKafkaEnabled()) {
_log.debug("Shutting down kafka");
_kafkaProducer.close();
}
}

@Override
public void getInfo( PrintWriter pw ){
pw.println( " ----- DCapDoorInterpreterV3 ----------" ) ;
Expand Down
Expand Up @@ -166,6 +166,9 @@ public class DcapDoorSettings

private CheckStagePermission checkStagePermission;

private KafkaProducer _kafkaProducer;


public void init()
{
isAuthorizationStrong = (auth != null) && auth.equals("strong");
Expand Down Expand Up @@ -198,6 +201,10 @@ public void init()

checkStagePermission = new CheckStagePermission(stageConfigurationFilePath);
checkStagePermission.setAllowAnonymousStaging(allowAnonymousStaging);
if (isKafkaEnabled) {
_kafkaProducer = createKafkaProducer(kafkaBootstrapServer, String.valueOf(kafkaMaxBlock), kafkaRetries);
_log.warn("Creating KafkaProducer" + _kafkaProducer.hashCode());
}
}

public CellPath getPnfsManager()
Expand All @@ -221,17 +228,17 @@ public boolean isKafkaEnabled() {
return isKafkaEnabled;
}

/**
* Returns a list of host/port pairs (brokers) to use for establishing the initial connection to the Kafka cluster.
* This list is just used to discover the rest of the brokers in the cluster and should be in the form
* host1:port1,host2:port2,....
*
* @return the list of host/port pairs
*/
public String getKafkaBootstrapServer() {
return kafkaBootstrapServer;

/**
 * Returns the shared Kafka producer created in {@code init()}.
 *
 * May be {@code null} if Kafka is disabled or {@code init()} has not run yet.
 * NOTE(review): the raw {@code KafkaProducer} type is used here (no generic
 * parameters) — consider parameterizing it to match the serializers
 * configured in {@code createKafkaProducer}; confirm the intended key/value
 * types.
 */
public KafkaProducer getKafkaProducer() {
return _kafkaProducer;
}

/**
 * Frees resources held by these settings.
 *
 * The Kafka producer keeps a pool of buffer space for untransmitted records
 * and a background I/O thread; failing to close it leaks those resources,
 * so the shared producer created in {@code init()} is closed here when the
 * door shuts down.
 */
public void destroy() {
    // Guard against a null producer: init() may never have run, or producer
    // creation may have failed, and closing during shutdown must not NPE.
    if (isKafkaEnabled && _kafkaProducer != null) {
        // debug, not warn: routine shutdown is not a warning condition
        // (matches the log level used by the former shutdownKafka()).
        _log.debug("Shutting down kafka");
        _kafkaProducer.close();
    }
}
/**
* Returns the name of kafka topic
*
Expand All @@ -241,16 +248,6 @@ public String getKafkaTopic() {
return kafkaTopic;
}

/**
* Returns the parameter that controls how long the producer will block
* when calling send().
*
* @return a timeframe during which the producer will block sending messages, by default set to 60000
*/
public String getKafkaMaxBlockMs() {
return String.valueOf(TimeUnit.MILLISECONDS.convert(kafkaMaxBlock, kafkaMaxBlockUnits));
}

/**
* Returns the number of retries that the producer will retry sending the messages before failing it.
*
Expand Down Expand Up @@ -344,15 +341,15 @@ public PoolManagerStub createPoolManagerStub(CellEndpoint cellEndpoint, CellAddr
return stub;
}



public KafkaProducer createKafkaProducer(String bootstrap_server,
String client_id,
String max_block_ms,
String retries)
{
Properties props = new Properties();

//TODO check client_id
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
props.put(ProducerConfig.CLIENT_ID_CONFIG, client_id);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.dcache.notification.DoorRequestMessageSerializer");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, max_block_ms);
Expand Down
Expand Up @@ -21,7 +21,6 @@
import java.util.concurrent.Executor;

import diskCacheV111.util.ConfigurationException;

import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellEndpoint;
import dmg.util.StreamEngine;
Expand All @@ -42,6 +41,7 @@ public class DcapInterpreterFactory implements LineBasedInterpreterFactory
/**
 * Injects the cell arguments into the shared {@code DcapDoorSettings} and
 * initializes them (which also creates the global Kafka producer when
 * Kafka is enabled).
 */
@Override
public void configure(Args args) throws ConfigurationException
{
// TODO: this appears to be called three times on startup — to be checked.
new OptionParser(args).inject(settings).init();
}

Expand All @@ -51,4 +51,10 @@ public LineBasedInterpreter create(CellEndpoint endpoint, CellAddressCore myAddr
{
return new DcapLineBasedInterpreterAdapter(endpoint, myAddress, engine, settings, poolManagerHandler);
}

/** Releases resources held by the shared settings (e.g. the Kafka producer). */
@Override
public void destroy() {
    settings.destroy();
}
}
Expand Up @@ -108,7 +108,7 @@ public void shutdown()
/**
 * No-op: the Kafka producer is now global (owned by the door settings) and
 * is closed in {@code destroy()} when the door stops, so there is nothing
 * to shut down per messaging session anymore.
 */
@Override
public void messagingClosed()
{
// TODO: remove this override — it now matches the interface's no-op default.
}

@Override
Expand Down
Expand Up @@ -95,6 +95,7 @@ protected void doStart()
@Override
protected void doStop()
{
factory.destroy();
poolManagerHandler.beforeStop();
executor.shutdown();
notifyStopped();
Expand Down
Expand Up @@ -55,6 +55,7 @@ public interface LineBasedInterpreter
* the client has not yet been closed, but the client might not accept any
* more data.
*/
// TODO: it is unclear why this method is still needed; the Kafka producer is no longer closed in its implementations.
default void messagingClosed()
{
// do nothing.
Expand Down
Expand Up @@ -61,4 +61,11 @@ public interface LineBasedInterpreterFactory
*/
LineBasedInterpreter create(CellEndpoint endpoint, CellAddressCore myAddress,
StreamEngine engine, Executor executor, PoolManagerHandler poolManager) throws Exception;


/**
* Frees allocated resources.
*/
void destroy();

}

1 comment on commit f1a8f78

@mksahakyan
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retest this please

Please sign in to comment.