Skip to content
Permalink
Browse files
fix DruidSchema issue where datasources with no segments can become s…
…tuck in tables list indefinitely (#12727)
  • Loading branch information
clintropolis committed Jul 2, 2022
1 parent f5b5cb9 commit bbbb6e1c3f9ea52766fe0c330fe8e6d0249560a4
Showing 3 changed files with 26 additions and 2 deletions.
@@ -84,7 +84,8 @@ public void testBroadcastJoin() throws Exception
ImmutableList.of()
);
}
catch (Exception ignored) {
catch (Exception e) {
LOG.error(e, "Failed to post load rules");
}
});

@@ -127,6 +128,7 @@ public void testBroadcastJoin() throws Exception
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), BROADCAST_JOIN_DATASOURCE)
);
}

finally {
closer.close();

@@ -68,6 +68,7 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
@@ -402,6 +403,11 @@ void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> dataSourc
// Rebuild the dataSources.
for (String dataSource : dataSourcesToRebuild) {
final DruidTable druidTable = buildDruidTable(dataSource);
if (druidTable == null) {
log.info("dataSource[%s] no longer exists, all metadata removed.", dataSource);
tables.remove(dataSource);
continue;
}
final DruidTable oldTable = tables.put(dataSource, druidTable);
final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource";
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@@ -773,12 +779,13 @@ private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, fin
}

@VisibleForTesting
@Nullable
DruidTable buildDruidTable(final String dataSource)
{
ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> segmentsMap = segmentMetadataInfo.get(dataSource);
final Map<String, ColumnType> columnTypes = new TreeMap<>();

if (segmentsMap != null) {
if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (AvailableSegmentMetadata availableSegmentMetadata : segmentsMap.values()) {
final RowSignature rowSignature = availableSegmentMetadata.getRowSignature();
if (rowSignature != null) {
@@ -792,6 +799,9 @@ DruidTable buildDruidTable(final String dataSource)
}
}
}
} else {
// table has no segments
return null;
}

final RowSignature.Builder builder = RowSignature.builder();
@@ -77,6 +77,7 @@
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1199,6 +1200,17 @@ public void testSegmentMetadataFallbackType()
);
}

@Test
public void testStaleDatasourceRefresh() throws IOException
{
Set<SegmentId> segments = new HashSet<>();
Set<String> datasources = new HashSet<>();
datasources.add("wat");
Assert.assertNull(schema.getTable("wat"));
schema.refresh(segments, datasources);
Assert.assertNull(schema.getTable("wat"));
}

private static DataSegment newSegment(String datasource, int partitionId)
{
return new DataSegment(

0 comments on commit bbbb6e1

Please sign in to comment.