Skip to content

Commit

Permalink
refactor: Remove operate function
Browse files Browse the repository at this point in the history
+ Inlines all operator function creation as just arrow functions. After `lift` was removed, there is no use for the `operate` function.

Related ReactiveX#7202
  • Loading branch information
benlesh committed Apr 15, 2023
1 parent 3812d16 commit 56bdf96
Show file tree
Hide file tree
Showing 70 changed files with 1,940 additions and 1,905 deletions.
79 changes: 40 additions & 39 deletions src/internal/operators/audit.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Subscriber } from '../Subscriber';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';

import { operate } from '../util/lift';
import { Observable } from '../Observable';
import { from } from '../observable/from';
import { createOperatorSubscriber } from './OperatorSubscriber';

Expand Down Expand Up @@ -51,46 +51,47 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* emissions from the source Observable.
*/
export function audit<T>(durationSelector: (value: T) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let hasValue = false;
let lastValue: T | null = null;
let durationSubscriber: Subscriber<any> | null = null;
let isComplete = false;
return (source) =>
new Observable((subscriber) => {
let hasValue = false;
let lastValue: T | null = null;
let durationSubscriber: Subscriber<any> | null = null;
let isComplete = false;

const endDuration = () => {
durationSubscriber?.unsubscribe();
durationSubscriber = null;
if (hasValue) {
hasValue = false;
const value = lastValue!;
lastValue = null;
subscriber.next(value);
}
isComplete && subscriber.complete();
};
const endDuration = () => {
durationSubscriber?.unsubscribe();
durationSubscriber = null;
if (hasValue) {
hasValue = false;
const value = lastValue!;
lastValue = null;
subscriber.next(value);
}
isComplete && subscriber.complete();
};

const cleanupDuration = () => {
durationSubscriber = null;
isComplete && subscriber.complete();
};
const cleanupDuration = () => {
durationSubscriber = null;
isComplete && subscriber.complete();
};

source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
from(durationSelector(value)).subscribe(
(durationSubscriber = createOperatorSubscriber(subscriber, endDuration, cleanupDuration))
);
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
from(durationSelector(value)).subscribe(
(durationSubscriber = createOperatorSubscriber(subscriber, endDuration, cleanupDuration))
);
}
},
() => {
isComplete = true;
(!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete();
}
},
() => {
isComplete = true;
(!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete();
}
)
);
});
)
);
});
}
67 changes: 34 additions & 33 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { OperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { Observable } from '../Observable';
import { noop } from '../util/noop';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { from } from '../observable/from';
Expand Down Expand Up @@ -43,39 +43,40 @@ import { from } from '../observable/from';
* of values.
*/
export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]> {
return operate((source, subscriber) => {
// The current buffered values.
let currentBuffer: T[] = [];
return (source) =>
new Observable((subscriber) => {
// The current buffered values.
let currentBuffer: T[] = [];

// Subscribe to the closing notifier first.
from(closingNotifier).subscribe(
createOperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
},
noop
)
);
// Subscribe to the closing notifier first.
from(closingNotifier).subscribe(
createOperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
currentBuffer = [];
subscriber.next(b);
},
noop
)
);

// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);
// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

return () => {
// Ensure buffered values are released on finalization.
currentBuffer = null!;
};
});
return () => {
// Ensure buffered values are released on finalization.
currentBuffer = null!;
};
});
}
103 changes: 52 additions & 51 deletions src/internal/operators/bufferCount.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { Observable } from '../Observable';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';

Expand Down Expand Up @@ -59,62 +59,63 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
// opening and closing on the bufferSize itself.
startBufferEvery = startBufferEvery ?? bufferSize;

return operate((source, subscriber) => {
let buffers: T[][] = [];
let count = 0;
return (source) =>
new Observable((subscriber) => {
let buffers: T[][] = [];
let count = 0;

source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
let toEmit: T[][] | null = null;
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
let toEmit: T[][] | null = null;

// Check to see if we need to start a buffer.
// This will start one at the first value, and then
// a new one every N after that.
if (count++ % startBufferEvery! === 0) {
buffers.push([]);
}
// Check to see if we need to start a buffer.
// This will start one at the first value, and then
// a new one every N after that.
if (count++ % startBufferEvery! === 0) {
buffers.push([]);
}

// Push our value into our active buffers.
for (const buffer of buffers) {
buffer.push(value);
// Check to see if we're over the bufferSize
// if we are, record it so we can emit it later.
// If we emitted it now and removed it, it would
// mutate the `buffers` array while we're looping
// over it.
if (bufferSize <= buffer.length) {
toEmit = toEmit ?? [];
toEmit.push(buffer);
// Push our value into our active buffers.
for (const buffer of buffers) {
buffer.push(value);
// Check to see if we're over the bufferSize
// if we are, record it so we can emit it later.
// If we emitted it now and removed it, it would
// mutate the `buffers` array while we're looping
// over it.
if (bufferSize <= buffer.length) {
toEmit = toEmit ?? [];
toEmit.push(buffer);
}
}
}

if (toEmit) {
// We have found some buffers that are over the
// `bufferSize`. Emit them, and remove them from our
// buffers list.
for (const buffer of toEmit) {
arrRemove(buffers, buffer);
if (toEmit) {
// We have found some buffers that are over the
// `bufferSize`. Emit them, and remove them from our
// buffers list.
for (const buffer of toEmit) {
arrRemove(buffers, buffer);
subscriber.next(buffer);
}
}
},
() => {
// When the source completes, emit all of our
// active buffers.
for (const buffer of buffers) {
subscriber.next(buffer);
}
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
() => {
// Clean up our memory when we finalize
buffers = null!;
}
},
() => {
// When the source completes, emit all of our
// active buffers.
for (const buffer of buffers) {
subscriber.next(buffer);
}
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
() => {
// Clean up our memory when we finalize
buffers = null!;
}
)
);
});
)
);
});
}

0 comments on commit 56bdf96

Please sign in to comment.