In [1]:
using Rx
using BenchmarkTools

In [2]:
source = file("data")

Rx.SyncFileObservable("data")

In [3]:
subscribe!(source, LambdaActor{String}(
    on_next = (d) -> println(d)
));

H
e
l
l
o
,
w
o
r
l
d
!


In [4]:
subscribe!(source |> uppercase(), LambdaActor{String}(
    on_next = (d) -> println(d)
));

H
E
L
L
O
,
W
O
R
L
D
!


In [5]:
subscribe!(source |> uppercase() |> reduce(String, *) |> map(String, reverse), LambdaActor{String}(
    on_next = (d) -> println(d)
));

HELLO,WORLD!


In [6]:
subscribe!(source |> lowercase() |> scan(String, (s, c) -> c * s), LambdaActor{String}(
    on_next = (d) -> println(d)
));

h
he
hel
hell
hello
hello,
hello,w
hello,wo
hello,wor
hello,worl
hello,world
hello,world!


In [7]:
function regular_approach()
    source = open("data", "r")
    output = ""
    for line in eachline(source)
         output = output * line
    end
    close(source)
    return uppercase(output)
end

regular_approach (generic function with 1 method)

In [8]:
mutable struct KeepActor <: NextActor{String}
    value::String
    
    KeepActor() = new()
end

function Rx.on_next!(actor::KeepActor, data::String) 
     actor.value = data
end

@CreateReduceOperator("Concatenate", String, String, (d, c) -> c * d)

function reactive_approach()
    source = file("data")
    keep = KeepActor()
    subscribe!(source |> ConcatenateReduceOperator("") |> uppercase(), keep);
    return keep.value
end

reactive_approach (generic function with 1 method)

In [9]:
regular_approach() == reactive_approach()

true

In [10]:
@btime regular_approach();

  9.858 μs (54 allocations: 1.88 KiB)


In [11]:
@btime reactive_approach();

  10.202 μs (61 allocations: 2.03 KiB)
