
Streaming Data


Entity companion objects, which for historical reasons are still associated with the notion of DAO (Data Access Object), provide a family of methods for reading persisted entities from a datastore (a relational database at this stage). One of the most recent additions to this family is the pair of streaming methods.

These streaming methods offer two major benefits:

  • The ability to process large amounts of data without blowing up the heap.
  • The ability to use the Java Streaming API as if processing an ordinary stream, but with a database backend.

In order to illustrate both of these points, let's consider a situation where a large number of timesheets needs to be processed. This assumes that such processing can only be implemented in the application logic using Java. Always remember EQL's ability to get what you need directly from the database where possible -- this is a lot more performant!

Processing large datasets

The snippet below uses the now deprecated method getAllEntities(qem) to sequentially process the matching timesheets. Hint lightweight() is used to gain additional performance during entity instantiation in cases where entities are used strictly for read-only purposes.

for (final Timesheet ts: getAllEntities(from(query).with(fetch).lightweight().model())) {
   // do things to a timesheet instance
}

This implementation would blow up the heap if the number of entities matched by the specified query is too large. And an unwitting developer, who happily covered this functionality with a unit test against a small set of data, would be very much surprised when her code blows up in production!

However, one still needs to be able to process all matching timesheets. How can this be done? The answer is -- stream the data! The snippet below demonstrates this approach using the companion streaming methods.

try(final Stream<Timesheet> timesheets = stream(from(query).with(fetch).lightweight().model())) {
    timesheets.forEach(ts -> /* do things to a timesheet instance */);
}

Deep underneath, the streaming implementation uses an instance of the JDBC ResultSet, which reads the data from the database in batches of the specified size and gets closed at the same time as the associated stream. There are two points to note here:

  1. It is critical to close the stream once it is no longer needed. Otherwise, database resources will start leaking. The best way to achieve this is by using the try-with-resources construct as in the example above. It is alright to pass open streams around, such as having methods that return an open stream, but then the users of such streams are responsible for closing them (a sketch of this pattern is provided further below).

  2. The batch size for reading the data can be controlled. Sometimes it makes sense to increase the default batch size (currently 25), which might result in a performance improvement -- usually, loading fewer larger chunks of data is faster than loading more smaller chunks. Method stream has two versions -- one takes only an instance of QueryExecutionModel, the other additionally takes a batch size.

The snippet below demonstrates streaming with a custom batch size of 100.

try(final Stream<Timesheet> timesheets = stream(from(query).with(fetch).lightweight().model(), 100)) {
    timesheets.forEach(ts -> /* do things to a timesheet instance */);
}

The general recommendation is to start with the default batch size and only increase it if there are practical benefits when running against production-sized data. At the same time, one should NEVER EVER set the batch size to count(query). This is the "read all data into memory" anti-pattern.
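
As mentioned in point 1 above, it is acceptable for a method to return an open stream, leaving its closing to the caller. Below is a minimal sketch of this pattern; the method streamMatchingTimesheets is purely illustrative and not part of the actual API.

// An illustrative method that returns an open stream;
// the caller is responsible for closing the returned stream.
public Stream<Timesheet> streamMatchingTimesheets() {
    return stream(from(query).with(fetch).lightweight().model());
}

// The caller closes the returned stream by means of try-with-resources.
try (final Stream<Timesheet> timesheets = streamMatchingTimesheets()) {
    timesheets.forEach(ts -> /* do things to a timesheet instance */);
}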

More about the expected behaviour of the streaming methods can be obtained by studying CommonEntityDaoStreamingTestCase.

The convenience of the Streaming API

Even in cases of medium and small datasets, the Streaming API is very convenient. Stream methods such as map, takeWhile/dropWhile (Java 9), reduce, .collect(Collectors.partitioningBy(...)), .collect(Collectors.groupingBy(...)) and even filter all come in handy.
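
For instance, here is a minimal sketch that uses map, distinct and count to determine how many distinct persons are represented among the matching timesheets:

final long numberOfDistinctPersons;
try (final Stream<Timesheet> timesheets = stream(from(query).with(fetch).lightweight().model())) {
    // map each timesheet to its person and count the distinct persons
    numberOfDistinctPersons = timesheets.map(Timesheet::getPerson).distinct().count();
}
// use numberOfDistinctPersons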

In general, filtering a stream is extremely useful. However, in the case of EQL it is best to construct a query that produces a stream with only the required entities. Filtering data at the database level is a lot more efficient than at the application level.

A simple example that collects all entities from a stream into a list is provided below. This example can be thought of as a way to implement getAllEntities(qem). However, such total collecting should not be abused and should never be used for large result sets -- this was the reason for at first deprecating and then removing getAllEntities(qem) from the entity reader API (IEntityReader).

final List<T> allEntities;
try (final Stream<T> stream = stream(qem)) {
    allEntities = stream.collect(Collectors.toList());
}
// use allEntities

The following example demonstrates grouping of work activities by their type. It is important to understand that the resultant Map contains all work activities that match the query. So, it won't be appropriate in cases of large datasets that could blow up the heap.

final Map<WorkActivityType, List<WorkActivity>> wasByType;
try(final Stream<WorkActivity> was = stream(from(query).with(fetch).lightweight().model())) {
    wasByType = was.collect(Collectors.groupingBy(wa -> wa.getWaType()));
}
// do things with wasByType
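
Collectors.partitioningBy, mentioned earlier, is similar, but splits the stream into exactly two groups based on a boolean predicate. Here is a minimal sketch, assuming some work activity type someType is in scope (an illustrative variable, not part of the example above). As with grouping, the resultant Map holds all matching work activities in memory.

final Map<Boolean, List<WorkActivity>> partitioned;
try (final Stream<WorkActivity> was = stream(from(query).with(fetch).lightweight().model())) {
    // someType is an assumed WorkActivityType value; true -> work activities of that type, false -> all the rest
    partitioned = was.collect(Collectors.partitioningBy(wa -> someType.equals(wa.getWaType())));
}
final List<WorkActivity> ofSomeType = partitioned.get(true);
final List<WorkActivity> allOther = partitioned.get(false);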

Always remember to close the stream after using it. It is useful to know that closing a derived stream (such as one obtained by applying map or filter) also closes the base stream.
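
For example, in the sketch below the try-with-resources closes the derived stream of persons, which in turn closes the underlying stream of timesheets returned by stream(...):

// The stream of persons is derived from the base stream of timesheets via map;
// closing the derived stream also closes the base stream.
try (final Stream<Person> persons = stream(from(query).with(fetch).lightweight().model()).map(Timesheet::getPerson)) {
    persons.forEach(p -> /* do things to a person instance */);
}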

An interesting example with head and tail

Consider a situation where one needs to ensure that all timesheets matching some query qem, with a specific order, have the same person as the first retrieved timesheet entity. An imperative solution that retrieves all matching timesheets at once could look like this:

    final List<Timesheet> selectedTimesheets = co(Timesheet.class).getAllEntities(qem);
    final Person timesheetPerson = selectedTimesheets.get(0).getPerson();
    for (final Timesheet timesheet : selectedTimesheets) {
        if (!areEqual(timesheet.getPerson(), timesheetPerson)) {
            throw failure(MULTIPLE_CRAFTS_SELECTED_ERROR);
        }
    }

In proper functional languages such as Scala or Clojure, streams have the operations head and tail, which could be used to translate the above imperative code into an elegant and simple functional streaming solution. Regrettably, streams in Java do not have these operations defined and an ad-hoc solution is required.

Here is a Java streaming solution of the imperative code above, which creates head and tail ad-hoc.

try(final Stream<Timesheet> stream = co(Timesheet.class).stream(qem)) {
    final Iterator<Timesheet> iter = stream.iterator();
    final Person timesheetPerson = iter.next().getPerson(); // getting person from head
    final Iterable<Timesheet> iterable = () -> iter;
    final Stream<Timesheet> restOfTimesheets = StreamSupport.stream(iterable.spliterator(), false);
    if (restOfTimesheets.anyMatch(ts -> !areEqual(ts.getPerson(), timesheetPerson))) {
        throw failure(MULTIPLE_CRAFTS_SELECTED_ERROR);
    }
}

Line by line, the above implementation is a bit longer than the imperative one due to the tail stream construction, but it has the benefit of being based on a stream (i.e. it is lazy and can process a large dataset of matching timesheets).

With the help of a little utility StreamUtils this can be made more streamlined and concise.

try(final Stream<Timesheet> stream = co(Timesheet.class).stream(qem)) {
    final T2<Optional<Timesheet>, Stream<Timesheet>> head_and_tail = StreamUtils.head_and_tail(stream);
    final Person timesheetPerson = head_and_tail._1.get().getPerson(); // getting person from head
    if (head_and_tail._2.anyMatch(ts -> !areEqual(ts.getPerson(), timesheetPerson))) {
        throw failure(MULTIPLE_CRAFTS_SELECTED_ERROR);
    }
}