-
Notifications
You must be signed in to change notification settings - Fork 548
/
TransferService.java
446 lines (405 loc) · 15.9 KB
/
TransferService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
/**
* Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.mobileconnectors.s3.transferutility;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.pm.ApplicationInfo;
import android.database.Cursor;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.IBinder;
import android.os.Looper;
import android.os.Message;
import android.util.Log;
import com.amazonaws.services.s3.AmazonS3;
import java.io.FileDescriptor;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Performs background upload and download tasks. Uses a thread pool to manage
* the upload and download threads and limits the concurrent running threads.
* When there are no active tasks, TransferService will stop itself.
*/
public class TransferService extends Service {
/* Tag used for every Log call made by this service. */
private static final String TAG = "TransferService";
/*
 * "what" codes of the messages handled by UpdateHandler:
 * MSG_EXEC carries an Intent in msg.obj and is routed to execCommand,
 * MSG_CHECK triggers checkTransfers, and MSG_DISCONNECT triggers
 * pauseAllForNetwork.
 */
static final int MSG_EXEC = 100;
static final int MSG_CHECK = 200;
static final int MSG_DISCONNECT = 300;
/* One minute; used both as the re-check delay and as the idle threshold. */
private static final int MINUTE_IN_MILLIS = 60 * 1000;
/*
 * Intent actions understood by execCommand, followed by the names of the
 * intent extras: the transfer id (an int, where 0 is treated as invalid)
 * and the key used to look up the S3 client in S3ClientReference.
 */
static final String INTENT_ACTION_TRANSFER_ADD = "add_transfer";
static final String INTENT_ACTION_TRANSFER_PAUSE = "pause_transfer";
static final String INTENT_ACTION_TRANSFER_RESUME = "resume_transfer";
static final String INTENT_ACTION_TRANSFER_CANCEL = "cancel_transfer";
static final String INTENT_BUNDLE_TRANSFER_ID = "id";
static final String INTENT_BUNDLE_S3_REFERENCE_KEY = "s3_reference_key";
/* S3 client resolved in onStartCommand; null until a start intent supplies one. */
private AmazonS3 s3;
/*
 * updateHandler queues update requests and processes them on
 * handlerThread. It refreshes transfers from the database and starts or
 * stops them as needed.
 */
private HandlerThread handlerThread;
private Handler updateHandler;
/*
 * BroadcastReceiver for network status change events. It forwards each
 * change to updateHandler as MSG_CHECK or MSG_DISCONNECT.
 */
private NetworkInfoReceiver networkInfoReceiver;
/*
 * Flag indicating whether a database scan is necessary. It is true when
 * the service starts and is set again in pauseAllForNetwork; it is cleared
 * once loadTransfersFromDB has run.
 */
private boolean shouldScan = true;
/*
 * Flag indicating that onStartCommand has not yet registered the network
 * receiver; cleared right after the first registration.
 */
private boolean isFirst = true;
/*
 * Timestamp (milliseconds) at which the service was last known to be
 * active. The service stops itself after a minute of inactivity.
 */
private volatile long lastActiveTime;
/* Start id of the most recent onStartCommand call; passed to stopSelf. */
private volatile int startId;
/* Database helper and the in-memory tracker of transfers, both created in onCreate. */
private TransferDBUtil dbUtil;
TransferStatusUpdater updater;
/**
 * Binding is not supported; this service is driven by start intents only.
 *
 * @param intent the binding intent (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public IBinder onBind(Intent intent) {
    final String reason = "Can't bind to TransferService";
    throw new UnsupportedOperationException(reason);
}
/**
 * Creates the database helper, the status updater and the background
 * handler thread, then installs the update handler on that thread's looper.
 * <ul>
 * <li>The service starts upon intents from transfer utility.</li>
 * <li>It remains alive when there are active transfers.</li>
 * <li>It also stays alive when network is disconnected and there are
 * transfers waiting.</li>
 * </ul>
 */
@Override
public void onCreate() {
    super.onCreate();
    Log.d(TAG, "Starting Transfer Service");

    final Context appContext = getApplicationContext();
    dbUtil = new TransferDBUtil(appContext);
    updater = new TransferStatusUpdater(dbUtil);

    final String threadName = TAG + "-AWSTransferUpdateHandlerThread";
    handlerThread = new HandlerThread(threadName);
    handlerThread.start();

    final Looper workerLooper = handlerThread.getLooper();
    setHandlerLooper(workerLooper);
}
/**
 * Broadcast receiver that translates connectivity change events into
 * messages for the update handler.
 */
static class NetworkInfoReceiver extends BroadcastReceiver {
    private final Handler handler;
    private final ConnectivityManager connManager;

