Skip to content
Permalink
Browse files

#1: Implement Print operator

  • Loading branch information
broadwaylamb committed Jun 17, 2019
1 parent 8557924 commit 60dd71b0401d195d5dd6fa8a724c167b19ce5c2f
@@ -21,7 +21,7 @@ before_install:
install:
- eval "$(curl -sL https://swiftenv.fuller.li/install.sh)"
script:
- swift test -c debug --enable-code-coverage
- swift test -c debug --enable-code-coverage --sanitize thread
- swift test -c release
- if [[ $OPENCOMBINE_COMPATIBILITY_TEST == "YES" ]]; then
swift test -c release -Xswiftc -DOPENCOMBINE_COMPATIBILITY_TEST;
@@ -277,15 +277,6 @@ extension Publisher {
public func prefix(_ maxLength: Int) -> Publishers.Output<Self>
}
extension Publisher {
/// Prints log messages for all publishing events.
///
/// - Parameter prefix: A string with which to prefix all log messages. Defaults to an empty string.
/// - Returns: A publisher that prints log messages for all publishing events.
public func print(_ prefix: String = "", to stream: TextOutputStream? = nil) -> Publishers.Print<Self>
}
extension Publisher {
/// Republishes elements while a predicate closure indicates publishing should continue.
@@ -0,0 +1,162 @@
//
// Publishers.Print.swift
//
//
// Created by Sergej Jaskiewicz on 16.06.2019.
//
extension Publishers {

/// A publisher that prints log messages for all publishing events, optionally prefixed with a given string.
///
/// This publisher prints log messages when receiving the following events:
/// * subscription
/// * value
/// * normal completion
/// * failure
/// * cancellation
public struct Print<Upstream: Publisher>: Publisher {

public typealias Output = Upstream.Output

public typealias Failure = Upstream.Failure

/// A string with which to prefix all log messages.
public let prefix: String

/// The publisher from which this publisher receives elements.
public let upstream: Upstream

public let stream: TextOutputStream?

/// Creates a publisher that prints log messages for all publishing events.
///
/// - Parameters:
/// - upstream: The publisher from which this publisher receives elements.
/// - prefix: A string with which to prefix all log messages.
public init(upstream: Upstream,
prefix: String,
to stream: TextOutputStream? = nil) {
self.upstream = upstream
self.prefix = prefix
self.stream = stream
}

public func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure, Output == S.Input
{
let inner = Inner(downstream: subscriber, prefix: prefix, stream: stream)
upstream.receive(subscriber: inner)
}
}
}

extension Publisher {

/// Prints log messages for all publishing events.
///
/// - Parameter prefix: A string with which to prefix all log messages. Defaults to an empty string.
/// - Returns: A publisher that prints log messages for all publishing events.
public func print(_ prefix: String = "",
to stream: TextOutputStream? = nil) -> Publishers.Print<Self> {
Publishers.Print(upstream: self, prefix: prefix, to: stream)
}
}

private final class Inner<Downstream: Subscriber>: Subscriber,
Subscription,
CustomStringConvertible,
CustomReflectable
{
typealias Input = Downstream.Input
typealias Failure = Downstream.Failure

private var _downstream: Downstream
private let _prefix: String
private var _stream: TextOutputStream
private var _upstreamSubscription: Subscription?
private let _printerLock = Lock(recursive: false)

init(downstream: Downstream, prefix: String, stream: TextOutputStream?) {
_downstream = downstream
_prefix = prefix
_stream = stream ?? StdoutStream()
}

func receive(subscription: Subscription) {
_log("receive subscription", value: subscription)
_upstreamSubscription = subscription
_downstream.receive(subscription: self)
}

func receive(_ input: Input) -> Subscribers.Demand {
_log("receive value", value: input)
let demand = _downstream.receive(input)
_logDemand(demand, synchronous: true)
return demand
}

func receive(completion: Subscribers.Completion<Failure>) {
switch completion {
case .finished:
_log("receive finished")
case .failure(let error):
_log("receive error", value: error)
}
_downstream.receive(completion: completion)
}

func request(_ demand: Subscribers.Demand) {
_logDemand(demand, synchronous: false)
_upstreamSubscription?.request(demand)
}

func cancel() {
_log("receive cancel")
_upstreamSubscription?.cancel()
_upstreamSubscription = nil
}

var description: String { "Print" }

var customMirror: Mirror { Mirror(self, children: EmptyCollection()) }

private func _log(_ description: String,
value: Any? = nil,
additionalInfo: String = "") {
_printerLock.do {
if !_prefix.isEmpty {
_stream.write(_prefix)
_stream.write(": ")
}
_stream.write(description)
if let value = value {
_stream.write(": (")
_stream.write(String(describing: value))
_stream.write(")")
}
if !additionalInfo.isEmpty {
_stream.write(" (")
_stream.write(additionalInfo)
_stream.write(")")
}
_stream.write("\n")
}
}

private func _logDemand(_ demand: Subscribers.Demand, synchronous: Bool) {
let synchronouslyStr = synchronous ? "synchronous" : ""
switch demand {
case .max(let max):
_log("request max", value: max, additionalInfo: synchronouslyStr)
case .unlimited:
_log("request unlimited", additionalInfo: synchronouslyStr)
}
}
}

private struct StdoutStream: TextOutputStream {
mutating func write(_ string: String) {
print(string, terminator: "")
}
}
@@ -27,7 +27,6 @@ final class CustomPublisher: Publisher {
func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure, Output == S.Input
{
assert(self.subscriber == nil)
self.subscriber = AnySubscriber(subscriber)
subscription.map(subscriber.receive(subscription:))
}
@@ -21,6 +21,15 @@ final class CustomSubscription: Subscription {

private(set) var history: [Event] = []

private let _requested: ((Subscribers.Demand) -> Void)?
private let _canceled: (() -> Void)?

init(onRequest: ((Subscribers.Demand) -> Void)? = nil,
onCancel: (() -> Void)? = nil) {
_requested = onRequest
_canceled = onCancel
}

var lastRequested: Subscribers.Demand? {
history.lazy.compactMap {
switch $0 {
@@ -36,10 +45,12 @@ final class CustomSubscription: Subscription {

func request(_ demand: Subscribers.Demand) {
history.append(.requested(demand))
_requested?(demand)
}

func cancel() {
history.append(.canceled)
canceled = true
_canceled?()
}
}
@@ -8,22 +8,11 @@
import Dispatch

func race(times: Int = 100, _ bodies: () -> Void...) {

let queues = bodies.indices.lazy.map {
DispatchQueue(label: "exectuteConcurrently helper queue #\($0)")
}

let group = DispatchGroup()

for (body, queue) in zip(bodies, queues) {
queue.async(group: group) {
for _ in 0..<times {
body()
}
DispatchQueue.concurrentPerform(iterations: bodies.count) {
for _ in 0..<times {
bodies[$0]()
}
}

group.wait()
}

@dynamicMemberLookup

0 comments on commit 60dd71b

Please sign in to comment.
You can’t perform that action at this time.