diff --git a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip2Observable.scala b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip2Observable.scala index 65535de55..ba639f204 100644 --- a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip2Observable.scala +++ b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip2Observable.scala @@ -154,7 +154,7 @@ private[reactive] final class Zip2Observable[A1, A2, +R](obsA1: Observable[A1], def onError(ex: Throwable): Unit = signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA1) + lock.synchronized(signalOnComplete(hasElemA1)) }) composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] { @@ -176,7 +176,7 @@ private[reactive] final class Zip2Observable[A1, A2, +R](obsA1: Observable[A1], def onError(ex: Throwable): Unit = signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA2) + lock.synchronized(signalOnComplete(hasElemA2)) }) composite diff --git a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip3Observable.scala b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip3Observable.scala index 0efac6886..40f6fcaa4 100644 --- a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip3Observable.scala +++ b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip3Observable.scala @@ -163,7 +163,7 @@ private[reactive] final class Zip3Observable[A1, A2, A3, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA1) + lock.synchronized(signalOnComplete(hasElemA1)) }) composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] { @@ -186,7 +186,7 @@ private[reactive] final class Zip3Observable[A1, A2, A3, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA2) + lock.synchronized(signalOnComplete(hasElemA2)) }) composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] { @@ -209,7 +209,7 @@ private[reactive] final class Zip3Observable[A1, A2, A3, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA3) + lock.synchronized(signalOnComplete(hasElemA3)) }) composite diff --git a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip4Observable.scala b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip4Observable.scala index 379729377..e27144518 100644 --- a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip4Observable.scala +++ b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip4Observable.scala @@ -169,7 +169,7 @@ private[reactive] final class Zip4Observable[A1, A2, A3, A4, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA1) + lock.synchronized(signalOnComplete(hasElemA1)) }) composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] { @@ -192,7 +192,7 @@ private[reactive] final class Zip4Observable[A1, A2, A3, A4, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA2) + lock.synchronized(signalOnComplete(hasElemA2)) }) composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] { @@ -215,7 +215,7 @@ private[reactive] final class Zip4Observable[A1, A2, A3, A4, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA3) + lock.synchronized(signalOnComplete(hasElemA3)) }) composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] { @@ -238,7 +238,7 @@ private[reactive] final class Zip4Observable[A1, A2, A3, A4, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA4) + lock.synchronized(signalOnComplete(hasElemA4)) }) composite diff --git a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip5Observable.scala b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip5Observable.scala index 49856a371..8beb1a69a 100644 --- a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip5Observable.scala +++ b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip5Observable.scala @@ -175,7 +175,7 @@ private[reactive] final class Zip5Observable[A1, A2, A3, A4, A5, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA1) + lock.synchronized(signalOnComplete(hasElemA1)) }) composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] { @@ -198,7 +198,7 @@ private[reactive] final class Zip5Observable[A1, A2, A3, A4, A5, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA2) + lock.synchronized(signalOnComplete(hasElemA2)) }) composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] { @@ -221,7 +221,7 @@ private[reactive] final class Zip5Observable[A1, A2, A3, A4, A5, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA3) + lock.synchronized(signalOnComplete(hasElemA3)) }) composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] { @@ -244,7 +244,7 @@ private[reactive] final class Zip5Observable[A1, A2, A3, A4, A5, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA4) + lock.synchronized(signalOnComplete(hasElemA4)) }) composite += obsA5.unsafeSubscribeFn(new Subscriber[A5] { @@ -267,7 +267,7 @@ private[reactive] final class Zip5Observable[A1, A2, A3, A4, A5, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA5) + lock.synchronized(signalOnComplete(hasElemA5)) }) composite diff --git a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip6Observable.scala b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip6Observable.scala index 7285bcbb8..9098c82f4 100644 --- a/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip6Observable.scala +++ b/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/Zip6Observable.scala @@ -181,7 +181,7 @@ private[reactive] final class Zip6Observable[A1, A2, A3, A4, A5, A6, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA1) + lock.synchronized(signalOnComplete(hasElemA1)) }) composite += obsA2.unsafeSubscribeFn(new Subscriber[A2] { @@ -204,7 +204,7 @@ private[reactive] final class Zip6Observable[A1, A2, A3, A4, A5, A6, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA2) + lock.synchronized(signalOnComplete(hasElemA2)) }) composite += obsA3.unsafeSubscribeFn(new Subscriber[A3] { @@ -227,7 +227,7 @@ private[reactive] final class Zip6Observable[A1, A2, A3, A4, A5, A6, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA3) + lock.synchronized(signalOnComplete(hasElemA3)) }) composite += obsA4.unsafeSubscribeFn(new Subscriber[A4] { @@ -250,7 +250,7 @@ private[reactive] final class Zip6Observable[A1, A2, A3, A4, A5, A6, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA4) + lock.synchronized(signalOnComplete(hasElemA4)) }) composite += obsA5.unsafeSubscribeFn(new Subscriber[A5] { @@ -273,7 +273,7 @@ private[reactive] final class Zip6Observable[A1, A2, A3, A4, A5, A6, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA5) + lock.synchronized(signalOnComplete(hasElemA5)) }) composite += obsA6.unsafeSubscribeFn(new Subscriber[A6] { @@ -296,7 +296,7 @@ private[reactive] final class Zip6Observable[A1, A2, A3, A4, A5, A6, +R]( signalOnError(ex) def onComplete(): Unit = - signalOnComplete(hasElemA6) + lock.synchronized(signalOnComplete(hasElemA6)) }) composite