-
Notifications
You must be signed in to change notification settings - Fork 2
/
import_edf.jl
859 lines (726 loc) · 36.5 KB
/
import_edf.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
# Convert a struct instance into a `NamedTuple` carrying the same field names,
# declared field types, and field values. Written as a generated function so
# that the field accesses are unrolled per argument type; inside the generator
# body `x` is bound to the argument's *type*, while the returned expression
# refers to the runtime value `x`.
@generated function _named_tuple(x)
    field_values = Expr(:tuple)
    # a plain loop is used here: generated-function bodies should not contain
    # closures or comprehensions
    for idx in 1:fieldcount(x)
        push!(field_values.args, Expr(:call, :getfield, :x, idx))
    end
    nt_type = Expr(:curly, :NamedTuple, fieldnames(x), Tuple{fieldtypes(x)...})
    return Expr(:call, nt_type, field_values)
end
# Render exception `e` (with the backtrace of the exception currently being
# handled, as reported by `catch_backtrace()`) beneath the context line `msg`,
# log the combined text at error level, and return it as a string.
function _err_msg(e, msg="Error while converting EDF:")
    rendered = sprint(showerror, e, catch_backtrace())
    msg = msg * '\n' * rendered
    @error msg
    return msg
end
# Return a copy of `row` whose `error` column holds the logged, rendered form
# of exception `e`, prefixed with a note naming the signal being skipped.
function _errored_row(row, e)
    context = "Skipping signal $(row.label): error while extracting channels"
    return rowmerge(row; error=_err_msg(e, context))
end
# Multi-row counterpart of `_errored_row`: the error is rendered and logged
# once, naming every affected label, and the same message is written into the
# `error` column of each row.
function _errored_rows(rows, e)
    quoted_labels = [string('"', row.label, '"') for row in rows]
    labels_str = join(quoted_labels, ", ", ", and ")
    msg = _err_msg(e, "Skipping signals $(labels_str): error while extracting channels")
    return rowmerge.(rows; error=msg)
end
#####
##### `EDF.Signal` label handling
#####
# This function:
# - ensures the given label is whitespace-stripped, lowercase, and parens-free
# - strips trailing generic EDF references (e.g. "ref", "ref2", etc.)
# - replaces all references with the appropriate name as specified by `canonical_names`
# - replaces `+` with `_plus_` and `/` with `_over_`
# - returns the initial reference name (w/o prefix sign, if present) and the entire label;
# the initial reference name should match the canonical channel name,
# otherwise the channel extraction will be rejected.
function _normalize_references(original_label, canonical_names)
    # Clean-up rules, applied strictly in this order. '…' serves as a temporary
    # separator so that `-`, `+` and `/` all split the label into parts while
    # `+` and `/` survive as parts of their own.
    rules = (r"\s" => "",
             '(' => "",
             ')' => "",
             r"\*$" => "",
             '-' => '…',
             '+' => "…+…",
             '/' => "…/…")
    label = _safe_lowercase(original_label)
    for rule in rules
        label = replace(label, rule)
    end

    # unwrap a label that is entirely enclosed in square brackets
    bracketed = match(r"^\[(.*)\]$", label)
    if bracketed !== nothing
        label = bracketed.captures[1]
    end

    # drop trailing generic references ("ref", "ref2", ...): keep everything up
    # to the last part that is not "ref" once its digits are removed
    parts = split(label, '…'; keepempty=false)
    last_kept = findlast(part -> replace(part, r"\d" => "") != "ref", parts)
    parts = parts[1:something(last_kept, 0)]
    if isempty(parts)
        return ("", "")
    end

    # rewrite alternative names to their canonical name; plain (non-`Pair`)
    # entries of `canonical_names` carry no alternatives and are skipped
    for spec in canonical_names
        spec isa Pair || continue
        canonical = string(first(spec))
        for alternative in last(spec)
            alternative_str = string(alternative)
            for idx in eachindex(parts)
                if parts[idx] == alternative_str
                    parts[idx] = canonical
                end
            end
        end
    end

    # restore a leading sign, then spell out the `+` and `/` separators
    sign_prefix = startswith(original_label, '-') ? "-" : ""
    recombined = sign_prefix * join(parts, '-')
    recombined = replace(recombined, "-+-" => "_plus_")
    recombined = replace(recombined, "-/-" => "_over_")
    return first(parts), recombined
end
# Lowercase a single character, leaving it untouched when it is not valid
# (malformed UTF-8 chars are a choking hazard).
function _safe_lowercase(c::Char)
    isvalid(c) || return c
    return lowercase(c)
end

# Lowercase a string character by character using the `Char` method above.
function _safe_lowercase(s::AbstractString)
    return map(_safe_lowercase, s)
