From ca33d8bf2e131801318b0c170b7e312669443026 Mon Sep 17 00:00:00 2001
From: rishabh singh
Date: Thu, 14 Mar 2024 16:49:41 +0530
Subject: [PATCH 1/3] Fix build

---
 .../org/apache/druid/sql/calcite/CalciteArraysQueryTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
index 141baa5e5308..2c165ffe3c3b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -1160,7 +1160,7 @@ public void testArrayContainsArrayStringColumns()
         "SELECT ARRAY_CONTAINS(arrayStringNulls, ARRAY['a', 'b']), ARRAY_CONTAINS(arrayStringNulls, arrayString) FROM druid.arrays LIMIT 5",
         ImmutableList.of(
             newScanQueryBuilder()
-                .dataSource(DATA_SOURCE_ARRAYS)
+                .dataSource(CalciteTests.ARRAYS_DATASOURCE)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .columns("v0", "v1")
                 .virtualColumns(

From ce2af79bd3d95687c849540454151ceb1784366d Mon Sep 17 00:00:00 2001
From: rishabh singh
Date: Thu, 16 May 2024 11:26:07 +0530
Subject: [PATCH 2/3] MSQ changes to publish segment schema

---
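[Editor's note: free-form text placed here, between the "---" separator and the diffstat, is ignored by `git am`, so these remarks do not change what the patch applies.]

Big picture of this patch: each SegmentGeneratorFrameProcessor now hands back its DataSegment together with a schema fingerprint, a schema payload, and a schema version; the controller folds those per-segment results into a single SegmentSchemaMapping, keeping one payload per fingerprint and persisting the mapping only when every worker reported the same schema version. Below is a minimal standalone sketch of that fold using plain JDK types only -- SegmentSchema is a hypothetical stand-in, not a Druid class (requires Java 16+ for records):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class SchemaFoldSketch
    {
      // Hypothetical stand-in for the (fingerprint, payload, version) triple each segment carries.
      record SegmentSchema(String segmentId, String fingerprint, String payload, int version) {}

      // Returns one payload per fingerprint, or null when workers disagree on the schema version.
      static Map<String, String> fold(List<SegmentSchema> perSegment)
      {
        Set<Integer> versions = new HashSet<>();
        Map<String, String> payloadByFingerprint = new HashMap<>();
        for (SegmentSchema s : perSegment) {
          versions.add(s.version());
          // Segments with identical schemas share a fingerprint, so each payload is stored once.
          payloadByFingerprint.put(s.fingerprint(), s.payload());
        }
        // Mixed versions imply some workers ran older code; skip persisting, as the patch does.
        return versions.size() == 1 ? payloadByFingerprint : null;
      }

      public static void main(String[] args)
      {
        System.out.println(fold(List.of(
            new SegmentSchema("seg-2024-01-01", "fp-a", "{\"columns\":[\"x\"]}", 1),
            new SegmentSchema("seg-2024-01-02", "fp-a", "{\"columns\":[\"x\"]}", 1)
        )));
        // Prints: {fp-a={"columns":["x"]}}
      }
    }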
 .../apache/druid/msq/exec/ControllerImpl.java | 110 ++++++++++---
 .../SegmentGeneratorFrameProcessor.java       |  43 +++++-
 ...SegmentGeneratorFrameProcessorFactory.java |  32 +++-
 .../DataSegmentExtendedWithSchema.java        | 145 ++++++++++++++++++
 4 files changed, 299 insertions(+), 31 deletions(-)
 create mode 100644 processing/src/main/java/org/apache/druid/timeline/DataSegmentExtendedWithSchema.java

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index db7c6838ba7e..956a30592551 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -181,8 +181,12 @@
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.operator.WindowOperatorQuery;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.DataSegmentsWithSchemas;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
@@ -191,6 +195,7 @@
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.DruidNode;
@@ -200,6 +205,7 @@
 import org.apache.druid.storage.ExportStorageProvider;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.DataSegmentExtendedWithSchema;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -244,6 +250,7 @@ public class ControllerImpl implements Controller
   private final MSQSpec querySpec;
   private final ResultsContext resultsContext;
   private final ControllerContext context;
+  private final boolean publishSchema;
   private volatile ControllerQueryKernelConfig queryKernelConfig;
 
   /**
@@ -324,6 +331,12 @@ public ControllerImpl(
     this.querySpec = Preconditions.checkNotNull(querySpec, "querySpec");
     this.resultsContext = Preconditions.checkNotNull(resultsContext, "resultsContext");
     this.context = Preconditions.checkNotNull(controllerContext, "controllerContext");
+    CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig =
+        context.injector().getInstance(CentralizedDatasourceSchemaConfig.class);
+
+    publishSchema = centralizedDatasourceSchemaConfig != null && centralizedDatasourceSchemaConfig.isEnabled();
+
+    log.info("publishSchema is [%s]", publishSchema);
   }
 
   @Override
@@ -582,7 +595,8 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
         queryId(),
         makeQueryControllerToolKit(),
         querySpec,
-        context.jsonMapper()
+        context.jsonMapper(),
+        publishSchema
     );
 
     if (log.isDebugEnabled()) {
@@ -1312,11 +1326,11 @@ private void postResultPartitionBoundariesForStage(
    * Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
    * also drop all other segments within the replacement intervals.
    */
-  private void publishAllSegments(final Set<DataSegment> segments) throws IOException
+  private void publishAllSegments(final DataSegmentsWithSchemas dataSegmentsWithSchemas) throws IOException
   {
     final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
-    final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
+    final Set<DataSegment> segmentsWithTombstones = new HashSet<>(dataSegmentsWithSchemas.getSegments());
     int numTombstones = 0;
     final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType(
         QueryContext.of(querySpec.getQuery().getContext()),
@@ -1324,7 +1338,10 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
     );
 
     if (destination.isReplaceTimeChunks()) {
-      final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
+      final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(
+          dataSegmentsWithSchemas.getSegments(),
+          "segments"
+      ));
 
       if (!intervalsToDrop.isEmpty()) {
         TombstoneHelper tombstoneHelper = new TombstoneHelper(context.taskActionClient());
@@ -1368,24 +1385,28 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
       }
 
       performSegmentPublish(
           context.taskActionClient(),
-          createOverwriteAction(taskLockType, segmentsWithTombstones)
+          createOverwriteAction(taskLockType, segmentsWithTombstones, dataSegmentsWithSchemas.getSegmentSchemaMapping())
       );
-    } else if (!segments.isEmpty()) {
+    } else if (!dataSegmentsWithSchemas.getSegments().isEmpty()) {
       if (MultiStageQueryContext.shouldWaitForSegmentLoad(querySpec.getQuery().context())) {
         segmentLoadWaiter = new SegmentLoadStatusFetcher(
            context.injector().getInstance(BrokerClient.class),
            context.jsonMapper(),
            queryId,
            destination.getDataSource(),
-           segments,
+           dataSegmentsWithSchemas.getSegments(),
            true
        );
      }
 
      // Append mode.
      performSegmentPublish(
          context.taskActionClient(),
-         createAppendAction(segments, taskLockType)
+         createAppendAction(
+             dataSegmentsWithSchemas.getSegments(),
+             dataSegmentsWithSchemas.getSegmentSchemaMapping(),
+             taskLockType
+         )
      );
    }
 
@@ -1396,13 +1417,14 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
 
   private static TaskAction<SegmentPublishResult> createAppendAction(
       Set<DataSegment> segments,
+      SegmentSchemaMapping segmentSchemaMapping,
       TaskLockType taskLockType
   )
   {
     if (taskLockType.equals(TaskLockType.APPEND)) {
-      return SegmentTransactionalAppendAction.forSegments(segments, null);
+      return SegmentTransactionalAppendAction.forSegments(segments, segmentSchemaMapping);
     } else if (taskLockType.equals(TaskLockType.SHARED)) {
-      return SegmentTransactionalInsertAction.appendAction(segments, null, null, null);
+      return SegmentTransactionalInsertAction.appendAction(segments, null, null, segmentSchemaMapping);
     } else {
       throw DruidException.defensive("Invalid lock type [%s] received for append action", taskLockType);
     }
@@ -1410,13 +1432,14 @@ private static TaskAction<SegmentPublishResult> createAppendAction(
 
   private TaskAction<SegmentPublishResult> createOverwriteAction(
       TaskLockType taskLockType,
-      Set<DataSegment> segmentsWithTombstones
+      Set<DataSegment> segmentsWithTombstones,
+      SegmentSchemaMapping segmentSchemaMapping
   )
   {
     if (taskLockType.equals(TaskLockType.REPLACE)) {
-      return SegmentTransactionalReplaceAction.create(segmentsWithTombstones, null);
+      return SegmentTransactionalReplaceAction.create(segmentsWithTombstones, segmentSchemaMapping);
     } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
-      return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones, null);
+      return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones, segmentSchemaMapping);
     } else {
       throw DruidException.defensive("Invalid lock type [%s] received for overwrite action", taskLockType);
     }
@@ -1496,11 +1519,53 @@ private void handleQueryResults(
       return;
     }
     if (MSQControllerTask.isIngestion(querySpec)) {
-      // Publish segments if needed.
       final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
-      @SuppressWarnings("unchecked")
-      Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+      Set<?> segmentsPlus = (Set<?>) queryKernel.getResultObjectForStage(finalStageId);
+
+      Set<DataSegment> segments;
+      SegmentSchemaMapping segmentSchemaMapping = null;
+      boolean instanceOfSegmentWithSchema = !segmentsPlus.isEmpty() && segmentsPlus.iterator().next() instanceof DataSegmentExtendedWithSchema;
+
+      if (instanceOfSegmentWithSchema) {
+        log.info("Received instance of DataSegmentWithSchema.");
+        segments = new HashSet<>();
+        Set<DataSegmentExtendedWithSchema> segmentsWithSchema = (Set<DataSegmentExtendedWithSchema>) segmentsPlus;
+        Map<String, SegmentMetadata> segmentIdToMetadataMap = new HashMap<>();
+        Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
+
+        Set<Integer> distinctVersions = new HashSet<>();
+
+        for (DataSegmentExtendedWithSchema segmentWithSchema : segmentsWithSchema) {
+          segments.add(segmentWithSchema);
+
+          segmentIdToMetadataMap.put(
+              segmentWithSchema.getId().toString(),
+              new SegmentMetadata(
+                  segmentWithSchema.getSchemaPayloadPlus().getNumRows(),
+                  segmentWithSchema.getSchemaFingerprint()
+              )
+          );
+          schemaPayloadMap.put(
+              segmentWithSchema.getSchemaFingerprint(),
+              segmentWithSchema.getSchemaPayloadPlus().getSchemaPayload()
+          );
+
+          distinctVersions.add(segmentWithSchema.getSchemaVersion());
+        }
+
+        // If there is more than one schema version, it implies a version mismatch; skip persisting the schema.
+        if (distinctVersions.size() == 1) {
+          segmentSchemaMapping = new SegmentSchemaMapping(
+              segmentIdToMetadataMap,
+              schemaPayloadMap,
+              distinctVersions.iterator().next()
+          );
+        }
+      } else {
+        log.info("Received instance of DataSegment.");
+        segments = (Set<DataSegment>) segmentsPlus;
+      }
 
       boolean storeCompactionState = QueryContext.of(querySpec.getQuery().getContext())
                                                  .getBoolean(
@@ -1533,7 +1598,11 @@ private void handleQueryResults(
         }
       }
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
-      publishAllSegments(segments);
+
+      if (segmentSchemaMapping != null) {
+        log.info("SegmentSchemaMapping is not null [%s]", segmentSchemaMapping);
+      }
+      publishAllSegments(new DataSegmentsWithSchemas(segments, segmentSchemaMapping));
     } else if (MSQControllerTask.isExport(querySpec)) {
       // Write manifest file.
       ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
@@ -1542,7 +1611,6 @@ private void handleQueryResults(
       final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
 
       //noinspection unchecked
-
       Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
       if (!(resultObjectForStage instanceof List)) {
         // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
@@ -1673,7 +1741,8 @@ private static QueryDefinition makeQueryDefinition(
       final String queryId,
       @SuppressWarnings("rawtypes") final QueryKit toolKit,
       final MSQSpec querySpec,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final boolean publishSchema
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
@@ -1775,7 +1844,8 @@ private static QueryDefinition makeQueryDefinition(
             new SegmentGeneratorFrameProcessorFactory(
                 dataSchema,
                 columnMappings,
-                tuningConfig
+                tuningConfig,
+                publishSchema
             )
         )
     );
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java
index 78b3e16f6702..be043d2e5588 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java
@@ -48,15 +48,21 @@
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.DataSegmentExtendedWithSchema;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -64,10 +70,11 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegment>
+public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegmentExtendedWithSchema>
 {
   private static final Logger log = new Logger(SegmentGeneratorFrameProcessor.class);
 
@@ -121,7 +128,7 @@ public List<WritableFrameChannel> outputChannels()
   }
 
   @Override
-  public ReturnOrAwait<DataSegment> runIncrementally(final IntSet readableInputs) throws InterruptedException
+  public ReturnOrAwait<DataSegmentExtendedWithSchema> runIncrementally(final IntSet readableInputs) throws InterruptedException
   {
     if (firstRun) {
       log.debug("Starting job for segment [%s].", segmentIdWithShardSpec.asSegmentId());
@@ -157,7 +164,7 @@ public ReturnOrAwait<DataSegment> runIncrementally(final IntSet readableInputs)
         appenderator.clear();
 
         log.debug("Finished work for segment [%s].", segmentIdWithShardSpec.asSegmentId());
-        return ReturnOrAwait.returnObject(Iterables.getOnlyElement(metadata.getSegments()));
+        return ReturnOrAwait.returnObject(getSegmentAndSchema(metadata));
       }
     } else {
       if (appenderator.getSegments().isEmpty()) {
@@ -169,6 +176,36 @@ public ReturnOrAwait<DataSegment> runIncrementally(final IntSet readableInputs)
     }
   }
 
+  private DataSegmentExtendedWithSchema getSegmentAndSchema(SegmentsAndCommitMetadata metadata)
+  {
+    DataSegment dataSegment = Iterables.getOnlyElement(metadata.getSegments());
+    SegmentSchemaMapping segmentSchemaMapping = metadata.getSegmentSchemaMapping();
+
+    SchemaPayloadPlus schemaPayloadPlus = null;
+    String fingerprint = null;
+
+    if (segmentSchemaMapping != null) {
+      log.info("SegmentSchemaMapping is [%s]", segmentSchemaMapping);
+      Map<String, SegmentMetadata> segmentMetadataMap = segmentSchemaMapping.getSegmentIdToMetadataMap();
+      SegmentMetadata segmentMetadata = segmentMetadataMap.get(dataSegment.getId().toString());
+      if (segmentMetadata != null) {
+        fingerprint = segmentMetadata.getSchemaFingerprint();
+        SchemaPayload schemaPayload = segmentSchemaMapping.getSchemaFingerprintToPayloadMap().get(fingerprint);
+        long numRows = segmentMetadata.getNumRows();
+        schemaPayloadPlus = new SchemaPayloadPlus(schemaPayload, numRows);
+      }
+    } else {
+      log.info("SegmentSchemaMapping is null.");
+    }
+
+    return new DataSegmentExtendedWithSchema(
+        dataSegment,
+        schemaPayloadPlus,
+        fingerprint,
+        CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+    );
+  }
+
   @Override
   public void cleanup() throws IOException
   {
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
index e925e1a1c028..d9be439da6f5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
@@ -35,6 +35,7 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.counters.CounterTracker;
 import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
 import org.apache.druid.msq.counters.SegmentGeneratorMetricsWrapper;
@@ -65,6 +66,7 @@
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.DataSegmentExtendedWithSchema;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
@@ -77,22 +79,30 @@
 @JsonTypeName("segmentGenerator")
 public class SegmentGeneratorFrameProcessorFactory
-    implements FrameProcessorFactory<DataSegment, Set<DataSegment>, List<SegmentIdWithShardSpec>>
+    implements FrameProcessorFactory<DataSegmentExtendedWithSchema, Set<DataSegmentExtendedWithSchema>, List<SegmentIdWithShardSpec>>
 {
+
+  private static final Logger log = new Logger(SegmentGeneratorFrameProcessorFactory.class);
+
   private final DataSchema dataSchema;
   private final ColumnMappings columnMappings;
   private final MSQTuningConfig tuningConfig;
+  @Nullable
+  private final Boolean publishSchema;
 
   @JsonCreator
   public SegmentGeneratorFrameProcessorFactory(
       @JsonProperty("dataSchema") final DataSchema dataSchema,
       @JsonProperty("columnMappings") final ColumnMappings columnMappings,
-      @JsonProperty("tuningConfig") final MSQTuningConfig tuningConfig
+      @JsonProperty("tuningConfig") final MSQTuningConfig tuningConfig,
+      @JsonProperty("publishSchema") @Nullable final Boolean publishSchema
   )
   {
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
"columnMappings"); this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); + this.publishSchema = publishSchema; + log.info("Publish schema is [%s]", publishSchema); } @JsonProperty @@ -113,8 +123,15 @@ public MSQTuningConfig getTuningConfig() return tuningConfig; } + @JsonProperty + @Nullable + public Boolean getPublishSchema() + { + return publishSchema; + } + @Override - public ProcessorsAndChannels> makeProcessors( + public ProcessorsAndChannels> makeProcessors( StageDefinition stageDefinition, int workerNumber, List inputSlices, @@ -194,8 +211,7 @@ public Pair apply(ReadableInput readableInput) meters, parseExceptionHandler, true, - // MSQ doesn't support CentralizedDatasourceSchema feature as of now. - CentralizedDatasourceSchemaConfig.create(false) + CentralizedDatasourceSchemaConfig.create(Boolean.TRUE.equals(publishSchema)) ); return new SegmentGeneratorFrameProcessor( @@ -226,14 +242,14 @@ public Pair apply(ReadableInput readableInput) } @Override - public TypeReference> getResultTypeReference() + public TypeReference> getResultTypeReference() { - return new TypeReference>() {}; + return new TypeReference>() {}; } @Nullable @Override - public Set mergeAccumulatedResult(Set accumulated, Set otherAccumulated) + public Set mergeAccumulatedResult(Set accumulated, Set otherAccumulated) { accumulated.addAll(otherAccumulated); return accumulated; diff --git a/processing/src/main/java/org/apache/druid/timeline/DataSegmentExtendedWithSchema.java b/processing/src/main/java/org/apache/druid/timeline/DataSegmentExtendedWithSchema.java new file mode 100644 index 000000000000..86dce58b7bd4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/timeline/DataSegmentExtendedWithSchema.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.jackson.CommaListJoinDeserializer;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class DataSegmentExtendedWithSchema extends DataSegment
+{
+  @Nullable
+  private final SchemaPayloadPlus schemaPayloadPlus;
+
+  @Nullable
+  private final String schemaFingerprint;
+
+  @Nullable
+  private final Integer schemaVersion;
+
+  @JsonCreator
+  public DataSegmentExtendedWithSchema(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("version") String version,
+      // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
+      @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
+      @JsonProperty("dimensions")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+          List<String> dimensions,
+      @JsonProperty("metrics")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+          List<String> metrics,
+      @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
+      @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
+      @JsonProperty("binaryVersion") Integer binaryVersion,
+      @JsonProperty("size") long size,
+      @JsonProperty("schemaPayloadPlus") @Nullable SchemaPayloadPlus schemaPayloadPlus,
+      @JsonProperty("fingerprint") @Nullable String schemaFingerprint,
+      @JsonProperty("schemaVersion") @Nullable Integer schemaVersion,
+      @JacksonInject PruneSpecsHolder pruneSpecsHolder
+  )
+  {
+    super(dataSource, interval, version, loadSpec, dimensions, metrics, shardSpec, lastCompactionState, binaryVersion, size, pruneSpecsHolder);
+    this.schemaPayloadPlus = schemaPayloadPlus;
+    this.schemaFingerprint = schemaFingerprint;
+    this.schemaVersion = schemaVersion;
+  }
+
+  public DataSegmentExtendedWithSchema(
+      DataSegment dataSegment,
+      @Nullable SchemaPayloadPlus schemaPayloadPlus,
+      @Nullable String schemaFingerprint,
+      @Nullable Integer schemaVersion
+  )
+  {
+    super(
+        dataSegment.getDataSource(),
+        dataSegment.getInterval(),
+        dataSegment.getVersion(),
+        dataSegment.getLoadSpec(),
+        dataSegment.getDimensions(),
+        dataSegment.getMetrics(),
+        dataSegment.getShardSpec(),
+        dataSegment.getBinaryVersion(),
+        dataSegment.getSize()
+    );
+    this.schemaPayloadPlus = schemaPayloadPlus;
+    this.schemaFingerprint = schemaFingerprint;
+    this.schemaVersion = schemaVersion;
+  }
+
+  @JsonProperty("schemaPayloadPlus")
+  @Nullable
+  public SchemaPayloadPlus getSchemaPayloadPlus()
+  {
+    return schemaPayloadPlus;
+  }
+
+  @JsonProperty("fingerprint")
+  @Nullable
+  public String getSchemaFingerprint()
+  {
+    return schemaFingerprint;
+  }
+
+  @JsonProperty("schemaVersion")
+  @Nullable
+  public Integer getSchemaVersion()
+  {
+    return schemaVersion;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    DataSegmentExtendedWithSchema that = (DataSegmentExtendedWithSchema) o;
+    return Objects.equals(schemaPayloadPlus, that.schemaPayloadPlus)
+           && Objects.equals(schemaFingerprint, that.schemaFingerprint)
+           && Objects.equals(schemaVersion, that.schemaVersion);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), schemaPayloadPlus, schemaFingerprint, schemaVersion);
+  }
+}

From 635ebfd3b0b5bb1688a4099f281a9de99886b9cc Mon Sep 17 00:00:00 2001
From: rishabh singh
Date: Thu, 16 May 2024 13:05:24 +0530
Subject: [PATCH 3/3] minor changes

---
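[Editor's note, again placed in the region `git am` ignores.] This follow-up strips the temporary info-level logging introduced in patch 2 and raises RemoteTaskActionClient's per-action log from debug to info. One point worth flagging from patch 2 while reviewing: DataSegmentExtendedWithSchema.equals() requires getClass() equality, while (to my reading) DataSegment's equals() does not, so equality can turn asymmetric once plain and extended segments are mixed into the same Set<DataSegment>. A self-contained sketch of that pitfall using hypothetical Base/Extended classes, not Druid code (requires Java 16+ for instanceof patterns):

    import java.util.Objects;

    class Base
    {
      final String id;

      Base(String id)
      {
        this.id = id;
      }

      // Base equality: id only; any Base (or subclass) instance is comparable.
      @Override
      public boolean equals(Object o)
      {
        return o instanceof Base b && id.equals(b.id);
      }

      @Override
      public int hashCode()
      {
        return id.hashCode();
      }
    }

    class Extended extends Base
    {
      final String schema;

      Extended(String id, String schema)
      {
        super(id);
        this.schema = schema;
      }

      // Subclass equality: exact class match required, mirroring the patch's equals().
      @Override
      public boolean equals(Object o)
      {
        if (this == o) {
          return true;
        }
        if (o == null || getClass() != o.getClass()) {
          return false;
        }
        return super.equals(o) && Objects.equals(schema, ((Extended) o).schema);
      }

      @Override
      public int hashCode()
      {
        return Objects.hash(super.hashCode(), schema);
      }
    }

    class SymmetryCheck
    {
      public static void main(String[] args)
      {
        Base plain = new Base("seg1");
        Base withSchema = new Extended("seg1", "{...}");
        System.out.println(plain.equals(withSchema)); // true
        System.out.println(withSchema.equals(plain)); // false: equals() is asymmetric
      }
    }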
 .../java/org/apache/druid/msq/exec/ControllerImpl.java      | 7 -------
 .../indexing/processor/SegmentGeneratorFrameProcessor.java  | 3 ---
 .../processor/SegmentGeneratorFrameProcessorFactory.java    | 6 ------
 .../indexing/common/actions/RemoteTaskActionClient.java     | 2 +-
 4 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 956a30592551..1cf040f77e6a 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -335,8 +335,6 @@ public ControllerImpl(
         context.injector().getInstance(CentralizedDatasourceSchemaConfig.class);
 
     publishSchema = centralizedDatasourceSchemaConfig != null && centralizedDatasourceSchemaConfig.isEnabled();
-
-    log.info("publishSchema is [%s]", publishSchema);
   }
 
   @Override
@@ -1528,7 +1526,6 @@ private void handleQueryResults(
       boolean instanceOfSegmentWithSchema = !segmentsPlus.isEmpty() && segmentsPlus.iterator().next() instanceof DataSegmentExtendedWithSchema;
 
       if (instanceOfSegmentWithSchema) {
-        log.info("Received instance of DataSegmentWithSchema.");
         segments = new HashSet<>();
         Set<DataSegmentExtendedWithSchema> segmentsWithSchema = (Set<DataSegmentExtendedWithSchema>) segmentsPlus;
         Map<String, SegmentMetadata> segmentIdToMetadataMap = new HashMap<>();
@@ -1563,7 +1560,6 @@ private void handleQueryResults(
         );
       }
     } else {
-      log.info("Received instance of DataSegment.");
       segments = (Set<DataSegment>) segmentsPlus;
     }
 
@@ -1599,9 +1595,6 @@ private void handleQueryResults(
       }
       log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
 
-      if (segmentSchemaMapping != null) {
-        log.info("SegmentSchemaMapping is not null [%s]", segmentSchemaMapping);
-      }
       publishAllSegments(new DataSegmentsWithSchemas(segments, segmentSchemaMapping));
     } else if (MSQControllerTask.isExport(querySpec)) {
       // Write manifest file.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java
index be043d2e5588..a83d984c1b40 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessor.java
@@ -185,7 +185,6 @@ private DataSegmentExtendedWithSchema getSegmentAndSchema(SegmentsAndCommitMetad
     String fingerprint = null;
 
     if (segmentSchemaMapping != null) {
-      log.info("SegmentSchemaMapping is [%s]", segmentSchemaMapping);
       Map<String, SegmentMetadata> segmentMetadataMap = segmentSchemaMapping.getSegmentIdToMetadataMap();
       SegmentMetadata segmentMetadata = segmentMetadataMap.get(dataSegment.getId().toString());
       if (segmentMetadata != null) {
@@ -194,8 +193,6 @@ private DataSegmentExtendedWithSchema getSegmentAndSchema(SegmentsAndCommitMetad
         long numRows = segmentMetadata.getNumRows();
         schemaPayloadPlus = new SchemaPayloadPlus(schemaPayload, numRows);
       }
-    } else {
-      log.info("SegmentSchemaMapping is null.");
     }
 
     return new DataSegmentExtendedWithSchema(
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
index d9be439da6f5..217aec756c53 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
@@ -35,7 +35,6 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.counters.CounterTracker;
 import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
 import org.apache.druid.msq.counters.SegmentGeneratorMetricsWrapper;
@@ -65,7 +64,6 @@
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
-import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegmentExtendedWithSchema;
 import org.joda.time.Period;
 
@@ -81,9 +79,6 @@ public class SegmentGeneratorFrameProcessorFactory
     implements FrameProcessorFactory<DataSegmentExtendedWithSchema, Set<DataSegmentExtendedWithSchema>, List<SegmentIdWithShardSpec>>
 {
-
-  private static final Logger log = new Logger(SegmentGeneratorFrameProcessorFactory.class);
-
   private final DataSchema dataSchema;
   private final ColumnMappings columnMappings;
   private final MSQTuningConfig tuningConfig;
@@ -102,7 +97,6 @@ public SegmentGeneratorFrameProcessorFactory(
     this.columnMappings = Preconditions.checkNotNull(columnMappings, "columnMappings");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.publishSchema = publishSchema;
-    log.info("Publish schema is [%s]", publishSchema);
   }
 
   @JsonProperty
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClient.java
index 4015b99558a9..52ec399522a4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClient.java
@@ -57,7 +57,7 @@ public RemoteTaskActionClient(
   @Override
   public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
   {
-    log.debug("Performing action for task[%s]: %s", task.getId(), taskAction);
+    log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
 
     try {
       // We're using a ServiceClient directly here instead of OverlordClient, because OverlordClient does
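[Editor's note.] End to end, the feature only activates when the controller's injector provides an enabled CentralizedDatasourceSchemaConfig. If I read the Druid docs correctly (worth double-checking against the release you target), that binding is driven by the runtime property below, so exercising these patches requires something like:

    # common.runtime.properties -- assumed property name; verify before relying on it
    druid.centralizedDatasourceSchema.enabled=true

With that set, patch 2's ControllerImpl constructor computes publishSchema=true, the segment generator attaches SchemaPayloadPlus data to each segment, and publishAllSegments() hands a populated SegmentSchemaMapping to the segment-transaction actions.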