-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable adding new segments to a upsert-enabled realtime table #6567
Conversation
Codecov Report
@@ Coverage Diff @@
## master #6567 +/- ##
==========================================
- Coverage 66.44% 65.89% -0.56%
==========================================
Files 1075 1403 +328
Lines 54773 67986 +13213
Branches 8168 9836 +1668
==========================================
+ Hits 36396 44797 +8401
- Misses 15700 19997 +4297
- Partials 2677 3192 +515
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the validation done to make sure that the segments in the realtime table do not overlap data with the newly loaded ones?
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we manage the stream offset if we upload a segment to the realtime table? Also, how about the segment name format?
We need to have an integration test to ensure the PinotLLCRealtimeSegmentManager
can work properly on uploaded segments.
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Show resolved
Hide resolved
|
||
if (isRealtimeOnlyTable(tableName)) { | ||
tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); | ||
instancePartitionsType = InstancePartitionsType.CONSUMING; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be COMPLETED
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In an upsert enabled LLC realtime table, all segments of the same partition are colocated on the same server -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
In addition, the current RealtimeSegmentAssignment.assignSegment(..) method, it is hardcoded explicitly to assign new segments to CONSUMING instance partition.
https://github.com/apache/incubator-pinot/blob/3b7093039469b2a00269f747543c6c29c8f1801f/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java#L102
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Jackie that it is counter-intuitive. Add a comment here reasoning why it should be set to CONSUMING. If we enable uploading realtime segments to non-upsert tables, at least we will know why/what we will need to fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments about why we chose CONSUMING today. Left TODO for further improvement.
@chenboat Can you open up the access for the linked design doc? |
Please refer to the comments I added with respect to Subbu's questions. For upsert tables, there should be validation that the offset of uploaded segments will not affect the consuming segment's offset. This is inline with the goals of segment upload — which are mainly about bootstrapping historical data. The segment name format I propose conforms to the LLC name format with the following parts. tableName(string)partitionId(int)_sequenceNumber(int)_creationTime(string). The main difference is that the creationTime string now does not need to an actual timestamp but rather any unique string (e.g., timestamp + an uuid). Yes. I will add an integration test. |
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Outdated
Show resolved
Hide resolved
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
Outdated
Show resolved
Hide resolved
...ntroller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
@@ -358,7 +358,7 @@ | |||
public static class Segment { | |||
public static class Realtime { | |||
public enum Status { | |||
IN_PROGRESS, DONE | |||
IN_PROGRESS, DONE, UPLOAD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think if we need to have separated status for UPLOAD_UPSERT
and UPLOAD_APPEND
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently UPLOAD is only applicable to upsert enable tables. Is there an effort to expand the scope? for upsert table, UPLOAD_APPEND is not needed.
pinot-integration-tests/src/test/resources/upsert_table_test.schema
Outdated
Show resolved
Hide resolved
...core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
Outdated
Show resolved
Hide resolved
public boolean isUpsertTable(String tableName) { | ||
if (hasOfflineTable(tableName)) | ||
return false; | ||
if (!hasRealtimeTable(tableName)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this if-condition redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not redundant because a table which does not exist may reach here.
...ntroller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
Outdated
Show resolved
Hide resolved
@@ -245,21 +245,28 @@ private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiP | |||
LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName, tableName, uploadType); | |||
} | |||
|
|||
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); | |||
String tableNameWithType; | |||
if (_pinotHelixResourceManager.isUpsertTable(rawTableName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we throw an exception on the non-upsert realtime table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems a little strange. Does this mean that we cannot upload segment to the offline part of an upsert table ? Or that offline part does not exist for an upsert table? What if we introduce this part later?
Instead, I suggest that we include the type query-part as well in the url. If the type is not present, we assume it is OFFLINE. If it is present, we treat it accordingly, but throw an exception if it is realtime type but upsert is not turned on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it is a good idea to divide into realtime and offline cases rather than upsert vs non-upsert case. Added the table type parameters to the REST end point too.
0b6a400
to
0f93df0
Compare
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
Outdated
Show resolved
Hide resolved
@@ -245,21 +245,28 @@ private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiP | |||
LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName, tableName, uploadType); | |||
} | |||
|
|||
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); | |||
String tableNameWithType; | |||
if (_pinotHelixResourceManager.isUpsertTable(rawTableName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems a little strange. Does this mean that we cannot upload segment to the offline part of an upsert table ? Or that offline part does not exist for an upsert table? What if we introduce this part later?
Instead, I suggest that we include the type query-part as well in the url. If the type is not present, we assume it is OFFLINE. If it is present, we treat it accordingly, but throw an exception if it is realtime type but upsert is not turned on
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
Outdated
Show resolved
Hide resolved
...core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take a look at PinotLLCRealtimeSegmentManager
and ensure it works on uploaded segments:
- Segment name for uploaded segment does not follow the llc segment name convention
- Exclude uploaded segment from the processing
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Outdated
Show resolved
Hide resolved
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
Outdated
Show resolved
Hide resolved
A new integration test UpsertTableSegmentUploadIntegrationTest is added to verify the uploaded segment is added to a realtime upsert enabled table correctly. The segment name used in the test does not follow llc segment convention but it is loaded successfully. The query result is good. I do quite understand why we should "Exclude uploaded segment from the processing". During segment load, the upsert table metadata should be processed based on the loaded segment, right?
|
I took another look at For the issue within the If possible, I would recommend setting up an actual table, upload some segments and keep it running for some time to ensure all components can properly work on the uploaded segments |
The integration test I added uploads a segment to a LLC table with all Pinot components up. The test shows the query can be run successfully on a Pinot upsert table with correct query results returned. It also verifies the idealstate. I can add more verification to the test such as more metadata check and more segments uploads. I prefer Pinot integration tests than manual tests. for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM otherwise.
I would still recommend running some long lasting manual test because integration test won't cover the periodic tasks running in controller. Also, in the integration test there is no event pushing to kafka, so we don't test the case where CONSUMING, COMPLETED and UPLOADED segments co-exist case
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Outdated
Show resolved
Hide resolved
...ntroller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
Outdated
Show resolved
Hide resolved
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
Outdated
Show resolved
Hide resolved
For the new integration test, it would be good if we push some events into kafka as well and make the commit threshold low so that we have CONSUMING, DONE, UPLOADED segments at the same time. Also, we can run the |
Added the Kafka data ingestion to the integration test. Also add more segments (now 2 realtime segments and 3 uploaded segments are involved in the tests). Discovered a bug in the previous version. Revised the integration test so that it runs the |
…loaded: 2 realtime and 3 offline.
38cca2c
to
f987ce8
Compare
// Means the segment is completed and sealed by some Pinot server and uploaded | ||
DONE, | ||
// Means the segment is uploaded to a Pinot controller by an external party | ||
UPLOADED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we take this opportunity to add two more enums: CONSUMING and COMPLETED (these are the terms used elsewhere), and slowly obsolete the IN_PROGRESS and DONE. For now, they will be treated equivalent to their counterpart.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should only add enums at the end. But anyways, this is a deeper change. I was just asking of the other reviewers. Let us revert it and stick to your PR for now.
@npawar and @Jackie-Jiang may have an opinion. We can discuss separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel CONSUMING
and COMPLETED
is more clear, and consistent with the term we used in the segment assignment strategy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the two enums.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Jackie-Jiang agreed, just that it will need a migration of the existing metadata and/or code to scan both. Best to keep it out of this PR. @chenboat will be great if you can file an issue for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#6777 filed for this issue.
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
Outdated
Show resolved
Hide resolved
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
Outdated
Show resolved
Hide resolved
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
Outdated
Show resolved
Hide resolved
|
||
if (isRealtimeOnlyTable(tableName)) { | ||
tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); | ||
instancePartitionsType = InstancePartitionsType.CONSUMING; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Jackie that it is counter-intuitive. Add a comment here reasoning why it should be set to CONSUMING. If we enable uploading realtime segments to non-upsert tables, at least we will know why/what we will need to fix
pinot-core/src/main/java/org/apache/pinot/core/util/SegmentUtils.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/util/SegmentUtils.java
Outdated
Show resolved
Hide resolved
…nConstants.java Co-authored-by: Subbu Subramaniam <mcvsubbu@users.noreply.github.com>
…nConstants.java Co-authored-by: Subbu Subramaniam <mcvsubbu@users.noreply.github.com>
LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName); | ||
// TODO Allow segment refreshing for realtime tables. | ||
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { | ||
LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log line is misleading right? We are actually throwing an exception below. I think this log line belongs after line 84. It may be useful to have another log line here that says we do not support refresing existing segment for realtime table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the log line after the if block.
@chenboat Hi, can you open the read permission of the design document? |
+1. |
Description
Currently segment upload is only applicable for offline Pinot tables. This PR opens the segment upload to Pinot realtime tables too. Segment upload to realtime tables makes bootstrapping of a realtime table much faster because the segments can now be computed outside of Pinot. This features allows Pinot realtime tables with longer retention (e.g., Pinot upsert enable table) to be bootstrapped in a much efficient manner.
Design doc: https://docs.google.com/document/d/1STYxZsUYGcYrzHdmOBymQ1NkpxuEzLa4OTiw4mGHiKk/edit#
Upgrade Notes
Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
backward-incompat
, and complete the section below on Release Notes)Does this PR fix a zero-downtime upgrade introduced earlier?
backward-incompat
, and complete the section below on Release Notes)Does this PR otherwise need attention when creating release notes? Things to consider:
release-notes
and complete the section on Release Notes)Release Notes
If you have tagged this as either backward-incompat or release-notes,
you MUST add text here that you would like to see appear in release notes of the
next release.
If you have a series of commits adding or enabling a feature, then
add this section only in final commit that marks the feature completed.
Refer to earlier release notes to see examples of text
Documentation
If you have introduced a new feature or configuration, please add it to the documentation as well.
See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document