From 23d87edc7cb984a4dfc94d1733bccaead1a0770b Mon Sep 17 00:00:00 2001 From: Jerry Ling Date: Wed, 1 Jun 2022 11:28:14 -0500 Subject: [PATCH] xrootd read support (#150) * xrootd * bump Julia requirement to >=1.6 --- .github/workflows/ci.yml | 3 --- Project.toml | 4 ++- src/root.jl | 5 ++++ src/streamsource.jl | 57 ++++++++++++++++++++++++++++++++++++++-- test/runtests.jl | 4 +++ 5 files changed, 67 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5dcc9416..f6686aa3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,9 +15,6 @@ jobs: fail-fast: false matrix: version: - - '1.3' - - '1.4' - - '1.5' - '1.6' - '1' # Leave this line unchanged. '1' will automatically expand to the latest stable 1.x release of Julia. - 'nightly' diff --git a/Project.toml b/Project.toml index bace3bf8..490ab880 100644 --- a/Project.toml +++ b/Project.toml @@ -23,6 +23,7 @@ PrettyTables = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9" +xrootdgo_jll = "9d84c17e-11f2-50ef-8cc9-e9701362097f" [compat] AbstractTrees = "^0.3.0" @@ -44,7 +45,8 @@ PrettyTables = "^1.2.0" StaticArrays = "^0.12.0, ^1" Tables = "^1.0.0" TypedTables = "^1.0.0" -julia = "^1.3" +julia = "^1.6" +xrootdgo_jll = "^0.31.1" [extras] InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240" diff --git a/src/root.jl b/src/root.jl index b5a11293..765c9160 100644 --- a/src/root.jl +++ b/src/root.jl @@ -59,6 +59,11 @@ const HEAD_BUFFER_SIZE = 1024 function ROOTFile(filename::AbstractString; customstructs = Dict("TLorentzVector" => LorentzVector{Float64})) fobj = if startswith(filename, r"https?://") HTTPStream(filename) + elseif startswith(filename, "root://") + sep_idx = findlast("//", filename) + baseurl = filename[8:first(sep_idx)-1] + filepath = filename[last(sep_idx):end] + XRDStream(baseurl, filepath, "go") else !isfile(filename) && throw(SystemError("opening file $filename", 2)) MmapStream(filename) diff --git a/src/streamsource.jl b/src/streamsource.jl index 78935cf6..8ac211b7 100644 --- a/src/streamsource.jl +++ b/src/streamsource.jl @@ -1,5 +1,12 @@ +using xrootdgo_jll import HTTP +mutable struct XRDStream + gofile_id::Cstring # used as key to a global `map` on the Go side + seekloc::Int + size::Int +end + mutable struct MmapStream # Mmap based mmap_ary::Vector{UInt8} seekloc::Int @@ -75,10 +82,10 @@ mutable struct HTTPStream end end -const SourceStream = Union{MmapStream, HTTPStream} +const SourceStream = Union{MmapStream, HTTPStream, XRDStream} function Base.read(fobj::SourceStream, ::Type{T}) where T - return reinterpret(T, read(fobj, sizeof(T)))[1] # TODO: use `only` + return only(reinterpret(T, read(fobj, sizeof(T)))) end function Base.position(fobj::SourceStream) @@ -121,3 +128,49 @@ end function Base.read(fobj::SourceStream) read(fobj, fobj.size - fobj.seekloc + 1) end + +function XRDStream(urlbase::AbstractString, filepath::AbstractString, username::AbstractString) + file_id = @ccall xrootdgo.Open(urlbase::Cstring, filepath::Cstring, username::Cstring)::Cstring + # file_id = @threadcall((:Open, xrootdgo), Cstring, (Cstring, Cstring, Cstring), urlbase, filepath, username) + size = @ccall xrootdgo.Size(file_id::Cstring)::Int + XRDStream(file_id, 0, size) +end + +function Base.close(fobj::XRDStream) + xrootdgo.Close(fobj.gofile_id) +end + +function read_seek_nb(fobj::XRDStream, seek, nb) + buffer = Vector{UInt8}(undef, nb) + @threadcall((:ReadAt, xrootdgo), Cvoid, (Ptr{UInt8}, Cstring, Clong, Clong), buffer, fobj.gofile_id, nb, seek) + # @ccall xrootdgo.ReadAt(buffer::Ptr{UInt8}, + # fobj.gofile_id::Cstring, nb::Clong, seek::Clong)::Cvoid + return buffer +end +function _read!(ptr, fobj, nb, seekloc) + @ccall xrootdgo.ReadAt(ptr::Ptr{UInt8}, + fobj.gofile_id::Cstring, nb::Clong, seekloc::Clong)::Cvoid +end + +function _read!(ptr, fobj, nb) + _read!(ptr, fobj, nb, fobj.seekloc) +end + +function Base.read(fobj::XRDStream, ::Type{T}) where T + @debug @show T, sizeof(T) + nb = sizeof(T) + output = Ref{T}() + tko = Base.@_gc_preserve_begin output + po = Ptr{UInt8}(pointer_from_objref(output)) + _read!(po, fobj, nb, fobj.seekloc) + Base.@_gc_preserve_end tko + fobj.seekloc += nb + return output[] +end + +function Base.read(fobj::XRDStream, nb::Integer) + buffer = Vector{UInt8}(undef, nb) + GC.@preserve buffer _read!(buffer, fobj, nb, fobj.seekloc) + fobj.seekloc += nb + return buffer +end diff --git a/test/runtests.jl b/test/runtests.jl index 82eb6b1e..5a29c749 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -731,6 +731,10 @@ end end @testset "SourceStream remote" begin + r = ROOTFile("root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root") + @test r["Events"].fEntries == 29308627 + show(devnull, r) # test display + t = LazyTree("https://scikit-hep.org/uproot3/examples/Zmumu.root", "events") @test t.eta1[1] ≈ -1.21769 @test t.eta1[end] ≈ -1.57044