-
Notifications
You must be signed in to change notification settings - Fork 1
/
Rxn.scala
1656 lines (1449 loc) · 56 KB
/
Rxn.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2016-2024 Daniel Urban and contributors listed in NOTICE.txt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.tauri.choam
package core
import scala.concurrent.duration._
import cats.{ Align, Applicative, Defer, Functor, StackSafeMonad, Monoid, MonoidK, Semigroup, Show }
import cats.arrow.ArrowChoice
import cats.data.{ Ior, State }
import cats.mtl.Local
import cats.effect.kernel.{ Async, Clock, Unique, Ref => CatsRef }
import cats.effect.std.{ Random, SecureRandom, UUIDGen }
import internal.mcas.{ MemoryLocation, Mcas, LogEntry, McasStatus, Descriptor, Consts, Hamt }
/**
* An effectful function from `A` to `B`; when executed,
* it may update any number of [[Ref]]s atomically. (It
* may also create new [[Ref]]s.)
*
* These functions are composable (see below), and composition
* preserves their atomicity. That is, all affected [[Ref]]s
* will be updated atomically.
*
* A [[Rxn]] forms an [[cats.arrow.Arrow Arrow]] (more
* specifically, an [[cats.arrow.ArrowChoice ArrowChoice]]).
* It also forms a [[cats.Monad Monad]] in `B`; however, consider
* using the arrow combinators (when possible) instead of `flatMap`
* (since a static combination of `Rxn`s may be more performant).
*
* The relation between [[Rxn]] and [[Axn]] is approximately
* `Rxn[A, B] ≡ (A => Axn[B])`; or, alternatively
* `Axn[A] ≡ Rxn[Any, A]`.
*/
sealed abstract class Rxn[-A, +B] { // short for 'reaction'
/*
* An implementation similar to reagents, described in [Reagents: Expressing and
* Composing Fine-grained Concurrency](https://web.archive.org/web/20220214132428/https://www.ccis.northeastern.edu/home/turon/reagents.pdf)
* by Aaron Turon; originally implemented at [aturon/ChemistrySet](
* https://github.com/aturon/ChemistrySet).
*
* This implementation is significantly simplified by the fact
* that offers and permanent failure are not implemented. As a
* consequence, these `Rxn`s are always lock-free (provided
* that the underlying k-CAS implementation is lock-free, and
* that `unsafe*` operations are not used, and there is no
* infinite recursion).
*
* On the other hand, this implementation uses an optimized and
* stack-safe interpreter (see `interpreter`). A limited version
* of an `Exchanger` is also implemented, which can be used to
* implement elimination arrays. (The `Exchanger` by itself could
* cause indefinite retries, so it must always be combined with
* a lock-free operation.)
*
* Another difference is the referentially transparent ("purely
* functional") API. All side-effecting APIs are prefixed by
* `unsafe`. (But not all `unsafe` APIs are side-effecting, some
* of them are `unsafe` for another reason.)
*
* We also offer [*opacity*](https://nbronson.github.io/scala-stm/semantics.html#opacity),
* a consistency guarantee of the read values visible inside a
* running `Rxn`.
*
* Finally (unlike with reagents), two `Rxn`s which touch the same
* `Ref`s are composable with each other. This allows multiple
* reads and writes to the same `Ref` in one `Rxn`. (`Exchanger` is
* an exception, this is part of the reason it is `unsafe`).
*
* Existing reagent implementations:
* - https://github.com/aturon/Caper (Racket)
* - https://github.com/ocamllabs/reagents (OCaml)
*/
/*
* Implementation note: in some cases, composing
* `Rxn`s with `>>>` (or `*>`) will be faster
* than using `flatMap`. An example (with measurements)
* is in `ArrowBench`.
*
* TODO: More benchmarks needed to determine exactly
* TODO: what it is that makes them faster. Also,
* TODO: maybe we could optimize `flatMap`.
*/
import Rxn._
/**
* Tag for the interpreter (see `interpreter`)
*
* This attempts to be an optimization, inspired by an old optimization in
* the Scala compiler for matching on sealed subclasses
* (see https://github.com/scala/scala/commit/b98eb1d74141a4159539d373e6216e799d6b6dcd).
* Except we do it by hand, which is ugly, but might be worth it.
*
* In Cats Effect 3 the IO/SyncIO runloop also uses something like this
* (see https://github.com/typelevel/cats-effect/blob/v3.0.2/core/shared/src/main/scala/cats/effect/SyncIO.scala#L195),
*
* The ZIO runloop seems to do something similar too
* (see https://github.com/zio/zio/blob/v1.0.6/core/shared/src/main/scala/zio/internal/FiberContext.scala#L320).
*
* The idea is to `match` on `r.tag` instead of `r` itself. That match
* should be compiled to a JVM tableswitch instruction. Which is supposed
* to be very fast. The match arms require `.asInstanceOf`, which is unsafe
* and makes maintenance harder. However, if there are a lot of cases,
* a chain of instanceof/checkcast instructions could be slower.
*
* TODO: Check if it's indeed faster than a simple `match` (apparently "tag"
* TODO: was removed from the Scala compiler because it was not worth it).
*/
private[core] def tag: Byte
final def + [X <: A, Y >: B](that: Rxn[X, Y]): Rxn[X, Y] =
new Choice[X, Y](this, that)
final def >>> [C](that: Rxn[B, C]): Rxn[A, C] =
new AndThen[A, B, C](this, that)
final def × [C, D](that: Rxn[C, D]): Rxn[(A, C), (B, D)] =
new AndAlso[A, B, C, D](this, that)
final def * [X <: A, C](that: Rxn[X, C]): Rxn[X, (B, C)] =
(this × that).contramap[X](x => (x, x))
final def ? : Rxn[A, Option[B]] =
this.attempt
final def attempt: Rxn[A, Option[B]] =
this.map(Some(_)) + pure[Option[B]](None)
final def maybe: Rxn[A, Boolean] =
this.as(true) + pure(false)
final def map[C](f: B => C): Rxn[A, C] =
this >>> lift(f)
final def as[C](c: C): Rxn[A, C] =
new As(this, c)
// old implementation with map:
private[choam] final def asOld[C](c: C): Rxn[A, C] =
this.map(_ => c)
final def void: Rxn[A, Unit] =
this.as(())
// FIXME: do we need this?
final def dup: Rxn[A, (B, B)] =
this.map { b => (b, b) }
final def contramap[C](f: C => A): Rxn[C, B] =
lift(f) >>> this
final def provide(a: A): Axn[B] =
new Provide[A, B](this, a)
// old implementation with contramap:
private[choam] final def provideOld(a: A): Axn[B] =
contramap[Any](_ => a)
final def dimap[C, D](f: C => A)(g: B => D): Rxn[C, D] =
this.contramap(f).map(g)
final def toFunction: A => Axn[B] = { (a: A) =>
this.provide(a)
}
final def map2[X <: A, C, D](that: Rxn[X, C])(f: (B, C) => D): Rxn[X, D] =
(this * that).map(f.tupled)
final def <* [X <: A, C](that: Rxn[X, C]): Rxn[X, B] =
this.productL(that)
final def productL [X <: A, C](that: Rxn[X, C]): Rxn[X, B] =
(this * that).map(_._1)
final def *> [X <: A, C](that: Rxn[X, C]): Rxn[X, C] =
this.productR(that)
final def productR[X <: A, C](that: Rxn[X, C]): Rxn[X, C] =
new ProductR[X, B, C](this, that)
final def first[C]: Rxn[(A, C), (B, C)] =
this × identity[C]
final def second[C]: Rxn[(C, A), (C, B)] =
identity[C] × this
final def flatMap[X <: A, C](f: B => Rxn[X, C]): Rxn[X, C] =
new FlatMap(this, f)
// TODO: Unoptimized impl.:
private[choam] final def flatMapOld[X <: A, C](f: B => Rxn[X, C]): Rxn[X, C] = {
val self: Rxn[X, (X, B)] = this.second[X].contramap[X](x => (x, x))
val comp: Rxn[(X, B), C] = computed[(X, B), C](xb => f(xb._2).provide(xb._1))
self >>> comp
}
final def flatMapF[C](f: B => Axn[C]): Rxn[A, C] =
new FlatMapF(this, f)
// TODO: Unoptimized impl.:
private[choam] final def flatMapFOld[C](f: B => Axn[C]): Rxn[A, C] =
this >>> computed(f)
// TODO: optimize
final def >> [X <: A, C](that: => Rxn[X, C]): Rxn[X, C] =
this.flatMap { _ => that }
final def flatTap(rxn: Rxn[B, Unit]): Rxn[A, B] =
this.flatMapF { b => rxn.provide(b).as(b) }
final def flatten[C](implicit ev: B <:< Axn[C]): Rxn[A, C] =
this.flatMapF(ev)
final def postCommit(pc: Rxn[B, Unit]): Rxn[A, B] =
this >>> Rxn.postCommit[B](pc)
/**
* Execute the [[Rxn]] with the specified input `a`.
*
* This method is `unsafe` because it performs side-effects.
*
* @param a the input to the [[Rxn]].
* @param mcas the [[internal.mcas.Mcas]] implementation to use.
* @param strategy the retry strategy to use.
* @return the result of the executed [[Rxn]].
*/
final def unsafePerform(
a: A,
mcas: Mcas,
strategy: RetryStrategy.Spin = RetryStrategy.Default,
): B = {
new InterpreterState[A, B](
rxn = this,
x = a,
mcas = mcas,
strategy = strategy,
).interpretSync()
}
final def perform[F[_], X >: B](
a: A,
mcas: Mcas,
strategy: RetryStrategy = RetryStrategy.Default,
)(implicit F: Async[F]): F[X] = {
new InterpreterState[A, X](
this,
a,
mcas = mcas,
strategy = strategy,
).interpretAsync(F)
}
/** Only for tests/benchmarks */
private[choam] final def unsafePerformInternal(
a: A,
ctx: Mcas.ThreadContext,
maxBackoff: Int = BackoffPlatform.maxPauseDefault,
randomizeBackoff: Boolean = BackoffPlatform.randomizePauseDefault,
): B = {
// TODO: this allocation can hurt us in benchmarks!
val str = RetryStrategy
.Default
.withMaxSpin(maxBackoff)
.withRandomizeSpin(randomizeBackoff)
new InterpreterState[A, B](
this,
a,
ctx.impl,
strategy = str,
).interpretSyncWithContext(ctx)
}
override def toString: String
}
object Rxn extends RxnInstances0 {
private[this] final val interruptCheckPeriod =
16384
/** This is just exporting `DefaultMcas`, because that's in an internal package */
final def DefaultMcas: Mcas =
Mcas.DefaultMcas
// API:
final def pure[A](a: A): Axn[A] =
new Pure[A](a)
/** Old name of `pure` */
private[choam] final def ret[A](a: A): Axn[A] =
pure(a)
final def identity[A]: Rxn[A, A] =
lift(a => a)
final def lift[A, B](f: A => B): Rxn[A, B] =
new Lift(f)
final def unit[A]: Rxn[A, Unit] =
pure(())
final def computed[A, B](f: A => Axn[B]): Rxn[A, B] =
new Computed(f)
final def postCommit[A](pc: Rxn[A, Unit]): Rxn[A, A] =
new PostCommit[A](pc)
final def tailRecM[X, A, B](a: A)(f: A => Rxn[X, Either[A, B]]): Rxn[X, B] =
new TailRecM[X, A, B](a, f)
private[choam] final def tailRecMWithFlatMap[X, A, B](a: A)(f: A => Rxn[X, Either[A, B]]): Rxn[X, B] = {
f(a).flatMap {
case Left(a) => tailRecMWithFlatMap(a)(f)
case Right(b) => Rxn.pure(b)
}
}
// Utilities:
private[this] val _osRng: random.OsRng = {
// Under certain circumstances (e.g.,
// Linux right after boot in a
// fresh VM), this call might block.
// We really can't do anything about
// it, but at least it's not during
// executing a `Rxn` (it happens when
// the very first `Rxn` is *created*,
// and the `Rxn` class is loaded).
random.OsRng.mkNew()
}
private[core] final def osRng: random.OsRng =
_osRng
private[this] val _fastRandom: Random[Axn] =
random.newFastRandom
private[this] val _secureRandom: SecureRandom[Axn] =
random.newSecureRandom(_osRng)
final def unique: Axn[Unique.Token] =
unsafe.delay { _ => new Unique.Token() }
final def fastRandom: Random[Axn] =
_fastRandom
final def secureRandom: SecureRandom[Axn] =
_secureRandom
final def deterministicRandom(initialSeed: Long): Axn[random.SplittableRandom[Axn]] =
random.deterministicRandom(initialSeed)
private[choam] final object ref {
private[choam] final def get[A](r: Ref[A]): Axn[A] =
new Read(r.loc)
private[choam] final def upd[A, B, C](r: Ref[A])(f: (A, B) => (A, C)): Rxn[B, C] =
new Upd(r.loc, f)
private[choam] final def updWith[A, B, C](r: Ref[A])(f: (A, B) => Axn[(A, C)]): Rxn[B, C] =
new UpdWith[A, B, C](r.loc, f)
}
final object unsafe {
sealed abstract class Ticket[A] {
def unsafePeek: A
def unsafeSet(nv: A): Axn[Unit]
def unsafeIsReadOnly: Boolean
final def unsafeValidate: Axn[Unit] =
this.unsafeSet(this.unsafePeek)
}
private[Rxn] final class TicketImpl[A](hwd: LogEntry[A])
extends Ticket[A] {
final def unsafePeek: A =
hwd.nv
final def unsafeSet(nv: A): Axn[Unit] =
new TicketWrite(hwd, nv)
final def unsafeIsReadOnly: Boolean =
hwd.readOnly
}
private[choam] final def directRead[A](r: Ref[A]): Axn[A] =
new DirectRead[A](r.loc)
def ticketRead[A](r: Ref[A]): Axn[unsafe.Ticket[A]] =
new TicketRead[A](r.loc)
private[choam] final def cas[A](r: Ref[A], ov: A, nv: A): Axn[Unit] =
new Cas[A](r.loc, ov, nv)
def retry[A, B]: Rxn[A, B] =
new AlwaysRetry[A, B]
private[choam] def delay[A, B](uf: A => B): Rxn[A, B] =
lift(uf)
private[choam] def suspend[A, B](uf: A => Axn[B]): Rxn[A, B] =
delay(uf).flatten // TODO: optimize
// TODO: Calling `unsafePerform` (or similar) inside
// TODO: `uf` is dangerous; currently it only messes
// TODO: up exchanger statistics; in the future, who knows...
private[choam] def delayContext[A](uf: Mcas.ThreadContext => A): Axn[A] =
new Ctx[A](uf)
private[choam] def suspendContext[A](uf: Mcas.ThreadContext => Axn[A]): Axn[A] =
this.delayContext(uf).flatten // TODO: optimize
final def exchanger[A, B]: Axn[Exchanger[A, B]] =
Exchanger.apply[A, B]
private[choam] final def exchange[A, B](ex: Exchanger[A, B]): Rxn[A, B] =
ex.exchange
/**
* This is not unsafe by itself, but it is only useful
* if there are other unsafe things going on (validation
* is handled automatically otherwise). This is why it
* is part of the `unsafe` API.
*/
final def forceValidate: Axn[Unit] =
new ForceValidate
}
private[core] final object internal {
final def exchange[A, B](ex: ExchangerImpl[A, B]): Rxn[A, B] =
new Exchange[A, B](ex)
final def finishExchange[D](
hole: Ref[Exchanger.NodeResult[D]],
restOtherContK: ObjStack.Lst[Any],
lenSelfContT: Int,
): Rxn[D, Unit] = new FinishExchange(hole, restOtherContK, lenSelfContT)
}
// Representation:
/** Only the interpreter can use this! */
private final class Commit[A]() extends Rxn[A, A] {
private[core] final override def tag = 0
final override def toString: String = "Commit()"
}
private final class AlwaysRetry[A, B]() extends Rxn[A, B] {
private[core] final override def tag = 1
final override def toString: String = "AlwaysRetry()"
}
private final class PostCommit[A](val pc: Rxn[A, Unit]) extends Rxn[A, A] {
private[core] final override def tag = 2
final override def toString: String = s"PostCommit(${pc})"
}
private final class Lift[A, B](val func: A => B) extends Rxn[A, B] {
private[core] final override def tag = 3
final override def toString: String = "Lift(<function>)"
}
private final class Computed[A, B](val f: A => Axn[B]) extends Rxn[A, B] {
private[core] final def tag = 4
final override def toString: String = "Computed(<function>)"
}
// tag = 5 (unused)
private final class Choice[A, B](val left: Rxn[A, B], val right: Rxn[A, B]) extends Rxn[A, B] {
private[core] final override def tag = 6
final override def toString: String = s"Choice(${left}, ${right})"
}
private final class Cas[A](val ref: MemoryLocation[A], val ov: A, val nv: A) extends Rxn[Any, Unit] {
private[core] final override def tag = 7
final override def toString: String = s"Cas(${ref}, ${ov}, ${nv})"
}
private final class Upd[A, B, X](val ref: MemoryLocation[X], val f: (X, A) => (X, B)) extends Rxn[A, B] {
private[core] final override def tag = 8
final override def toString: String = s"Upd(${ref}, <function>)"
}
private final class DirectRead[A](val ref: MemoryLocation[A]) extends Rxn[Any, A] {
private[core] final override def tag = 9
final override def toString: String = s"DirectRead(${ref})"
}
private final class Exchange[A, B](val exchanger: ExchangerImpl[A, B]) extends Rxn[A, B] {
private[core] final override def tag = 10
final override def toString: String = s"Exchange(${exchanger})"
}
private final class AndThen[A, B, C](val left: Rxn[A, B], val right: Rxn[B, C]) extends Rxn[A, C] {
private[core] final override def tag = 11
final override def toString: String = s"AndThen(${left}, ${right})"
}
private final class AndAlso[A, B, C, D](val left: Rxn[A, B], val right: Rxn[C, D]) extends Rxn[(A, C), (B, D)] {
private[core] final override def tag = 12
final override def toString: String = s"AndAlso(${left}, ${right})"
}
/** Only the interpreter can use this! */
private final class Done[A](val result: A) extends Rxn[Any, A] {
private[core] final override def tag = 13
final override def toString: String = s"Done(${result})"
}
private final class Ctx[A](val uf: Mcas.ThreadContext => A) extends Rxn[Any, A] {
private[core] final override def tag = 14
final override def toString: String = s"Ctx(<block>)"
}
private final class Provide[A, B](val rxn: Rxn[A, B], val a: A) extends Rxn[Any, B] {
private[core] final override def tag = 15
final override def toString: String = s"Provide(${rxn}, ${a})"
}
private final class UpdWith[A, B, C](val ref: MemoryLocation[A], val f: (A, B) => Axn[(A, C)]) extends Rxn[B, C] {
private[core] final override def tag = 16
final override def toString: String = s"UpdWith(${ref}, <function>)"
}
private final class As[A, B, C](val rxn: Rxn[A, B], val c: C) extends Rxn[A, C] {
private[core] final override def tag = 17
final override def toString: String = s"As(${rxn}, ${c})"
}
/** Only the interpreter/exchanger can use this! */
private final class FinishExchange[D](
val hole: Ref[Exchanger.NodeResult[D]],
val restOtherContK: ObjStack.Lst[Any],
val lenSelfContT: Int,
) extends Rxn[D, Unit] {
private[core] final override def tag = 18
final override def toString: String = {
val rockLen = ObjStack.Lst.length(this.restOtherContK)
s"FinishExchange(${hole}, <ObjStack.Lst of length ${rockLen}>, ${lenSelfContT})"
}
}
private final class Read[A](val ref: MemoryLocation[A]) extends Rxn[Any, A] {
private[core] final override def tag = 19
final override def toString: String = s"Read(${ref})"
}
private final class TicketRead[A](val ref: MemoryLocation[A]) extends Rxn[Any, unsafe.Ticket[A]] {
private[core] final override def tag = 20
final override def toString: String = s"TicketRead(${ref})"
}
private final class TicketWrite[A](val hwd: LogEntry[A], val newest: A) extends Rxn[Any, Unit] {
private[core] final override def tag = 21
final override def toString: String = s"TicketWrite(${hwd}, ${newest})"
}
private final class ForceValidate() extends Rxn[Any, Unit] {
private[core] final override def tag = 22
final override def toString: String = s"ForceValidate()"
}
private final class Pure[A](val a: A) extends Rxn[Any, A] {
private[core] final override def tag = 23
final override def toString: String = s"Pure(${a})"
}
private final class ProductR[A, B, C](val left: Rxn[A, B], val right: Rxn[A, C]) extends Rxn[A, C] {
private[core] final override def tag = 24
final override def toString: String = s"ProductR(${left}, ${right})"
}
private final class FlatMapF[A, B, C](val rxn: Rxn[A, B], val f: B => Axn[C]) extends Rxn[A, C] {
private[core] final override def tag = 25
final override def toString: String = s"FlatMapF(${rxn}, <function>)"
}
private final class FlatMap[A, B, C](val rxn: Rxn[A, B], val f: B => Rxn[A, C]) extends Rxn[A, C] {
private[core] final override def tag = 26
final override def toString: String = s"FlatMap(${rxn}, <function>)"
}
/** Only the interpreter can use this! */
private final class Suspend(val token: Long) extends Rxn[Any, Nothing] {
private[core] final override def tag = 27
final override def toString: String = s"Suspend(${token.toHexString})"
}
private final class TailRecM[X, A, B](val a: A, val f: A => Rxn[X, Either[A, B]]) extends Rxn[X, B] {
private[core] final override def tag = 28
final override def toString: String = s"TailRecM(${a}, <function>)"
}
// Interpreter:
private[this] def newStack[A]() = {
new ObjStack[A]
}
private[this] final val ContAndThen = 0.toByte
private[this] final val ContAndAlso = 1.toByte
private[this] final val ContAndAlsoJoin = 2.toByte
private[this] final val ContTailRecM = 3.toByte
private[this] final val ContPostCommit = 4.toByte
private[this] final val ContAfterPostCommit = 5.toByte // TODO: rename(?)
private[this] final val ContCommitPostCommit = 6.toByte
private[this] final val ContUpdWith = 7.toByte
private[this] final val ContAs = 8.toByte
private[this] final val ContProductR = 9.toByte
private[this] final val ContFlatMapF = 10.toByte
private[this] final val ContFlatMap = 11.toByte
private[this] final class PostCommitResultMarker // TODO: make this a java enum?
private[this] final val postCommitResultMarker =
new PostCommitResultMarker
private[core] final val commitSingleton: Rxn[Any, Any] = // TODO: make this a java enum?
new Commit[Any]
final class MaxRetriesReached(val maxRetries: Int)
extends Exception(s"reached maxRetries of ${maxRetries}")
private final class InterpreterState[X, R](
rxn: Rxn[X, R],
x: X,
mcas: Mcas,
strategy: RetryStrategy,
) extends Hamt.ComputeVisitor[MemoryLocation[Any], LogEntry[Any], Rxn[Any, Any]] {
private[this] val maxRetries: Int =
strategy.maxRetriesInt
private[this] val canSuspend: Boolean = {
val cs = strategy.canSuspend
assert((!cs) == strategy.isInstanceOf[RetryStrategy.Spin]) // just to be sure
cs
}
private[this] var ctx: Mcas.ThreadContext =
null
private[this] final def invalidateCtx(): Unit = {
this.ctx = null
this._stats = null
this._exParams = null
}
private[this] var startRxn: Rxn[Any, Any] = rxn.asInstanceOf[Rxn[Any, Any]]
private[this] var startA: Any = x
private[this] var _desc: Descriptor =
null
private[this] final def desc: Descriptor = {
if (_desc ne null) {
_desc
} else {
_desc = ctx.start()
_desc
}
}
@inline
private[this] final def desc_=(d: Descriptor): Unit = {
require(d ne null) // we want to be explicit, see `clearDesc`
_desc = d
}
@inline
private[this] final def clearDesc(): Unit = {
_desc = null
}
private[this] val alts: ObjStack[Any] = newStack[Any]()
private[this] val contT: ByteStack = new ByteStack(initSize = 8)
private[this] val contK: ObjStack[Any] = newStack[Any]()
private[this] val pc: ObjStack[Rxn[Any, Unit]] = newStack[Rxn[Any, Unit]]()
private[this] val commit = commitSingleton
contT.push(ContAfterPostCommit)
contT.push(ContAndThen)
contK.push(commit)
private[this] var contTReset: Array[Byte] = contT.takeSnapshot()
private[this] var contKReset: ObjStack.Lst[Any] = contK.takeSnapshot()
private[this] var a: Any =
x
private[this] var retries: Int =
0
/** How many times was `desc` revalidated and successfully extended? */
private[this] var descExtensions: Int =
0
/** Strats from `true`, and after 1 MCAS failure, goes to `false` */
private[this] var optimisticMcas: Boolean =
true
/**
* Used by `Read`/`TicketWrite` as an "out" parameter
*
* @see `entryPresent`/`entryAbsent`
*/
private[this] var _entryHolder: LogEntry[Any] =
null
final override def entryAbsent(ref: MemoryLocation[Any], curr: Rxn[Any, Any]): LogEntry[Any] = {
val res: LogEntry[Any] = curr match {
case _: Read[_] =>
revalidateIfNeeded(this.ctx.readIntoHwd(ref))
case c: Upd[_, _, _] =>
val hwd = revalidateIfNeeded(this.ctx.readIntoHwd(c.ref))
if (hwd ne null) {
val ox = hwd.nv
val (nx, b) = c.f(ox, this.a)
this.a = b
hwd.withNv(nx).cast[Any]
} else {
null
}
case c: TicketWrite[_] =>
val hwd = revalidateIfNeeded(c.hwd.cast[Any])
if (hwd ne null) {
hwd.withNv(c.newest)
} else {
null
}
case _ =>
throw new AssertionError
}
this._entryHolder = res // can be null
res
}
final override def entryPresent(ref: MemoryLocation[Any], hwd: LogEntry[Any], curr: Rxn[Any, Any]): LogEntry[Any] = {
assert(hwd ne null)
val res: LogEntry[Any] = curr match {
case _: Read[_] =>
hwd
case c: Upd[_, _, _] =>
val ox = hwd.nv
val (nx, b) = c.asInstanceOf[Upd[Any, Any, Any]].f(ox, this.a)
this.a = b
hwd.withNv(nx)
case c: TicketWrite[_] =>
// NB: This throws if it was modified in the meantime.
// NB: This doesn't need extra validation, as
// NB: `tryMergeTicket` checks that they have the
// NB: same version.
hwd.tryMergeTicket(c.hwd.cast[Any], c.newest)
case _ =>
throw new AssertionError
}
this._entryHolder = res
res
}
private[this] var _stats: ExStatMap =
null
private[this] final def stats: ExStatMap = {
val s = this._stats
if (s eq null) {
val s2 = this.ctx.getStatisticsP().asInstanceOf[ExStatMap]
this._stats = s2
s2
} else {
s
}
}
private[this] final def saveStats(): Unit = {
this._stats match {
case null =>
()
case s =>
this.ctx.setStatisticsP(s.asInstanceOf[Map[AnyRef, AnyRef]])
}
}
private[this] var _exParams: Exchanger.Params =
null
private[this] final def exParams: Exchanger.Params = {
val ep = this._exParams
if (ep eq null) {
// TODO: this is a hack
val ep2 = (stats.getOrElse(Exchanger.paramsKey, null): Any) match {
case null =>
val p = Exchanger.params // volatile read
_stats = (_stats.asInstanceOf[Map[AnyRef, AnyRef]] + (Exchanger.paramsKey -> p)).asInstanceOf[ExStatMap]
p
case p: Exchanger.Params =>
p
case something =>
impossible(s"found ${something.getClass.getName} instead of Exchanger.Params")
}
this._exParams = ep2
ep2
} else {
ep
}
}
private[this] final def setContReset(): Unit = {
contTReset = contT.takeSnapshot()
contKReset = contK.takeSnapshot()
}
private[this] final def resetConts(): Unit = {
contT.loadSnapshot(contTReset)
contK.loadSnapshot(contKReset)
}
private[this] final def clearAlts(): Unit = {
alts.clear()
}
private[this] final def saveAlt(k: Rxn[Any, R]): Unit = {
val alts = this.alts
alts.push(ctx.snapshot(_desc))
alts.push(a)
alts.push(contT.takeSnapshot())
alts.push(contK.takeSnapshot())
alts.push(pc.takeSnapshot())
alts.push(k)
}
private[this] final def loadAlt(): Rxn[Any, R] = {
val alts = this.alts
val res = alts.pop().asInstanceOf[Rxn[Any, R]]
pc.loadSnapshotUnsafe(alts.pop().asInstanceOf[ObjStack.Lst[Any]])
contK.loadSnapshot(alts.pop().asInstanceOf[ObjStack.Lst[Any]])
contT.loadSnapshot(alts.pop().asInstanceOf[Array[Byte]])
a = alts.pop()
_desc = alts.pop().asInstanceOf[Descriptor]
res
}
private[this] final def loadAltFrom(msg: Exchanger.Msg): Rxn[Any, R] = {
pc.loadSnapshot(msg.postCommit)
contK.loadSnapshot(msg.contK)
contT.loadSnapshot(msg.contT)
a = msg.value
desc = msg.desc
next().asInstanceOf[Rxn[Any, R]]
}
private[this] final def popFinalResult(): Any = {
val r = contK.pop()
assert(!equ(r, postCommitResultMarker))
r
}
@tailrec
private[this] final def next(): Rxn[Any, Any] = {
val contK = this.contK
(contT.pop() : @switch) match {
case 0 => // ContAndThen
contK.pop().asInstanceOf[Rxn[Any, Any]]
case 1 => // ContAndAlso
val savedA = a
a = contK.pop()
val res = contK.pop().asInstanceOf[Rxn[Any, Any]]
contK.push(savedA)
res
case 2 => // ContAndAlsoJoin
val savedA = contK.pop()
a = (savedA, a)
next()
case 3 => // ContTailRecM
val e = a.asInstanceOf[Either[Any, Any]]
a = contK.peek()
val f = contK.peekSecond().asInstanceOf[Any => Rxn[Any, Any]]
e match {
case Left(more) =>
contT.push(ContTailRecM)
f(more)
case Right(done) =>
a = done
contK.pop() // a
contK.pop() // f
next()
}
case 4 => // ContPostCommit
val pcAction = contK.pop().asInstanceOf[Rxn[Any, Any]]
clearAlts()
setContReset()
a = () : Any
startA = () : Any
startRxn = pcAction
this.retries = 0
clearDesc()
pcAction
case 5 => // ContAfterPostCommit
val res = popFinalResult()
assert(contK.isEmpty)
assert(contT.isEmpty, s"contT is not empty: ${contT.toString}") // TODO: remove logging
new Done(res)
case 6 => // ContCommitPostCommit
a = postCommitResultMarker : Any
commit.asInstanceOf[Rxn[Any, Any]]
case 7 => // ContUpdWith
val ox = contK.pop()
val ref = contK.pop().asInstanceOf[MemoryLocation[Any]]
val (nx, res) = a.asInstanceOf[Tuple2[_, _]]
val hwd = desc.getOrElseNull(ref)
assert(hwd ne null)
if (equ(hwd.nv, ox)) {
desc = desc.overwrite(hwd.withNv(nx))
a = res
} else {
// TODO: "during" the updWith, we wrote to
// TODO: the same ref; what to do?
throw new UnsupportedOperationException("wrote during updWith")
}
next()
case 8 => // ContAs
a = contK.pop()
next()
case 9 => // ContProductR
a = contK.pop()
contK.pop().asInstanceOf[Rxn[Any, Any]]
case 10 => // ContFlatMapF
val n = contK.pop().asInstanceOf[Function1[Any, Rxn[Any, Any]]].apply(a)
a = () : Any
n
case 11 => // ContFlatMap
val n = contK.pop().asInstanceOf[Function1[Any, Rxn[Any, Any]]].apply(a)
a = contK.pop()
n
case ct => // mustn't happen
throw new UnsupportedOperationException(
s"Unknown contT: ${ct}"
)
}
}
private[this] final def retry(): Rxn[Any, Any] =
this.retry(this.canSuspend)
private[this] final def retry(canSuspend: Boolean): Rxn[Any, Any] = {
if (alts.nonEmpty) {
// we're not actually retrying,
// just going to the other side
// of a `+` (so we're not
// incrementing `retries`):
loadAlt()
} else {
// really retrying:
val retriesNow = this.retries + 1
this.retries = retriesNow
// check abnormal conditions:
val mr = this.maxRetries
if ((mr >= 0) && ((retriesNow > mr) || (retriesNow == Integer.MAX_VALUE))) {
// TODO: maybe we could represent "infinity" with MAX_VALUE instead of -1?
throw new MaxRetriesReached(mr)
} else {
maybeCheckInterrupt(retriesNow)
}
// restart everything:
clearDesc()
a = startA
resetConts()
pc.clear()
backoffAndNext(retriesNow, canSuspend)
}
}
private[this] final def backoffAndNext(retries: Int, canSuspend: Boolean): Rxn[Any, Any] = {
val token = Backoff2.backoffStrTok(
retries = retries,
strategy = this.strategy,
canSuspend = canSuspend,