Real time timeseries? #44

Closed
mookerji opened this issue Sep 22, 2016 · 2 comments

@mookerji

mookerji commented Sep 22, 2016

Hello! Thanks for all your work on this great project. Coming from Python, I've been looking for a columnar timeseries library in Javascript and was super-pleased to find your project. Do you have any plans to add a timeseries implementation specialized for real-time datastreams? I ask because some aspects of the realtime example seem like they could be abstracted out and made into an efficient, immutable OnlineTimeSeries extending the existing API. I'm imagining this would also include:

  • Sliding window with a fixed capacity (based on Immutable ... somehow ... instead of a circular buffer), dropping oldest elements.
  • Support out-of-order writes to a TimeSeries. I think the current constructor errors if the events have out-of-order timestamps? This is tangentially related to Chronological error in TimeSeries constructor #41. It would be nice to be able to relax assumptions about Event timestamps for this use case.
  • ....
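
A minimal sketch of the two behaviours listed above (fixed capacity, out-of-order writes), using plain frozen arrays rather than Pond or Immutable. The `pushEvent` helper and the `{ time, data }` event shape are assumptions made for illustration and are not part of the library:

```javascript
// Hypothetical helper, not part of Pond: returns a NEW frozen array each time.
// Events are plain objects of the form { time: <ms since epoch>, data: {...} }.
function pushEvent(events, event, capacity) {
  // Binary search for the first index whose time is greater than event.time,
  // so out-of-order arrivals land in order and ties keep arrival order.
  let lo = 0;
  let hi = events.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (events[mid].time <= event.time) lo = mid + 1;
    else hi = mid;
  }
  const next = [...events.slice(0, lo), event, ...events.slice(lo)];
  // Drop the oldest events once the window is over capacity.
  const trimmed = next.length > capacity ? next.slice(next.length - capacity) : next;
  return Object.freeze(trimmed);
}

let win = Object.freeze([]);
for (const t of [1000, 3000, 2000, 4000]) {
  win = pushEvent(win, { time: t, data: {} }, 3);
}
console.log(win.map(e => e.time)); // [ 2000, 3000, 4000 ]
```

Copying the array costs O(n) per write; a persistent list with structural sharing would presumably reduce that, at the price of a dependency.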

In my own project (alas, currently private) I originally implemented something based on Pipeline and UnboundedIn (see below), but the implementation I ultimately used just creates a new TimeSeries from an immutable Collection update.

 let eventSource = new UnboundedIn();
 let collection = new Collection();
 let timeseries = new TimeSeries({name: "test", collection: collection});
 let pipeline = new Pipeline()
   .from(eventSource)
   .to(EventOut, event => {
     // Add the event to the immutable Collection, then rebuild the TimeSeries.
     collection = collection.addEvent(event);
     timeseries = new TimeSeries({name: "test", collection: collection});
   });

@pjm17971
Contributor

pjm17971 commented Sep 23, 2016

I do see the library as serving both realtime and more bounded use cases. Originally the library was designed to do some simple stuff with a Kinesis stream, so it has its roots in doing simple stream processing, but from there grew more towards dealing with static blocks of time series and unifying timeseries and event handling across our stack. So yes, I see the real time aspects being developed further. We have some use cases coming up that might push this forward a little, but I'm interested in hearing what specific things you would like to do.

What you are doing in that example with the Unbounded (now simply Stream) is exactly how it works, though if you aren't doing anything in the pipeline you don't need a Pipeline to do this.

In Pond, a Collection is the unit for dealing with potentially unordered sets of Events and operations that can be done on such a set. A TimeSeries contains a Collection, so creating a new TimeSeries like you are doing is a very light operation. I think therefore that a Collection is the main transactional unit of real time processing, while a TimeSeries is a useful end product that you could visualize, for example.

For unordered Events, that presents a couple of problems in the library as it is now:

  1. If you have an unordered Collection, then you will have to order it yourself (Collection.orderByTime()) before making a TimeSeries, rather than letting that happen automatically. This dynamic is intentional, because it's often better to have control over when the sort happens rather than taking that hit in a constructor without the user knowing. However, if there's a lot of unordered data and you want that always in a TimeSeries (for a chart, say), you'll be sorting all the time. That's not really avoidable though.
  2. The more pressing issue is that unordered events are not supported in windowing (Pipeline.windowBy()) right now, because the trigger for taking a window (an accumulating Collection) and emitting it, either as the Collection or as an aggregated Event, is based on timestamps exceeding the window's end boundary. That is to say, there's no concept at the moment of supporting a watermark of any kind. It would be nice if you could at least say that you didn't want to process a window until you passed some fixed time past the end of the window, or that you didn't want to process it until there were at least n items in it. That change isn't super complicated because there's already kind of a notion of when to emit (on every event, or on discarding the window), so it could do this potentially.
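
To make the watermark idea in point 2 concrete, here is a rough sketch in plain JavaScript. It is not how Pipeline.windowBy() works today; `createWindower`, its parameters, and the `{ time }` event shape are hypothetical. A window is held open until the newest timestamp seen is at least `allowedLatenessMs` past the window's end, so moderately late events still land in the right window:

```javascript
// Hypothetical sketch, not Pond's Pipeline.windowBy(): fixed windows with a watermark.
function createWindower(windowMs, allowedLatenessMs, onEmit) {
  const open = new Map();      // window start time -> array of events
  let maxTime = -Infinity;     // highest event time seen so far
  let emittedUpTo = -Infinity; // start of the latest window already emitted

  return function add(event) {
    const start = Math.floor(event.time / windowMs) * windowMs;
    if (start <= emittedUpTo) return false; // too late: that window is gone
    if (!open.has(start)) open.set(start, []);
    open.get(start).push(event);
    maxTime = Math.max(maxTime, event.time);

    // The watermark trails the newest timestamp by the allowed lateness.
    const watermark = maxTime - allowedLatenessMs;
    const ready = [...open.keys()]
      .filter(s => s + windowMs <= watermark)
      .sort((a, b) => a - b);
    for (const s of ready) {
      const events = open.get(s).sort((a, b) => a.time - b.time);
      open.delete(s);
      emittedUpTo = Math.max(emittedUpTo, s);
      onEmit(s, events);
    }
    return true;
  };
}

const emitted = [];
const add = createWindower(1000, 500, (start, events) =>
  emitted.push({ start, times: events.map(e => e.time) }));
[100, 900, 1200, 700, 1600, 300].forEach(time => add({ time }));
// One window is emitted: start 0 with times [100, 700, 900].
// The event at 700 arrived late but in time; the one at 300 arrived too late and is dropped.
console.log(emitted);
```

A count-based trigger ("at least n items") could be added as a second condition in the `filter` step.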

For sliding windows, there's #37 which I'd like to implement. The thought there was to actually accumulate a Collection for each possible position of the sliding window (given each Event's timestamp) rather than roll data in and out of an actual sliding window. This would be able to handle out of order Events (given a simple watermark). Of course this would be a time based window, not a window that always had n events in it. A time based sliding window is of most interest to us, especially for the use case of maintaining a TimeSeries of the most recent hour or day of data.
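
For the "most recent hour or day" use case, the simplest form of a time-based window can be sketched as a trim step applied after each write. This is a simplification and not the per-position accumulation proposed in #37; `trimToDuration` and the `{ time }` event shape are hypothetical:

```javascript
// Hypothetical helper, not a Pond API: keep only events whose timestamp is
// within `durationMs` of the newest timestamp in the array.
function trimToDuration(events, durationMs) {
  if (events.length === 0) return events;
  const newest = events.reduce((max, e) => Math.max(max, e.time), -Infinity);
  const cutoff = newest - durationMs;
  return events.filter(e => e.time > cutoff);
}

const HOUR = 60 * 60 * 1000;
const recent = trimToDuration(
  [{ time: 0 }, { time: HOUR / 2 }, { time: HOUR }, { time: 1.5 * HOUR }],
  HOUR
);
console.log(recent.map(e => e.time)); // [ 3600000, 5400000 ]
```

Unlike a count-based window, the number of events retained here varies with the arrival rate.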

Another problem for handling real-world streaming data is duplicate events, which you sometimes see coming out of distributed message queuing systems. This also presents a problem in Pond, as it is now, because essentially you'll end up with duplicate Events. Maybe that's where you are at with #46 etc. It seems like it would be nice to have an option on the Collection to replace or ignore duplicate timestamps.
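
A small sketch of what a replace-or-ignore option might look like, written against plain arrays of `{ time, ... }` objects. `dedupeByTime` and its `policy` argument are hypothetical, not an existing Collection method:

```javascript
// Hypothetical helper, not a Pond API: collapse events sharing a timestamp.
// policy "replace" keeps the last arrival, "ignore" keeps the first.
function dedupeByTime(events, policy = "replace") {
  const byTime = new Map();
  for (const event of events) {
    if (policy === "ignore" && byTime.has(event.time)) continue;
    byTime.set(event.time, event);
  }
  // Return the survivors in chronological order.
  return [...byTime.values()].sort((a, b) => a.time - b.time);
}

const raw = [{ time: 1, v: "a" }, { time: 2, v: "b" }, { time: 1, v: "c" }];
console.log(dedupeByTime(raw, "replace").map(e => e.v)); // [ 'c', 'b' ]
console.log(dedupeByTime(raw, "ignore").map(e => e.v));  // [ 'a', 'b' ]
```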

Anyway, those are a few thoughts on where the library could go wrt real time data processing. Hope this helps. We're always interested in having other people contribute to this library, either by providing use cases, finding bugs (thanks for #45!) or getting more deeply involved with PRs.

@mookerji
Author

mookerji commented Sep 23, 2016

Thanks for the detailed response!

A bit more description about the use case: we have a stream of structured binary data (https://github.com/swift-nav/libsbp) coming from one or more serial device streams, and each device connected is a GPS receiver. The application consuming this data is on a host connected to the devices, so there aren't really complications from, say, network latency. One of these receivers is a reference receiver for calibration. Events of interest across all streams have a canonical GPS/UTC system timestamp.

  • Each receiver can speak a different protocol, but at some interval will emit a GPS-timestamped message that gets canonicalized to something like:
{"epoch(gpst)": new Date("2016-06-05T00:00:00.000Z"),
 "fix_mode": "integer_rtk",
 "latency(sec)": 0.5,
 "num_sats": 3,
 "abs_error_2d": NaN,
 "abs_error_3d": NaN,
 "abs_error_v": NaN,
 "rover_pos_x(m)":6378137,
 "rover_pos_y(m)":0,
 "rover_pos_z(m)":0,
 "rover_pos_lat(deg)":0,
 "rover_pos_lon(deg)":0,
 "rover_pos_height(m)":0
}

which all gets put into a single columnar TimeSeries for a device.

  • As a real-time measurement stream, the data itself is approximately in chronological order, and this Event can actually be formed from more structured messages (i.e., one message has position, another has latency), hence having updates of Events with timestamps already in the Collection.
  • One additional use case is a dynamic join on the stream. I use events from the reference stream and compare them to those of the other streams, updating the other streams with a field (e.g., abs_error_2d, abs_error_3d) indicating a calculated error against the reference.
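
The update-in-place and join steps described in the last two bullets can be sketched with plain Maps keyed by timestamp. The helpers `upsert` and `withError` are hypothetical, and treating the error as the Euclidean distance between the x/y/z position fields is an assumption made only for this example:

```javascript
// Hypothetical sketch: plain objects keyed by timestamp, not Pond Events.
// Merge a partial message into the row for its timestamp, returning a new Map.
function upsert(rows, time, fields) {
  const next = new Map(rows);
  next.set(time, { ...(rows.get(time) || {}), ...fields });
  return next;
}

// Annotate a rover row with its error against the reference row at the same time.
// Assumption: the error is the straight-line distance between the two positions.
function withError(roverRow, refRow) {
  const dx = roverRow["rover_pos_x(m)"] - refRow["rover_pos_x(m)"];
  const dy = roverRow["rover_pos_y(m)"] - refRow["rover_pos_y(m)"];
  const dz = roverRow["rover_pos_z(m)"] - refRow["rover_pos_z(m)"];
  return { ...roverRow, abs_error_3d: Math.hypot(dx, dy, dz) };
}

let rover = new Map();
// A position message arrives first, then a latency message for the same epoch.
rover = upsert(rover, 1000, { "rover_pos_x(m)": 6378140, "rover_pos_y(m)": 4, "rover_pos_z(m)": 0 });
rover = upsert(rover, 1000, { "latency(sec)": 0.5 });
// Later, the reference row for that epoch becomes available and the join runs.
const ref = { "rover_pos_x(m)": 6378137, "rover_pos_y(m)": 0, "rover_pos_z(m)": 0 };
rover = upsert(rover, 1000, withError(rover.get(1000), ref));
console.log(rover.get(1000).abs_error_3d); // 5
```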

Altogether, an Event at a single time might get updated three or four times in a short period of time (< 3 sec) after it's first materialized, and then persists after that. I'm aiming a bit for ease of use here: you could argue that there should be an intermediate stage before it gets written to a TimeSeries, or that maybe I shouldn't even put these calculated values in the same TimeSeries.

Other things:

  • Right about not needing a Pipeline. The current use of this leaves it out.
  • My concern about the time windowing is, I think, pretty much identical to yours. I'm also interested in not having memory usage explode over a few hours and ultimately affect the responsiveness of the application. The collection accumulation idea for timestamps in a window is interesting, but unfortunately I don't really understand what you mean by "watermarking".
  • An option to replace duplicate timestamps in a Collection would be great! I feel like using a backing immutable datastructure would make this a reasonable thing to do.

Thanks for the feedback on PRs. I'll try to submit them for bugs, if they come up.
