-
Notifications
You must be signed in to change notification settings - Fork 186
/
output_writer_utils.jl
222 lines (179 loc) · 8.6 KB
/
output_writer_utils.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
using StructArrays: StructArray, replace_storage
using Oceananigans.Grids: on_architecture, architecture
using Oceananigans.DistributedComputations
using Oceananigans.DistributedComputations: DistributedGrid, Partition
using Oceananigans.Fields: AbstractField, indices, boundary_conditions, instantiated_location
using Oceananigans.BoundaryConditions: bc_str, FieldBoundaryConditions, ContinuousBoundaryFunction, DiscreteBoundaryFunction
using Oceananigans.TimeSteppers: QuasiAdamsBashforth2TimeStepper, RungeKutta3TimeStepper
using Oceananigans.Models.LagrangianParticleTracking: LagrangianParticles
using Oceananigans.Utils: AbstractSchedule
#####
##### Output writer utilities
#####
"""
    NoFileSplitting()

Callable singleton that returns `false` for any `model`.
Its `summary` and `show` output is the string `"NoFileSplitting"`.
"""
struct NoFileSplitting end

# Calling the object never actuates, whatever `model` is.
function (::NoFileSplitting)(model)
    return false
end

function Base.summary(::NoFileSplitting)
    return "NoFileSplitting"
end

function Base.show(io::IO, nfs::NoFileSplitting)
    return print(io, summary(nfs))
end
mutable struct FileSizeLimit <: AbstractSchedule
    size_limit :: Float64  # threshold compared against `filesize(path)`
    path :: String         # file whose size is checked
end

"""
    FileSizeLimit(size_limit [, path=""])

Return a schedule that actuates when the size of the file at `path`
reaches or exceeds `size_limit`.

The `path` is automatically added and updated when `FileSizeLimit` is
used with an output writer, and should not be provided manually.
"""
function FileSizeLimit(size_limit)
    return FileSizeLimit(size_limit, "")
end

# Actuate once the file at `fsl.path` is at least `fsl.size_limit` in size.
function (fsl::FileSizeLimit)(model)
    return fsl.size_limit ≤ filesize(fsl.path)
end
"""
    Base.summary(fsl::FileSizeLimit)

Return a one-line description of `fsl` of the form

    FileSizeLimit(size_limit=<limit>, path=<path> (<current size>))

where `<limit>` and `<current size>` are formatted with `pretty_filesize`
and `<current size>` is the present `filesize` of the file at `fsl.path`.
"""
function Base.summary(fsl::FileSizeLimit)
    current_size_str = pretty_filesize(filesize(fsl.path))
    size_limit_str = pretty_filesize(fsl.size_limit)

    # The trailing "))" closes both the current-size parenthetical and the
    # opening "FileSizeLimit(" so that the parentheses are balanced.
    return string("FileSizeLimit(size_limit=", size_limit_str,
                  ", path=", fsl.path, " (", current_size_str, "))")
end

# Display a `FileSizeLimit` using its one-line summary.
Base.show(io::IO, fsl::FileSizeLimit) = print(io, summary(fsl))
# Update schedule based on user input.
# Generic schedules have nothing to update.
function update_file_splitting_schedule!(schedule, filepath)
    return nothing
end

# A `FileSizeLimit` stores the path of the file whose size it checks.
update_file_splitting_schedule!(schedule::FileSizeLimit, filepath) =
    (schedule.path = filepath; nothing)
"""
    ext(ow)

Return the file extension for the output writer or output
writer type `ow`.

Calling `ext` on an output writer instance forwards to `ext(typeof(ow))`.
The method for `Type{<:AbstractOutputWriter}` is a fallback that raises an
error; it is only reached when no more specific `ext` method applies to the
writer type.
"""
ext(ow::Type{<:AbstractOutputWriter}) = error("Extension for $ow is not implemented.")

# Dispatch on the writer's type (the argument is `ow`).
ext(ow::AbstractOutputWriter) = ext(typeof(ow))
# TODO: add example to docstring below
"""
    saveproperty!(file, address, obj)

Write the data held by `obj` to `file[address]` in a "language-agnostic" form:
mostly arrays and numbers, without Julia-specific types or other data that
can _only_ be interpreted by Julia.
"""
function saveproperty!(file, address, obj)
    return _saveproperty!(file, address, obj)
end

# Generic implementation: recursively unwrap an object, saving every property
# of `obj` under `address/<property name>`.
function _saveproperty!(file, address, obj)
    return [saveproperty!(file, address * "/$name", getproperty(obj, name))
            for name in propertynames(obj)]
