-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
GcsOptions.java
175 lines (158 loc) · 7.93 KB
/
GcsOptions.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.gcp.options;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** Options used to configure Google Cloud Storage. */
public interface GcsOptions extends ApplicationNameOptions, GcpOptions, PipelineOptions {
/**
* The {@link GcsUtil} instance that should be used to communicate with Google Cloud Storage.
*
* <p>If no instance has been set explicitly, the default value is obtained from {@link
* GcsUtil.GcsUtilFactory}. The option is annotated {@code @JsonIgnore} and {@code @Hidden}.
*/
@JsonIgnore
@Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
@Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
@Hidden
GcsUtil getGcsUtil();
/** Sets the {@link GcsUtil} instance; see {@link #getGcsUtil()}. */
void setGcsUtil(GcsUtil value);
/**
* The {@link ExecutorService} instance to use to create threads. It can be overridden to specify
* an ExecutorService that is compatible with the user's environment.
*
* <p>If unset, the default value is built by {@link ExecutorServiceFactory}, which creates an
* ExecutorService with an unbounded number of threads; this is compatible with Google AppEngine.
* The option is annotated {@code @JsonIgnore} and {@code @Hidden}.
*/
@JsonIgnore
@Description(
"The ExecutorService instance to use to create multiple threads. Can be overridden "
+ "to specify an ExecutorService that is compatible with the user's environment. If unset, "
+ "the default is to create an ExecutorService with an unbounded number of threads; this "
+ "is compatible with Google AppEngine.")
@Default.InstanceFactory(ExecutorServiceFactory.class)
@Hidden
ExecutorService getExecutorService();
/** Sets the {@link ExecutorService} instance; see {@link #getExecutorService()}. */
void setExecutorService(ExecutorService value);
/**
* GCS endpoint to use. If unspecified, uses the default endpoint.
*
* <p>The option is annotated {@code @JsonIgnore} and {@code @Hidden}.
*
* <p>NOTE(review): no {@code @Default} is declared on this option, so the getter presumably
* returns {@code null} when no endpoint has been set -- confirm against callers.
*/
@JsonIgnore
@Hidden
@Description("The URL for the GCS API.")
String getGcsEndpoint();
/** Sets the GCS endpoint; see {@link #getGcsEndpoint()}. */
void setGcsEndpoint(String value);
/**
* The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
* {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
* restrictions and performance implications of this value.
*
* <p>The value is declared {@code @Nullable} and no default is declared on this option.
*/
@Description(
"The buffer size (in bytes) to use when uploading files to GCS. Please see the "
+ "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
+ "information on the restrictions and performance implications of this value.\n\n"
+ "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
+ "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
@Nullable
Integer getGcsUploadBufferSizeBytes();
/** Sets the upload buffer size in bytes; {@code null} is accepted by the declared signature. */
void setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
/**
* The class of the validator that should be created and used to validate paths. If pathValidator
* has not been set explicitly, an instance of this class will be constructed (by {@link
* PathValidatorFactory}) and used as the path validator.
*
* <p>Defaults to {@link GcsPathValidator}.
*/
@Description(
"The class of the validator that should be created and used to validate paths. "
+ "If pathValidator has not been set explicitly, an instance of this class will be "
+ "constructed and used as the path validator.")
@Default.Class(GcsPathValidator.class)
Class<? extends PathValidator> getPathValidatorClass();
/** Sets the validator class; see {@link #getPathValidatorClass()}. */
void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
/**
* The path validator instance that should be used to validate paths. If no path validator has
* been set explicitly, the default is produced by {@link PathValidatorFactory}, which constructs
* a path validator based upon the currently set pathValidatorClass.
*
* <p>The option is annotated {@code @JsonIgnore}.
*/
@JsonIgnore
@Description(
"The path validator instance that should be used to validate paths. "
+ "If no path validator has been set explicitly, the default is to use the instance factory "
+ "that constructs a path validator based upon the currently set pathValidatorClass.")
@Default.InstanceFactory(PathValidatorFactory.class)
PathValidator getPathValidator();
/** Sets the path validator instance; see {@link #getPathValidator()}. */
void setPathValidator(PathValidator validator);
/**
* If true, reports metrics of certain operations, such as batch copies.
*
* <p>This option is marked {@code @Experimental(Kind.FILESYSTEM)} and defaults to {@code false}.
*/
@Description("Experimental. Whether to report performance metrics of certain GCS operations.")
@Default.Boolean(false)
@Experimental(Kind.FILESYSTEM)
Boolean getGcsPerformanceMetrics();
/** Sets whether performance metrics are reported; see {@link #getGcsPerformanceMetrics()}. */
void setGcsPerformanceMetrics(Boolean reportPerformanceMetrics);
/**
 * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The {@link
 * ExecutorService} is compatible with AppEngine.
 *
 * <p>This factory is the {@code @Default.InstanceFactory} of {@link
 * GcsOptions#getExecutorService()}.
 */
class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
  /**
   * Creates a new, unbounded {@link ThreadPoolExecutor} whose worker threads are daemon threads
   * obtained from {@link MoreExecutors#platformThreadFactory()}.
   *
   * @param options the pipeline options the default is being created for; not read by this
   *     factory
   * @return a newly constructed executor; every call builds a separate pool
   */
  @Override
  public ExecutorService create(PipelineOptions options) {
    // Daemon threads created through the platform thread factory.
    ThreadFactoryBuilder daemonThreads =
        new ThreadFactoryBuilder()
            .setThreadFactory(MoreExecutors.platformThreadFactory())
            .setDaemon(true);

    /* The SDK requires an unbounded thread pool because a step may create X writers
     * each requiring their own thread to perform the writes otherwise a writer may
     * block causing deadlock for the step because the writers buffer is full.
     * Also, the MapTaskExecutor launches the steps in reverse order and completes
     * them in forward order thus requiring enough threads so that each step's writers
     * can be active.
     */
    return new ThreadPoolExecutor(
        0, // No core threads.
        Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
        Long.MAX_VALUE,
        TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
        new SynchronousQueue<>(), // Direct hand-off: tasks are never queued behind busy threads.
        daemonThreads.build());
  }
}
/**
 * Default-value factory that produces a {@link PathValidator} from the class configured via
 * {@link #getPathValidatorClass()}.
 */
class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
  /**
   * Builds the validator by calling the {@code fromOptions} factory method of the configured
   * validator class, passing the given options as its {@link PipelineOptions} argument.
   *
   * @param options the pipeline options, viewed as {@link GcsOptions} to read the validator class
   * @return the validator instance built by {@link InstanceBuilder}
   */
  @Override
  public PathValidator create(PipelineOptions options) {
    Class<? extends PathValidator> validatorClass =
        options.as(GcsOptions.class).getPathValidatorClass();
    return InstanceBuilder.ofType(PathValidator.class)
        .fromClass(validatorClass)
        .fromFactoryMethod("fromOptions")
        .withArg(PipelineOptions.class, options)
        .build();
  }
}
}