end
"""
OndaEDF.match_edf_label(label, signal_names, channel_name, canonical_names)
Return a normalized label matched from an EDF `label`. The purpose of this
function is to remove signal names from the label, and to canonicalize the
channel name(s) that remain. So something like "[eCG] avl-REF" will be
transformed to "avl" (given `signal_names=["ecg"]`, and `channel_name="avl"`)
This returns `nothing` if `channel_name` does not match after normalization.
Canonicalization
- ensures the given label is whitespace-stripped, lowercase, and parens-free
- strips trailing generic EDF references (e.g. "ref", "ref2", etc.)
- replaces all references with the appropriate name as specified by
`canonical_names`
- replaces `+` with `_plus_` and `/` with `_over_`
- returns the initial reference name (w/o prefix sign, if present) and the
entire label; the initial reference name should match the canonical channel
name, otherwise the channel extraction will be rejected.
## Examples
```julia
match_edf_label("[ekG] avl-REF", ["ecg", "ekg"], "avl", []) == "avl"
match_edf_label("ECG 2", ["ecg", "ekg"], "ii", ["ii" => ["2", "two", "ecg2"]]) == "ii"
```
See the tests for more examples
!!! note
This is an internal function and is not meant to be called directly.
"""
function match_edf_label(label, signal_names, channel_name, canonical_names)
    lowered = _safe_lowercase(label)
    # Ideally, we'd do the original behavior:
    #
    #   match exact STANDARD (or custom) signal types at the beginning of the
    #   label, ignoring case, possibly bracketed by or prepended with `[`, `]`,
    #   `,` or whitespace; everything after is included in the spec a.k.a. label
    #
    # For instance, if `signal_names = ["ecg", "ekg"]`, this would convert
    # - "[EKG] 2-REF"
    # - " eCg 2"
    # - ",ekg,2"
    #
    # into "2".
    #
    # However, the original behavior requires compiling and matching a
    # different regex for every possible `signal_names` entry (across all
    # labels), for every signal. This adds ENORMOUS overhead compared to the
    # rest of the import pipeline (>90% of total time was spent in regex stuff),
    # so instead we do an approximation: treat ANYTHING between whitespace, [],
    # or ',' as the signal, and remove it (and the enclosing chars) if it is
    # exactly equal to any of the input `signal_names` (after lowercasing).
    #
    # This is not equivalent to the original behavior in only a handful of
    # cases:
    #
    # - if one of the `signal_names` is a suffix of the signal, like `"pap"`
    #   matching against `"xpap cpap"`. The fix for this is to add the full
    #   signal name to the (end) of `signal_names` in the label set.
    # - if the signal name itself contains whitespace or one of `",[]"`, it
    #   will not match. The fix for this is to preprocess signal headers before
    #   `plan_edf_to_onda_samples` to normalize known instances (after
    #   reviewing the plan).
    m = match(r"[\s\[,\]]*(?<signal>.+?)[\s,\]]*\s+(?<spec>.+)"i, lowered)
    spec = (m !== nothing && m[:signal] in signal_names) ? m[:spec] : lowered
    # remove any whitespace surrounding hyphens before splitting into references
    spec = replace(spec, r"\s*-\s*" => "-")
    initial, normalized_label = _normalize_references(spec, canonical_names)
    # only accept the label when its leading reference is the requested channel
    return initial == channel_name ? normalized_label : nothing
end
#####
##### encodings
#####
# Thrown when channels that should share an encoding report different sample
# rates; `sample_rates` holds the offending rates for display.
struct MismatchedSampleRateError <: Exception
    sample_rates
end

# Print the mismatched rates followed by a hint on how to resolve the problem.
function Base.showerror(io::IO, err::MismatchedSampleRateError)
    lines = ("found mismatched sample rate between channel encodings: $(err.sample_rates)",
             "OndaEDF does not currently automatically resolve mismatched sample rates;",
             "please preprocess your data before attempting `import_edf` so that channels",
             "of the same signal share a common sample rate.")
    return print(io, join(lines, '\n'), '\n')
end
# I wasn't super confident that the `sample_offset_in_unit` calculation I derived
# had the correctness/symmetry I was hoping it had, so basic algebra time (written
# so that you can try each step out in the REPL with random values):
#
# res = (pmax - pmin) / (dmax - dmin)
# pmax - (sample_resolution_in_unit * dmax) ≈ pmin - (sample_resolution_in_unit * dmin)
# pmax - ((pmax * dmax - pmin * dmax) / (dmax - dmin)) ≈ pmin - ((pmax * dmin - pmin * dmin) / (dmax - dmin))
# pmax - ((pmax * dmax - pmin * dmax) / (dmax - dmin)) + ((pmax * dmin - pmin * dmin) / (dmax - dmin)) ≈ pmin
# pmax + ((pmin * dmax - pmax * dmax) / (dmax - dmin)) + ((pmax * dmin - pmin * dmin) / (dmax - dmin)) ≈ pmin
# pmax + (pmin * dmax - pmax * dmax + pmax * dmin - pmin * dmin) / (dmax - dmin) ≈ pmin
# pmax + (pmin * dmax + pmax * (-dmax) + pmax * dmin + pmin * (-dmin)) / (dmax - dmin) ≈ pmin
# pmax + (pmax*(dmin - dmax) + pmin*(dmax - dmin)) / (dmax - dmin) ≈ pmin
# pmax + pmin + (pmax*(dmin - dmax)/(dmax - dmin)) ≈ pmin
# pmax + pmin + (-pmax) ≈ pmin
# pmin ≈ pmin
# Derive the encoding parameters of one EDF signal from its header: the linear
# digital -> physical mapping (resolution and offset), the sample rate implied
# by `samples_per_record` and the record duration, and the narrowest of
# "int16"/"int32" that covers the header's digital range.
function edf_signal_encoding(edf_signal_header, edf_seconds_per_record)
    digital_lo = edf_signal_header.digital_minimum
    digital_hi = edf_signal_header.digital_maximum
    physical_lo = edf_signal_header.physical_minimum
    physical_hi = edf_signal_header.physical_maximum

    # physical = resolution * digital + offset
    resolution = (physical_hi - physical_lo) / (digital_hi - digital_lo)
    offset = physical_lo - (resolution * digital_lo)
    rate = edf_signal_header.samples_per_record / edf_seconds_per_record

    exceeds_int16 = digital_hi > typemax(Int16) || digital_lo < typemin(Int16)
    sample_type = exceeds_int16 ? "int32" : "int16"

    return (; sample_resolution_in_unit=Float64(resolution),
            sample_offset_in_unit=Float64(offset),
            sample_rate=Float64(rate),
            sample_type)
