Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Aug 9, 2024
1 parent 74e2902 commit 42f4242
Showing 1 changed file with 28 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,30 +115,8 @@ public List<InconsistentGroup> listInconsistentGroups(Set<String> groupNames) {
return inconsistentGroups;
}

private record DatacenterRepositoryHolderSyncRequest<R>(
DatacenterBoundRepositoryHolder<R> primaryHolder,
List<DatacenterBoundRepositoryHolder<R>> replicaHolders
) {
}

private <R> DatacenterRepositoryHolderSyncRequest<R> partition(List<DatacenterBoundRepositoryHolder<R>> repositoryHolders, String primaryDatacenter) {
List<DatacenterBoundRepositoryHolder<R>> replicas = new ArrayList<>();
DatacenterBoundRepositoryHolder<R> primary = null;
for (DatacenterBoundRepositoryHolder<R> repositoryHolder : repositoryHolders) {
if (repositoryHolder.getDatacenterName().equals(primaryDatacenter)) {
primary = repositoryHolder;
} else {
replicas.add(repositoryHolder);
}
}
if (primary == null) {
throw new SynchronizationException("Source of truth datacenter not found: " + primaryDatacenter);
}
return new DatacenterRepositoryHolderSyncRequest<>(primary, replicas);
}

public void syncGroup(String groupName, String sourceOfTruthDatacenter) {
sync(groupRepositories, sourceOfTruthDatacenter,
public void syncGroup(String groupName, String primaryDatacenter) {
sync(groupRepositories, primaryDatacenter,
repo -> repo.groupExists(groupName),
repo -> {
try {
Expand All @@ -153,8 +131,8 @@ public void syncGroup(String groupName, String sourceOfTruthDatacenter) {
);
}

public void syncTopic(TopicName topicName, String sourceOfTruthDatacenter) {
sync(topicRepositories, sourceOfTruthDatacenter,
public void syncTopic(TopicName topicName, String primaryDatacenter) {
sync(topicRepositories, primaryDatacenter,
repo -> repo.topicExists(topicName),
repo -> {
try {
Expand All @@ -169,8 +147,8 @@ public void syncTopic(TopicName topicName, String sourceOfTruthDatacenter) {
);
}

public void syncSubscription(SubscriptionName subscriptionName, String sourceOfTruthDatacenter) {
sync(subscriptionRepositories, sourceOfTruthDatacenter,
public void syncSubscription(SubscriptionName subscriptionName, String primaryDatacenter) {
sync(subscriptionRepositories, primaryDatacenter,
repo -> repo.subscriptionExists(subscriptionName.getTopicName(), subscriptionName.getName()),
repo -> {
try {
Expand Down Expand Up @@ -343,4 +321,26 @@ private <T> T resolveFuture(Future<T> future) {
throw new ConsistencyCheckingException("Fetching metadata failed", e);
}
}

private record DatacenterRepositoryHolderSyncRequest<R>(
DatacenterBoundRepositoryHolder<R> primaryHolder,
List<DatacenterBoundRepositoryHolder<R>> replicaHolders
) {
}

private <R> DatacenterRepositoryHolderSyncRequest<R> partition(List<DatacenterBoundRepositoryHolder<R>> repositoryHolders, String primaryDatacenter) {
List<DatacenterBoundRepositoryHolder<R>> replicas = new ArrayList<>();
DatacenterBoundRepositoryHolder<R> primary = null;
for (DatacenterBoundRepositoryHolder<R> repositoryHolder : repositoryHolders) {
if (repositoryHolder.getDatacenterName().equals(primaryDatacenter)) {
primary = repositoryHolder;
} else {
replicas.add(repositoryHolder);
}
}
if (primary == null) {
throw new SynchronizationException("Source of truth datacenter not found: " + primaryDatacenter);
}
return new DatacenterRepositoryHolderSyncRequest<>(primary, replicas);
}
}

0 comments on commit 42f4242

Please sign in to comment.