Skip to content

Commit

Permalink
Fix bad lookup config fails task (#12021)
Browse files Browse the repository at this point in the history
This PR fixes an issue in which if a lookup is configured incorreclty; does not serialize properly when being pulled by peon node, it causes the task to fail. The failure occurs because the peon and other leaf nodes (broker, historical), have retry logic that continues to retry the lookup loading for 3 minutes by default. The http listener thread on the peon task is not started until lookup loading completes, by default, the overlord waits 1 minute by default, to communicate with the peon task to get the task status, after which is orders the task to shut down, causing the ingestion task to fail.

To fix the issue, we catch the exception serialization error, and do not retry. Also fixed an issue in which a bad lookup config interferes with any other good lookup configs from being loaded.
  • Loading branch information
zachjsh committed Dec 7, 2021
1 parent 834aae0 commit 65cadbe
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 10 deletions.
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.lookup;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.server.DruidNode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;

public class LookupUtilsTest
{
private static final TypeReference<Map<String, Object>> LOOKUPS_ALL_GENERIC_REFERENCE =
new TypeReference<Map<String, Object>>()
{
};

private static final TypeReference<Map<String, LookupExtractorFactoryContainer>> LOOKUPS_ALL_REFERENCE =
new TypeReference<Map<String, LookupExtractorFactoryContainer>>()
{
};

private static final String LOOKUP_VALID_INNER = " \"lookup_uri_good\": {\n"
+ " \"version\": \"2021-12-03T01:04:05.151Z\",\n"
+ " \"lookupExtractorFactory\": {\n"
+ " \"type\": \"cachedNamespace\",\n"
+ " \"extractionNamespace\": {\n"
+ " \"type\": \"uri\",\n"
+ " \"uri\": \"file:///home/lookup_data.json\",\n"
+ " \"namespaceParseSpec\": {\n"
+ " \"format\": \"simpleJson\"\n"
+ " },\n"
+ " \"pollPeriod\": \"PT30S\",\n"
+ " \"maxHeapPercentage\": 1\n"
+ " }\n"
+ " }\n"
+ " }";


private static final String LOOKUP_VALID = "{\n"
+ LOOKUP_VALID_INNER + "\n"
+ "}";

private static final String LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER =
" \"lookup_keyColumn_but_no_valueColumn\": {\n"
+ " \"version\": \"2021-12-03T02:17:01.983Z\",\n"
+ " \"lookupExtractorFactory\": {\n"
+ " \"type\": \"cachedNamespace\",\n"
+ " \"extractionNamespace\": {\n"
+ " \"type\": \"uri\",\n"
+ " \"fileRegex\": \".*csv\",\n"
+ " \"uriPrefix\": \"s3://bucket/path/\",\n"
+ " \"namespaceParseSpec\": {\n"
+ " \"format\": \"csv\",\n"
+ " \"columns\": [\n"
+ " \"cluster_id\",\n"
+ " \"account_id\",\n"
+ " \"manager_host\"\n"
+ " ],\n"
+ " \"keyColumn\": \"cluster_id\",\n"
+ " \"hasHeaderRow\": true,\n"
+ " \"skipHeaderRows\": 1\n"
+ " },\n"
+ " \"pollPeriod\": \"PT30S\"\n"
+ " }\n"
+ " }\n"
+ " }";

private static final String LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN =
"{\n"
+ LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER + "\n"
+ "}";

private static final String LOOKSUPS_INVALID_AND_VALID = "{\n"
+ LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN_INNER + ",\n"
+ LOOKUP_VALID_INNER + "\n"
+ "}";
private ObjectMapper mapper;

@Before
public void setup()
{
final Injector injector = makeInjector();
mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);
}

@Test
public void test_tryConvertObjectMapToLookupConfigMap_allValid() throws IOException
{
mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);
Map<String, LookupExtractorFactoryContainer> validLookupExpected = mapper.readValue(
LOOKUP_VALID,
LOOKUPS_ALL_REFERENCE);

Map<String, Object> validLookupGeneric = mapper.readValue(
LOOKUP_VALID,
LOOKUPS_ALL_GENERIC_REFERENCE);
Map<String, LookupExtractorFactoryContainer> actualLookup =
LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper);

