29
29
import android .os .Process ;
30
30
import android .os .RemoteException ;
31
31
import android .os .TransactionTooLargeException ;
32
+ import androidx .annotation .BinderThread ;
32
33
import com .google .common .annotations .VisibleForTesting ;
33
34
import com .google .common .base .Ticker ;
34
35
import com .google .common .base .Verify ;
105
106
* https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md
106
107
*/
107
108
@ ThreadSafe
108
- public abstract class BinderTransport
109
- implements LeakSafeOneWayBinder .TransactionHandler , IBinder .DeathRecipient {
109
+ public abstract class BinderTransport implements IBinder .DeathRecipient {
110
110
111
111
private static final Logger logger = Logger .getLogger (BinderTransport .class .getName ());
112
112
@@ -210,9 +210,11 @@ protected enum TransportState {
210
210
private final FlowController flowController ;
211
211
212
212
/** The number of incoming bytes we've received. */
213
- private final AtomicLong numIncomingBytes ;
213
+ // Only read/written on @BinderThread.
214
+ private long numIncomingBytes ;
214
215
215
216
/** The number of incoming bytes we've told our peer we've received. */
217
+ // Only read/written on @BinderThread.
216
218
private long acknowledgedIncomingBytes ;
217
219
218
220
private BinderTransport (
@@ -225,10 +227,9 @@ private BinderTransport(
225
227
this .attributes = attributes ;
226
228
this .logId = logId ;
227
229
scheduledExecutorService = executorServicePool .getObject ();
228
- incomingBinder = new LeakSafeOneWayBinder (this );
230
+ incomingBinder = new LeakSafeOneWayBinder (this :: handleTransaction );
229
231
ongoingCalls = new ConcurrentHashMap <>();
230
232
flowController = new FlowController (TRANSACTION_BYTES_WINDOW );
231
- numIncomingBytes = new AtomicLong ();
232
233
}
233
234
234
235
// Override in child class.
@@ -423,8 +424,9 @@ final void sendOutOfBandClose(int callId, Status status) {
423
424
}
424
425
}
425
426
426
- @ Override
427
- public final boolean handleTransaction (int code , Parcel parcel ) {
427
+ @ BinderThread
428
+ @ VisibleForTesting
429
+ final boolean handleTransaction (int code , Parcel parcel ) {
428
430
try {
429
431
return handleTransactionInternal (code , parcel );
430
432
} catch (RuntimeException e ) {
@@ -440,6 +442,7 @@ public final boolean handleTransaction(int code, Parcel parcel) {
440
442
}
441
443
}
442
444
445
+ @ BinderThread
443
446
private boolean handleTransactionInternal (int code , Parcel parcel ) {
444
447
if (code < FIRST_CALL_ID ) {
445
448
synchronized (this ) {
@@ -483,11 +486,12 @@ private boolean handleTransactionInternal(int code, Parcel parcel) {
483
486
if (inbound != null ) {
484
487
inbound .handleTransaction (parcel );
485
488
}
486
- long nib = numIncomingBytes . addAndGet ( size ) ;
487
- if ((nib - acknowledgedIncomingBytes ) > TRANSACTION_BYTES_WINDOW_FORCE_ACK ) {
489
+ numIncomingBytes += size ;
490
+ if ((numIncomingBytes - acknowledgedIncomingBytes ) > TRANSACTION_BYTES_WINDOW_FORCE_ACK ) {
488
491
synchronized (this ) {
489
- sendAcknowledgeBytes (checkNotNull (outgoingBinder ));
492
+ sendAcknowledgeBytes (checkNotNull (outgoingBinder ), numIncomingBytes );
490
493
}
494
+ acknowledgedIncomingBytes = numIncomingBytes ;
491
495
}
492
496
return true ;
493
497
}
@@ -519,10 +523,8 @@ private final void handlePing(Parcel requestParcel) {
519
523
protected void handlePingResponse (Parcel parcel ) {}
520
524
521
525
@ GuardedBy ("this" )
522
- private void sendAcknowledgeBytes (OneWayBinderProxy iBinder ) {
526
+ private void sendAcknowledgeBytes (OneWayBinderProxy iBinder , long n ) {
523
527
// Send a transaction to acknowledge reception of incoming data.
524
- long n = numIncomingBytes .get ();
525
- acknowledgedIncomingBytes = n ;
526
528
try (ParcelHolder parcel = ParcelHolder .obtain ()) {
527
529
parcel .get ().writeLong (n );
528
530
iBinder .transact (ACKNOWLEDGE_BYTES , parcel );
0 commit comments