Skip to content

Commit

Permalink
refactor(engine): apply MT migration in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
npepinpe committed Jan 30, 2024
1 parent 079596c commit d6a6aa3
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ public void runMigrations() {
final var executedMigrations = new ArrayList<MigrationTask>();
for (int index = 1; index <= migrationTasks.size(); index++) {
// one based index looks nicer in logs

final var migration = migrationTasks.get(index - 1);

final var executed = handleMigrationTask(migration, index, migrationTasks.size());
if (executed) {
executedMigrations.add(migration);
Expand Down Expand Up @@ -109,10 +107,7 @@ private void logSummary(final List<MigrationTask> migrationTasks) {
private boolean handleMigrationTask(
final MigrationTask migrationTask, final int index, final int total) {
if (migrationTask.needsToRun(processingState)) {
try {
runMigration(migrationTask, index, total);
} finally {
}
runMigration(migrationTask, index, total);
return true;
} else {
logMigrationSkipped(migrationTask, index, total);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.camunda.zeebe.db.impl.DbString;
import io.camunda.zeebe.db.impl.DbTenantAwareKey;
import io.camunda.zeebe.db.impl.DbTenantAwareKey.PlacementType;
import io.camunda.zeebe.engine.state.migration.MemoryBoundedColumnIteration;
import io.camunda.zeebe.engine.state.migration.to_8_3.legacy.LegacyJobState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.value.TenantOwned;
Expand All @@ -33,21 +34,21 @@ public DbJobMigrationState(
}

public void migrateJobStateForMultiTenancy() {
final var iterator = new MemoryBoundedColumnIteration();
// setting the tenant id key once, because it's the same for all steps below
to.tenantIdKey.wrapString(TenantOwned.DEFAULT_TENANT_IDENTIFIER);

/*
`DEPRECATED_JOB_ACTIVATABLE` -> `JOB_ACTIVATABLE`
- Suffix tenant to key
*/
from.getActivatableColumnFamily()
.forEach(
(key, value) -> {
to.jobTypeKey.wrapString(key.first().toString());
to.fkJob.inner().wrapLong(key.second().inner().getValue());
to.activatableColumnFamily.insert(to.tenantAwareTypeJobKey, DbNil.INSTANCE);
from.getActivatableColumnFamily().deleteExisting(key);
});
iterator.drain(
from.getActivatableColumnFamily(),
(key, value) -> {
to.jobTypeKey.wrapString(key.first().toString());
to.fkJob.inner().wrapLong(key.second().inner().getValue());
to.activatableColumnFamily.insert(to.tenantAwareTypeJobKey, DbNil.INSTANCE);
});
}

private static final class DbJobState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.camunda.zeebe.db.impl.DbTenantAwareKey.PlacementType;
import io.camunda.zeebe.engine.state.message.DbMessageStartEventSubscriptionState;
import io.camunda.zeebe.engine.state.message.MessageStartEventSubscription;
import io.camunda.zeebe.engine.state.migration.MemoryBoundedColumnIteration;
import io.camunda.zeebe.engine.state.migration.to_8_3.legacy.LegacyMessageStartEventSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.value.TenantOwned;
Expand All @@ -34,6 +35,7 @@ public DbMessageStartEventSubscriptionMigrationState(
}

public void migrateMessageStartEventSubscriptionForMultiTenancy() {
final var iterator = new MemoryBoundedColumnIteration();
// setting the tenant id key once, because it's the same for all steps below
to.tenantIdKey.wrapString(TenantOwned.DEFAULT_TENANT_IDENTIFIER);

Expand All @@ -42,29 +44,28 @@ public void migrateMessageStartEventSubscriptionForMultiTenancy() {
- Prefix first part of composite key with tenant
- Set tenant on value
*/
from.getSubscriptionsColumnFamily()
.forEach(
(key, value) -> {
value.getRecord().setTenantId(TenantOwned.DEFAULT_TENANT_IDENTIFIER);
to.messageName.wrapBuffer(key.first().getBuffer());
to.processDefinitionKey.wrapLong(key.second().getValue());
to.subscriptionsColumnFamily.insert(to.messageNameAndProcessDefinitionKey, value);
from.getSubscriptionsColumnFamily().deleteExisting(key);
});
iterator.drain(
from.getSubscriptionsColumnFamily(),
(key, value) -> {
value.getRecord().setTenantId(TenantOwned.DEFAULT_TENANT_IDENTIFIER);
to.messageName.wrapBuffer(key.first().getBuffer());
to.processDefinitionKey.wrapLong(key.second().getValue());
to.subscriptionsColumnFamily.insert(to.messageNameAndProcessDefinitionKey, value);
});

/*
- `DEPRECATED_MESSAGE_START_EVENT_SUBSCRIPTION_BY_KEY_AND_NAME` -> `MESSAGE_START_EVENT_SUBSCRIPTION_BY_KEY_AND_NAME`
- Prefix second part of composite key with tenant
*/
from.getSubscriptionsOfProcessDefinitionKeyColumnFamily()
.forEach(
(key, value) -> {
to.processDefinitionKey.wrapLong(key.first().getValue());
to.messageName.wrapBuffer(key.second().getBuffer());
to.subscriptionsOfProcessDefinitionKeyColumnFamily.insert(
to.processDefinitionKeyAndMessageName, DbNil.INSTANCE);
from.getSubscriptionsOfProcessDefinitionKeyColumnFamily().deleteExisting(key);
});
iterator.drain(
from.getSubscriptionsOfProcessDefinitionKeyColumnFamily(),
(key, value) -> {
to.processDefinitionKey.wrapLong(key.first().getValue());
to.messageName.wrapBuffer(key.second().getBuffer());
to.subscriptionsOfProcessDefinitionKeyColumnFamily.insert(
to.processDefinitionKeyAndMessageName, DbNil.INSTANCE);
from.getSubscriptionsOfProcessDefinitionKeyColumnFamily().deleteExisting(key);
});
}

private static final class DbMessageStartEventSubscriptionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.camunda.zeebe.db.impl.DbTenantAwareKey;
import io.camunda.zeebe.db.impl.DbTenantAwareKey.PlacementType;
import io.camunda.zeebe.engine.state.message.MessageSubscription;
import io.camunda.zeebe.engine.state.migration.MemoryBoundedColumnIteration;
import io.camunda.zeebe.engine.state.migration.to_8_3.legacy.LegacyMessageSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.value.TenantOwned;
Expand All @@ -33,23 +34,23 @@ public DbMessageSubscriptionMigrationState(
}

public void migrateMessageSubscriptionForMultiTenancy() {
final var iterator = new MemoryBoundedColumnIteration();
// setting the tenant id key once, because it's the same for all steps below
to.tenantIdKey.wrapString(TenantOwned.DEFAULT_TENANT_IDENTIFIER);

/*
- `DEPRECATED_MESSAGE_SUBSCRIPTION_BY_NAME_AND_CORRELATION_KEY` -> `MESSAGE_SUBSCRIPTION_BY_NAME_AND_CORRELATION_KEY`
- Prefix first part of composite key with tenant
*/
from.getMessageNameAndCorrelationKeyColumnFamily()
.forEach(
(key, value) -> {
to.messageName.wrapBuffer(key.first().first().getBuffer());
to.correlationKey.wrapBuffer(key.first().second().getBuffer());
to.elementInstanceKey.wrapLong(key.second().getValue());
to.messageNameAndCorrelationKeyColumnFamily.insert(
to.tenantAwareNameCorrelationAndElementInstanceKey, DbNil.INSTANCE);
from.getMessageNameAndCorrelationKeyColumnFamily().deleteExisting(key);
});
iterator.drain(
from.getMessageNameAndCorrelationKeyColumnFamily(),
(key, value) -> {
to.messageName.wrapBuffer(key.first().first().getBuffer());
to.correlationKey.wrapBuffer(key.first().second().getBuffer());
to.elementInstanceKey.wrapLong(key.second().getValue());
to.messageNameAndCorrelationKeyColumnFamily.insert(
to.tenantAwareNameCorrelationAndElementInstanceKey, DbNil.INSTANCE);
});
}

private static final class DbMessageSubscriptionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.camunda.zeebe.db.impl.DbTenantAwareKey;
import io.camunda.zeebe.db.impl.DbTenantAwareKey.PlacementType;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.migration.MemoryBoundedColumnIteration;
import io.camunda.zeebe.engine.state.migration.to_8_3.legacy.LegacyProcessMessageSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.record.value.TenantOwned;
Expand All @@ -32,6 +33,7 @@ public DbProcessMessageSubscriptionMigrationState(
}

public void migrateProcessMessageSubscriptionForMultiTenancy() {
final var iterator = new MemoryBoundedColumnIteration();
// setting the tenant id key once, because it's the same for all steps below
to.tenantIdKey.wrapString(TenantOwned.DEFAULT_TENANT_IDENTIFIER);

Expand All @@ -40,15 +42,14 @@ public void migrateProcessMessageSubscriptionForMultiTenancy() {
- Prefix second part of composite key with tenant
- Set tenant on value
*/
from.getSubscriptionColumnFamily()
.forEach(
(key, value) -> {
to.elementInstanceKey.wrapLong(key.first().getValue());
to.messageName.wrapBuffer(key.second().getBuffer());
value.getRecord().setTenantId(TenantOwned.DEFAULT_TENANT_IDENTIFIER);
to.subscriptionColumnFamily.insert(to.elementKeyAndMessageName, value);
from.getSubscriptionColumnFamily().deleteExisting(key);
});
iterator.drain(
from.getSubscriptionColumnFamily(),
(key, value) -> {
to.elementInstanceKey.wrapLong(key.first().getValue());
to.messageName.wrapBuffer(key.second().getBuffer());
value.getRecord().setTenantId(TenantOwned.DEFAULT_TENANT_IDENTIFIER);
to.subscriptionColumnFamily.insert(to.elementKeyAndMessageName, value);
});
}

private static final class DbProcessMessageSubscriptionState {
Expand Down

0 comments on commit d6a6aa3

Please sign in to comment.