    /**
     * Constructs a NetworkInfoReceiver.
     *
     * @param context context used to obtain the connectivity system service
     * @param handler a handler to send messages to
     */
    public NetworkInfoReceiver(Context context, Handler handler) {
        this.handler = handler;
        final Object service = context.getSystemService(Context.CONNECTIVITY_SERVICE);
        connManager = (ConnectivityManager) service;
    }

    /**
     * Sends MSG_CHECK when the network is connected and MSG_DISCONNECT
     * otherwise. Broadcasts with any other action are ignored.
     */
    @Override
    public void onReceive(Context context, Intent intent) {
        if (!ConnectivityManager.CONNECTIVITY_ACTION.equals(intent.getAction())) {
            return;
        }
        final boolean online = isNetworkConnected();
        Log.d(TAG, "Network connected: " + online);
        if (online) {
            handler.sendEmptyMessage(MSG_CHECK);
        } else {
            handler.sendEmptyMessage(MSG_DISCONNECT);
        }
    }

    /**
     * Gets the status of network connectivity.
     *
     * @return true if there is an active network and it is connected, false
     *         otherwise.
     */
    boolean isNetworkConnected() {
        final NetworkInfo active = connManager.getActiveNetworkInfo();
        if (active == null) {
            return false;
        }
        return active.isConnected();
    }
}
/**
 * Records the start id, resolves the S3 client named by the intent and
 * queues the intent for execution on the update handler. The network
 * receiver is registered on the first successful start.
 *
 * @param intent the start intent; may be null
 * @param flags start flags (unused)
 * @param startId id of this start request
 * @return START_REDELIVER_INTENT when the intent is null, START_NOT_STICKY
 *         otherwise
 */
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    this.startId = startId;
    if (intent == null) {
        return START_REDELIVER_INTENT;
    }

    s3 = S3ClientReference.get(intent.getStringExtra(INTENT_BUNDLE_S3_REFERENCE_KEY));
    if (s3 == null) {
        // nothing can be transferred without a client
        Log.w(TAG, "TransferService can't get s3 client, and it will stop.");
        stopSelf(startId);
        return START_NOT_STICKY;
    }

    final Message command = updateHandler.obtainMessage(MSG_EXEC, intent);
    updateHandler.sendMessage(command);

    if (isFirst) {
        final IntentFilter connectivityFilter =
                new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(networkInfoReceiver, connectivityFilter);
        isFirst = false;
    }

    // The service will not restart if it's killed by system.
    return START_NOT_STICKY;
}
/**
 * Unregisters the network receiver, stops the handler thread, closes the
 * transfer thread pool and clears the S3 client references.
 */
@Override
public void onDestroy() {
    try {
        unregisterReceiver(networkInfoReceiver);
    } catch (final IllegalArgumentException ignored) {
        // Deliberately ignored: the service may stop before onStartCommand
        // has had a chance to register the receiver.
    }

    handlerThread.quit();
    TransferThreadPool.closeThreadPool();
    S3ClientReference.clear();

    super.onDestroy();
}
/**
 * Handler that dispatches MSG_CHECK, MSG_EXEC and MSG_DISCONNECT to the
 * matching service method on the looper it was created with.
 */
class UpdateHandler extends Handler {
    public UpdateHandler(Looper looper) {
        super(looper);
    }

    @Override
    public void handleMessage(Message msg) {
        switch (msg.what) {
            case MSG_CHECK:
                // drop any other pending checks; this one covers them
                updateHandler.removeMessages(MSG_CHECK);
                checkTransfers();
                break;
            case MSG_EXEC:
                execCommand((Intent) msg.obj);
                break;
            case MSG_DISCONNECT:
                pauseAllForNetwork();
                break;
            default:
                Log.e(TAG, "Unknown command: " + msg.what);
                break;
        }
    }
}
/**
 * Performs a pending database scan if one is possible, drops completed
 * transfers, and then either schedules the next check or stops the service
 * when it is no longer active.
 */
