LazyFutureStream #67

Closed
brusand opened this Issue Oct 17, 2015 · 7 comments

brusand commented Oct 17, 2015

Hi

First, congratulations on this great tool.

I want to run a batch job in parallel on each file in the Files.walk stream. How can I combine the two?
Something like

    LazyFutureStream.parallel(Files.walk(dir))
            .forEach(id -> { //do work here
                        System.out.println(id + "\t" + Thread.currentThread());
                    });

Any ideas?

Best regards
Bruno

johnmcclean Oct 19, 2015

Member

Thanks Bruno,

Here is an example that loads up to 100 files at a time from the current directory into a LazyFutureStream of Strings.

LazyReact react = new LazyReact(100,110); // Stream builder, for Streams with 100 threads and 110 max futures
react.from(Files.walk(Paths.get(".")))    //create a LazyFutureStream from a standard JDK Stream
     .map(Unchecked.function(Files::readAllBytes))   //load the file contents
     .map(String::new)    //map to a String
     .flatMap(s->Stream.of(s.split("\n")))  //flatMap all the file data into  a single Stream
     .forEach(System.out::println);

The flatMap here will merge all the data from each file into a single (multi-threaded) Stream. That's probably not what you want, so to process each file separately this would also work:

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(dir)))
     .map(Unchecked.function(Files::readAllBytes))
     .map(String::new)
     .forEach(this::processFileStringData);

Or to process the raw bytes

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(dir)))
     .map(Unchecked.function(Files::readAllBytes))
     .forEach(this::processFileRawBytes);
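
One caveat with the pipelines above: Files.walk emits directories as well as files (including the starting directory itself), and Files.readAllBytes throws an IOException when handed a directory. Below is a minimal plain-JDK sketch of filtering down to regular files before reading. It does not use simple-react, and the class and method names are made up for illustration; since LazyFutureStream implements the JDK Stream interface, the same filter step could presumably go before the map in the examples above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, plain JDK only (no simple-react types).
class RegularFileWalk {

    // Returns the sorted names of all regular files under dir, skipping directories.
    static List<String> regularFileNames(Path dir) {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)   // drops directories, including dir itself
                        .map(p -> p.getFileName().toString())
                        .sorted()
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small temporary tree (two files, one sub-directory) and walks it.
    static List<String> demo() {
        try {
            Path root = Files.createTempDirectory("walk-demo");
            Path sub = Files.createDirectory(root.resolve("sub"));
            Files.write(root.resolve("a.txt"), "one".getBytes(StandardCharsets.UTF_8));
            Files.write(sub.resolve("b.txt"), "two".getBytes(StandardCharsets.UTF_8));
            return regularFileNames(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [a.txt, b.txt]
    }
}
```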

danieldietrich Oct 19, 2015

[Daniel is chiming in...]

Hi John, wow, that's really great :) I have two questions, just out of curiosity, because I had similar thoughts in a different context:

  1. Which part of the program is responsible for the life-cycle of the (file) input streams? E.g. the input may not be read exhaustively (similar to take(10) or limit(10)). Also, two different inputs may be composed (e.g. via zip or something similar). What if one input fails: is the other also handled appropriately? Or is the user responsible for closing resources?
  2. I would expect the flatMap to return an instance of the same kind, here: LazyReact. Here a Java Stream is returned, right?

Many thanks!

johnmcclean Oct 19, 2015

Member

Hey Daniel,
1.

a) Closing Resources

Files.walk(Paths.get(".")) differs from Files.lines in that it returns a Stream<Path> which hasn't opened the files themselves yet. The files are read by Files::readAllBytes, which uses try-with-resources internally, so the input stream is closed whether the read succeeds or fails.

For Files.lines, which returns a Stream<String> and does open the file, you need to do something like

try (Stream<String> stream = Files.lines(Paths.get("input.txt"))) {  // hypothetical file path; Files.lines cannot read a directory such as "."
    react.from(stream)
         .forEach(System.out::println);
}
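
Note that Files.walk returns a closeable Stream as well: its Javadoc recommends try-with-resources because the stream holds open directory handles, even though no file contents are opened. The plain-JDK sketch below (hypothetical class name, no simple-react types) illustrates why the try block matters: a terminal operation on its own does not run a stream's close handlers.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical demo class, plain JDK only.
class StreamCloseDemo {

    // Terminal operation inside try-with-resources: close handlers run on exit.
    static boolean closedWithTry() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> lines = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            lines.forEach(line -> { });
        }
        return closed.get();
    }

    // Terminal operation alone: the stream is consumed but never closed.
    static boolean closedWithoutTry() {
        AtomicBoolean closed = new AtomicBoolean(false);
        Stream<String> lines = Stream.of("a", "b").onClose(() -> closed.set(true));
        lines.forEach(line -> { });
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closedWithTry());     // prints true
        System.out.println(closedWithoutTry());  // prints false
    }
}
```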

b) Limit / Take

If the limit is applied before the flatMap we limit the number of files, so excluded files will either be loaded and then dropped, or not read at all.

We can apply the limit either to the underlying futures (via actOnFutures()) or to the results.

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(".")))
     .actOnFutures().limit(3)    //take the first 3 files only.
     .map(Unchecked.function(Files::readAllBytes))
     .map(String::new)
     .flatMap(s->Stream.of(s.split("\n")))
     .forEach(System.out::println);

In the above example only 3 files will be loaded. In the example below we will take the first three files to complete loading.

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(".")))
     .map(Unchecked.function(Files::readAllBytes))
     .limit(3)    //take the first 3 files LOADED.
     .map(String::new)
     .flatMap(s->Stream.of(s.split("\n")))
     .forEach(System.out::println);

For these examples the file closing is handled by Files::readAllBytes on a per-file basis, so we are OK.
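
The difference between the two limit placements can be sketched with plain JDK executors. This is only an analogy for actOnFutures().limit(3) versus limit(3), not simple-react code: the file names and sleep times are invented, and Thread.sleep stands in for Files.readAllBytes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical demo class, plain JDK only.
class LimitPlacement {

    // Simulated load times in milliseconds, keyed by a made-up file name (insertion order kept).
    static Map<String, Long> simulatedFiles() {
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("slow-a", 400L);
        files.put("slow-b", 400L);
        files.put("fast-c", 10L);
        files.put("fast-d", 10L);
        files.put("fast-e", 10L);
        return files;
    }

    // Limit before loading: only the first n inputs would ever be started.
    static List<String> firstSubmitted(int n) {
        return simulatedFiles().keySet().stream().limit(n).collect(Collectors.toList());
    }

    // Limit after loading: every input is started, and the first n to finish are kept.
    static List<String> firstCompleted(int n) {
        Map<String, Long> files = simulatedFiles();
        ExecutorService pool = Executors.newFixedThreadPool(files.size());
        try {
            ExecutorCompletionService<String> done = new ExecutorCompletionService<>(pool);
            files.forEach((name, millis) -> done.submit(() -> {
                Thread.sleep(millis);   // stand-in for reading the file
                return name;
            }));
            List<String> result = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                result.add(done.take().get());   // blocks until the next task completes
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();   // cancel the loads that were not needed
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSubmitted(3));
        System.out.println(firstCompleted(3));
    }
}
```

In this sketch firstSubmitted(3) includes the two slow entries because they come first in the input, while firstCompleted(3) returns the three fast ones (in whatever order they finish).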

c) Failure handling

File opening / closing is taken care of by the try-with-resources block inside readAllBytes (shown here as it appears in the JDK 8 source), so we don't need to worry about closing the Stream on error.

try (SeekableByteChannel sbc = Files.newByteChannel(path);
     InputStream in = Channels.newInputStream(sbc)) {
    long size = sbc.size();
    if (size > (long)MAX_BUFFER_SIZE)
        throw new OutOfMemoryError("Required array size too large");

    return read(in, (int)size);
}

But it is possible we may want to retry, or provide an alternative source for a corrupted file. We can use retry to indicate that we should retry a failed load (retry parameters are separately configurable).

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(".")))
     .retry(Unchecked.function(Files::readAllBytes))
     .map(String::new)
     .flatMap(s->Stream.of(s.split("\n")))
     .forEach(System.out::println);

We can also use onFail or recover to handle a failed load.

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(".")))
     .retry(Unchecked.function(Files::readAllBytes))
     .map(String::new)
     .recover(error->"default value")
     .flatMap(s->Stream.of(s.split("\n")))
     .forEach(System.out::println);
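
For readers unfamiliar with the retry-then-recover idea, here is a rough plain-JDK sketch of the semantics. It is not how simple-react implements retry or recover (no backoff, no configuration, everything runs on the calling thread), and the helper name retryThenRecover is hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical demo class, plain JDK only.
class RetryThenRecover {

    // Calls task up to maxAttempts times (assumed >= 1); if every attempt fails,
    // maps the last error to a fallback value.
    static <T> T retryThenRecover(Callable<T> task, int maxAttempts, Function<Exception, T> fallback) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;   // remember the failure and try again
            }
        }
        return fallback.apply(last);
    }

    // A simulated load that fails twice and succeeds on the third attempt.
    static String flakyDemo() {
        AtomicInteger calls = new AtomicInteger();
        return retryThenRecover(() -> {
            if (calls.incrementAndGet() < 3) throw new IOException("transient failure");
            return "file contents";
        }, 3, error -> "default value");
    }

    // A simulated load that never succeeds, so the fallback is used.
    static String alwaysFailingDemo() {
        return retryThenRecover(() -> { throw new IOException("corrupted file"); },
                                3, error -> "default value");
    }

    public static void main(String[] args) {
        System.out.println(flakyDemo());          // prints file contents
        System.out.println(alwaysFailingDemo());  // prints default value
    }
}
```
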
2. flatMap

Yep, it should be the same kind, but it's not LazyReact. LazyReact is the builder for the Stream; once react#from is called it returns a LazyFutureStream. So flatMap is called on a LazyFutureStream, which is an implementation of the JDK Stream interface, so it can accept any JDK 8 Stream and still obey the appropriate laws :)
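
As a small illustration of that point, the sketch below is written purely against java.util.stream.Stream, so the same method would accept any implementation of the interface. The class and method names are hypothetical, and only the JDK's own Stream.of is exercised here.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, plain JDK only.
class FlatMapContract {

    // Depends only on the Stream interface: flatMap takes a function that returns a Stream.
    static List<String> splitLines(Stream<String> contents) {
        return contents.flatMap(s -> Stream.of(s.split("\n")))
                       .collect(Collectors.toList());
    }

    static List<String> demo() {
        return splitLines(Stream.of("a\nb", "c"));
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [a, b, c]
    }
}
```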

johnmcclean Oct 19, 2015

Member

More on limit / take.

In the response above we applied the limit to the number of files: either the number of files loaded, limit(x), or the number of files to be loaded, actOnFutures().limit(x).

We could also limit the number of lines of data, by applying the limit after the flatMap.

LazyReact react = new LazyReact(100,110);
react.from(Files.walk(Paths.get(".")))
     .map(Unchecked.function(Files::readAllBytes))
     .map(String::new)
     .flatMap(s->Stream.of(s.split("\n")))
     .limit(500)   //take only the first 500 lines
     .forEach(System.out::println);

Note that actOnFutures().limit() doesn't really make sense here (in fact it will give you the same answer as limit()), as the result of flatMap is a merged Stream of results from the previous stage.

In this case we may load more files than we need to get the first 500 lines.
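
A plain-JDK sketch of why more files may be loaded than needed. It assumes every load is started eagerly before the lines are consumed, which is a simplification of how a LazyFutureStream schedules work; the names and the three-line fake file contents are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, plain JDK only.
class LimitAfterFlatMap {

    static final AtomicInteger started = new AtomicInteger();

    // Stand-in for reading a file: every fake file has three lines.
    static String load(String name) {
        return name + ":1\n" + name + ":2\n" + name + ":3";
    }

    // Counts the load and starts it asynchronously.
    static CompletableFuture<String> startLoad(String name) {
        started.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> load(name));
    }

    // Starts every load up front, then keeps only the first maxLines lines.
    static List<String> firstLines(List<String> names, int maxLines) {
        List<CompletableFuture<String>> futures = names.stream()
                .map(LimitAfterFlatMap::startLoad)
                .collect(Collectors.toList());   // collecting submits all loads
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(s -> Stream.of(s.split("\n")))
                .limit(maxLines)                 // trims the output, not the work already started
                .collect(Collectors.toList());
    }

    // Returns {number of lines kept, number of loads started}.
    static int[] demo() {
        started.set(0);
        List<String> lines = firstLines(Arrays.asList("f1", "f2", "f3", "f4"), 2);
        return new int[] { lines.size(), started.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));   // prints [2, 4]
    }
}
```

Two lines are kept, but all four loads were started, even though the first file alone would have been enough.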

danieldietrich Oct 19, 2015

Thx John, that really sounds elaborated. I will try it out if I find some time!

brusand Oct 19, 2015

Awesome,

My test went from 40000 ms with Files.walk().parallel() to ... 97 ms with LazyReact ;-)

  LazyReact streamBuilder = new LazyReact(100, 100);
  streamBuilder.from(Files.walk(dir))
            .map(id -> processExif(id))   // capture the stdout of the command line tool
            .map(exifStdout -> {  // parse the captured JSON and return the first map
                Map<String, Object> exifs = null;
                ObjectMapper mapper = new ObjectMapper();

                if (exifStdout != null) {
                    try {
                        List<Map<String, Object>> jsonMaps = mapper.readValue(exifStdout, List.class);
                        if (jsonMaps != null && !jsonMaps.isEmpty()) {  // guard against an empty result
                            exifs = jsonMaps.get(0);
                            System.out.println("Exif sourceFile // :" + exifs.get("SourceFile") + "\t" + Thread.currentThread());
                        }
                    } catch (IOException e) {
                        System.out.println("Exif Exception :" + exifStdout.toString() + "\t" + Thread.currentThread());
                    }
                }
                return exifs;
            })
            .run(); // and returns 97 ms later

Many thanks John

johnmcclean Oct 19, 2015

Member

You are welcome Bruno.

The run() method makes the whole thing run asynchronously, so I'm guessing that processing hasn't quite completed after 97 ms (the maximum speed-up from your current example code is some function of 100/[num cores], rather than 400+). You may well see even faster processing times with a higher number of threads though, right up to a thread per file (provided the number of threads stays at a level the OS can handle; thousands or even tens of thousands are normally OK).
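
The measurement pitfall can be reproduced with a plain CompletableFuture. This is only an analogy and makes no claim about how run() is implemented in simple-react; the class name is hypothetical and Thread.sleep stands in for the real per-file processing.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, plain JDK only.
class AsyncTiming {

    // Returns {millis until the async call returned, millis until the work finished}.
    static long[] measure(long workMillis) {
        long start = System.nanoTime();
        CompletableFuture<Void> work = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(workMillis);   // stand-in for the real processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long returnedAfter = (System.nanoTime() - start) / 1_000_000;
        work.join();   // block until the background work is done
        long finishedAfter = (System.nanoTime() - start) / 1_000_000;
        return new long[] { returnedAfter, finishedAfter };
    }

    public static void main(String[] args) {
        long[] r = measure(300);
        System.out.println("call returned after " + r[0] + " ms, work finished after " + r[1] + " ms");
    }
}
```

Stopping the clock right after the asynchronous call returns measures only the time to submit the work; to time the processing itself, the measurement has to wait for completion.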

@johnmcclean johnmcclean closed this Nov 3, 2015
