Joining streams based on time #101

Closed
alexmg opened this issue Jun 24, 2019 · 7 comments

Comments

@alexmg

alexmg commented Jun 24, 2019

I am hoping to use Trill for some stream processing scenarios that involve joining multiple streams together based purely on time. The streams being joined are named, but the events in the streams do not have identifiers that can be used as a join key. A custom operator should be invoked for each set of events when the end times of all dependent streams are known to have advanced to a point where no more data is expected.

In the example below the custom "sum" operator is invoked with values from the 3 input streams and the result is written into an output stream. The result for the last set of values in the first and second streams is waiting for the end time of the third stream to progress before the result is calculated and written to the output stream.

[Image: sum example]

When a new value is written, it replaces any previous value, so the start time of that event becomes the end time of any previous event. I am able to achieve this part by using the ClipEventDuration method and passing the source stream itself in as the clip stream.

var query1 = stream1.ClipEventDuration(stream1);
var query2 = stream2.ClipEventDuration(stream2);
var query3 = stream3.ClipEventDuration(stream3);
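The self-clip behaviour described above can be sketched in plain C# (this is not Trill code). The sketch assumes that events are sorted by timestamp and that an event is clipped at the first event with a strictly later timestamp; the names `SelfClip`, `ClipToNext` and `Interval` are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type: one event with its lifetime [Start, End).
public record Interval(long Start, long End, int Value);

public static class SelfClip
{
    // Stand-in for "no end yet" (an open start edge).
    public const long Infinity = long.MaxValue;

    // Assumes the input is sorted by Time. Each event is clipped at the first
    // event with a strictly later timestamp; the last one stays open-ended.
    public static List<Interval> ClipToNext(IReadOnlyList<(long Time, int Value)> events)
    {
        var result = new List<Interval>();
        for (int i = 0; i < events.Count; i++)
        {
            long end = Infinity;
            for (int j = i + 1; j < events.Count; j++)
            {
                if (events[j].Time > events[i].Time)
                {
                    end = events[j].Time;
                    break;
                }
            }
            result.Add(new Interval(events[i].Time, end, events[i].Value));
        }
        return result;
    }
}
```

With the first stream's timestamps (1, 3, 6, 8), this yields lifetimes [1, 3), [3, 6), [6, 8) and an open-ended [8, ∞).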

I am not sure how to join the above streams (query1, query2 and query3) in a way that only considers time and not keys on the individual events.

Is it possible to achieve the above scenario with Trill? I am likely missing something obvious due to my lack of familiarity with the API and internals.

@peterfreiling-zz

peterfreiling-zz commented Jun 24, 2019

Hi @alexmg, the easiest way to do this is just to create a Union of all three streams, then aggregate over the result using Sum (or another aggregate operator). Also note that you will likely want to use Multicast when performing the self-Clip operations.

            var query = stream1.Multicast(s1 => s1.ClipEventDuration(s1)
                .Union(stream2.Multicast(s2 => s2.ClipEventDuration(s2)))
                .Union(stream3.Multicast(s3 => s3.ClipEventDuration(s3))))
                .Sum(i => i);

Each Union operator merges its two input streams in time order, which means it will not output an event until both inputs have caught up to that time.

@cybertyche
Contributor

@peterfreiling's solution is exactly right, but I want to make sure it's clear why this is the case.

You'll notice in Peter's solution that, other than the ClipEventDuration operations, there is no mention of any time management or manipulation. It's just a simple union between the streams. Why is this? Internally, all data in Trill is already temporal. Union makes sure the data is in order, but it also leaves all of the intervals in place. The Sum aggregate, like every operation in Trill, is inherently temporal, so you don't need to do anything extra to make it work the way you want it to.

@alexmg, it should also be noted that the stream.Multicast(s => s.ClipEventDuration(s)) pattern has come up a few times before. It seems to be a fairly clever windowing pattern. If you're interested in making a contribution to Trill, I can open a concrete work item for you to tackle.

@alexmg
Author

alexmg commented Jun 26, 2019

Thank you @peterfreiling and @cybertyche for the information. This has helped a lot.

I updated my sample to use Multicast and Union as suggested and this gets things a lot closer to what I am trying to achieve. I noticed that a result is still output at T1 even though there is only a value present for the first stream and not the other dependent streams. Here is the sample code with the suggested changes.

async Task Main()
{
	var start = new DateTime(2019, 06, 26);

	var source1 = new[]
	{
		new Event(start.AddSeconds(1), 1),
		new Event(start.AddSeconds(3), 2),
		new Event(start.AddSeconds(6), 3),
		new Event(start.AddSeconds(8), 2)
	}.ToObservable();

	var source2 = new[]
	{
		new Event(start.AddSeconds(2), 2),
		new Event(start.AddSeconds(4), 1),
		new Event(start.AddSeconds(6), 2),
		new Event(start.AddSeconds(8), 1)
	}.ToObservable();

	var source3 = new[]
	{
		new Event(start.AddSeconds(2), 3),
		new Event(start.AddSeconds(5), 2),
		new Event(start.AddSeconds(7), 1)
	}.ToObservable();

	var stream1 = source1.ToTemporalStreamable(ce => ce.Timestamp.Ticks);
	var stream2 = source2.ToTemporalStreamable(ce => ce.Timestamp.Ticks);
	var stream3 = source3.ToTemporalStreamable(ce => ce.Timestamp.Ticks);

	var query = stream1.Multicast(s1 => s1.ClipEventDuration(s1))
		.Union(stream2.Multicast(s2 => s2.ClipEventDuration(s2)))
		.Union(stream3.Multicast(s3 => s3.ClipEventDuration(s3)))
		.Sum(i => i.Value);
		
	await Display(query);
}

private class Event
{
	public DateTime Timestamp { get; set; }
	
	public int Value { get; set; }
	
	public Event(DateTime timestamp, int value)
	{
		Timestamp = timestamp;
		Value = value;
	}

	public override string ToString()
	{
		return $"Value: {Value}";
	}
}

private static async Task Display<T>(IStreamable<Empty, T> stream, Func<StreamEvent<T>, bool> filter = null)
{
	filter = filter ?? (e => true);
	// ForEachAsync expects a synchronous Action, so write synchronously; an async lambda
	// here would compile to async void and its writes would not be awaited.
	await stream.ToStreamEventObservable().ForEachAsync(r =>
	{
		if (!filter(r)) return;
		
		switch (r.Kind)
		{
			case StreamEventKind.Interval:
				Console.WriteLine($"INTERVAL:    start={new DateTime(r.StartTime)}, end={new DateTime(r.EndTime)}, payload={r.Payload}");
				break;
			case StreamEventKind.Start:
				Console.WriteLine($"START EDGE:  start={new DateTime(r.StartTime)}, payload={r.Payload}");
				break;
			case StreamEventKind.End:
				Console.WriteLine($"END EDGE:    " +
					$"end={new DateTime(r.EndTime)}, original start={new DateTime(r.StartTime)}, payload={r.Payload}");
				break;
			case StreamEventKind.Punctuation:
				Console.WriteLine($"PUNCTUATION: start={new DateTime(r.StartTime)}");
				break;
		}
	});
}

That results in the output below. A value is calculated for stream1 at T1 even though stream2 and stream3 do not receive any values until T2. My aim is to treat all streams as a set of dependencies where a result cannot be calculated until a value is present in all dependent streams.

START EDGE:  start=26/06/2019 12:00:01 AM, payload=1
END EDGE:    end=26/06/2019 12:00:02 AM, original start=26/06/2019 12:00:01 AM, payload=1
START EDGE:  start=26/06/2019 12:00:02 AM, payload=6
END EDGE:    end=26/06/2019 12:00:03 AM, original start=26/06/2019 12:00:02 AM, payload=6
START EDGE:  start=26/06/2019 12:00:03 AM, payload=7
END EDGE:    end=26/06/2019 12:00:04 AM, original start=26/06/2019 12:00:03 AM, payload=7
START EDGE:  start=26/06/2019 12:00:04 AM, payload=6
END EDGE:    end=26/06/2019 12:00:05 AM, original start=26/06/2019 12:00:04 AM, payload=6
START EDGE:  start=26/06/2019 12:00:05 AM, payload=5
END EDGE:    end=26/06/2019 12:00:06 AM, original start=26/06/2019 12:00:05 AM, payload=5
START EDGE:  start=26/06/2019 12:00:06 AM, payload=7
END EDGE:    end=26/06/2019 12:00:07 AM, original start=26/06/2019 12:00:06 AM, payload=7
START EDGE:  start=26/06/2019 12:00:07 AM, payload=6
END EDGE:    end=26/06/2019 12:00:08 AM, original start=26/06/2019 12:00:07 AM, payload=6
START EDGE:  start=26/06/2019 12:00:08 AM, payload=4
PUNCTUATION: start=31/12/9999 11:59:59 PM
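To see why the T1 row appears, the Union-then-Sum behaviour can be approximated outside Trill. This is a minimal plain-C# sketch, not Trill's implementation. It assumes each input is sorted by time and already self-clipped, so a stream's value at time t is its latest event at or before t. The names `UnionSumSketch`, `ActiveValue` and `UnionThenSum` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class UnionSumSketch
{
    // Value of the latest event at or before t, or null if the stream has not started.
    // Equivalent to reading a self-clipped stream at time t (input assumed sorted by Time).
    static int? ActiveValue(IReadOnlyList<(long Time, int Value)> stream, long t)
    {
        int? current = null;
        foreach (var e in stream)
        {
            if (e.Time > t) break;
            current = e.Value;
        }
        return current;
    }

    // Approximates Union followed by Sum: at each timestamp where any input changes,
    // add up whatever values are active. Streams that have not started contribute nothing.
    public static List<(long Start, int Sum)> UnionThenSum(params IReadOnlyList<(long Time, int Value)>[] streams)
    {
        var times = streams.SelectMany(s => s.Select(e => e.Time)).Distinct().OrderBy(t => t);
        var result = new List<(long Start, int Sum)>();
        foreach (var t in times)
        {
            var active = streams.Select(s => ActiveValue(s, t)).Where(v => v.HasValue).ToList();
            if (active.Count > 0)
                result.Add((t, active.Sum(v => v.Value)));
        }
        return result;
    }
}
```

Fed the three sample sources (using seconds as timestamps), this gives 1, 6, 7, 6, 5, 7, 6, 4 starting at T1, matching the output above: a union only requires the inputs to be in time order, not that every input has a value.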

I would also like to execute a custom function over the values. Is there a way to get an array of values for each time interval such as [1,2,3] at T2 and [2,2,3] at T3 instead of just the result of an aggregate operation?

@cybertyche What did you have in mind for a contribution around the above windowing pattern? This may be a good way for me to become more familiar with Trill.

@peterfreiling-zz

To eliminate that first interval from the results of the Union version of the query, and instead wait for all streams to have matched input, you should be able to use Join instead of Union:

private static int CustomAggregationFunction(int value1, int value2, int value3) => value1 + value2 + value3;
...
var query =
    stream1.Multicast(s1 => s1.ClipEventDuration(s1))
    .Join(
        stream2.Multicast(s2 => s2.ClipEventDuration(s2)),
        (left, right) => new { Item1 = left, Item2 = right })
    .Join(
        stream3.Multicast(s3 => s3.ClipEventDuration(s3)),
        (left, right) => new { left.Item1, left.Item2, Item3 = right })
    .Select(tuple => CustomAggregationFunction(tuple.Item1, tuple.Item2, tuple.Item3));

Or, if your custom aggregation allows, you can calculate incremental results, like the following for Sum:

var query =
    stream1.Multicast(s1 => s1.ClipEventDuration(s1))
    .Join(
        stream2.Multicast(s2 => s2.ClipEventDuration(s2)),
        (left, right) => left + right)
    .Join(
        stream3.Multicast(s3 => s3.ClipEventDuration(s3)),
        (left, right) => left + right);
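As a rough, non-Trill check of what the Join version produces, including the per-interval arrays asked about earlier, the chained inner joins can be approximated like this. It is a minimal sketch assuming sorted, self-clipped inputs. `JoinSketch`, `ActiveValue` and `JoinAll` are hypothetical names, and the real operator produces interval events rather than a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class JoinSketch
{
    // Value of the latest event at or before t, or null if the stream has not started
    // (input assumed sorted by Time, i.e. a self-clipped stream read at time t).
    static int? ActiveValue(IReadOnlyList<(long Time, int Value)> stream, long t)
    {
        int? current = null;
        foreach (var e in stream)
        {
            if (e.Time > t) break;
            current = e.Value;
        }
        return current;
    }

    // Approximates chaining inner temporal Joins: a row starts at each timestamp where
    // any input changes, but only if every input has an active value at that time.
    // Each row carries all of the values, so any custom function can be applied to it.
    public static List<(long Start, int[] Values)> JoinAll(params IReadOnlyList<(long Time, int Value)>[] streams)
    {
        var times = streams.SelectMany(s => s.Select(e => e.Time)).Distinct().OrderBy(t => t);
        var rows = new List<(long Start, int[] Values)>();
        foreach (var t in times)
        {
            int?[] values = streams.Select(s => ActiveValue(s, t)).ToArray();
            if (values.All(v => v.HasValue))
                rows.Add((t, values.Select(v => v.Value).ToArray()));
        }
        return rows;
    }
}
```

On the sample data the first rows are [1,2,3] at T2 and [2,2,3] at T3. Summing each row gives 6, 7, 6, 5, 7, 6, 4, which is the Union result without the T1 row.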

@cybertyche
Contributor

Peter literally just typed exactly what I was in the process of typing. :-)

@cybertyche
Contributor

Of particular note may be this blog entry:

https://cloudblogs.microsoft.com/opensource/2019/05/01/trill-102-temporal-joins/

@cybertyche
Contributor

cybertyche commented Jun 26, 2019

@alexmg Regarding a contribution, what I'm thinking is something along the lines of this checkin:

3f60d2a

Another user was interested in having a FullOuterJoin operator (not just a Join and a LeftOuterJoin), so we walked through what the LeftOuterJoin method looks like (it calls down to other methods) and then the user did a PR with the new method.

It seems clear that Trill could use a method in the public API that:

  • Takes incoming data that is generally expected to be "non-overlapping", and
  • Edits their lifetimes so that each data point's lifetime lasts until the next point arrives, so that exactly one point is valid at any time

Such a method could be implemented just like the FullOuterJoin method, where one does not need to have deep knowledge of the Trill implementation but still needs to understand the algebra to do it right. For instance:

  • Should there be a contingency in case multiple events have the same timestamp? And if so, what?
  • Should there be a "timeout"? If most data points arrive fairly close together but some are really far apart, should there be a maximum timespan for each data point?
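One way to explore the two open questions above is a plain-C# sketch of such a method. This is not an existing Trill API. `ClipToNextSketch`, `ClipToNext` and `maxDuration` are hypothetical names, and "last event at a timestamp wins" is only one possible policy for duplicates. It assumes input sorted by time with non-negative timestamps.

```csharp
using System;
using System.Collections.Generic;

public static class ClipToNextSketch
{
    // Stand-in for an open-ended lifetime.
    public const long Infinity = long.MaxValue;

    // Hypothetical operator with one possible policy for each open question:
    //  * events sharing a timestamp: the last one in input order wins;
    //  * maxDuration: a value expires after at most this many ticks if nothing newer arrives.
    // Assumes the input is sorted by Time and timestamps are non-negative.
    public static List<(long Start, long End, int Value)> ClipToNext(
        IReadOnlyList<(long Time, int Value)> events, long? maxDuration = null)
    {
        // Collapse events with equal timestamps, keeping the last one.
        var distinct = new List<(long Time, int Value)>();
        foreach (var e in events)
        {
            if (distinct.Count > 0 && distinct[distinct.Count - 1].Time == e.Time)
                distinct[distinct.Count - 1] = e;
            else
                distinct.Add(e);
        }

        var result = new List<(long Start, long End, int Value)>();
        for (int i = 0; i < distinct.Count; i++)
        {
            long start = distinct[i].Time;
            // Lifetime lasts until the next point, or stays open for the last one.
            long end = i + 1 < distinct.Count ? distinct[i + 1].Time : Infinity;
            // Optional timeout: cap the lifetime at start + maxDuration.
            if (maxDuration is long max && end - start > max)
                end = start + max;
            result.Add((start, end, distinct[i].Value));
        }
        return result;
    }
}
```

For input (1, 10), (3, 20), (3, 25), (20, 30) with `maxDuration: 5`, this yields [1, 3) → 10, [3, 8) → 25 and [20, 25) → 30. Other choices, such as keeping the first duplicate or emitting all of them with the same lifetime, would be equally valid design options to discuss.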

And possibly most importantly, the new method would need a good name. :-)

If you're interested, I can start a work item just for this and we could work through the various issues as they come up.
