Skip to content

Commit

Permalink
Add ability to configure multiple ConfigRepoUpdaters to process post MDU
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshp committed Jan 11, 2019
1 parent 37b526a commit b329ab4
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 39 deletions.
Expand Up @@ -214,6 +214,8 @@ public class SystemEnvironment implements Serializable, ConfigDirProvider {

public static GoIntSystemProperty DEPENDENCY_MATERIAL_UPDATE_LISTENERS = new GoIntSystemProperty("dependency.material.check.threads", 3);

public static GoIntSystemProperty CONFIG_MATERIAL_POST_UPDATE_LISTENERS = new GoIntSystemProperty("config.material.post.update.threads", 3);

public static GoSystemProperty<Boolean> OPTIMIZE_FULL_CONFIG_SAVE = new GoBooleanSystemProperty("optimize.full.config.save", true);
public static GoSystemProperty<String> GO_SERVER_MODE = new GoStringSystemProperty("go.server.mode", "production");
public static GoBooleanSystemProperty REAUTHENTICATION_ENABLED = new GoBooleanSystemProperty("go.security.reauthentication.enabled", true);
Expand Down Expand Up @@ -503,6 +505,10 @@ public int getNumberOfDependencyMaterialUpdateListeners() {
return DEPENDENCY_MATERIAL_UPDATE_LISTENERS.getValue();
}

public int getNumberOfConfigMaterialPostUpdateListeners() {
return CONFIG_MATERIAL_POST_UPDATE_LISTENERS.getValue();
}

public String getAgentMd5() {
return getPropertyImpl(GoConstants.AGENT_JAR_MD5, BLANK_STRING);
}
Expand Down
@@ -0,0 +1,67 @@
/*
* Copyright 2019 ThoughtWorks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.config.GoRepoConfigDataSource;
import com.thoughtworks.go.config.materials.SubprocessExecutionContext;
import com.thoughtworks.go.server.persistence.MaterialRepository;
import com.thoughtworks.go.server.service.MaterialService;
import com.thoughtworks.go.util.SystemEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import static java.util.stream.IntStream.range;

@Component
public class ConfigMaterialPostUpdateListenersFactory {
private ConfigMaterialPostUpdateQueue configMaterialPostUpdateQueue;
private final GoRepoConfigDataSource repoConfigDataSource;
private final MaterialRepository materialRepository;
private final MaterialChecker materialChecker;
private final MaterialUpdateCompletedTopic materialUpdateCompletedTopic;
private final MaterialService materialService;
private final SubprocessExecutionContext subprocessExecutionContext;
private SystemEnvironment systemEnvironment;

@Autowired
public ConfigMaterialPostUpdateListenersFactory(SystemEnvironment systemEnvironment,
ConfigMaterialPostUpdateQueue configMaterialPostUpdateQueue,
GoRepoConfigDataSource repoConfigDataSource,
MaterialRepository materialRepository,
MaterialChecker materialChecker,
MaterialUpdateCompletedTopic materialUpdateCompletedTopic,
MaterialService materialService,
SubprocessExecutionContext subprocessExecutionContext) {
this.systemEnvironment = systemEnvironment;
this.configMaterialPostUpdateQueue = configMaterialPostUpdateQueue;
this.repoConfigDataSource = repoConfigDataSource;
this.materialRepository = materialRepository;
this.materialChecker = materialChecker;
this.materialUpdateCompletedTopic = materialUpdateCompletedTopic;
this.materialService = materialService;
this.subprocessExecutionContext = subprocessExecutionContext;
}

public void init() {
int numberOfConfigMaterialPostUpdateListeners = systemEnvironment.getNumberOfConfigMaterialPostUpdateListeners();

range(0, numberOfConfigMaterialPostUpdateListeners).forEach(i ->
this.configMaterialPostUpdateQueue.addListener(new ConfigMaterialUpdater(repoConfigDataSource, materialRepository,
materialChecker, materialUpdateCompletedTopic, materialService, subprocessExecutionContext))
);
}
}
@@ -0,0 +1,32 @@
/*
* Copyright 2019 ThoughtWorks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.server.messaging.GoMessageQueue;
import com.thoughtworks.go.server.messaging.MessagingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @understands messages about completed config material updates
*/
@Component
public class ConfigMaterialPostUpdateQueue extends GoMessageQueue<MaterialUpdateCompletedMessage> {
@Autowired
public ConfigMaterialPostUpdateQueue(MessagingService messaging) {
super(messaging, "config-material-update-completed");
}
}
Expand Up @@ -37,35 +37,28 @@
/**
* Updates configuration from repositories.
*/
@Service
public class ConfigMaterialUpdater implements GoMessageListener<MaterialUpdateCompletedMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMaterialUpdater.class);

