
Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues #12220

Merged

Conversation

@aishikbh (Contributor) commented Jan 4, 2024

Problem

During segment generation for offline ingestion through SegmentProcessorFramework, the intermediate files produced during the map phase can sometimes grow drastically, causing out-of-disk issues. This feature bounds the size of those intermediate files with a configurable threshold so that segment generation stays within a predictable disk budget.

Solution

Design Doc

Adaptive map-transform Phase (Original idea from Manish’s Design for Smart Ingestion)

  • During the map phase, we keep track of the total bytes written to disk so far for the intermediate files (Y). This requires accounting for the bytes written on every intermediate-file write in the map phase.

  • Currently, SegmentProcessorFramework performs all the steps (map, reduce, segment generation) in a single pass. The idea is to break the process into multiple iterations, each bounded by a configurable threshold (targetFileSize) on the size of the mapper output.

  • In each iteration, at any point there are two possibilities:

    • If Y < targetFileSize, we let the mapper continue its transformation using the record readers, one row at a time.

    • If Y >= targetFileSize, we do the following:

      • Keep track of the current record reader being ingested (i.e. we are at the i-th recordReader, which internally tracks the current row via its iterator).

      • Terminate the map phase and pass the mapper output to the reduce phase.

      • Feed the output of the reduce phase to the segment generation phase and generate segments from it.

      • Resume the map phase from where we left off (the saved RecordReader) in the next iteration.

This process ensures that the map phase stays bounded by the configured targetFileSize as we ingest rows from the files; a sketch of the loop follows below.
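For illustration only, here is a minimal Java sketch of the iteration described above. The names (AdaptiveProcessorSketch, Reader, mapAndTransform, reduceAndGenerateSegments) are hypothetical stand-ins; the actual implementation lives in SegmentProcessorFramework and SegmentMapper and differs in detail:

```java
import java.util.List;

// Hypothetical sketch of the adaptive map/reduce/segment-generation loop.
// Reader and the map/reduce hooks stand in for Pinot's RecordReader,
// SegmentMapper, etc.
public class AdaptiveProcessorSketch {
  interface Reader {
    boolean hasNext();
    Object next();
  }

  private final List<Reader> _readers;
  private final long _targetFileSize;  // segmentMapperFileSizeThresholdInBytes
  private int _currentReader;          // resume point carried across iterations

  public AdaptiveProcessorSketch(List<Reader> readers, long targetFileSize) {
    _readers = readers;
    _targetFileSize = targetFileSize;
  }

  public void process() {
    while (_currentReader < _readers.size()) {
      long bytesWritten = 0;  // Y: intermediate bytes for this iteration
      // Map phase: consume rows until all readers are done or Y >= threshold.
      while (_currentReader < _readers.size() && bytesWritten < _targetFileSize) {
        Reader reader = _readers.get(_currentReader);
        while (reader.hasNext() && bytesWritten < _targetFileSize) {
          bytesWritten += mapAndTransform(reader.next());
        }
        if (!reader.hasNext()) {
          _currentReader++;  // fully consumed; next iteration starts at i+1
        }
        // Otherwise the threshold was hit mid-file; the reader keeps its
        // position and the map phase resumes from it in the next iteration.
      }
      // Reduce + segment generation run on this bounded batch of files.
      reduceAndGenerateSegments();
    }
  }

  private long mapAndTransform(Object row) {
    return 1;  // placeholder: write the transformed row, return bytes written
  }

  private void reduceAndGenerateSegments() {
    // placeholder: reduce phase followed by segment creation
  }
}
```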

This PR implements adaptive segment generation bounded by a configurable intermediate file size.

Release Notes

  • A new config key, segmentMapperFileSizeThresholdInBytes, specifies the size threshold for the intermediate files produced during the map phase; it is set in the taskConfig. The default value is Long.MAX_VALUE.
  • By default, SegmentProcessorFramework keeps its current behaviour, i.e. it maps and reduces all the record readers in one pass to create the intermediate files and generates segments from them.
  • To enable the feature, set segmentMapperFileSizeThresholdInBytes in the taskConfig of the respective task.
  • Currently, MergeRollupTask and RealtimeToOfflineSegmentsTask generate segments through SegmentProcessorFramework, so these tasks support this feature.

Example Config:

Set the intermediate mapper output file size threshold to 1000000000 bytes (~1 GB):

"task": {
        "taskTypeConfigsMap": {
          "<task_name>": {
            .
            .
            .
            "segmentMapperFileSizeThresholdInBytes": "1000000000"
          }
        }
      },
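For illustration, a task consuming this key might parse the threshold roughly as below. This is a sketch with a hypothetical helper class (ThresholdConfigSketch); the exact parsing code in Pinot may differ:

```java
import java.util.Map;

// Hypothetical helper showing how the threshold could be read from taskConfig.
final class ThresholdConfigSketch {
  // Default matches the documented behaviour: effectively unbounded,
  // i.e. map and reduce everything in a single pass.
  static final long DEFAULT_THRESHOLD = Long.MAX_VALUE;

  static long fileSizeThreshold(Map<String, String> taskConfig) {
    String value = taskConfig.get("segmentMapperFileSizeThresholdInBytes");
    return value == null ? DEFAULT_THRESHOLD : Long.parseLong(value);
  }
}
```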

@codecov-commenter commented Jan 4, 2024

Codecov Report

Attention: 6 lines in your changes are missing coverage. Please review.

Comparison is base (35faeb6) 61.58% compared to head (2d98e31) 61.47%.

| Files | Patch % | Lines |
|---|---|---|
| ...pinot/spi/data/readers/RecordReaderFileConfig.java | 84.21% | 1 Missing and 2 partials ⚠️ |
| ...re/segment/processing/framework/SegmentConfig.java | 80.00% | 0 Missing and 2 partials ⚠️ |
| .../core/segment/processing/mapper/SegmentMapper.java | 95.00% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12220      +/-   ##
============================================
- Coverage     61.58%   61.47%   -0.11%     
+ Complexity     1153     1147       -6     
============================================
  Files          2417     2418       +1     
  Lines        131611   131697      +86     
  Branches      20317    20327      +10     
============================================
- Hits          81057    80966      -91     
- Misses        44613    44790     +177     
  Partials       5941     5941              
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | ? |
| integration | 0.00% <0.00%> (-0.01%) ⬇️ |
| integration1 | ? |
| integration2 | 0.00% <0.00%> (ø) |
| java-11 | ? |
| java-21 | 61.47% <95.20%> (+0.01%) ⬆️ |
| skip-bytebuffers-false | 61.46% <95.20%> (-0.11%) ⬇️ |
| skip-bytebuffers-true | 46.55% <95.08%> (-14.88%) ⬇️ |
| temurin | 61.47% <95.20%> (-0.11%) ⬇️ |
| unittests | 61.47% <95.20%> (-0.11%) ⬇️ |
| unittests1 | 46.58% <95.08%> (-0.10%) ⬇️ |
| unittests2 | 27.70% <2.40%> (-0.05%) ⬇️ |

Flags with carried forward coverage won't be shown.


@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch from 8f24cc7 to 96f1279 on January 5, 2024 05:20
@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch from fb03ae9 to a4e98ea on January 8, 2024 18:05
@snleee (Contributor) left a comment:

Looks good to me at a high level 👍
Can we add a test for this feature?

mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
_recordReaderFileConfigs.get(i)._recordReader = recordReader;
}
if (!_adaptiveSizeBasedWriter.canWrite()) {
Contributor:

Instead of checking for this again, can we have mapAndTransformRow return a value to terminate this loop?

Contributor Author (@aishikbh):

Makes sense. I have added this logic.

Contributor Author (@aishikbh):

On further inspection, I have kept the check in only one place, right after mapAndTransformRow. If we got kicked out of mapAndTransformRow because of the size constraint, this check should be enough.

Please let me know if returning a boolean value makes more sense. I will change accordingly.

Contributor:

Could you check if you can keep this constraint logic where it's needed (in mapAndTransformRow) and add the corresponding logs there? The method can return a boolean to break this loop.

In the log.info, please include how many readers we have completed and how many are pending. Or you can add this log in SegmentProcessorFramework, where you have the overall counts.
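For reference, a compilable sketch of this suggestion, with hypothetical types (MapperSketch, SizeBasedWriter) standing in for the real classes:

```java
import java.util.Iterator;
import java.util.List;

// Sketch of the suggestion: mapAndTransformRow enforces the size constraint
// itself and returns false once the threshold is hit, so the caller breaks
// its loop without re-checking the writer.
final class MapperSketch {
  interface SizeBasedWriter {
    boolean canWrite();
    void write(Object row);
  }

  private final SizeBasedWriter _writer;

  MapperSketch(SizeBasedWriter writer) {
    _writer = writer;
  }

  // Returns true if the reader was fully consumed, false if the size
  // threshold stopped the map phase early.
  boolean mapAndTransformRow(Iterator<Object> reader) {
    while (reader.hasNext()) {
      if (!_writer.canWrite()) {
        return false;  // threshold reached; log completed/pending counts here
      }
      _writer.write(reader.next());
    }
    return true;
  }

  void map(List<Iterator<Object>> readers) {
    for (Iterator<Object> reader : readers) {
      if (!mapAndTransformRow(reader)) {
        break;  // remaining readers resume in the next iteration
      }
    }
  }
}
```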

Contributor Author (@aishikbh), Jan 15, 2024:

Responded in the comment below.

@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch from 0fbb2c2 to ea8181d on January 11, 2024 15:27
@aishikbh (Contributor Author):

Looks good to me at a high level 👍 Can we add a test for this feature?

Added the test.

@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch 3 times, most recently from 59e86ac to 8076adb, on January 12, 2024 06:51
@aishikbh changed the title from "[Work in progress] Reduce disk footprint for segment processor framework" to "Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues" on Jan 12, 2024
@aishikbh marked this pull request as ready for review on January 12, 2024 17:06

@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch 4 times, most recently from 02cfad9 to 00f5449, on January 16, 2024 08:17
@swaminathanmanish (Contributor) left a comment:

LGTM otherwise. Thanks for adding this functionality!

@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch 3 times, most recently from f9f16a4 to 45a4ac4, on January 17, 2024 16:54
@swaminathanmanish (Contributor) left a comment:

LGTM!

@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch 2 times, most recently from 4ec7355 to acfcdeb, on January 19, 2024 08:38
@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch from acfcdeb to f7f783b on January 23, 2024 10:19
@snleee (Contributor) left a comment:

LGTM!
Thank you for working on this feature!
Can we make sure that all tests pass?

@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch 3 times, most recently from ce32e17 to dfeb7d6, on January 23, 2024 17:56
- PR apache#12290 touched dimBaseballTeams.csv, which the tests depend on. Modified the test to reflect that.
- Changed import statements (addressed comment).
@aishikbh force-pushed the reduceDiskFootprintForSegmentProcessorFramework branch from dfeb7d6 to 2d98e31 on January 23, 2024 18:10
@aishikbh (Contributor Author):

LGTM! Thank you for working on this feature! Can we make sure that all tests pass?

The tests were failing because of another change. I have fixed them now.

@snleee (Contributor) commented Jan 24, 2024

LGTM! Thank you for working on this 👍

@snleee merged commit 91ffcc7 into apache:master on Jan 24, 2024
19 checks passed
@Jackie-Jiang added the release-notes label (Referenced by PRs that need attention when compiling the next release notes) on Feb 22, 2024
Labels

feature, ingestion, release-notes