end
# TODO: replace this with float type for mismatched
"""
promote_encodings(encodings; pick_offset=(_ -> 0.0), pick_resolution=minimum)
Return a common encoding for input `encodings`, as a `NamedTuple` with fields
`sample_type`, `sample_offset_in_unit`, `sample_resolution_in_unit`, and
`sample_rate`. If input encodings' `sample_rate`s are not all equal, an error
is thrown. If sample resolutions/offsets are not equal, then `pick_offset` and
`pick_resolution` are used to combine them into a common offset/resolution.
!!! note
This is an internal function and is not meant to be called directly.
"""
function promote_encodings(encodings; pick_offset=(_ -> 0.0), pick_resolution=minimum)
    encoding_fields = (:sample_rate,
                       :sample_offset_in_unit,
                       :sample_resolution_in_unit,
                       :sample_type)
    # a single missing encoding field in any row makes the whole result missing
    has_missing = any(ismissing(getproperty(row, field))
                      for field in encoding_fields
                      for row in encodings)
    if has_missing
        return (; sample_type=missing,
                sample_offset_in_unit=missing,
                sample_resolution_in_unit=missing,
                sample_rate=missing)
    end

    sample_type = mapreduce(Onda.sample_type, promote_type, encodings)

    # sample rates must agree exactly; there is no automatic resolution
    sample_rates = [e.sample_rate for e in encodings]
    sample_rate = first(sample_rates)
    if !all(==(sample_rate), sample_rates)
        throw(MismatchedSampleRateError(sample_rates))
    end

    # differing offsets or resolutions are combined with the `pick_*`
    # callbacks, and the sample type is then forced to `Int32`
    offsets = [e.sample_offset_in_unit for e in encodings]
    sample_offset_in_unit = first(offsets)
    if !all(==(sample_offset_in_unit), offsets)
        sample_type = Int32
        sample_offset_in_unit = pick_offset(offsets)
    end

    resolutions = [e.sample_resolution_in_unit for e in encodings]
    sample_resolution_in_unit = first(resolutions)
    if !all(==(sample_resolution_in_unit), resolutions)
        sample_type = Int32
        sample_resolution_in_unit = pick_resolution(resolutions)
    end

    return (; sample_type=Onda.onda_sample_type_from_julia_type(sample_type),
            sample_offset_in_unit,
            sample_resolution_in_unit,
            sample_rate)
end
#####
##### `EDF.Signal`s -> `Onda.Samples`
#####
# Shared warning text that is interpolated into the docstrings of the
# sample-producing functions in this file via `$SAMPLES_ENCODED_WARNING`.
const SAMPLES_ENCODED_WARNING = """
!!! warning
Returned samples are integer-encoded. If these samples are being serialized out (e.g. via `Onda.store!`)
this is not an issue, but if the samples are being immediately analyzed in memory, call `Onda.decode`
to decode them to recover the time-series voltages.
"""
# Exception pairing a human-readable message with the exception that caused it.
struct SamplesInfoError <: Exception
    msg::String
    cause::Exception
end

# Print the message, then delegate to `showerror` for the wrapped cause.
function Base.showerror(io::IO, e::SamplesInfoError)
    print(io, "SamplesInfoError: ")
    print(io, e.msg)
    print(io, " caused by: ")
    return Base.showerror(io, e.cause)
end
# Group the elements of `list` by the key `f(element)`.
#
# Returns a `Dict` mapping each distinct key to a `Vector{eltype(list)}` of the
# elements that produced it, in their order of occurrence within `list`.
function groupby(f, list)
    groups = Dict()
    for item in list
        # the function form of `get!` only builds a new bucket when the key is
        # first seen, instead of allocating a throwaway vector for every element
        bucket = get!(() -> Vector{eltype(list)}(), groups, f(item))
        push!(bucket, item)
    end
    return groups
end
# Extract the canonical channel name from a single channel spec taken from the
# labels. A spec is either the bare name itself:
#     "channel"
function canonical_channel_name(channel_name)
    return channel_name
end

# ...or a pair whose first element is the canonical name:
#     "channel" => ["alt1", "alt2", ...]
function canonical_channel_name(channel_alternates::Pair)
    return channel_alternates.first
end
# Convenience method: plan from an `EDF.Signal` by forwarding its header.
function plan_edf_to_onda_samples(signal::EDF.Signal, s; kwargs...)
    return plan_edf_to_onda_samples(signal.header, s; kwargs...)