void checkTransfers() {
    // scan the database for previously unfinished transfers; this needs a
    // pending scan request, a connected network and an S3 client
    final boolean canScan = shouldScan
            && networkInfoReceiver.isNetworkConnected()
            && s3 != null;
    if (canScan) {
        loadTransfersFromDB();
        shouldScan = false;
    }

    removeCompletedTransfers();

    if (!isActive()) {
        // Stop the service when it's been idle for more than a minute.
        Log.d(TAG, "Stop self");
        stopSelf(startId);
        return;
    }

    // still active: refresh the timestamp and look again in a minute
    lastActiveTime = System.currentTimeMillis();
    updateHandler.sendEmptyMessageDelayed(MSG_CHECK, MINUTE_IN_MILLIS);
}
/**
 * Executes a command received by the service.
 * <p>
 * The intent's action selects the operation (add, pause, resume or cancel)
 * and the {@code INTENT_BUNDLE_TRANSFER_ID} extra selects the transfer. A
 * missing or zero id is logged and ignored, as is an unknown action. For
 * every operation, a transfer that is neither tracked by the status updater
 * nor present in the database is skipped.
 *
 * @param intent received intent
 */
void execCommand(Intent intent) {
    // any incoming command counts as activity for the idle check
    lastActiveTime = System.currentTimeMillis();

    final String action = intent.getAction();
    final int id = intent.getIntExtra(INTENT_BUNDLE_TRANSFER_ID, 0);
    if (id == 0) {
        Log.e(TAG, "Invalid id: " + id);
        return;
    }

    if (INTENT_ACTION_TRANSFER_ADD.equals(action)) {
        if (updater.getTransfer(id) != null) {
            Log.w(TAG, "Transfer has already been added: " + id);
        } else {
            /*
             * The record is started right away; networkInfoReceiver is
             * handed to start(). NOTE(review): presumably start() uses it
             * to defer work while offline, leaving the later database scan
             * to pick the transfer up - confirm in TransferRecord.
             */
            final TransferRecord transfer = dbUtil.getTransferById(id);
            if (transfer != null) {
                updater.addTransfer(transfer);
                transfer.start(s3, dbUtil, updater, networkInfoReceiver);
            } else {
                Log.e(TAG, "Can't find transfer: " + id);
            }
        }
    } else if (INTENT_ACTION_TRANSFER_PAUSE.equals(action)) {
        TransferRecord transfer = updater.getTransfer(id);
        if (transfer == null) {
            transfer = dbUtil.getTransferById(id);
        }
        if (transfer != null) {
            transfer.pause(s3, updater);
        }
    } else if (INTENT_ACTION_TRANSFER_RESUME.equals(action)) {
        TransferRecord transfer = updater.getTransfer(id);
        if (transfer == null) {
            transfer = dbUtil.getTransferById(id);
            if (transfer != null) {
                updater.addTransfer(transfer);
            } else {
                Log.e(TAG, "Can't find transfer: " + id);
            }
        }
        // Guard the start call: the transfer may be unknown to both the
        // updater and the database, in which case there is nothing to resume.
        if (transfer != null) {
            transfer.start(s3, dbUtil, updater, networkInfoReceiver);
        }
    } else if (INTENT_ACTION_TRANSFER_CANCEL.equals(action)) {
        TransferRecord transfer = updater.getTransfer(id);
        if (transfer == null) {
            transfer = dbUtil.getTransferById(id);
        }
        if (transfer != null) {
            transfer.cancel(s3, updater);
        }
    } else {
        Log.e(TAG, "Unknown action: " + action);
    }
}
/**
 * Tells whether the service still has a reason to stay alive: a database
 * scan is pending, some tracked transfer is running, or the last recorded
 * activity was less than a minute ago. An inactive service can stop itself
 * safely.
 *
 * @return true if service active, false otherwise
 */
private boolean isActive() {
    if (!shouldScan) {
        for (final TransferRecord record : updater.getTransfers().values()) {
            if (record.isRunning()) {
                return true;
            }
        }
        final long idleMillis = System.currentTimeMillis() - lastActiveTime;
        return idleMillis < MINUTE_IN_MILLIS;
    }
    return true;
}
/**
 * Removes every transfer in the COMPLETED state from the status updater.
 */
private void removeCompletedTransfers() {
    /*
     * Collect the ids first and remove afterwards. Calling
     * updater.removeTransfer(transfer.id) while iterating will result in
     * ConcurrentModificationException.
     */
    final List<Integer> completedIds = new ArrayList<Integer>();
    for (final TransferRecord record : updater.getTransfers().values()) {
        if (!TransferState.COMPLETED.equals(record.state)) {
            continue;
        }
        completedIds.add(record.id);
    }

    for (final Integer completedId : completedIds) {
        updater.removeTransfer(completedId);
    }
}
/**
 * Loads transfers from database. These transfers are unfinished from a
 * previous session or are new transfers waiting for network. A transfer
 * that is not yet tracked by the status updater is started and, if the
 * start succeeds, added to it; a tracked transfer that is not running is
 * started again.
 */
