Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Add indexes to ManyReconciler (#1198)
Browse files Browse the repository at this point in the history
* Add indexes to ManyReconciler

* Metrics
  • Loading branch information
tbak committed Jan 5, 2022
1 parent e805ea8 commit 4aa4797
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import com.netflix.titus.common.framework.simplereconciler.internal.provider.ActionProviderSelectorFactory;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.closeable.CloseableReference;
import com.netflix.titus.common.util.collections.index.IndexSet;
import com.netflix.titus.common.util.collections.index.IndexSetHolderBasic;
import com.netflix.titus.common.util.collections.index.IndexSetHolderConcurrent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand All @@ -54,6 +57,8 @@ public interface ManyReconciler<DATA> {

Map<String, DATA> getAll();

IndexSet<String, DATA> getIndexSet();

Optional<DATA> findById(String id);

int size();
Expand Down Expand Up @@ -81,6 +86,7 @@ class Builder<DATA> {
private Scheduler notificationScheduler;
private TitusRuntime titusRuntime;
private int shardCount;
private IndexSet<String, DATA> indexes;

private Builder() {
}
Expand Down Expand Up @@ -148,6 +154,11 @@ public Builder<DATA> withNotificationScheduler(Scheduler notificationScheduler)
return this;
}

public Builder<DATA> withIndexes(IndexSet<String, DATA> indexes) {
this.indexes = indexes;
return this;
}

public Builder<DATA> withTitusRuntime(TitusRuntime titusRuntime) {
this.titusRuntime = titusRuntime;
return this;
Expand Down Expand Up @@ -175,6 +186,7 @@ private ManyReconciler<DATA> buildDefaultManyReconciler() {
buildActionProviderSelectorFactory(),
reconcilerSchedulerRef,
notificationSchedulerRef,
new IndexSetHolderBasic<>(indexes),
titusRuntime
);
}
Expand All @@ -201,6 +213,7 @@ private ManyReconciler<DATA> buildShardedManyReconciler() {
buildActionProviderSelectorFactory(),
reconcilerSchedulerSupplier,
notificationSchedulerRef,
new IndexSetHolderConcurrent<>(indexes),
titusRuntime
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.Evaluators;
import com.netflix.titus.common.util.closeable.CloseableReference;
import com.netflix.titus.common.util.collections.index.IndexSet;
import com.netflix.titus.common.util.collections.index.IndexSetHolder;
import com.netflix.titus.common.util.time.Clock;
import com.netflix.titus.common.util.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -62,6 +64,7 @@ public class DefaultManyReconciler<DATA> implements ManyReconciler<DATA> {
private final long longCycleMs;
private final CloseableReference<Scheduler> reconcilerSchedulerRef;
private final CloseableReference<Scheduler> notificationSchedulerRef;
private final IndexSetHolder<String, DATA> indexSetHolder;
private final EventDistributor<DATA> eventDistributor;
private final Clock clock;
private final TitusRuntime titusRuntime;
Expand All @@ -87,12 +90,14 @@ public DefaultManyReconciler(
ActionProviderSelectorFactory<DATA> selectorFactory,
CloseableReference<Scheduler> reconcilerSchedulerRef,
CloseableReference<Scheduler> notificationSchedulerRef,
IndexSetHolder<String, DATA> indexSetHolder,
TitusRuntime titusRuntime) {
this.quickCycleMs = quickCycle.toMillis();
this.longCycleMs = longCycle.toMillis();
this.selectorFactory = selectorFactory;
this.reconcilerSchedulerRef = reconcilerSchedulerRef;
this.notificationSchedulerRef = notificationSchedulerRef;
this.indexSetHolder = indexSetHolder;
this.eventDistributor = new EventDistributor<>(this::buildSnapshot, titusRuntime.getRegistry());
this.clock = titusRuntime.getClock();
this.titusRuntime = titusRuntime;
Expand Down Expand Up @@ -161,6 +166,11 @@ public Map<String, DATA> getAll() {
return all;
}

@Override
public IndexSet<String, DATA> getIndexSet() {
return indexSetHolder.getIndexSet();
}

@Override
public Optional<DATA> findById(String id) {
return Optional.ofNullable(Evaluators.applyNotNull(executors.get(id), ReconcilerEngine::getCurrent));
Expand Down Expand Up @@ -293,7 +303,11 @@ private List<SimpleReconcilerEvent<DATA>> buildSnapshot() {

private void doProcess(boolean fullReconciliationCycle) {
// Data update
executors.forEach((id, executor) -> executor.processDataUpdates());
executors.forEach((id, executor) -> {
if (executor.processDataUpdates()) {
indexSetHolder.add(Collections.singletonList(executor.getCurrent()));
}
});

// Complete subscribers
Set<String> justChanged = new HashSet<>();
Expand Down Expand Up @@ -328,6 +342,7 @@ private void doProcess(boolean fullReconciliationCycle) {
Optional<Runnable> runnable = executor.tryToClose();
if (executor.getState() == ReconcilerState.Closed) {
it.remove();
indexSetHolder.remove(Collections.singletonList(executor.getId()));
metrics.remove(executor.getId());

// This is the very last action, so we can take safely the next transaction id without progressing it.
Expand All @@ -344,6 +359,7 @@ private void doProcess(boolean fullReconciliationCycle) {
for (AddHolder holder; (holder = addHolders.poll()) != null; ) {
ReconcilerEngine<DATA> executor = holder.getExecutor();
executors.put(holder.getId(), executor);
indexSetHolder.add(Collections.singletonList(executor.getCurrent()));

// We set transaction id "0" for the newly added executors.
eventDistributor.addEvents(Collections.singletonList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,21 @@ void close() {
Evaluators.acceptNotNull(pendingTransaction, Transaction::cancel);
}

void processDataUpdates() {
boolean processDataUpdates() {
if (pendingTransaction != null) {
if (pendingTransaction.getStatus().getState() == TransactionStatus.State.ResultReady) {
pendingTransaction.readyToClose(current);
if (pendingTransaction.getStatus().getState() == TransactionStatus.State.Completed) {
this.current = pendingTransaction.getStatus().getResult();
return true;
} else {
Throwable error = pendingTransaction.getStatus().getError();
logger.warn("Reconciliation action failure during data merging: status={}, error={}", pendingTransaction.getStatus(), error.getMessage());
logger.debug("Stack trace", error);
}
}
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.netflix.titus.common.framework.simplereconciler.internal.provider.ActionProviderSelectorFactory;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.closeable.CloseableReference;
import com.netflix.titus.common.util.collections.index.IndexSet;
import com.netflix.titus.common.util.collections.index.IndexSetHolder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand All @@ -44,15 +46,18 @@ public class ShardedManyReconciler<DATA> implements ManyReconciler<DATA> {

private final Function<String, Integer> shardIndexSupplier;
private final CloseableReference<Scheduler> notificationSchedulerRef;
private final IndexSetHolder<String, DATA> indexSetHolder;
private final List<ManyReconciler<DATA>> shards;
private final AtomicReference<ReconcilerState> stateRef = new AtomicReference<>(ReconcilerState.Running);

public ShardedManyReconciler(int shardCount,
Function<String, Integer> shardIndexSupplier,
Function<Integer, ManyReconciler<DATA>> reconcilerShardFactory,
CloseableReference<Scheduler> notificationSchedulerRef) {
CloseableReference<Scheduler> notificationSchedulerRef,
IndexSetHolder<String, DATA> indexSetHolder) {
this.shardIndexSupplier = shardIndexSupplier;
this.notificationSchedulerRef = notificationSchedulerRef;
this.indexSetHolder = indexSetHolder;
List<ManyReconciler<DATA>> shards = new ArrayList<>();
for (int i = 0; i < shardCount; i++) {
shards.add(reconcilerShardFactory.apply(i));
Expand Down Expand Up @@ -106,6 +111,11 @@ public Map<String, DATA> getAll() {
return result;
}

@Override
public IndexSet<String, DATA> getIndexSet() {
return indexSetHolder.getIndexSet();
}

@Override
public Optional<DATA> findById(String id) {
return getShard(id).findById(id);
Expand Down Expand Up @@ -158,6 +168,7 @@ public static <DATA> ManyReconciler<DATA> newSharedDefaultManyReconciler(String
ActionProviderSelectorFactory<DATA> selectorFactory,
Function<Integer, CloseableReference<Scheduler>> reconcilerSchedulerSupplier,
CloseableReference<Scheduler> notificationSchedulerRef,
IndexSetHolder<String, DATA> indexSetHolder,
TitusRuntime titusRuntime) {
return new ShardedManyReconciler<>(
shardCount,
Expand All @@ -169,9 +180,11 @@ public static <DATA> ManyReconciler<DATA> newSharedDefaultManyReconciler(String
selectorFactory,
reconcilerSchedulerSupplier.apply(shardIndex),
notificationSchedulerRef,
indexSetHolder,
titusRuntime
),
notificationSchedulerRef
notificationSchedulerRef,
indexSetHolder
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,48 @@
import java.util.HashMap;
import java.util.Map;

class DefaultIndexSet<PRIMARY_KEY, INPUT, OUTPUT> implements IndexSet<PRIMARY_KEY, INPUT, OUTPUT> {
class DefaultIndexSet<PRIMARY_KEY, INPUT> implements IndexSet<PRIMARY_KEY, INPUT> {

private final Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, OUTPUT>> groups;
private final Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, OUTPUT>> indexes;
private final Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, ?>> groups;
private final Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, ?>> indexes;

DefaultIndexSet(Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, OUTPUT>> groups,
Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, OUTPUT>> indexes) {
DefaultIndexSet(Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, ?>> groups,
Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, ?>> indexes) {
this.groups = groups;
this.indexes = indexes;
}

@Override
public <INDEX_KEY> Group<INDEX_KEY, PRIMARY_KEY, OUTPUT> getGroup(String groupId) {
public Map<String, Group<?, PRIMARY_KEY, ?>> getGroups() {
return (Map) groups;
}

@Override
public <INDEX_KEY, OUTPUT> Group<INDEX_KEY, PRIMARY_KEY, OUTPUT> getGroup(String groupId) {
return (Group<INDEX_KEY, PRIMARY_KEY, OUTPUT>) groups.get(groupId);
}

@Override
public Index<OUTPUT> getIndex(String indexId) {
return indexes.get(indexId);
public Map<String, Index<?>> getIndexes() {
return (Map) indexes;
}

@Override
public <OUTPUT> Index<OUTPUT> getIndex(String indexId) {
return (Index<OUTPUT>) indexes.get(indexId);
}

@Override
public IndexSet<PRIMARY_KEY, INPUT, OUTPUT> add(Collection<INPUT> values) {
Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, OUTPUT>> newGroups;
public IndexSet<PRIMARY_KEY, INPUT> add(Collection<INPUT> values) {
Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, ?>> newGroups;
if (groups.isEmpty()) {
newGroups = Collections.emptyMap();
} else {
newGroups = new HashMap<>();
groups.forEach((groupId, group) -> newGroups.put(groupId, group.add(values)));
}

Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, OUTPUT>> newIndexes;
Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, ?>> newIndexes;
if (indexes.isEmpty()) {
newIndexes = Collections.emptyMap();
} else {
Expand All @@ -64,16 +74,16 @@ public IndexSet<PRIMARY_KEY, INPUT, OUTPUT> add(Collection<INPUT> values) {
}

@Override
public IndexSet<PRIMARY_KEY, INPUT, OUTPUT> remove(Collection<PRIMARY_KEY> primaryKeys) {
Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, OUTPUT>> newGroups;
public IndexSet<PRIMARY_KEY, INPUT> remove(Collection<PRIMARY_KEY> primaryKeys) {
Map<String, DefaultGroup<?, PRIMARY_KEY, INPUT, ?>> newGroups;
if (groups.isEmpty()) {
newGroups = Collections.emptyMap();
} else {
newGroups = new HashMap<>();
groups.forEach((groupId, group) -> newGroups.put(groupId, group.remove(primaryKeys)));
}

Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, OUTPUT>> newIndexes;
Map<String, DefaultIndex<?, PRIMARY_KEY, INPUT, ?>> newIndexes;
if (indexes.isEmpty()) {
newIndexes = Collections.emptyMap();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

/**
* Grouping of a collection values by an arbitrary grouping key.
* {@link Group} object is immutable. Changes to a group produce new copies of it.
*/
public interface Group<GROUP_KEY, PRIMARY_KEY, VALUE> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

/**
* Ordered list of values. The ordering is set by a pair of an arbitrary index key and the primary key.
* {@link Index} object is immutable. Changes to an index produce new copies of it.
*/
public interface Index<VALUE> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
package com.netflix.titus.common.util.collections.index;

import java.util.Collection;
import java.util.Map;

public interface IndexSet<PRIMARY_KEY, INPUT, OUTPUT> {
public interface IndexSet<PRIMARY_KEY, INPUT> {

<GROUP_KEY> Group<GROUP_KEY, PRIMARY_KEY, OUTPUT> getGroup(String groupId);
Map<String, Group<?, PRIMARY_KEY, ?>> getGroups();

Index<OUTPUT> getIndex(String indexId);
<GROUP_KEY, OUTPUT> Group<GROUP_KEY, PRIMARY_KEY, OUTPUT> getGroup(String groupId);

IndexSet<PRIMARY_KEY, INPUT, OUTPUT> add(Collection<INPUT> values);
Map<String, Index<?>> getIndexes();

IndexSet<PRIMARY_KEY, INPUT, OUTPUT> remove(Collection<PRIMARY_KEY> values);
<OUTPUT> Index<OUTPUT> getIndex(String indexId);

IndexSet<PRIMARY_KEY, INPUT> add(Collection<INPUT> values);

IndexSet<PRIMARY_KEY, INPUT> remove(Collection<PRIMARY_KEY> values);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.titus.common.util.collections.index;

import java.util.Collection;

/**
* {@link IndexSet} is immutable. {@link IndexSetHolder} provides a wrapper that keep the last version of the index.
*/
public interface IndexSetHolder<PRIMARY_KEY, INPUT> {

IndexSet<PRIMARY_KEY, INPUT> getIndexSet();

void add(Collection<INPUT> values);

void remove(Collection<PRIMARY_KEY> values);
}
Loading

0 comments on commit 4aa4797

Please sign in to comment.