Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
ci: Update dev packages
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason3S committed Apr 2, 2021
1 parent a2b4e0c commit defb359
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .prettierrc.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"printWidth": 120,
"printWidth": 160,
"singleQuote": true,
"overrides": [
{
Expand Down
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
"homepage": "https://github.com/Jason3S/rx-stream#readme",
"devDependencies": {
"@types/chai": "^4.2.11",
"@types/fs-extra": "^8.1.1",
"@types/fs-extra": "^9.0.9",
"@types/mocha": "^8.2.2",
"@types/node": "^8.10.66",
"@types/node": "^14.14.37",
"chai": "^4.2.0",
"coveralls": "^3.0.11",
"cspell": "^4.2.8",
Expand Down
22 changes: 8 additions & 14 deletions src/streamToRx.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import {Observable, Subscription} from 'rxjs';
import {map, distinctUntilChanged} from 'rxjs/operators';
import { Observable, Subscription } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';

export function streamToRx<T = Buffer>(
stream: NodeJS.ReadableStream,
pauser?: Observable<boolean>
): Observable<T> {
return new Observable<T>(subscriber => {
export function streamToRx<T = Buffer>(stream: NodeJS.ReadableStream, pauser?: Observable<boolean>): Observable<T> {
return new Observable<T>((subscriber) => {
const endHandler = () => subscriber.complete();
const errorHandler = (e: Error) => subscriber.error(e);
const dataHandler = (data: T) => subscriber.next(data);
Expand All @@ -17,9 +14,7 @@ export function streamToRx<T = Buffer>(
stream.addListener('data', dataHandler);

if (pauser) {
pauseSubscription = pauser.pipe(
distinctUntilChanged()
).subscribe(function(b) {
pauseSubscription = pauser.pipe(distinctUntilChanged()).subscribe(function (b) {
if (b === false) {
stream.resume();
} else if (b === true) {
Expand All @@ -41,9 +36,8 @@ export function streamToRx<T = Buffer>(
});
}

export function streamToStringRx(stream: NodeJS.ReadableStream, encoding?: string, pauser?: Observable<boolean>): Observable<string>;
export function streamToStringRx(stream: NodeJS.ReadableStream, encoding?: BufferEncoding, pauser?: Observable<boolean>): Observable<string>;
export function streamToStringRx(stream: NodeJS.ReadableStream, encoding: BufferEncoding, pauser?: Observable<boolean>): Observable<string>;
export function streamToStringRx(stream: NodeJS.ReadableStream, encoding: string = 'utf8', pauser?: Observable<boolean>): Observable<string> {
return streamToRx(stream, pauser)
.pipe(map(buffer => buffer.toString(encoding)));
export function streamToStringRx(stream: NodeJS.ReadableStream, encoding: BufferEncoding = 'utf8', pauser?: Observable<boolean>): Observable<string> {
return streamToRx(stream, pauser).pipe(map((buffer) => buffer.toString(encoding)));
}

0 comments on commit defb359

Please sign in to comment.