Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Introduce structs to enable specifying custom SLA.
Add `SlaPolicy` and `HostMaintenanceRequest` structs
to the thrift definition and introduce a new `HostMaintenanceStore`
for tracking maintenance requests. These changes will be used in
https://reviews.apache.org/r/66716 for implementing custom SLA
and scheduler driven maintenance.

This RB splits the storage-related changes out of https://reviews.apache.org/r/66716
for a better rollback story.

Tested rollback on the Vagrant environment.

Testing Done:
./build-support/jenkins/build.sh

Bugs closed: AURORA-1977

Reviewed at https://reviews.apache.org/r/67141/
  • Loading branch information
shanmugh committed May 21, 2018
1 parent 83025f4 commit 34be631589ebf899e663b698dc76511eb1b9ad8a
Showing 33 changed files with 828 additions and 10 deletions.
@@ -245,6 +245,37 @@ struct PartitionPolicy {
2: optional i64 delaySecs
}

/**
 * SLA requirements expressed as the percentage of instances to be RUNNING every durationSecs.
 * NOTE(review): the scale of `percentage` (0-100 vs. 0-1) is not stated here - confirm with the
 * code that consumes this struct.
 */
struct PercentageSlaPolicy {
/** The percentage of active instances required every `durationSecs`. */
1: double percentage
/** Minimum time duration a task needs to be `RUNNING` to be treated as active. */
2: i64 durationSecs
}

/** SLA requirements expressed as the number of instances to be RUNNING every durationSecs. */
struct CountSlaPolicy {
/** The number of active instances required every `durationSecs`. */
1: i64 count
/** Minimum time duration a task needs to be `RUNNING` to be treated as active. */
2: i64 durationSecs
}

/** SLA requirements to be delegated to an external coordinator. */
struct CoordinatorSlaPolicy {
/** URL for the coordinator service that needs to be contacted for SLA checks. */
1: string coordinatorUrl
/** Field in the coordinator's JSON response indicating whether the action is allowed. */
2: string statusKey
}

/**
 * SLA requirements expressed as one of the supported policy types: percentage-based,
 * count-based, or delegated to an external coordinator.
 */
union SlaPolicy {
1: PercentageSlaPolicy percentageSlaPolicy
2: CountSlaPolicy countSlaPolicy
3: CoordinatorSlaPolicy coordinatorSlaPolicy
}

