Skip to content

Commit

Permalink
[pulsar-broker] Fix: Deserialization failing for ZkIsolatedBookieEnse…
Browse files Browse the repository at this point in the history
…mblePlacementPolicy (#3918)
  • Loading branch information
rdhabalia authored and merlimat committed Mar 28, 2019
1 parent 4a739e5 commit 012b818
Showing 1 changed file with 7 additions and 10 deletions.
Expand Up @@ -36,21 +36,20 @@
import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;


import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;


public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy
implements Deserializer<Map<String, Map<BookieSocketAddress, BookieInfo>>> { implements Deserializer<BookiesRackConfiguration> {
private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class); private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);


public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups"; public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups";
Expand All @@ -59,8 +58,6 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl


private final List<String> isolationGroups = new ArrayList<String>(); private final List<String> isolationGroups = new ArrayList<String>();
private final ObjectMapper jsonMapper = ObjectMapperFactory.create(); private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
private final TypeReference<Map<String, Map<BookieSocketAddress, BookieInfo>>> typeRef = new TypeReference<Map<String, Map<BookieSocketAddress, BookieInfo>>>() {
};


public ZkIsolatedBookieEnsemblePlacementPolicy() { public ZkIsolatedBookieEnsemblePlacementPolicy() {
super(); super();
Expand Down Expand Up @@ -139,14 +136,14 @@ private Set<BookieSocketAddress> getBlacklistedBookies() {
Set<BookieSocketAddress> blacklistedBookies = new HashSet<BookieSocketAddress>(); Set<BookieSocketAddress> blacklistedBookies = new HashSet<BookieSocketAddress>();
try { try {
if (bookieMappingCache != null) { if (bookieMappingCache != null) {
Map<String, Map<BookieSocketAddress, BookieInfo>> allGroupsBookieMapping = bookieMappingCache BookiesRackConfiguration allGroupsBookieMapping = bookieMappingCache
.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this) .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
.orElseThrow(() -> new KeeperException.NoNodeException( .orElseThrow(() -> new KeeperException.NoNodeException(
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)); ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
for (String group : allGroupsBookieMapping.keySet()) { for (String group : allGroupsBookieMapping.keySet()) {
if (!isolationGroups.contains(group)) { if (!isolationGroups.contains(group)) {
for (BookieSocketAddress bookieAddress : allGroupsBookieMapping.get(group).keySet()) { for (String bookieAddress : allGroupsBookieMapping.get(group).keySet()) {
blacklistedBookies.add(bookieAddress); blacklistedBookies.add(new BookieSocketAddress(bookieAddress));
} }
} }
} }
Expand All @@ -158,11 +155,11 @@ private Set<BookieSocketAddress> getBlacklistedBookies() {
} }


@Override @Override
public Map<String, Map<BookieSocketAddress, BookieInfo>> deserialize(String key, byte[] content) throws Exception { public BookiesRackConfiguration deserialize(String key, byte[] content) throws Exception {
LOG.info("Reloading the bookie isolation groups mapping cache."); LOG.info("Reloading the bookie isolation groups mapping cache.");
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading the bookie mappings with bookie info data: {}", new String(content)); LOG.debug("Loading the bookie mappings with bookie info data: {}", new String(content));
} }
return jsonMapper.readValue(content, typeRef); return jsonMapper.readValue(content, BookiesRackConfiguration.class);
} }
} }

0 comments on commit 012b818

Please sign in to comment.