## Node.js advanced

In [None]:
'use strict';

### Event Emitter

```EventEmitter``` decouples event producers from event consumers. Its API is simple and relies on two main methods:
* ```emit()```: trigger an event, synchronously calling every listener registered for it
* ```on()```: add a callback function that is executed each time the event is triggered

Other useful methods are:
* ```once()```: add a one-time listener
* ```removeListener()``` / ```off()```: remove an event listener from an event
* ```removeAllListeners()```: remove all listeners for an event, or for every event when no event name is given
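
The difference between ```on()```, ```once()``` and ```off()``` is easiest to see on a bare emitter. A minimal sketch, where the ```ping``` event and the ```calls``` array are hypothetical names used only for illustration:

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const calls = [];

// Persistent listener: runs on every 'ping'.
const onPing = () => calls.push('on');
emitter.on('ping', onPing);

// One-time listener: runs on the first 'ping' only, then removes itself.
emitter.once('ping', () => calls.push('once'));

emitter.emit('ping'); // both listeners run
emitter.emit('ping'); // only the persistent listener runs

emitter.off('ping', onPing); // alias of removeListener()
emitter.emit('ping'); // no listeners left, nothing is recorded

console.log(calls); // [ 'on', 'once', 'on' ]
console.log(emitter.listenerCount('ping')); // 0
```

Note that ```off()``` needs the same function reference that was passed to ```on()```, which is why the listener is stored in a variable first.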

In [None]:
const EventEmitter = require('events');

class TicketManager extends EventEmitter {

    constructor(supply) {
        super();
        this.supply = supply;
    }

    buy(email, price) {
        
        if (this.supply === 0) {
            this.emit('error', new Error('There are no more tickets left to purchase'));
            return;
        }

        this.supply--;
        this.emit('buy', email, price, Date.now());
    }
}

const ticketManager = new TicketManager(10);

In [None]:
class EmailService {
    send(email) {
        console.log(`Sending email to ${email}`);
    }
}

const emailService = new EmailService();

In [None]:
ticketManager.on('buy', (email, price, timestamp) => {
    emailService.send(email);
});

ticketManager.on('error', (error) => {
    console.error(`Handle error: ${error}`);
});

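// The first purchase succeeds and leaves 9 tickets.
ticketManager.buy('test@email.com', 10);

// The supply was 10, so the last of these 10 purchases (the 11th overall) emits 'error'.
for (let i = 0; i < 10; ++i) {
    ticketManager.buy(`test${i}@email.com`, 10);
}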

### Streams

Streams handle file reading and writing, network communication, and similar I/O as chunks of data processed piece by piece, without holding all the data in memory at once.

For example, Netflix doesn't make you wait until 5GB of video is downloaded before you can watch a movie. Instead, it sends the video player a continuous stream of chunks, so playback can start as soon as the player's buffer holds enough of them.

There are four major types of streams in Node.js:

* ```Writable```: streams to which we can write data. e.g. ```fs.createWriteStream()``` lets us write data to a file using streams.

* ```Readable```: streams from which data can be read. e.g. ```fs.createReadStream()``` lets us read the contents of a file.

* ```Duplex```: streams that are both ```Readable``` and ```Writable```. e.g., ```net.Socket```

* ```Transform```: streams that can modify or transform the data as it is written and read.
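
The first two types can be tried without touching the file system. A minimal sketch, assuming in-memory data; ```source```, ```sink``` and ```received``` are hypothetical names:

```javascript
const { Readable, Writable } = require('stream');

// Readable: produces three chunks from an in-memory array.
const source = Readable.from(['a', 'b', 'c']);

// Writable: collects every chunk it receives.
const received = [];
const sink = new Writable({
    objectMode: true, // keep chunks as strings instead of converting them to Buffers
    write(chunk, encoding, callback) {
        received.push(chunk);
        callback(); // signal that this chunk has been handled
    }
});

// 'finish' fires once the source has ended and all chunks were written.
sink.on('finish', () => console.log(received.join(''))); // abc

source.pipe(sink);
```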

In [None]:
const fs = require('fs');
let data = '';

const readable = fs.createReadStream('./files/file.txt', { encoding: 'utf8' });

readable.on('data', (chunk) => {
   data += chunk;
});

readable.on('end', () => {
   console.log(data);
});

readable.on('error', (err) => {
   console.error(err);
});

console.log('waiting for events ..');

Although using the Stream API is fairly straightforward, implementing a stream is not that simple, especially a Transform stream.

Whenever you have a producer and one or more consumers, you face data flow control problems.

Backpressure is the common term for the situation in which the buffer holding received data fills faster than it is drained, usually because processing each chunk takes too long.

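Luckily, Node.js comes with a solution. ```writable.write()``` returns ```false``` once the stream's internal buffer reaches its ```highWaterMark```. The chunk is still buffered rather than rejected, but the producer should stop writing until the stream emits the ```'drain'``` event.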
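
The signal the producer sees is the return value of ```write()```. A minimal sketch, assuming a hypothetical slow consumer (```slowSink```) with a deliberately tiny 4-byte ```highWaterMark``` so that the limit is hit right away:

```javascript
const { Writable } = require('stream');

// Hypothetical slow consumer: acknowledges each chunk after 10 ms.
const slowSink = new Writable({
    highWaterMark: 4, // tiny buffer limit (in bytes), for illustration only
    write(chunk, encoding, callback) {
        setTimeout(callback, 10);
    }
});

const first = slowSink.write('ab');    // 2 bytes pending, below the limit
const second = slowSink.write('cdef'); // 6 bytes pending, limit reached

console.log(first, second); // true false

// 'drain' fires once the pending data has been consumed.
slowSink.once('drain', () => {
    console.log('drained, safe to write again');
    slowSink.end();
});
```

The cell below applies the same idea to a file: it awaits ```'drain'``` whenever ```write()``` returns ```false```.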

In [None]:
const { once } = require('events');
const { finished } = require('stream/promises');

async function writeIterableToFile(iterable, filePath) {

    const writable = fs.createWriteStream(filePath, { encoding: 'utf8' });
    for await (const chunk of iterable) {

        if (!writable.write(chunk)) {
            await once(writable, 'drain'); // Handle backpressure
        }
    }
    writable.end();
    await finished(writable); // Wait until done. Throws if there are errors.
}

const readableStream = fs.createReadStream('./files/file.txt', { encoding: 'utf8' });

(async () => {

    try {
        
        await writeIterableToFile(readableStream, './files/rewriten.txt');
        console.info('Finished rewriting');
        
    } catch (err) {
        console.error(err);
    }

})();

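Transform streams are useful when you want to modify data on the fly, as it passes from a source to a destination.

For example, you might process an image uploaded to the server (say, tag the people in it) before writing it to disk. The cell below uses the built-in gzip Transform from ```zlib``` to compress a file while it is being copied.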

In [None]:
const zlib = require('zlib');

const input = fs.createReadStream('./files/file.txt');
const output = fs.createWriteStream('./files/file-compressed.txt.gz');

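// Note: .pipe() does not forward errors, so in real code each stream
// needs its own 'error' handler.
input
    .pipe(zlib.createGzip())
    .pipe(output);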
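
The gzip stream above is a ready-made Transform. Writing one yourself mostly comes down to supplying a ```transform()``` function. A minimal sketch, assuming ASCII text input; ```upperCase```, ```sink``` and ```result``` are hypothetical names, and ```pipeline()``` from ```stream/promises``` is used instead of ```.pipe()``` so that an error in any stage rejects the returned promise:

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical transform: upper-cases every chunk passing through.
// Assumes ASCII text; multi-byte characters split across chunks would need a decoder.
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        // First argument is an error (none here), second is the transformed chunk.
        callback(null, chunk.toString().toUpperCase());
    }
});

// In-memory destination that accumulates the transformed text.
let result = '';
const sink = new Writable({
    write(chunk, encoding, callback) {
        result += chunk.toString();
        callback();
    }
});

const run = pipeline(Readable.from(['hello ', 'streams']), upperCase, sink)
    .then(() => console.log(result)) // HELLO STREAMS
    .catch((err) => console.error('Pipeline failed:', err));
```

Swapping ```Readable.from(...)``` and ```sink``` for ```fs.createReadStream()``` and ```fs.createWriteStream()``` would apply the same transform to a file.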