Skip to content
Permalink
Browse files
[NO ISSUE][REP] Send correct component ID on bulkload
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- When a secondary index is created, send the correct component id
  to the replica.
- Update the component id on replicas when receiving a bulkloaded
  component.

Change-Id: I5575cf9a61477636efc7e1291189a59e0a5266ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14103
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Nov 16, 2021
1 parent fbdb710 commit 0129fdd97e24e480e395ec570f73ca45f9344ee9
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 9 deletions.
@@ -169,11 +169,12 @@ public synchronized void setLastComponentId(long componentId) throws HyracksData
}

@Override
public synchronized void advanceValidComponentSequence(long componentSequence) throws HyracksDataException {
public synchronized void advanceValidComponent(long componentSequence, long componentId)
throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
if (componentSequence > latest.getValidComponentSequence()) {
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
latest.getLastComponentId(), null);
final IndexCheckpoint next =
IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence, componentId, null);
persist(next);
}
}
@@ -189,7 +189,7 @@ public void deleteMaskedMergedFiles() throws Exception {
DatasetResourceReference drr = DatasetResourceReference.of(localResource);
IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
indexCheckpointManager.advanceValidComponentSequence(validComponentSequence);
indexCheckpointManager.advanceValidComponent(validComponentSequence, 1);
// create components to be merged
String btree = "_b";
String filter = "_f";
@@ -125,12 +125,13 @@ void replicated(long componentSequence, long masterLsn, long componentId, String
IndexCheckpoint getLatest() throws HyracksDataException;

/**
* Advance the last valid component sequence. Used for replicated bulkloaded components
* Advance the last valid component. Used for replicated bulkloaded components
*
* @param componentSequence
* @param componentId
* @throws HyracksDataException
*/
void advanceValidComponentSequence(long componentSequence) throws HyracksDataException;
void advanceValidComponent(long componentSequence, long componentId) throws HyracksDataException;

/**
* Set the last component id. Used during recovery or after component delete
@@ -77,7 +77,7 @@ private void updateBulkLoadedLastComponentSequence(INcApplicationContext appCtx)
final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
indexCheckpointManager.advanceValidComponentSequence(componentSequence);
indexCheckpointManager.advanceValidComponent(componentSequence, lastComponentId);
}

private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
@@ -141,11 +141,12 @@ private long getReplicatedComponentLsn() throws HyracksDataException {

private long getReplicatedComponentId() throws HyracksDataException {
final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH
&& indexReplJob.getLSMOpType() != LSMOperationType.LOAD) {
return -1L;
}
final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext();
LSMComponentId id = (LSMComponentId) ctx.getComponentsToBeReplicated().get(0).getId();
return id.getMinId();
return id.getMaxId();
}
}

0 comments on commit 0129fdd

Please sign in to comment.