-
Notifications
You must be signed in to change notification settings - Fork 41
/
monitor_data.py
2485 lines (2062 loc) · 98 KB
/
monitor_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
""" Monitor Level Data, store the DataArrays associated with a single monitor."""
from __future__ import annotations
from abc import ABC
from typing import Union, Tuple, Callable, Dict, List, Any
import warnings
import xarray as xr
import numpy as np
import pydantic.v1 as pd
from pandas import DataFrame
from .data_array import FluxTimeDataArray, FluxDataArray
from .data_array import MixedModeDataArray, ModeAmpsDataArray
from .data_array import ModeIndexDataArray, GroupIndexDataArray, ModeDispersionDataArray
from .data_array import FieldProjectionAngleDataArray, FieldProjectionCartesianDataArray
from .data_array import FieldProjectionKSpaceDataArray
from .data_array import DataArray, DiffractionDataArray
from .data_array import ScalarFieldDataArray, ScalarFieldTimeDataArray
from .data_array import FreqDataArray, TimeDataArray, FreqModeDataArray
from .dataset import Dataset, AbstractFieldDataset, ElectromagneticFieldDataset
from .dataset import FieldDataset, FieldTimeDataset, ModeSolverDataset, PermittivityDataset
from ..base import TYPE_TAG_STR, cached_property, skip_if_fields_missing
from ..types import Coordinate, Symmetry, ArrayFloat1D, ArrayFloat2D, Size, Numpy, TrackFreq
from ..types import EpsSpecType, Literal
from ..grid.grid import Grid, Coords
from ..validators import enforce_monitor_fields_present, required_if_symmetry_present
from ..monitor import MonitorType, FieldMonitor, FieldTimeMonitor, ModeSolverMonitor
from ..monitor import ModeMonitor, FluxMonitor, FluxTimeMonitor, PermittivityMonitor
from ..monitor import FieldProjectionAngleMonitor, FieldProjectionCartesianMonitor
from ..monitor import FieldProjectionKSpaceMonitor, FieldProjectionSurface
from ..monitor import DiffractionMonitor
from ..source import SourceTimeType, CustomFieldSource
from ..medium import Medium, MediumType
from ...exceptions import SetupError, DataError, Tidy3dNotImplementedError, ValidationError
from ...constants import ETA_0, C_0, MICROMETER, PICOSECOND_PER_NANOMETER_PER_KILOMETER
from ...log import log
from ..base_sim.data.monitor_data import AbstractMonitorData
Coords1D = ArrayFloat1D
class MonitorData(AbstractMonitorData, ABC):
"""
Abstract base class of objects that store data pertaining to a single :class:`.monitor`.
"""
monitor: MonitorType = pd.Field(
...,
title="Monitor",
description="Monitor associated with the data.",
discriminator=TYPE_TAG_STR,
)
@property
def symmetry_expanded(self) -> MonitorData:
"""Return self with symmetry applied."""
return self
def normalize(self, source_spectrum_fn: Callable[[float], complex]) -> Dataset:
"""Return copy of self after normalization is applied using source spectrum function."""
return self.copy()
def _updated(self, update: Dict) -> MonitorData:
"""Similar to ``updated_copy``, but does not actually copy components, for speed.
Note
----
This does **not** produce a copy of mutable objects, so e.g. if some of the data arrays
are not updated, they will point to the values in the original data. This method should
thus be used carefully.
"""
data_dict = self.dict()
data_dict.update(update)
return type(self).parse_obj(data_dict)
class AbstractFieldData(MonitorData, AbstractFieldDataset, ABC):
"""Collection of scalar fields with some symmetry properties."""
monitor: Union[FieldMonitor, FieldTimeMonitor, PermittivityMonitor, ModeSolverMonitor]
symmetry: Tuple[Symmetry, Symmetry, Symmetry] = pd.Field(
(0, 0, 0),
title="Symmetry",
description="Symmetry eigenvalues of the original simulation in x, y, and z.",
)
symmetry_center: Coordinate = pd.Field(
None,
title="Symmetry Center",
description="Center of the symmetry planes of the original simulation in x, y, and z. "
"Required only if any of the ``symmetry`` field are non-zero.",
)
grid_expanded: Grid = pd.Field(
None,
title="Expanded Grid",
description=":class:`.Grid` discretization of the associated monitor in the simulation "
"which created the data. Required if symmetries are present, as "
"well as in order to use some functionalities like getting poynting and flux.",
)
@pd.validator("grid_expanded", always=True)
def warn_missing_grid_expanded(cls, val):
"""If ``colocate`` not provided, set to true, but warn that behavior has changed."""
if val is None:
log.warning(
"Monitor data requires 'grid_expanded' to be defined to compute values like "
"flux, Poynting and dot product with other data."
)
return val
_require_sym_center = required_if_symmetry_present("symmetry_center")
_require_grid_expanded = required_if_symmetry_present("grid_expanded")
def _expanded_grid_field_coords(self, field_name: str) -> Coords:
"""Coordinates in the expanded grid corresponding to a given field component."""
return self.grid_expanded[self.grid_locations[field_name]]
@property
def symmetry_expanded(self):
"""Return the :class:`.AbstractFieldData` with fields expanded based on symmetry. If
any symmetry is nonzero (i.e. expanded), the interpolation implicitly creates a copy of the
data array. However, if symmetry is not expanded, the returned array contains a view of
the data, not a copy.
Returns
-------
:class:`AbstractFieldData`
A data object with the symmetry expanded fields.
"""
if all(sym == 0 for sym in self.symmetry):
return self
return self._updated(self._symmetry_update_dict)
@property
def symmetry_expanded_copy(self) -> AbstractFieldData:
"""Create a copy of the :class:`.AbstractFieldData` with fields expanded based on symmetry.
Returns
-------
:class:`AbstractFieldData`
A data object with the symmetry expanded fields.
"""
if all(sym == 0 for sym in self.symmetry):
return self.copy()
return self.copy(update=self._symmetry_update_dict)
@property
def _symmetry_update_dict(self) -> Dict:
"""Dictionary of data fields to create data with expanded symmetry."""
update_dict = {}
for field_name, scalar_data in self.field_components.items():
eigenval_fn = self.symmetry_eigenvalues[field_name]
# get grid locations for this field component on the expanded grid
field_coords = self._expanded_grid_field_coords(field_name)
for sym_dim, (sym_val, sym_loc) in enumerate(zip(self.symmetry, self.symmetry_center)):
dim_name = "xyz"[sym_dim]
# Continue if no symmetry along this dimension
if sym_val == 0:
continue
# Get coordinates for this field component on the expanded grid
coords = field_coords.to_list[sym_dim]
coords = self.monitor.downsample(coords, axis=sym_dim)
# Get indexes of coords that lie on the left of the symmetry center
flip_inds = np.where(coords < sym_loc)[0]
# Get the symmetric coordinates on the right
coords_interp = np.copy(coords)
coords_interp[flip_inds] = 2 * sym_loc - coords[flip_inds]
# Interpolate. There generally shouldn't be values out of bounds except potentially
# when handling modes, in which case they should be at the boundary and close to 0.
scalar_data = scalar_data.sel(**{dim_name: coords_interp}, method="nearest")
scalar_data = scalar_data.assign_coords({dim_name: coords})
# apply the symmetry eigenvalue (if defined) to the flipped values
if eigenval_fn is not None:
sym_eigenvalue = eigenval_fn(sym_dim)
scalar_data = scalar_data.multiply_at(
value=sym_val * sym_eigenvalue, coord_name=dim_name, indices=flip_inds
)
# assign the final scalar data to the update_dict
update_dict[field_name] = scalar_data
update_dict.update({"symmetry": (0, 0, 0), "symmetry_center": None})
return update_dict
def at_coords(self, coords: Coords) -> xr.Dataset:
"""Colocate data to some supplied coordinates. This is a convenience method that wraps
``colocate``, and skips dimensions for which the data has a single data point only
(``colocate`` will error in that case.) If the coords are out of bounds for the data
otherwise, an error will still be produced.
Parameters
----------
coords : :class:`Coords`
Coordinates in x, y and z to colocate to.
Returns
-------
xarray.Dataset
Dataset containing all of the fields in the data interpolated to boundary locations on
the Yee grid.
"""
# pass coords if each of the scalar field data have more than one coordinate along a dim
xyz_kwargs = {}
for dim, coords_dim in zip("xyz", (coords.x, coords.y, coords.z)):
scalar_data = list(self.field_components.values())
coord_lens = [len(data.coords[dim]) for data in scalar_data]
if all(ncoords > 1 for ncoords in coord_lens):
xyz_kwargs[dim] = coords_dim
return self.colocate(**xyz_kwargs)
class ElectromagneticFieldData(AbstractFieldData, ElectromagneticFieldDataset, ABC):
"""Collection of electromagnetic fields."""
grid_primal_correction: Union[
float, FreqDataArray, TimeDataArray, FreqModeDataArray
] = pd.Field(
1.0,
title="Field correction factor",
description="Correction factor that needs to be applied for data corresponding to a 2D "
"monitor to take into account the finite grid in the normal direction in the simulation in "
"which the data was computed. The factor is applied to fields defined on the primal grid "
"locations along the normal direction.",
)
grid_dual_correction: Union[float, FreqDataArray, TimeDataArray, FreqModeDataArray] = pd.Field(
1.0,
title="Field correction factor",
description="Correction factor that needs to be applied for data corresponding to a 2D "
"monitor to take into account the finite grid in the normal direction in the simulation in "
"which the data was computed. The factor is applied to fields defined on the dual grid "
"locations along the normal direction.",
)
def _expanded_grid_field_coords(self, field_name: str):
"""Coordinates in the expanded grid corresponding to a given field component."""
if self.monitor.colocate:
bounds_dict = self.grid_expanded.boundaries.to_dict
return Coords(**{key: val[:-1] for key, val in bounds_dict.items()})
return self.grid_expanded[self.grid_locations[field_name]]
@property
def _grid_correction_dict(self):
"""Return the primal and dual finite grid correction factors as a dictionary."""
return {
"grid_primal_correction": self.grid_primal_correction,
"grid_dual_correction": self.grid_dual_correction,
}
@property
def _tangential_dims(self) -> List[str]:
"""For a 2D monitor data, return the names of the tangential dimensions. Raise if cannot
confirm that the associated monitor is 2D."""
if len(self.monitor.zero_dims) != 1:
raise DataError("Data must be 2D to get tangential dimensions.")
tangential_dims = ["x", "y", "z"]
tangential_dims.pop(self.monitor.zero_dims[0])
return tangential_dims
@property
def colocation_boundaries(self) -> Coords:
"""Coordinates to be used for colocation of the data to grid boundaries."""
if not self.grid_expanded:
raise DataError(
"Monitor data requires 'grid_expanded' to be defined in order to "
"compute colocation coordinates."
)
# Get boundaries from the expanded grid
grid_bounds = self.grid_expanded.boundaries.to_dict
# Non-colocating monitors can only colocate starting from the first boundary
# (unless there's a single data point, in which case data has already been snapped).
# Regardless of colocation, we also drop the last boundary.
colocate_bounds = {}
for dim, bounds in grid_bounds.items():
cbs = bounds[:-1]
if not self.monitor.colocate and cbs.size > 1:
cbs = cbs[1:]
colocate_bounds[dim] = cbs
return Coords(**colocate_bounds)
@property
def colocation_centers(self) -> Coords:
"""Coordinates to be used for colocation of the data to grid centers."""
colocate_centers = {}
for dim, coords in self.colocation_boundaries.to_dict.items():
colocate_centers[dim] = (coords[1:] + coords[:-1]) / 2
return Coords(**colocate_centers)
@property
def _plane_grid_boundaries(self) -> Tuple[Coords1D, Coords1D]:
"""For a 2D monitor data, return the boundaries of the in-plane grid to be used to compute
differential area and to colocate fields if needed."""
if np.any(np.array(self.monitor.interval_space) > 1):
raise Tidy3dNotImplementedError(
"Cannot determine grid boundaries corresponding to "
"down-sampled monitor data ('interval_space' > 1 along a direction)."
)
dim1, dim2 = self._tangential_dims
bounds_dict = self.colocation_boundaries.to_dict
return (bounds_dict[dim1], bounds_dict[dim2])
@property
def _plane_grid_centers(self) -> Tuple[Coords1D, Coords1D]:
"""For 2D monitor data, return the centers of the in-plane grid"""
return [(bs[1:] + bs[:-1]) / 2 for bs in self._plane_grid_boundaries]
@property
def _diff_area(self) -> xr.DataArray:
"""For a 2D monitor data, return the area of each cell in the plane, for use in numerical
integrations. This assumes that data is colocated to grid boundaries, and uses the
difference in the surrounding grid centers to compute the area.
"""
# Monitor values are interpolated to bounds
bounds = self._plane_grid_boundaries
# Coords to compute cell sizes around the interpolation locations
coords = [bs.copy() for bs in self._plane_grid_centers]
# Append the first and last boundary
_, plane_inds = self.monitor.pop_axis([0, 1, 2], self.monitor.size.index(0.0))
coords[0] = np.array([bounds[0][0]] + coords[0].tolist() + [bounds[0][-1]])
coords[1] = np.array([bounds[1][0]] + coords[1].tolist() + [bounds[1][-1]])
"""Truncate coords to monitor boundaries. This implicitly makes extra pixels which may be
present have size 0 and so won't be included in the integration. For pixels intersected
by the monitor edge, the size is truncated to the part covered by the monitor. When using
the differential area sizes defined in this way together with integrand values
defined at cell boundaries, the integration is equivalent to trapezoidal rule with the first
and last values interpolated to the exact monitor start/end location, if the integrand
is zero outside of the monitor geometry. This should usually be the case for flux and dot
computations"""
mnt_bounds = np.array(self.monitor.bounds)
mnt_bounds = mnt_bounds[:, plane_inds].T
coords[0][np.argwhere(coords[0] < mnt_bounds[0, 0])] = mnt_bounds[0, 0]
coords[0][np.argwhere(coords[0] > mnt_bounds[0, 1])] = mnt_bounds[0, 1]
coords[1][np.argwhere(coords[1] < mnt_bounds[1, 0])] = mnt_bounds[1, 0]
coords[1][np.argwhere(coords[1] > mnt_bounds[1, 1])] = mnt_bounds[1, 1]
# Do not apply the spurious dl along a dimension where the simulation is 2D.
# Instead, we just set the boundaries such that the cell size along the zero dimension is 1,
# such that quantities like flux will come out in units of W / um.
sizes_dim0 = coords[0][1:] - coords[0][:-1] if bounds[0].size > 1 else [1.0]
sizes_dim1 = coords[1][1:] - coords[1][:-1] if bounds[1].size > 1 else [1.0]
return xr.DataArray(np.outer(sizes_dim0, sizes_dim1), dims=self._tangential_dims)
def _tangential_corrected(self, fields: Dict[str, DataArray]) -> Dict[str, DataArray]:
"""For a 2D monitor data, extract the tangential components from fields and orient them
such that the third component would be the normal axis. This just means that the H field
gets an extra minus sign if the normal axis is ``"y"``. Raise if any of the tangential
field components is missing.
The finite grid correction is also applied, so the intended use of these fields is in
poynting, flux, and dot-like methods. The normal coordinate is dropped from the field data.
"""
if len(self.monitor.zero_dims) != 1:
raise DataError("Data must be 2D to get tangential fields.")
# Tangential field components
tan_dims = self._tangential_dims
components = [fname + dim for fname in "EH" for dim in tan_dims]
normal_dim = "xyz"[self.monitor.size.index(0)]
tan_fields = {}
for component in components:
if component not in fields:
raise DataError(f"Tangential field component '{component}' missing in field data.")
correction = 1
# sign correction to H
if normal_dim == "y" and component[0] == "H":
correction *= -1
# finite grid correction to all fields
eig_val = self.symmetry_eigenvalues[component](normal_dim)
if eig_val < 0:
correction *= self.grid_dual_correction
else:
correction *= self.grid_primal_correction
field_squeezed = fields[component].squeeze(dim=normal_dim, drop=True)
tan_fields[component] = field_squeezed * correction
return tan_fields
@property
def _tangential_fields(self) -> Dict[str, DataArray]:
"""For a 2D monitor data, get the tangential E and H fields in the 2D plane grid. Fields
are oriented such that the third component would be the normal axis. This just means that
the H field gets an extra minus sign if the normal axis is ``"y"``.
Note
----
The finite grid correction factors are applied and symmetry is expanded.
"""
return self._tangential_corrected(self.symmetry_expanded.field_components)
@property
def _colocated_fields(self) -> Dict[str, DataArray]:
"""For a 2D monitor data, get all E and H fields colocated to the cell boundaries in the 2D
plane grid, with symmetries expanded.
"""
field_components = self.symmetry_expanded.field_components
if self.monitor.colocate:
return field_components
# Interpolate field components to cell boundaries
interp_dict = {"assume_sorted": True}
for dim, bounds in zip(self._tangential_dims, self._plane_grid_boundaries):
if bounds.size > 1:
interp_dict[dim] = bounds
colocated_fields = {key: val.interp(**interp_dict) for key, val in field_components.items()}
return colocated_fields
@property
def _colocated_tangential_fields(self) -> Dict[str, DataArray]:
"""For a 2D monitor data, get the tangential E and H fields colocated to the cell boundaries
in the 2D plane grid. Fields are oriented such that the third component would be the normal
axis. This just means that the H field gets an extra minus sign if the normal axis is
``"y"``. Raise if any of the tangential field components is missing.
Note
----
The finite grid correction factors are applied and symmetry is expanded.
"""
return self._tangential_corrected(self._colocated_fields)
@property
def grid_corrected_copy(self) -> ElectromagneticFieldData:
"""Return a copy of self with grid correction factors applied (if necessary) and symmetry
expanded."""
field_data = self.symmetry_expanded_copy
if len(self.monitor.zero_dims) != 1:
return field_data
normal_dim = "xyz"[self.monitor.zero_dims[0]]
update = {"grid_primal_correction": 1.0, "grid_dual_correction": 1.0}
for field_name, field in field_data.field_components.items():
eig_val = self.symmetry_eigenvalues[field_name](normal_dim)
if eig_val < 0:
update[field_name] = field * self.grid_dual_correction
else:
update[field_name] = field * self.grid_primal_correction
return field_data.copy(update=update)
@property
def intensity(self) -> ScalarFieldDataArray:
"""Return the sum of the squared absolute electric field components."""
normal_dim = "xyz"[self.monitor.size.index(0)]
fields = self._colocated_fields
components = ("Ex", "Ey", "Ez")
if any(cmp not in fields for cmp in components):
raise KeyError("Can't compute intensity, all E field components must be present.")
intensity = sum(fields[cmp].abs ** 2 for cmp in components)
return intensity.squeeze(dim=normal_dim, drop=True)
@property
def poynting(self) -> ScalarFieldDataArray:
"""Time-averaged Poynting vector for frequency-domain data associated to a 2D monitor,
projected to the direction normal to the monitor plane."""
# Tangential fields are ordered as E1, E2, H1, H2
tan_fields = self._colocated_tangential_fields
dim1, dim2 = self._tangential_dims
e_x_h_star = tan_fields["E" + dim1] * tan_fields["H" + dim2].conj()
e_x_h_star -= tan_fields["E" + dim2] * tan_fields["H" + dim1].conj()
poynting = 0.5 * np.real(e_x_h_star)
return poynting
def package_flux_results(self, flux_values: xr.DataArray) -> Any:
"""How to package flux"""
return FluxDataArray(flux_values)
@cached_property
def flux(self) -> FluxDataArray:
"""Flux for data corresponding to a 2D monitor."""
# Compute flux by integrating Poynting vector in-plane
d_area = self._diff_area
poynting = self.poynting
flux_values = poynting * d_area
flux_values = flux_values.sum(dim=d_area.dims)
return self.package_flux_results(flux_values)
@cached_property
def mode_area(self) -> FreqModeDataArray:
"""Effective mode area corresponding to a 2D monitor.
Effective mode area is calculated as: (∫|E|²dA)² / (∫|E|⁴dA)
"""
intensity = self.intensity
# integrate over the plane
d_area = self._diff_area
num = (intensity * d_area).sum(dim=d_area.dims) ** 2
den = (intensity**2 * d_area).sum(dim=d_area.dims)
area = num / den
if hasattr(self.monitor, "mode_spec"):
area *= np.cos(self.monitor.mode_spec.angle_theta)
return FreqModeDataArray(area)
def dot(
self, field_data: Union[FieldData, ModeSolverData], conjugate: bool = True
) -> ModeAmpsDataArray:
"""Dot product (modal overlap) with another :class:`.FieldData` object. Both datasets have
to be frequency-domain data associated with a 2D monitor. Along the tangential directions,
the datasets have to have the same discretization. Along the normal direction, the monitor
position may differ and is ignored. Other coordinates (``frequency``, ``mode_index``) have
to be either identical or broadcastable. Broadcasting is also supported in the case in
which the other ``field_data`` has a dimension of size ``1`` whose coordinate is not in the
list of coordinates in the ``self`` dataset along the corresponding dimension. In that case,
the coordinates of the ``self`` dataset are used in the output.
Parameters
----------
field_data : :class:`ElectromagneticFieldData`
A data instance to compute the dot product with.
conjugate : bool, optional
If ``True`` (default), the dot product is defined as ``1 / 4`` times the integral of
``E_self* x H_other - H_self* x E_other``, where ``x`` is the cross product and ``*`` is
complex conjugation. If ``False``, the complex conjugation is skipped.
Note
----
The dot product with and without conjugation is equivalent (up to a phase) for
modes in lossless waveguides but differs for modes in lossy materials. In that case,
the conjugated dot product can be interpreted as the fraction of the power of the first
mode carried by the second, but modes are not orthogonal with respect to that product
and the sum of carried power fractions may be different from the total flux.
In the non-conjugated definition, modes are orthogonal, but the interpretation of the
dot product power carried by a given mode is no longer valid.
"""
# Tangential fields for current and other field data
fields_self = self._colocated_tangential_fields
fields_other = field_data._colocated_tangential_fields
if conjugate:
fields_self = {key: field.conj() for key, field in fields_self.items()}
# Drop size-1 dimensions in the other data
fields_other = {key: field.squeeze(drop=True) for key, field in fields_other.items()}
# Cross products of fields
dim1, dim2 = self._tangential_dims
e_self_x_h_other = fields_self["E" + dim1] * fields_other["H" + dim2]
e_self_x_h_other -= fields_self["E" + dim2] * fields_other["H" + dim1]
h_self_x_e_other = fields_self["H" + dim1] * fields_other["E" + dim2]
h_self_x_e_other -= fields_self["H" + dim2] * fields_other["E" + dim1]
# Integrate over plane
d_area = self._diff_area
integrand = (e_self_x_h_other - h_self_x_e_other) * d_area
return ModeAmpsDataArray(0.25 * integrand.sum(dim=d_area.dims))
def _interpolated_tangential_fields(self, coords: ArrayFloat2D) -> Dict[str, DataArray]:
"""For 2D monitors, interpolate this fields to given coords in the tangential plane.
Parameters
----------
coords : ArrayFloat2D
Interpolation coords in the monitor's tangential plane.
Return
------
Dictionary with interpolated fields.
"""
fields = self._tangential_fields
interp_dict = {"assume_sorted": True}
for dim, cents in zip(self._tangential_dims, coords):
if cents.size > 0:
interp_dict[dim] = cents
kwargs = {"bounds_error": False, "fill_value": 0.0}
for component, field in fields.items():
fields[component] = field.interp(kwargs=kwargs, **interp_dict)
return fields
def outer_dot(
self, field_data: Union[FieldData, ModeSolverData], conjugate: bool = True
) -> MixedModeDataArray:
"""Dot product (modal overlap) with another :class:`.FieldData` object.
The tangential fields from ``field_data`` are interpolated to this object's grid, so the
data arrays don't need to have the same discretization. The calculation is performed for
all common frequencies between data arrays. In the output, ``mode_index_0`` and
``mode_index_1`` are the mode indices from this object and ``field_data``, respectively, if
they are instances of ``ModeSolverData``.
Parameters
----------
field_data : :class:`ElectromagneticFieldData`
A data instance to compute the dot product with.
conjugate : bool = True
If ``True`` (default), the dot product is defined as ``1 / 4`` times the integral of
``E_self* x H_other - H_self* x E_other``, where ``x`` is the cross product and ``*`` is
complex conjugation. If ``False``, the complex conjugation is skipped.
Returns
-------
:class:`xarray.DataArray`
Data array with the complex-valued modal overlaps between the two mode data.
See also
--------
:member:`dot`
"""
tan_dims = self._tangential_dims
if not all(a == b for a, b in zip(tan_dims, field_data._tangential_dims)):
raise DataError("Tangential dimensions must match between the two monitors.")
# Tangential fields for current
fields_self = self._colocated_tangential_fields
if conjugate:
fields_self = {component: field.conj() for component, field in fields_self.items()}
# Tangential fields for other data
fields_other = field_data._interpolated_tangential_fields(self._plane_grid_boundaries)
# Tangential field component names
dim1, dim2 = tan_dims
e_1 = "E" + dim1
e_2 = "E" + dim2
h_1 = "H" + dim1
h_2 = "H" + dim2
# Prepare array with proper dimensions for the dot product data
arrays = (fields_self[e_1], fields_other[e_1])
coords = (arrays[0].coords, arrays[1].coords)
# Common frequencies to both data arrays
f = np.array(sorted(set(coords[0]["f"].values).intersection(coords[1]["f"].values)))
# Mode indices, if available
modes_in_self = "mode_index" in coords[0]
mode_index_0 = coords[0]["mode_index"].values if modes_in_self else np.zeros(1, dtype=int)
modes_in_other = "mode_index" in coords[1]
mode_index_1 = coords[1]["mode_index"].values if modes_in_other else np.zeros(1, dtype=int)
dtype = np.promote_types(arrays[0].dtype, arrays[1].dtype)
dot = np.empty((f.size, mode_index_0.size, mode_index_1.size), dtype=dtype)
# Calculate overlap for each common frequency and each mode pair
for i, freq in enumerate(f):
indexer_self = {"f": freq}
indexer_other = {"f": freq}
for mi0 in mode_index_0:
if modes_in_self:
indexer_self["mode_index"] = mi0
e_self_1 = fields_self[e_1].sel(indexer_self, drop=True)
e_self_2 = fields_self[e_2].sel(indexer_self, drop=True)
h_self_1 = fields_self[h_1].sel(indexer_self, drop=True)
h_self_2 = fields_self[h_2].sel(indexer_self, drop=True)
for mi1 in mode_index_1:
if modes_in_other:
indexer_other["mode_index"] = mi1
e_other_1 = fields_other[e_1].sel(indexer_other, drop=True)
e_other_2 = fields_other[e_2].sel(indexer_other, drop=True)
h_other_1 = fields_other[h_1].sel(indexer_other, drop=True)
h_other_2 = fields_other[h_2].sel(indexer_other, drop=True)
# Cross products of fields
e_self_x_h_other = e_self_1 * h_other_2 - e_self_2 * h_other_1
h_self_x_e_other = h_self_1 * e_other_2 - h_self_2 * e_other_1
# Integrate over plane
d_area = self._diff_area
integrand = (e_self_x_h_other - h_self_x_e_other) * d_area
dot[i, mi0, mi1] = 0.25 * integrand.sum(dim=d_area.dims)
coords = {"f": f, "mode_index_0": mode_index_0, "mode_index_1": mode_index_1}
result = xr.DataArray(dot, coords=coords)
# Remove mode index coordinate if the input did not have it
if not modes_in_self:
result = result.isel(mode_index_0=0, drop=True)
if not modes_in_other:
result = result.isel(mode_index_1=0, drop=True)
return result
@property
def time_reversed_copy(self) -> FieldData:
"""Make a copy of the data with time-reversed fields."""
# Time reversal for frequency-domain fields; overwritten in :class:`FieldTimeData`
# and :class:`ModeSolverData`.
new_data = {}
for comp, field in self.field_components.items():
if comp[0] == "H":
new_data[comp] = -np.conj(field)
else:
new_data[comp] = np.conj(field)
return self.copy(update=new_data)
class FieldData(FieldDataset, ElectromagneticFieldData):
"""
Data associated with a :class:`.FieldMonitor`: scalar components of E and H fields.
Notes
-----
The data is stored as a `DataArray <https://docs.xarray.dev/en/stable/generated/xarray.DataArray.html>`_
object using the `xarray <https://docs.xarray.dev/en/stable/index.html>`_ package.
This dataset can contain all electric and magnetic field components: ``Ex``, ``Ey``, ``Ez``, ``Hx``, ``Hy``,
and ``Hz``.
Example
-------
>>> from tidy3d import ScalarFieldDataArray
>>> x = [-1,1,3]
>>> y = [-2,0,2,4]
>>> z = [-3,-1,1,3,5]
>>> f = [2e14, 3e14]
>>> coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], f=f)
>>> grid = Grid(boundaries=Coords(x=x, y=y, z=z))
>>> scalar_field = ScalarFieldDataArray((1+1j) * np.random.random((2,3,4,2)), coords=coords)
>>> monitor = FieldMonitor(
... size=(2,4,6), freqs=[2e14, 3e14], name='field', fields=['Ex', 'Hz'], colocate=True
... )
>>> data = FieldData(monitor=monitor, Ex=scalar_field, Hz=scalar_field, grid_expanded=grid)
.. TODO sort out standalone data example.
See Also
--------
**Notebooks:**
* `Quickstart <../../notebooks/StartHere.html>`_: Usage in a basic simulation flow.
* `Performing visualization of simulation data <../../notebooks/VizData.html>`_
* `Advanced monitor data manipulation and visualization <../../notebooks/XarrayTutorial.html>`_
"""
monitor: FieldMonitor = pd.Field(
..., title="Monitor", description="Frequency-domain field monitor associated with the data."
)
_contains_monitor_fields = enforce_monitor_fields_present()
def normalize(self, source_spectrum_fn: Callable[[float], complex]) -> FieldDataset:
"""Return copy of self after normalization is applied using source spectrum function."""
fields_norm = {}
for field_name, field_data in self.field_components.items():
src_amps = source_spectrum_fn(field_data.f)
fields_norm[field_name] = (field_data / src_amps).astype(field_data.dtype)
return self.copy(update=fields_norm)
def to_source(
self, source_time: SourceTimeType, center: Coordinate, size: Size = None, **kwargs
) -> CustomFieldSource:
"""Create a :class:`.CustomFieldSource` from the fields stored in the :class:`.FieldData`.
Parameters
----------
source_time: :class:`.SourceTime`
Specification of the source time-dependence.
center: Tuple[float, float, float]
Source center in x, y and z.
size: Tuple[float, float, float]
Source size in x, y, and z. If not provided, the size of the monitor associated to the
data is used.
**kwargs
Extra keyword arguments passed to :class:`.CustomFieldSource`.
Returns
-------
:class:`.CustomFieldSource`
Source injecting the fields stored in the :class:`.FieldData`, with other settings as
provided in the input arguments.
"""
if not size:
size = self.monitor.size
fields = {}
for name, field in self.symmetry_expanded_copy.field_components.items():
fields[name] = field.copy()
for dim, dim_name in enumerate("xyz"):
coords_shift = field.coords[dim_name] - self.monitor.center[dim]
fields[name].coords[dim_name] = coords_shift
dataset = FieldDataset(**fields)
return CustomFieldSource(
field_dataset=dataset, source_time=source_time, center=center, size=size, **kwargs
)
class FieldTimeData(FieldTimeDataset, ElectromagneticFieldData):
"""
Data associated with a :class:`.FieldTimeMonitor`: scalar components of E and H fields.
Notes
-----
The data is stored as a `DataArray <https://docs.xarray.dev/en/stable/generated/xarray.DataArray.html>`_
object using the `xarray <https://docs.xarray.dev/en/stable/index.html>`_ package.
Example
-------
>>> from tidy3d import ScalarFieldTimeDataArray
>>> x = [-1,1,3]
>>> y = [-2,0,2,4]
>>> z = [-3,-1,1,3,5]
>>> t = [0, 1e-12, 2e-12]
>>> coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], t=t)
>>> grid = Grid(boundaries=Coords(x=x, y=y, z=z))
>>> scalar_field = ScalarFieldTimeDataArray(np.random.random((2,3,4,3)), coords=coords)
>>> monitor = FieldTimeMonitor(
... size=(2,4,6), interval=100, name='field', fields=['Ex', 'Hz'], colocate=True
... )
>>> data = FieldTimeData(monitor=monitor, Ex=scalar_field, Hz=scalar_field, grid_expanded=grid)
"""
monitor: FieldTimeMonitor = pd.Field(
..., title="Monitor", description="Time-domain field monitor associated with the data."
)
_contains_monitor_fields = enforce_monitor_fields_present()
@property
def poynting(self) -> ScalarFieldTimeDataArray:
"""Instantaneous Poynting vector for time-domain data associated to a 2D monitor, projected
to the direction normal to the monitor plane."""
# Tangential fields are ordered as E1, E2, H1, H2
tan_fields = self._colocated_tangential_fields
dim1, dim2 = self._tangential_dims
e_x_h = np.real(tan_fields["E" + dim1]) * np.real(tan_fields["H" + dim2])
e_x_h -= np.real(tan_fields["E" + dim2]) * np.real(tan_fields["H" + dim1])
return e_x_h
@cached_property
def flux(self) -> FluxTimeDataArray:
"""Flux for data corresponding to a 2D monitor."""
# Compute flux by integrating Poynting vector in-plane
d_area = self._diff_area
return FluxTimeDataArray((self.poynting * d_area).sum(dim=d_area.dims))
def dot(self, field_data: ElectromagneticFieldData, conjugate: bool = True) -> xr.DataArray:
"""Inner product is not defined for time-domain data."""
raise DataError("Inner product is not defined for time-domain data.")
@property
def time_reversed_copy(self) -> FieldTimeData:
"""Make a copy of the data with time-reversed fields. The sign of the magnetic fields is
flipped, and the data is reversed along the ``t`` dimension, such that for a given field,
``field[t_beg + t] -> field[t_end - t]``, where ``t_beg`` and ``t_end`` are the first and
last coordinates along the ``t`` dimension.
"""
new_data = {}
for comp, field in self.field_components.items():
if comp[0] == "H":
new_data[comp] = -field
else:
new_data[comp] = field
# Reverse time coordinates
new_data[comp] = new_data[comp].assign_coords({"t": field.t[::-1]}).sortby("t")
return self.copy(update=new_data)
class ModeSolverData(ModeSolverDataset, ElectromagneticFieldData):
"""
Data associated with a :class:`.ModeSolverMonitor`: scalar components of E and H fields.
Notes
-----
The data is stored as a `DataArray <https://docs.xarray.dev/en/stable/generated/xarray.DataArray.html>`_
object using the `xarray <https://docs.xarray.dev/en/stable/index.html>`_ package.
Example
-------
>>> from tidy3d import ModeSpec
>>> from tidy3d import ScalarModeFieldDataArray, ModeIndexDataArray
>>> x = [-1,1,3]
>>> y = [-2,0]
>>> z = [-3,-1,1,3,5]
>>> f = [2e14, 3e14]
>>> mode_index = np.arange(5)
>>> grid = Grid(boundaries=Coords(x=x, y=y, z=z))
>>> field_coords = dict(x=x[:-1], y=y[:-1], z=z[:-1], f=f, mode_index=mode_index)
>>> field = ScalarModeFieldDataArray((1+1j)*np.random.random((2,1,4,2,5)), coords=field_coords)
>>> index_coords = dict(f=f, mode_index=mode_index)
>>> index_data = ModeIndexDataArray((1+1j) * np.random.random((2,5)), coords=index_coords)
>>> monitor = ModeSolverMonitor(
... size=(2,0,6),
... freqs=[2e14, 3e14],
... mode_spec=ModeSpec(num_modes=5),
... name='mode_solver',
... )
>>> data = ModeSolverData(
... monitor=monitor,
... Ex=field,
... Ey=field,
... Ez=field,
... Hx=field,
... Hy=field,
... Hz=field,
... n_complex=index_data,
... grid_expanded=grid
... )
"""
monitor: ModeSolverMonitor = pd.Field(
..., title="Monitor", description="Mode solver monitor associated with the data."
)
eps_spec: List[EpsSpecType] = pd.Field(
None,
title="Permettivity Specification",
description="Characterization of the permittivity profile on the plane where modes are "
"computed. Possible values are 'diagonal', 'tensorial_real', 'tensorial_complex'.",
)
@pd.validator("eps_spec", always=True)
@skip_if_fields_missing(["monitor"])
def eps_spec_match_mode_spec(cls, val, values):
"""Raise validation error if frequencies in eps_spec does not match frequency list"""
if val:
mode_data_freqs = values["monitor"].freqs
if len(val) != len(mode_data_freqs):
raise ValidationError(
"eps_spec must be provided at the same frequencies as mode solver data."
)
return val
def overlap_sort(
self,
track_freq: TrackFreq,
overlap_thresh: float = 0.9,
) -> ModeSolverData:
"""Starting from the base frequency defined by parameter ``track_freq``, sort modes at each
frequency according to their overlap values with the modes at the previous frequency.
That is, it attempts to rearrange modes in such a way that a given ``mode_index``
corresponds to physically the same mode at all frequencies. Modes with overlap values over
``overlap_tresh`` are considered matching and not rearranged.
Parameters
----------
track_freq : Literal["central", "lowest", "highest"]
Parameter that specifies which frequency will serve as a starting point in
the reordering process.
overlap_thresh : float = 0.9
Modal overlap threshold above which two modes are considered to be the same and are not
rearranged. If after the sorting procedure the overlap value between two corresponding
modes is less than this threshold, a warning about a possible discontinuity is
displayed.
"""
num_freqs = len(self.monitor.freqs)
num_modes = self.monitor.mode_spec.num_modes
if track_freq == "lowest":
f0_ind = 0
elif track_freq == "highest":