end

# Convenience method: convert an `EDF.SignalHeader` to a `NamedTuple` row and
# forward it to the generic row-based method.
function plan_edf_to_onda_samples(header::EDF.SignalHeader, s; kwargs...)
    header_row = _named_tuple(header)
    return plan_edf_to_onda_samples(header_row, s; kwargs...)
end
"""
plan_edf_to_onda_samples(header, seconds_per_record; labels=STANDARD_LABELS,
units=STANDARD_UNITS)
plan_edf_to_onda_samples(signal::EDF.Signal, args...; kwargs...)
Formulate a plan for converting an EDF signal into Onda format. This returns a
Tables.jl row with all the columns from the signal header, plus additional
columns for the `Onda.SamplesInfo` for this signal, and the `seconds_per_record`
that is passed in here.
If no labels match, then the `channel` and `kind` columns are `missing`; the
behavior of other `SamplesInfo` columns is undefined; they are currently set to
missing but that may change in future versions.
Any errors that are thrown in the process are caught and printed, with
backtrace, to a `String` that is stored in the `error` column.
## Matching EDF label to Onda labels
The `labels` keyword argument determines how Onda `channel` and signal `kind`
are extracted from the EDF label.
Labels are specified as an iterable of `signal_names => channel_names` pairs.
`signal_names` should be an iterable of signal names, the first of which is the
canonical name used as the Onda `kind`. Each element of `channel_names` gives
the specification for one channel, which can either be a string, or a
`canonical_name => alternates` pair. Occurrences of `alternates` will be
replaced with `canonical_name` in the generated channel label.
Matching is determined _solely_ by the channel names. When matching, the signal
names are only used to remove signal names occurring as prefixes (e.g., "[ECG]
AVL") before matching channel names. See [`match_edf_label`](@ref) for details,
and see `OndaEDF.STANDARD_LABELS` for the default labels.
As an example, here is (a subset of) the default labels for ECG signals:
```julia
["ecg", "ekg"] => ["i" => ["1"], "ii" => ["2"], "iii" => ["3"],
"avl"=> ["ecgl", "ekgl", "ecg", "ekg", "l"],
"avr"=> ["ekgr", "ecgr", "r"], ...]
```
Matching is done in the order that `labels` iterates pairs, and will stop at the
first match, with no warning if signals are ambiguous (although this may change
in a future version)
"""
function plan_edf_to_onda_samples(header,
                                  seconds_per_record=_get(header,
                                                          :seconds_per_record);
                                  labels=STANDARD_LABELS,
                                  units=STANDARD_UNITS,
                                  preprocess_labels=nothing)
    # argument problems are user/method errors rather than data/ingest errors,
    # so they are raised directly instead of being captured in the plan
    if ismissing(seconds_per_record)
        throw(ArgumentError(":seconds_per_record not found in header, or missing"))
    end
    # the removed kwarg is still accepted so that a more informative error can
    # be thrown
    if !isnothing(preprocess_labels)
        throw(ArgumentError("the `preprocess_labels` argument has been removed. " *
                            "Instead, preprocess signal header rows to before calling " *
                            "`plan_edf_to_onda_samples`"))
    end

    row = (; header..., seconds_per_record, error=nothing)

    try
        # Units and encoding are resolved before labels so that users get
        # better feedback about _which_ thing (labels vs. units) didn't match.
        # This still happens inside the try/catch in case `edf_to_onda_unit` or
        # `edf_signal_encoding` throws.
        sample_unit = edf_to_onda_unit(header.physical_dimension, units)
        encoding = edf_signal_encoding(header, seconds_per_record)
        row = rowmerge(row; sample_unit, encoding...)

        edf_label = header.label
        # `channel_names` is an iterable of channel specs, each of which is
        # either "channel" or "canonical" => ["alt1", ...]
        for (signal_names, channel_names) in labels, canonical in channel_names
            channel_name = canonical_channel_name(canonical)
            matched = match_edf_label(edf_label, signal_names, channel_name, channel_names)
            matched === nothing && continue
            # first match wins: record the channel and signal kind, then return
            kind = first(signal_names)
            row = rowmerge(row; channel=matched, sensor_type=kind, sensor_label=kind)
            return PlanV2(row)
        end
    catch e
        return PlanV2(_errored_row(row, e))
    end

    # nothing matched: return the row built from the original signal header
    return PlanV2(row)
end
# create a table with a plan for converting this EDF file to onda: one row per
# signal, with the Onda.SamplesInfo fields that will be generated (modulo
# `promote_encoding`). The column `onda_signal_index` gives the planned grouping
# of EDF signals into Onda Samples.
#
# pass this plan to edf_to_onda_samples to actually run it
"""
plan_edf_to_onda_samples(edf::EDF.File;
labels=STANDARD_LABELS,
units=STANDARD_UNITS,
onda_signal_groupby=(:sensor_type, :sample_unit, :sample_rate))
Formulate a plan for converting an `EDF.File` to Onda Samples. This applies
`plan_edf_to_onda_samples` to each individual signal contained in the file,
storing `edf_signal_index` as an additional column.
The resulting rows are then passed to [`plan_edf_to_onda_samples_groups`](@ref)
and grouped according to `onda_signal_groupby` (by default, the `:sensor_type`,
`:sample_unit`, and `:sample_rate` columns), and the group index is added as an
additional column in `onda_signal_index`.
The resulting plan is returned as a table. No signal data is actually read from
the EDF file; to execute this plan and generate `Onda.Samples`, use
[`edf_to_onda_samples`](@ref). The index of the EDF signal (after filtering out
signals that are not `EDF.Signal`s, e.g. annotation channels) for each row is
stored in the `:edf_signal_index` column, and the rows are sorted in order of
`:onda_signal_index`, and then by `:edf_signal_index`.
"""
function plan_edf_to_onda_samples(edf::EDF.File;
                                  labels=STANDARD_LABELS,
                                  units=STANDARD_UNITS,
                                  preprocess_labels=nothing,
                                  onda_signal_groupby=(:sensor_type, :sample_unit, :sample_rate))
    # the removed kwarg is still accepted so that a more informative error can
    # be thrown
    if !isnothing(preprocess_labels)
        throw(ArgumentError("the `preprocess_labels` argument has been removed. " *
                            "Instead, preprocess signal header rows to before calling " *
                            "`plan_edf_to_onda_samples`. See the OndaEDF README."))
    end

    # only genuine `EDF.Signal`s are planned; other entries are dropped
    true_signals = filter(signal -> signal isa EDF.Signal, edf.signals)
    seconds_per_record = edf.header.seconds_per_record
    signal_plans = map(true_signals) do signal
        return plan_edf_to_onda_samples(signal.header, seconds_per_record; labels, units)
    end

    # group signals by which Samples they will belong to, promote the
    # encodings, and write the index of the destination signal into the plan
    grouped_plans = plan_edf_to_onda_samples_groups(signal_plans; onda_signal_groupby)
    return FilePlanV2.(grouped_plans)
end
"""
plan_edf_to_onda_samples_groups(plan_rows; onda_signal_groupby=(:sensor_type, :sample_unit, :sample_rate))
Group together `plan_rows` based on the values of the `onda_signal_groupby`
columns, creating the `:onda_signal_index` column and promoting the Onda encodings
for each group using [`OndaEDF.promote_encodings`](@ref).
If the `:edf_signal_index` column is not present or otherwise missing, it will
be filled in based on the order of the input rows.
The updated rows are returned, sorted first by the columns named in
`onda_signal_groupby` and second by order of occurrence within the input rows.
"""
function plan_edf_to_onda_samples_groups(plan_rows;
                                         onda_signal_groupby=(:sensor_type, :sample_unit, :sample_rate))
    plan_rows = Tables.rows(plan_rows)
    # if `edf_signal_index` is not present, create it before we re-order things
    plan_rows = map(enumerate(plan_rows)) do (i, row)
        edf_signal_index = coalesce(_get(row, :edf_signal_index), i)
        return rowmerge(row; edf_signal_index)
    end
    # An empty plan (e.g. a file without any plannable signals) has no groups;
    # return it as-is, because `mapreduce` without `init` throws when asked to
    # reduce over an empty collection.
    isempty(plan_rows) && return plan_rows

    grouped_rows = groupby(grouper(onda_signal_groupby), plan_rows)
    # sorting the group keys fixes the numbering used for `:onda_signal_index`
    sorted_keys = sort!(collect(keys(grouped_rows)))
    plan_rows = mapreduce(vcat, enumerate(sorted_keys)) do (onda_signal_index, key)
        rows = grouped_rows[key]
        # every row of a group receives the group's promoted encoding and index
        encoding = promote_encodings(rows)
        return [rowmerge(row, encoding, (; onda_signal_index)) for row in rows]
    end
    return plan_rows
end
# Property lookup that yields `missing` instead of throwing when `x` has no
# property named `property`.
function _get(x, property)
    hasproperty(x, property) || return missing
    return getproperty(x, property)
end
# Build a function that maps a row to a `NamedTuple` key holding the values of
# the properties named in `vars`; properties a row lacks become `missing`.
function grouper(vars=(:sensor_type, :sample_unit, :sample_rate))
    return function (x)
        key_values = map(var -> _get(x, var), vars)
        return NamedTuple{vars}(key_values)
    end
end

# A vector of names is converted to a tuple first.
function grouper(vars::AbstractVector{Symbol})
    return grouper(Tuple(vars))
end

# A single name is treated as a one-element tuple.
function grouper(var::Symbol)
    return grouper((var,))
