Skip to content

Commit

Permalink
Fix node discovery to ignore unknown DruidServices (#12157)
Browse files Browse the repository at this point in the history
* Fix node discovery to ignore unknown DruidServices

* ignore all runtime exceptions

* fix test

* add custom deserializer

* custom serializer

* log host for unparseable druidService
  • Loading branch information
jihoonson committed Jan 19, 2022
1 parent 53c0e48 commit cc2ffc6
Show file tree
Hide file tree
Showing 17 changed files with 1,025 additions and 31 deletions.
Expand Up @@ -177,7 +177,7 @@ private DruidServer toDruidServer(DiscoveryDruidNode node)
node.getDruidNode().getHostAndPort(),
node.getDruidNode().getHostAndTlsPort(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getType(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getServerType(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority()
);
Expand Down
Expand Up @@ -22,45 +22,94 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.server.coordination.ServerType;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;

/**
* Metadata announced by any node that serves segments.
*
* Note for JSON serialization and deserialization.
*
* This class has a bug that it has the "type" property which is the duplicate name
* with the subtype key of {@link DruidService}. It seems to happen to work
* if the "type" subtype key appears first in the serialized JSON since
* Jackson uses the first "type" property as the subtype key.
* To always enforce this property order, this class does not use the {@link JsonProperty} annotation for serialization,
* but uses {@link org.apache.druid.jackson.DruidServiceSerializer}.
* Since this is a hacky-way to not break compatibility, a new "serverType" field is added
* to replace the deprecated "type" field. Once we completely remove the "type" field from this class,
* we can remove DruidServiceSerializer as well.
*
* The set of properties to serialize is hard-coded in DruidServiceSerializer.
* If you want to add a new field in this class before we remove the "type" field,
* you must add a proper handling of that new field in DruidServiceSerializer as well.
*
* For deserialization, DruidServices are deserialized as a part of {@link DiscoveryDruidNode}.
* To handle the bug of the duplicate "type" key, DiscoveryDruidNode first deserializes
* the JSON to {@link org.apache.druid.jackson.StringObjectPairList},
* handles the duplicate "type" keys in the StringObjectPairList,
* and then finally converts it to a DruidService. See {@link DiscoveryDruidNode#toMap(List)}.
*
* @see org.apache.druid.jackson.DruidServiceSerializer
* @see DiscoveryDruidNode#toMap(List)
*/
public class DataNodeService extends DruidService
{
public static final String DISCOVERY_SERVICE_KEY = "dataNodeService";
public static final String SERVER_TYPE_PROP_KEY = "serverType";

private final String tier;
private final long maxSize;
private final ServerType type;
private final ServerType serverType;
private final int priority;
private final boolean isDiscoverable;

/**
* This JSON creator requires for the "type" subtype key of {@link DruidService} to appear before
* the "type" property of this class in the serialized JSON. Deserialization can fail otherwise.
* See the Javadoc of this class for more details.
*/
@JsonCreator
public DataNodeService(
public static DataNodeService fromJson(
@JsonProperty("tier") String tier,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") ServerType type,
@JsonProperty("type") @Deprecated @Nullable ServerType type,
@JsonProperty(SERVER_TYPE_PROP_KEY) @Nullable ServerType serverType,
@JsonProperty("priority") int priority
)
{
this(tier, maxSize, type, priority, true);
if (type == null && serverType == null) {
throw new IAE("ServerType is missing");
}
final ServerType theServerType = serverType == null ? type : serverType;
return new DataNodeService(tier, maxSize, theServerType, priority);
}

public DataNodeService(
String tier,
long maxSize,
ServerType serverType,
int priority
)
{
this(tier, maxSize, serverType, priority, true);
}

public DataNodeService(
String tier,
long maxSize,
ServerType type,
ServerType serverType,
int priority,
boolean isDiscoverable
)
{
this.tier = tier;
this.maxSize = maxSize;
this.type = type;
this.serverType = serverType;
this.priority = priority;
this.isDiscoverable = isDiscoverable;
}
Expand All @@ -71,30 +120,28 @@ public String getName()
return DISCOVERY_SERVICE_KEY;
}

@JsonProperty
public String getTier()
{
return tier;
}

@JsonProperty
public long getMaxSize()
{
return maxSize;
}

@JsonProperty
public ServerType getType()
public ServerType getServerType()
{
return type;
return serverType;
}

@JsonProperty
public int getPriority()
{
return priority;
}

// leaving the "JsonIgnore" annotation to remember that "discoverable" is ignored in serialization,
// even though the annotation is not actually used.
@Override
@JsonIgnore
public boolean isDiscoverable()
Expand All @@ -115,13 +162,13 @@ public boolean equals(Object o)
return maxSize == that.maxSize &&
priority == that.priority &&
Objects.equals(tier, that.tier) &&
type == that.type;
serverType == that.serverType;
}

@Override
public int hashCode()
{
return Objects.hash(tier, maxSize, type, priority);
return Objects.hash(tier, maxSize, serverType, priority);
}

@Override
Expand All @@ -130,7 +177,7 @@ public String toString()
return "DataNodeService{" +
"tier='" + tier + '\'' +
", maxSize=" + maxSize +
", type=" + type +
", serverType=" + serverType +
", priority=" + priority +
'}';
}
Expand Down
Expand Up @@ -19,12 +19,21 @@

package org.apache.druid.discovery;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.apache.druid.jackson.StringObjectPairList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/**
Expand All @@ -35,22 +44,25 @@
*/
public class DiscoveryDruidNode
{
private static final Logger LOG = new Logger(DiscoveryDruidNode.class);

private final DruidNode druidNode;
private final NodeRole nodeRole;

/**
* Other metadata associated with the node e.g.
* if it's a historical node then lookup information, segment loading capacity etc.
* Map of service name -> DruidServices.
* This map has only the DruidServices that is understandable.
* It means, if there is some DruidService not understandable found while converting rawServices to services,
* that DruidService will be ignored and not stored in this map.
*
* @see DruidNodeDiscoveryProvider#SERVICE_TO_NODE_TYPES
*/
private final Map<String, DruidService> services = new HashMap<>();

@JsonCreator
public DiscoveryDruidNode(
@JsonProperty("druidNode") DruidNode druidNode,
@JsonProperty("nodeType") NodeRole nodeRole,
@JsonProperty("services") Map<String, DruidService> services
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services
)
{
this.druidNode = druidNode;
Expand All @@ -61,6 +73,75 @@ public DiscoveryDruidNode(
}
}

@JsonCreator
private static DiscoveryDruidNode fromJson(
@JsonProperty("druidNode") DruidNode druidNode,
@JsonProperty("nodeType") NodeRole nodeRole,
@JsonProperty("services") Map<String, StringObjectPairList> rawServices,
@JacksonInject ObjectMapper jsonMapper
)
{
Map<String, DruidService> services = new HashMap<>();
if (rawServices != null && !rawServices.isEmpty()) {
for (Entry<String, StringObjectPairList> entry : rawServices.entrySet()) {
List<NonnullPair<String, Object>> val = entry.getValue().getPairs();
try {
services.put(entry.getKey(), jsonMapper.convertValue(toMap(val), DruidService.class));
}
catch (RuntimeException e) {
LOG.warn("Ignore unparseable DruidService for [%s]: %s", druidNode.getHostAndPortToUse(), val);
}
}
}
return new DiscoveryDruidNode(druidNode, nodeRole, services);
}

/**
* A JSON of a {@link DruidService} is deserialized to a Map and then converted to aDruidService
* to ignore any "unknown" DruidServices to the current node. However, directly deserializing a JSON to a Map
* is problematic for {@link DataNodeService} as it has duplicate "type" keys in its serialized form.
* Because of the duplicate key, if we directly deserialize a JSON to a Map, we will lose one of the "type" property.
* This is definitely a bug of DataNodeService, but, since renaming one of those duplicate keys will
* break compatibility, DataNodeService still has the deprecated "type" property.
* See the Javadoc of DataNodeService for more details.
*
* This function catches such duplicate keys and rewrites the deprecated "type" to "serverType",
* so that we don't lose any properties.
*
* This method can be removed together when we entirely remove the deprecated "type" property from DataNodeService.
*/
@Deprecated
private static Map<String, Object> toMap(List<NonnullPair<String, Object>> pairs)
{
final Map<String, Object> map = Maps.newHashMapWithExpectedSize(pairs.size());
for (NonnullPair<String, Object> pair : pairs) {
final Object prevVal = map.put(pair.lhs, pair.rhs);
if (prevVal != null) {
if ("type".equals(pair.lhs)) {
if (DataNodeService.DISCOVERY_SERVICE_KEY.equals(prevVal)) {
map.put("type", prevVal);
// this one is likely serverType.
map.put(DataNodeService.SERVER_TYPE_PROP_KEY, pair.rhs);
continue;
} else if (DataNodeService.DISCOVERY_SERVICE_KEY.equals(pair.rhs)) {
// this one is likely serverType.
map.put(DataNodeService.SERVER_TYPE_PROP_KEY, prevVal);
continue;
}
} else if (DataNodeService.SERVER_TYPE_PROP_KEY.equals(pair.lhs)) {
// Ignore duplicate "serverType" keys since it can happen
// when the JSON has both "type" and "serverType" keys for serverType.
continue;
}

if (!prevVal.equals(pair.rhs)) {
throw new IAE("Duplicate key[%s] with different values: [%s] and [%s]", pair.lhs, prevVal, pair.rhs);
}
}
}
return map;
}

@JsonProperty
public Map<String, DruidService> getServices()
{
Expand Down
22 changes: 20 additions & 2 deletions server/src/main/java/org/apache/druid/guice/ServerModule.java
Expand Up @@ -19,19 +19,27 @@

package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.jackson.DruidServiceSerializerModifier;
import org.apache.druid.jackson.StringObjectPairList;
import org.apache.druid.jackson.ToStringObjectPairListDeserializer;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ZkPathsConfig;

import java.util.List;

/**
*/
public class ServerModule implements Module
public class ServerModule implements DruidModule
{
public static final String ZK_PATHS_PROPERTY_BASE = "druid.zk.paths";

Expand All @@ -47,4 +55,14 @@ public ScheduledExecutorFactory getScheduledExecutorFactory(Lifecycle lifecycle)
{
return ScheduledExecutors.createFactory(lifecycle);
}

@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule()
.addDeserializer(StringObjectPairList.class, new ToStringObjectPairListDeserializer())
.setSerializerModifier(new DruidServiceSerializerModifier())
);
}
}

0 comments on commit cc2ffc6

Please sign in to comment.