Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building #15817

Merged
merged 281 commits into from Apr 24, 2024

Conversation

findingrish
Copy link
Contributor

@findingrish findingrish commented Feb 1, 2024

Description

Issue: #14989

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Thereafter, we addressed the problem of publishing schema for realtime segments (#15475). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information.

This is the final change which involves publishing segment schema for finalized segments from task and periodically polling them in the Coordinator.

Design

Database

Schema Table

Table Name: SegmentSchema
Purpose: Store unique schema for segment.

Columns

Column Name Data Type Description
id autoincrement primary key
created_date varchar creation time, allows filtering schema created after a point
datasource varchar datasource
fingerprint varchar unique identifier for the schema, sha-256 hash of payload, datasource & version
payload blob includes rowSignature, aggregatorFactories
used boolean true if the schema is referenced by used segments
used_status_last_updated varchar timestamp when the used status was last updated
version int schema version

Segments Table

New columns will be added to the already existing Segments table.

Columns

Column Name Data Type Description
num_rows long number of rows in the segment
schema_fingerprint string schema fingerprint

Task

Changes are required in the task to publish schema along with segment metadata.

  • Introduce a new class SchemaPayload to encapsulate RowSignature and AggregatorFactories.
  • Introduce a new class SegmentSchemaMetadata to encapsulate SchemaPayload and numRows.
  • Introduce a new class SegmentSchemaMapping to encapsulate schema and numRows information for multiple segments.
  • Update SegmentInsertAction, SegmentTransactionalReplaceAction, SegmentTransactionalAppendAction & SegmentTransactionalInsertAction to take in segment schema.
  • Changes in AbstractBatchIndexTask#buildPublishAction to take segment schema.
  • Changes in SegmentAndCommitMetadata to take segment schema.
  • Changes in TransactionalSegmentPublisher to take segment schema for publishing to the DB.

Streaming

  • Changes in StreamAppenderator to get the RowSignature, AggregatorFactories and numRows for the segment.

Batch

  • AppenderatorImpl#push to build the segment schema and add it to SegmentsAndCommitMetadata.
  • BatchAppenderator#push to build the segment schema and add it to SegmentsAndCommitMetadata.
IndexTask
  • Changes in BatchAppenderatorDriver#publishAll to pass segment schema for publishing.
  • Change in IndexTask#generateAndPublishSegments to fetch segment schema from pushed segments and publish.
ParallelIndexSupervisorTask
  • Changes in ParallelIndexSupervisorTask#publishSegments to combine segment schema from segments and publish them.
  • SinglePhaseSubTask
  • PartialSegmentMergeTask

MSQ

  • Changes in SegmentGeneratorFrameProcessor to return segment schema along with segment metadata.
  • Changes in SegmentGeneratorFrameProcessorFactory and ControllerImpl.
    Note, these changes are reverted for now.

Overlord

Changes are required in the Overlord (IndexerSQLMetadataStorageCoordintor) to persist the schema along with segment metadata in the database.

Coordinator

Schema Poll

Changes in SqlSegmentsMetadataManager to poll schema along with segments.
Also poll schema_id and num_rows additionally from segments table.
Update schema cache.

Schema Caching

Maintain a cache of segment schema. Refer SegmentSchemaCache.
It caches following information,

Information Writer Cleanup
SegmentMetadata . SegmentId -> schema fingerprint, numRows Replaced on each DB poll Not required.
Schema for finalised segments. Schema fingerprint -> SchemaPayload Replaced on each DB poll. Not required.
Realtime segment schema. SegmentId -> SegmentSchemaMetadata Whenever Peons push schema update. When the segment is removed.
SMQResults which are not published. SegmentId -> SegmentSchemaMetadata Added after SMQ query is executed. If SegmentSchemaBackFill queue successfully writes the schema to the database, it is removed from this map.
SMQResults which have been published. SegmentId -> SegmentSchemaMetadata Added after segment schema is published to the DB. Cleared after each DB Poll.

SegmentMetadataCache changes

Changes in AbstractSegmentMetadataCache class to add new method which will be overridden by child classes,

  • additionalInitializationCondition
  • removeSegmentAction
  • segmentMetadataQueryResultHandler

Changes in CoordinatorSegmentMetadataCache to override methods from AbstractSegmentMetadataCache,

  • Implement additionalInitializationCondition to wait for the segmentSchemaCache to be initialized.
  • Implement removeSegmentAction to remove the schema from the schema cache.
  • Override segmentMetadataQueryResultHandler to additionally publish and cache the schema.

Schema Backfill

Added a new class SegmentSchemaBackFillQueue which accepts segment schema and publish them in batch.

Schema Cleanup

CoordinatorDuty to clean up schema which is not referenced by any segment.

Coordinator leader flow changes

  • CoordinatorSegmentMetadataCache refresh is executed only on the leader node.
  • CoordinatorSegmentMetadataCache timeline callback continue to function on all Coordinator nodes.
  • SegmentSchemaCache is populated only on the leader node, except for the realtime schema information which is updated on all Coordinator nodes.
  • SegmentSchemaBackFillQueue functions only on the leader node.

Testing

  • The changes have been tested locally with the wikipedia dataset.
  • Unit test has been added.
  • All of the existing integration tests have been tested with feature enabled (e8a6d9b).
  • Integration test with the group name centralized-table-schema runs successfully.
  • The changes have also been tested in a Druid cluster.

Upgrade considerations

The general upgrade order should be followed. The new code is behind a feature flag, so it is compatible with existing setups. Task with new changes can communicate with old version of Overlord.

Release Notes

This feature addresses multiple challenges outlined in the linked issue. To enable it, set druid.centralizedDatasourceSchema.enabled.
If MM is used then set druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled.

When the feature is enabled,

  • Realtime segment schema change would be periodically pushed to the Coordinator,
  • Finalized segment schema would be written to the metadata database.
  • Coordinator would poll the schema along with segment metadata.
  • Coordinator would build the datasource schema and broker would fetch it from the Coordinator.

To rollback, simply turn off the feature flag. The database schema change is not rolled back on turning off the feature.

New configs,

Name Purpose
druid.coordinator.kill.segmentSchema.on Config to enabled kill segment schema Coordinator duty
druid.coordinator.kill.segmentSchema.period Kill segment schema Coordinator duty period
druid.coordinator.kill.segmentSchema.durationToRetain Duration to retain segment schema after being marked as unused.

Important metrics to track,

Metric Purpose
metadatacache/schemaPoll/count Number of coordinator polls to fetch datasource schema.
metadatacache/schemaPoll/failed Number of failed coordinator polls to fetch datasource schema.
metadatacache/schemaPoll/time Time taken for coordinator polls to fetch datasource schema.
metadatacache/init/time Time taken to initialize the coordinator segment metadata cache. Depends on the number of segments.
metadatacache/refresh/count Number of segments to refresh in coordinator segment metadata cache.
metadatacache/refresh/time Time taken to refresh segments in coordinator segment metadata cache.
metadatacache/backfill/count Number of segments for which schema was backfill in the DB.
schemacache/realtime/size Number of realtime segment for which schema is cached.
schemacache/finalizedSegmentMetadata/size Number of finalized segments for which schema is cached.
schemacache/finalizedSchemaPayload/size Number of distinct schema payload cached.
schemacache/inTransitSMQResults/size Number of segment schema cached as a result of SMQ.
schemacache/inTransitSMQPublishedResults/size Number of segment schema backfilled in the DB.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

….coordinator.centralizedSchemaManagement.enabled
Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a comment. The PR seems very close to merge. I would review the SQL changes again.

@@ -192,7 +193,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
frameContext.indexMerger(),
meters,
parseExceptionHandler,
true
true,
CentralizedDatasourceSchemaConfig.create(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MSQ does not support centralized data source schema yet. I think we should put his comment here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to have this comment in the javadoc of CentralizedDatasourceSchemaConfig itself.

@@ -220,31 +217,10 @@ protected List<? extends Module> getModules()
@Override
public void configure(Binder binder)
{

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MM less ingestion would need this check.

@@ -197,4 +204,31 @@ public void stop()
return new Child();
}
}

protected void validateCentralizedDatasourceSchemaConfig(Properties properties)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static as well.

@@ -45,18 +51,27 @@ public FingerprintGenerator(ObjectMapper objectMapper)
/**
* Generates fingerprint or hash string for an object using SHA-256 hash algorithm.
*/
public String generateFingerprint(Object payload)
@SuppressWarnings("UnstableApiUsage")
public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have UT's so that we can assert the changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dependency update might cause issues.

@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|
|`schemacache/realtime/size`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
|`schemacache/finalizedSegmentMetadata/size`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this should be count and not size. We can do this change as a followup.

*/
SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@Nullable MinimalSegmentSchemas minimalSegmentSchemas
String taskAllocatorId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required ?
There are no javadocs for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Schema version [%d] doesn't match the current version [%d], dropping the schema [%s].",
minimalSegmentSchemas.getSchemaVersion(),
"Schema version [%d] doesn't match the current version [%d]. Not persisting this schema [%s]. "
+ "Schema for this segment will be poppulated by the schema backfill job in Coordinator.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
+ "Schema for this segment will be poppulated by the schema backfill job in Coordinator.",
+ "Schema for this segment will be populated by the schema back-fill job in Coordinator.",

@@ -175,7 +175,7 @@ public void stop()

public void leaderStart()
{
log.info("%s starting cache initialization.", getClass().getSimpleName());
log.info("Initializing cache.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend adding the name of the class so I can search
cache %s in the logs :). It makes it easier for me to search thought tons of logs. since each loggers can have its own format.
Nit: can be done in a followup.

finalizedSegmentStats = ImmutableMap.of();
finalizedSegmentSchema.clear();
finalizedSegmentMetadata = ImmutableMap.of();
finalizedSegmentSchema = ImmutableMap.of();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this cause threadSafety issues if we change the reference ?

@@ -106,10 +108,10 @@ public SegmentSchemaCache(ServiceEmitter emitter)

public void setInitialized()
{
log.info("[%s] initializing.", getClass().getSimpleName());
log.info("Initializing SegmentSchemaCache.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have isInitalized() on the top no ?

@findingrish
Copy link
Contributor Author

I have addressed the feedback on the PR. I will raise a followup PR to enable schema publish in MSQ and address any feedback meant for later.

Copy link
Contributor

@cryptoe cryptoe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes lgtm. There are some rough edges that can be taken care as part of a follow up PR.
Thanks @findingrish for taking up this monumental effort.

log.error(
"Schema version [%d] doesn't match the current version [%d]. Not persisting this schema [%s]. "
+ "Schema for this segment will be populated by the schema backfill job in Coordinator.",
segmentSchemaMapping.getSchemaVersion(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check should be outside the transaction. Lets create a follow up patch for that.


Set<String> columnsToAdd = new HashSet<>();

for (String columnName : columnNameTypes.keySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a test case where we are checking this logic.

Note: Followup item.

Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to what @cryptoe said, I agree that we can refine this as we go but overall the changes seem okay.

Thanks for your patience on this, @findingrish !!

{
log.debug("Updating segment with schema and numRows information: [%s].", batch);

// update schemaId and numRows in segments table
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// update schemaId and numRows in segments table
// update fingerprint and numRows in segments table

@@ -1435,6 +1435,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. |false|
Copy link
Contributor

@kfaraz kfaraz Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR:
The config description should be more like Indicates whether centralized schema management is enabled. The description should also link to the page which contains the details of the feature.

@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|
|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
Copy link
Contributor

@kfaraz kfaraz Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR
Do these rows render correctly? The preceding rows have only 3 columns, this one seems to have 4.

Comment on lines +80 to +83
|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
|`schemacache/inTransitSMQResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which schema is cached after back filling in the database.||Eventually it should be 0.|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR:
Is schemacache/ not the same as metadatacache? The similar yet different names can be confusing.


for (String column : columns) {
createStatementBuilder.append(column);
createStatementBuilder.append(",");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR:
Nit: We seem to have removed the new line characters. They formatted the statement nicely in case we wanted to debug it.

@@ -905,7 +907,7 @@ private TaskStatus generateAndPublishSegments(
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
driver.startJob();

SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> commitMetadataAndSchema = InputSourceProcessor.process(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR
Why is SegmentSchemaMapping not included inside the SegmentsAndCommitMetadata object itself?

@@ -58,7 +61,7 @@ public class InputSourceProcessor
*
* @return {@link SegmentsAndCommitMetadata} for the pushed segments.
*/
public static SegmentsAndCommitMetadata process(
public static Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> process(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR
We should include SegmentSchemaMapping inside the SegmentsAndCommitMetadata itself.

Comment on lines +422 to +427
? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata,
segmentSchemaMapping
)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata,
segmentSchemaMapping
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR
Please fix the formatting here.

@@ -110,4 +115,14 @@ public TaskStatus runTask(TaskToolbox toolbox)
{
return status;
}

public TaskAction<SegmentPublishResult> testBuildPublishAction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR

Suggested change
public TaskAction<SegmentPublishResult> testBuildPublishAction(
public TaskAction<SegmentPublishResult> buildPublishAction(

@@ -65,7 +65,7 @@ public static void setup() throws IOException
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V1));

actionTestKit.getMetadataStorageCoordinator()
.commitSegments(expectedUnusedSegments);
.commitSegments(expectedUnusedSegments, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up PR
Since passing null is a very common usage right now, it would be better to keep two variants of the new methods. It would be easier to identify the usages which pass non-null values and we could also avoid passing nulls all over the place.

@cryptoe cryptoe merged commit e30790e into apache:master Apr 24, 2024
87 checks passed
@adarshsanjeev adarshsanjeev added this to the 30.0.0 milestone May 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants