Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for writing received DICOM content to Writable streams #41

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

PantelisGeorgiadis
Copy link
Owner

An attempt was made to implement C-STORE SCP streaming support, based on the ideas discussed in #39.
More specifically, the Scp class was enriched with two extra functions that give the caller the oppurtunity to create the stream that will receive the incoming fragment buffers and to translate the accumulated fragments back into a Dataset.
The functions are the createStoreWritableStream and the createDatasetFromStoreWritableStream, respectively. The first one provides to the user the accepted presentation context (to assess the dataset type and transfer syntax) and expects a Writable stream to be returned. The latter provides to the user the previously created Writable stream, the accepted presentation context and callback that could be used to return the parsed Dataset. The default implementation still uses memory buffers.

Bellow is a sample implementation of an Scp class that accumulates the incoming C-STORE data to temp file streams (using the temp library). Once the reception is over (last PDV fragment received), the files are read back into a Dataset which are passed to the Scp.cStoreRequest method for further processing (this is not a mandatory step - the callback can return undefined if there is no interest for passing the dataset in the Scp.cStoreRequest method).

const temp = require('temp');

class StreamingScp extends Scp {
  constructor(socket, opts) {
    super(socket, opts);
    this.association = undefined;

    temp.track();
  }

  createStoreWritableStream(acceptedPresentationContext) {
    return temp.createWriteStream();
  }

  createDatasetFromStoreWritableStream(writable, acceptedPresentationContext, callback) {
    writable.on('finish', () => {
      const path = writable.path;
      const datasetBuffer = fs.readFileSync(path);
      const dataset = new Dataset(
        datasetBuffer,
        acceptedPresentationContext.getAcceptedTransferSyntaxUid()
      );
      callback(dataset);
    });
    writable.on('error', (err) => {
      callback(undefined);
    });
  }

  associationRequested(association) {
    this.association = association;

    const contexts = association.getPresentationContexts();
    contexts.forEach((c) => {
      const context = association.getPresentationContext(c.id);
      if (Object.values(StorageClass).includes(context.getAbstractSyntaxUid())) {
        const transferSyntaxes = context.getTransferSyntaxUids();
        transferSyntaxes.forEach((transferSyntax) => {
          if (
            transferSyntax === TransferSyntax.ImplicitVRLittleEndian ||
            transferSyntax === TransferSyntax.ExplicitVRLittleEndian
          ) {
            context.setResult(PresentationContextResult.Accept, transferSyntax);
          } else {
            context.setResult(PresentationContextResult.RejectTransferSyntaxesNotSupported);
          }
        });
      } else {
        context.setResult(PresentationContextResult.RejectAbstractSyntaxNotSupported);
      }
    });
    this.sendAssociationAccept();
  }

  cStoreRequest(request, callback) {
    console.log(request.getDataset());

    const response = CStoreResponse.fromRequest(request);
    response.setStatus(Status.Success);
    callback(response);
  }

  associationReleaseRequested() {
    this.sendAssociationReleaseResponse();
  }
}

@richard-viney, @kinsonho, please review and let me know of your thoughts.

@PantelisGeorgiadis PantelisGeorgiadis added the enhancement New feature or request label Mar 18, 2023
@PantelisGeorgiadis PantelisGeorgiadis self-assigned this Mar 18, 2023
}

const stream = this.dimseStream || this.dimseStoreStream;
stream.write(pdv.getValue());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it should check the return value of write(). If it is not true, then we need to apply backpressure and until the drain event to resume. https://nodejs.org/docs/latest/api/stream.html#event-drain

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Thank you Kinson! The only thing that bothers me is how to wait in here for the drain event... any ideas?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas on this @kinsonho?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PantelisGeorgiadis sorry for the late reply.

A pattern that I saw others doing is the following:

