javascript part I don't understand #4

Closed
cybernadinou opened this issue Aug 26, 2020 · 7 comments


@cybernadinou

Hi !

Sorry to bother you, but I found your article online and wanted to try the solutions you described for inserting a CSV file into DynamoDB, but I don't get this part of the code :/

const batchOf = (size = 25, execute) => {
  const items = [];
  return async (item, last = false) => {
    if (item) {
      items.push(item);
    }
    if (last || items.length === size) {
      await execute(items);
      items.length = 0;
    }
  };
};

First, I didn't try with thousands of lines in the CSV, just 5.
I don't understand how the last param can ever be true, because when I tried with only 5 items it never reached the execute part.

Second, I think this is more JavaScript-related, but I don't get how you can have item as a parameter of the function.

I mean, if I try to split the function:

const batchOf = (size = 25, execute) => {
    const items = [];
    async function passItem (item, last = false) {
        if (item) {
            items.push(item);
        }
        if (last || items.length === size) {
            await execute(items);
            items.length = 0;
        }
    };
    return passItem(?, ? );  // i should pass some parameters here.. 
};

How come the parameters get passed to the function when you do the return async? Because in my example I need to declare the item parameter.

If you could help me with my 2 questions it'd be greatly appreciated.
Thanks a lot :)

@cybernadinou cybernadinou changed the title javascript par i don't get javascript part I don't understand Aug 26, 2020
@a-h
Owner

a-h commented Aug 26, 2020

Hi @cybernadinou, glad you asked the question!

re: I don't understand how the params last can ever be true because when I tried with only 5 items it never goes to the execute part.

The last param has to be passed in. I admit that it's a bit complicated, because batchOf is actually a function that returns a function. Even more complicated is that it's a non-async function that returns an async function...

I've written a standalone example that hopefully makes it clearer how it works:

const batchOf = (size = 25, execute) => {
  let items = [];
  return async (item, last = false) => {
    if (item) {
      items.push(item);
    }
    if (last || items.length === size) {
      await execute(items);
      items.length = 0;
    }
  };
};

const main = async () => {
  let processedItemCount = 0;
  const batchRunner = batchOf(2, items => {
    processedItemCount += items.length;
    console.log(`Executing batch of ${items.length} items`);
  });

  const itemsToProcess = [1, 2, 3, 4, 5, 6, 7, 8, 9];

  for (let i = 0; i < itemsToProcess.length; i++) {
    const item = itemsToProcess[i];
    const isLast = i === itemsToProcess.length - 1;
    console.log(`Adding item '${item}' to batch processor...`);
    await batchRunner(item, isLast);
  }

  // This won't work because the forEach won't wait for the async function to complete execution.
  //itemsToProcess.forEach(async (item, index) => {
    //console.log(`Adding item '${item}' to batch processor...`);
    //const isLast = index === itemsToProcess.length - 1;
    //await batchRunner(item, isLast);
  //});

  console.log(`Processed ${processedItemCount} items.`);
};

main().then(() => console.log("Done"));

You should be able to run that with node <filename>.js.

Does that help?

@cybernadinou
Author

cybernadinou commented Aug 26, 2020

Hi !!
Yes, that does help me understand how things work.

So just 2 questions now:
Because in our case we have a stream, we can't do the for loop. Does that mean I need to do processor(null, true)?

const parser = fastCsv
    .parseFile(fileName, { headers: true, delimiter })
    .on("error", (error) => console.error(error))
    .on("data", async (row) => {
      parser.pause();
      try {
        await processor(row);
        rowIndex++;
      } catch (err) {
        console.log(`Error processing row ${rowIndex}: ${err}`);
      } finally {
        parser.resume();
      }
    })
    .on("end", async (rowCount) => {
      await batch(null, true);  // should I use processor(null, true) ?
      console.log(`Parsed ${rowCount} rows`);
    });

Because in the on("end", ...) part of the original code, the function called is batch(null, true), but it's not defined anywhere :/ So am I right to guess that the processor function should be called?

Second question :

  // This won't work because the forEach won't wait for the async function to complete execution.
  itemsToProcess.forEach(async (item, index) => {
    console.log(`Adding item '${item}' to batch processor...`);
    const isLast = index === itemsToProcess.length - 1;
    await batchRunner(item, isLast);
  });

Is it because it's not supported by JavaScript yet that we need to go through the for loop?
Sorry for the dumb question :/

Thanks for your help.

@a-h
Owner

a-h commented Aug 26, 2020

  1. Yes, looks like a bug; it should have been a call to processor. I guess I did a bad git commit, because there's no way that would run. Sorry for the confusion! I was testing on a huge file, so it's likely I never let it get to the end; it would have taken a long time with the JavaScript solution.

  2. Async functions return a Promise instead of accepting a callback function as an argument. If you call an async function without awaiting it (await fn()) or handling the promise (fn().then(() => doThis())), then execution just moves on to the next statement.

The forEach function accepts a function argument, but it doesn't look at the return value of that function or wait for any promises it returns to resolve.

So the forEach completes instantly, while in the background n things are happening in promises that aren't being tracked - some might succeed, others might fail, but no code is watching them. If the program exits, all of that background activity stops.

For example, if you don't wait for the slow counter to complete before your program exits, the count won't complete:

const main = async () => {
  let count = 0;
  // Slow adder takes 250 ms to add 1 to a number.
  const slowAdder = n =>
    new Promise((resolve, reject) => {
      setTimeout(function() {
        count += n;
        resolve();
      }, 250);
    });

  // Wait for each count to be added up.
  for (let i = 0; i < 5; i++) {
    await slowAdder(1);
  }
  console.log(`Expected count of 5, got ${count}`);

  // If we don't wait, we won't finish our counting.
  for (let i = 0; i < 95; i++) {
    slowAdder(1);
  }
  console.log(`Expected count of 100, got ${count}`);
};

main().then(() => console.log("Done"));
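If you do want to wait for all of those promises, one common pattern (a sketch, not part of the original article) is to start them with map and then await Promise.all, which resolves once every promise has resolved (and rejects if any of them reject):

```javascript
const main = async () => {
  let count = 0;
  // Same slow adder: takes 250 ms to add n to the count.
  const slowAdder = n =>
    new Promise(resolve => {
      setTimeout(() => {
        count += n;
        resolve();
      }, 250);
    });

  const items = [1, 1, 1, 1, 1];

  // map starts all five promises at once; Promise.all waits for every one,
  // so all five run concurrently and finish after roughly 250 ms total.
  await Promise.all(items.map(n => slowAdder(n)));
  console.log(`Expected count of 5, got ${count}`); // prints "Expected count of 5, got 5"
};

main().then(() => console.log("Done"));
```

Note this runs the work concurrently rather than one at a time, which may not suit a rate-limited target like DynamoDB; the for loop with await keeps things sequential.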

@cybernadinou
Author

cybernadinou commented Aug 27, 2020

Plop !

So now I'm trying to fix the issue with the leftover items, but as your code is way too advanced for me, here is what I'm trying to do:

let itemsToProcess = [];
let sizeChunk = 25;
stream
    .pipe(fastCsv.parse({headers : true}))
    .on("data", async (row) => {
        stream.pause();
        try {
            if(itemsToProcess.length === sizeChunk){
                console.log('on data');
                console.log(itemsToProcess.length);
                await batchWrite(client, itemsToProcess);
                itemsToProcess = [];
            }
            itemsToProcess.push(row);
        }catch (e) {
            console.log(e);
        } finally {
            stream.resume();
        }
    })
    .on("error", (err) => { console.log(err)})
    .on("end", async () => {
        console.log('end');
        console.log(itemsToProcess.length);
    });

The result is so weird...
I get this in my console:

on data
25
on data
25
on data
25
on data
25
on data
25
end
25

Expected result :

on data
25
on end
5

I actually don't get why I see the 25 items several times, because in my handler I reset the itemsToProcess array right after the call to my insert function. I tried both of these lines:

itemsToProcess.length = 0;
itemsToProcess = [];

I'm seriously lost right now...

Thx again for your help.

@a-h
Owner

a-h commented Aug 28, 2020

You probably want something like this, taking a copy of the data to prevent mutation of it.

import fastCsv from "fast-csv";
import * as fs from "fs";
import * as path from "path";

const batchWrite = async (items) =>
  items.forEach((item) => console.log(`processing: ${JSON.stringify(item)}`));

const main = () =>
  new Promise((resolve, reject) => {
    let itemsToProcess = [];
    let sizeChunk = 5;
    const stream = fs.createReadStream(path.resolve("parse.csv"));

    stream
      .pipe(fastCsv.parse({ headers: true }))
      .on("data", (row) => {
        itemsToProcess.push(row);
        if (itemsToProcess.length === sizeChunk) {
          console.log("processing chunk...");
          stream.pause();
          batchWrite(new Array(...itemsToProcess)).finally(() => {
            stream.resume();
          });
          itemsToProcess.length = 0;
        }
      })
      .on("error", (err) => {
        reject(err);
      })
      .on("end", () => {
        stream.pause();
        console.log("processing end...");
        batchWrite(itemsToProcess).then(() => resolve());
      });
  });

main().then(() => console.log("done"));

With a file of parse.csv in the same directory:

id,value
1,a
2,b
3,c
4,d
5,d
6,e
7,f

It outputs:

processing chunk...
processing: {"id":"1","value":"a"}
processing: {"id":"2","value":"b"}
processing: {"id":"3","value":"c"}
processing: {"id":"4","value":"d"}
processing: {"id":"5","value":"d"}
processing end...
processing: {"id":"6","value":"e"}
processing: {"id":"7","value":"f"}
done

@cybernadinou
Author

Thanks a lot, it's working fine now! Basically it's because I didn't put the stream.resume() in the right place..

But I thought that :

try { 
  await myFunctionReturningPromise();
} catch (e) {
  console.log(e);
}
finally {
   //final code here
}

was equivalent to this:

myFunctionReturningPromise().finally(() => {
   // final my code
});

Am I wrong ?

One more time, thank you very much for your help !

@a-h
Owner

a-h commented Aug 31, 2020

The thing you were missing was really that you'd done this:

.on("data", async (row) => {

If you look at my code, it does this:

.on("data", (row) => {

The function passed into the on("data", ...) call cannot be asynchronous due to the design of that library. Maybe the library could wait for the promise to complete, but there's probably a good reason it doesn't.

@a-h a-h closed this as completed Sep 4, 2020