Publish segment schema from MSQ engine #16463

Draft · wants to merge 7 commits into master
@@ -181,8 +181,12 @@
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -191,6 +195,7 @@
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
@@ -200,6 +205,7 @@
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegmentExtendedWithSchema;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -244,6 +250,7 @@ public class ControllerImpl implements Controller
private final MSQSpec querySpec;
private final ResultsContext resultsContext;
private final ControllerContext context;
private final boolean publishSchema;
private volatile ControllerQueryKernelConfig queryKernelConfig;

/**
@@ -324,6 +331,10 @@ public ControllerImpl(
this.querySpec = Preconditions.checkNotNull(querySpec, "querySpec");
this.resultsContext = Preconditions.checkNotNull(resultsContext, "resultsContext");
this.context = Preconditions.checkNotNull(controllerContext, "controllerContext");
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig =
context.injector().getInstance(CentralizedDatasourceSchemaConfig.class);

publishSchema = centralizedDatasourceSchemaConfig != null && centralizedDatasourceSchemaConfig.isEnabled();
}

@Override
@@ -582,7 +593,8 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
queryId(),
makeQueryControllerToolKit(),
querySpec,
context.jsonMapper()
context.jsonMapper(),
publishSchema
);

if (log.isDebugEnabled()) {
@@ -1312,19 +1324,22 @@ private void postResultPartitionBoundariesForStage(
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
private void publishAllSegments(final DataSegmentsWithSchemas dataSegmentsWithSchemas) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) querySpec.getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(dataSegmentsWithSchemas.getSegments());
int numTombstones = 0;
final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType(
QueryContext.of(querySpec.getQuery().getContext()),
destination.isReplaceTimeChunks()
);

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(
dataSegmentsWithSchemas.getSegments(),
"segments"
));

if (!intervalsToDrop.isEmpty()) {
TombstoneHelper tombstoneHelper = new TombstoneHelper(context.taskActionClient());
@@ -1368,24 +1383,28 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOException
}
performSegmentPublish(
context.taskActionClient(),
createOverwriteAction(taskLockType, segmentsWithTombstones)
createOverwriteAction(taskLockType, segmentsWithTombstones, dataSegmentsWithSchemas.getSegmentSchemaMapping())
);
}
} else if (!segments.isEmpty()) {
} else if (!dataSegmentsWithSchemas.getSegments().isEmpty()) {
if (MultiStageQueryContext.shouldWaitForSegmentLoad(querySpec.getQuery().context())) {
segmentLoadWaiter = new SegmentLoadStatusFetcher(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
queryId,
destination.getDataSource(),
segments,
dataSegmentsWithSchemas.getSegments(),
true
);
}
// Append mode.
performSegmentPublish(
context.taskActionClient(),
createAppendAction(segments, taskLockType)
createAppendAction(
dataSegmentsWithSchemas.getSegments(),
dataSegmentsWithSchemas.getSegmentSchemaMapping(),
taskLockType
)
);
}

@@ -1396,27 +1415,29 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOException

private static TaskAction<SegmentPublishResult> createAppendAction(
Set<DataSegment> segments,
SegmentSchemaMapping segmentSchemaMapping,
TaskLockType taskLockType
)
{
if (taskLockType.equals(TaskLockType.APPEND)) {
return SegmentTransactionalAppendAction.forSegments(segments, null);
return SegmentTransactionalAppendAction.forSegments(segments, segmentSchemaMapping);
} else if (taskLockType.equals(TaskLockType.SHARED)) {
return SegmentTransactionalInsertAction.appendAction(segments, null, null, null);
return SegmentTransactionalInsertAction.appendAction(segments, null, null, segmentSchemaMapping);
} else {
throw DruidException.defensive("Invalid lock type [%s] received for append action", taskLockType);
}
}

private TaskAction<SegmentPublishResult> createOverwriteAction(
TaskLockType taskLockType,
Set<DataSegment> segmentsWithTombstones
Set<DataSegment> segmentsWithTombstones,
SegmentSchemaMapping segmentSchemaMapping
)
{
if (taskLockType.equals(TaskLockType.REPLACE)) {
return SegmentTransactionalReplaceAction.create(segmentsWithTombstones, null);
return SegmentTransactionalReplaceAction.create(segmentsWithTombstones, segmentSchemaMapping);
} else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones, null);
return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones, segmentSchemaMapping);
} else {
throw DruidException.defensive("Invalid lock type [%s] received for overwrite action", taskLockType);
}
@@ -1496,11 +1517,51 @@ private void handleQueryResults(
return;
}
if (MSQControllerTask.isIngestion(querySpec)) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());

@SuppressWarnings("unchecked")
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
Set<? extends DataSegment> segmentsPlus = (Set<? extends DataSegment>) queryKernel.getResultObjectForStage(finalStageId);

Set<DataSegment> segments;
SegmentSchemaMapping segmentSchemaMapping = null;
boolean instanceOfSegmentWithSchema = !segmentsPlus.isEmpty() && segmentsPlus.iterator().next() instanceof DataSegmentExtendedWithSchema;

if (instanceOfSegmentWithSchema) {
segments = new HashSet<>();
Set<DataSegmentExtendedWithSchema> segmentsWithSchema = (Set<DataSegmentExtendedWithSchema>) segmentsPlus;
Map<String, SegmentMetadata> segmentIdToMetadataMap = new HashMap<>();
Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();

Set<Integer> distinctVersions = new HashSet<>();

for (DataSegmentExtendedWithSchema segmentWithSchema : segmentsWithSchema) {
segments.add(segmentWithSchema);

segmentIdToMetadataMap.put(
segmentWithSchema.getId().toString(),
new SegmentMetadata(
segmentWithSchema.getSchemaPayloadPlus().getNumRows(),
segmentWithSchema.getSchemaFingerprint()
)
);
schemaPayloadMap.put(
segmentWithSchema.getSchemaFingerprint(),
segmentWithSchema.getSchemaPayloadPlus().getSchemaPayload()
);

distinctVersions.add(segmentWithSchema.getSchemaVersion());
}

// If there is more than one schema version, it implies a version mismatch; skip persisting the schema.
if (distinctVersions.size() == 1) {
segmentSchemaMapping = new SegmentSchemaMapping(
segmentIdToMetadataMap,
schemaPayloadMap,
distinctVersions.iterator().next()
);
}
} else {
segments = (Set<DataSegment>) segmentsPlus;
}

boolean storeCompactionState = QueryContext.of(querySpec.getQuery().getContext())
.getBoolean(
@@ -1533,7 +1594,8 @@ private void handleQueryResults(
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);

publishAllSegments(new DataSegmentsWithSchemas(segments, segmentSchemaMapping));
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
@@ -1542,7 +1604,6 @@
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked


Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
// This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
@@ -1673,7 +1734,8 @@ private static QueryDefinition makeQueryDefinition(
final String queryId,
@SuppressWarnings("rawtypes") final QueryKit toolKit,
final MSQSpec querySpec,
final ObjectMapper jsonMapper
final ObjectMapper jsonMapper,
final boolean publishSchema
)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
Expand Down Expand Up @@ -1775,7 +1837,8 @@ private static QueryDefinition makeQueryDefinition(
new SegmentGeneratorFrameProcessorFactory(
dataSchema,
columnMappings,
tuningConfig
tuningConfig,
publishSchema
)
)
);
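The hunks above are from ControllerImpl; the hunks below are from SegmentGeneratorFrameProcessor. Both sides exchange results through DataSegmentExtendedWithSchema (org.apache.druid.timeline), a carrier type whose definition is not part of this excerpt. A minimal sketch of the shape implied by its call sites follows; the field layout, nullability, and the delegating nine-argument DataSegment super call (compaction state omitted) are assumptions for illustration, not the PR's actual definition.

package org.apache.druid.timeline;

import org.apache.druid.segment.SchemaPayloadPlus;

import javax.annotation.Nullable;

/**
 * Sketch only: a DataSegment that also carries the schema computed at segment-generation
 * time, so the controller can rebuild a SegmentSchemaMapping before publishing.
 */
public class DataSegmentExtendedWithSchema extends DataSegment
{
  @Nullable
  private final SchemaPayloadPlus schemaPayloadPlus;
  @Nullable
  private final String schemaFingerprint;
  private final int schemaVersion;

  public DataSegmentExtendedWithSchema(
      DataSegment delegate,
      @Nullable SchemaPayloadPlus schemaPayloadPlus,
      @Nullable String schemaFingerprint,
      int schemaVersion
  )
  {
    // Copy the delegate's core fields (compaction state omitted in this sketch).
    super(
        delegate.getDataSource(),
        delegate.getInterval(),
        delegate.getVersion(),
        delegate.getLoadSpec(),
        delegate.getDimensions(),
        delegate.getMetrics(),
        delegate.getShardSpec(),
        delegate.getBinaryVersion(),
        delegate.getSize()
    );
    this.schemaPayloadPlus = schemaPayloadPlus;
    this.schemaFingerprint = schemaFingerprint;
    this.schemaVersion = schemaVersion;
  }

  @Nullable
  public SchemaPayloadPlus getSchemaPayloadPlus()
  {
    return schemaPayloadPlus;
  }

  @Nullable
  public String getSchemaFingerprint()
  {
    return schemaFingerprint;
  }

  public int getSchemaVersion()
  {
    return schemaVersion;
  }
}
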
@@ -48,26 +48,33 @@
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegmentExtendedWithSchema;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegment>
public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegmentExtendedWithSchema>
{
private static final Logger log = new Logger(SegmentGeneratorFrameProcessor.class);

@@ -121,7 +128,7 @@ public List<WritableFrameChannel> outputChannels()
}

@Override
public ReturnOrAwait<DataSegment> runIncrementally(final IntSet readableInputs) throws InterruptedException
public ReturnOrAwait<DataSegmentExtendedWithSchema> runIncrementally(final IntSet readableInputs) throws InterruptedException
{
if (firstRun) {
log.debug("Starting job for segment [%s].", segmentIdWithShardSpec.asSegmentId());
@@ -157,7 +164,7 @@ public ReturnOrAwait<DataSegment> runIncrementally(final IntSet readableInputs)
appenderator.clear();

log.debug("Finished work for segment [%s].", segmentIdWithShardSpec.asSegmentId());
return ReturnOrAwait.returnObject(Iterables.getOnlyElement(metadata.getSegments()));
return ReturnOrAwait.returnObject(getSegmentAndSchema(metadata));
}
} else {
if (appenderator.getSegments().isEmpty()) {
@@ -169,6 +176,33 @@ public ReturnOrAwait<DataSegment> runIncrementally(final IntSet readableInputs)
}
}

private DataSegmentExtendedWithSchema getSegmentAndSchema(SegmentsAndCommitMetadata metadata)
{
DataSegment dataSegment = Iterables.getOnlyElement(metadata.getSegments());
SegmentSchemaMapping segmentSchemaMapping = metadata.getSegmentSchemaMapping();

SchemaPayloadPlus schemaPayloadPlus = null;
String fingerprint = null;

if (segmentSchemaMapping != null) {
Map<String, SegmentMetadata> segmentMetadataMap = segmentSchemaMapping.getSegmentIdToMetadataMap();
SegmentMetadata segmentMetadata = segmentMetadataMap.get(dataSegment.getId().toString());
if (segmentMetadata != null) {
fingerprint = segmentMetadata.getSchemaFingerprint();
SchemaPayload schemaPayload = segmentSchemaMapping.getSchemaFingerprintToPayloadMap().get(fingerprint);
long numRows = segmentMetadata.getNumRows();
schemaPayloadPlus = new SchemaPayloadPlus(schemaPayload, numRows);
}
}

return new DataSegmentExtendedWithSchema(
dataSegment,
schemaPayloadPlus,
fingerprint,
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);
}

@Override
public void cleanup() throws IOException
{
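
On the controller side (handleQueryResults above), the per-segment schema carriers are folded back into a single SegmentSchemaMapping keyed by schema fingerprint, so segments that share a schema also share one SchemaPayload entry while each keeps its own SegmentMetadata row count. A small illustrative sketch of that structure follows; the segment IDs, the fingerprint string, and the single-argument SchemaPayload constructor (row signature only, no aggregators) are assumptions for illustration, not values produced by Druid.

import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;

import java.util.HashMap;
import java.util.Map;

public class SegmentSchemaMappingExample
{
  public static void main(String[] args)
  {
    // Two generated segments that happen to have the same row signature.
    RowSignature signature = RowSignature.builder()
                                         .add("__time", ColumnType.LONG)
                                         .add("country", ColumnType.STRING)
                                         .add("added", ColumnType.LONG)
                                         .build();
    SchemaPayload payload = new SchemaPayload(signature);

    // Placeholder fingerprint and segment IDs, purely illustrative.
    String fingerprint = "fp-abc123";

    Map<String, SegmentMetadata> segmentIdToMetadata = new HashMap<>();
    segmentIdToMetadata.put(
        "wiki_2024-01-01T00:00:00.000Z_2024-01-02T00:00:00.000Z_v1",
        new SegmentMetadata(100L, fingerprint)
    );
    segmentIdToMetadata.put(
        "wiki_2024-01-02T00:00:00.000Z_2024-01-03T00:00:00.000Z_v1",
        new SegmentMetadata(250L, fingerprint)
    );

    // Both segments point at the same payload entry because this map is keyed by fingerprint.
    Map<String, SchemaPayload> fingerprintToPayload = new HashMap<>();
    fingerprintToPayload.put(fingerprint, payload);

    SegmentSchemaMapping mapping = new SegmentSchemaMapping(
        segmentIdToMetadata,
        fingerprintToPayload,
        CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
    );

    System.out.println(mapping.getSegmentIdToMetadataMap().size() + " segments, "
                       + mapping.getSchemaFingerprintToPayloadMap().size() + " distinct schema payloads");
  }
}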