private GoRepoConfigDataSource repoConfigDataSource;
private MaterialRepository materialRepository;
private MaterialChecker materialChecker;
private ConfigMaterialUpdateCompletedTopic configCompleted;
private MaterialUpdateCompletedTopic topic;
private MaterialService materialService;
private SubprocessExecutionContext subprocessExecutionContext;

@Autowired
public ConfigMaterialUpdater(GoRepoConfigDataSource repoConfigDataSource,
MaterialRepository materialRepository,
MaterialChecker materialChecker,
ConfigMaterialUpdateCompletedTopic configCompletedTopic,
MaterialUpdateCompletedTopic topic,
MaterialService materialService,
SubprocessExecutionContext subprocessExecutionContext) {
this.repoConfigDataSource = repoConfigDataSource;
this.materialChecker = materialChecker;
this.materialRepository = materialRepository;
this.configCompleted = configCompletedTopic;
this.topic = topic;
this.materialService = materialService;
this.subprocessExecutionContext = subprocessExecutionContext;

this.configCompleted.addListener(this);
}

@Override
Expand Down
Expand Up @@ -18,8 +18,8 @@

import com.thoughtworks.go.domain.materials.Material;
import com.thoughtworks.go.server.cronjob.GoDiskSpaceMonitor;
import com.thoughtworks.go.server.messaging.GoMessageChannel;
import com.thoughtworks.go.server.messaging.GoMessageListener;
import com.thoughtworks.go.server.messaging.GoMessageTopic;
import com.thoughtworks.go.server.perf.MDUPerformanceLogger;
import com.thoughtworks.go.server.service.DrainModeService;
import org.slf4j.Logger;
Expand All @@ -33,15 +33,15 @@
public class MaterialUpdateListener implements GoMessageListener<MaterialUpdateMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(MaterialUpdateListener.class);

private final GoMessageTopic<MaterialUpdateCompletedMessage> topic;
private final GoMessageChannel<MaterialUpdateCompletedMessage> channel;
private final MaterialDatabaseUpdater updater;
private final MDUPerformanceLogger mduPerformanceLogger;
private final GoDiskSpaceMonitor diskSpaceMonitor;
private DrainModeService drainModeService;

