Skip to content

Commit

Permalink
ReplaySubject enhancement with time and/or size bounds
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed May 20, 2014
1 parent 07d6583 commit c7ce158
Show file tree
Hide file tree
Showing 9 changed files with 1,178 additions and 896 deletions.
16 changes: 8 additions & 8 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4821,7 +4821,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
@Override
public final Subject<T, T> call() {
return OperatorReplay.replayBuffered(bufferSize);
return ReplaySubject.<T>createWithSize(bufferSize);
}
}, selector));
}
Expand Down Expand Up @@ -4889,7 +4889,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
@Override
public final Subject<T, T> call() {
return OperatorReplay.replayWindowed(time, unit, bufferSize, scheduler);
return ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler);
}
}, selector));
}
Expand Down Expand Up @@ -4920,7 +4920,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
@Override
public final Subject<T, T> call() {
return OperatorReplay.<T> createScheduledSubject(OperatorReplay.<T> replayBuffered(bufferSize), scheduler);
return OperatorReplay.<T> createScheduledSubject(ReplaySubject.<T>createWithSize(bufferSize), scheduler);
}
}, selector));
}
Expand Down Expand Up @@ -4979,7 +4979,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
@Override
public final Subject<T, T> call() {
return OperatorReplay.replayWindowed(time, unit, -1, scheduler);
return ReplaySubject.<T>createWithTime(time, unit, scheduler);
}
}, selector));
}
Expand Down Expand Up @@ -5028,7 +5028,7 @@ public final Subject<T, T> call() {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211976.aspx">MSDN: Observable.Replay</a>
*/
public final ConnectableObservable<T> replay(int bufferSize) {
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayBuffered(bufferSize));
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithSize(bufferSize));
}

/**
Expand Down Expand Up @@ -5081,7 +5081,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayWindowed(time, unit, bufferSize, scheduler));
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler));
}

/**
Expand All @@ -5104,7 +5104,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
return new OperatorMulticast<T, T>(this,
OperatorReplay.createScheduledSubject(
OperatorReplay.<T> replayBuffered(bufferSize), scheduler));
ReplaySubject.<T>createWithSize(bufferSize), scheduler));
}

/**
Expand Down Expand Up @@ -5148,7 +5148,7 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211811.aspx">MSDN: Observable.Replay</a>
*/
public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayWindowed(time, unit, -1, scheduler));
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTime(time, unit, scheduler));
}

/**
Expand Down
Loading

0 comments on commit c7ce158

Please sign in to comment.