end
# return Samples for each :onda_signal_index
"""
edf_to_onda_samples(edf::EDF.File, plan_table; validate=true, dither_storage=missing)
Convert Signals found in an EDF File to `Onda.Samples` according to the plan
specified in `plan_table` (e.g., as generated by [`plan_edf_to_onda_samples`](@ref)), returning an
iterable of the generated `Onda.Samples` and the plan as actually executed.
The input plan is transformed by using [`merge_samples_info`](@ref) to combine
rows with the same `:onda_signal_index` into a common `Onda.SamplesInfo`. Then
[`OndaEDF.onda_samples_from_edf_signals`](@ref) is used to combine the EDF
signals data into a single `Onda.Samples` per group.
Any errors that occur are shown as `String`s (with backtrace) and inserted into
the `:error` column for the corresponding rows from the plan.
Samples are returned in the order of `:onda_signal_index`. Signals that could
not be matched or otherwise caused an error during execution are not returned.
If `validate=true` (the default), the plan is validated against the
[`FilePlanV2`](@ref) schema, and the signal headers in the `EDF.File`.
If `dither_storage=missing` (the default), dither storage is allocated automatically
as specified in the docstring for `Onda.encode`. `dither_storage=nothing` disables dithering.
$SAMPLES_ENCODED_WARNING
"""
function edf_to_onda_samples(edf::EDF.File, plan_table; validate=true, dither_storage=missing)
    true_signals = filter(signal -> signal isa EDF.Signal, edf.signals)

    if validate
        # check the plan against the file-plan schema and against the labels of
        # the signals that each row points at
        Legolas.validate(Tables.schema(Tables.columns(plan_table)),
                         Legolas.SchemaVersion("ondaedf.file-plan", 2))
        for row in Tables.rows(plan_table)
            edf_label = true_signals[row.edf_signal_index].header.label
            if !(edf_label == row.label)
                throw(ArgumentError("Plan's label $(row.label) does not match EDF label $(edf_label)!"))
            end
        end
    end

    EDF.read!(edf)

    groups = groupby(grouper((:onda_signal_index,)), Tables.rows(plan_table))
    exec_rows = map(collect(groups)) do (idx, rows)
        try
            info = merge_samples_info(rows)
            # merge_samples_info returns missing if any of :sensor_type,
            # :sample_unit, :sample_rate, or :channel is missing in any of the
            # rows, to indicate that it's not possible to generate samples.
            # Passing the rows through untouched keeps us from overwriting any
            # existing, more specific :errors in the plan with nonsense about
            # promote_type etc.
            ismissing(info) && return (; idx, samples=missing, plan_rows=rows)

            signals = [true_signals[row.edf_signal_index] for row in rows]
            samples = onda_samples_from_edf_signals(SamplesInfoV2(info), signals,
                                                    edf.header.seconds_per_record;
                                                    dither_storage)
            return (; idx, samples, plan_rows=rows)
        catch err
            # record the failure on every row of the group instead of aborting
            return (; idx, samples=missing, plan_rows=_errored_rows(rows, err))
        end
    end

    # restore the `:onda_signal_index` ordering lost by going through a Dict
    sort!(exec_rows; by=(exec_row -> exec_row.idx))
    exec = Tables.columntable(exec_rows)
    exec_plan = reduce(vcat, exec.plan_rows)
    return collect(skipmissing(exec.samples)), exec_plan
end
"""
OndaEDF.merge_samples_info(plan_rows)
Create a single, merged `SamplesInfo` from plan rows, such as generated by
[`plan_edf_to_onda_samples`](@ref). Encodings are promoted with `promote_encodings`.
The input rows must have the same values for `:sensor_type`, `:sample_unit`, and
`:sample_rate`; otherwise an `ArgumentError` is thrown.
If any of these values is `missing`, or any row's `:channel` value is `missing`,
this returns `missing` to indicate it is not possible to determine a shared
`SamplesInfo`.
The original EDF labels are included in the output in the `:edf_channels`
column.
!!! note
This is an internal function and is not meant to be called directly.
"""
function merge_samples_info(rows)
    # sensor_type, sample_unit, and sample_rate must be identical across rows
    shared_fields = (:sensor_type, :sample_unit, :sample_rate)
    distinct_keys = unique(grouper(shared_fields).(rows))
    if length(distinct_keys) != 1
        throw(ArgumentError("couldn't merge samples info from rows: multiple " *
                            "kind/sample_unit/sample_rate combinations:\n\n" *
                            "$(pretty_table(String, distinct_keys))\n\n" *
                            "$(pretty_table(String, rows))"))
    end
    key = only(distinct_keys)

    # `missing` is the sentinel telling the caller that Samples cannot be
    # created from these rows
    if any(ismissing, key) || any(ismissing, _get.(rows, :channel))
        return missing
    end

    onda_encoding = promote_encodings(rows)
    channels = [row.channel for row in rows]
    # the original EDF labels are kept alongside the Onda channel names
    edf_channels = [row.label for row in rows]
    return (; onda_encoding..., NamedTuple(key)..., channels, edf_channels)