end
# Some specific things

# Numbers and plain `Array`s are stored unchanged.
function saveproperty!(file, address, p::Union{Number, Array})
    return file[address] = p
end

# Ranges are collected into a `Vector` before being stored.
function saveproperty!(file, address, p::AbstractRange)
    return file[address] = collect(p)
end

# Any other array is stored as a plain `Array` copy of its `parent`.
function saveproperty!(file, address, p::AbstractArray)
    return file[address] = Array(parent(p))
end

# Functions are skipped.
function saveproperty!(file, address, p::Function)
    return nothing
end

# Tuples are saved element by element under `address/<index>`.
function saveproperty!(file, address, p::Tuple)
    return [saveproperty!(file, address * "/$i", p[i]) for i in eachindex(p)]
end
# Grids are moved to the CPU before their properties are saved.
function saveproperty!(file, address, grid::AbstractGrid)
    cpu_grid = on_architecture(CPU(), grid)
    return _saveproperty!(file, address, cpu_grid)
end

# Distributed grids are moved to a CPU-based `Distributed` architecture whose
# `Partition` is rebuilt from the ranks of the grid's current architecture.
function saveproperty!(file, address, grid::DistributedGrid)
    ranks = architecture(grid).ranks
    cpu_arch = Distributed(CPU(); partition = Partition(ranks...))
    cpu_grid = on_architecture(cpu_arch, grid)
    return _saveproperty!(file, address, cpu_grid)
end
"""
    saveproperty!(file, address, bcs::FieldBoundaryConditions)

Special `saveproperty!` so boundary conditions are easily readable outside Julia.

For every property `boundary` of `bcs`, write

* `file[address/<boundary>/type]`: the string returned by `bc_str` for that
  boundary condition, and
* `file[address/<boundary>/condition]`: the boundary condition's `condition`,
  or `missing` when the condition is a `Function`, a
  `ContinuousBoundaryFunction` or a `DiscreteBoundaryFunction`.

Returns `nothing`.
"""
function saveproperty!(file, address, bcs::FieldBoundaryConditions)
    for boundary in propertynames(bcs)
        bc = getproperty(bcs, boundary)
        file[address * "/$boundary/type"] = bc_str(bc)

        # Function-based conditions cannot be stored as plain data, so a
        # `missing` placeholder is written instead.
        if bc.condition isa Union{Function, ContinuousBoundaryFunction, DiscreteBoundaryFunction}
            file[address * "/$boundary/condition"] = missing
        else
            file[address * "/$boundary/condition"] = bc.condition
        end
    end

    return nothing
end
"""
    serializeproperty!(file, address, obj)

Serialize `obj` to `file[address]` in a "friendly" way: for example, `CuArray`s
are converted to `Array` so that data can be loaded on any architecture, and
objects that generally cannot be deserialized, like `Function`, are not
serialized at all.
"""
function serializeproperty!(file, address, p)
    return file[address] = p
end

# Arrays are written through `saveproperty!`.
function serializeproperty!(file, address, p::AbstractArray)
    return saveproperty!(file, address, p)
end
# Types whose instances are skipped by `serializeproperty!`.
const CantSerializeThis = Union{Function, ContinuousBoundaryFunction, DiscreteBoundaryFunction}

function serializeproperty!(file, address, p::CantSerializeThis)
    return nothing
end
# Convert to CPU please!
# TODO: use on_architecture for more stuff?
function serializeproperty!(file, address, grid::AbstractGrid)
    return file[address] = on_architecture(CPU(), grid)
end

# Distributed grids keep their partition but are moved to a CPU-based
# `Distributed` architecture before being written.
function serializeproperty!(file, address, grid::DistributedGrid)
    partition = architecture(grid).partition
    cpu_arch = Distributed(CPU(); partition = partition)
    return file[address] = on_architecture(cpu_arch, grid)
end
# Boundary conditions that hold a reference to a `Function` are replaced by
# `missing`; otherwise they are written as they are.
function serializeproperty!(file, address, p::FieldBoundaryConditions)
    # TODO: it'd be better to "filter" `FieldBoundaryCondition` and then serialize
    # rather than punting with `missing` instead.
    return file[address] = has_reference(Function, p) ? missing : p
end
# A field is written as four entries under `address`: its instantiated
# location, its parent data, its indices and its boundary conditions.
function serializeproperty!(file, address, f::Field)
    parts = ("/location"            => instantiated_location,
             "/data"                => parent,
             "/indices"             => indices,
             "/boundary_conditions" => boundary_conditions)

    # Each accessor is evaluated right before its entry is written, in order.
    for (suffix, accessor) in parts
        serializeproperty!(file, address * suffix, accessor(f))
    end

    return nothing
