Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Add call metadata as a job attribute (#1170)
Browse files Browse the repository at this point in the history
* Add call metadata as a job attribute
  • Loading branch information
Amita Ekbote committed Dec 1, 2021
1 parent 84123b7 commit 3e596fd
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public final class JobAttributes {
*/
public static final String JOB_ATTRIBUTES_CREATED_BY = TITUS_ATTRIBUTE_PREFIX + "createdBy";


/**
* Job call reason
*/

public static final String JOB_ATTRIBUTES_CALL_REASON = TITUS_ATTRIBUTE_PREFIX + "callReason";

/**
* Federated stack name. All cells under the same federated stack must share the same value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.netflix.titus.api.jobmanager.model.job.retry.RetryPolicy;
import com.netflix.titus.api.jobmanager.model.job.vpc.SignedIpAddressAllocation;
import com.netflix.titus.api.jobmanager.service.JobManagerException;
import com.netflix.titus.api.model.callmetadata.CallMetadata;
import com.netflix.titus.common.util.CollectionsExt;
import com.netflix.titus.common.util.StringExt;
import com.netflix.titus.common.util.retry.Retryer;
Expand Down Expand Up @@ -511,6 +512,19 @@ public static <E extends JobDescriptorExt> Job<E> changeDisruptionBudget(Job<E>
return input.toBuilder().withJobDescriptor(changeDisruptionBudget(input.getJobDescriptor(), disruptionBudget)).build();
}

public static <E extends JobDescriptorExt> Job<E> appendCallMetadataJobAttributes(Job<E> input, CallMetadata callMetadata) {
// Add call metadata as job attribute
Map<String, String> callMetadataAttribute = new HashMap<>();
String callerId = callMetadata.getCallers().isEmpty()
? "unknown"
: callMetadata.getCallers().get(0).getId();
callMetadataAttribute.put(JobAttributes.JOB_ATTRIBUTES_CREATED_BY, callerId);
callMetadataAttribute.put(JobAttributes.JOB_ATTRIBUTES_CALL_REASON, callMetadata.getCallReason());
JobDescriptor<E> jobDescriptor = input.getJobDescriptor();
Map<String, String> updatedAttributes = CollectionsExt.merge(jobDescriptor.getAttributes(), callMetadataAttribute);
return input.toBuilder().withJobDescriptor(jobDescriptor.toBuilder().withAttributes(updatedAttributes).build()).build();
}

public static <E extends JobDescriptorExt> Job<E> updateJobAttributes(Job<E> input, Map<String, String> attributes) {
JobDescriptor<E> jobDescriptor = input.getJobDescriptor();
Map<String, String> updatedAttributes = CollectionsExt.merge(jobDescriptor.getAttributes(), attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import javax.inject.Named;
import javax.inject.Singleton;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.spectator.api.patterns.PolledMeter;
import com.netflix.titus.api.FeatureActivationConfiguration;
Expand Down Expand Up @@ -233,8 +234,9 @@ public Observable<String> createJob(JobDescriptor<?> jobDescriptor, CallMetadata
? "unknown"
: callMetadata.getCallers().get(0).getId();

JobDescriptor<?> jobDescriptorWithCallerId = JobFunctions.appendJobDescriptorAttribute(jobDescriptor,
JobAttributes.JOB_ATTRIBUTES_CREATED_BY, callerId
JobDescriptor<?> jobDescriptorWithCallerId = JobFunctions.appendJobDescriptorAttributes(jobDescriptor,
ImmutableMap.of(JobAttributes.JOB_ATTRIBUTES_CREATED_BY, callerId,
JobAttributes.JOB_ATTRIBUTES_CALL_REASON, callMetadata.getCallReason())
);

return Observable.fromCallable(() -> jobSubmitLimiter.reserveId(jobDescriptorWithCallerId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package com.netflix.titus.master.jobmanager.service.service.action;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.netflix.titus.api.jobmanager.JobAttributes;
import com.netflix.titus.api.jobmanager.model.job.Capacity;
import com.netflix.titus.api.jobmanager.model.job.CapacityAttributes;
import com.netflix.titus.api.jobmanager.model.job.Job;
import com.netflix.titus.api.jobmanager.model.job.JobDescriptor;
import com.netflix.titus.api.jobmanager.model.job.JobFunctions;
import com.netflix.titus.api.jobmanager.model.job.JobState;
import com.netflix.titus.api.jobmanager.model.job.ServiceJobProcesses;
Expand Down Expand Up @@ -95,9 +99,12 @@ public static TitusChangeAction updateJobCapacityAction(ReconciliationEngine<Job
serviceJob.getJobDescriptor().getExtensions().getServiceJobProcesses()));
}

// append callmetadata job attributes
Job<ServiceJobExt> serviceJobExtCallMetadata = JobFunctions.appendCallMetadataJobAttributes(serviceJob, callMetadata);

// ready to update job capacity
Job<ServiceJobExt> updatedJob = VersionSuppliers.nextVersion(
JobFunctions.changeServiceJobCapacity(serviceJob, newCapacity), versionSupplier
JobFunctions.changeServiceJobCapacity(serviceJobExtCallMetadata, newCapacity), versionSupplier
);
TitusModelAction modelAction = TitusModelAction.newModelUpdate(self).jobUpdate(jobHolder -> jobHolder.setEntity(updatedJob));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void testSubmitSimpleBatchJobWhichEndsOk() {
.inJob(job -> {
assertThat(job.getJobDescriptor().getAttributes().containsKey(JobAttributes.JOB_ATTRIBUTE_ROUTING_CELL));
assertThat(job.getJobDescriptor().getAttributes().containsKey(JobAttributes.JOB_ATTRIBUTES_CREATED_BY));
assertThat(job.getJobDescriptor().getAttributes().containsKey(JobAttributes.JOB_ATTRIBUTES_CALL_REASON));
})
.inStrippedJob(job -> assertThat(job.getJobDescriptor()).isEqualTo(ONE_TASK_BATCH_JOB))
.template(ScenarioTemplates.startTasksInNewJob())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ public JobScenarioBuilder inStrippedJob(Consumer<Job<?>> consumer) {
job.getJobDescriptor().getAttributes().forEach((key, value) -> {
if (!key.startsWith("event.propagation")
&& !key.equals(JobAttributes.JOB_ATTRIBUTES_CREATED_BY)
&& !key.equals(JobAttributes.JOB_ATTRIBUTE_ROUTING_CELL)) {
&& !key.equals(JobAttributes.JOB_ATTRIBUTE_ROUTING_CELL)
&& !key.equals(JobAttributes.JOB_ATTRIBUTES_CALL_REASON)) {
filteredAttributes.put(key, value);
}
});
Expand Down

0 comments on commit 3e596fd

Please sign in to comment.