/
SharedResourceHolder.java
189 lines (172 loc) · 6.38 KB
/
SharedResourceHolder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.base.Preconditions;
import java.util.IdentityHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
/**
* A holder for shared resource singletons.
*
* <p>Components like client channels and servers need certain resources, e.g. a thread pool, to
* run. If the user has not provided such resources, these components will use a default one, which
* is shared as a static resource. This class holds these default resources and manages their
* life-cycles.
*
* <p>A resource is identified by the reference of a {@link Resource} object, which is typically a
* singleton, provided to the get() and release() methods. Each Resource object (not its class) maps
* to an object cached in the holder.
*
* <p>Resources are ref-counted and shut down after a delay when the ref-count reaches zero.
*/
@ThreadSafe
public final class SharedResourceHolder {
static final long DESTROY_DELAY_SECONDS = 1;
// The sole holder instance.
private static final SharedResourceHolder holder = new SharedResourceHolder(
new ScheduledExecutorFactory() {
@Override
public ScheduledExecutorService createScheduledExecutor() {
return Executors.newSingleThreadScheduledExecutor(
GrpcUtil.getThreadFactory("grpc-shared-destroyer-%d", true));
}
});
private final IdentityHashMap<Resource<?>, Instance> instances =
new IdentityHashMap<>();
private final ScheduledExecutorFactory destroyerFactory;
private ScheduledExecutorService destroyer;
// Visible to tests that would need to create instances of the holder.
SharedResourceHolder(ScheduledExecutorFactory destroyerFactory) {
this.destroyerFactory = destroyerFactory;
}
/**
* Try to get an existing instance of the given resource. If an instance does not exist, create a
* new one with the given factory.
*
* @param resource the singleton object that identifies the requested static resource
*/
public static <T> T get(Resource<T> resource) {
return holder.getInternal(resource);
}
/**
* Releases an instance of the given resource.
*
* <p>The instance must have been obtained from {@link #get(Resource)}. Otherwise will throw
* IllegalArgumentException.
*
* <p>Caller must not release a reference more than once. It's advisory that you clear the
* reference to the instance with the null returned by this method.
*
* @param resource the singleton Resource object that identifies the released static resource
* @param instance the released static resource
*
* @return a null which the caller can use to clear the reference to that instance.
*/
public static <T> T release(final Resource<T> resource, final T instance) {
return holder.releaseInternal(resource, instance);
}
/**
* Visible to unit tests.
*
* @see #get(Resource)
*/
@SuppressWarnings("unchecked")
synchronized <T> T getInternal(Resource<T> resource) {
Instance instance = instances.get(resource);
if (instance == null) {
instance = new Instance(resource.create());
instances.put(resource, instance);
}
if (instance.destroyTask != null) {
instance.destroyTask.cancel(false);
instance.destroyTask = null;
}
instance.refcount++;
return (T) instance.payload;
}
/**
* Visible to unit tests.
*/
synchronized <T> T releaseInternal(final Resource<T> resource, final T instance) {
final Instance cached = instances.get(resource);
if (cached == null) {
throw new IllegalArgumentException("No cached instance found for " + resource);
}
Preconditions.checkArgument(instance == cached.payload, "Releasing the wrong instance");
Preconditions.checkState(cached.refcount > 0, "Refcount has already reached zero");
cached.refcount--;
if (cached.refcount == 0) {
if (GrpcUtil.IS_RESTRICTED_APPENGINE) {
// AppEngine must immediately release shared resources, particularly executors
// which could retain request-scoped threads which become zombies after the request
// completes.
resource.close(instance);
instances.remove(resource);
} else {
Preconditions.checkState(cached.destroyTask == null, "Destroy task already scheduled");
// Schedule a delayed task to destroy the resource.
if (destroyer == null) {
destroyer = destroyerFactory.createScheduledExecutor();
}
cached.destroyTask = destroyer.schedule(new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
synchronized (SharedResourceHolder.this) {
// Refcount may have gone up since the task was scheduled. Re-check it.
if (cached.refcount == 0) {
resource.close(instance);
instances.remove(resource);
if (instances.isEmpty()) {
destroyer.shutdown();
destroyer = null;
}
}
}
}
}), DESTROY_DELAY_SECONDS, TimeUnit.SECONDS);
}
}
// Always returning null
return null;
}
/**
* Defines a resource, and the way to create and destroy instances of it.
*/
public interface Resource<T> {
/**
* Create a new instance of the resource.
*/
T create();
/**
* Destroy the given instance.
*/
void close(T instance);
}
interface ScheduledExecutorFactory {
ScheduledExecutorService createScheduledExecutor();
}
private static class Instance {
final Object payload;
int refcount;
ScheduledFuture<?> destroyTask;
Instance(Object payload) {
this.payload = payload;
}
}
}