nfs: add embedded Kafka producer
Motivation

This patch is the continuation of https://rb.dcache.org/r/10691/.

It aims to embed a KafkaProducer so that dCache, if required, can publish DoorRequestInfoMessage records to Kafka.

Modification

Add a KafkaTemplate and KafkaProducer to 'nfs41-common.xml'.

Target: master
Require-book: no
Require-notes: no
Patch: https://rb.dcache.org/r/10771/
Acked-by: Tigran Mkrtchyan
mksahakyan committed Feb 26, 2018
1 parent eefb964 commit 5f97c5a
Showing 6 changed files with 149 additions and 2 deletions.
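
How the pieces fit together: the door and Transfer each hold a Consumer<DoorRequestInfoMessage> that defaults to a no-op; when the kafka-true Spring profile is active, a KafkaTemplate bean is created from nfs41-common.xml, injected through an optional setter, and its sendDefault method becomes that consumer, so notifyBilling() publishes each billing message to the default 'billing' topic. A minimal stand-alone sketch of this optional-dependency pattern (class and member names below are illustrative, not part of the commit):

    import java.util.function.Consumer;

    // Sketch: the door depends only on java.util.function.Consumer, so it
    // compiles and runs identically whether or not Kafka support is wired in.
    class DoorSketch {
        private Consumer<String> sender = s -> {};      // default: discard messages

        // Called by the container only if a sender is available,
        // cf. @Autowired(required = false) in the diff below.
        void setSender(Consumer<String> sender) {
            this.sender = sender;
        }

        void notifyBilling(String billingRecord) {
            sender.accept(billingRecord);               // no-op unless Kafka was enabled
        }
    }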
33 changes: 32 additions & 1 deletion NFSv41Door.java
@@ -5,8 +5,11 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

import javax.security.auth.Subject;
@@ -27,7 +30,8 @@
import diskCacheV111.util.FileNotFoundCacheException;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.PnfsId;;
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.DoorRequestInfoMessage;
import diskCacheV111.vehicles.DoorTransferFinishedMessage;
import diskCacheV111.vehicles.IoDoorEntry;
import diskCacheV111.vehicles.IoDoorInfo;
@@ -61,6 +65,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.dcache.auth.Subjects;
@@ -139,6 +144,8 @@
import javax.annotation.concurrent.GuardedBy;

import org.dcache.auth.attributes.Restrictions;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;

import static org.dcache.chimera.nfsv41.door.ExceptionUtils.asNfsException;

@@ -235,6 +242,8 @@ layouttype4.LAYOUT4_NFSV4_1_FILES, new NfsV41FileLayoutDriver()

private ProxyIoFactory _proxyIoFactory;

private Consumer<DoorRequestInfoMessage> _kafkaSender = (s) -> {};

/**
* Retry policy used for accessing files.
*/
@@ -318,6 +327,27 @@ public void setAccessLogMode(AccessLogMode accessLogMode) {
_accessLogMode = accessLogMode;
}

@Autowired(required = false)
private void setTransferTemplate(KafkaTemplate kafkaTemplate) {
kafkaTemplate.setProducerListener(new ProducerListener() {
@Override
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
_log.info("Sent message.");
}

@Override
public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
_log.error("Unable to send message: {}", exception.getMessage());
}

@Override
public boolean isInterestedInSuccess() {
return false;
}
});
_kafkaSender = kafkaTemplate::sendDefault;
}

public void init() throws Exception {

_chimeraVfs = new ChimeraVfs(_fileFileSystemProvider, _idMapper);
@@ -564,6 +594,7 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, layouttype4 lay
transfer.setPnfsId(pnfsId);
transfer.setClientAddress(remote);
transfer.setIoQueue(_ioQueue);
transfer.setKafkaSender(_kafkaSender);

/*
* As all our layouts marked 'return-on-close', stop mover when
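Note on the listener above: isInterestedInSuccess() returns false, so the framework never calls onSuccess() and only failed sends are logged. Combined with max.block.ms=0 and retries=0 in the producer configuration below, sendDefault() either hands the record over immediately or fails fast, so publishing to Kafka cannot stall the NFS request path.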
24 changes: 24 additions & 0 deletions nfs41-common.xml
@@ -217,6 +217,30 @@
create a placeholder to keep the dependency happy
-->
<bean id="rpcbind" class="java.lang.Object"/>

</beans>
<beans profile="kafka-true">
<bean id="transfer-template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${nfs.kafka.bootstrap-servers}"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.dcache.notification.DoorRequestMessageSerializer" />
<entry key="client.id" value="${nfs.cell.name}@${dcache.domain.name}" />
<!-- TODO: the values of max.block.ms and retries matter for the non-blocking (async) callback. -->
<!-- max.block.ms is the maximum time producer.send() will block; by default it is 60000 ms. Could be changed later. -->
<entry key="max.block.ms" value="0" />
<!-- The maximum number of times to retry a call before failing it.-->
<entry key="retries" value="0" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg name="autoFlush" value="false" />
<property name="defaultTopic" value="billing"/>
</bean>
</beans>

</beans>
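
DefaultKafkaProducerFactory passes the map above straight to the Kafka client. A plain-Java sketch of the equivalent construction (the client.id value is illustrative; the real one is interpolated from ${nfs.cell.name}@${dcache.domain.name}):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.KafkaProducer;

    import diskCacheV111.vehicles.DoorRequestInfoMessage;

    class ProducerSketch {
        static KafkaProducer<String, DoorRequestInfoMessage> newProducer() {
            Map<String, Object> cfg = new HashMap<>();
            cfg.put("bootstrap.servers", "localhost:9092");  // nfs.kafka.bootstrap-servers default
            cfg.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            cfg.put("value.serializer",
                    "org.dcache.notification.DoorRequestMessageSerializer");
            cfg.put("client.id", "NFS-nfsDomain");           // illustrative value
            cfg.put("max.block.ms", 0);                      // send() fails fast rather than blocking
            cfg.put("retries", 0);                           // a failed send is not retried
            return new KafkaProducer<>(cfg);
        }
    }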
66 changes: 66 additions & 0 deletions modules/dcache/src/main/java/org/dcache/notification/DoorRequestMessageSerializer.java
@@ -0,0 +1,66 @@
package org.dcache.notification;

import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.Date;
import java.util.Map;

import diskCacheV111.vehicles.DoorRequestInfoMessage;

import static java.nio.charset.StandardCharsets.UTF_8;

public class DoorRequestMessageSerializer implements Serializer<DoorRequestInfoMessage> {

@Override
public byte[] serialize(String topic, DoorRequestInfoMessage data) {

JSONObject o = new JSONObject();
o.put("VERSION", "1.0");
o.put("msgType", data.getMessageType());
o.put("date", new Date(data.getTimestamp()));
o.put("queuingTime", data.getTimeQueued());
o.put("cellName", data.getCellAddress().getCellName());
o.put("cellType", data.getCellType());
o.put("cellDomain", data.getCellAddress().getCellDomainName());

JSONObject status = new JSONObject();
status.put("code", data.getResultCode());
status.put("msg", data.getMessage());
o.put("status", status);

o.put("session", data.getTransaction());
o.put("sessionDuration", data.getTransactionDuration());
o.put("transferPath", data.getTransferPath());

JSONArray subject = new JSONArray();
data.getSubject().getPrincipals().forEach(s -> subject.put(s));
o.put("subject", subject);
o.put("client", data.getClient());
o.put("clientChain", data.getClientChain());
o.put("mappedUID", data.getUid());
o.put("mappedGID", data.getGid());
o.put("owner", data.getOwner());
o.put("pnfsid", data.getPnfsId());
o.put("billingPath", data.getBillingPath());
o.put("fileSize", data.getFileSize());
o.put("storageInfo", data.getStorageInfo());
return o.toString().getBytes(UTF_8);

}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public void close() {
}
}
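
The payload is plain UTF-8 JSON, so any consumer with string deserializers can read it. A minimal tailing consumer, as a sketch (assumes a recent kafka-clients on the classpath, a broker on localhost:9092, and the default 'billing' topic; the group id is arbitrary):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BillingTopicTail {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "billing-tail");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("billing"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> r : records) {
                        System.out.println(r.value());  // one JSON document per door request
                    }
                }
            }
        }
    }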
14 changes: 14 additions & 0 deletions modules/dcache/src/main/java/org/dcache/util/Transfer.java
@@ -11,6 +11,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -30,6 +31,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import diskCacheV111.poolManager.RequestContainerV5;
@@ -60,6 +62,7 @@
import diskCacheV111.vehicles.PoolMoverKillMessage;
import diskCacheV111.vehicles.ProtocolInfo;


import dmg.cells.nucleus.CDC;
import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellPath;
@@ -138,6 +141,9 @@ public class Transfer implements Comparable<Transfer>
private Set<FileAttribute> _additionalAttributes =
EnumSet.noneOf(FileAttribute.class);

private Consumer<DoorRequestInfoMessage> _kafkaSender = (s) -> {};


private static final ThreadFactory RETRY_THREAD_FACTORY =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("transfer-retry-timer-%d").build();
private static final ListeningScheduledExecutorService RETRY_EXECUTOR =
@@ -282,6 +288,12 @@ public synchronized void setBillingStub(CellStub stub)
_billing = stub;
}

public synchronized void setKafkaSender(Consumer<DoorRequestInfoMessage> kafkaSender)
{
_kafkaSender = kafkaSender;
}

public synchronized void
setCheckStagePermission(CheckStagePermission checkStagePermission)
@@ -1235,8 +1247,10 @@ public synchronized void notifyBilling(int code, String error)
_billing.notify(msg);

_isBillingNotified = true;
_kafkaSender.accept(msg);
}


private static long getTimeoutFor(long deadline)
{
return subWithInfinity(deadline, System.currentTimeMillis());
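
Because the hook is typed as a plain Consumer rather than a Kafka class, callers and tests can observe billing messages without a broker. A hypothetical example (the Transfer would be fully configured in real code):

    import java.util.ArrayList;
    import java.util.List;

    import diskCacheV111.vehicles.DoorRequestInfoMessage;

    import org.dcache.util.Transfer;

    class KafkaSenderHookExample {
        static List<DoorRequestInfoMessage> captureBilling(Transfer transfer) {
            List<DoorRequestInfoMessage> published = new ArrayList<>();
            transfer.setKafkaSender(published::add);  // capture locally instead of sending to Kafka
            return published;                         // filled by subsequent notifyBilling(...) calls
        }
    }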
12 changes: 12 additions & 0 deletions skel/share/defaults/nfs.properties
@@ -215,3 +215,15 @@ nfs.cell.max-messages-queued = 1000
(obsolete)nfs.cell.limits.message.threads.max = See nfs.cell.max-message-threads
(obsolete)nfs.cell.limits.message.queue.max = See nfs.cell.max-message-queues
(obsolete)nfs.pnfs.layouts = Use `lt=` option in export file


# ---- Kafka service enabled
#
# If enabled, the NFS door will publish a message to a Kafka cluster after each transfer.
# These messages contain the same information that is sent to billing.
(one-of?true|false)nfs.enable.kafka = false

# A list of host/port pairs (brokers) to use for establishing the initial connection to the Kafka cluster.
# This list is just used to discover the rest of the brokers in the cluster and should be in the form
# host1:port1,host2:port2,...
nfs.kafka.bootstrap-servers = localhost:9092
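
An illustrative layout-file override that enables publishing for a door (domain and host names are placeholders):

    [nfsDomain/nfs]
    nfs.enable.kafka = true
    nfs.kafka.bootstrap-servers = kafka-1.example.org:9092,kafka-2.example.org:9092
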
2 changes: 1 addition & 1 deletion skel/share/services/nfs.batch
@@ -56,5 +56,5 @@ create org.dcache.cells.UniversalSpringCell ${nfs.cell.name} \
-consume=${nfs.cell.consume} \
-subscribe=${nfs.cell.subscribe} \
-cell.max-message-threads=${nfs.cell.max-message-threads} -cell.max-messages-queued=${nfs.cell.max-messages-queued} \
-profiles=portmap-${nfs.enable.portmap}"
-profiles=portmap-${nfs.enable.portmap},kafka-${nfs.enable.kafka}"
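
With the default settings this expands to, for example, -profiles=portmap-true,kafka-false, so the <beans profile="kafka-true"> block in nfs41-common.xml is instantiated only when nfs.enable.kafka = true; when it is false, no KafkaTemplate bean exists and the door's @Autowired(required = false) setter is simply never invoked.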
