Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
added handling for session window
Browse files Browse the repository at this point in the history
  • Loading branch information
David Yan committed Jun 22, 2016
1 parent c78ea9b commit b1c6b0c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ class PlainTuple<T> implements Tuple<T>
{
private T value;

public PlainTuple()
private PlainTuple()
{
// for kryo
}

public PlainTuple(T value)
Expand Down Expand Up @@ -49,8 +50,9 @@ class TimestampedTuple<T> extends PlainTuple<T>
{
private long timestamp;

public TimestampedTuple()
private TimestampedTuple()
{
// for kryo
}

public TimestampedTuple(long timestamp, T value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public int compare(Window o1, Window o2)
return -1;
} else if (o1.getDurationMillis() > o2.getDurationMillis()) {
return 1;
} else if (o1 instanceof SessionWindow && o2 instanceof SessionWindow) {
return Long.compare(((SessionWindow) o1).getKey().hashCode(), ((SessionWindow) o2).getKey().hashCode());
} else {
return 0;
}
Expand Down Expand Up @@ -131,10 +133,14 @@ public SessionWindow(K key, long beginTimestamp, long duration)
this.key = key;
}

public K getKey()
{
return key;
}

/**
* Merges the two session windows and forms one window that spans the two windows.
* The caller of this method is responsible for checking whether the two windows are close enough for merging
* (i.e. calling {@link #shouldMerge})
*
* @param w1
* @param w2
Expand All @@ -151,29 +157,5 @@ public static <K> SessionWindow<K> merge(SessionWindow<K> w1, SessionWindow<K> w
return new SessionWindow<>(w1.key, beginTimestamp, endTimestamp - beginTimestamp);
}

/**
* Given the two session windows and the minimum gap of session windows, determine whether
* the two windows should be merged
*
* @param w1
* @param w2
* @param minGap
* @param <K>
* @return
*/
public static <K> boolean shouldMerge(SessionWindow<K> w1, SessionWindow<K> w2, long minGap)
{
if (!((w1.key == null && w2.key == null) || w1.key.equals(w2.key))) {
return false;
}
if (w1.beginTimestamp == w2.beginTimestamp) {
return true;
}
if (w1.beginTimestamp < w2.beginTimestamp) {
return (w1.beginTimestamp + w1.durationMillis + minGap > w2.beginTimestamp);
} else {
return (w2.beginTimestamp + w2.durationMillis + minGap > w1.beginTimestamp);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<
WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows) windowOption;
SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>) dataStorage;
Collection<Map.Entry<Window.SessionWindow, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis());
Window.SessionWindow<KeyT> sessionWindowToAssign;
switch (sessionEntries.size()) {
case 0: {
// There are no existing windows within the minimum gap. Create a new session window
Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1);
windowStateMap.put(sessionWindow, new WindowState());
windows.add(sessionWindow);
sessionWindowToAssign = sessionWindow;
break;
}
case 1: {
Expand All @@ -42,7 +43,7 @@ protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<
Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) {
// The session window already covers the event
windows.add(sessionWindow);
sessionWindowToAssign = sessionWindow;
} else {
// The session window does not cover the event but is within the min gap
if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
Expand All @@ -57,6 +58,7 @@ protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<
windowStateMap.remove(sessionWindow);
sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
windowStateMap.put(newSessionWindow, new WindowState());
sessionWindowToAssign = newSessionWindow;
}
break;
}
Expand All @@ -82,11 +84,13 @@ protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<
sessionStorage.remove(sessionWindow1);
sessionStorage.remove(sessionWindow2);
sessionStorage.put(newSessionWindow, key, newSessionData);
sessionWindowToAssign = newSessionWindow;
break;
}
default:
throw new IllegalStateException("There are more than two sessions matching one timestamp");
}
windows.add(sessionWindowToAssign);
}

@Override
Expand Down

0 comments on commit b1c6b0c

Please sign in to comment.