SAMZA-2591: Async Commit [2/3]: Task Commit API changes and async commit #1490
Conversation
dxichen
commented
Apr 16, 2021
- Introduce new state backend APIs for the blob store and Kafka changelog
- Change the task commit lifecycle to separate snapshot, upload, and cleanup phases
- Make the TaskInstance commit upload and cleanup phases nonblocking (a rough sketch of this split is included below)
Please disregard the Checkpoint v2 migration commit since it is part of #1489
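Below is a minimal, self-contained sketch of that split, under the assumption that a commit manager exposes separate snapshot/upload/cleanUp steps. CommitManager, AsyncCommitSketch, the String checkpointId, and the method names are illustrative stand-ins, not the actual Samza classes: the snapshot stays synchronous on the commit thread, while upload and cleanup move to the backup executor.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-ins; only the phase split matters here.
interface CommitManager {
  Map<String, String> snapshot(String checkpointId);                          // phase 1: local, blocking
  Map<String, String> upload(String checkpointId, Map<String, String> scms);  // phase 2: remote, async
  void cleanUp(String checkpointId);                                          // phase 3: async
}

public class AsyncCommitSketch {
  public static CompletableFuture<Void> commit(CommitManager commitManager,
      ExecutorService backupExecutor, String checkpointId) {
    // Phase 1: snapshot runs on the caller's thread and remains blocking (cheap and local).
    Map<String, String> snapshotSCMs = commitManager.snapshot(checkpointId);

    // Phases 2 and 3: upload and cleanup are submitted to the backup executor so the
    // task's process loop is no longer blocked on remote I/O.
    return CompletableFuture
        .supplyAsync(() -> commitManager.upload(checkpointId, snapshotSCMs), backupExecutor)
        .thenAccept(uploadSCMs -> { /* write the checkpoint containing the uploaded SCMs */ })
        .thenRun(() -> commitManager.cleanUp(checkpointId));
  }
}
```

The SCMs here stand for the state checkpoint markers returned by each backup manager, as in the snapshot loop further down this page.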
23fdb14 to b03306b
[P1] Can you elaborate why job and container model are passed down here? Seems like unused parameters in this PR at the least.
JobModel is used for potential forward compatibility, and ContainerModel is used for restores, so I wanted to keep it symmetrical.
samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
TaskRestoreManager getRestoreManager(JobContext jobContext,
    ContainerContext containerContext,
    TaskModel taskModel,
[P1] Seems inconsistent with the above. Maybe consider passing the same context as part of the signature of getBackupManager too, so that access to JobModel and ContainerModel is all through the context variables, which makes the access pattern consistent in the code base and helps with evolution.
That said, is the TaskContext available before instantiation so that TaskModel is also accessed through the context?
will change getBackupManager to context passing as well
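For illustration, a rough sketch of what the context-aligned factory signatures might look like after that change; the package locations and exact parameter lists are assumptions and may differ from the merged API:

```java
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.TaskModel;

// Sketch only: both factory methods take the same contexts, so JobModel and
// ContainerModel are reached through JobContext/ContainerContext rather than
// being passed directly.
public interface StateBackendFactory {
  TaskBackupManager getBackupManager(JobContext jobContext, ContainerContext containerContext,
      TaskModel taskModel);

  TaskRestoreManager getRestoreManager(JobContext jobContext, ContainerContext containerContext,
      TaskModel taskModel);
}
```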
samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
stateBackendToBackupManager.values()
    .forEach(storageBackupManager -> storageBackupManager.init(null));
[P2] Can we use a sentinel checkpoint instead of null?
will address this as a follow-up
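A tiny, self-contained sketch of what that follow-up could look like; the Checkpoint stand-in and the EMPTY name are hypothetical, not the actual Samza checkpoint API:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sentinel: an explicit "no previous checkpoint" value so that
// backup managers never receive null in init().
final class CheckpointSentinel {
  // Stand-in for the real checkpoint type; a single method keeps it lambda-friendly.
  interface Checkpoint {
    Map<String, String> getStateCheckpointMarkers();
  }

  // An empty checkpoint: callers can always iterate the markers without null checks.
  static final Checkpoint EMPTY = Collections::emptyMap;

  private CheckpointSentinel() { }
}
```

The idea being that the loop above would pass such a sentinel to init() instead of null.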
CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
    StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
  this.taskName = taskName;
  this.containerStorageManager = containerStorageManager;
[P1] Can we persist the StorageEngines here instead of getting a handle of ContainerStorageManager?
Unfortunately the StorageEngines are not created at this point; they are created after init is called, which is why we hold on to the ContainerStorageManager.
}

public void init() {
  // Assuming that container storage manager has already started and created the stores
Is there a way to enforce or validate this assumption? Refer to the above comment on moving this to the constructor. Is it here because the data may not be available during construction?
Yes, the data is not available during construction.
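A small, self-contained sketch of that ordering constraint; the Supplier and the stand-in StorageEngine type are illustrative, standing in for however the commit manager actually asks the ContainerStorageManager for the task's stores once init() runs:

```java
import java.util.Map;
import java.util.function.Supplier;

// Sketch: the stores cannot be captured in the constructor because they do not
// exist yet, so the manager keeps a handle and resolves it lazily in init().
final class CommitManagerInitSketch {
  interface StorageEngine { }

  private final Supplier<Map<String, StorageEngine>> taskStores;  // e.g. backed by ContainerStorageManager
  private Map<String, StorageEngine> storageEngines;              // stays null until init()

  CommitManagerInitSketch(Supplier<Map<String, StorageEngine>> taskStores) {
    // Too early to resolve the stores here; they are created when the
    // ContainerStorageManager starts, after this constructor has run.
    this.taskStores = taskStores;
  }

  void init() {
    // Safe now: the ContainerStorageManager has started and created the stores.
    this.storageEngines = taskStores.get();
  }
}
```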
storageEngines.forEach((storeName, storageEngine) -> {
  if (storageEngine.getStoreProperties().isPersistedToDisk() &&
      storageEngine.getStoreProperties().isDurableStore()) {
    storageEngine.checkpoint(checkpointId);
  }
});
Is isDurableStore() equivalent to isLoggedStore()? We used to checkpoint for persisted and logged stores. Making sure this is just a rename?
isDurable is a superset of isLogged; isLogged is specifically for Kafka durability, while isDurable means the store is durable via either the blob store or the Kafka changelog.
// for each configured state backend factory, backup the state for all stores in this task.
stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> {
  Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId);
  LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}",
      taskName, checkpointId, stateBackendFactoryName, snapshotSCMs);
  stateBackendToStoreSCMs.put(stateBackendFactoryName, snapshotSCMs);
});
Do we need to back up the state for all stores, or do the stores within each of the backup factories satisfy the above requirement for checkpointing (persisted & durable)?
Looking at the Kafka implementations, storeChangelogs are the ones that are iterated for fetching the snapshot, but the above criteria need both durable & persisted.
Checking whether changelog-enabled stores are persisted to disk by default, and what the behavior is otherwise?
We need to back up all the stores that are durable (or logged) and write a checkpoint (to file) for all the state that is persisted. Similarly, changelog-enabled stores are checkpointed to disk only if they are persistent (i.e., not in-memory), which should preserve the existing behavior.
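To summarize the two criteria discussed in this thread, a self-contained sketch; the StoreProperties stand-in mirrors the two flags used in the diff above, and the method names are illustrative only:

```java
// Sketch of the commit criteria discussed above.
final class CommitCriteriaSketch {
  interface StoreProperties {
    boolean isPersistedToDisk();
    boolean isDurableStore();  // superset of "logged": blob store or Kafka changelog durability
  }

  // Local on-disk checkpoint at commit time (the storageEngine.checkpoint(...) loop above):
  // only stores that are both persistent and durable.
  static boolean takesLocalCheckpoint(StoreProperties props) {
    return props.isPersistedToDisk() && props.isDurableStore();
  }

  // Remote backup by each configured state backend: every durable store, including
  // in-memory durable stores that never get a local checkpoint.
  static boolean isBackedUpByStateBackend(StoreProperties props) {
    return props.isDurableStore();
  }
}
```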
b03306b to 0f7be25
3008a98 to cef0f1e
…mit (apache#1490)
- Introduce new state backend APIs for blobstore and kafka changelog
- Change the task commit lifecycle to separate snapshot, upload and cleanup phases
- Make the TaskInstance commit upload and cleanup phases nonblocking