public MaterialUpdateListener(GoMessageTopic<MaterialUpdateCompletedMessage> topic, MaterialDatabaseUpdater updater,
public MaterialUpdateListener(GoMessageChannel<MaterialUpdateCompletedMessage> channel, MaterialDatabaseUpdater updater,
MDUPerformanceLogger mduPerformanceLogger, GoDiskSpaceMonitor diskSpaceMonitor, DrainModeService drainModeService) {
this.topic = topic;
this.channel = channel;
this.updater = updater;
this.mduPerformanceLogger = mduPerformanceLogger;
this.diskSpaceMonitor = diskSpaceMonitor;
Expand All @@ -53,7 +53,7 @@ public void onMessage(MaterialUpdateMessage message) {

if (drainModeService.isDrainMode()) {
LOGGER.debug("[Drain Mode] GoCD server is in 'drain' mode, skip performing MDU for material {}.", material);
topic.post(new MaterialUpdateSkippedMessage(material, message.trackingId()));
channel.post(new MaterialUpdateSkippedMessage(material, message.trackingId()));
return;
}

Expand All @@ -63,9 +63,9 @@ public void onMessage(MaterialUpdateMessage message) {
bombIf(diskSpaceMonitor.isLowOnDisk(), "Cruise server is too low on disk to continue with material update");
updater.updateMaterial(material);
mduPerformanceLogger.postingMessageAboutMDUCompletion(message.trackingId(), material);
topic.post(new MaterialUpdateSuccessfulMessage(material, message.trackingId())); //This should happen only if the transaction is committed.
channel.post(new MaterialUpdateSuccessfulMessage(material, message.trackingId())); //This should happen only if the transaction is committed.
} catch (Exception e) {
topic.post(new MaterialUpdateFailedMessage(material, message.trackingId(), e));
channel.post(new MaterialUpdateFailedMessage(material, message.trackingId(), e));
mduPerformanceLogger.postingMessageAboutMDUFailure(message.trackingId(), material);
} finally {
drainModeService.mduFinishedForMaterial(material);
Expand Down
Expand Up @@ -17,13 +17,12 @@
package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.server.cronjob.GoDiskSpaceMonitor;
import com.thoughtworks.go.server.messaging.GoMessageChannel;
import com.thoughtworks.go.server.messaging.GoMessageQueue;
import com.thoughtworks.go.server.messaging.GoMessageTopic;
import com.thoughtworks.go.server.perf.MDUPerformanceLogger;
import com.thoughtworks.go.server.persistence.MaterialRepository;
import com.thoughtworks.go.server.service.DrainModeService;
import com.thoughtworks.go.server.service.MaterialExpansionService;
import com.thoughtworks.go.server.service.support.DaemonThreadStatsCollector;
import com.thoughtworks.go.server.transaction.TransactionTemplate;
import com.thoughtworks.go.serverhealth.ServerHealthService;
import com.thoughtworks.go.util.SystemEnvironment;
Expand All @@ -33,13 +32,12 @@
@Component
public class MaterialUpdateListenerFactory {
private MaterialUpdateCompletedTopic topic;
private ConfigMaterialUpdateCompletedTopic configTopic;
private final MaterialRepository materialRepository;
private MaterialUpdateQueue queue;
private ConfigMaterialUpdateQueue configQueue;
private DependencyMaterialUpdateQueue dependencyMaterialQueue;
private final DaemonThreadStatsCollector daemonThreadStatsCollector;
private DrainModeService drainModeService;
private ConfigMaterialPostUpdateQueue configMaterialPostUpdateQueue;
private SystemEnvironment systemEnvironment;
private final ServerHealthService serverHealthService;
private final GoDiskSpaceMonitor diskSpaceMonitor;
Expand All @@ -53,7 +51,6 @@ public class MaterialUpdateListenerFactory {

@Autowired
public MaterialUpdateListenerFactory(MaterialUpdateCompletedTopic topic,
ConfigMaterialUpdateCompletedTopic configTopic,
MaterialUpdateQueue queue,
ConfigMaterialUpdateQueue configQueue,
MaterialRepository materialRepository,
Expand All @@ -68,10 +65,9 @@ public MaterialUpdateListenerFactory(MaterialUpdateCompletedTopic topic,
MaterialExpansionService materialExpansionService,
MDUPerformanceLogger mduPerformanceLogger,
DependencyMaterialUpdateQueue dependencyMaterialQueue,
DaemonThreadStatsCollector daemonThreadStatsCollector,
DrainModeService drainModeService) {
DrainModeService drainModeService,
ConfigMaterialPostUpdateQueue configMaterialPostUpdateQueue) {
this.topic = topic;
this.configTopic = configTopic;
this.queue = queue;
this.configQueue = configQueue;
this.materialRepository = materialRepository;
Expand All @@ -86,11 +82,11 @@ public MaterialUpdateListenerFactory(MaterialUpdateCompletedTopic topic,
this.materialExpansionService = materialExpansionService;
this.mduPerformanceLogger = mduPerformanceLogger;
this.dependencyMaterialQueue = dependencyMaterialQueue;
this.daemonThreadStatsCollector = daemonThreadStatsCollector;
this.drainModeService = drainModeService;
this.configMaterialPostUpdateQueue = configMaterialPostUpdateQueue;
}

public void init(){
public void init() {
int numberOfStandardMaterialListeners = systemEnvironment.getNumberOfMaterialCheckListener();
int numberOfConfigListeners = systemEnvironment.getNumberOfConfigMaterialCheckListener();
int numberOfDependencyMaterialCheckListeners = systemEnvironment.getNumberOfDependencyMaterialUpdateListeners();
Expand All @@ -100,15 +96,15 @@ public void init(){
}

for (int i = 0; i < numberOfConfigListeners; i++) {
createWorker(this.configQueue, this.configTopic);
createWorker(this.configQueue, this.configMaterialPostUpdateQueue);
}

for (int i = 0; i < numberOfDependencyMaterialCheckListeners; i++) {
createWorker(this.dependencyMaterialQueue, this.topic);
}
}

private void createWorker(GoMessageQueue<MaterialUpdateMessage> queue, GoMessageTopic<MaterialUpdateCompletedMessage> topic) {
private void createWorker(GoMessageQueue<MaterialUpdateMessage> queue, GoMessageChannel<MaterialUpdateCompletedMessage> topic) {
MaterialDatabaseUpdater updater = new MaterialDatabaseUpdater(materialRepository, serverHealthService, transactionTemplate, dependencyMaterialUpdater, scmMaterialUpdater,
packageMaterialUpdater, pluggableSCMMaterialUpdater, materialExpansionService);
queue.addListener(new MaterialUpdateListener(topic, updater, mduPerformanceLogger, diskSpaceMonitor, drainModeService));
Expand Down
@@ -0,0 +1,54 @@
/*
* Copyright 2019 ThoughtWorks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.util.SystemEnvironment;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.internal.verification.Times;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class ConfigMaterialPostUpdateListenersFactoryTest {
@Mock
private SystemEnvironment systemEnvironment;

@Mock
ConfigMaterialPostUpdateQueue configMaterialPostUpdateQueue;

@Before
public void setUp() throws Exception {
initMocks(this);
}

@Test
public void shouldCreateCompetingConsumersForSuppliedDependencyMaterialQueue() {
int numberOfConfigMaterialPostUpdateListeners = 3;

when(systemEnvironment.getNumberOfConfigMaterialPostUpdateListeners()).thenReturn(numberOfConfigMaterialPostUpdateListeners);

ConfigMaterialPostUpdateListenersFactory factory = new ConfigMaterialPostUpdateListenersFactory(systemEnvironment, configMaterialPostUpdateQueue,
null, null, null, null, null, null);
factory.init();

verify(configMaterialPostUpdateQueue, new Times(numberOfConfigMaterialPostUpdateListeners)).addListener(any(ConfigMaterialUpdater.class));
}
}
Expand Up @@ -70,10 +70,8 @@ public void SetUp() {

when(materialRepository.findLatestModification(material)).thenReturn(mods);

configUpdater = new ConfigMaterialUpdater(
repoConfigDataSource, materialRepository, materialChecker,
configCompleted, topic, materialService, new TestSubprocessExecutionContext());

configUpdater = new ConfigMaterialUpdater(repoConfigDataSource, materialRepository, materialChecker,
topic, materialService, new TestSubprocessExecutionContext());
}

private MaterialRevisions revisions(Material material, Modification modification) {
Expand Down

0 comments on commit b329ab4

Please sign in to comment.