Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for request kwargs in object.jl #31

Merged
merged 1 commit into from
Jan 30, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/object.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Object(
size::Integer,
eTag::AbstractString) = Object(store, creds, String(key), Int(size), String(eTag))

function Object(store::AbstractStore, key::String; credentials::Union{CloudCredentials, Nothing}=nothing)
function Object(store::AbstractStore, key::String; credentials::Union{CloudCredentials, Nothing}=nothing, kw...)
url = makeURL(store, key)
resp = API.headObject(store, url, HTTP.Headers(); credentials=credentials)
resp = API.headObject(store, url, HTTP.Headers(); credentials=credentials, kw...)
# The ArgumentError will be caused by the HTTP error to provide more context
if HTTP.isredirect(resp)
try
Expand All @@ -46,11 +46,11 @@ function Base.copyto!(dest::AbstractVector{UInt8}, doff::Integer, src::Object, s
return unsafe_copyto!(dest, doff, src, soff, n)
end

function getRange(src::Object, soff::Integer, n::Integer)
function getRange(src::Object, soff::Integer, n::Integer; kw...)
headers = HTTP.Headers()
HTTP.setheader(headers, contentRange((soff - 1):(soff + n - 2)))
url = makeURL(src.store, src.key)
return getObject(src.store, url, headers; credentials=src.credentials).body
return getObject(src.store, url, headers; credentials=src.credentials, kw...).body
end

function Base.unsafe_copyto!(dest::AbstractVector{UInt8}, doff::Integer, src::Object, soff::Integer, n::Integer)
Expand Down Expand Up @@ -112,7 +112,7 @@ function _prefetching_task(io)
return nothing
end

function _download_task(io)
function _download_task(io; kw...)
headers = HTTP.Headers()
object = io.object
url = makeURL(object.store, io.object.key)
Expand All @@ -127,7 +127,7 @@ function _download_task(io)
response_stream.data = buffer_view
response_stream.maxsize = length(buffer_view)
seekstart(response_stream)
_ = getObject(object.store, url, headers; credentials, response_stream)
_ = getObject(object.store, url, headers; credentials, response_stream, kw...)

Base.@lock io.cond.cond_wait begin
io.cond.ntasks -= 1
Expand Down Expand Up @@ -158,7 +158,8 @@ mutable struct PrefetchedDownloadStream{T <: Object} <: IO
function PrefetchedDownloadStream(
object::T,
prefetch_size::Int=DEFAULT_PREFETCH_SIZE;
prefetch_multipart_size::Int=DEFAULT_PREFETCH_MULTIPART_SIZE
prefetch_multipart_size::Int=DEFAULT_PREFETCH_MULTIPART_SIZE,
kw...
) where {T<:Object}
len = length(object)
size = min(prefetch_size, len)
Expand All @@ -177,7 +178,7 @@ mutable struct PrefetchedDownloadStream{T <: Object} <: IO
prefetch_multipart_size > 0 || throw(ArgumentError("`prefetch_multipart_size` must be positive, got $prefetch_multipart_size"))
if size > 0
for _ in 1:min(Threads.nthreads(), max(1, div(size, io.prefetch_multipart_size)))
Threads.@spawn _download_task($io)
Threads.@spawn _download_task($io; $kw...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do a quick local check to make sure this works? My gut says I don't know if it should be ($io; $(kw...)) or what you have here, but maybe it doesn't make a difference. Can you just double check by printing keyword args in CloudBase or HTTP to make sure things are getting passed through correctly? I'd hate to update things and then discover later that specific tasks weren't getting keywords passed through right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I wasn't sure as well so I made this little experiment:

x = [1,2,3]
for _ in 1:5 # works
    Threads.@spawn println($x...)
end

for _ in 1:5 # error
    Threads.@spawn println($(x...))
end

for _ in 1:5 # works
    Threads.@spawn println($(x)...)
end

end
Threads.@spawn _prefetching_task($io)
else
Expand All @@ -194,9 +195,10 @@ mutable struct PrefetchedDownloadStream{T <: Object} <: IO
prefetch_size::Int=DEFAULT_PREFETCH_SIZE;
credentials::Union{CloudCredentials, Nothing}=nothing,
prefetch_multipart_size::Int=DEFAULT_PREFETCH_MULTIPART_SIZE,
kw...,
)
url = makeURL(store, key)
resp = API.headObject(store, url, HTTP.Headers(); credentials=credentials)
resp = API.headObject(store, url, HTTP.Headers(); credentials=credentials, kw...)
# The ArgumentError will be caused by the HTTP error to provide more context
if HTTP.isredirect(resp)
try
Expand All @@ -208,7 +210,7 @@ mutable struct PrefetchedDownloadStream{T <: Object} <: IO
len = parse(Int, HTTP.header(resp, "Content-Length", "0"))
et = etag(HTTP.header(resp, "ETag", ""))
object = Object(store, credentials, String(key), Int(len), String(et))
return PrefetchedDownloadStream(object, prefetch_size; prefetch_multipart_size)
return PrefetchedDownloadStream(object, prefetch_size; prefetch_multipart_size, kw...)
end
end
Base.eof(io::PrefetchedDownloadStream) = io.pos >= io.len
Expand Down