Skip to content

Commit

Permalink
xrootd read support (#150)
Browse files Browse the repository at this point in the history
* xrootd
* bump Julia requirement to >=1.6
  • Loading branch information
Moelf committed Jun 1, 2022
1 parent e04c335 commit e374cd8
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 6 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ jobs:
fail-fast: false
matrix:
version:
- '1.3'
- '1.4'
- '1.5'
- '1.6'
- '1' # Leave this line unchanged. '1' will automatically expand to the latest stable 1.x release of Julia.
- 'nightly'
Expand Down
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ PrettyTables = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d"
StaticArrays = "90137ffa-7385-5640-81b9-e52037218182"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9"
xrootdgo_jll = "9d84c17e-11f2-50ef-8cc9-e9701362097f"

[compat]
AbstractTrees = "^0.3.0"
Expand All @@ -44,7 +45,8 @@ PrettyTables = "^1.2.0"
StaticArrays = "^0.12.0, ^1"
Tables = "^1.0.0"
TypedTables = "^1.0.0"
julia = "^1.3"
julia = "^1.6"
xrootdgo_jll = "^0.31.1"

[extras]
InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
Expand Down
5 changes: 5 additions & 0 deletions src/root.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ const HEAD_BUFFER_SIZE = 1024
function ROOTFile(filename::AbstractString; customstructs = Dict("TLorentzVector" => LorentzVector{Float64}))
fobj = if startswith(filename, r"https?://")
HTTPStream(filename)
elseif startswith(filename, "root://")
sep_idx = findlast("//", filename)
baseurl = filename[8:first(sep_idx)-1]
filepath = filename[last(sep_idx):end]
XRDStream(baseurl, filepath, "go")
else
!isfile(filename) && throw(SystemError("opening file $filename", 2))
MmapStream(filename)
Expand Down
57 changes: 55 additions & 2 deletions src/streamsource.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
using xrootdgo_jll
import HTTP

mutable struct XRDStream
gofile_id::Cstring # used as key to a global `map` on the Go side
seekloc::Int
size::Int
end

mutable struct MmapStream # Mmap based
mmap_ary::Vector{UInt8}
seekloc::Int
Expand Down Expand Up @@ -75,10 +82,10 @@ mutable struct HTTPStream
end
end

const SourceStream = Union{MmapStream, HTTPStream}
const SourceStream = Union{MmapStream, HTTPStream, XRDStream}

function Base.read(fobj::SourceStream, ::Type{T}) where T
return reinterpret(T, read(fobj, sizeof(T)))[1] # TODO: use `only`
return only(reinterpret(T, read(fobj, sizeof(T))))
end

function Base.position(fobj::SourceStream)
Expand Down Expand Up @@ -121,3 +128,49 @@ end
function Base.read(fobj::SourceStream)
read(fobj, fobj.size - fobj.seekloc + 1)
end

function XRDStream(urlbase::AbstractString, filepath::AbstractString, username::AbstractString)
file_id = @ccall xrootdgo.Open(urlbase::Cstring, filepath::Cstring, username::Cstring)::Cstring
# file_id = @threadcall((:Open, xrootdgo), Cstring, (Cstring, Cstring, Cstring), urlbase, filepath, username)
size = @ccall xrootdgo.Size(file_id::Cstring)::Int
XRDStream(file_id, 0, size)
end

function Base.close(fobj::XRDStream)
xrootdgo.Close(fobj.gofile_id)
end

function read_seek_nb(fobj::XRDStream, seek, nb)
buffer = Vector{UInt8}(undef, nb)
@threadcall((:ReadAt, xrootdgo), Cvoid, (Ptr{UInt8}, Cstring, Clong, Clong), buffer, fobj.gofile_id, nb, seek)
# @ccall xrootdgo.ReadAt(buffer::Ptr{UInt8},
# fobj.gofile_id::Cstring, nb::Clong, seek::Clong)::Cvoid
return buffer
end
function _read!(ptr, fobj, nb, seekloc)
@ccall xrootdgo.ReadAt(ptr::Ptr{UInt8},
fobj.gofile_id::Cstring, nb::Clong, seekloc::Clong)::Cvoid
end

function _read!(ptr, fobj, nb)
_read!(ptr, fobj, nb, fobj.seekloc)
end

function Base.read(fobj::XRDStream, ::Type{T}) where T
@debug @show T, sizeof(T)
nb = sizeof(T)
output = Ref{T}()
tko = Base.@_gc_preserve_begin output
po = Ptr{UInt8}(pointer_from_objref(output))
_read!(po, fobj, nb, fobj.seekloc)
Base.@_gc_preserve_end tko
fobj.seekloc += nb
return output[]
end

function Base.read(fobj::XRDStream, nb::Integer)
buffer = Vector{UInt8}(undef, nb)
GC.@preserve buffer _read!(buffer, fobj, nb, fobj.seekloc)
fobj.seekloc += nb
return buffer
end
4 changes: 4 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,10 @@ end
end

@testset "SourceStream remote" begin
r = ROOTFile("root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root")
@test r["Events"].fEntries == 29308627
show(devnull, r) # test display

t = LazyTree("https://scikit-hep.org/uproot3/examples/Zmumu.root", "events")
@test t.eta1[1] -1.21769
@test t.eta1[end] -1.57044
Expand Down

0 comments on commit e374cd8

Please sign in to comment.