Skip to content

Commit

Permalink
Adding through merge
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Feb 19, 2014
1 parent 0e9a845 commit 25417a0
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 6 deletions.
203 changes: 202 additions & 1 deletion lib/rx/operators/multiple.rb
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,107 @@ def combineLatest(other, &result_selector)
end
end

# Concatenates the second observable sequence to the first observable sequence upon successful termination of the first.
def concat(other)
Observable.concat([self, other].to_enum)
end

# Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.
def merge_concurrent(max_concurrent = 1)
AnonymousObservable.new do |observer|
gate = Mutex.new
q = []
stopped = false
group = CompositeSubscription.new
active = 0

subscriber = lambda do |xs|
subscription = SingleAssignmentSubscription.new
group >> subscription

new_obs = Observer.configure do |o|
o.on_next {|x| gate.synchronize { observer.on_next x } }

o.on_error {|err| gate.synchronize { observer.on_error err } }

o.on_completed do
group.delete subscription
gate.synchronize do
if q.length > 0
s = q.shift
subscriber.call s
else
active -= 1
observer.on_completed if stopped && active == 0
end
end
end
end

xs.subscribe new_obs
end

inner_obs = Observer.configure do |o|
o.on_next do |inner_source|
gate.synchronize do
if active < max_concurrent
active += 1
subscriber.call inner_source
else
q >> inner_source
end
end
end

o.on_error {|err| gate.synchronize { observer.on_error err } }

o.on_completed do
stopped = true
observer.on_completed if active == 0
end
end

group >> subscribe(inner_obs)
end
end

# Concatenates all inner observable sequences, as long as the previous observable sequence terminated successfully.
def merge_all
AnonymousObservable.new do |observer|
gate = Mutex.new
stopped = false
m = SingleAssignmentSubscription.new
group = CompositeDisposable.new [m]

new_obs = Observer.configure do |o|
o.on_next do |inner_source|
inner_subscription = SingleAssignmentSubscription.new
group >> inner_subscription

inner_obs = Observer.configure do |io|
io.on_next {|x| gate.synchronize { observer.on_next x } }

io.on_error {|err| gate.synchronize { observer.on_error x } }

io.on_completed do
group.delete inner_subscription
gate.synchronize { observer.on_completed } if stopped && group.length == 1
end
end

inner_subscription.subscription = inner_source.subscribe inner_obs
end

o.on_error {|err| gate.synchronize { observer.on_error err } }

o.on_completed do
stopped = true
gate.synchronize { observer.on_completed } if group.length == 1
end
end
end
end

class << self

# Propagates the observable sequence that reacts first.
Expand All @@ -210,18 +311,26 @@ def rescue_error(*args)
gate.wait do
current = nil
has_next = false
err = nil

if !disposed
begin
current = e.next
has_next = true
rescue StopIteration => se

rescue => e
err = e
end
else
return
end

if err
observer.on_error err
return
end

unless has_next
if last_error
observer.on_error last_error
Expand Down Expand Up @@ -274,12 +383,104 @@ def combine_latest(*args, &result_selector)
end

observer.on_next(res)
#TODO FInish
elsif enumerable_select_with_index(is_done) {|x, j| j != i} .all?
observer.on_completed
return
end
end

done = lambda do |i|
is_done[i] = true
observer.on_completed if is_done.all?
end

gate = Mutex.new
subscriptions = Array.new(n) do |i|
sas = SingleAssignmentSubscription.new

sas_obs = Observer.configure do |o|
o.on_next do |x|
values[i] = x
next_item.call i
end

o.on_error &observer.method(:on_error)

o.on_completed { done.call i }
end

sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)

subscriptions[i] = sas
end

CompositeSubscription.new subscriptions
end
end
end

# Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.
def concat(*args)
AnonymousObservable.new do |observer|
disposed = false
e = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
subscription = SerialSubscription.new
gate = AsyncLock.new

cancelable = CurrentThreadScheduler.instance.schedule_recursive do |this|
gate.wait do
current = nil
has_next = false
err = nil

if !disposed
begin
current = e.next
has_next = true
rescue StopIteration => se

rescue => e
err = e
end
else
return
end

if err
observer.on_error err
return
end

unless has_next
observer.on_completed
return
end

d = SingleAssignmentSubscription.new
subscription.subscription = d

new_obs = Observer.configure do |o|
o.on_next &observer.method(:on_next)
o.on_error &observer.method(:on_error)
o.on_completed { this.call }
end

current.subscribe new_obs
end
end

CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true }}]
end
end

private

def enumerable_select_with_index(arr, &block)
[].tap do |new_arr|
arr.each_with_index do |item, index|
new_arr.push item if block.call item, index
end
end
end
end
end
4 changes: 2 additions & 2 deletions lib/rx/operators/single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ def materialize

# Repeats the observable sequence indefinitely.
def repeat_infinitely
concat_enumerator(enumerator_repeat_infinitely(self))
Observable.concat(enumerator_repeat_infinitely(self))
end

# Repeats the observable sequence a specified number of times.
def repeat(repeat_count)
concat_enumerator(enumerator_repeat_times(repeat_count, self))
Observable.concat(enumerator_repeat_times(repeat_count, self))
end

# Repeats the source observable sequence until it successfully terminates.
Expand Down
7 changes: 5 additions & 2 deletions lib/rx/operators/standard_query_operators.rb
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,13 @@ def take_while_with_index(&block)
end

# Filters the elements of an observable sequence based on a predicate.
def filter(&block)
def select(&block)
filter_with_index {|x, i| block.call x }
end
alias_method :find_all, :select

# Filters the elements of an observable sequence based on a predicate by incorporating the element's index.
def filter_with_index(&block)
def select_with_index(&block)
AnonymousObservable.new do |observer|
i = 0

Expand All @@ -272,5 +273,7 @@ def filter_with_index(&block)
subscribe(new_observer)
end
end

alias_method :find_all_with_index, :select_with_index
end
end
2 changes: 1 addition & 1 deletion license.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) Microsoft Corporation. All rights reserved.
Copyright (c) Microsoft Open Technologies. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may
Expand Down
Empty file removed output.txt
Empty file.

0 comments on commit 25417a0

Please sign in to comment.