/** Description of the tasks contained within a job. */
struct TaskConfig {
/** Job task belongs to. */
@@ -279,6 +310,8 @@ struct TaskConfig {
27: optional set<Metadata> metadata
/** Policy for how to deal with task partitions */
34: optional PartitionPolicy partitionPolicy
/** SLA requirements to be met during maintenance */
35: optional SlaPolicy slaPolicy

// This field is deliberately placed at the end to work around a bug in the immutable wrapper
// code generator. See AURORA-1185 for details.
@@ -867,6 +900,13 @@ struct JobUpdateQuery {
7: i32 limit
}

/** A maintenance request for a single host, tracked by the HostMaintenanceStore. */
struct HostMaintenanceRequest {
/** Name of the host the request applies to. */
1: string host
/**
 * SLA policy carried with the request.
 * NOTE(review): presumably used for tasks that do not declare their own `slaPolicy` - confirm.
 */
2: SlaPolicy defaultSlaPolicy
/** Timeout for the request, in seconds. NOTE(review): what happens on expiry is not defined here. */
3: i64 timeoutSecs
/** Creation time of the request, in milliseconds. NOTE(review): presumably epoch-based - confirm. */
4: i64 createdTimestampMs
}

struct ListBackupsResult {
1: set<string> backups
}
@@ -92,6 +92,14 @@ struct PruneJobUpdateHistory {
2: i64 historyPruneThresholdMs
}

/** Storage operation payload carrying a host maintenance request to be saved. */
struct SaveHostMaintenanceRequest {
1: api.HostMaintenanceRequest hostMaintenanceRequest
}

/** Storage operation payload naming the host whose maintenance request is to be removed. */
struct RemoveHostMaintenanceRequest {
1: string host
}

union Op {
1: SaveFrameworkId saveFrameworkId
2: SaveCronJob saveCronJob
@@ -109,6 +117,8 @@ union Op {
16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
17: PruneJobUpdateHistory pruneJobUpdateHistory
18: RemoveJobUpdates removeJobUpdate
19: SaveHostMaintenanceRequest saveHostMaintenanceRequest
20: RemoveHostMaintenanceRequest removeHostMaintenanceRequest
}

// The current schema version ID. This should be incremented each time the
@@ -152,6 +162,7 @@ struct Snapshot {
10: set<StoredJobUpdateDetails> jobUpdateDetails
//11: removed
//12: removed
13: set<api.HostMaintenanceRequest> hostMaintenanceRequests
}

// A message header that calls out the number of expected FrameChunks to follow to form a complete
@@ -0,0 +1,61 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage;

import java.util.Optional;
import java.util.Set;

import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;

/**
* Stores maintenance requests for hosts.
*/
public interface HostMaintenanceStore {

/**
* Returns the maintenance request for the given host if one exists,
* {@link Optional#empty()} otherwise.
* @param host Name of the host
* @return {@link Optional} of {@link IHostMaintenanceRequest}
*/
Optional<IHostMaintenanceRequest> getHostMaintenanceRequest(String host);

/**
* Returns all {@link IHostMaintenanceRequest}s currently in storage.
* @return {@link Set} of {@link IHostMaintenanceRequest}s
*/
Set<IHostMaintenanceRequest> getHostMaintenanceRequests();

/**
* Provides write operations to the {@link HostMaintenanceStore}.
*/
interface Mutable extends HostMaintenanceStore {
/**
* Deletes all host maintenance requests in the store.
*/
void deleteHostMaintenanceRequests();

/**
* Saves the maintenance request for the given host.
* @param hostMaintenanceRequest {@link IHostMaintenanceRequest} to save
*/
void saveHostMaintenanceRequest(IHostMaintenanceRequest hostMaintenanceRequest);

/**
* Removes the maintenance request for the given host if one exists.
* @param host Name of the host
*/
void removeHostMaintenanceRequest(String host);
}
}
@@ -42,6 +42,7 @@ interface StoreProvider {
QuotaStore getQuotaStore();
AttributeStore getAttributeStore();
JobUpdateStore getJobUpdateStore();
HostMaintenanceStore getHostMaintenanceStore();
}

/**
@@ -70,6 +71,7 @@ interface MutableStoreProvider extends StoreProvider {
QuotaStore.Mutable getQuotaStore();
AttributeStore.Mutable getAttributeStore();
JobUpdateStore.Mutable getJobUpdateStore();
HostMaintenanceStore.Mutable getHostMaintenanceStore();
}

/**
@@ -25,6 +25,7 @@
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.HostMaintenanceStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
@@ -106,6 +107,7 @@ interface TransactionManager {
@Volatile QuotaStore.Mutable quotaStore,
@Volatile AttributeStore.Mutable attributeStore,
@Volatile JobUpdateStore.Mutable jobUpdateStore,
@Volatile HostMaintenanceStore.Mutable hostMaintenanceStore,
EventSink eventSink,
ReentrantLock writeLock,
ThriftBackfill thriftBackfill) {
@@ -137,6 +139,7 @@ public void log(Op op) {
quotaStore,
attributeStore,
jobUpdateStore,
hostMaintenanceStore,
LoggerFactory.getLogger(WriteRecorder.class),
eventSink);
}
@@ -23,6 +23,7 @@
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
@@ -61,6 +62,7 @@ private static void load(MutableStoreProvider stores, ThriftBackfill backfill, E
stores.getQuotaStore().deleteQuotas();
stores.getAttributeStore().deleteHostAttributes();
stores.getJobUpdateStore().deleteAllUpdates();
stores.getHostMaintenanceStore().deleteHostMaintenanceRequests();
return;
}

@@ -143,6 +145,17 @@ private static void load(MutableStoreProvider stores, ThriftBackfill backfill, E
IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()));
break;

case SAVE_HOST_MAINTENANCE_REQUEST:
stores.getHostMaintenanceStore().saveHostMaintenanceRequest(
IHostMaintenanceRequest
.build(op.getSaveHostMaintenanceRequest().getHostMaintenanceRequest()));
break;

case REMOVE_HOST_MAINTENANCE_REQUEST:
stores.getHostMaintenanceStore().removeHostMaintenanceRequest(
op.getRemoveHostMaintenanceRequest().getHost());
break;

default:
throw new IllegalArgumentException("Unrecognized op type " + op.getSetField());
}
@@ -24,12 +24,14 @@
import com.google.common.collect.ImmutableSet;

import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.RemoveHostMaintenanceRequest;
import org.apache.aurora.gen.storage.RemoveJob;
import org.apache.aurora.gen.storage.RemoveQuota;
import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.SaveCronJob;
import org.apache.aurora.gen.storage.SaveFrameworkId;
import org.apache.aurora.gen.storage.SaveHostAttributes;
import org.apache.aurora.gen.storage.SaveHostMaintenanceRequest;
import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
import org.apache.aurora.gen.storage.SaveJobUpdate;
import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
@@ -40,13 +42,15 @@
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.HostMaintenanceStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -73,7 +77,8 @@ public class WriteRecorder implements
TaskStore.Mutable,
QuotaStore.Mutable,
AttributeStore.Mutable,
JobUpdateStore.Mutable {
JobUpdateStore.Mutable,
HostMaintenanceStore.Mutable {

private final TransactionManager transactionManager;
private final SchedulerStore.Mutable schedulerStore;
@@ -82,6 +87,7 @@ public class WriteRecorder implements
private final QuotaStore.Mutable quotaStore;
private final AttributeStore.Mutable attributeStore;
private final JobUpdateStore.Mutable jobUpdateStore;
private final HostMaintenanceStore.Mutable hostMaintenanceStore;
private final Logger log;
private final EventSink eventSink;

@@ -104,6 +110,7 @@ public WriteRecorder(
QuotaStore.Mutable quotaStore,
AttributeStore.Mutable attributeStore,
JobUpdateStore.Mutable jobUpdateStore,
HostMaintenanceStore.Mutable hostMaintenanceStore,
Logger log,
EventSink eventSink) {

@@ -114,6 +121,7 @@ public WriteRecorder(
this.quotaStore = requireNonNull(quotaStore);
this.attributeStore = requireNonNull(attributeStore);
this.jobUpdateStore = requireNonNull(jobUpdateStore);
this.hostMaintenanceStore = requireNonNull(hostMaintenanceStore);
this.log = requireNonNull(log);
this.eventSink = requireNonNull(eventSink);
}
@@ -242,6 +250,24 @@ public void removeJobUpdates(Set<IJobUpdateKey> keys) {
jobUpdateStore.removeJobUpdates(keys);
}

/**
 * Passes a {@code SaveHostMaintenanceRequest} op to {@code write} and then forwards the
 * request to the wrapped {@link HostMaintenanceStore.Mutable}.
 *
 * @param hostMaintenanceRequest Request to save; must not be null.
 */
@Override
public void saveHostMaintenanceRequest(IHostMaintenanceRequest hostMaintenanceRequest) {
  requireNonNull(hostMaintenanceRequest);

  SaveHostMaintenanceRequest savePayload =
      new SaveHostMaintenanceRequest(hostMaintenanceRequest.newBuilder());
  write(Op.saveHostMaintenanceRequest(savePayload));
  hostMaintenanceStore.saveHostMaintenanceRequest(hostMaintenanceRequest);
}

/**
 * Passes a {@code RemoveHostMaintenanceRequest} op to {@code write} and then forwards the
 * removal to the wrapped {@link HostMaintenanceStore.Mutable}.
 *
 * @param host Name of the host; must not be null.
 */
@Override
public void removeHostMaintenanceRequest(String host) {
  requireNonNull(host);

  RemoveHostMaintenanceRequest removePayload = new RemoveHostMaintenanceRequest(host);
  write(Op.removeHostMaintenanceRequest(removePayload));
  hostMaintenanceStore.removeHostMaintenanceRequest(host);
}

@Override
public void deleteAllTasks() {
throw new UnsupportedOperationException(
@@ -254,6 +280,12 @@ public void deleteHostAttributes() {
"Unsupported since casual storage users should never be doing this.");
}

/**
 * Always rejects the call: bulk deletion is not offered through this class.
 *
 * @throws UnsupportedOperationException on every invocation.
 */
@Override
public void deleteHostMaintenanceRequests() {
  String reason = "Unsupported since casual storage users should never be doing this.";
  throw new UnsupportedOperationException(reason);
}

@Override
public void deleteJobs() {
throw new UnsupportedOperationException(
@@ -307,6 +339,11 @@ public JobUpdateStore.Mutable getJobUpdateStore() {
return this;
}

// This class implements HostMaintenanceStore.Mutable itself, so it hands out itself as the store.
@Override
public HostMaintenanceStore.Mutable getHostMaintenanceStore() {
return this;
}

@Override
public Optional<String> fetchFrameworkId() {
return this.schedulerStore.fetchFrameworkId();
@@ -366,4 +403,14 @@ public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
return this.jobUpdateStore.fetchJobUpdate(key);
}

/**
 * Read-only lookup; delegates directly to the wrapped store.
 *
 * @param host Name of the host.
 * @return Whatever the wrapped store returns for {@code host}.
 */
@Override
public Optional<IHostMaintenanceRequest> getHostMaintenanceRequest(String host) {
  return hostMaintenanceStore.getHostMaintenanceRequest(host);
}

/**
 * Read-only listing; delegates directly to the wrapped store.
 *
 * @return Whatever set of requests the wrapped store returns.
 */
@Override
public Set<IHostMaintenanceRequest> getHostMaintenanceRequests() {
  return hostMaintenanceStore.getHostMaintenanceRequests();
}
}
@@ -77,7 +77,8 @@ public void snapshot() throws StorageException {
+ ", cron jobs: " + snapshot.getCronJobsSize()
+ ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+ ", tasks: " + snapshot.getTasksSize()
+ ", updates: " + snapshot.getJobUpdateDetailsSize());
+ ", updates: " + snapshot.getJobUpdateDetailsSize()
+ ", host maintenance requests: " + snapshot.getHostMaintenanceRequestsSize());
});
} catch (CodingException e) {
throw new StorageException("Failed to encode a snapshot", e);

0 comments on commit 34be631

Please sign in to comment.