Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for writing received DICOM content to Writable streams #41

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Part of the networking code was taken from [dicom-dimse][dicom-dimse-url].
- Implements C-ECHO, C-FIND, C-STORE, C-MOVE, C-GET, C-CANCEL, N-CREATE, N-ACTION, N-DELETE, N-EVENT-REPORT, N-GET and N-SET services as SCU and SCP.
- Supports secure DICOM TLS connections and user identity negotiation.
- Allows custom DICOM implementations (Implementation Class UID and Implementation Version).
- Provides asynchronous event handlers for incoming SCP requests.
- Provides asynchronous event handlers and streaming support for incoming SCP requests.

### Examples

Expand Down
22 changes: 22 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Mixin } from 'ts-mixer';
import { Logger } from 'winston';
import { Socket } from 'net';
import { SecureContext, TLSSocket } from 'tls';
import { Writable } from 'stream';
import AsyncEventEmitter from 'async-eventemitter';

declare namespace PresentationContextResult {
Expand Down Expand Up @@ -1421,6 +1422,27 @@ declare class Scp extends Network {
}
);

/**
* Allows the caller to create a Writable stream to accumulate the C-STORE dataset.
* The default implementation creates a memory Writable stream that for, big instances,
* could cause out of memory situations.
*/
createStoreWritableStream(
acceptedPresentationContext: PresentationContext,
request: CStoreRequest
): Writable;

/**
* Allows the caller to create a Dataset from the Writable stream used to
* accumulate the C-STORE dataset. The created Dataset is passed to the
* Scp.cStoreRequest method for processing.
*/
createDatasetFromStoreWritableStream(
writable: Writable,
acceptedPresentationContext: PresentationContext,
callback: (dataset: Dataset) => void
): void;

