-
Notifications
You must be signed in to change notification settings - Fork 13k
/
SlotPoolGateway.java
161 lines (144 loc) · 6.98 KB
/
SlotPoolGateway.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.types.SerializableOptional;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
 * The gateway for calls on the {@link SlotPool}.
 *
 * <p>In addition to the operations declared here, this gateway inherits the operations of
 * {@link AllocatedSlotActions} and {@link RpcGateway}.
 */
public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {

	// ------------------------------------------------------------------------
	//  shutdown
	// ------------------------------------------------------------------------

	/**
	 * Suspends the slot pool.
	 *
	 * <p>This method belongs to the shutdown-related operations of the gateway.
	 * NOTE(review): the concrete effect (e.g. on pending slot requests and registered
	 * slots) is defined by the {@link SlotPool} implementation; document it here once
	 * confirmed against that implementation.
	 */
	void suspend();

	// ------------------------------------------------------------------------
	//  resource manager connection
	// ------------------------------------------------------------------------

	/**
	 * Connects the SlotPool to the given ResourceManager. After this method is called, the
	 * SlotPool will be able to request resources from the given ResourceManager.
	 *
	 * @param resourceManagerGateway The RPC gateway for the resource manager.
	 */
	void connectToResourceManager(ResourceManagerGateway resourceManagerGateway);

	/**
	 * Disconnects the slot pool from its current Resource Manager. After this call, the pool will not
	 * be able to request further slots from the Resource Manager, and all currently pending requests
	 * to the resource manager will be canceled.
	 *
	 * <p>The slot pool will still be able to serve slots from its internal pool.
	 */
	void disconnectResourceManager();

	// ------------------------------------------------------------------------
	//  registering / un-registering TaskManagers and slots
	// ------------------------------------------------------------------------

	/**
	 * Registers a TaskExecutor with the given {@link ResourceID} at {@link SlotPool}.
	 *
	 * @param resourceID identifying the TaskExecutor to register
	 * @return Future acknowledge which is completed after the TaskExecutor has been registered
	 */
	CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);

	/**
	 * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
	 *
	 * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
	 * @param cause the reason for releasing the TaskExecutor
	 * @return Future acknowledge which is completed after the TaskExecutor has been released
	 */
	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceId, Exception cause);

	/**
	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
	 * rejected.
	 *
	 * @param taskManagerLocation from which the slot offer originates
	 * @param taskManagerGateway to talk to the slot offerer
	 * @param slotOffer slot which is offered to the {@link SlotPool}
	 * @return Future which is completed with {@code true} if the slot has been accepted,
	 *         or with {@code false} if it has been rejected
	 */
	CompletableFuture<Boolean> offerSlot(
		TaskManagerLocation taskManagerLocation,
		TaskManagerGateway taskManagerGateway,
		SlotOffer slotOffer);

	/**
	 * Offers multiple slots to the {@link SlotPool}. The slot offerings can be
	 * individually accepted or rejected by returning the collection of accepted
	 * slot offers.
	 *
	 * @param taskManagerLocation from which the slot offers originate
	 * @param taskManagerGateway to talk to the slot offerer
	 * @param offers slot offers which are offered to the {@link SlotPool}
	 * @return Future which is completed with the collection of accepted slot offers. The
	 *         remaining slot offers are implicitly rejected.
	 */
	CompletableFuture<Collection<SlotOffer>> offerSlots(
		TaskManagerLocation taskManagerLocation,
		TaskManagerGateway taskManagerGateway,
		Collection<SlotOffer> offers);

	/**
	 * Fails the slot with the given allocation id.
	 *
	 * @param allocationID identifying the slot which is being failed
	 * @param cause the reason for failing the slot
	 * @return Future which is completed with the id of the slot's task executor if that task
	 *         executor has no more slots registered, otherwise with an empty optional
	 */
	CompletableFuture<SerializableOptional<ResourceID>> failAllocation(AllocationID allocationID, Exception cause);

	// ------------------------------------------------------------------------
	//  allocating and disposing slots
	// ------------------------------------------------------------------------

	/**
	 * Requests to allocate a slot for the given {@link ScheduledUnit}. The request
	 * is uniquely identified by the provided {@link SlotRequestId}, which can also
	 * be used to release the slot via {@link #releaseSlot(SlotRequestId, SlotSharingGroupId, Throwable)}.
	 * The allocated slot will fulfill the requested {@link ResourceProfile}, and the
	 * pool tries to place it on one of the preferred locations.
	 *
	 * <p>If the returned future does not have to be completed right away (i.e. the slot
	 * request may be queued), {@code allowQueuedScheduling} must be set to {@code true}.
	 *
	 * @param slotRequestId identifying the requested slot
	 * @param scheduledUnit for which to allocate the slot
	 * @param slotProfile profile that specifies the requirements for the requested slot
	 * @param allowQueuedScheduling {@code true} if the slot request may be queued, i.e. the
	 *        returned future need not be completed immediately
	 * @param timeout for the operation
	 * @return Future which is completed with the allocated {@link LogicalSlot}
	 */
	CompletableFuture<LogicalSlot> allocateSlot(
		SlotRequestId slotRequestId,
		ScheduledUnit scheduledUnit,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		@RpcTimeout Time timeout);
}