Skip to content

Commit

Permalink
Add task debugging wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
eschnett committed Mar 27, 2019
1 parent b728e35 commit 354b667
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
18 changes: 13 additions & 5 deletions src/DropboxCLI.jl
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,8 @@ function cmd_put(args)
end

upload_channel = Channel{Tuple{String, String}}(1000)
upload_task = schedule(@task upload_many_files(auth, upload_channel))
# upload_task = schedule(@task upload_many_files(auth, upload_channel))
upload_task = start_task(() -> upload_many_files(auth, upload_channel))
for source in sources
# Distinguish between files and directories
if !ispath(source)
Expand Down Expand Up @@ -675,8 +676,10 @@ function upload_many_files(auth::Authorization,
while isready(upload_channel) || isopen(upload_channel)

upload_spec_channel = Channel{UploadSpec}(0)
# upload_spec_task =
# schedule(@task files_upload(auth, upload_spec_channel))
upload_spec_task =
schedule(@task files_upload(auth, upload_spec_channel))
start_task(() -> files_upload(auth, upload_spec_channel))

max_tasks = 4
tasks = Task[]
Expand All @@ -688,9 +691,12 @@ function upload_many_files(auth::Authorization,
# TODO: run these truly in parallel, using multiple
# processes (this requires transferring the files_upload
# state to other processes)
# push!(tasks,
# schedule(@task upload_one_file(auth, i, source, destination,
# upload_spec_channel)))
push!(tasks,
schedule(@task upload_one_file(auth, i, source, destination,
upload_spec_channel)))
start_task(() -> upload_one_file(auth, i, source, destination,
upload_spec_channel)))
while length(tasks) >= max_tasks
yield()
filter!(!istaskdone, tasks)
Expand Down Expand Up @@ -737,7 +743,9 @@ function upload_one_file(auth::Authorization,
# content_hash = calc_content_hash(content)
# Read in chunks
data_channel = Channel{Vector{UInt8}}(0)
content_hash_task = schedule(@task calc_content_hash(data_channel))
# content_hash_task = schedule(@task calc_content_hash(data_channel))
content_hash_task =
start_task(() -> calc_content_hash(data_channel))
open(source, "r") do io
while !eof(io)
chunksize = 4 * 1024 * 1024
Expand Down
32 changes: 29 additions & 3 deletions src/DropboxSDK.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ end



export start_task
"""
start_task(fun::Function)::Task
Create a new task from `fun` and schedule the task, similar to the
`@task` macro. Also register a hook that shows exception and backtrace
if the task fails.
"""
function start_task(fun::Function)::Task
function wrap()
try
fun()
catch ex
@show ex
rethrow(ex)
end
end
schedule(Task(wrap))
end



include("types.jl")


Expand Down Expand Up @@ -548,7 +570,8 @@ function files_upload(auth::Authorization,
session_id = nothing
offset = Int64(0)
data_channel = Channel{Vector{UInt8}}(0)
content_hash_task = schedule(@task calc_content_hash(data_channel))
# content_hash_task = schedule(@task calc_content_hash(data_channel))
content_hash_task = start_task(() -> calc_content_hash(data_channel))
for chunk in content_channel
isempty(chunk) && continue
if session_id === nothing
Expand Down Expand Up @@ -663,7 +686,9 @@ function files_upload(auth::Authorization,
# TODO: parallelize loop
upload_tasks = Task[]
for upload_spec in upload_spec_channel
push!(upload_tasks, schedule(@task upload_one_file(auth, upload_spec)))
# push!(upload_tasks, schedule(@task upload_one_file(auth, upload_spec)))
push!(upload_tasks,
start_task(() -> upload_one_file(auth, upload_spec)))
end
upload_states = UploadState[]
for t in upload_tasks
Expand Down Expand Up @@ -785,7 +810,8 @@ function upload_one_file(auth::Authorization,
session_id = nothing
offset = Int64(0)
data_channel = Channel{Vector{UInt8}}(0)
content_hash_task = schedule(@task calc_content_hash(data_channel))
# content_hash_task = schedule(@task calc_content_hash(data_channel))
content_hash_task = start_task(() -> calc_content_hash(data_channel))
for chunk in upload_spec.content_channel
isempty(chunk) && continue
if session_id === nothing
Expand Down
21 changes: 14 additions & 7 deletions test/testsdk.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ end
# @assert vcat(contents...) == content

data_channel = Channel{Vector{UInt8}}(0)
content_hash_task = schedule(@task calc_content_hash(data_channel))
# content_hash_task = schedule(@task calc_content_hash(data_channel))
content_hash_task = start_task(() -> calc_content_hash(data_channel))
for content in contents
put!(data_channel, content)
end
Expand Down Expand Up @@ -116,8 +117,10 @@ end

@testset "Upload file in chunks" begin
upload_channel = Channel{Vector{UInt8}}(0)
upload_task = schedule(
@task files_upload(auth, "/$folder/file1", upload_channel))
# upload_task = schedule(
# @task files_upload(auth, "/$folder/file1", upload_channel))
upload_task =
start_task(() -> files_upload(auth, "/$folder/file1", upload_channel))
put!(upload_channel, Vector{UInt8}("Hello, "))
put!(upload_channel, Vector{UInt8}("World!\n"))
close(upload_channel)
Expand All @@ -139,8 +142,10 @@ end

@testset "Upload empty file in chunks" begin
upload_channel = Channel{Vector{UInt8}}(0)
upload_task = schedule(
@task files_upload(auth, "/$folder/file2", upload_channel))
# upload_task = schedule(
# @task files_upload(auth, "/$folder/file2", upload_channel))
upload_task =
start_task(() -> files_upload(auth, "/$folder/file2", upload_channel))
close(upload_channel)
metadata = fetch(upload_task)
@test metadata isa FileMetadata
Expand All @@ -161,7 +166,8 @@ end
const numfiles = 4
@testset "Upload several files" begin
upload_spec_channel = Channel{UploadSpec}(0)
upload_task = schedule(@task files_upload(auth, upload_spec_channel))
# upload_task = schedule(@task files_upload(auth, upload_spec_channel))
upload_task = start_task(() -> files_upload(auth, upload_spec_channel))
for i in 0:numfiles-1
data_channel = Channel{Vector{UInt8}}(0)
put!(upload_spec_channel,
Expand Down Expand Up @@ -197,7 +203,8 @@ end

@testset "Upload zero files" begin
upload_spec_channel = Channel{UploadSpec}(0)
upload_task = schedule(@task files_upload(auth, upload_spec_channel))
# upload_task = schedule(@task files_upload(auth, upload_spec_channel))
upload_task = start_task(() -> files_upload(auth, upload_spec_channel))
close(upload_spec_channel)
metadatas = fetch(upload_task)
@test isempty(metadatas)
Expand Down

0 comments on commit 354b667

Please sign in to comment.