void loadTransfersFromDB() {
    Log.d(TAG, "Loading transfers from database");
    final Cursor cursor = dbUtil.queryAllTransfersWithType(TransferType.ANY);
    int loaded = 0;
    try {
        while (cursor.moveToNext()) {
            final int id = cursor.getInt(
                    cursor.getColumnIndexOrThrow(TransferTable.COLUMN_ID));
            final String stateName = cursor.getString(
                    cursor.getColumnIndexOrThrow(TransferTable.COLUMN_STATE));
            final TransferState state = TransferState.getState(stateName);
            final int partNumber = cursor.getInt(
                    cursor.getColumnIndexOrThrow(TransferTable.COLUMN_PART_NUM));

            final boolean waiting = TransferState.WAITING.equals(state)
                    || TransferState.WAITING_FOR_NETWORK.equals(state)
                    || TransferState.RESUMED_WAITING.equals(state);
            // NOTE(review): IN_PROGRESS rows qualify regardless of
            // partNumber, because && binds tighter than || in the original
            // condition - confirm that this is intended.
            final boolean unfinished = (partNumber == 0 && waiting)
                    || TransferState.IN_PROGRESS.equals(state);
            if (!unfinished) {
                continue;
            }

            final TransferRecord tracked = updater.getTransfer(id);
            if (tracked == null) {
                // not tracked yet: build the record from the current row
                final TransferRecord fresh = new TransferRecord(id);
                fresh.updateFromDB(cursor);
                if (fresh.start(s3, dbUtil, updater, networkInfoReceiver)) {
                    updater.addTransfer(fresh);
                    loaded++;
                }
            } else if (!tracked.isRunning()) {
                tracked.start(s3, dbUtil, updater, networkInfoReceiver);
            }
        }
    } finally {
        cursor.close();
    }
    Log.d(TAG, loaded + " transfers are loaded from database");
}
/**
 * Pauses every tracked transfer and marks each one that was successfully
 * paused as WAITING_FOR_NETWORK, then requests a database scan for the
 * next check.
 */
void pauseAllForNetwork() {
    for (final TransferRecord record : updater.getTransfers().values()) {
        if (s3 == null || record == null) {
            continue;
        }
        if (record.pause(s3, updater)) {
            // change status to waiting
            updater.updateState(record.id, TransferState.WAITING_FOR_NETWORK);
        }
    }
    shouldScan = true;
}
/**
 * Installs a new update handler on the given looper and a new network
 * receiver that reports to it. Also serves as a hook to swap in a different
 * looper for testing purposes.
 *
 * @param looper new looper
 */
void setHandlerLooper(Looper looper) {
    final Handler handler = new UpdateHandler(looper);
    updateHandler = handler;
    networkInfoReceiver = new NetworkInfoReceiver(getApplicationContext(), handler);
}
/**
 * Writes the service state and one line per tracked transfer to the given
 * writer. Produces no output unless the application is debuggable.
 *
 * @param fd file descriptor of the dump target (unused)
 * @param writer writer that receives the dump
 * @param args additional dump arguments (unused)
 */
@Override
protected void dump(FileDescriptor fd, PrintWriter writer, String[] args) {
    // only available when the application is debuggable
    final boolean debuggable =
            (getApplicationInfo().flags & ApplicationInfo.FLAG_DEBUGGABLE) != 0;
    if (!debuggable) {
        return;
    }

    writer.printf("start id: %d\n", startId);
    writer.printf("network status: %s\n", networkInfoReceiver.isNetworkConnected());
    writer.printf("lastActiveTime: %s, shouldScan: %s\n", new Date(lastActiveTime), shouldScan);

    final Map<Integer, TransferRecord> tracked = updater.getTransfers();
    writer.printf("# of active transfers: %d\n", tracked.size());
    for (final TransferRecord record : tracked.values()) {
        writer.printf("bucket: %s, key: %s, status: %s, total size: %d, current: %d\n",
                record.bucketName, record.key, record.state, record.bytesTotal,
                record.bytesCurrent);
    }
    writer.flush();
}
}