Skip to content

Commit

Permalink
Emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-zethraeus committed Dec 1, 2022
0 parents commit 443c92e
Show file tree
Hide file tree
Showing 36 changed files with 2,189 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.DS_Store
/.build
/Packages
/*.xcodeproj
xcuserdata/
DerivedData/
.swiftpm/config/registries.json
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
.netrc
14 changes: 14 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"pins" : [
{
"identity" : "disposable",
"kind" : "remoteSourceControl",
"location" : "https://github.com/GoodHatsLLC/Disposable.git",
"state" : {
"revision" : "2280c49e0b09b3ecd003a7c8471a42c30d66629f",
"version" : "0.1.0"
}
}
],
"version" : 2
}
44 changes: 44 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// swift-tools-version: 5.7
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription

let package = Package(
name: "Emitter",
platforms: [.macOS(.v10_15), .iOS(.v14)],
products: [
.library(
name: "Emitter",
targets: [
"Emitter",
"EmitterInterface",
]
),
.library(
name: "EmitterInterface",
targets: ["EmitterInterface"]
),
],
dependencies: [
.package(url: "https://github.com/GoodHatsLLC/Disposable.git", from: "0.1.0"),
],
targets: [
.target(
name: "Emitter",
dependencies: [
"Disposable",
"EmitterInterface",
]
),
.target(
name: "EmitterInterface",
dependencies: [
.product(name: "DisposableInterface", package: "Disposable")
]
),
.testTarget(
name: "EmitterTests",
dependencies: ["Emitter"]
),
]
)
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Emitter

A description of this package.
26 changes: 26 additions & 0 deletions Sources/Emitter/Erasure/AnyEmitter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import Disposable
import EmitterInterface

// MARK: - AnyEmitter

public struct AnyEmitter<Output: Sendable>: Emitter {

init<E: Emitter>(_ emitter: E) where E.Output == Output {
subscribeFunc = { emitter.subscribe($0) }
}

private let subscribeFunc: (any Subscriber<Output>) -> AnyDisposable

public func subscribe<S: Subscriber>(
_ subscriber: S
)
-> AnyDisposable
where S.Value == Output
{
subscribeFunc(subscriber)
}
}

extension AnyEmitter {
public func erase() -> AnyEmitter<Output> { self }
}
8 changes: 8 additions & 0 deletions Sources/Emitter/Erasure/EmitterErasureExtension.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import EmitterInterface

extension Emitter {

public func erase() -> AnyEmitter<Output> {
AnyEmitter(self)
}
}
3 changes: 3 additions & 0 deletions Sources/Emitter/Export.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
@_exported import Disposable
@_exported import DisposableInterface
@_exported import EmitterInterface
28 changes: 28 additions & 0 deletions Sources/Emitter/Operators/AsyncValues.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import Disposable
import EmitterInterface
import Foundation

extension Emitter {
public var values: AsyncThrowingStream<Output, Error> {
.init { continuation in

let disposable = subscribe(
value: { value in
continuation.yield(value)
},
finished: {
continuation.finish()
},
failed: { error in
continuation.finish(throwing: error)
}
)

continuation.onTermination = { _ in
Task {
await disposable.dispose()
}
}
}
}
}
97 changes: 97 additions & 0 deletions Sources/Emitter/Operators/CombineLatest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import Disposable
import EmitterInterface

extension Emitter {
public func combineLatest<Other: Emitter>(_ other: Other) -> some Emitter<Tuple.Size2<Output, Other.Output>> {
CombineLatest(upstreamA: self, upstreamB: other)
}
}

// MARK: - CombineLatest

@MainActor
struct CombineLatest<OutputA: Sendable, OutputB: Sendable>: Emitter {

init<UpstreamA: Emitter, UpstreamB: Emitter>(
upstreamA: UpstreamA,
upstreamB: UpstreamB
) where UpstreamA.Output == OutputA, UpstreamB.Output == OutputB {
self.upstreamA = upstreamA
self.upstreamB = upstreamB
}

typealias Output = Tuple.Size2<OutputA, OutputB>

@MainActor
final class Ref<V> {
init(_ value: V) {
self.value = value
}

var value: V
}

@MainActor
struct Sub<Downstream: Subscriber>: Subscriber
where Downstream.Value == Output
{
enum Input {
case a(OutputA)
case b(OutputB)
}

let downstream: Downstream

func receive(emission: Emission<Input>) {
switch emission {
case .value(let value):
switch value {
case .a(let aValue):
lastA.value = aValue
case .b(let bValue):
lastB.value = bValue
}
if let a = lastA.value, let b = lastB.value {
downstream
.receive(emission: .value(Tuple.create(a, b)))
}
case .finished:
downstream
.receive(emission: .finished)
case .failed(let error):
downstream
.receive(emission: .failed(error))
}
}

private let lastA = Ref<OutputA?>(nil)
private let lastB = Ref<OutputB?>(nil)

}

func subscribe<S: Subscriber>(_ subscriber: S)
-> AnyDisposable
where S.Value == Output
{
let stage = DisposalStage()
let sub = Sub(downstream: subscriber)
let mapA = Map.Sub(downstream: sub) { value in
.a(value)
}
let mapB = Map.Sub(downstream: sub) { value in
.b(value)
}
upstreamA
.subscribe(mapA)
.stage(on: stage)
upstreamB
.subscribe(mapB)
.stage(on: stage)
return stage
.erase()
}

private let upstreamA: any Emitter<OutputA>
private let upstreamB: any Emitter<OutputB>

}
61 changes: 61 additions & 0 deletions Sources/Emitter/Operators/CompactMap.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import Disposable
import EmitterInterface

extension Emitter {
public func compactMap<NewOutput: Sendable>(transformer: @escaping @MainActor (Output) -> NewOutput?)
-> some Emitter<NewOutput>
{
CompactMap(upstream: self, transformer: transformer)
}
}

// MARK: - CompactMap

@MainActor
struct CompactMap<Input: Sendable, Output: Sendable>: Emitter {

init<Upstream: Emitter>(
upstream: Upstream,
transformer: @escaping @MainActor (Input) -> Output?
) where Upstream.Output == Input {
self.transformer = transformer
self.upstream = upstream
}

@MainActor
struct Sub<Downstream: Subscriber>: Subscriber
where Downstream.Value == Output
{
let downstream: Downstream
let transformer: @MainActor (Input)
-> Output?

func receive(emission: Emission<Input>) {
let newEmission: Emission<Output>
switch emission {
case .value(let value):
guard let value = transformer(value)
else {
return
}
newEmission = .value(value)
case .finished:
newEmission = .finished
case .failed(let error):
newEmission = .failed(error)
}
downstream.receive(emission: newEmission)
}
}

func subscribe<S: Subscriber>(_ subscriber: S)
-> AnyDisposable
where S.Value == Output
{
upstream.subscribe(Sub<S>(downstream: subscriber, transformer: transformer))
}

private let transformer: @MainActor (Input) -> Output?
private let upstream: any Emitter<Input>

}
59 changes: 59 additions & 0 deletions Sources/Emitter/Operators/Filter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import Disposable
import EmitterInterface

extension Emitter {
public func filter(_ evaluator: @escaping @MainActor (Output) -> Bool) -> some Emitter<Output> {
Filter(upstream: self, evaluator: evaluator)
}
}

// MARK: - Filter

@MainActor
struct Filter<Output: Sendable>: Emitter {

@MainActor
init<Upstream: Emitter>(
upstream: Upstream,
evaluator: @escaping @MainActor (Output) -> Bool
) where Upstream.Output == Output {
self.evaluator = evaluator
self.upstream = upstream
}

struct Sub<Downstream: Subscriber>: Subscriber
where Downstream.Value == Output
{
let downstream: Downstream
let evaluator: @MainActor (Output)
-> Bool

func receive(emission: Emission<Output>) {
let newEmission: Emission<Output>?
switch emission {
case .value(let value):
if evaluator(value) {
newEmission = .value(value)
} else {
newEmission = nil
}
case _:
newEmission = emission
}
if let newEmission {
downstream.receive(emission: newEmission)
}
}
}

func subscribe<S: Subscriber>(_ subscriber: S)
-> AnyDisposable
where S.Value == Output
{
upstream.subscribe(Sub<S>(downstream: subscriber, evaluator: evaluator))
}

private let evaluator: @MainActor (Output) -> Bool
private let upstream: any Emitter<Output>

}
Loading

0 comments on commit 443c92e

Please sign in to comment.