Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Drop zen-observable, use observable-fns only (#185)
  • Loading branch information
andywer committed Dec 15, 2019
1 parent 345d6cb commit f43fcdc
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 33 deletions.
4 changes: 2 additions & 2 deletions docs/usage-observables.md
Expand Up @@ -26,8 +26,8 @@ counter.subscribe(newCount => console.log(`Counter incremented to:`, newCount))

```js
// workers/counter.js
import { Observable } from "observable-fns"
import { expose } from "threads/worker"
import Observable from "zen-observable"

function startCounting() {
return new Observable(observer => {
Expand All @@ -43,7 +43,7 @@ expose(startCounting)

### Hot observables

Note that in contrast to the usual `zen-observable` behavior, the observable returned here is "hot". That means that if you subscribe to it twice, the second subscription will mirror the first one, yielding the same values.
Note that in contrast to the default Observable behavior, the observable returned here is "hot". That means that if you subscribe to it twice, the second subscription will mirror the first one, yielding the same values without subscribing to the data source a second time.

It will **not** replay values from the past, in case the second subscriber subscribes after the first one has already received values.

Expand Down
16 changes: 3 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions package.json
Expand Up @@ -34,12 +34,10 @@
"worker_threads"
],
"dependencies": {
"@types/zen-observable": "^0.8.0",
"callsites": "^3.1.0",
"debug": "^4.1.1",
"is-observable": "^1.1.0",
"observable-fns": "^0.4.0",
"zen-observable": "^0.8.14"
"observable-fns": "^0.5.0"
},
"devDependencies": {
"@babel/cli": "^7.4.4",
Expand Down
6 changes: 3 additions & 3 deletions src/master/pool.ts
@@ -1,5 +1,5 @@
import DebugLogger from "debug"
import { Observable } from "observable-fns"
import { Observable, SubscriptionObserver } from "observable-fns"
import { makeHot } from "../observable-promise"
import Implementation from "./implementation"
import { Thread } from "./thread"
Expand Down Expand Up @@ -101,7 +101,7 @@ async function runPoolTask<ThreadType extends Thread>(
task: QueuedTask<ThreadType, any>,
availableWorker: WorkerDescriptor<ThreadType>,
workerID: number,
eventSubject: ZenObservable.SubscriptionObserver<PoolEvent<ThreadType>>,
eventSubject: SubscriptionObserver<PoolEvent<ThreadType>>,
debug: DebugLogger.Debugger
) {
debug(`Running task #${task.id} on worker #${workerID}...`)
Expand Down Expand Up @@ -227,7 +227,7 @@ function PoolConstructor<ThreadType extends Thread>(
const initErrors: Error[] = []
const workers = spawnWorkers(spawnWorker, size)

let eventSubject: ZenObservable.SubscriptionObserver<PoolEvent<ThreadType>>
let eventSubject: SubscriptionObserver<PoolEvent<ThreadType>>

const eventObservable = makeHot(new Observable<PoolEvent<ThreadType>>(subscriber => {
eventSubject = subscriber
Expand Down
8 changes: 4 additions & 4 deletions src/observable-promise.ts
@@ -1,4 +1,4 @@
import { Observable } from "observable-fns"
import { Observable, SubscriptionObserver } from "observable-fns"

export type ObservablePromise<T> = Promise<T> & Observable<T>

Expand All @@ -8,7 +8,7 @@ type OnRejected<Result = void> = (error: Error) => Result
type Initializer<T> = (
resolve: (value?: T) => void,
reject: (error: Error) => void,
observer: ZenObservable.SubscriptionObserver<T>
observer: SubscriptionObserver<T>
) => UnsubscribeFn | void

type UnsubscribeFn = () => void
Expand All @@ -28,7 +28,7 @@ function fail(error: Error): never {
* if that async process will yield values once (-> Promise) or multiple
* times (-> Observable).
*
* Note that the observable promise inherits some of zen-observable's characteristics:
* Note that the observable promise inherits some of the observable's characteristics:
* The `init` function will be called *once for every time anyone subscribes to it*.
*
* If this is undesired, derive a hot observable from it using `makeHot()` and
Expand Down Expand Up @@ -178,7 +178,7 @@ export function ObservablePromise<T>(init: Initializer<T>): ObservablePromise<T>
* That one subscription on the input observable promise is setup immediately.
*/
export function makeHot<T>(async: ObservablePromise<T> | Observable<T>): ObservablePromise<T> {
let observers: Array<ZenObservable.SubscriptionObserver<T>> = []
let observers: Array<SubscriptionObserver<T>> = []

async.subscribe({
complete() {
Expand Down
6 changes: 3 additions & 3 deletions src/observable.ts
@@ -1,4 +1,4 @@
import { Observable } from "observable-fns"
import { Observable, ObservableLike, SubscriptionObserver } from "observable-fns"
export { Observable }

const $observers = Symbol("observers")
Expand All @@ -12,8 +12,8 @@ const $observers = Symbol("observers")
* values, errors and the completion raised on this subject, but does not
* expose the `next()`, `error()`, `complete()` methods.
*/
export class Subject<T> extends Observable<T> implements ZenObservable.ObservableLike<T> {
private [$observers]: Array<ZenObservable.SubscriptionObserver<T>>
export class Subject<T> extends Observable<T> implements ObservableLike<T> {
private [$observers]: Array<SubscriptionObserver<T>>

constructor() {
super(observer => {
Expand Down
7 changes: 5 additions & 2 deletions src/worker/index.ts
@@ -1,5 +1,5 @@
import isSomeObservable from "is-observable"
import Observable from "zen-observable"
import { Observable } from "observable-fns"
import { serializeError } from "../common"
import { isTransferDescriptor, TransferDescriptor } from "../transferable"
import {
Expand All @@ -21,7 +21,10 @@ let exposeCalled = false

const isMasterJobRunMessage = (thing: any): thing is MasterJobRunMessage => thing && thing.type === MasterMessageType.run

/** There are issues with `is-observable` not recognizing zen-observable's instances */
/**
* There are issues with `is-observable` not recognizing zen-observable's instances.
* We are using `observable-fns`, but it's based on zen-observable, too.
*/
const isObservable = (thing: any): thing is Observable<any> => isSomeObservable(thing) || isZenObservable(thing)

function isZenObservable(thing: any): thing is Observable<any> {
Expand Down
2 changes: 1 addition & 1 deletion test/spawn.test.ts
@@ -1,5 +1,5 @@
import test from "ava"
import Observable from "zen-observable"
import { Observable } from "observable-fns"
import { spawn, Thread, Worker } from "../src/index"
import { Counter } from "./workers/counter"

Expand Down
2 changes: 1 addition & 1 deletion test/workers/count-to-five.ts
@@ -1,4 +1,4 @@
import Observable from "zen-observable"
import { Observable } from "observable-fns"
import { expose } from "../../src/worker"

expose(function countToFive() {
Expand Down
2 changes: 1 addition & 1 deletion types/is-observable.d.ts
@@ -1,5 +1,5 @@
declare module "is-observable" {
import Observable from "zen-observable"
import { Observable } from "observable-fns"

function isObservable(thing: any): thing is Observable<any>
export = isObservable
Expand Down

0 comments on commit f43fcdc

Please sign in to comment.