-
Notifications
You must be signed in to change notification settings - Fork 806
/
MultipartUploadSample.java
271 lines (237 loc) · 10.7 KB
/
MultipartUploadSample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package samples;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListPartsRequest;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PartListing;
import com.aliyun.oss.model.PartSummary;
import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult;
/**
* This sample demonstrates how to upload multiparts to Aliyun OSS
* using the OSS SDK for Java.
*/
public class MultipartUploadSample {
private static String endpoint = "*** Provide OSS endpoint ***";
private static String accessKeyId = "*** Provide your AccessKeyId ***";
private static String accessKeySecret = "*** Provide your AccessKeySecret ***";
private static OSS client = null;
private static String bucketName = "*** Provide bucket name ***";
private static String key = "*** Provide key ***";
private static String localFilePath = "*** Provide local file path ***";
private static ExecutorService executorService = Executors.newFixedThreadPool(5);
private static List<PartETag> partETags = Collections.synchronizedList(new ArrayList<PartETag>());
public static void main(String[] args) throws IOException {
/*
* Constructs a client instance with your account for accessing OSS
*/
ClientBuilderConfiguration conf = new ClientBuilderConfiguration();
conf.setIdleConnectionTime(1000);
client = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret, conf);
try {
/*
* Claim a upload id firstly
*/
String uploadId = claimUploadId();
System.out.println("Claiming a new upload id " + uploadId + "\n");
/*
* Calculate how many parts to be divided
*/
final long partSize = 5 * 1024 * 1024L; // 5MB
final File sampleFile = createSampleFile();
long fileLength = sampleFile.length();
int partCount = (int) (fileLength / partSize);
if (fileLength % partSize != 0) {
partCount++;
}
if (partCount > 10000) {
throw new RuntimeException("Total parts count should not exceed 10000");
} else {
System.out.println("Total parts count " + partCount + "\n");
}
/*
* Upload multiparts to your bucket
*/
System.out.println("Begin to upload multiparts to OSS from a file\n");
for (int i = 0; i < partCount; i++) {
long startPos = i * partSize;
long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
executorService.execute(new PartUploader(sampleFile, startPos, curPartSize, i + 1, uploadId));
}
/*
* Waiting for all parts finished
*/
executorService.shutdown();
while (!executorService.isTerminated()) {
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*
* Verify whether all parts are finished
*/
if (partETags.size() != partCount) {
throw new IllegalStateException("Upload multiparts fail due to some parts are not finished yet");
} else {
System.out.println("Succeed to complete multiparts into an object named " + key + "\n");
}
/*
* View all parts uploaded recently
*/
listAllParts(uploadId);
/*
* Complete to upload multiparts
*/
completeMultipartUpload(uploadId);
/*
* Fetch the object that newly created at the step below.
*/
System.out.println("Fetching an object");
client.getObject(new GetObjectRequest(bucketName, key), new File(localFilePath));
} catch (OSSException oe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Message: " + oe.getErrorMessage());
System.out.println("Error Code: " + oe.getErrorCode());
System.out.println("Request ID: " + oe.getRequestId());
System.out.println("Host ID: " + oe.getHostId());
} catch (ClientException ce) {
System.out.println("Caught an ClientException, which means the client encountered "
+ "a serious internal problem while trying to communicate with OSS, "
+ "such as not being able to access the network.");
System.out.println("Error Message: " + ce.getMessage());
} finally {
/*
* Do not forget to shut down the client finally to release all allocated resources.
*/
if (client != null) {
client.shutdown();
}
}
}
private static class PartUploader implements Runnable {
private File localFile;
private long startPos;
private long partSize;
private int partNumber;
private String uploadId;
public PartUploader(File localFile, long startPos, long partSize, int partNumber, String uploadId) {
this.localFile = localFile;
this.startPos = startPos;
this.partSize = partSize;
this.partNumber = partNumber;
this.uploadId = uploadId;
}
@Override
public void run() {
InputStream instream = null;
try {
instream = new FileInputStream(this.localFile);
instream.skip(this.startPos);
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(key);
uploadPartRequest.setUploadId(this.uploadId);
uploadPartRequest.setInputStream(instream);
uploadPartRequest.setPartSize(this.partSize);
uploadPartRequest.setPartNumber(this.partNumber);
UploadPartResult uploadPartResult = client.uploadPart(uploadPartRequest);
System.out.println("Part#" + this.partNumber + " done\n");
synchronized (partETags) {
partETags.add(uploadPartResult.getPartETag());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (instream != null) {
try {
instream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private static File createSampleFile() throws IOException {
File file = File.createTempFile("oss-java-sdk-", ".txt");
file.deleteOnExit();
Writer writer = new OutputStreamWriter(new FileOutputStream(file));
for (int i = 0; i < 1000000; i++) {
writer.write("abcdefghijklmnopqrstuvwxyz\n");
writer.write("0123456789011234567890\n");
}
writer.close();
return file;
}
private static String claimUploadId() {
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, key);
InitiateMultipartUploadResult result = client.initiateMultipartUpload(request);
return result.getUploadId();
}
private static void completeMultipartUpload(String uploadId) {
// Make part numbers in ascending order
Collections.sort(partETags, new Comparator<PartETag>() {
@Override
public int compare(PartETag p1, PartETag p2) {
return p1.getPartNumber() - p2.getPartNumber();
}
});
System.out.println("Completing to upload multiparts\n");
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags);
client.completeMultipartUpload(completeMultipartUploadRequest);
}
private static void listAllParts(String uploadId) {
System.out.println("Listing all parts......");
ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
PartListing partListing = client.listParts(listPartsRequest);
int partCount = partListing.getParts().size();
for (int i = 0; i < partCount; i++) {
PartSummary partSummary = partListing.getParts().get(i);
System.out.println("\tPart#" + partSummary.getPartNumber() + ", ETag=" + partSummary.getETag());
}
System.out.println();
}
}