Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make OnDisk backend minimally usable #3014

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/OutputReaders/OutputReaders.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ abstract type AbstractDataBackend end
struct InMemory <: AbstractDataBackend end
struct OnDisk <: AbstractDataBackend end

# Lightweight handle to field data that lives in a JLD2 file rather than in
# memory: only the file path and the field's name within that file are stored.
# The actual data is read lazily when the time series is indexed.
struct OnDiskData
path :: String
name :: String
end

# Compact human-readable summary used by `show` methods.
Base.summary(odd::OnDiskData) = "OnDiskData($(odd.path), $(odd.name))"

include("field_time_series.jl")
include("field_dataset.jl")

Expand Down
240 changes: 83 additions & 157 deletions src/OutputReaders/field_time_series.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,27 @@ struct FieldTimeSeries{LX, LY, LZ, K, I, D, G, T, B, χ} <: AbstractField{LX, LY
indices :: I
times :: χ

function FieldTimeSeries{LX, LY, LZ, K}(data::D, grid::G, bcs::B,
times::χ, indices::I) where {LX, LY, LZ, K, D, G, B, χ, I}
T = eltype(data)
function FieldTimeSeries{LX, LY, LZ, K}(data::D,
grid::G,
bcs::B,
times::χ,
indices::I) where {LX, LY, LZ, K, D, G, B, χ, I}
T = eltype(data)
return new{LX, LY, LZ, K, I, D, G, T, B, χ}(data, grid, bcs, indices, times)
end
end

# A time series inherits its architecture (CPU/GPU) from its grid.
architecture(fts::FieldTimeSeries) = architecture(fts.grid)

# Aliases that dispatch on the backend type parameter `K`.
const InMemoryFieldTimeSeries{LX, LY, LZ} = FieldTimeSeries{LX, LY, LZ, InMemory}
const OnDiskFieldTimeSeries{LX, LY, LZ} = FieldTimeSeries{LX, LY, LZ, OnDisk}

# Sentinel singleton marking boundary conditions as not explicitly provided.
struct UnspecifiedBoundaryConditions end

#####
##### Constructors
#####

"""
    FieldTimeSeries{LX, LY, LZ}(grid, times, [FT=eltype(grid);]
                                indices = (:, :, :),
                                boundary_conditions = nothing)

Return an `InMemory` `FieldTimeSeries` at location `(LX, LY, LZ)`, on `grid`,
with one zero-initialized snapshot per entry of `times`.
"""
function FieldTimeSeries{LX, LY, LZ}(grid, times, FT=eltype(grid);
                                     indices = (:, :, :),
                                     boundary_conditions = nothing) where {LX, LY, LZ}

    field_location = (LX, LY, LZ)
    Nt = length(times)

    # Allocate zeroed storage on the grid's architecture, with time as the
    # trailing (4th) dimension, then wrap it in the grid's offset index layout.
    underlying = zeros(FT, architecture(grid), total_size(field_location, grid, indices)..., Nt)
    offset = offset_data(underlying, grid, field_location, indices)

    return FieldTimeSeries{LX, LY, LZ, InMemory}(offset, grid, boundary_conditions, times, indices)
end

"""
FieldTimeSeries(path, name;
backend = InMemory(),
Expand All @@ -80,18 +67,9 @@ Keyword arguments
comparison to recorded save times. Defaults to times associated with `iterations`.
Takes precedence over `iterations` if `times` is specified.
"""
# Keyword form forwards to the positional-backend method so the backend
# participates in dispatch.
FieldTimeSeries(path, name; backend=InMemory(), kwargs...) =
FieldTimeSeries(path, name, backend; kwargs...)

#####
##### InMemory time serieses
#####

const InMemoryFieldTimeSeries{LX, LY, LZ} = FieldTimeSeries{LX, LY, LZ, InMemory}

# Sentinel singleton marking boundary conditions as not explicitly provided.
struct UnspecifiedBoundaryConditions end
FieldTimeSeries(path, name; backend=InMemory(), kw...) = FieldTimeSeries(path, name, backend; kw...)

function FieldTimeSeries(path, name, backend::InMemory;
function FieldTimeSeries(path, name, backend;
architecture = nothing,
grid = nothing,
location = nothing,
Expand All @@ -117,68 +95,39 @@ function FieldTimeSeries(path, name, backend::InMemory;
end

isnothing(grid) && (grid = file["serialized/grid"])
close(file)

# Default to CPU if neither architecture nor grid is specified
architecture = isnothing(architecture) ? (isnothing(grid) ? CPU() : Architectures.architecture(grid)) : architecture
architecture = isnothing(architecture) ?
(isnothing(grid) ? CPU() : Architectures.architecture(grid)) : architecture

# This should be removed in a month or two (4/5/2022).
grid = try
on_architecture(architecture, grid)
catch err # Likely, the grid was saved with CuArrays or generated with a different Julia version.
if grid isa RectilinearGrid # we can try...
@info "Initial attempt to transfer grid to $architecture failed."
@info "Attempting to reconstruct RectilinearGrid on $architecture manually..."

Nx = file["grid/Nx"]
Ny = file["grid/Ny"]
Nz = file["grid/Nz"]
Hx = file["grid/Hx"]
Hy = file["grid/Hy"]
Hz = file["grid/Hz"]
xᶠᵃᵃ = file["grid/xᶠᵃᵃ"]
yᵃᶠᵃ = file["grid/yᵃᶠᵃ"]
zᵃᵃᶠ = file["grid/zᵃᵃᶠ"]
x = file["grid/Δxᶠᵃᵃ"] isa Number ? (xᶠᵃᵃ[1], xᶠᵃᵃ[Nx+1]) : xᶠᵃᵃ
y = file["grid/Δyᵃᶠᵃ"] isa Number ? (yᵃᶠᵃ[1], yᵃᶠᵃ[Ny+1]) : yᵃᶠᵃ
z = file["grid/Δzᵃᵃᶠ"] isa Number ? (zᵃᵃᶠ[1], zᵃᵃᶠ[Nz+1]) : zᵃᵃᶠ
topo = topology(grid)

N = (Nx, Ny, Nz)

# Reduce for Flat dimensions
domain = Dict()
for (i, ξ) in enumerate((x, y, z))
if topo[i] !== Flat
if !(ξ isa Tuple)
chopped_ξ = ξ[1:N[i]+1]
else
chopped_ξ = ξ
end
sξ = (:x, :y, :z)[i]
domain[sξ] = chopped_ξ
end
end
grid = on_architecture(architecture, grid)

size = Tuple(N[i] for i=1:3 if topo[i] !== Flat)
halo = Tuple((Hx, Hy, Hz)[i] for i=1:3 if topo[i] !== Flat)
LX, LY, LZ = location

RectilinearGrid(architecture; size, halo, topology=topo, domain...)
else
throw(err)
end
if backend isa InMemory
Nt = length(times)
space_size = total_size(location, grid, indices)
underlying_data = zeros(eltype(grid), architecture, space_size..., Nt)
data = offset_data(underlying_data, grid, location, indices)
elseif backend isa OnDisk
data = OnDiskData(path, name)
else
error("FieldTimeSeries does not support backend $backend!")
end

close(file)

LX, LY, LZ = location
time_series = FieldTimeSeries{LX, LY, LZ}(grid, times; indices, boundary_conditions)

K = typeof(backend)
time_series = FieldTimeSeries{LX, LY, LZ, K}(data, grid, boundary_conditions, times, indices)
set!(time_series, path, name)

return time_series
end

# `parent` exposes the unoffset underlying array; an OnDisk series holds no
# in-memory array, so it returns `nothing`.
Base.parent(fts::FieldTimeSeries) = parent(fts.data)
Base.parent(fts::InMemoryFieldTimeSeries) = parent(fts.data)
Base.parent(fts::OnDiskFieldTimeSeries) = nothing

# Four-index access (i, j, k, n) reads directly from the in-memory offset array.
@propagate_inbounds Base.getindex(f::FieldTimeSeries{LX, LY, LZ, InMemory}, i, j, k, n) where {LX, LY, LZ} = f.data[i, j, k, n]

function Base.getindex(fts::InMemoryFieldTimeSeries, n::Int)
underlying_data = view(parent(fts), :, :, :, n)
Expand All @@ -189,13 +138,52 @@ function Base.getindex(fts::InMemoryFieldTimeSeries, n::Int)
end

# Making FieldTimeSeries behave like Vector
# Time is the 4th dimension, so `fts[end]` is the final snapshot.
Base.lastindex(fts::InMemoryFieldTimeSeries) = size(fts, 4)
Base.firstindex(fts::InMemoryFieldTimeSeries) = 1
Base.lastindex(fts::FieldTimeSeries) = size(fts, 4)
Base.firstindex(fts::FieldTimeSeries) = 1

"""
    Base.getindex(fts::FieldTimeSeries{LX, LY, LZ, OnDisk}, n::Int)

Lazily load the `n`-th saved snapshot of an `OnDisk` time series from its JLD2
file and return it wrapped in a `Field` on `fts.grid`.
"""
function Base.getindex(fts::FieldTimeSeries{LX, LY, LZ, OnDisk}, n::Int) where {LX, LY, LZ}
    # Load the raw array for the n-th saved iteration from disk.
    arch = architecture(fts)
    file = jldopen(fts.data.path)
    iter = keys(file["timeseries/t"])[n]
    # Reuse `arch` here; the original recomputed architecture(fts) and left
    # the local unused.
    raw_data = arch_array(arch, file["timeseries/$(fts.data.name)/$iter"])
    close(file)

    # Wrap the raw array in a Field carrying the series' location, indices,
    # and boundary conditions.
    loc = (LX, LY, LZ)
    field_data = offset_data(raw_data, fts.grid, loc, fts.indices)

    return Field(loc, fts.grid; indices=fts.indices, boundary_conditions=fts.boundary_conditions, data=field_data)
end

#####
##### set!
#####

# OnDisk series never hold data in memory, so there is nothing to fill.
set!(time_series::OnDiskFieldTimeSeries, path::String, name::String) = nothing

"""
    set!(time_series::InMemoryFieldTimeSeries, path::String, name::String)

Fill `time_series` by reading, for each entry of `time_series.times`, the
matching saved snapshot of `name` from the JLD2 file at `path`.

Throws an error if a requested time has no (approximately) matching save time
in the file.
"""
function set!(time_series::InMemoryFieldTimeSeries, path::String, name::String)

    file = jldopen(path)
    file_iterations = parse.(Int, keys(file["timeseries/t"]))
    file_times = [file["timeseries/t/$i"] for i in file_iterations]
    close(file)

    for (n, time) in enumerate(time_series.times)
        # Match requested time against saved times with ≈ to tolerate round-off.
        file_index = findfirst(t -> t ≈ time, file_times)

        # Fail with a clear message instead of the obscure MethodError that
        # `file_iterations[nothing]` would otherwise raise.
        isnothing(file_index) &&
            error("No saved time found matching t = $time for \"$name\" in $path.")

        file_iter = file_iterations[file_index]

        field_n = Field(location(time_series), path, name, file_iter,
                        indices = time_series.indices,
                        boundary_conditions = time_series.boundary_conditions,
                        grid = time_series.grid)

        set!(time_series[n], field_n)
    end

    return nothing
end

"""
Field(location, path, name, iter;
grid = nothing,
Expand Down Expand Up @@ -231,28 +219,6 @@ function Field(location, path::String, name::String, iter;
return Field(location, grid; boundary_conditions, indices, data)
end

# Fill an in-memory time series by reading, for each requested time, the
# matching saved snapshot of `name` from the JLD2 file at `path`.
function set!(time_series::InMemoryFieldTimeSeries, path::String, name::String)

file = jldopen(path)
file_iterations = parse.(Int, keys(file["timeseries/t"]))
file_times = [file["timeseries/t/$i"] for i in file_iterations]
close(file)

for (n, time) in enumerate(time_series.times)
# Match requested time against saved times with ≈ to tolerate round-off.
# NOTE(review): findfirst may return `nothing` if no save time matches,
# which would raise an obscure error on the next line — confirm callers
# only pass times taken from the file.
file_index = findfirst(t -> t ≈ time, file_times)
file_iter = file_iterations[file_index]

field_n = Field(location(time_series), path, name, file_iter,
indices = time_series.indices,
boundary_conditions = time_series.boundary_conditions,
grid = time_series.grid)

set!(time_series[n], field_n)
end

return nothing
end

function set!(fts::FieldTimeSeries, fields_vector::AbstractVector{<:AbstractField})
raw_data = parent(fts)
file = jldopen(path)
Expand Down Expand Up @@ -304,61 +270,14 @@ function Statistics.mean(fts::FieldTimeSeries; dims=:)
end
end

#####
##### OnDisk time serieses
#####

# Handle to field data stored in a JLD2 file: path plus the field's name.
struct OnDiskData
path :: String
name :: String
end

# Construct an OnDisk FieldTimeSeries: metadata (times, location, boundary
# conditions, indices) are read eagerly from the file; the field data itself
# stays on disk and is loaded lazily by `getindex`.
function FieldTimeSeries(path, name, backend::OnDisk; architecture=nothing, grid=nothing)
file = jldopen(path)

if isnothing(grid)
# Deserialize the saved grid and move it to the requested architecture.
grid = on_architecture(architecture, file["serialized/grid"])
end

iterations = parse.(Int, keys(file["timeseries/t"]))
times = [file["timeseries/t/$i"] for i in iterations]

data = OnDiskData(path, name)
# Location, boundary conditions, and indices were serialized with the output.
LX, LY, LZ = file["timeseries/$name/serialized/location"]
bcs = file["timeseries/$name/serialized/boundary_conditions"]
indices = file["timeseries/$name/serialized/indices"]

close(file)

return FieldTimeSeries{LX, LY, LZ, OnDisk}(data, grid, bcs, times, indices)
end

#####
##### Methods
#####

# Include the time dimension.
# Spatial size comes from (location, grid, indices); time is the trailing dim.
@inline Base.size(fts::FieldTimeSeries) = (size(location(fts), fts.grid, fts.indices)..., length(fts.times))

# Four-index access (i, j, k, n) reads directly from the in-memory offset array.
@propagate_inbounds Base.getindex(f::FieldTimeSeries{LX, LY, LZ, InMemory}, i, j, k, n) where {LX, LY, LZ} = f.data[i, j, k, n]

"""
    Base.getindex(fts::FieldTimeSeries{LX, LY, LZ, OnDisk}, n::Int)

Lazily load the `n`-th saved snapshot of an `OnDisk` time series from its JLD2
file and return it wrapped in a `Field` on `fts.grid`.
"""
function Base.getindex(fts::FieldTimeSeries{LX, LY, LZ, OnDisk}, n::Int) where {LX, LY, LZ}
    # Load the raw array for the n-th saved iteration from disk.
    arch = architecture(fts)
    file = jldopen(fts.data.path)
    iter = keys(file["timeseries/t"])[n]
    # Reuse `arch` here; the original recomputed architecture(fts) and left
    # the local unused.
    raw_data = arch_array(arch, file["timeseries/$(fts.data.name)/$iter"])
    close(file)

    # Wrap the raw array in a Field carrying the series' location, indices,
    # and boundary conditions.
    loc = (LX, LY, LZ)
    field_data = offset_data(raw_data, fts.grid, loc, fts.indices)

    return Field(loc, fts.grid; indices=fts.indices, boundary_conditions=fts.boundary_conditions, data=field_data)
end

# setindex! forwards to the underlying data array; OnDisk series have no
# in-memory parent array to expose.
Base.setindex!(fts::FieldTimeSeries, val, inds...) = Base.setindex!(fts.data, val, inds...)
Base.parent(fts::FieldTimeSeries{LX, LY, LZ, OnDisk}) where {LX, LY, LZ} = nothing

#####
##### Basic support for reductions
Expand Down Expand Up @@ -423,8 +342,15 @@ function Base.show(io::IO, fts::FieldTimeSeries)
"├── grid: ", summary(fts.grid), "\n",
"├── indices: ", fts.indices, "\n")

suffix = string("└── data: ", summary(fts.data), "\n",
" └── ", data_summary(fts))
suffix = field_time_series_suffix(fts)

return print(io, prefix, suffix)
end

# InMemory series: summarize the data array plus a one-line data summary.
field_time_series_suffix(fts::InMemoryFieldTimeSeries) =
string("└── data: ", summary(fts.data), "\n",
" └── ", data_summary(fts))

# OnDisk series: only the file-backed handle can be summarized.
field_time_series_suffix(fts::OnDiskFieldTimeSeries) =
string("└── data: ", summary(fts.data))