Assert.assertEquals(mapper.writeValueAsString(validLookupExpected), mapper.writeValueAsString(actualLookup));
}

@Test
public void test_tryConvertObjectMapToLookupConfigMap_allInvalid_emptyMap()
throws IOException
{
mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);

Map<String, Object> validLookupGeneric = mapper.readValue(
LOOKUP_WITH_KEY_COLUMN_BUT_NO_VALUE_COLUMN,
LOOKUPS_ALL_GENERIC_REFERENCE);
Map<String, LookupExtractorFactoryContainer> actualLookup =
LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper);

Assert.assertTrue(actualLookup.isEmpty());
}

@Test
public void test_tryConvertObjectMapToLookupConfigMap_goodAndBadConfigs_skipsBad()
throws IOException
{
mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);
Map<String, LookupExtractorFactoryContainer> validLookupExpected = mapper.readValue(
LOOKUP_VALID,
LOOKUPS_ALL_REFERENCE);

Map<String, Object> validLookupGeneric = mapper.readValue(
LOOKSUPS_INVALID_AND_VALID,
LOOKUPS_ALL_GENERIC_REFERENCE);
Map<String, LookupExtractorFactoryContainer> actualLookup =
LookupUtils.tryConvertObjectMapToLookupConfigMap(validLookupGeneric, mapper);

Assert.assertEquals(mapper.writeValueAsString(validLookupExpected), mapper.writeValueAsString(actualLookup));
}

private Injector makeInjector()
{
return Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder,
Key.get(DruidNode.class, Self.class),
new DruidNode("test-inject", null, false, null, null, true, false)
);
}
}
)
);
}
}
Expand Up @@ -46,8 +46,8 @@ class LookupListeningResource extends ListenerResource
{
private static final Logger LOG = new Logger(LookupListeningResource.class);

private static final TypeReference<LookupsState<LookupExtractorFactoryContainer>> LOOKUPS_STATE_TYPE_REFERENCE =
new TypeReference<LookupsState<LookupExtractorFactoryContainer>>()
private static final TypeReference<LookupsState<Object>> LOOKUPS_STATE_GENERIC_REFERENCE =
new TypeReference<LookupsState<Object>>()
{
};

Expand All @@ -68,9 +68,22 @@ public LookupListeningResource(
@Override
public Response handleUpdates(InputStream inputStream, ObjectMapper mapper)
{
final LookupsState<Object> stateGeneric;
final LookupsState<LookupExtractorFactoryContainer> state;
final Map<String, LookupExtractorFactoryContainer> current;
final Map<String, LookupExtractorFactoryContainer> toLoad;
try {
state = mapper.readValue(inputStream, LOOKUPS_STATE_TYPE_REFERENCE);
stateGeneric = mapper.readValue(inputStream, LOOKUPS_STATE_GENERIC_REFERENCE);
current = LookupUtils.tryConvertObjectMapToLookupConfigMap(
stateGeneric.getCurrent(),
mapper
);
toLoad = LookupUtils.tryConvertObjectMapToLookupConfigMap(
stateGeneric.getToLoad(),
mapper
);

state = new LookupsState<>(current, toLoad, stateGeneric.getToDrop());
}
catch (final IOException ex) {
LOG.debug(ex, "Bad request");
Expand Down
Expand Up @@ -84,8 +84,8 @@ public class LookupReferencesManager implements LookupExtractorFactoryContainerP
{
private static final EmittingLogger LOG = new EmittingLogger(LookupReferencesManager.class);

private static final TypeReference<Map<String, LookupExtractorFactoryContainer>> LOOKUPS_ALL_REFERENCE =
new TypeReference<Map<String, LookupExtractorFactoryContainer>>()
private static final TypeReference<Map<String, Object>> LOOKUPS_ALL_GENERIC_REFERENCE =
new TypeReference<Map<String, Object>>()
{
};

Expand Down Expand Up @@ -429,7 +429,8 @@ private List<LookupBean> getLookupListFromCoordinator(String tier)
}

@Nullable
private Map<String, LookupExtractorFactoryContainer> tryGetLookupListFromCoordinator(String tier) throws Exception
private Map<String, LookupExtractorFactoryContainer> tryGetLookupListFromCoordinator(String tier)
throws IOException, InterruptedException
{
final StringFullResponseHolder response = fetchLookupsForTier(tier);
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
Expand All @@ -454,7 +455,12 @@ private Map<String, LookupExtractorFactoryContainer> tryGetLookupListFromCoordin
);
return null;
} else {
return jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_REFERENCE);
Map<String, Object> lookupNameToGenericConfig =
jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_GENERIC_REFERENCE);
return LookupUtils.tryConvertObjectMapToLookupConfigMap(
lookupNameToGenericConfig,
jsonMapper
);
}
}