end
#####
##### `import_edf!`
#####
"""
OndaEDF.onda_samples_from_edf_signals(target::Onda.SamplesInfo, edf_signals,
edf_seconds_per_record; dither_storage=missing)
Generate an `Onda.Samples` struct from an iterable of `EDF.Signal`s, based on
the `Onda.SamplesInfo` in `target`. This checks for matching sample rates in
the source signals. If the encoding of `target` is the same as the encoding in
a signal, its encoded (usually `Int16`) data is copied directly into the
`Samples` data matrix; otherwise it is re-encoded.
If `dither_storage=missing` (the default), dither storage is allocated automatically
as specified in the docstring for `Onda.encode`. `dither_storage=nothing` disables dithering.
See `Onda.encode`'s docstring for more details.
!!! note
This function is not meant to be called directly, but through
[`edf_to_onda_samples`](@ref)
$SAMPLES_ENCODED_WARNING
"""
function onda_samples_from_edf_signals(target::SamplesInfoV2, edf_signals,
                                       edf_seconds_per_record; dither_storage=missing)
    # all signals must contribute the same number of samples
    sample_count = length(first(edf_signals).samples)
    if any(s -> length(s.samples) != sample_count, edf_signals)
        error("mismatched sample counts between `EDF.Signal`s: ", [length(s.samples) for s in edf_signals])
    end

    # one row per channel, one column per sample
    sample_data = Matrix{sample_type(target)}(undef, length(target.channels), sample_count)

    for (row_index, edf_signal) in enumerate(edf_signals)
        source = edf_signal_encoding(edf_signal.header, edf_seconds_per_record)
        if target.sample_rate != source.sample_rate
            throw(MismatchedSampleRateError((target.sample_rate, source.sample_rate)))
        end

        same_encoding = target.sample_resolution_in_unit == source.sample_resolution_in_unit &&
                        target.sample_offset_in_unit == source.sample_offset_in_unit &&
                        sample_type(target) == eltype(edf_signal.samples)

        if same_encoding
            # the stored samples can be copied over verbatim
            encoded_samples = edf_signal.samples
        else
            # decode with the signal's own encoding, then re-encode for `target`
            decoded_samples = Onda.decode(source.sample_resolution_in_unit,
                                          source.sample_offset_in_unit,
                                          edf_signal.samples)
            encoded_samples = try
                Onda.encode(sample_type(target), target.sample_resolution_in_unit,
                            target.sample_offset_in_unit, decoded_samples,
                            dither_storage)
            catch err
                if err isa DomainError
                    @warn "DomainError during `Onda.encode` can be due to a dithering bug; try calling with `dither_storage=nothing` to disable dithering."
                end
                rethrow()
            end
        end

        copyto!(view(sample_data, row_index, :), encoded_samples)
    end

    return Samples(sample_data, target, true)
end
"""
    store_edf_as_onda(edf::EDF.File, onda_dir, recording_uuid::UUID=uuid4();
                      import_annotations::Bool=true,
                      postprocess_samples=identity,
                      signals_prefix="edf", annotations_prefix=signals_prefix,
                      kwargs...)
Convert an EDF.File to `Onda.Samples` and `Onda.Annotation`s, store the samples
in `\$onda_dir/samples/`, and write the Onda signals and annotations tables to
`\$onda_dir/\$(signals_prefix).onda.signals.arrow` and
`\$onda_dir/\$(annotations_prefix).onda.annotations.arrow`. The default prefix is
"edf", and if a prefix is provided for signals but not annotations both will use
the signals prefix. The prefixes cannot reference (sub)directories.
Returns `(; recording_uuid, signals, annotations, signals_path, annotations_path, plan)`.
This is a convenience function that first formulates an import plan via
[`plan_edf_to_onda_samples`](@ref), and then immediately executes this plan with
[`edf_to_onda_samples`](@ref).
The samples and executed plan are returned; it is **strongly advised** that you
review the plan for un-extracted signals (where `:sensor_type` or `:channel` is
`missing`) and errors (non-`nothing` values in `:error`).
Groups of `EDF.Signal`s are mapped as channels to `Onda.Samples` via
[`plan_edf_to_onda_samples`](@ref). The caller of this function can control the
plan via the `labels` and `units` keyword arguments, all of which are forwarded
to [`plan_edf_to_onda_samples`](@ref).
`EDF.Signal` labels that are converted into Onda channel names undergo the
following transformations:
- the label is whitespace-stripped, parens-stripped, and lowercased
- trailing generic EDF references (e.g. "ref", "ref2", etc.) are dropped
- any instance of `+` is replaced with `_plus_` and `/` with `_over_`
- all component names are converted to their "canonical names" when possible
(e.g. "3" in an ECG-matched channel name will be converted to "iii").
If more control (e.g. preprocessing signal labels) is required, callers should
use [`plan_edf_to_onda_samples`](@ref) and [`edf_to_onda_samples`](@ref)
directly, and `Onda.store` the resulting samples manually.
See the OndaEDF README for additional details regarding EDF formatting expectations.
"""
function store_edf_as_onda(edf::EDF.File, onda_dir, recording_uuid::UUID=uuid4();
                           import_annotations::Bool=true,
                           postprocess_samples=identity,
                           signals_prefix="edf", annotations_prefix=signals_prefix,
                           kwargs...)
    # validate the prefixes up front, before any data is read or written
    signals_file = string(validate_arrow_prefix(signals_prefix), ".onda.signals.arrow")
    signals_path = joinpath(onda_dir, signals_file)
    annotations_file = string(validate_arrow_prefix(annotations_prefix),
                              ".onda.annotations.arrow")
    annotations_path = joinpath(onda_dir, annotations_file)

    EDF.read!(edf)
    file_format = "lpcm.zst"
    # Trailing slash needed for compatibility with AWSS3.jl's `S3Path`
    mkpath(joinpath(onda_dir, "samples") * '/')

    edf_samples, plan = edf_to_onda_samples(edf; kwargs...)

    # Surface every distinct error recorded in the executed plan. `unique` is
    # used because an error raised during execution is inserted into all plan
    # rows of the affected group of EDF signals, so it may be repeated.
    errors = _get(Tables.columns(plan), :error)
    if !ismissing(errors)
        for err in unique(errors)
            err === nothing && continue
            @warn sprint(showerror, err)
        end
    end

    signals = Onda.SignalV2[]
    for samples in postprocess_samples(edf_samples)
        # NOTE(review): the file name depends only on the recording UUID and the
        # sensor type, so two `Samples` sharing a sensor type would map to the
        # same path; confirm whether that can occur in practice.
        sample_filename = string(recording_uuid, "_", samples.info.sensor_type, ".", file_format)
        file_path = joinpath(onda_dir, "samples", sample_filename)
        push!(signals, store(file_path, file_format, samples, recording_uuid, Second(0)))
    end
    Legolas.write(signals_path, signals, SignalV2SchemaVersion())

    if !import_annotations
        annotations = EDFAnnotationV1[]
    else
        annotations = edf_to_onda_annotations(edf, recording_uuid)
        if isempty(annotations)
            # no table is written, which is signalled by a `nothing` path
            @warn "No annotations found in $onda_dir"
            annotations_path = nothing
        else
            Legolas.write(annotations_path, annotations,
                          OndaEDFSchemas.EDFAnnotationV1SchemaVersion())
        end
    end

    return (; recording_uuid, signals, annotations, signals_path, annotations_path, plan)
