/
ConcurrencyManager.java
1112 lines (1022 loc) · 52.4 KB
/
ConcurrencyManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 1998, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.internal.helper;
import org.eclipse.persistence.config.SystemProperties;
import org.eclipse.persistence.exceptions.ConcurrencyException;
import org.eclipse.persistence.internal.identitymaps.CacheKey;
import org.eclipse.persistence.internal.localization.ToStringLocalization;
import org.eclipse.persistence.internal.localization.TraceLocalization;
import org.eclipse.persistence.internal.security.PrivilegedAccessHelper;
import org.eclipse.persistence.logging.AbstractSessionLog;
import org.eclipse.persistence.logging.SessionLog;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* INTERNAL:
* <p>
* <b>Purpose</b>: To maintain concurrency for a particular task.
* It is a wrappers of a semaphore that allows recursive waits by a single thread.
* <p>
* <b>Responsibilities</b>:
* <ul>
* <li> Keep track of the active thread.
* <li> Wait all other threads until the first thread is done.
* <li> Maintain the depth of the active thread.
* </ul>
*/
public class ConcurrencyManager implements Serializable {
public static final Map<Thread, DeferredLockManager> DEFERRED_LOCK_MANAGERS = initializeDeferredLockManagers();
// Used for logging in case of dead-lock detection. Unique instance id.
private static final AtomicLong CONCURRENCY_MANAGER_ID = new AtomicLong(0);
protected static boolean shouldTrackStack = PrivilegedAccessHelper.getSystemProperty(SystemProperties.RECORD_STACK_ON_LOCK) != null;
protected AtomicInteger numberOfReaders;
protected AtomicInteger depth;
protected AtomicInteger numberOfWritersWaiting;
protected volatile transient Thread activeThread;
protected boolean lockedByMergeManager;
protected Exception stack;
// Extended logging info fields
// Unique ID assigned each time when a new instance of a concurrency manager is created
private final long concurrencyManagerId = CONCURRENCY_MANAGER_ID.incrementAndGet();
// Creation date
private final Date concurrencyManagerCreationDate = new Date();
// In case if two threads are working on the exact same entity that leads to both threads wanting to release the same cache key
// there is tracking each increment of number of readers and their release.
private final AtomicLong totalNumberOfKeysAcquiredForReading = new AtomicLong(0);
// Same as totalNumberOfKeysAcquiredForReading but incremented each time the cache key is suffering to release cache key.
private final AtomicLong totalNumberOfKeysReleasedForReading = new AtomicLong(0);
// Total number of times the cache key caused a blow up because it suffered a release of cache key when the counter
// was set to 0. It should happen if an entity being shared by two threads.
private final AtomicLong totalNumberOfKeysReleasedForReadingBlewUpExceptionDueToCacheKeyHavingReachedCounterZero = new AtomicLong(0);
private final Lock instanceLock = new ReentrantLock();
private final Condition instanceLockCondition = instanceLock.newCondition();
private static final Map<Thread, ConcurrencyManager> THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK = new ConcurrentHashMap<>();
private static final Map<Thread, String> THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK_NAME_OF_METHOD_CREATING_TRACE = new ConcurrentHashMap<>();
private static final Map<Thread, ConcurrencyManager> THREADS_TO_WAIT_ON_ACQUIRE = new ConcurrentHashMap<>();
private static final Map<Thread, String> THREADS_TO_WAIT_ON_ACQUIRE_NAME_OF_METHOD_CREATING_TRACE = new ConcurrentHashMap<>();
// Holds as a keys threads that needed to acquire one or more read locks on different cache keys.
private static final Map<Thread, ReadLockManager> READ_LOCK_MANAGERS = new ConcurrentHashMap<>();
private static final Set<Thread> THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS = ConcurrentHashMap.newKeySet();
private static final Map<Thread, String> THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS_BUILD_OBJECT_COMPLETE_GOES_NOWHERE = new ConcurrentHashMap<>();
private static final String ACQUIRE_METHOD_NAME = ConcurrencyManager.class.getName() + ".acquire(...)";
private static final String ACQUIRE_READ_LOCK_METHOD_NAME = ConcurrencyManager.class.getName() + ".acquireReadLock(...)";
private static final String ACQUIRE_WITH_WAIT_METHOD_NAME = ConcurrencyManager.class.getName() + ".acquireWithWait(...)";
private static final String ACQUIRE_DEFERRED_LOCK_METHOD_NAME = ConcurrencyManager.class.getName() + ".acquireDeferredLock(...)";
/**
* Initialize the newly allocated instance of this class.
* Set the depth to zero.
*/
public ConcurrencyManager() {
this.depth = new AtomicInteger(0);
this.numberOfReaders = new AtomicInteger(0);
this.numberOfWritersWaiting = new AtomicInteger(0);
}
/**
* Wait for all threads except the active thread.
* If the active thread just increment the depth.
* This should be called before entering a critical section.
*/
public void acquire() throws ConcurrencyException {
this.acquire(false);
}
/**
* Wait for all threads except the active thread.
* If the active thread just increment the depth.
* This should be called before entering a critical section.
* called with true from the merge process, if true then the refresh will not refresh the object
*/
public void acquire(boolean forMerge) throws ConcurrencyException {
instanceLock.lock();
try {
//Flag the time when we start the while loop
final long whileStartTimeMillis = System.currentTimeMillis();
Thread currentThread = Thread.currentThread();
DeferredLockManager lockManager = getDeferredLockManager(currentThread);
ReadLockManager readLockManager = getReadLockManager(currentThread);
// Waiting to acquire cache key will now start on the while loop
// NOTE: this step bares no influence in acquiring or not acquiring locks
// is just storing debug metadata that we can use when we detect the system is frozen in a dead lock
final boolean currentThreadWillEnterTheWhileWait = ((this.activeThread != null) || (this.numberOfReaders.get() > 0)) && (this.activeThread != currentThread);
if (currentThreadWillEnterTheWhileWait) {
putThreadAsWaitingToAcquireLockForWriting(currentThread, ACQUIRE_METHOD_NAME);
}
while (((this.activeThread != null) || (this.numberOfReaders.get() > 0)) && (this.activeThread != Thread.currentThread())) {
// This must be in a while as multiple threads may be released, or another thread may rush the acquire after one is released.
try {
this.numberOfWritersWaiting.incrementAndGet();
instanceLockCondition.await(ConcurrencyUtil.SINGLETON.getAcquireWaitTime(), TimeUnit.MILLISECONDS);
// Run a method that will fire up an exception if we having been sleeping for too long
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(this, whileStartTimeMillis, lockManager, readLockManager, ConcurrencyUtil.SINGLETON.isAllowInterruptedExceptionFired());
} catch (InterruptedException exception) {
// If the thread is interrupted we want to make sure we release all of the locks the thread was owning
releaseAllLocksAcquiredByThread(lockManager);
// Improve concurrency manager metadata
// Waiting to acquire cache key is is over
if (currentThreadWillEnterTheWhileWait) {
removeThreadNoLongerWaitingToAcquireLockForWriting(currentThread);
}
throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
} finally {
// Since above we increments the number of writers
// whether or not the thread is exploded by an interrupt
// we need to make sure we decrement the number of writer to not allow the code to be corrupted
this.numberOfWritersWaiting.decrementAndGet();
}
} // end of while loop
// Waiting to acquire cahe key is is over
if (currentThreadWillEnterTheWhileWait) {
removeThreadNoLongerWaitingToAcquireLockForWriting(currentThread);
}
if (this.activeThread == null) {
this.activeThread = Thread.currentThread();
if (shouldTrackStack) {
this.stack = new Exception();
}
}
this.lockedByMergeManager = forMerge;
this.depth.incrementAndGet();
} finally {
instanceLock.unlock();
}
}
/**
* If the lock is not acquired already acquire it and return true.
* If it has been acquired already return false
* Added for CR 2317
*/
public boolean acquireNoWait() throws ConcurrencyException {
return acquireNoWait(false);
}
/**
* If the lock is not acquired already acquire it and return true.
* If it has been acquired already return false
* Added for CR 2317
* called with true from the merge process, if true then the refresh will not refresh the object
*/
public boolean acquireNoWait(boolean forMerge) throws ConcurrencyException {
instanceLock.lock();
try {
if ((this.activeThread == null && this.numberOfReaders.get() == 0) || (this.activeThread == Thread.currentThread())) {
//if I own the lock increment depth
acquire(forMerge);
return true;
} else {
return false;
}
} finally {
instanceLock.unlock();
}
}
/**
* If the lock is not acquired already acquire it and return true.
* If it has been acquired already return false
* Added for CR 2317
* called with true from the merge process, if true then the refresh will not refresh the object
*/
public boolean acquireWithWait(boolean forMerge, int wait) throws ConcurrencyException {
instanceLock.lock();
try {
final Thread currentThread = Thread.currentThread();
if ((this.activeThread == null && this.numberOfReaders.get() == 0) || (this.activeThread == currentThread)) {
// if I own the lock increment depth
acquire(forMerge);
return true;
} else {
try {
putThreadAsWaitingToAcquireLockForWriting(currentThread, ACQUIRE_WITH_WAIT_METHOD_NAME);
instanceLockCondition.await(wait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return false;
} finally {
removeThreadNoLongerWaitingToAcquireLockForWriting(currentThread);
}
if ((this.activeThread == null && this.numberOfReaders.get() == 0)
|| (this.activeThread == currentThread)) {
acquire(forMerge);
return true;
}
return false;
}
} finally {
instanceLock.unlock();
}
}
/**
* If the activeThread is not set, acquire it and return true.
* If the activeThread is set, it has been acquired already, return false.
* Added for Bug 5840635
* Call with true from the merge process, if true then the refresh will not refresh the object.
*/
public boolean acquireIfUnownedNoWait(boolean forMerge) throws ConcurrencyException {
instanceLock.lock();
try {
// Only acquire lock if active thread is null. Do not check current thread.
if (this.activeThread == null && this.numberOfReaders.get() == 0) {
// if lock is unowned increment depth
acquire(forMerge);
return true;
} else {
return false;
}
} finally {
instanceLock.unlock();
}
}
/**
* Add deferred lock into a hashtable to avoid deadlock
*/
public void acquireDeferredLock() throws ConcurrencyException {
Thread currentThread = Thread.currentThread();
DeferredLockManager lockManager = getDeferredLockManager(currentThread);
ReadLockManager readLockManager = getReadLockManager(currentThread);
if (lockManager == null) {
lockManager = new DeferredLockManager();
putDeferredLock(currentThread, lockManager);
}
lockManager.incrementDepth();
instanceLock.lock();
try {
final long whileStartTimeMillis = System.currentTimeMillis();
final boolean currentThreadWillEnterTheWhileWait = this.numberOfReaders.get() != 0;
if(currentThreadWillEnterTheWhileWait) {
putThreadAsWaitingToAcquireLockForWriting(currentThread, ACQUIRE_DEFERRED_LOCK_METHOD_NAME);
}
while (this.numberOfReaders.get() != 0) {
// There are readers of this object, wait until they are done before determining if
//there are any other writers. If not we will wait on the readers for acquire. If another
//thread is also waiting on the acquire then a deadlock could occur. See bug 3049635
//We could release all active locks before releasing deferred but the object may not be finished building
//we could make the readers get a hard lock, but then we would just build a deferred lock even though
//the object is not being built.
try {
this.numberOfWritersWaiting.incrementAndGet();
instanceLockCondition.await(ConcurrencyUtil.SINGLETON.getAcquireWaitTime(), TimeUnit.MILLISECONDS);
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(this, whileStartTimeMillis, lockManager, readLockManager, ConcurrencyUtil.SINGLETON.isAllowInterruptedExceptionFired());
} catch (InterruptedException exception) {
// If the thread is interrupted we want to make sure we release all of the locks the thread was owning
releaseAllLocksAcquiredByThread(lockManager);
if (currentThreadWillEnterTheWhileWait) {
removeThreadNoLongerWaitingToAcquireLockForWriting(currentThread);
}
throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
} finally {
this.numberOfWritersWaiting.decrementAndGet();
}
}
if (currentThreadWillEnterTheWhileWait) {
removeThreadNoLongerWaitingToAcquireLockForWriting(currentThread);
}
if ((this.activeThread == currentThread) || (!isAcquired())) {
lockManager.addActiveLock(this);
acquire();
} else {
lockManager.addDeferredLock(this);
if (AbstractSessionLog.getLog().shouldLog(SessionLog.FINER) && this instanceof CacheKey) {
AbstractSessionLog.getLog().log(SessionLog.FINER, SessionLog.CACHE, "acquiring_deferred_lock", ((CacheKey)this).getObject(), currentThread.getName());
}
}
} finally {
instanceLock.unlock();
}
}
/**
* Check the lock state, if locked, acquire and release a deferred lock.
* This optimizes out the normal deferred-lock check if not locked.
*/
public void checkDeferredLock() throws ConcurrencyException {
// If it is not locked, then just return.
if (this.activeThread == null) {
return;
}
acquireDeferredLock();
releaseDeferredLock();
}
/**
* Check the lock state, if locked, acquire and release a read lock.
* This optimizes out the normal read-lock check if not locked.
*/
public void checkReadLock() throws ConcurrencyException {
// If it is not locked, then just return.
if (this.activeThread == null) {
return;
}
acquireReadLock();
releaseReadLock();
}
/**
* Wait on any writer.
* Allow concurrent reads.
*/
public void acquireReadLock() throws ConcurrencyException {
instanceLock.lock();
try {
final Thread currentThread = Thread.currentThread();
final long whileStartTimeMillis = System.currentTimeMillis();
DeferredLockManager lockManager = getDeferredLockManager(currentThread);
ReadLockManager readLockManager = getReadLockManager(currentThread);
final boolean currentThreadWillEnterTheWhileWait = (this.activeThread != null) && (this.activeThread != currentThread);
if (currentThreadWillEnterTheWhileWait) {
putThreadAsWaitingToAcquireLockForReading(currentThread, ACQUIRE_READ_LOCK_METHOD_NAME);
}
// Cannot check for starving writers as will lead to deadlocks.
while ((this.activeThread != null) && (this.activeThread != Thread.currentThread())) {
try {
instanceLockCondition.await(ConcurrencyUtil.SINGLETON.getAcquireWaitTime(), TimeUnit.MILLISECONDS);
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(this, whileStartTimeMillis, lockManager, readLockManager, ConcurrencyUtil.SINGLETON.isAllowInterruptedExceptionFired());
} catch (InterruptedException exception) {
releaseAllLocksAcquiredByThread(lockManager);
if (currentThreadWillEnterTheWhileWait) {
removeThreadNoLongerWaitingToAcquireLockForReading(currentThread);
}
throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
}
}
if (currentThreadWillEnterTheWhileWait) {
removeThreadNoLongerWaitingToAcquireLockForReading(currentThread);
}
try {
addReadLockToReadLockManager();
} finally {
this.numberOfReaders.incrementAndGet();
this.totalNumberOfKeysAcquiredForReading.incrementAndGet();
}
} finally {
instanceLock.unlock();
}
}
/**
* If this is acquired return false otherwise acquire readlock and return true
*/
public boolean acquireReadLockNoWait() {
instanceLock.lock();
try {
if ((this.activeThread == null) || (this.activeThread == Thread.currentThread())) {
acquireReadLock();
return true;
} else {
return false;
}
} finally {
instanceLock.unlock();
}
}
/**
* Return the active thread.
*/
public Thread getActiveThread() {
return activeThread;
}
/**
* Return the deferred lock manager from the thread
*/
public static DeferredLockManager getDeferredLockManager(Thread thread) {
return getDeferredLockManagers().get(thread);
}
/**
* Return the deferred lock manager hashtable (thread - DeferredLockManager).
*/
protected static Map<Thread, DeferredLockManager> getDeferredLockManagers() {
return DEFERRED_LOCK_MANAGERS;
}
/**
* Init the deferred lock managers (thread - DeferredLockManager).
*/
protected static Map<Thread, DeferredLockManager> initializeDeferredLockManagers() {
return new ConcurrentHashMap<>();
}
/**
* Return the current depth of the active thread.
*/
public int getDepth() {
return depth.get();
}
/**
* Number of writer that want the lock.
* This is used to ensure that a writer is not starved.
*/
public int getNumberOfReaders() {
return numberOfReaders.get();
}
/**
* Number of writers that want the lock.
* This is used to ensure that a writer is not starved.
*/
public int getNumberOfWritersWaiting() {
return numberOfWritersWaiting.get();
}
/**
* Return if a thread has acquire this manager.
*/
public boolean isAcquired() {
return depth.get() > 0;
}
/**
* INTERNAL:
* Used byt the refresh process to determine if this concurrency manager is locked by
* the merge process. If it is then the refresh should not refresh the object
*/
public boolean isLockedByMergeManager() {
return this.lockedByMergeManager;
}
/**
* Check if the deferred locks of a thread are all released.
* Should write dead lock diagnostic information into the {@link #THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS_BUILD_OBJECT_COMPLETE_GOES_NOWHERE}.
* <br>
* @param thread
* the current thread to be explored. It starts by being the thread that it is stuck but then it evolves
* to be other that have acquired locks our main thread was needing but whcich themslves are stuck...
* threads in the deffered lock chain that are going nowhere themselves.
* @param recursiveSet
* this prevents the algorithm going into an infinite loop of expanding the same thread more than once.
* @param parentChainOfThreads
* this starts by being a basket containing the current thread, but each time we go deeper it evolves to
* contain the thread we will explore next.
* @return true if object is complete
*/
public static boolean isBuildObjectOnThreadComplete(Thread thread, Map<Thread, Thread> recursiveSet, List<Thread> parentChainOfThreads, boolean deadLockDiagnostic) {
if (recursiveSet.containsKey(thread)) {
return true;
}
recursiveSet.put(thread, thread);
DeferredLockManager lockManager = getDeferredLockManager(thread);
if (lockManager == null) {
return true;
}
Vector<ConcurrencyManager> deferredLocks = lockManager.getDeferredLocks();
for (Iterator<ConcurrencyManager> iterator = deferredLocks.iterator();
iterator.hasNext();) {
ConcurrencyManager deferedLock = iterator.next();
Thread activeThread = null;
if (deferedLock.isAcquired()) {
activeThread = deferedLock.getActiveThread();
// the active thread may be set to null at anypoint
// if added for CR 2330
if (activeThread != null) {
DeferredLockManager currentLockManager = getDeferredLockManager(activeThread);
if (currentLockManager == null) {
// deadlock diagnostic extension
if (deadLockDiagnostic && parentChainOfThreads != null) {
StringBuilder justificationForReturningFalse = new StringBuilder();
enrichStringBuildingExplainWhyThreadIsStuckInIsBuildObjectOnThreadComplete(parentChainOfThreads, deferedLock, activeThread, false, justificationForReturningFalse);
setJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse(justificationForReturningFalse.toString());
}
return false;
} else if (currentLockManager.isThreadComplete()) {
activeThread = deferedLock.getActiveThread();
// The lock may suddenly finish and no longer have an active thread.
if (activeThread != null) {
// deadlock diagnostic extension
List<Thread> currentChainOfThreads = null;
if (deadLockDiagnostic) {
currentChainOfThreads = (parentChainOfThreads == null) ? new ArrayList<>() : new ArrayList<>(parentChainOfThreads);
currentChainOfThreads.add(activeThread);
}
if (!isBuildObjectOnThreadComplete(activeThread, recursiveSet, currentChainOfThreads, deadLockDiagnostic)) {
return false;
}
}
} else {
if (deadLockDiagnostic && parentChainOfThreads != null) {
StringBuilder justificationForReturningFalse = new StringBuilder();
enrichStringBuildingExplainWhyThreadIsStuckInIsBuildObjectOnThreadComplete(parentChainOfThreads, deferedLock, activeThread, true, justificationForReturningFalse);
setJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse(justificationForReturningFalse.toString());
}
return false;
}
}
}
}
if (parentChainOfThreads != null && parentChainOfThreads.size() == 1) {
clearJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse();
}
return true;
}
/**
* When the recursive algorithm decides to return false it is because it is confronted with a cache key that had to
* be deferred. And the cache key is either being owned by a thread that did not flage itsef as being finished and
* waiting in the wait for deferred locks. Or the thread that ows the cache key is not playing nice - and not using
* deferred locks - so it has acquire the cache key, it is going about its business (e.g. committing a transaction
* or perhaps doing object building. Normally, but not always, in object building threads do have a lock manager,
* but sometimes not when they agressive acquire lock policy. )
*
* @param chainOfThreadsExpandedInRecursion
* This the chaing threads that were expanded as we went down with the recursion
* @param finalDeferredLockCausingTrouble
* this is a lock that was deferred either by current thread or by a thread that is also itself waiting
* around . This lock is what is causing us ultimately to return FALSE, because the lock is still ACUIRED
* so not yet free. And the thread that owns it is also still not finished yet.
*
* @param activeThreadOnDeferredLock
* this is the thread that was spotted as owning/being actively owning the the deferred lock. So we can
* consider this thread as being the ultimate cause of why the current thread and perhaps a hole chaing
* of related threads are not evolving. But certainly the current thread.
* @param hasDeferredLockManager
* Some threads have deferred lock managers some not. Not clear when they do. But threads doing object
* building typically end up creating a deferred lock manager when they find themselves unable to acquire
* an object and need to defer on the cache key.
* @param justification
* this is what we want to populate it will allow us to build a trace to explain why the thread on the
* wait for deferred lock is going nowhere. This trace will be quite important to help us interpret the
* massive dumps since it is quite typical to find threads in this state.
*
*/
public static void enrichStringBuildingExplainWhyThreadIsStuckInIsBuildObjectOnThreadComplete(
List<Thread> chainOfThreadsExpandedInRecursion,
ConcurrencyManager finalDeferredLockCausingTrouble,
Thread activeThreadOnDeferredLock,
boolean hasDeferredLockManager,
StringBuilder justification) {
// (a) summarize the threads navigated via deferred locks
int currentThreadNumber = 0;
for (Thread currentExpandedThread : chainOfThreadsExpandedInRecursion) {
currentThreadNumber++;
justification.append(TraceLocalization.buildMessage("concurrency_manager_build_object_thread_complete_1", new Object[] {currentThreadNumber, currentExpandedThread.getName()}));
}
justification.append(TraceLocalization.buildMessage("concurrency_manager_build_object_thread_complete_2"));
// (b) Described the cache key blocking us from finishing the oject building
String cacheKeyStr = ConcurrencyUtil.SINGLETON.createToStringExplainingOwnedCacheKey(finalDeferredLockCausingTrouble);
justification.append(TraceLocalization.buildMessage("concurrency_manager_build_object_thread_complete_3", new Object[] {cacheKeyStr}));
// (c) Describe the thread that has acquired the cache key and is not done yet
justification.append(TraceLocalization.buildMessage("concurrency_manager_build_object_thread_complete_4", new Object[] {activeThreadOnDeferredLock, hasDeferredLockManager}));
}
/**
* Return if this manager is within a nested acquire.
*/
public boolean isNested() {
return depth.get() > 1;
}
public void putDeferredLock(Thread thread, DeferredLockManager lockManager) {
getDeferredLockManagers().put(thread, lockManager);
}
/**
* Decrement the depth for the active thread.
* Assume the current thread is the active one.
* Raise an error if the depth become < 0.
* The notify will release the first thread waiting on the object,
* if no threads are waiting it will do nothing.
*/
public void release() throws ConcurrencyException {
instanceLock.lock();
try {
if (this.depth.get() == 0) {
throw ConcurrencyException.signalAttemptedBeforeWait();
} else {
this.depth.decrementAndGet();
}
if (this.depth.get() == 0) {
this.activeThread = null;
if (shouldTrackStack) {
this.stack = null;
}
this.lockedByMergeManager = false;
instanceLockCondition.signalAll();
}
} finally {
instanceLock.unlock();
}
}
/**
* Release the deferred lock.
* This uses a deadlock detection and resolution algorithm to avoid cache deadlocks.
* The deferred lock manager keeps track of the lock for a thread, so that other
* thread know when a deadlock has occurred and can resolve it.
*/
public void releaseDeferredLock() throws ConcurrencyException {
Thread currentThread = Thread.currentThread();
DeferredLockManager lockManager = getDeferredLockManager(currentThread);
ReadLockManager readLockManager = getReadLockManager(currentThread);
if (lockManager == null) {
return;
}
int depth = lockManager.getThreadDepth();
if (depth > 1) {
lockManager.decrementDepth();
return;
}
// If the set is null or empty, means there is no deferred lock for this thread, return.
if (!lockManager.hasDeferredLock()) {
lockManager.releaseActiveLocksOnThread();
removeDeferredLockManager(currentThread);
return;
}
lockManager.setIsThreadComplete(true);
final long whileStartTimeMillis = System.currentTimeMillis();
boolean releaseAllLocksAquiredByThreadAlreadyPerformed = false;
boolean currentThreadRegisteredAsWaitingForisBuildObjectOnThreadComplete = false;
clearJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse();
// Thread have three stages, one where they are doing work (i.e. building objects)
// two where they are done their own work but may be waiting on other threads to finish their work,
// and a third when they and all the threads they are waiting on are done.
// This is essentially a busy wait to determine if all the other threads are done.
while (true) {
boolean isBuildObjectCompleteSlow = ConcurrencyUtil.SINGLETON.tooMuchTimeHasElapsed(whileStartTimeMillis, ConcurrencyUtil.SINGLETON.getBuildObjectCompleteWaitTime());
try{
// 2612538 - the default size of Map (32) is appropriate
Map<Thread, Thread> recursiveSet = new IdentityHashMap<>();
if (isBuildObjectOnThreadComplete(currentThread, recursiveSet, Arrays.asList(currentThread), isBuildObjectCompleteSlow)) {// Thread job done.
// Remove from debug metadata the fact that the current thread needed to wait
// for one or more build objects to be completed by other threads.
if(currentThreadRegisteredAsWaitingForisBuildObjectOnThreadComplete) {
THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS.remove(currentThread);
}
clearJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse();
lockManager.releaseActiveLocksOnThread();
removeDeferredLockManager(currentThread);
AbstractSessionLog.getLog().log(SessionLog.FINER, SessionLog.CACHE, "deferred_locks_released", currentThread.getName());
return;
} else {// Not done yet, wait and check again.
try {
// Add debug metadata to concurrency manager state
// The current thread will now be waiting for other threads to build the object(s) it could not acquire
if(!currentThreadRegisteredAsWaitingForisBuildObjectOnThreadComplete) {
currentThreadRegisteredAsWaitingForisBuildObjectOnThreadComplete = true;
THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS.add(currentThread);
}
Thread.sleep(20);
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(this, whileStartTimeMillis, lockManager, readLockManager, ConcurrencyUtil.SINGLETON.isAllowInterruptedExceptionFired());
} catch (InterruptedException interrupted) {
THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS.remove(currentThread);
AbstractSessionLog.getLog().logThrowable(SessionLog.SEVERE, SessionLog.CACHE, interrupted);
releaseAllLocksAcquiredByThread(lockManager);
releaseAllLocksAquiredByThreadAlreadyPerformed = true;
clearJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse();
throw ConcurrencyException.waitWasInterrupted(interrupted.getMessage());
}
}
} catch (Error error) {
if (!releaseAllLocksAquiredByThreadAlreadyPerformed) {
THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS.remove(currentThread);
AbstractSessionLog.getLog().logThrowable(SessionLog.SEVERE, SessionLog.CACHE, error);
releaseAllLocksAcquiredByThread(lockManager);
clearJustificationWhyMethodIsBuildingObjectCompleteReturnsFalse();
}
throw error;
}
}
}
/**
* Decrement the number of readers. Used to allow concurrent reads.
*/
public void releaseReadLock() throws ConcurrencyException {
instanceLock.lock();
try {
if (this.numberOfReaders.get() == 0) {
this.totalNumberOfKeysReleasedForReadingBlewUpExceptionDueToCacheKeyHavingReachedCounterZero.incrementAndGet();
try {
removeReadLockFromReadLockManager();
} catch (Exception e) {
AbstractSessionLog.getLog().logThrowable(SessionLog.SEVERE, SessionLog.CACHE, e);
}
throw ConcurrencyException.signalAttemptedBeforeWait();
} else {
try {
removeReadLockFromReadLockManager();
} finally {
this.numberOfReaders.decrementAndGet();
this.totalNumberOfKeysReleasedForReading.incrementAndGet();
}
}
if (this.numberOfReaders.get() == 0) {
instanceLockCondition.signalAll();
}
} finally {
instanceLock.unlock();
}
}
/**
* Remove the deferred lock manager for the thread
*/
public static DeferredLockManager removeDeferredLockManager(Thread thread) {
return getDeferredLockManagers().remove(thread);
}
/**
* Set the active thread.
*/
public void setActiveThread(Thread activeThread) {
this.activeThread = activeThread;
}
/**
* Set the current depth of the active thread.
*/
protected void setDepth(int depth) {
this.depth.set(depth);
}
/**
* INTERNAL:
* Used by the mergemanager to let the read know not to refresh this object as it is being
* loaded by the merge process.
*/
public void setIsLockedByMergeManager(boolean state) {
this.lockedByMergeManager = state;
}
/**
* Track the number of readers.
*/
protected void setNumberOfReaders(int numberOfReaders) {
this.numberOfReaders.set(numberOfReaders);
}
/**
* Number of writers that want the lock.
* This is used to ensure that a writer is not starved.
*/
protected void setNumberOfWritersWaiting(int numberOfWritersWaiting) {
this.numberOfWritersWaiting.set(numberOfWritersWaiting);
}
public void transitionToDeferredLock() {
instanceLock.lock();
try {
Thread currentThread = Thread.currentThread();
DeferredLockManager lockManager = getDeferredLockManager(currentThread);
if (lockManager == null) {
lockManager = new DeferredLockManager();
putDeferredLock(currentThread, lockManager);
}
lockManager.incrementDepth();
lockManager.addActiveLock(this);
} finally {
instanceLock.unlock();
}
}
/**
* For the thread to release all of its locks.
*
* @param lockManager
* the deferred lock manager
*/
public void releaseAllLocksAcquiredByThread(DeferredLockManager lockManager) {
Thread currentThread = Thread.currentThread();
//When this method is invoked during an acquire lock sometimes there is no lock manager
if (lockManager == null) {
String cacheKeyToString = ConcurrencyUtil.SINGLETON.createToStringExplainingOwnedCacheKey(this);
StringWriter writer = new StringWriter();
writer.write(TraceLocalization.buildMessage("concurrency_manager_release_locks_acquired_by_thread_1", new Object[] {currentThread.getName(), cacheKeyToString}));
AbstractSessionLog.getLog().log(SessionLog.SEVERE, SessionLog.CACHE, writer.toString(), new Object[] {}, false);
return;
}
StringWriter writer = new StringWriter();
writer.write(TraceLocalization.buildMessage("concurrency_manager_release_locks_acquired_by_thread_2", new Object[] {currentThread.toString()}));
AbstractSessionLog.getLog().log(SessionLog.SEVERE, SessionLog.CACHE, writer.toString(), new Object[] {}, false);
lockManager.releaseActiveLocksOnThread();
removeDeferredLockManager(currentThread);
}
/**
* The method is not synchronized because for now we assume that each thread will ask for its own lock manager. If
* we were writing a dead lock detection mechanism then a ThreadA could be trying understand the ReadLocks of a
* ThreadB and this would no longer be true.
*
* @param thread
* The thread for which we want to have look at the acquired read locks.
* @return Never null if the read lock manager does not yet exist for the current thread. otherwise its read log
* manager is returned.
*/
protected static ReadLockManager getReadLockManager(Thread thread) {
Map<Thread, ReadLockManager> readLockManagers = getReadLockManagers();
return readLockManagers.get(thread);
}
/**
* Return the deferred lock manager hashtable (thread - DeferredLockManager).
*/
protected static Map<Thread, ReadLockManager> getReadLockManagers() {
return READ_LOCK_MANAGERS;
}
/**
* Print the nested depth.
*/
@Override
public String toString() {
Object[] args = {getDepth()};
return getClass().getSimpleName() + ToStringLocalization.buildMessage("nest_level", args);
}
public Exception getStack() {
return stack;
}
public void setStack(Exception stack) {
this.stack = stack;
}
public static boolean shouldTrackStack() {
return shouldTrackStack;
}
/**
* INTERNAL:
* This can be set during debugging to record the stacktrace when a lock is acquired.
* Then once IdentityMapAccessor.printIdentityMapLocks() is called the stack call for each
* lock will be printed as well. Because locking issues are usually quite time sensitive setting
* this flag may inadvertently remove the deadlock because of the change in timings.
* <p>
* There is also a system level property for this setting. "eclipselink.cache.record-stack-on-lock"
*/
public static void setShouldTrackStack(boolean shouldTrackStack) {
ConcurrencyManager.shouldTrackStack = shouldTrackStack;
}
private static String getPropertyRecordStackOnLock() {
return PrivilegedAccessHelper.callDoPrivileged(() -> System.getProperty(SystemProperties.RECORD_STACK_ON_LOCK));
}
/**
* Normally this mehtod should only be called from withing the concurrency manager.
* However the write lock manager while it is building clones also does some while loop waiting
* to try to acquire a cache key this acquiring logic is not being managed directly inside of the wait manager.
*
*/
public void putThreadAsWaitingToAcquireLockForWriting(Thread thread, String methodName){
THREADS_TO_WAIT_ON_ACQUIRE.put(thread, this);
THREADS_TO_WAIT_ON_ACQUIRE_NAME_OF_METHOD_CREATING_TRACE.put(thread, methodName);
}
/**
* The thread has acquired the lock for writing or decided to defer acquiring the lock putting this lock into its
* deferred lock list.
*/
public void removeThreadNoLongerWaitingToAcquireLockForWriting(Thread thread) {
THREADS_TO_WAIT_ON_ACQUIRE.remove(thread);
THREADS_TO_WAIT_ON_ACQUIRE_NAME_OF_METHOD_CREATING_TRACE.remove(thread);
}
/**
* The thread is trying to acquire a read lock but it is not being able to make process on getting the read lock.
*
* @param methodName
* metadata to help us debug trace leaking. If we start blowing up threads we do not want the traces
* created by the current thread to remain.
*/
public void putThreadAsWaitingToAcquireLockForReading(Thread currentThread, String methodName) {
THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK.put(currentThread, this);
THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK_NAME_OF_METHOD_CREATING_TRACE.put(currentThread, methodName);
}
public void removeThreadNoLongerWaitingToAcquireLockForReading(Thread thread) {
THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK.remove(thread);
THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK_NAME_OF_METHOD_CREATING_TRACE.remove(thread);
}
/** Getter for {@link #concurrencyManagerId} */
public long getConcurrencyManagerId() {
return concurrencyManagerId;
}
/** Getter for {@link #concurrencyManagerCreationDate} */
public Date getConcurrencyManagerCreationDate() {
return concurrencyManagerCreationDate;
}
/** Getter for {@link #totalNumberOfKeysAcquiredForReading} */
public long getTotalNumberOfKeysAcquiredForReading() {
return totalNumberOfKeysAcquiredForReading.get();
}
/** Getter for {@link #totalNumberOfKeysReleasedForReading} */
public long getTotalNumberOfKeysReleasedForReading() {
return totalNumberOfKeysReleasedForReading.get();
}
/** Getter for {@link #totalNumberOfKeysReleasedForReadingBlewUpExceptionDueToCacheKeyHavingReachedCounterZero} */
public long getTotalNumberOfKeysReleasedForReadingBlewUpExceptionDueToCacheKeyHavingReachedCounterZero() {
return totalNumberOfKeysReleasedForReadingBlewUpExceptionDueToCacheKeyHavingReachedCounterZero.get();
}
/** Getter for {@link #THREADS_TO_WAIT_ON_ACQUIRE} */
public static Map<Thread, ConcurrencyManager> getThreadsToWaitOnAcquire() {
return new HashMap<>(THREADS_TO_WAIT_ON_ACQUIRE);
}
/** Getter for {@link #THREADS_TO_WAIT_ON_ACQUIRE_NAME_OF_METHOD_CREATING_TRACE} */
public static Map<Thread, String> getThreadsToWaitOnAcquireMethodName() {
return new HashMap<>(THREADS_TO_WAIT_ON_ACQUIRE_NAME_OF_METHOD_CREATING_TRACE);
}
/** Getter for {@link #THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK} */
public static Map<Thread, ConcurrencyManager> getThreadsToWaitOnAcquireReadLock() {
return THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK;
}
/** Getter for {@link #THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK_NAME_OF_METHOD_CREATING_TRACE} */
public static Map<Thread, String> getThreadsToWaitOnAcquireReadLockMethodName() {
return THREADS_TO_WAIT_ON_ACQUIRE_READ_LOCK_NAME_OF_METHOD_CREATING_TRACE;
}
/** Getter for {@link #THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS} */
public static Set<Thread> getThreadsWaitingToReleaseDeferredLocks() {
return new HashSet<>(THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS);
}
/** Getter for {@link #THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS_BUILD_OBJECT_COMPLETE_GOES_NOWHERE} */
public static Map<Thread, String> getThreadsWaitingToReleaseDeferredLocksJustification() {
return new HashMap<>(THREADS_WAITING_TO_RELEASE_DEFERRED_LOCKS_BUILD_OBJECT_COMPLETE_GOES_NOWHERE);