if(!stream.write(pdv.getValue())) {
    await new Promise(resolve => stream.once('drain', resolve));
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @kinsonho! The async/await pattern is not used in this library. Do you see any drawbacks in start using it (e.g. compatibility, etc.)?

@kinsonho
Copy link

kinsonho commented Apr 1, 2023

With createStoreWriteableStream(), the result is just the dataset, not a full P10 object, right?

For createDatasetFromStoreWriteableStream(), if I would like to have a filtered dataset to subsequent processing after the cStoreRequest, then I will need to split the StoreWriteableStream and create another accumulator writer stream to collect the attributes that I need. Then pass to the createDatasetFromStoreWritableStream(). Correct?

@kinsonho
Copy link

kinsonho commented Apr 1, 2023

Thanks for working on this. Just some quick comments when reviewing the changes for now. I will try it out later.

@PantelisGeorgiadis
Copy link
Owner Author

PantelisGeorgiadis commented Apr 5, 2023

With createStoreWriteableStream(), the result is just the dataset, not a full P10 object, right?

For createDatasetFromStoreWriteableStream(), if I would like to have a filtered dataset to subsequent processing after the cStoreRequest, then I will need to split the StoreWriteableStream and create another accumulator writer stream to collect the attributes that I need. Then pass to the createDatasetFromStoreWritableStream(). Correct?

You are right Kinson. The result accumulated in the writable created by createStoreWriteableStream is just the dataset, not the full P10. However, this is still parsable by the Dataset class, as long as you know whether the syntax is implicit or explicit (hence the acceptedPresentationContext param). I just added a third param to the Dataset constructor that can allow you to pass parsing options to the dcmjs library. These options can help in parsing datasets up to a specific tag (e.g. skip the pixel data).

Regarding, the "need to split the StoreWriteableStream and create another accumulator writer stream to collect the attributes", I don't think that this is nessesary now that the Dataset can read partial datasets. You can create something like a custom "hybrid" writable which can early-decide about the proper course of action and definitely not wait for the Scp.cStoreRequest event to occur.
Here's a super-naive implementation (which actually works!) but you will get the point:

const { Writable } = require('stream');

class StreamingScp extends Scp {
  constructor(socket, opts) {
    super(socket, opts);
    this.association = undefined;
  }

  associationRequested(association) {
    this.association = association;

    const contexts = association.getPresentationContexts();
    contexts.forEach((c) => {
      const context = association.getPresentationContext(c.id);
      if (Object.values(StorageClass).includes(context.getAbstractSyntaxUid())) {
        const transferSyntaxes = context.getTransferSyntaxUids();
        transferSyntaxes.forEach((transferSyntax) => {
          if (
            transferSyntax === TransferSyntax.ImplicitVRLittleEndian ||
            transferSyntax === TransferSyntax.ExplicitVRLittleEndian
          ) {
            context.setResult(PresentationContextResult.Accept, transferSyntax);
          } else {
            context.setResult(PresentationContextResult.RejectTransferSyntaxesNotSupported);
          }
        });
      } else {
        context.setResult(PresentationContextResult.RejectAbstractSyntaxNotSupported);
      }
    });
    this.sendAssociationAccept();
  }

  createStoreWritableStream(acceptedPresentationContext) {
    // Create a custom Writable that will handle the incoming chunks.
    return new StreamingWritable(acceptedPresentationContext);
  }

  createDatasetFromStoreWritableStream(writable, acceptedPresentationContext, callback) {
    // At this point, if there is no interest for the full Dataset that includes pixel data 
    // (because it was written to a file or uploaded to the cloud),
    // just return the metadata dataset to the Scp.cStoreRequest handler (could also be undefined).
    callback(writable.getMetadataDataset());
  }

  cStoreRequest(request, callback) {
    console.log(request.getDataset());

    const response = CStoreResponse.fromRequest(request);
    response.setStatus(Status.Success);
    callback(response);
  }

  associationReleaseRequested() {
    this.sendAssociationReleaseResponse();
  }
}

class StreamingWritable extends Writable {
  constructor(acceptedPresentationContext, options) {
    super(options);

    this.metadataDataset = undefined;
    this.shouldParse = true;
    this.acceptedPresentationContext = acceptedPresentationContext;
  }

  _write(chunk, encoding, callback) {
    if (this.shouldParse) {
      // First write occurred. There's a good chance to have the complete
      // metadata, depending on the max PDU value.
      // Try to parse the chunk, up to the pixel data and create a Dataset.
      this.metadataDataset = new Dataset(
        chunk,
        this.acceptedPresentationContext.getAcceptedTransferSyntaxUid(),
        {
          ignoreErrors: true,
          untilTag: '7FE00010', // PixelData
          includeUntilTagValue: false,
        }
      );
      this.shouldParse = false;
    }

    // Evaluate the metadata Dataset (this.metadataDataset) and decide the proper course of action.
    // i.e. Write/append the chunk to a file, upload it to the cloud or just accumulate it in memory.
    // doStuffWithChunk(chunk)

    // Call the callback to notify that you have finished working on this chunk.
    callback(/* new Error ('Something went wrong during chunk handling') */);
  }

  _final(callback) {
    // At this point no other chunk will be received.
    // Call the callback to notify that you have finished receiving chunks and free resources.
    callback();
  }

  // Returns the parsed metadata dataset.
  getMetadataDataset() {
    return this.metadataDataset;
  }
}

@PantelisGeorgiadis
Copy link
Owner Author

@kinsonho, @richard-viney did you have the chance to test this? Any feedback?

@kinsonho
Copy link

kinsonho commented Jun 5, 2023 via email

@PantelisGeorgiadis PantelisGeorgiadis force-pushed the pgeorgiadis/c-store-streaming branch 2 times, most recently from 4f3adf3 to 9802f86 Compare June 7, 2023 08:58
@richard-viney
Copy link
Contributor

Sorry for the big delay, this looks good and we'll be testing it out next week. The branch is now in conflict with the master branch but we can sort that out no problem.

@PantelisGeorgiadis
Copy link
Owner Author

@richard-viney I just rebased to latest master. My only concern is that we still haven't handled Kinson's comment regarding the stream drain event. Looking forward for your test results!

@richard-viney
Copy link
Contributor

Yes the drain comment is correct. In practice if streaming is going to disk then the incoming network data will most likely be at a slower pace than local disk writes, so you'd get away with it, but for other use cases it could become an issue.

I haven't yet looked at the implications for the library/protocol of adding a wait on the stream here.

@richard-viney
Copy link
Contributor

Would it be possible to pass the CStoreRequest to createStoreWritableStream()? This would allow it access to information such as getAffectedSopInstanceUid() which would be useful for trying to write the file meta information prior to the main data set coming in, which is what I'm aiming to do.

@richard-viney
Copy link
Contributor

I've implemented the above on a fork and it seems to work. Another couple of things have come up too:

  • Have createStoreWritableStream take a callback to allow it to perform asynchronous actions.
  • Should we allow createStoreWritableStream to return an error of some kind? Or do you think this should be handled downstream in the cStoreRequest method?

@richard-viney
Copy link
Contributor

I've got this working, ended up writing a custom DICOM P10 header prior to streaming the rest to a temporary file, which results in a valid DICOM P10. Still more testing to be done though.

My previous comment about making createStoreWritableStream asynchronous isn't important as I worked around it and deferred error handling to the existing cStoreRequest() method, which I think is reasonable given that it decides on the response to send to the SCU.

IMO the most important change to add to this branch is making the C-STORE request available, which I did in a fork here: HeartLab@4192ec9.

The only other issue was regarding drain() and back pressure, however this change is still a big improvement even without that issue being addressed. Have you had a chance to look into it? If not, I'll likely do so in the next couple of days.

Thanks.

@PantelisGeorgiadis PantelisGeorgiadis force-pushed the pgeorgiadis/c-store-streaming branch 2 times, most recently from 5e0e9e7 to 2b858dc Compare February 26, 2024 09:19
@PantelisGeorgiadis
Copy link
Owner Author

Great news @richard-viney! Thanks for working on this! I incorporated your changes to the working branch and made an unsuccessful effort to handle stream backpressure, which I reverted… ☹
However, I’m pretty sure that we will find a way to handle it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Memory pressure and streaming received DICOM content to disk
3 participants