
Error handling with customSource not hooked up correctly? #309

Open
jpambrun opened this issue Apr 27, 2024 · 3 comments

Comments

@jpambrun

There is a good chance I am doing something wrong, but I feel like I have tried everything. customSource needs to be able to retry on stream errors. There are two different scenarios: 1) at stream creation, 2) later while streaming.

For 1), some APIs only make the stream available after a callback or promise resolves. This means we need to return a passthrough immediately and pipe into it later. I could retry before that point.

For 2), I may have already streamed some bytes, and it's simpler to throw the stream away and have the other side call file.stream() again.

In this example I am simulating 1), but regardless I try to error the passthrough stream so that the other side calls file.stream() again. Unfortunately, I just can't figure out how to connect it.

If it happens during the initial directory read I get the exception

creation error
pass error
Uncaught Exception: Error: Random creation failure (stream)
    at file:///home/jpambrun/work/unzip/index.mjs:17:34
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)

When it happens later I get no exception, node just stops:

creation error
pass error
import * as unzipper from "unzipper";
import { PassThrough } from 'node:stream';
import { Readable } from 'node:stream';

const TEST_ERROR = process.env.TEST_ERROR || false;
const URL = 'http://127.0.0.1:8080/modules.zip'

const customSource = {
    stream: function (offset, length) {
        const pass = new PassThrough();
        pass.on("error", (err) => console.log('pass error'))
        console.log(length, `bytes=${offset}-${length ? (offset + length - 1) : ''}`)
        fetch(URL, { headers: { Range: `bytes=${offset}-${length ? offset + length - 1 : ''}` } })
            .then(res => {
                if (TEST_ERROR && Math.random() > 0.9) {
                    console.log('creation error')
                    pass.destroy(new Error('Random creation failure (stream)'));
                    return;
                }
                return res.body
            })
            .then(body => {
                if(body === undefined) return;
                const stream = Readable.fromWeb(body);
                stream.pipe(pass)
            })
        return pass
    },
    size: async function () {
        const res = await fetch(URL, { method: 'HEAD' });
        return parseInt(res.headers.get('content-length'));
    }
}

process.on('unhandledRejection', (reason, promise) => {
    console.log('Unhandled Rejection:', reason.stack || reason)
    process.exitCode = 1
})

process.on('uncaughtException', (err, origin) => {
    console.log('Uncaught Exception:', err)
    process.exitCode = 1
});

const directory = await unzipper.Open.custom(customSource);
const decompressedFiles = []
for await (const file of directory.files.filter(f => f.type === 'File')) {
    try {
        console.log(file.path)
        const fileStream = await file.stream();
        //just consume the stream to decompress the file
        await new Promise((resolve, reject) => {
            fileStream.on('data', (chunk) => { /* noop*/ });
            fileStream.on('end', () => resolve());
            fileStream.on('error', (err) => {
                console.log('Stream error:', err); // don't see that
                reject(err)
            });
        });
        decompressedFiles.push(file.path);
    } catch (error) {
        console.log('for await try/catch'); //don't see that
    }
}

console.log(decompressedFiles.length)

// zip -r modules.zip node_modules/
// npx http-server
// TEST_ERROR=true node index.mjs
@jpambrun
Author

In the end, I think I found something that works-ish by handling all errors in the customSource and trying to never bubble the error up through the stream.

I am still not sure this is the best story around error handling. If I can't retry enough times and end up destroying the passthrough stream, it will terminate the node process.

import * as unzipper from "unzipper";
import { PassThrough } from 'node:stream';
import { Readable } from 'node:stream';
import pRetry from "p-retry";

const TEST_ERROR = process.env.TEST_ERROR || false;
const URL = 'http://127.0.0.1:8080/modules.zip'

const customSource = {
    stream: function (offset, length) {
        const pass = new PassThrough();
        pass.on("error", (err) => console.log('pass error'))
        let bytesWritten = 0;
        pRetry(async () => {
            const res = await fetch(URL, { headers: { Range: `bytes=${offset + bytesWritten}-${length ? offset + length - 1 : ''}` } })
            if (!res.ok) throw new Error(`Failed to fetch, status ${res.status}`);
            if (TEST_ERROR && Math.random() > 0.99) throw new Error(`Random fetch failure`);

            const body = await res.body;
            const stream = Readable.fromWeb(body);
            await new Promise((resolve, reject) => {
                stream.on("error", (err) => reject(err))
                stream.on("end", () => {
                    pass.end();
                    resolve();
                })
                stream.on("data", (data) => {
                    if (TEST_ERROR && Math.random() > 0.99) stream.destroy(new Error('Random streaming failure'));
                    pass.write(data);
                    bytesWritten += data.length;
                })
            })
        }, { retries: 5, onFailedAttempt: (err) => console.log(err) })
        return pass
    },
    size: async function () {
        return pRetry(async () => {
            const res = await fetch(URL, { method: 'HEAD' });
            return parseInt(res.headers.get('content-length'));
        }, { retries: 5, onFailedAttempt: (err) => console.log(err) })
    }
}

const directory = await unzipper.Open.custom(customSource);
const decompressedFiles = []
for await (const file of directory.files.filter(f => f.type === 'File')) {
    try {
        // console.log(file.path)
        const fileStream = await file.stream();
        //just consume the stream to decompress the file
        await new Promise((resolve, reject) => {
            fileStream.on('data', (chunk) => { /* noop*/ });
            fileStream.on('end', () => resolve());
            fileStream.on('error', (err) => {
                console.log('Stream error:', err); // don't see that
                reject(err)
            });
        });
        decompressedFiles.push(file.path);
    } catch (error) {
        console.log('for await try/catch'); //don't see that
    }
}

console.log(decompressedFiles.length)

// zip -r modules.zip node_modules/
// npx http-server
// TEST_ERROR=true node index.mjs

@ZJONSSON
Owner

I don't know if the retry mechanism should be baked into unzipper. Some projects may want to retry, and the way they retry could be very custom (e.g., number of retries, exponential backoff, etc.). Anything "can break," not just unzipper, so it seems to me that the retry methodology is outside the scope of this library. However, if you want to create your own custom adapter with retry, the approach above makes sense, but here are some comments:

Stream objects are not promises, so doing await file.stream() doesn't wait for the stream to complete; you have to consume the stream to get to the end. A simple way to do that is await file.buffer() instead (this method consumes the stream's entire contents into a buffer, and in this case you just throw it away instead of assigning it to a variable). Also, if you await file.buffer() and thereby consume the entire stream, then you most likely don't need the promise with event emitters below.
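
For illustration, a minimal sketch of that simpler loop, reusing the directory and decompressedFiles variables from the snippets above (the assumption here is that a stream error rejects the buffer() promise, so the try/catch actually fires):

for (const file of directory.files.filter(f => f.type === 'File')) {
    try {
        // buffer() consumes and decompresses the whole entry; the result is just discarded
        await file.buffer();
        decompressedFiles.push(file.path);
    } catch (error) {
        // a single failed entry can be retried here without restarting the whole zip
        console.log('entry failed:', error);
    }
}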

Also, FYI: adding .on('data', ...) is an anti-pattern since it switches the stream into flowing mode (see here) with no backpressure management (i.e. you can blow up memory if unzipping is slower than fetching the data for large files).
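
If you do need to drain a stream manually, a backpressure-friendly sketch (plain Node stream APIs, nothing unzipper-specific) is to pipeline it into a throwaway Writable instead of attaching a 'data' listener:

import { pipeline } from 'node:stream/promises';
import { Writable } from 'node:stream';

// devNull acknowledges each chunk without keeping it, so pipeline()
// preserves backpressure and rejects the awaited promise on stream errors
const devNull = new Writable({ write(chunk, encoding, callback) { callback(); } });
await pipeline(fileStream, devNull);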

A better way to introduce mocked errors in the stream is to use a Transform that you pipe directly into pass, instead of doing the pass.write on the chunks yourself. An example would be something like:

// Transform is imported from 'node:stream'
const stream = Readable.fromWeb(body)
  .pipe(new Transform({
    transform(chunk, encoding, callback) {
      // inject a mock error instead of destroying the passthrough
      if (TEST_ERROR && Math.random() > 0.99) return callback(new Error('Random streaming failure'));
      bytesWritten += chunk.length;
      callback(null, chunk);
    },
  }));
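
In context, the transform result would then just be piped into the passthrough, replacing the manual write/end wiring (a sketch of the hookup, assuming the same pass variable as above):

stream.pipe(pass);
stream.on('error', (err) => pass.destroy(err)); // or trigger the retry here instead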

@jpambrun
Author

Thanks for the very helpful and generous answer, I really appreciate it. Your comments are absolutely on point.

My challenge is that I will be streaming from S3 from large-ish zips (~500MB), each with thousands of files (~300KB), tens of thousands of zips spanning 100+ terabytes. On the plus side, with this library I can do it at 1 Gbps from my laptop at home. Once deployed on EC2 it should reach 3-4 Gbps on a moderately sized VM. This is pretty much amazing. The downside is that I make thousands of requests per second and inevitably some will fail; in my testing I get a hiccup at least every 30s. When that happens I want to retry only one entry, not the whole zip file. Without the trick above, I can't find a way to catch the error, and it takes down the whole node process.

I realize now that my proposal puts the stream in flowing mode, which is suboptimal. However, this made it easy to keep using the same returned passthrough stream. On a streaming error where I have already pushed some data, I can resume where I left off by adjusting the start position of the retried byte-range request. I am struggling to see how I would do that in paused mode. Would I unpipe the broken stream and re-pipe a new one later on retry? Wouldn't the error have already propagated, leaving me in the same situation?
