Skip to content

Commit

Permalink
Issue #3534: Tag compute resources according to the tool's metadata -…
Browse files Browse the repository at this point in the history
… cleanup
  • Loading branch information
ekazachkova committed May 22, 2024
1 parent 4ebeefe commit a596c9d
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.epam.pipeline.manager.cluster.NodesManager;
import com.epam.pipeline.manager.cluster.cleaner.RunCleaner;
import com.epam.pipeline.manager.cluster.pool.NodePoolManager;
import com.epam.pipeline.manager.metadata.MetadataManager;
import com.epam.pipeline.manager.parallel.ParallelExecutorService;
import com.epam.pipeline.manager.pipeline.PipelineRunManager;
import com.epam.pipeline.manager.pipeline.RunRegionShiftHandler;
Expand Down Expand Up @@ -116,6 +117,7 @@ static class AutoscaleManagerCore {
private final List<RunCleaner> runCleaners;
private final PoolAutoscaler poolAutoscaler;
private final RunRegionShiftHandler runRegionShiftHandler;
private final MetadataManager metadataManager;
private final Set<Long> nodeUpTaskInProgress = ConcurrentHashMap.newKeySet();
private final Map<Long, Integer> nodeUpAttempts = new ConcurrentHashMap<>();
private final Map<Long, Integer> spotNodeUpAttempts = new ConcurrentHashMap<>();
Expand All @@ -136,7 +138,8 @@ static class AutoscaleManagerCore {
final ScaleDownHandler scaleDownHandler,
final List<RunCleaner> runCleaners,
final PoolAutoscaler poolAutoscaler,
final RunRegionShiftHandler runRegionShiftHandler) {
final RunRegionShiftHandler runRegionShiftHandler,
final MetadataManager metadataManager) {
this.pipelineRunManager = pipelineRunManager;
this.executorService = executorService;
this.autoscalerService = autoscalerService;
Expand All @@ -151,6 +154,7 @@ static class AutoscaleManagerCore {
this.runCleaners = runCleaners;
this.poolAutoscaler = poolAutoscaler;
this.runRegionShiftHandler = runRegionShiftHandler;
this.metadataManager = metadataManager;
}

@SchedulerLock(name = "AutoscaleManager_runAutoscaling", lockAtMostForString = "PT10M")
Expand Down Expand Up @@ -608,7 +612,7 @@ public InstanceRequest getNewRunInstance(final PipelineRun run) throws GitClient
instanceRequest.setInstance(instance);
instanceRequest.setRequestedImage(run.getActualDockerImage());
instanceRequest.setRuntimeParameters(buildRuntimeParameters(run));
instanceRequest.setCustomTags(autoscalerService.buildCustomInstanceTags(run));
instanceRequest.setCustomTags(metadataManager.buildCustomInstanceTags(run));
return instanceRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.cluster.pool.RunningInstance;
import com.epam.pipeline.entity.configuration.PipelineConfiguration;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.pipeline.RunInstance;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -41,5 +39,4 @@ public interface AutoscalerService {
void adjustRunPrices(long runId, List<InstanceDisk> disks);
Optional<NodePool> findPool(String nodeLabel, KubernetesClient client);
void registerDisks(Long runId, RunInstance instance);
Map<String, String> buildCustomInstanceTags(PipelineRun run);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.cluster.pool.RunningInstance;
import com.epam.pipeline.entity.configuration.PipelineConfiguration;
import com.epam.pipeline.entity.metadata.MetadataEntry;
import com.epam.pipeline.entity.metadata.PipeConfValue;
import com.epam.pipeline.entity.pipeline.CommonCustomInstanceTagsTypes;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.pipeline.RunInstance;
import com.epam.pipeline.entity.pipeline.Tool;
import com.epam.pipeline.entity.region.CloudProvider;
import com.epam.pipeline.entity.security.acl.AclClass;
import com.epam.pipeline.manager.cloud.CloudFacade;
import com.epam.pipeline.manager.cluster.KubernetesConstants;
import com.epam.pipeline.manager.cluster.NodeDiskManager;
Expand All @@ -45,7 +40,6 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.SetUtils;
Expand All @@ -54,13 +48,10 @@

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

@RequiredArgsConstructor
@Service
Expand Down Expand Up @@ -212,68 +203,11 @@ public void registerDisks(final Long runId, final RunInstance instance) {
adjustRunPrices(runId, disks);
}

@Override
public Map<String, String> buildCustomInstanceTags(final PipelineRun run) {
final Map<String, String> customTags = resolveCommonCustomInstanceTags(run);

final Tool tool = toolManager.loadByNameOrId(run.getDockerImage());
final MetadataEntry toolMetadata = metadataManager.loadMetadataItem(tool.getId(), AclClass.TOOL);
return resolveInstanceTagsFromMetadata(toolMetadata, customTags);
}

private void registerNodeDisks(long runId, List<InstanceDisk> disks) {
PipelineRun run = runCRUDService.loadRunById(runId);
String nodeId = run.getInstance().getNodeId();
LocalDateTime creationDate = run.getInstanceStartDateTime();
List<DiskRegistrationRequest> requests = DiskRegistrationRequest.from(disks);
nodeDiskManager.register(nodeId, creationDate, requests);
}

private Map<String, String> resolveInstanceTagsFromMetadata(final MetadataEntry metadataEntry,
final Map<String, String> customTags) {
final Map<String, PipeConfValue> metadataData = MapUtils.emptyIfNull(Objects.isNull(metadataEntry)
? null
: metadataEntry.getData());
if (MapUtils.isEmpty(metadataData)) {
return customTags;
}
final Set<String> instanceTagsKeys = preferenceManager.getPreference(
SystemPreferences.CLUSTER_INSTANCE_ALLOWED_CUSTOM_TAGS);
if (CollectionUtils.isEmpty(instanceTagsKeys)) {
return customTags;
}
metadataData.entrySet().stream()
.filter(entry -> instanceTagsKeys.contains(entry.getKey()))
.forEach(entry -> customTags.put(entry.getKey(), entry.getValue().getValue()));
return customTags;
}

private Map<String, String> resolveCommonCustomInstanceTags(final PipelineRun run) {
final Map<String, String> customInstanceTags = new HashMap<>();
final Map<CommonCustomInstanceTagsTypes, String> commonCustomInstanceTags = MapUtils.emptyIfNull(
preferenceManager.getPreference(SystemPreferences.CLUSTER_INSTANCE_TAGS));
commonCustomInstanceTags
.forEach((tagType, tagName) -> fillCommonCustomInstanceTags(tagType, tagName, run, customInstanceTags));
return customInstanceTags;
}

private void fillCommonCustomInstanceTags(final CommonCustomInstanceTagsTypes tagType,
final String tagName, final PipelineRun run,
final Map<String, String> customInstanceTags) {
switch (tagType) {
case tool:
// TODO: check spec symbols for aws resources tags
customInstanceTags.put(tagName, run.getDockerImage());
break;
case run_id:
customInstanceTags.put(tagName, run.getId().toString());
break;
case owner:
customInstanceTags.put(tagName, run.getOwner());
break;
default:
throw new IllegalArgumentException(
String.format("Failed to resolve custom instance type '%s'", tagType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.epam.pipeline.manager.cloud.CloudFacade;
import com.epam.pipeline.manager.cluster.KubernetesConstants;
import com.epam.pipeline.manager.cluster.autoscale.filter.PoolFilterHandler;
import com.epam.pipeline.manager.metadata.MetadataManager;
import com.epam.pipeline.manager.pipeline.PipelineRunManager;
import com.epam.pipeline.utils.CommonUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand Down Expand Up @@ -57,16 +58,19 @@ public class ReassignHandler {
private final CloudFacade cloudFacade;
private final PipelineRunManager pipelineRunManager;
private final Map<PoolInstanceFilterType, PoolFilterHandler> filterHandlers;
private final MetadataManager metadataManager;


public ReassignHandler(final AutoscalerService autoscalerService,
final CloudFacade cloudFacade,
final PipelineRunManager pipelineRunManager,
final List<PoolFilterHandler> filterHandlers) {
final List<PoolFilterHandler> filterHandlers,
final MetadataManager metadataManager) {
this.autoscalerService = autoscalerService;
this.cloudFacade = cloudFacade;
this.pipelineRunManager = pipelineRunManager;
this.filterHandlers = CommonUtils.groupByKey(filterHandlers, PoolFilterHandler::type);
this.metadataManager = metadataManager;
}

public boolean tryReassignNode(final KubernetesClient client,
Expand Down Expand Up @@ -236,6 +240,6 @@ private Map<String, String> findCustomInstanceTags(final Long runId, final Optio
if (!run.isPresent()) {
return new HashMap<>();
}
return autoscalerService.buildCustomInstanceTags(run.get());
return metadataManager.buildCustomInstanceTags(run.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import com.epam.pipeline.entity.metadata.MetadataEntry;
import com.epam.pipeline.entity.metadata.MetadataEntryWithIssuesCount;
import com.epam.pipeline.entity.metadata.PipeConfValue;
import com.epam.pipeline.entity.pipeline.CommonCustomInstanceTagsTypes;
import com.epam.pipeline.entity.pipeline.Folder;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.pipeline.Tool;
import com.epam.pipeline.entity.security.acl.AclClass;
import com.epam.pipeline.manager.EntityManager;
import com.epam.pipeline.manager.metadata.parser.MetadataLineProcessor;
import com.epam.pipeline.manager.pipeline.FolderManager;
import com.epam.pipeline.manager.pipeline.ToolManager;
import com.epam.pipeline.manager.preference.PreferenceManager;
import com.epam.pipeline.manager.preference.SystemPreferences;
import com.epam.pipeline.manager.security.AuthManager;
Expand All @@ -41,7 +44,7 @@
import com.google.common.io.CharStreams;
import com.google.common.io.LineProcessor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand All @@ -57,12 +60,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -99,6 +97,9 @@ public class MetadataManager {
@Autowired
private AuthManager authManager;

@Autowired
private ToolManager toolManager;

@Transactional(propagation = Propagation.REQUIRED)
public MetadataEntry updateMetadataItemKey(MetadataVO metadataVO) {
validateMetadata(metadataVO);
Expand Down Expand Up @@ -343,6 +344,14 @@ public Set<String> getMetadataKeys(final AclClass entityClass) {
return keys;
}

public Map<String, String> buildCustomInstanceTags(final PipelineRun run) {
final Map<String, String> customTags = resolveCommonCustomInstanceTags(run);

final Tool tool = toolManager.loadByNameOrId(run.getDockerImage());
final MetadataEntry toolMetadata = loadMetadataItem(tool.getId(), AclClass.TOOL);
return resolveInstanceTagsFromMetadata(toolMetadata, customTags);
}

Map<String, PipeConfValue> convertFileContentToMetadata(MultipartFile file) {
String delimiter = MetadataParsingUtils.getDelimiterFromFileExtension(file.getOriginalFilename());
try (InputStream content = file.getInputStream()) {
Expand Down Expand Up @@ -402,4 +411,51 @@ private void checkEntityCanBeModified(final Object entity) {
.ifPresent(tool -> Assert.isTrue(tool.isNotSymlink(), messageHelper.getMessage(
MessageConstants.ERROR_TOOL_SYMLINK_MODIFICATION_NOT_SUPPORTED)));
}

private Map<String, String> resolveInstanceTagsFromMetadata(final MetadataEntry metadataEntry,
final Map<String, String> customTags) {
final Map<String, PipeConfValue> metadataData = MapUtils.emptyIfNull(Objects.isNull(metadataEntry)
? null
: metadataEntry.getData());
if (MapUtils.isEmpty(metadataData)) {
return customTags;
}
final Set<String> instanceTagsKeys = preferenceManager.getPreference(
SystemPreferences.CLUSTER_INSTANCE_ALLOWED_CUSTOM_TAGS);
if (CollectionUtils.isEmpty(instanceTagsKeys)) {
return customTags;
}
metadataData.entrySet().stream()
.filter(entry -> instanceTagsKeys.contains(entry.getKey()))
.forEach(entry -> customTags.put(entry.getKey(), entry.getValue().getValue()));
return customTags;
}

private Map<String, String> resolveCommonCustomInstanceTags(final PipelineRun run) {
final Map<String, String> customInstanceTags = new HashMap<>();
final Map<CommonCustomInstanceTagsTypes, String> commonCustomInstanceTags = MapUtils.emptyIfNull(
preferenceManager.getPreference(SystemPreferences.CLUSTER_INSTANCE_TAGS));
commonCustomInstanceTags
.forEach((tagType, tagName) -> fillCommonCustomInstanceTags(tagType, tagName, run, customInstanceTags));
return customInstanceTags;
}

private void fillCommonCustomInstanceTags(final CommonCustomInstanceTagsTypes tagType,
final String tagName, final PipelineRun run,
final Map<String, String> customInstanceTags) {
switch (tagType) {
case tool:
customInstanceTags.put(tagName, run.getDockerImage());
break;
case run_id:
customInstanceTags.put(tagName, run.getId().toString());
break;
case owner:
customInstanceTags.put(tagName, run.getOwner());
break;
default:
throw new IllegalArgumentException(
String.format("Failed to resolve custom instance type '%s'", tagType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.epam.pipeline.manager.cluster.KubernetesManager;
import com.epam.pipeline.manager.cluster.NodesManager;
import com.epam.pipeline.manager.cluster.pool.NodePoolManager;
import com.epam.pipeline.manager.metadata.MetadataManager;
import com.epam.pipeline.manager.parallel.ParallelExecutorService;
import com.epam.pipeline.manager.pipeline.PipelineRunManager;
import com.epam.pipeline.manager.pipeline.RunRegionShiftHandler;
Expand Down Expand Up @@ -105,6 +106,8 @@ public class AutoscaleManagerTest {
private PoolAutoscaler poolAutoscaler;
@Mock
private RunRegionShiftHandler runRegionShiftHandler;
@Mock
private MetadataManager metadataManager;

private AutoscaleManager.AutoscaleManagerCore autoscaleManagerCore;

Expand All @@ -117,7 +120,7 @@ public void setUp() throws Exception {
autoscalerService, nodesManager, kubernetesManager,
preferenceManager, TEST_KUBE_NAMESPACE, cloudFacade,
nodePoolManager, reassignHandler, scaleDownHandler, Collections.emptyList(), poolAutoscaler,
runRegionShiftHandler);
runRegionShiftHandler, metadataManager);
Whitebox.setInternalState(autoscaleManagerCore, "preferenceManager", preferenceManager);

when(executorService.getExecutorService()).thenReturn(new CurrentThreadExecutorService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.epam.pipeline.entity.pipeline.RunInstance;
import com.epam.pipeline.entity.pipeline.run.parameter.PipelineRunParameter;
import com.epam.pipeline.manager.cloud.CloudFacade;
import com.epam.pipeline.manager.metadata.MetadataManager;
import com.epam.pipeline.manager.pipeline.PipelineRunManager;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -42,12 +43,14 @@ public class ReassignHandlerTest {
private final AutoscalerService autoscalerService = mock(AutoscalerService.class);
private final CloudFacade cloudFacade = mock(CloudFacade.class);
private final PipelineRunManager pipelineRunManager = mock(PipelineRunManager.class);
private final MetadataManager metadataManager = mock(MetadataManager.class);

private final ReassignHandler reassignHandler = new ReassignHandler(
autoscalerService,
cloudFacade,
pipelineRunManager,
new ArrayList<>());
new ArrayList<>(),
metadataManager);

@Test
public void shouldNotReassignWithCreateNewNodeParameter() {
Expand Down

0 comments on commit a596c9d

Please sign in to comment.