end
# Special serializeproperty! for the RK3 time stepper struct: only the
# tendency fields `Gⁿ` and `G⁻` are written; nothing else in the time
# stepper is saved (presumably because the `Model` constructor can rebuild it).
function serializeproperty!(file, address, ts::RungeKutta3TimeStepper)
    for (suffix, name) in (("/Gⁿ", :Gⁿ), ("/G⁻", :G⁻))
        serializeproperty!(file, address * suffix, getproperty(ts, name))
    end
    return nothing
end
# Special serializeproperty! for the AB2 time stepper struct: only the
# tendency fields `Gⁿ` and `G⁻` and the value `previous_Δt` are written.
function serializeproperty!(file, address, ts::QuasiAdamsBashforth2TimeStepper)
    for (suffix, name) in (("/Gⁿ", :Gⁿ), ("/G⁻", :G⁻), ("/previous_Δt", :previous_Δt))
        serializeproperty!(file, address * suffix, getproperty(ts, name))
    end
    return nothing
end
# Named tuples are serialized entry by entry under `address/<key>`.
function serializeproperty!(file, address, p::NamedTuple)
    return [serializeproperty!(file, address * "/$name", value) for (name, value) in pairs(p)]
end
# A `StructArray` is written after switching its storage to `Array`.
function serializeproperty!(file, address, s::StructArray)
    return file[address] = replace_storage(Array, s)
end

# Lagrangian particles are represented by their `properties`.
function serializeproperty!(file, address, p::LagrangianParticles)
    return serializeproperty!(file, address, p.properties)
end
# Save each property of `structure` named in `ps` under the address "<name>".
function saveproperties!(file, structure, ps)
    return [saveproperty!(file, "$name", getproperty(structure, name)) for name in ps]
end

# Serialize each property of `structure` named in `ps` under the address "<name>".
function serializeproperties!(file, structure, ps)
    return [serializeproperty!(file, "$name", getproperty(structure, name)) for name in ps]
end
# Don't check arrays because we don't need that noise.
function has_reference(T, ::AbstractArray{<:Number})
    return false
end

# This is going to be true.
function has_reference(::Type{T}, ::NTuple{N, <:T}) where {N, T}
    return true
end
# Short circuit on fields: only the data and the boundary conditions are inspected.
function has_reference(T::Type{Function}, f::Field)
    has_reference(T, f.data) && return true
    return has_reference(T, f.boundary_conditions)
end
"""
    has_reference(has_type, obj)

Check (or attempt to check) if `obj` contains, somewhere among its
subfields and subfields of fields, a reference to an object of type
`has_type`. This function doesn't always work.

The search proceeds as follows:

1. If `typeof(obj) <: has_type`, return `true`.
2. Otherwise, if `obj` is iterable and has more than one element, search its
   elements.
3. Otherwise, if `obj` has properties, search the value of each property.
4. Otherwise return `false`.

The search stops at the first match, so elements or properties after a match
are not visited.
"""
function has_reference(has_type, obj)
    typeof(obj) <: has_type && return true

    # Generators (rather than array comprehensions) let `any` return as soon as
    # one element or property matches, without allocating intermediate arrays.
    if applicable(iterate, obj) && length(obj) > 1
        return any(has_reference(has_type, elem) for elem in obj)
    elseif applicable(propertynames, obj) && length(propertynames(obj)) > 0
        return any(has_reference(has_type, getproperty(obj, p)) for p in propertynames(obj))
    end

    # `obj` is not of type `has_type` and there is nothing left to descend into.
    return false
end
""" Return the output-averaging schedule of `ow`, as determined by its first output value. """
function output_averaging_schedule(ow::AbstractOutputWriter)
    outputs = values(ow.outputs)
    return output_averaging_schedule(first(outputs))
end

# Fallback for any other argument: `nothing`.
function output_averaging_schedule(output)
    return nothing
end
# Render `Array{T}` as the string "Array{T}" (without the dimension parameter).
function show_array_type(a::Type{Array{T}}) where T
    return "Array{$T}"
end
"""
    auto_extension(filename, ext)

If `filename` ends in `ext`, return `filename`. Otherwise return `filename * ext`.

Works for any `filename`, including names shorter than `ext`, empty names and
names containing non-ASCII characters.
"""
function auto_extension(filename, ext)
    # `endswith` compares whole characters and copes with `filename` being
    # shorter than `ext`, unlike slicing by a character count.
    return endswith(filename, ext) ? filename : filename * ext
end