Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] subject should not call receiveValue concurrently #39

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Sources/CombineX/Subjects/CurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ extension CurrentValueSubject {
typealias Sub = AnySubscriber<Output, Failure>

let lock = Lock()
let downstreamLock = Lock(recursive: true)

var pub: Pub?
var sub: Sub?
Expand All @@ -175,8 +176,9 @@ extension CurrentValueSubject {
let sub = self.sub!
self.lock.unlock()

// FIXME: Yes, no guarantee of synchronous backpressure. See CurrentValueSubjectSpec#4.3 for more information.
self.downstreamLock.lock()
let more = sub.receive(value)
self.downstreamLock.unlock()

self.lock.withLock {
_ = self.state.add(more)
Expand All @@ -195,7 +197,9 @@ extension CurrentValueSubject {
self.sub = nil
self.lock.unlock()

self.downstreamLock.lock()
sub.receive(completion: completion)
self.downstreamLock.unlock()
}

func request(_ demand: Subscribers.Demand) {
Expand All @@ -213,7 +217,9 @@ extension CurrentValueSubject {
let current = self.pub!.value
self.lock.unlock()

self.downstreamLock.lock()
let more = sub.receive(current)
self.downstreamLock.unlock()

self.lock.withLock {
_ = self.state.add(more)
Expand Down
6 changes: 5 additions & 1 deletion Sources/CombineX/Subjects/PassthroughSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ extension PassthroughSubject {
typealias Sub = AnySubscriber<Output, Failure>

let lock = Lock()
let downstreamLock = Lock(recursive: true)

var pub: Pub?
var sub: Sub?
Expand All @@ -141,8 +142,9 @@ extension PassthroughSubject {
let sub = self.sub!
self.lock.unlock()

// FIXME: Yes, no guarantee of synchronous backpressure. See PassthroughSubjectSpec#3.3 for more information.
self.downstreamLock.lock()
let more = sub.receive(value)
self.downstreamLock.unlock()

self.lock.withLock {
_ = self.state.add(more)
Expand All @@ -161,7 +163,9 @@ extension PassthroughSubject {
self.sub = nil
self.lock.unlock()

self.downstreamLock.lock()
sub.receive(completion: completion)
self.downstreamLock.unlock()
}

func request(_ demand: Subscribers.Demand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class FailingSubjectSpec: QuickSpec {

describe("Subject should not invoke receiveValue on multiple threads at the same time") {

it("PassthroughSubject") {
xit("PassthroughSubject") {
let sequenceLength = 100
let subject = PassthroughSubject<Int, Never>()
let semaphore = DispatchSemaphore(value: 0)
Expand Down Expand Up @@ -52,7 +52,7 @@ class FailingSubjectSpec: QuickSpec {
expect(collision).toFail(beFalse())
}

it("CurrentValueSubject") {
xit("CurrentValueSubject") {
let sequenceLength = 100
let subject = CurrentValueSubject<Int, Never>(0)
let semaphore = DispatchSemaphore(value: 0)
Expand Down
50 changes: 45 additions & 5 deletions Tests/CombineXTests/Subjects/CurrentValueSubjectSpec.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Foundation
import CXUtility
import CXShim
import Quick
import Nimble
Expand Down Expand Up @@ -371,8 +372,8 @@ class CurrentValueSubjectSpec: QuickSpec {
let sub = TestSubscriber<Int, Never>(receiveSubscription: { (s) in
s.request(.max(10))
}, receiveValue: { v in
if v == 1 {
Thread.sleep(forTimeInterval: 0.1)
if v == 9 {
Thread.sleep(forTimeInterval: 0.5)
return .max(5)
}
return .none
Expand All @@ -382,16 +383,55 @@ class CurrentValueSubjectSpec: QuickSpec {
subject.subscribe(sub)

let g = DispatchGroup()
let q = DispatchQueue(label: UUID().uuidString, attributes: .concurrent)

100.times { i in
DispatchQueue.global().async(group: g) {
15.times { i in
q.async(group: g) {
subject.send(i)
}
}

g.wait()

expect(sub.events.count).to(equal(10))
expect(sub.events.count).toNot(equal(15))
}

// MARK: 4.4 should not invoke receiveValue on multiple threads at the same time
it("should not invoke receiveValue on multiple threads at the same time") {
let sequenceLength = 100
let subject = CurrentValueSubject<Int, Never>(0)
let semaphore = DispatchSemaphore(value: 0)

let total = Atom<Int>(val: 0)
var collision = false
let c = subject
.sink(receiveValue: { value in
if total.isMutating {
// Check to see if this closure is concurrently invoked
collision = true
}
total.withLockMutating { total in
// Make sure we're in the handler for enough time to get a concurrent invocation
Thread.sleep(forTimeInterval: 0.001)
total += value
if total == sequenceLength {
semaphore.signal()
}
}
})

// Try to send from a hundred different threads at once
for _ in 1...sequenceLength {
DispatchQueue.global().async {
subject.send(1)
}
}

semaphore.wait()
c.cancel()
expect(total.get()).to(equal(sequenceLength))

expect(collision).to(beFalse())
}
}

Expand Down
50 changes: 45 additions & 5 deletions Tests/CombineXTests/Subjects/PassthroughSubjectSpec.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Foundation
import CXUtility
import CXShim
import Quick
import Nimble
Expand Down Expand Up @@ -441,8 +442,8 @@ class PassthroughSubjectSpec: QuickSpec {
let sub = TestSubscriber<Int, Never>(receiveSubscription: { (s) in
s.request(.max(10))
}, receiveValue: { v in
if v == 1 {
Thread.sleep(forTimeInterval: 0.1)
if v == 9 {
Thread.sleep(forTimeInterval: 0.5)
return .max(5)
}
return .none
Expand All @@ -452,16 +453,55 @@ class PassthroughSubjectSpec: QuickSpec {
subject.subscribe(sub)

let g = DispatchGroup()
let q = DispatchQueue(label: UUID().uuidString, attributes: .concurrent)

100.times { i in
DispatchQueue.global().async(group: g) {
15.times { i in
q.async(group: g) {
subject.send(i)
}
}

g.wait()

expect(sub.events.count).to(equal(10))
expect(sub.events.count).toNot(equal(15))
}

// MARK: 4.4 should not invoke `receiveValue` on multiple threads at the same time
it("should not invoke `receiveValue` on multiple threads at the same time") {
let sequenceLength = 100
let subject = PassthroughSubject<Int, Never>()
let semaphore = DispatchSemaphore(value: 0)

let total = Atom<Int>(val: 0)
var collision = false
let c = subject
.sink(receiveValue: { value in
if total.isMutating {
// Check to see if this closure is concurrently invoked
collision = true
}
total.withLockMutating { total in
// Make sure we're in the handler for enough time to get a concurrent invocation
Thread.sleep(forTimeInterval: 0.001)
total += value
if total == sequenceLength {
semaphore.signal()
}
}
})

// Try to send from a hundred different threads at once
for _ in 1...sequenceLength {
DispatchQueue.global().async {
subject.send(1)
}
}

semaphore.wait()
c.cancel()
expect(total.get()).to(equal(sequenceLength))

expect(collision).to(beFalse())
}
}
}
Expand Down