[WIP] KAFKA-7739: Tiered storage #7561

Draft: wants to merge 135 commits into base: trunk

Conversation

@satishd
Member

satishd commented Oct 19, 2019

[WIP] This is the initial draft implementation of KIP-405. It includes the initial set of changes required for plugging in a RemoteStorageManager. We will update KIP-405 and this PR in the next few days with more details.

KIP is located at https://s.apache.org/pk53b

This PR contains an HDFS implementation to support discussion of the RemoteStorageManager APIs; it also makes it easier to review and test the proposed APIs end to end. This implementation will be removed before the PR is ready to be merged.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
satishd and others added 30 commits Apr 29, 2019
Reviewers: satishd, harshach

Subscribers: #streaming_data

Differential Revision: https://code.uberinternal.com/D2894829
Summary:
1. Add a RemoteLogIndexEntry "constructor", to allow calculating crc inside the RemoteLogIndexEntry class
2. Index entry length should be 16 bits
3. Allow RSM to read the content of RDI
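
To illustrate items 1 and 2, here is a minimal sketch (not the PR's actual code) of an index entry whose factory method computes the CRC internally and whose length field is 16 bits wide. The field layout and the names `IndexEntrySketch`, `create`, and `toBuffer` are assumptions made for this example only.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical layout: [length:int16][crc:int32][firstOffset:int64][lastOffset:int64][rdi bytes]
final class IndexEntrySketch {
    final short length;      // number of bytes that follow the length field (16 bits wide)
    final int crc;           // CRC32 over the offsets and the RDI bytes
    final long firstOffset;
    final long lastOffset;
    final byte[] rdi;        // opaque remote data identifier

    private IndexEntrySketch(short length, int crc, long firstOffset, long lastOffset, byte[] rdi) {
        this.length = length;
        this.crc = crc;
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.rdi = rdi;
    }

    // Factory acting as the "constructor": callers never compute the CRC themselves.
    static IndexEntrySketch create(long firstOffset, long lastOffset, byte[] rdi) {
        int bodyLength = Integer.BYTES + 2 * Long.BYTES + rdi.length;
        if (bodyLength > Short.MAX_VALUE) {
            throw new IllegalArgumentException("entry does not fit a 16-bit length field");
        }
        ByteBuffer payload = ByteBuffer.allocate(2 * Long.BYTES + rdi.length);
        payload.putLong(firstOffset).putLong(lastOffset).put(rdi);
        payload.flip();
        CRC32 crc32 = new CRC32();
        crc32.update(payload);
        return new IndexEntrySketch((short) bodyLength, (int) crc32.getValue(),
                firstOffset, lastOffset, rdi.clone());
    }

    // Serializes the entry in the hypothetical layout described above.
    ByteBuffer toBuffer() {
        ByteBuffer buf = ByteBuffer.allocate(Short.BYTES + length);
        buf.putShort(length).putInt(crc).putLong(firstOffset).putLong(lastOffset).put(rdi);
        buf.flip();
        return buf;
    }
}
```

Keeping the CRC computation inside the entry class means every writer produces the same checksum over the same bytes, which is presumably the motivation for item 1.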

Reviewers: harshach, satishd

Reviewed By: harshach, satishd

Subscribers: jenkins, #streaming_data

Differential Revision: https://code.uberinternal.com/D2911255
…ssive segments for copying

Summary: Added periodic task to poll topic-partition log directories to add passive segments for copying
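
A rough sketch of what such a polling task could look like, assuming that "passive" means every segment except the one with the highest base offset (the active segment). It relies on Kafka's convention of naming segment files by their zero-padded base offset with a `.log` suffix; the class and method names are hypothetical, not taken from the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PassiveSegmentPoller {
    // Segment files are named <base offset>.log; parse the numeric part.
    static long baseOffset(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    // All .log files sorted by base offset, minus the last (assumed active) one.
    static List<String> passiveSegments(Collection<String> fileNames) {
        List<String> segments = fileNames.stream()
                .filter(name -> name.endsWith(".log"))
                .sorted(Comparator.comparingLong(PassiveSegmentPoller::baseOffset))
                .collect(Collectors.toList());
        return segments.isEmpty() ? segments : segments.subList(0, segments.size() - 1);
    }

    // Polls one topic-partition directory at a fixed delay and hands passive segments to copyQueue.
    static ScheduledExecutorService start(Path partitionDir, long intervalMs,
                                          Consumer<List<String>> copyQueue) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try (Stream<Path> files = Files.list(partitionDir)) {
                List<String> names = files.map(p -> p.getFileName().toString())
                        .collect(Collectors.toList());
                copyQueue.accept(passiveSegments(names));
            } catch (IOException e) {
                // Sketch only: report and try again on the next tick.
                System.err.println("poll failed: " + e);
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```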

Reviewers: harshach, yingz

Reviewed By: harshach

Differential Revision: https://code.uberinternal.com/D2921725
Summary:
1. Implement HDFS RSM
2. Some changes to the RSM interface:
    2.1 throw IOException in most methods
    2.2 copyLogSegment does not need to return the RDI, because it is already in RemoteIndexEntry
    2.3 the read method needs RemoteIndexEntry in addition to the RDI
3. Move RemoteLogIndex.parseEntry to RemoteLogIndexEntry.parseEntry. This method is also needed by HDFS RSM
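
The interface changes in item 2 can be pictured roughly as follows. This is an illustrative sketch only: the signatures, the `EntrySketch` type, and the in-memory implementation are assumptions for the example and are not the PR's actual RemoteStorageManager API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the index entry; the entry itself carries the RDI.
final class EntrySketch {
    final long firstOffset;
    final long lastOffset;
    final String rdi;

    EntrySketch(long firstOffset, long lastOffset, String rdi) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.rdi = rdi;
    }
}

interface RemoteStorageSketch {
    // 2.1: I/O failures surface as IOException.
    // 2.2: no separate RDI return value; each returned entry already holds it.
    List<EntrySketch> copyLogSegment(String topicPartition, long baseOffset, byte[] segment)
            throws IOException;

    // 2.3: read receives the whole entry (offsets plus RDI), not just the RDI.
    byte[] read(EntrySketch entry) throws IOException;
}

// Toy implementation that keeps "remote" data in a map, to show the call flow.
final class InMemoryStorageSketch implements RemoteStorageSketch {
    private final Map<String, byte[]> blobs = new HashMap<>();

    @Override
    public List<EntrySketch> copyLogSegment(String topicPartition, long baseOffset, byte[] segment) {
        String rdi = topicPartition + "/" + baseOffset;
        blobs.put(rdi, segment.clone());
        List<EntrySketch> entries = new ArrayList<>();
        // Simplification for the sketch: pretend each byte is one record.
        entries.add(new EntrySketch(baseOffset, baseOffset + segment.length - 1, rdi));
        return entries;
    }

    @Override
    public byte[] read(EntrySketch entry) throws IOException {
        byte[] data = blobs.get(entry.rdi);
        if (data == null) {
            throw new IOException("no remote data for " + entry.rdi);
        }
        return data.clone();
    }
}
```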

Reviewers: harshach, satishd

Reviewed By: harshach

Subscribers: jenkins, #streaming_data

Differential Revision: https://code.uberinternal.com/D2960007
Summary:
Added initial implementation of RLM follower
- Refactored the existing indexes for RLM follower
- Follow-up PRs will address the remaining issues: boundary cases when adding indexes for conflicting offsets, and scaling up with fine-grained locking

Reviewers: harshach, yingz

Reviewed By: yingz

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D2972601
…r. It includes inter-broker fetch protocol changes for sending local log offset when the requested offset is in remote tier. This will allow follower broker to know that the...

Summary:
Initial version of fetch implementation for RemoteLogManager.
- It includes inter-broker fetch protocol changes for sending the local log offset when the requested offset is in the remote tier. This lets the follower broker know that the requested offset has moved to the remote tier, so it can start fetching the messages available in the local log from the leader.
- Added a test for the RLM fetch API.
- Fixed SpotBugs errors introduced by recent additions.
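
The follower-side behaviour described above can be sketched as below. This assumes the "local log offset" sent by the leader is the start offset of its local log; the response shape and all names are hypothetical and do not reflect the actual protocol schema.

```java
// Hypothetical response fields; the real fetch protocol change is not shown in this PR text.
final class FollowerFetchResponseSketch {
    final boolean offsetMovedToRemoteTier;
    final long localLogStartOffset;

    FollowerFetchResponseSketch(boolean offsetMovedToRemoteTier, long localLogStartOffset) {
        this.offsetMovedToRemoteTier = offsetMovedToRemoteTier;
        this.localLogStartOffset = localLogStartOffset;
    }
}

final class TieredFetchSketch {
    // Leader side: an offset below the local log start is assumed to live in the remote tier,
    // so the leader reports its local log start offset instead of returning data.
    static FollowerFetchResponseSketch leaderHandle(long fetchOffset, long leaderLocalLogStartOffset) {
        boolean inRemoteTier = fetchOffset < leaderLocalLogStartOffset;
        return new FollowerFetchResponseSketch(inRemoteTier, leaderLocalLogStartOffset);
    }

    // Follower side: skip ahead to the leader's local log when the requested offset was tiered.
    static long followerNextFetchOffset(long requestedOffset, FollowerFetchResponseSketch response) {
        return response.offsetMovedToRemoteTier ? response.localLogStartOffset : requestedOffset;
    }
}
```

For example, with a leader whose local log starts at offset 100, a follower asking for offset 40 would be redirected to 100, while a follower asking for 150 would keep fetching from 150.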

Reviewers: harshach, yingz

Reviewed By: harshach, yingz

Differential Revision: https://code.uberinternal.com/D3014447
Summary:
1. Copy dependent jars in the same way as Kafka core and the other modules do.
2. Remove the dependency on slf4j-api, which is already included in the Kafka core dependency

Reviewers: satishd, harshach

Reviewed By: satishd, harshach

Subscribers: #streaming_data_kafka

Differential Revision: https://code.uberinternal.com/D3035243
…mote tier storage

- Fixed issues in finding the right local segments while copying to remote tier storage

Summary:
Local segments are scheduled to be deleted once they are copied to remote tier storage.
  - Fixed issues in finding the right local segments while copying to remote tier storage.

Reviewers: harshach, yingz

Reviewed By: harshach

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D3046097
Summary:
1. Use Java List instead of Seq in the RSM interface
2. Sort remote log segments in RemoteStorageManager.listRemoteSegments

Reviewers: satishd, harshach

Reviewed By: satishd

Subscribers: jenkins, #streaming_data_kafka

Differential Revision: https://code.uberinternal.com/D3102715
Summary: Fix some format issues reported by CheckStyle

Reviewers: satishd, harshach

Reviewed By: harshach

Subscribers: #streaming_data_kafka

Differential Revision: https://code.uberinternal.com/D3160621
This commit adds a new variant of the listRemoteSegments method to RemoteStorageManager that takes a minBaseOffset parameter. This is useful for reducing listing time on the remote tier in some implementations (like S3).
…cleaning up expired remote log segments based on retention period by using tasks run at regular intervals.
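
A minimal sketch of the idea, assuming segments are keyed by base offset in a sorted structure so the listing can start at `minBaseOffset` instead of scanning everything. The class below is hypothetical and stands in for a real remote store; it is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SegmentListingSketch {
    // Sorted by base offset, so listings come back in offset order.
    private final NavigableMap<Long, String> segmentsByBaseOffset = new TreeMap<>();

    void add(long baseOffset, String remotePath) {
        segmentsByBaseOffset.put(baseOffset, remotePath);
    }

    // Original variant: list every remote segment.
    List<Long> listRemoteSegments() {
        return listRemoteSegments(0L);
    }

    // New variant: only segments whose base offset is >= minBaseOffset.
    List<Long> listRemoteSegments(long minBaseOffset) {
        return new ArrayList<>(segmentsByBaseOffset.tailMap(minBaseOffset, true).keySet());
    }
}
```

On an object store, an implementation might translate the minimum offset into a starting key for the listing call; whether the PR does so is not stated here.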

Summary:
RLM enhancements for copying segments, syncing indexes, and cleaning up expired remote log segments based on the retention period, using tasks run at regular intervals.
Added configs for task thread pool size and interval.

Remote log start offset handling still needs to be added; it will be done in a follow-up PR/diff.
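
As a rough picture of the retention-based cleanup, the sketch below selects segments whose newest record is older than the retention period and runs a task on a scheduled pool sized by configuration. The names and the use of a per-segment maximum timestamp are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical view of a remote segment: base offset plus the timestamp of its newest record.
final class RemoteSegmentSketch {
    final long baseOffset;
    final long maxTimestampMs;

    RemoteSegmentSketch(long baseOffset, long maxTimestampMs) {
        this.baseOffset = baseOffset;
        this.maxTimestampMs = maxTimestampMs;
    }
}

final class RetentionCleanerSketch {
    // A segment is expired when its newest record is older than the retention period.
    static List<RemoteSegmentSketch> expired(List<RemoteSegmentSketch> segments,
                                             long nowMs, long retentionMs) {
        List<RemoteSegmentSketch> result = new ArrayList<>();
        for (RemoteSegmentSketch segment : segments) {
            if (nowMs - segment.maxTimestampMs > retentionMs) {
                result.add(segment);
            }
        }
        return result;
    }

    // poolSize and intervalMs play the role of the two configs mentioned above.
    static ScheduledExecutorService start(int poolSize, long intervalMs, Runnable cleanupTask) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        pool.scheduleWithFixedDelay(cleanupTask, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return pool;
    }
}
```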

Reviewers: harshach, yingz

Reviewed By: harshach, yingz

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D3211435
Summary: Handle remote storage fetch requests in a separate thread pool to avoid blocking Kafka I/O threads. Added a new purgatory to handle timeouts.
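
The pattern can be sketched with plain `java.util.concurrent` primitives: remote reads run on their own pool, and a scheduled task completes the response with a timeout marker if the read has not finished in time. This is a simplified stand-in; the PR's purgatory is Kafka's own mechanism, and the class below and its `"TIMED_OUT"` marker are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RemoteFetchSketch implements AutoCloseable {
    // Dedicated pool so slow remote reads never occupy the request I/O threads.
    private final ExecutorService remoteReaders = Executors.newFixedThreadPool(2);
    // Plays the role of the purgatory: completes a pending fetch once its deadline passes.
    private final ScheduledExecutorService timeouts = Executors.newSingleThreadScheduledExecutor();

    CompletableFuture<String> fetch(Supplier<String> remoteRead, long timeoutMs) {
        CompletableFuture<String> response = new CompletableFuture<>();
        remoteReaders.submit(() -> {
            try {
                response.complete(remoteRead.get());
            } catch (RuntimeException e) {
                response.completeExceptionally(e);
            }
        });
        // Whichever of the read and the timeout completes the future first wins.
        timeouts.schedule(() -> {
            response.complete("TIMED_OUT");
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return response;
    }

    @Override
    public void close() {
        remoteReaders.shutdownNow();
        timeouts.shutdownNow();
    }
}
```

The caller returns immediately with a future, so the thread that accepted the fetch request is free to serve other requests while the remote read is in flight.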

Reviewers: satishd, harshach

Subscribers: #streaming_data_kafka

Differential Revision: https://code.uberinternal.com/D3224503
satishd and others added 30 commits Apr 6, 2020
…h internal protocol schema.

 - Added serdes for the messages stored in remote log metadata topic with internal protocol schema
 - Refactored RLMMWithTopicStorage to be more modular.
…e received messages from remote log metadata topic.

 - Added tests for RLMM on leader and follower events from remote log metadata topic.
…mentMetadata arg only.

- RemoteLogMetadataManager#putRemoteLogSegmentData to take RemoteLogSegmentMetadata arg only.
… fetch max bytes is less than a single batch of messages.
Optimized RLM fetch to read only once from RSM to get the desired batch of messages.
Fix build error
- Add an option to deactivate actual delete in the LocalRemoteStorageManager. Eases testing by allowing simulation of non-strongly consistent storage systems which do not guarantee visibility of a successful delete for subsequent read or list operations.
- Add visitor to traverse locally emulated remote storage to provide support for test assertions.
- Add local remote storage listener to allow tests to be notified of modifications in the storage.
- Add snapshot support for the local remote storage.
- Add waiter on local remote storage to allow tests to formulate expectations on asynchronously populated remote storage.
- Set readOffset's initial value to -1 to handle the edge case of a single record for the first segment of a topic-partition.
- Use the index size instead of log segment size to enable the creation of a single-record log segment.
- Added another segment to be offloaded in base integration test. Couple of fixes in test assertions.
- Add a test case for a segment with multiple records.
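
The test-storage features listed above (deactivated deletes and a waiter for asynchronously populated storage) could look roughly like the following. This is a simplified sketch with hypothetical names, not the LocalRemoteStorageManager code from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LocalStorageSketch {
    private final Map<String, byte[]> files = new ConcurrentHashMap<>();
    private final boolean deleteEnabled;
    private volatile CountDownLatch pendingCopies = new CountDownLatch(0);

    LocalStorageSketch(boolean deleteEnabled) {
        this.deleteEnabled = deleteEnabled;
    }

    // Test declares how many copies it expects before the broker starts offloading.
    void expectCopies(int count) {
        pendingCopies = new CountDownLatch(count);
    }

    void copy(String name, byte[] data) {
        files.put(name, data.clone());
        pendingCopies.countDown();
    }

    // With deletes deactivated the call still reports success but the data stays visible,
    // mimicking a store where a successful delete is not immediately observable.
    boolean delete(String name) {
        if (deleteEnabled) {
            files.remove(name);
        }
        return true;
    }

    boolean contains(String name) {
        return files.containsKey(name);
    }

    // Waiter: blocks the test until the expected copies have arrived or the timeout expires.
    boolean awaitCopies(long timeoutMs) {
        try {
            return pendingCopies.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```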
Handled a boundary scenario in RemoteLogManager.read for the target offset.
Added timestamp lookup in remote storage.
…t the complete set of partitions the broker hosted. (#54)

* Fixed handling of partition leadership changes in RLMM, which arrive as incremental updates rather than as the complete set of partitions hosted by the broker.
- Add S3RemoteStorageManager implementation.
 - Minor fixes in RLMMWithTopicStorage to call configure only once
 - Made required fields volatile to avoid stale reads when they are updated and read in different threads.
Merge trunk branch into tiered storage.
…age test utils.

 - Added consume records and other tests, and refactoring of tiered storage test utils.  
 - Added broker id to the configuration properties of RSM. 
 - Added test for disaster recovery with deletion of broker local storage on leader.
(#52)

Reviewed-by: satishd@apache.org
…uce-action in tiered storage tests

- Support producing to a log segment that already contains records in produce-action.
- Generate basic test reports with one description per test action.
- Add utility to dump files from local tiered storage.
- Fix condition on local broker storage (filter out inactive and non-assigned brokers).

Reviewed-by: satishd@apache.org
…ename to provide it in local tiered storage dumps.

Add the broker ID which offloaded a given fileset to metadata and filename to provide it in local tiered storage dumps.

Reviewed-by: satishd@apache.org
* Support batch size > 1 in tiered storage integration tests.
* Enforce earliest offset in log directory equality rather than lower bound.
* Add all records and offsets to the tiered storage content dump.
* Make explicit the records expected to be found in remote log segments in integration tests.

Reviewed-by: Satish Duggana <satishd@apache.org>
Fix deadlock in remote metadata ConsumerTask.
7 participants