"""Provides the O&M Enviroment class; a subclass of simpy.Environment."""
from __future__ import annotations
import io
import csv
import math
import logging
import datetime as dt
from typing import TYPE_CHECKING
from pathlib import Path
from datetime import datetime, timedelta
import numpy as np
import simpy
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.csv # pylint: disable=W0611
from simpy.events import Event
import wombat # pylint: disable=W0611
from wombat.utilities import hours_until_future_hour
from wombat.core.data_classes import parse_date
if TYPE_CHECKING:
    from wombat.windfarm import Windfarm

EVENTS_COLUMNS = [
    "datetime",
    "env_datetime",
    "env_time",
    "agent",
    "action",
    "reason",
    "additional",
    "system_id",
    "system_name",
    "part_id",
    "part_name",
    "system_operating_level",
    "part_operating_level",
    "duration",
    "distance_km",
    "request_id",
    "location",
    "materials_cost",
    "hourly_labor_cost",
    "salary_labor_cost",
    "total_labor_cost",
    "equipment_cost",
    "total_cost",
]


class WombatEnvironment(simpy.Environment):
    """The primary mechanism for powering an O&M simulation. This object has insight
    into all other simulation objects, and controls the timing, date/time stamps, and
    weather conditions.

    Parameters
    ----------
    data_dir : pathlib.Path | str
        Directory where the inputs are stored and where to save outputs.
    weather_file : str
        Name of the weather file. Should be contained within ``data_dir``/weather/, with
        columns "datetime", "windspeed", and, optionally, "waveheight". The datetime
        column should adhere to the following format: "MM/DD/YY HH:MM", in 24-hour time.
    workday_start : int
        Starting time for the repair crew, in 24 hour local time. This can be overridden
        by a ``ServiceEquipmentData`` object that operates outside of the "typical"
        working hours.
    workday_end : int
        Ending time for the repair crew, in 24 hour local time. This can be overridden
        by a ``ServiceEquipmentData`` object that operates outside of the "typical"
        working hours.
    simulation_name : str | None, optional
        Name of the simulation; will be used for naming the log files, by default None.
        If ``None``, then the current time will be used. Logs will always be saved to
        ``data_dir``/results/ with ``simulation_name`` included in the file names.

        .. note:: spaces (" ") will be replaced with underscores ("_"), for example:
            "my example analysis" becomes "my_example_analysis".
    start_year : int | None, optional
        Custom starting year for the weather profile, by default None. If ``None`` or
        less than the first year of the weather profile, this will be ignored.
    end_year : int | None, optional
        Custom ending year for the weather profile, by default None. If ``None`` or
        greater than the last year of the weather profile, this will be ignored.
    port_distance : int | float | None
        The simulation-wide daily travel distance for servicing equipment. This
        should be used as a base setting when multiple or all servicing equipment
        will be operating out of the same base location, but can be individually
        modified.
    non_operational_start : str | datetime.datetime | None
        The starting month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
        period of prohibited operations. When defined at the environment level,
        an undefined or later starting date will be overridden for all servicing
        equipment and any modeled port, by default None.
    non_operational_end : str | datetime.datetime | None
        The ending month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
        period of prohibited operations. When defined at the environment level,
        an undefined or earlier ending date will be overridden for all servicing
        equipment and any modeled port, by default None.
    reduced_speed_start : str | datetime.datetime | None
        The starting month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
        period of reduced speed operations. When defined at the environment level,
        an undefined or later starting date will be overridden for all servicing
        equipment and any modeled port, by default None.
    reduced_speed_end : str | datetime.datetime | None
        The ending month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
        period of reduced speed operations. When defined at the environment level,
        an undefined or earlier ending date will be overridden for all servicing
        equipment and any modeled port, by default None.
    reduced_speed : float
        The maximum operating speed during the annualized reduced speed operations.
        When defined at the environment level, an undefined or faster value will be
        overridden for all servicing equipment and any modeled port, by default 0.0.
    random_seed : int | None
        The random seed to be passed to a universal NumPy ``default_rng`` object to
        generate Weibull random generators, by default None.
    random_generator : np.random._generator.Generator | None
        An optional NumPy random generator that can be provided to seed a simulation
        with the same generator each time, in place of the random seed. If a
        :py:attr:`random_seed` is also provided, this will override the random seed,
        by default None.

    Raises
    ------
    FileNotFoundError
        Raised if ``data_dir`` cannot be found.
    """

    def __init__(
        self,
        data_dir: Path | str,
        weather_file: str,
        workday_start: int,
        workday_end: int,
        simulation_name: str | None = None,
        start_year: int | None = None,
        end_year: int | None = None,
        port_distance: int | float | None = None,
        non_operational_start: str | dt.datetime | None = None,
        non_operational_end: str | dt.datetime | None = None,
        reduced_speed_start: str | dt.datetime | None = None,
        reduced_speed_end: str | dt.datetime | None = None,
        reduced_speed: float = 0.0,
        random_seed: int | None = None,
        random_generator: np.random._generator.Generator | None = None,
    ) -> None:
        """Initialization."""
        super().__init__()
        self.data_dir = Path(data_dir).resolve()
        if not self.data_dir.is_dir():
            raise FileNotFoundError(f"{self.data_dir} does not exist")

        self.workday_start = int(workday_start)
        self.workday_end = int(workday_end)
        if not 0 <= self.workday_start <= 24:
            raise ValueError("workday_start must be a valid 24hr time before midnight.")
        if not 0 <= self.workday_end <= 24:
            raise ValueError("workday_end must be a valid 24hr time.")
        if self.workday_end <= self.workday_start:
            raise ValueError(
                f"Work shifts must end after they start ({self.workday_start}hrs)."
            )

        self.port_distance = port_distance
        self.weather = self._weather_setup(weather_file, start_year, end_year)
        self.weather_dates = pd.DatetimeIndex(
            self.weather.get_column("datetime").to_pandas()
        ).to_pydatetime()
        self.max_run_time = self.weather.shape[0]
        self.shift_length = self.workday_end - self.workday_start

        # Set the environmental consideration parameters
        self.non_operational_start = parse_date(non_operational_start)
        self.non_operational_end = parse_date(non_operational_end)
        self.reduced_speed_start = parse_date(reduced_speed_start)
        self.reduced_speed_end = parse_date(reduced_speed_end)
        self.reduced_speed = reduced_speed

        if random_generator is not None:
            self.random_generator = random_generator
            self.random_seed = None
        elif random_seed is not None:
            self.random_seed = random_seed
            self.random_generator = np.random.default_rng(seed=random_seed)
        else:
            self.random_seed = None
            self.random_generator = np.random.default_rng()

        self.simulation_name = simulation_name
        self._logging_setup()
        self.process(self._log_actions())
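
    # A minimal usage sketch (illustrative paths and values only; not shipped
    # defaults). The environment is typically created by WOMBAT's higher-level
    # ``Simulation`` interface, but it can also be constructed directly:
    #
    #     env = WombatEnvironment(
    #         data_dir="library/dinwoodie",             # assumed example library path
    #         weather_file="alpha_ventus_weather.csv",  # assumed example file name
    #         workday_start=7,
    #         workday_end=19,
    #         simulation_name="example_analysis",
    #     )
    #     env.run()  # with ``until`` omitted, runs through the full weather profile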

    def _register_windfarm(self, windfarm: Windfarm) -> None:
        """Adds the simulation windfarm to the class attributes."""
        self.windfarm = windfarm

    def run(self, until: int | float | Event | None = None):
        """Extends the ``simpy.Environment.run`` method to change the default behavior
        if no argument is passed to ``until``, which will now run a simulation until the
        end of the weather profile is reached.

        Parameters
        ----------
        until : int | float | Event | None, optional
            When to stop the simulation, by default None. See documentation on
            ``simpy.Environment.run`` for more details.
        """
        # If running a paused simulation, then reopen the file and append, but only if
        # the simulation time is lower than the upper bound
        time_check = self.now < self.max_run_time
        if self._events_csv.closed and time_check:  # type: ignore
            self._events_csv = open(self.events_log_fname, "a")
            self._events_writer = csv.DictWriter(
                self._events_csv, delimiter="|", fieldnames=EVENTS_COLUMNS
            )
        if hasattr(self, "windfarm") and self._operations_csv.closed and time_check:
            self._operations_csv: io.TextIOWrapper = open(
                self.operations_log_fname, "a"
            )
            self.windfarm._setup_logger(initial=False)

        if until is None:
            until = self.max_run_time
        elif until > self.max_run_time:
            until = self.max_run_time
        try:
            super().run(until=until)
        except BaseException as e:
            # Flush the logs so the simulation up to the point of failure is logged
            self._events_writer.writerows(self._events_buffer)
            self._events_buffer.clear()
            self._events_csv.close()
            self._operations_writer.writerows(self._operations_buffer)
            self._operations_buffer.clear()
            self._operations_csv.close()
            print(
                f"Simulation failed at hour {self.now:,.6f},"
                f" simulation time: {self.simulation_time}"
            )
            raise e

        # Ensure all logged events make it to their target file
        self._events_writer.writerows(self._events_buffer)
        self._events_buffer.clear()
        self._events_csv.close()
        self._operations_writer.writerows(self._operations_buffer)
        self._operations_buffer.clear()
        self._operations_csv.close()

    def _logging_setup(self) -> None:
        """Completes the setup for logging data."""
        if self.simulation_name is None:
            self.simulation_name = simulation = "wombat"
        else:
            simulation = self.simulation_name.replace(" ", "_")
        dt_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        events_log_fname = f"{dt_stamp}_{simulation}_events.csv"
        operations_log_fname = f"{dt_stamp}_{simulation}_operations.csv"
        power_potential_fname = f"{dt_stamp}_{simulation}_power_potential.csv"
        power_production_fname = f"{dt_stamp}_{simulation}_power_production.csv"
        metrics_input_fname = f"{dt_stamp}_{simulation}_metrics_inputs.yaml"

        log_path = self.data_dir / "results"
        if not log_path.exists():
            log_path.mkdir()
        self.events_log_fname = log_path / events_log_fname
        self.operations_log_fname = log_path / operations_log_fname
        self.power_potential_fname = log_path / power_potential_fname
        self.power_production_fname = log_path / power_production_fname
        self.metrics_input_fname = log_path / metrics_input_fname

        _dir = self.data_dir / "results"
        if not _dir.is_dir():
            _dir.mkdir()

        self._events_csv = open(self.events_log_fname, "w")
        self._operations_csv = open(self.operations_log_fname, "w")
        self._events_writer = csv.DictWriter(
            self._events_csv, delimiter="|", fieldnames=EVENTS_COLUMNS
        )
        self._events_writer.writeheader()
        self._events_buffer: list[dict] = []
        self._operations_buffer: list[dict] = []

    def get_random_seconds(self, low: int = 0, high: int = 10) -> float:
        """Generate a random number of seconds to wait, between :py:attr:`low` and
        :py:attr:`high`.

        Parameters
        ----------
        low : int, optional
            Minimum number of seconds to wait, by default 0.
        high : int, optional
            Maximum number of seconds to wait, by default 10.

        Returns
        -------
        float
            Number of seconds to wait, expressed in fractional hours to match the
            simulation's hourly time step.
        """
        seconds_to_wait, *_ = (
            self.random_generator.integers(low=low, high=high, size=1) / 3600.0
        )
        return seconds_to_wait

    @property
    def simulation_time(self) -> datetime:
        """Current time within the simulation ("datetime" column within weather)."""
        now = self.now
        minutes = now % 1 * 60
        if now == self.max_run_time:
            # The final hour is out of bounds, so step back one index and add the hour
            _dt = self.weather_dates[math.floor(now - 1)]
            _dt += timedelta(hours=1)
        else:
            _dt = self.weather_dates[math.floor(now)]

        minutes, seconds = math.floor(minutes), math.ceil(minutes % 1 * 60)
        return _dt + timedelta(minutes=minutes, seconds=seconds)
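
    # Worked example (illustrative numbers): if ``self.now`` is 10.75 and the weather
    # profile starts at 1/1/02 00:00, then ``weather_dates[10]`` is 1/1/02 10:00 and
    # the property returns 1/1/02 10:45, i.e., the fractional hour is converted to
    # minutes and seconds.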

    def is_workshift(self, workday_start: int = -1, workday_end: int = -1) -> bool:
        """Check if the current simulation time is within the windfarm's working hours.

        Parameters
        ----------
        workday_start : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_end`` must also be
            provided in order to be used.
        workday_end : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_start`` must also be
            provided in order to be used.

        Returns
        -------
        bool
            True if it's valid working hours, False otherwise.
        """
        if -1 in (workday_start, workday_end):
            # Return True if the shift is around the clock
            if self.workday_start == 0 and self.workday_end == 24:
                return True
            return self.workday_start <= self.simulation_time.hour < self.workday_end

        # Return True if the shift is around the clock
        if workday_start == 0 and workday_end == 24:
            return True
        return workday_start <= self.simulation_time.hour < workday_end

    def hour_in_shift(
        self, hour: int, workday_start: int = -1, workday_end: int = -1
    ) -> bool:
        """Checks whether an ``hour`` is within the working hours.

        Parameters
        ----------
        hour : int
            Hour of the day.
        workday_start : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_end`` must also be
            provided in order to be used.
        workday_end : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_start`` must also be
            provided in order to be used.

        Returns
        -------
        bool
            True if ``hour`` is during working hours, False otherwise.
        """
        if -1 in (workday_start, workday_end):
            return self.workday_start <= hour < self.workday_end
        return workday_start <= hour < workday_end

    def hours_to_next_shift(self, workday_start: int = -1) -> float:
        """Time until the next work shift starts, in hours.

        Parameters
        ----------
        workday_start : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object.

        Returns
        -------
        float
            Hours until the next shift starts.
        """
        current = self.simulation_time
        start = self.workday_start if workday_start == -1 else workday_start
        if current.hour < start:
            # difference between now and workday start
            return hours_until_future_hour(current, start)
        elif current.hour == start == 0:
            # Need to manually move forward one whole day to avoid an infinite loop
            return hours_until_future_hour(current, 24)
        else:
            # time to midnight + hour of workday start
            return start + hours_until_future_hour(current, 0)
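
    # Worked example (illustrative values): with a 07:00 shift start, a current
    # simulation time of 20:30 falls after the start hour, so the wait is the time
    # to midnight (3.5 hours) plus the 7-hour offset to the next shift start, i.e.,
    # 10.5 hours.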

    @property
    def current_time(self) -> str:
        """Timestamp for the current time as a datetime.datetime.strftime."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

    def date_ix(self, date: dt.datetime | dt.date) -> int:
        """The first index of a future date. This corresponds to the number of hours
        until this date from the very beginning of the simulation.

        Parameters
        ----------
        date : datetime.datetime | datetime.date
            A date within the environment's simulation range.

        Returns
        -------
        int
            Index of the weather profile that corresponds to the first hour of ``date``.
        """
        if isinstance(date, dt.datetime):
            date = date.date()
        ix, *_ = self.weather.filter(pl.col("datetime") == date)
        return ix.item()

    def _weather_setup(
        self,
        weather_file: str,
        start_year: int | None = None,
        end_year: int | None = None,
    ) -> pl.DataFrame:
        """Reads the weather data from the "<inputs>/weather" directory, and creates the
        ``start_date`` and ``end_date`` time stamps for the simulation.

        This also fills any missing data with zeros and interpolates the values of any
        missing datetime entries.

        Parameters
        ----------
        weather_file : str
            Name of the weather file to be used by the environment. Should be contained
            within ``data_dir/weather``.
        start_year : int | None, optional
            Custom starting year for the weather profile, by default None. If ``None``
            or less than the first year of the weather profile, this will be ignored.
        end_year : int | None, optional
            Custom ending year for the weather profile, by default None. If ``None`` or
            greater than the last year of the weather profile, this will be ignored.

        Returns
        -------
        pl.DataFrame
            The wind (and wave) timeseries.
        """
        REQUIRED = ["windspeed", "waveheight"]

        # PyArrow datetime conversion setup
        convert_options = pa.csv.ConvertOptions(
            timestamp_parsers=[
                "%m/%d/%y %H:%M",
                "%m/%d/%y %I:%M",
                "%m/%d/%y %H:%M:%S",
                "%m/%d/%y %I:%M:%S",
                "%m/%d/%Y %H:%M",
                "%m/%d/%Y %I:%M",
                "%m/%d/%Y %H:%M:%S",
                "%m/%d/%Y %I:%M:%S",
                "%m-%d-%y %H:%M",
                "%m-%d-%y %I:%M",
                "%m-%d-%y %H:%M:%S",
                "%m-%d-%y %I:%M:%S",
                "%m-%d-%Y %H:%M",
                "%m-%d-%Y %I:%M",
                "%m-%d-%Y %H:%M:%S",
                "%m-%d-%Y %I:%M:%S",
                "%Y-%m-%d %H:%M",
                "%Y-%m-%d %I:%M",
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%d %I:%M:%S",
            ]
        )

        weather = (
            pl.from_pandas(
                pa.csv.read_csv(
                    self.data_dir / "weather" / weather_file,
                    convert_options=convert_options,
                )
                .to_pandas()
                .fillna(0.0)
                .set_index("datetime")
                .sort_index()
                .resample("H")
                .interpolate(limit_direction="both")  # , limit=5)
                .reset_index(drop=False)
            )
            .with_row_count()
            .with_columns(
                [
                    pl.col("datetime").cast(pl.Datetime).dt.cast_time_unit("ns"),
                    (pl.col("datetime").dt.hour()).alias("hour"),
                ]
            )
        )

        missing = set(REQUIRED).difference(weather.columns)
        if missing:
            raise KeyError(
                "The weather data are missing the following required columns:"
                f" {missing}"
            )

        # Create the start and end points
        self.start_datetime = weather.get_column("datetime").dt.min()
        self.end_datetime = weather.get_column("datetime").dt.max()
        self.start_year = self.start_datetime.year
        self.end_year = self.end_datetime.year

        if start_year is None and end_year is None:
            return weather

        if start_year is None:
            pass
        elif start_year > self.end_year:
            raise ValueError(
                f"'start_year' ({start_year}) occurs after the last available year"
                f" in the weather data (range: {self.end_year})"
            )
        else:
            # Filter for the provided, validated starting year and update the attribute
            weather = weather.filter(pl.col("datetime").dt.year() >= start_year)
            self.start_datetime = weather.get_column("datetime").dt.min()
            start_year = self.start_year = self.start_datetime.year

        if end_year is None:
            pass
        elif start_year is None and end_year < self.start_year:
            raise ValueError(
                f"The provided 'end_year' ({end_year}) is before the start_year"
                f" ({self.start_year})"
            )
        elif start_year is not None:
            if end_year < start_year:
                raise ValueError(
                    f"The provided 'end_year' ({end_year}) is before the start_year"
                    f" ({start_year})"
                )
            else:
                # Filter for the provided, validated ending year and update
                weather = weather.filter(pl.col("datetime").dt.year() <= end_year)
                self.end_datetime = weather.get_column("datetime").dt.max()
                self.end_year = self.end_datetime.year
        else:
            # Filter for the provided, validated ending year and update the attribute
            weather = weather.filter(pl.col("datetime").dt.year() <= end_year)
            self.end_datetime = weather.get_column("datetime").dt.max()
            self.end_year = self.end_datetime.year

        column_order = weather.columns
        column_order.insert(0, column_order.pop(column_order.index("hour")))
        column_order.insert(0, column_order.pop(column_order.index("waveheight")))
        column_order.insert(0, column_order.pop(column_order.index("windspeed")))
        column_order.insert(0, column_order.pop(column_order.index("datetime")))
        column_order.insert(0, column_order.pop(column_order.index("row_nr")))

        # Ensure the columns are ordered correctly and re-compute pandas-compatible ix
        return weather.select(column_order).drop(columns="row_nr").with_row_count()
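
    # For reference, a minimal weather file (illustrative values only) that the reader
    # above can parse uses comma-separated columns named "datetime", "windspeed", and,
    # optionally, "waveheight":
    #
    #     datetime,windspeed,waveheight
    #     1/1/02 00:00,9.5,1.2
    #     1/1/02 01:00,10.1,1.3
    #
    # Missing values are filled with 0.0 and gaps in the hourly index are interpolated
    # before the data are converted to the polars DataFrame used internally.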

    @property
    def weather_now(self) -> pl.DataFrame:
        """The current weather.

        Returns
        -------
        pl.DataFrame
            A length 1 slice from the weather profile at the current ``int()`` rounded
            hour, in simulation time.
        """
        # Rounds down because we won't arrive at the next weather event until that hour
        now = int(self.now)
        return self.weather.slice(now, 1)

    def weather_forecast(
        self, hours: int | float
    ) -> tuple[pl.Series, pl.Series, pl.Series, pl.Series]:
        """Returns the datetime, hour, wind, and wave data for the next ``hours`` hours,
        starting from the current hour's weather.

        Parameters
        ----------
        hours : int | float
            Number of hours to look ahead, rounds up to the nearest hour.

        Returns
        -------
        tuple[pl.Series, pl.Series, pl.Series, pl.Series]
            Each of the relevant columns (datetime, hour, windspeed, waveheight) from
            the weather profile.
        """
        # If it's not on the hour, ensure we're looking ``hours`` hours into the future
        start = math.floor(self.now)
        _, ix, wind, wave, hour, *_ = self.weather.slice(
            start, math.ceil(hours) + math.ceil(self.now % 1)
        )
        return ix, hour, wind, wave
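
    # Illustrative usage (hypothetical limit values): a servicing-equipment model can
    # screen an upcoming weather window against its operating limits, e.g.:
    #
    #     _, hour, wind, wave = env.weather_forecast(6)
    #     all_clear = ((wind <= 15.0) & (wave <= 1.5)).all()
    #
    # where 15.0 m/s and 1.5 m are assumed windspeed and waveheight thresholds.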

    def log_action(
        self,
        *,
        agent: str,
        action: str,
        reason: str,
        additional: str = "",
        system_id: str = "",
        system_name: str = "",
        part_id: str = "",
        part_name: str = "",
        system_ol: float | int = 0,
        part_ol: float | int = 0,
        duration: float = 0,
        distance_km: float = 0,
        request_id: str = "na",
        location: str = "na",
        materials_cost: int | float = 0,
        hourly_labor_cost: int | float = 0,
        salary_labor_cost: int | float = 0,
        equipment_cost: int | float = 0,
    ) -> None:
        """Formats the logging messages into the expected format for logging.

        Parameters
        ----------
        agent : str
            Agent performing the action.
        action : str
            Action that was taken.
        reason : str
            Reason an action was taken.
        additional : str
            Any additional information that needs to be logged.
        system_id : str
            Turbine ID, ``System.id``, by default "".
        system_name : str
            Turbine name, ``System.name``, by default "".
        part_id : str
            Subassembly, component, or cable ID, ``_.id``, by default "".
        part_name : str
            Subassembly, component, or cable name, ``_.name``, by default "".
        system_ol : float | int
            Turbine operating level, ``System.operating_level``. Use an empty string
            for n/a, by default 0.
        part_ol : float | int
            Subassembly, component, or cable operating level, ``_.operating_level``. Use
            an empty string for n/a, by default 0.
        request_id : str
            The ``RepairManager`` assigned request_id found in
            ``RepairRequest.request_id``, by default "na".
        location : str
            The location of where the event occurred: should be one of site, port,
            enroute, or system, by default "na".
        duration : float
            Length of time the action lasted, by default 0.
        distance_km : float
            Distance traveled, in km, if applicable, by default 0.
        materials_cost : int | float, optional
            Total cost of materials for action, in USD, by default 0.
        hourly_labor_cost : int | float, optional
            Total cost of hourly labor for action, in USD, by default 0.
        salary_labor_cost : int | float, optional
            Total cost of salaried labor for action, in USD, by default 0.
        equipment_cost : int | float, optional
            Total cost of equipment for action, in USD, by default 0.
        """
        valid_locations = ("site", "system", "port", "enroute", "na")
        if location not in valid_locations:
            raise ValueError(
                f"Event logging `location` must be one of: {valid_locations}"
            )
        total_labor_cost = hourly_labor_cost + salary_labor_cost
        total_cost = total_labor_cost + equipment_cost + materials_cost
        now = self.simulation_time
        row = {
            "datetime": dt.datetime.now(),
            "env_datetime": now,
            "env_time": self.now,
            "system_id": system_id,
            "system_name": system_name,
            "part_id": part_id,
            "part_name": part_name,
            "system_operating_level": system_ol,
            "part_operating_level": part_ol,
            "agent": agent,
            "action": action,
            "reason": reason,
            "additional": additional,
            "duration": duration,
            "distance_km": distance_km,
            "request_id": request_id,
            "location": location,
            "materials_cost": materials_cost,
            "hourly_labor_cost": hourly_labor_cost,
            "salary_labor_cost": salary_labor_cost,
            "equipment_cost": equipment_cost,
            "total_labor_cost": total_labor_cost,
            "total_cost": total_cost,
        }

        # Don't log the initiation of a crew transfer that can be forced at the end of
        # an operation but happens to be after the end of the simulation
        if now <= self.end_datetime:
            self._events_buffer.append(row)
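
    # Illustrative call (hypothetical agent, system, and cost values): a vessel agent
    # might record a completed repair as:
    #
    #     env.log_action(
    #         agent="Crew Transfer Vessel 1",
    #         action="repair complete",
    #         reason="annual service",
    #         system_id="WTG001",
    #         system_ol=1.0,
    #         duration=4.0,
    #         request_id="RPR00001",
    #         location="system",
    #         hourly_labor_cost=2000,
    #         equipment_cost=8000,
    #     )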

    def _log_actions(self):
        """Writes the action log items every 8000 hours."""
        HOURS = 8000
        while True:
            yield self.timeout(HOURS)
            self._events_writer.writerows(self._events_buffer)
            self._events_buffer.clear()

    def load_events_log_dataframe(self) -> pd.DataFrame:
        """Imports the logging file created in ``run`` and returns it as a formatted
        ``pandas.DataFrame``.

        Returns
        -------
        pd.DataFrame
            The formatted logging data from a simulation.
        """
        log_df = (
            pd.read_csv(
                self.events_log_fname,
                delimiter="|",
                engine="pyarrow",
                dtype={
                    "agent": "string",
                    "action": "string",
                    "reason": "string",
                    "additional": "string",
                    "system_id": "string",
                    "system_name": "string",
                    "part_id": "string",
                    "part_name": "string",
                    "request_id": "string",
                    "location": "string",
                },
            )
            .set_index("datetime")
            .sort_index()
        )
        return log_df

    def _calculate_windfarm_total(
        self, op: pd.DataFrame, prod: pd.DataFrame | None = None
    ) -> pd.DataFrame:
        """Calculates the overall wind farm operational level, accounting for substation
        downtime by multiplying the sum of all downstream turbine operational levels by
        the substation's operational level.

        Parameters
        ----------
        op : pd.DataFrame
            The turbine and substation operational level DataFrame.
        prod : pd.DataFrame | None, optional
            Unused in this method, by default None.

        Notes
        -----
        This is a crude cap on the operations, and so a smarter way of capping
        the availability should be added in the future.

        Returns
        -------
        pd.DataFrame
            The aggregate wind farm operational level.
        """
        t_id = self.windfarm.turbine_id
        turbines = self.windfarm.turbine_weights[t_id].values * op[t_id]
        total = np.sum(
            [
                op[[sub]]
                * np.array(
                    [
                        [math.fsum(row)]
                        for _, row in turbines[val["turbines"]].iterrows()
                    ]
                ).reshape(-1, 1)
                for sub, val in self.windfarm.substation_turbine_map.items()
            ],
            axis=0,
        )
        return total
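
    # A sketch of the aggregation above: for each substation ``s`` with downstream
    # turbines ``T(s)``, the farm-level operating level at each timestamp is
    #
    #     windfarm_ol = sum_s ( op[s] * sum_{t in T(s)} weight[t] * op[t] )
    #
    # where ``weight[t]`` comes from ``windfarm.turbine_weights``.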

    def _calculate_adjusted_production(
        self, op: pd.DataFrame, prod: pd.DataFrame
    ) -> pd.DataFrame:
        """Calculates the overall wind farm power production and adjusts individual
        turbine production by accounting for substation downtime. This is done by
        multiplying all downstream turbine operational levels by the substation's
        operational level.

        Parameters
        ----------
        op : pd.DataFrame
            The operational level DataFrame with turbine, substation, and windfarm
            columns.
        prod : pd.DataFrame
            The turbine energy production DataFrame.

        Notes
        -----
        This is a crude cap on the operations, and so a smarter way of capping
        the availability should be added in the future.

        Returns
        -------
        pd.DataFrame
            The total wind farm energy production, adjusted for substation downtime.
        """
        # Adjust individual turbine production for substation downtime
        prod = prod.copy()
        for sub, val in self.windfarm.substation_turbine_map.items():
            prod[val["turbines"]] *= op[[sub]].values
        prod.windfarm = prod[self.windfarm.turbine_id].sum(axis=1)
        return prod[["windfarm"]]

    def load_operations_log_dataframe(self) -> pd.DataFrame:
        """Imports the logging file created in ``run`` and returns it as a formatted
        ``pandas.DataFrame``.

        Returns
        -------
        pd.DataFrame
            The formatted logging data from a simulation.
        """
        log_df = (
            pd.read_csv(
                self.operations_log_fname,
                delimiter="|",
                engine="pyarrow",
            )
            .set_index("datetime")
            .sort_values("datetime")
        )
        log_df["windfarm"] = self._calculate_windfarm_total(log_df)
        return log_df

    def power_production_potential_to_csv(  # type: ignore
        self,
        windfarm: wombat.windfarm.Windfarm,
        operations: pd.DataFrame | None = None,
        return_df: bool = True,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Creates the power production ``DataFrame`` and optionally returns it.

        Parameters
        ----------
        windfarm : wombat.windfarm.Windfarm
            The simulation's windfarm object.
        operations : pd.DataFrame | None, optional
            The operations log ``DataFrame`` if readily available, by default None. If
            ``None``, then it will be created through
            ``load_operations_log_dataframe()``.
        return_df : bool, optional
            Indicator to return the power production for further usage, by default True.

        Returns
        -------
        tuple[pd.DataFrame, pd.DataFrame]
            The power potential and production timeseries data.
        """
        write_options = pa.csv.WriteOptions(delimiter="|")
        if operations is None:
            operations = self.load_operations_log_dataframe().sort_values("env_time")

        turbines = windfarm.turbine_id
        windspeed = self.weather.to_pandas().set_index("datetime").windspeed
        windspeed = windspeed.loc[operations.env_datetime].values
        potential_df = pd.DataFrame(
            [],
            index=operations.env_datetime,
            columns=["env_time", "env_datetime", "windspeed", "windfarm"]
            + turbines.tolist(),
        )
        potential_df[turbines] = np.vstack(
            [windfarm.system(t_id).power(windspeed) for t_id in turbines]
        ).T
        potential_df = potential_df.assign(
            windspeed=windspeed,
            windfarm=potential_df[turbines].sum(axis=1),
            env_time=operations.env_time.values,
            env_datetime=operations.env_datetime.values,
        )
        pa.csv.write_csv(
            pa.Table.from_pandas(potential_df),
            self.power_potential_fname,
            write_options=write_options,
        )

        # TODO: The actual windfarm production needs to be clipped at each subgraph to
        # the max of the substation's operating capacity and then summed.
        production_df = potential_df.copy()
        production_df[turbines] *= operations[turbines].values
        production_df.windfarm = self._calculate_adjusted_production(
            operations, production_df
        )
        pa.csv.write_csv(
            pa.Table.from_pandas(production_df),
            self.power_production_fname,
            write_options=write_options,
        )
        if return_df:
            return potential_df, production_df

    def cleanup_log_files(self) -> None:
        """Convenience method to clear the output log files in case a large
        batch of simulations is being run and there are space limitations.

        .. warning:: This shuts down the loggers, so no more logging will be able
            to be performed.
        """
        # NOTE: Everything is wrapped in a try/except clause to protect against failure
        # when inevitably a file has already been deleted by accident, or if in the
        # dataframe generation step, the original logs were deleted
        logging.shutdown()

        if not self._events_csv.closed:
            self._events_csv.close()
        if not self._operations_csv.closed:
            self._operations_csv.close()

        try:
            self.events_log_fname.unlink()
        except FileNotFoundError:
            pass
        try:
            self.operations_log_fname.unlink()
        except FileNotFoundError:
            pass
        try:
            self.power_potential_fname.unlink()
        except FileNotFoundError:
            pass
        try:
            self.power_production_fname.unlink()
        except FileNotFoundError:
            pass
        try:
            self.metrics_input_fname.unlink()
        except FileNotFoundError:
            pass