end
# Check that `prefix` can be used as a file-name prefix for the Onda arrow
# tables and return the prefix to use.
#
# Throws an `ArgumentError` when `prefix` differs from its own `basename`. If
# `prefix` already contains a `.onda.signals.arrow` / `.onda.annotations.arrow`
# suffix, the part preceding it is returned instead (with a warning).
function validate_arrow_prefix(prefix)
    if prefix != basename(prefix)
        throw(ArgumentError("prefix \"$prefix\" is invalid: cannot contain directory separator"))
    end
    pm = match(r"(.*)\.onda\.(signals|annotations)\.arrow", prefix)
    pm === nothing && return prefix
    extracted = pm.captures[1]
    @warn "Extracting prefix \"$(extracted)\" from provided prefix \"$prefix\""
    return extracted
end
"""
edf_to_onda_samples(edf::EDF.File; kwargs...)
Read signals from an `EDF.File` into a vector of `Onda.Samples`. This is a
convenience function that first formulates an import plan via [`plan_edf_to_onda_samples`](@ref),
and then immediately executes this plan with [`edf_to_onda_samples`](@ref).
The samples and executed plan are returned; it is **strongly advised** that you
review the plan for un-extracted signals (where `:sensor_type` or `:channel` is
`missing`) and errors (non-`nothing` values in `:error`).
Collections of `EDF.Signal`s are mapped as channels to `Onda.Samples` via
[`plan_edf_to_onda_samples`](@ref). The caller of this function can control the
plan via the `labels` and `units` keyword arguments, all of which are forwarded
to [`plan_edf_to_onda_samples`](@ref).
`EDF.Signal` labels that are converted into Onda channel names undergo the
following transformations:
- the label is whitespace-stripped, parens-stripped, and lowercased
- trailing generic EDF references (e.g. "ref", "ref2", etc.) are dropped
- any instance of `+` is replaced with `_plus_` and `/` with `_over_`
- all component names are converted to their "canonical names" when possible
(e.g. "m1" in an EEG-matched channel name will be converted to "a1").
See the OndaEDF README for additional details regarding EDF formatting expectations.
$SAMPLES_ENCODED_WARNING
"""
function edf_to_onda_samples(edf::EDF.File; kwargs...)
    # plan first (all keyword arguments go to the planner), then read the
    # signal data and execute the plan
    plan = plan_edf_to_onda_samples(edf; kwargs...)
    EDF.read!(edf)
    samples, executed_plan = edf_to_onda_samples(edf, plan)
    return samples, executed_plan
end
"""
edf_to_onda_annotations(edf::EDF.File, uuid::UUID)
Extract EDF+ annotations from an `EDF.File` for recording with ID `uuid` and
return them as a vector of `EDFAnnotationV1`s. Each returned annotation has
a `value` field that contains the string value of the corresponding EDF+
annotation.
If no EDF+ annotations are found in `edf`, then an empty
`Vector{EDFAnnotationV1}` is returned.
"""
function edf_to_onda_annotations(edf::EDF.File, uuid::UUID)
    EDF.read!(edf)
    # seconds (as stored in the EDF file) -> whole nanoseconds
    to_nanoseconds(seconds) = Nanosecond(round(Int, 1e9 * seconds))

    annotations = EDFAnnotationV1[]
    for signal in edf.signals
        signal isa EDF.AnnotationsSignal || continue
        for record in signal.records, tal in record
            start_nanosecond = to_nanoseconds(tal.onset_in_seconds)
            # an entry without a duration becomes a span whose stop equals its start
            stop_nanosecond = if tal.duration_in_seconds === nothing
                start_nanosecond
            else
                start_nanosecond + to_nanoseconds(tal.duration_in_seconds)
            end
            # one Onda annotation per non-empty annotation string
            for text in tal.annotations
                isempty(text) && continue
                push!(annotations,
                      EDFAnnotationV1(; recording=uuid, id=uuid4(),
                                      span=TimeSpan(start_nanosecond, stop_nanosecond),
                                      value=text))
            end
        end
    end
    return annotations
end