40 changes: 40 additions & 0 deletions .github/workflows/BenchmarksAndMicroIntegration.yml
@@ -0,0 +1,40 @@
name: Benchmarks and MicroIntegration

on:
push:
branches:
- master
pull_request:

jobs:
test:
name: Benchmarks and MicroIntegration
runs-on: ubuntu-latest
strategy:
fail-fast: false
steps:
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@v1
with:
version: 1
arch: x64
- uses: julia-actions/julia-buildpkg@latest
- name: setup environment
shell: julia --color=yes --project=perf {0}
run: |
using Pkg
try
# force it to use this PR's version of the package
pkg"add Turing#hg/new-libtask2" # TODO: remove this when Turing is updated
Pkg.develop(PackageSpec(path=".")) # resolver may fail with main deps
Pkg.update()
catch err
err isa Pkg.Resolve.ResolverError || rethrow()
# If we can't resolve, this version is incompatible by SemVer, and that is fine:
# it means we marked this as a breaking change, so we don't need to worry about
# mistakenly introducing a breaking change, as we have intentionally made one.
@info "Not compatible with this release. No problem." exception=err
exit(0) # Exit immediately, as a success
end
- name: run
run: julia --color=yes --project=perf perf/runtests.jl
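For reference, the "setup environment" step above can be reproduced outside CI with a short Julia session. This is a hedged sketch rather than part of the workflow; it assumes it is run from the repository root with the perf/ project present, and it omits the temporary Turing branch pin:

using Pkg
Pkg.activate("perf")
try
    Pkg.develop(PackageSpec(path="."))  # use the local checkout of Libtask
    Pkg.update()
catch err
    err isa Pkg.Resolve.ResolverError || rethrow()
    # An unresolvable environment means the local version is an intentional breaking change.
    @info "Not compatible with this release. No problem." exception=err
end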
3 changes: 2 additions & 1 deletion Project.toml
@@ -3,10 +3,11 @@ uuid = "6f1fad26-d15e-5dc8-ae53-837a1d7b8c9f"
license = "MIT"
desc = "Tape based task copying in Turing"
repo = "https://github.com/TuringLang/Libtask.jl.git"
version = "0.6.2"
version = "0.6.3"

[deps]
IRTools = "7869d1d1-7146-5819-86e3-90919afe41df"
LRUCache = "8ac3fa9e-de4c-5943-b1dc-09c6b5f20637"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
14 changes: 14 additions & 0 deletions perf/Project.toml
@@ -0,0 +1,14 @@
[deps]
AbstractMCMC = "80f14c24-f653-4e6a-9b94-39d6b0f70001"
AdvancedPS = "576499cb-2369-40b2-a588-c64705576edc"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
DynamicPPL = "366bfd00-2699-11ea-058f-f148b4cae6d8"
Libtask = "6f1fad26-d15e-5dc8-ae53-837a1d7b8c9f"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Turing = "fce5fe82-541a-59a6-adf8-730c64b5f9a0"

[compat]
julia = "1.3"

[targets]
test = ["Test", "BenchmarkTools"]
47 changes: 47 additions & 0 deletions perf/p0.jl
@@ -0,0 +1,47 @@
# ]add Turing#hg/new-libtask2

using Libtask
using Turing, DynamicPPL, AdvancedPS
using BenchmarkTools

@model gdemo(x, y) = begin
# Assumptions
σ ~ InverseGamma(2,3)
μ ~ Normal(0,sqrt(σ))
# Observations
x ~ Normal(μ, sqrt(σ))
y ~ Normal(μ, sqrt(σ))
end


# Case 1: Sample from the prior.

m = Turing.Core.TracedModel(gdemo(1.5, 2.), SampleFromPrior(), VarInfo())

f = m.evaluator[1];

args = m.evaluator[2:end];

@show "Directly call..."
@btime f(args...)
# (2.0, VarInfo (2 variables (μ, σ), dimension 2; logp: -6.162))

@show "CTask construction..."
t = @btime Libtask.CTask(f, args...)
# schedule(t.task) # work fine!
# @show Libtask.result(t.tf.tape)
@show "Step in a tape..."
@btime Libtask.step_in(t.tf.tape, args)

# Case 2: SMC sampler

m = Turing.Core.TracedModel(gdemo(1.5, 2.), Sampler(SMC(50)), VarInfo());
@show "Directly call..."
@btime m.evaluator[1](m.evaluator[2:end]...)

@show "CTask construction..."
t = @btime Libtask.CTask(m.evaluator[1], m.evaluator[2:end]...);
# schedule(t.task)
# @show Libtask.result(t.tf.tape)
@show "Step in a tape..."
@btime Libtask.step_in(t.tf.tape, m.evaluator[2:end])
39 changes: 39 additions & 0 deletions perf/p1.jl
@@ -0,0 +1,39 @@
using Turing, Test, AbstractMCMC, DynamicPPL, Random

import AbstractMCMC.AbstractSampler

function check_numerical(chain,
symbols::Vector,
exact_vals::Vector;
atol=0.2,
rtol=0.0)
for (sym, val) in zip(symbols, exact_vals)
E = val isa Real ?
mean(chain[sym]) :
vec(mean(chain[sym], dims=1))
@info (symbol=sym, exact=val, evaluated=E)
@test E ≈ val atol=atol rtol=rtol
end
end

function check_MoGtest_default(chain; atol=0.2, rtol=0.0)
check_numerical(chain,
[:z1, :z2, :z3, :z4, :mu1, :mu2],
[1.0, 1.0, 2.0, 2.0, 1.0, 4.0],
atol=atol, rtol=rtol)
end

@model gdemo_d(x, y) = begin
s ~ InverseGamma(2, 3)
m ~ Normal(0, sqrt(s))
x ~ Normal(m, sqrt(s))
y ~ Normal(m, sqrt(s))
return s, m
end

alg = CSMC(15)
chain = sample(gdemo_d(1.5, 2.0), alg, 5_000)

@show chain

check_numerical(chain, [:s, :m], [49/24, 7/6], atol=0.1)
63 changes: 63 additions & 0 deletions perf/p2.jl
@@ -0,0 +1,63 @@
using Turing, Test, AbstractMCMC, DynamicPPL, Random, Turing.RandomMeasures, Libtask

@model infiniteGMM(x) = begin
# Hyper-parameters, i.e. concentration parameter and parameters of H.
α = 1.0
μ0 = 0.0
σ0 = 1.0

# Define random measure, e.g. Dirichlet process.
rpm = DirichletProcess(α)

# Define the base distribution, i.e. expected value of the Dirichlet process.
H = Normal(μ0, σ0)

# Latent assignment.
z = tzeros(Int, length(x))

# Locations of the infinitely many clusters.
μ = tzeros(Float64, 0)

for i in 1:length(x)

# Number of clusters.
K = maximum(z)
nk = Vector{Int}(map(k -> sum(z .== k), 1:K))

# Draw the latent assignment.
z[i] ~ ChineseRestaurantProcess(rpm, nk)

# Create a new cluster?
if z[i] > K
push!(μ, 0.0)

# Draw location of new cluster.
μ[z[i]] ~ H
end

# Draw observation.
x[i] ~ Normal(μ[z[i]], 1.0)
end
end

# Generate some test data.
Random.seed!(1)

data = vcat(randn(10), randn(10) .- 5, randn(10) .+ 10)
data .-= mean(data)
data /= std(data)

# MCMC sampling
Random.seed!(2)
iterations = 500
model_fun = infiniteGMM(data)

m = Turing.Core.TracedModel(model_fun, Sampler(SMC(50)), VarInfo())
f = m.evaluator[1]
args = m.evaluator[2:end]

t = Libtask.CTask(f, args...)

Libtask.step_in(t.tf.tape, args)

@show Libtask.result(t.tf.tape)
3 changes: 3 additions & 0 deletions perf/runtests.jl
@@ -0,0 +1,3 @@
include("p0.jl")
include("p1.jl")
include("p2.jl")
2 changes: 2 additions & 0 deletions src/Libtask.jl
@@ -3,6 +3,8 @@ module Libtask
using IRTools
using MacroTools

using LRUCache

export CTask, consume, produce
export TArray, tzeros, tfill, TRef

68 changes: 63 additions & 5 deletions src/tapedfunction.jl
@@ -6,6 +6,11 @@ mutable struct Tape
owner
end

"""
Instruction

An `Instruction` represents a single function call recorded on a tape.
"""
mutable struct Instruction{F} <: AbstractInstruction
fun::F
input::Tuple
Expand Down Expand Up @@ -46,13 +51,20 @@ function Base.show(io::IO, box::Box)
println(io, "Box($(box.val))")
end

function Base.show(io::IO, instruction::AbstractInstruction)
println(io, "A $(typeof(instruction))")
end

function Base.show(io::IO, instruction::Instruction)
fun = instruction.fun
tape = instruction.tape
println(io, "Instruction($(fun)$(map(val, instruction.input)), tape=$(objectid(tape)))")
end

function Base.show(io::IO, tp::Tape)
# We use an extra IOBuffer to collect all the data and then
# output it at once, to avoid the output being interrupted by
# task context switching.
buf = IOBuffer()
print(buf, "$(length(tp))-element Tape")
isempty(tp) || println(buf, ":")
@@ -66,10 +78,30 @@ function Base.show(io::IO, tp::Tape)
end

function (instr::Instruction{F})() where F
output = instr.fun(map(val, instr.input)...)
instr.output.val = output
# catch run-time exceptions / errors.
try
output = instr.fun(map(val, instr.input)...)
instr.output.val = output
catch e
println(e, catch_backtrace());
rethrow(e);
end
end

function _new end
function (instr::Instruction{typeof(_new)})()
# catch run-time exceptions / errors.
try
expr = Expr(:new, map(val, instr.input)...)
output = eval(expr)
instr.output.val = output
catch e
println(e, catch_backtrace());
rethrow(e);
end
end


function increase_counter!(t::Tape)
t.counter > length(t) && return
# instr = t[t.counter]
@@ -101,6 +133,19 @@ function run_and_record!(tape::Tape, f, args...)
return output
end

function run_and_record!(tape::Tape, ::typeof(_new), args...)
output = try
expr = Expr(:new, map(val, args)...)
box(eval(expr))
catch e
@warn e
Box{Any}(nothing)
end
ins = Instruction(_new, args, output, tape)
push!(tape, ins)
return output
end

function unbox_condition(ir)
for blk in IRTools.blocks(ir)
vars = keys(blk)
@@ -169,9 +214,15 @@ function intercept(ir; recorder=:run_and_record!)

for (x, st) in ir
x == tape && continue
Meta.isexpr(st.expr, :call) || continue
new_args = (x == args_var) ? st.expr.args : _replace_args(st.expr.args, arg_pairs)
ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, new_args...)
if Meta.isexpr(st.expr, :call)
new_args = (x == args_var) ? st.expr.args : _replace_args(st.expr.args, arg_pairs)
ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, new_args...)
elseif Meta.isexpr(st.expr, :new)
args = st.expr.args
ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, _new, args...)
else
@warn "Unknown IR code: " st
end
end
# the real return value will be in the last instruction on the tape
IRTools.return!(ir, tape)
@@ -190,6 +241,13 @@ mutable struct TapedFunction
end
end

function reset!(tf::TapedFunction, ir::IRTools.IR, tape::Tape)
tf.ir = ir
tf.tape = tape
setowner!(tape, tf)
return tf
end

function (tf::TapedFunction)(args...)
if isempty(tf.tape)
ir = IRTools.@code_ir tf.func(args...)
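The change to `intercept` above is the functional core of this file's diff: IR statements whose expression is an `Expr(:new, ...)` (direct struct construction in lowered code) are now rewritten into `run_and_record!(tape, _new, ...)` calls instead of being skipped. As a hedged illustration that is not part of the diff, the snippet below uses IRTools directly to show where such `:new` statements come from; `make_adder` is a made-up example whose closure construction lowers to `:new`:

using IRTools

function make_adder(x)
    f = y -> x + y  # constructing the closure that captures `x` lowers to an Expr(:new, ...)
    return f(1)
end

ir = IRTools.@code_ir make_adder(2)
for (v, st) in ir
    # These are exactly the statements the new `elseif Meta.isexpr(st.expr, :new)` branch records.
    Meta.isexpr(st.expr, :new) && @show v st.expr
end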
29 changes: 22 additions & 7 deletions src/tapedtask.jl
@@ -16,9 +16,22 @@ struct TapedTask
end
end

const TRCache = LRU{Any, Any}(maxsize=10)

function TapedTask(tf::TapedFunction, args...)
tf.owner != nothing && error("TapedFunction is owned to another task.")
isempty(tf.tape) && tf(args...)
tf.owner !== nothing && error("TapedFunction is owned by another task.")
if isempty(tf.tape)
cache_key = (tf.func, typeof.(args)...)
if haskey(TRCache, cache_key)
ir, tape = TRCache[cache_key]
# Here we don't need to change the initial arguments of the tape;
# they will be set when we `step_in` to the tape.
reset!(tf, ir, copy(tape, Dict{UInt64, Any}(); partial=false))
else
tf(args...)
TRCache[cache_key] = (tf.ir, tf.tape)
end
end
produce_ch = Channel()
consume_ch = Channel{Int}()
task = @task try
@@ -199,14 +212,16 @@ function Base.copy(x::Instruction, on_tape::Tape, roster::Dict{UInt64, Any})
Instruction(x.fun, input, output, on_tape)
end

function Base.copy(t::Tape, roster::Dict{UInt64, Any})
function Base.copy(t::Tape, roster::Dict{UInt64, Any}; partial=true)
old_data = t.tape
new_data = Vector{AbstractInstruction}()
new_tape = Tape(new_data, t.counter, t.owner)
len = partial ? length(old_data) - t.counter + 1 : length(old_data)
start = partial ? t.counter : 1
new_data = Vector{AbstractInstruction}(undef, len)
new_tape = Tape(new_data, 1, t.owner)

for x in old_data
for (i, x) in enumerate(old_data[start:end])
new_ins = copy(x, new_tape, roster)
push!(new_data, new_ins)
new_data[i] = new_ins
end

return new_tape
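Beyond the copy changes above, the other notable addition in tapedtask.jl is the `TRCache` LRU: `TapedTask` now caches the recorded `(ir, tape)` pair under the key `(tf.func, typeof.(args)...)` and, on a hit, hands the `TapedFunction` a full (`partial=false`) copy of the cached tape instead of re-tracing the function. The snippet below is a hedged, self-contained sketch of that caching pattern built only on LRUCache; `cached_trace` and the placeholder values are made up for illustration:

using LRUCache

cache = LRU{Any, Any}(maxsize=10)  # mirrors the TRCache constant

function cached_trace(f, args...)
    key = (f, typeof.(args)...)  # same cache-key shape as TapedTask uses
    get!(cache, key) do
        @info "tracing (first call for this key only)" f
        (:ir_placeholder, :tape_placeholder)  # stand-in for the recorded (ir, tape)
    end
end

cached_trace(sin, 1.0)  # records and caches
cached_trace(sin, 2.0)  # same key (sin, Float64): cache hit, nothing is re-traced

In the actual implementation the cached tape is still copied per task, so each `TapedTask` mutates its own instructions rather than the cached ones.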