-
Notifications
You must be signed in to change notification settings - Fork 334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-2170: Enabling writing of both new and old format offset files for stores and side-input-stores #1005
Conversation
This reverts commit 719126a.
…nd side-input-stores
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about the late comments.
FileUtil.writeWithChecksum(offsetFile, fileContents); | ||
} else { | ||
offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), OFFSET_FILE_NAME_LEGACY); | ||
FileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be good to have some validation here to make sure there is exactly one entry in offsets
?
Or maybe have two different methods: one for side inputs which takes in an offsets
map and one for non-side-inputs which takes in a single offset
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup,
i'll start a separate PR with these changes, and tests
@@ -155,7 +190,15 @@ public static void writeOffsetFile(File storeBaseDir, String storeName, TaskName | |||
* @param taskName the task name which is referencing the store | |||
*/ | |||
public static void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName) { | |||
File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, TaskMode.Active), OFFSET_FILE_NAME); | |||
deleteOffsetFile(storeBaseDir, storeName, taskName, OFFSET_FILE_NAME_NEW); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to delete the side-inputs offset file too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, this is only valid for task-stores
@@ -84,7 +84,7 @@ class TestTaskStorageManager extends MockitoSugar { | |||
val ssp = new SystemStreamPartition(ss, partition) | |||
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) | |||
val storeFile = new File(storeDirectory, "store.sst") | |||
val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Do you have a test for only having the old offset file with the old format?
- Do you have a test for writing the old offset file with the old format?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, sending for review in a separate pr
@@ -45,7 +45,9 @@ | |||
|
|||
public class StorageManagerUtil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there tests for this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No its tested from its consumers -- task-storage-manager, and sideinputstorage manager
After Samza 1.1, the offset file for stores and sideinputs has been unified and is versioned.
However, this Jira adds the logic in code to read and write both this new and old format. Because of this apps can switch between 1.0 and 1.1 versions seamlessly.
Note that the old format and filenames for store and side-input offset differed.