Expand Down
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.lookup;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.emitter.EmittingLogger;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;

/**
* Utility class for lookup related things
*/
public class LookupUtils
{

private static final EmittingLogger LOG = new EmittingLogger(LookupUtils.class);

private LookupUtils()
{

}

/**
* Takes a map of String to Object, representing lookup name to generic lookup config, and attempts to construct
* a map from String to {@link LookupExtractorFactoryContainer}. Any lookup configs that are not able to be converted
* to {@link LookupExtractorFactoryContainer}, will be logged as warning, and will not be included in the map
* returned.
*
* @param lookupNameToGenericConfig The lookup generic config map.
* @param objectMapper The object mapper to use to convert bytes to {@link LookupExtractorFactoryContainer}
* @return
*/
public static Map<String, LookupExtractorFactoryContainer> tryConvertObjectMapToLookupConfigMap(
Map<String, Object> lookupNameToGenericConfig,
ObjectMapper objectMapper
)
{
Map<String, LookupExtractorFactoryContainer> lookupNameToConfig =
Maps.newHashMapWithExpectedSize(lookupNameToGenericConfig.size());
for (Map.Entry<String, Object> lookupNameAndConfig : lookupNameToGenericConfig.entrySet()) {
String lookupName = lookupNameAndConfig.getKey();
LookupExtractorFactoryContainer lookupConfig = tryConvertObjectToLookupConfig(
lookupName,
lookupNameAndConfig.getValue(),
objectMapper
);
if (lookupConfig != null) {
lookupNameToConfig.put(lookupName, lookupConfig);
}

}
return lookupNameToConfig;
}

@Nullable
private static LookupExtractorFactoryContainer tryConvertObjectToLookupConfig(
String lookupName,
Object o,
ObjectMapper objectMapper)
{
try {
byte[] lookupConfigBytes = objectMapper.writeValueAsBytes(o);
return objectMapper.readValue(
lookupConfigBytes,
LookupExtractorFactoryContainer.class
);
}
catch (IOException e) {
LOG.warn("Lookup [%s] could not be serialized properly. Please check its configuration. Error: %s",
lookupName,
e.getMessage()
);
}
return null;
}
}
Expand Up @@ -28,9 +28,9 @@
import java.util.Objects;

/**
* This is same as LookupExtractorFactoryContainer except it uses Map<String, Object> instead of
* LookupExtractorFactory for referencing lookup spec so that lookup extensions are not required to
* be loaded at the Coordinator.
* This is same as {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainer } except it uses
* Map<String, Object> instead of LookupExtractorFactory for referencing lookup spec so that lookup extensions are not
* required to be loaded at the Coordinator.
*/
public class LookupExtractorFactoryMapContainer
{
Expand Down

0 comments on commit 65cadbe

Please sign in to comment.