/**
* Association request received.
*/
Expand Down
18 changes: 16 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dcmjs-dimse",
"version": "0.1.27",
"version": "0.2.0",
"description": "DICOM DIMSE implementation for Node.js using dcmjs",
"main": "build/dcmjs-dimse.min.js",
"module": "build/dcmjs-dimse.min.js",
Expand Down Expand Up @@ -45,6 +45,7 @@
"dependencies": {
"async-eventemitter": "^0.2.4",
"dcmjs": "^0.30.3",
"memorystream": "^0.3.1",
"smart-buffer": "^4.2.0",
"ts-mixer": "^6.0.4",
"winston": "^3.12.0"
Expand Down
110 changes: 92 additions & 18 deletions src/Network.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const log = require('./log');
const { SmartBuffer } = require('smart-buffer');
const { EOL } = require('os');
const AsyncEventEmitter = require('async-eventemitter');
const MemoryStream = require('memorystream');

//#region Network
class Network extends AsyncEventEmitter {
Expand All @@ -70,7 +71,8 @@ class Network extends AsyncEventEmitter {
this.requests = [];
this.pending = [];

this.dimseBuffer = undefined;
this.dimseStream = undefined;
this.dimseStoreStream = undefined;
this.dimse = undefined;

opts = opts || {};
Expand Down Expand Up @@ -229,6 +231,36 @@ class Network extends AsyncEventEmitter {
return this.statistics;
}

/**
* Allows the caller to create a Writable stream to accumulate the C-STORE dataset.
* The default implementation creates a memory Writable stream that for, big instances,
* could cause out of memory situations.
* @method
* @param {PresentationContext} acceptedPresentationContext - The accepted presentation context.
* @returns {Writable} The created store writable stream.
*/
// eslint-disable-next-line no-unused-vars
createStoreWritableStream(acceptedPresentationContext) {
return MemoryStream.createWriteStream();
}

/**
* Allows the caller to create a Dataset from the Writable stream used to
* accumulate the C-STORE dataset. The created Dataset is passed to the
* Scp.cStoreRequest method for processing.
* @method
* @param {Writable} writable - The store writable stream.
* @param {PresentationContext} acceptedPresentationContext - The accepted presentation context.
* @param {function(Dataset)} callback - Created dataset callback function.
*/
createDatasetFromStoreWritableStream(writable, acceptedPresentationContext, callback) {
const dataset = new Dataset(
writable.toBuffer(),
acceptedPresentationContext.getAcceptedTransferSyntaxUid()
);
callback(dataset);
}

//#region Private Methods
/**
* Advances the message ID.
Expand Down Expand Up @@ -487,18 +519,42 @@ class Network extends AsyncEventEmitter {
try {
const pdvs = pdu.getPdvs();
pdvs.forEach((pdv) => {
if (!this.dimseBuffer) {
this.dimseBuffer = SmartBuffer.fromOptions({
encoding: 'ascii',
});
}
this.dimseBuffer.writeBuffer(pdv.getValue());
const presentationContext = this.association.getPresentationContext(
pdv.getPresentationContextId()
);

if (!this.dimse) {
// Create stream to receive command
if (!this.dimseStream) {
this.dimseStream = MemoryStream.createWriteStream();
this.dimseStoreStream = undefined;
}
} else {
// Create stream to receive dataset
if (this.dimse.getCommandFieldType() === CommandFieldType.CStoreRequest) {
if (!this.dimseStoreStream) {
this.dimseStream = undefined;
this.dimseStoreStream = this.createStoreWritableStream(
presentationContext,
this.dimse
);
}
} else {
if (!this.dimseStream) {
this.dimseStream = MemoryStream.createWriteStream();
this.dimseStoreStream = undefined;
}
}
}

const stream = this.dimseStream || this.dimseStoreStream;

// TODO: Add stream backpressure event handling
stream.write(pdv.getValue());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it should check the return value of write(). If it is not true, then we need to apply backpressure and until the drain event to resume. https://nodejs.org/docs/latest/api/stream.html#event-drain

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Thank you Kinson! The only thing that bothers me is how to wait in here for the drain event... any ideas?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas on this @kinsonho?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PantelisGeorgiadis sorry for the late reply.

A pattern that I saw others doing is the following:

if(!stream.write(pdv.getValue())) {
    await new Promise(resolve => stream.once('drain', resolve));
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @kinsonho! The async/await pattern is not used in this library. Do you see any drawbacks in start using it (e.g. compatibility, etc.)?


if (pdv.isLastFragment()) {
if (pdv.isCommand()) {
const command = new Command(new Dataset(this.dimseBuffer.toBuffer()));
const command = new Command(new Dataset(this.dimseStream.toBuffer()));
const type = command.getCommandFieldType();
switch (type) {
case CommandFieldType.CEchoRequest:
Expand Down Expand Up @@ -587,17 +643,35 @@ class Network extends AsyncEventEmitter {
this.dimse = undefined;
return;
}
this.dimseBuffer = undefined;
this.dimseStream = undefined;
this.dimseStoreStream = undefined;
} else {
const dataset = new Dataset(
this.dimseBuffer.toBuffer(),
presentationContext.getAcceptedTransferSyntaxUid(),
this.datasetReadOptions
);
this.dimse.setDataset(dataset);
this._performDimse(presentationContext, this.dimse);
this.dimseBuffer = undefined;
this.dimse = undefined;
if (this.dimse.getCommandFieldType() === CommandFieldType.CStoreRequest) {
this.dimseStoreStream.end();
this.createDatasetFromStoreWritableStream(
this.dimseStoreStream,
presentationContext,
(dataset) => {
this.dimse.setDataset(dataset);
this._performDimse(presentationContext, this.dimse);
this.dimseStream = undefined;
this.dimseStoreStream = undefined;
this.dimse = undefined;
}
);
} else {
this.dimseStream.end();
const dataset = new Dataset(
this.dimseStream.toBuffer(),
presentationContext.getAcceptedTransferSyntaxUid(),
this.datasetReadOptions
);
this.dimse.setDataset(dataset);
this._performDimse(presentationContext, this.dimse);
this.dimseStream = undefined;
this.dimseStoreStream = undefined;
this.dimse = undefined;
}
}
}
});
Expand Down
31 changes: 31 additions & 0 deletions src/Server.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,37 @@ class Scp extends Network {
});
}

/**
* Allows the caller to create a Writable stream to accumulate the C-STORE dataset.
* The default implementation creates a memory Writable stream that for, big instances,
* could cause out of memory situations.
* @method
* @param {PresentationContext} acceptedPresentationContext - The accepted presentation context.
* @param {CStoreRequest} request - C-STORE request.
* @returns {Writable} The created store writable stream.
*/
// eslint-disable-next-line no-unused-vars
createStoreWritableStream(acceptedPresentationContext, request) {
return super.createStoreWritableStream();
}

/**
* Allows the caller to create a Dataset from the Writable stream used to
* accumulate the C-STORE dataset. The created Dataset is passed to the
* Scp.cStoreRequest method for processing.
* @method
* @param {Writable} writable - The store writable stream.
* @param {PresentationContext} acceptedPresentationContext - The accepted presentation context.
* @param {function(Dataset)} callback - Created dataset callback function.
*/
createDatasetFromStoreWritableStream(writable, acceptedPresentationContext, callback) {
return super.createDatasetFromStoreWritableStream(
writable,
acceptedPresentationContext,
callback
);
}

/**
* Association request received.
* @method
Expand Down
2 changes: 1 addition & 1 deletion src/version.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
module.exports = '0.1.27';
module.exports = '0.2.0';