Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies:
- fenics-dolfinx=0.9.0
- matplotlib
- scipy
- pandas
- pint # need to install pint with conda to avoid conflicts (HTM)
- pip
- pip:
Expand Down
5 changes: 3 additions & 2 deletions mb_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import dolfinx.fem as fem
import dolfinx

from hisp.helpers import PulsedSource, Scenario
from hisp.helpers import PulsedSource
from hisp.scenario import Scenario
from hisp import CustomProblem

# dolfinx.log.set_log_level(dolfinx.log.LogLevel.INFO)
Expand Down Expand Up @@ -42,7 +43,7 @@ def gaussian_distribution(x, mod=ufl):

def make_mb_model(nb_mb, scenario_file):
############# Input Flux, Heat Data #############
my_scenario = Scenario(scenario_file)
my_scenario = Scenario.from_txt_file(scenario_file, old_format=True)

my_model = CustomProblem()

Expand Down
3 changes: 2 additions & 1 deletion src/hisp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .helpers import PulsedSource, Scenario
from .helpers import PulsedSource

from .h_transport_class import CustomProblem
from .scenario import Scenario, Pulse
118 changes: 0 additions & 118 deletions src/hisp/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import festim as F
from dolfinx.fem.function import Constant
import ufl
import numpy as np


class PulsedSource(F.ParticleSource):
Expand Down Expand Up @@ -34,120 +33,3 @@ def create_value_fenics(self, mesh, temperature, t: Constant):

def update(self, t: float):
self.flux_fenics.value = self.flux(t)


class Scenario:
def __init__(self, filename: str):
self.filename = filename
data = np.genfromtxt(filename, dtype=str, comments="#")
if isinstance(data[0], str):
self.data = [data]
else:
self.data = data

def get_row(self, t: float):
"""Returns the row of the scenario file that corresponds to the time t.

Args:
t (float): the time in seconds

Returns:
int: the row index of the scenario file corresponding to the time t
"""
current_time = 0
for i, row in enumerate(self.data):
nb_pulses = int(row[1])
phase_duration = nb_pulses * self.get_pulse_duration(i)
if t <= current_time + phase_duration:
return i
else:
current_time += phase_duration

raise ValueError(
f"Time t {t} is out of bounds of the scenario file. Maximum time is {self.get_maximum_time()}"
)

def get_pulse_type(self, t: float) -> str:
"""Returns the pulse type as a string at time t.

Args:
t (float): time in seconds

Returns:
str: pulse type (eg. FP, ICWC, RISP, GDC, BAKE)
"""
row_idx = self.get_row(t)
return self.data[row_idx][0]

def get_pulse_duration(self, row: int) -> float:
"""Returns the total duration of a pulse in seconds for a given row in the file.

Args:
row (int): the row index in the scenario file

Returns:
float: the total duration of the pulse in seconds
"""
row_data = self.data[row]
pulse_type = row_data[0]
if pulse_type == "RISP": # hard coded because it's zero in the files
ramp_up = 10
steady_state = 250
ramp_down = 10
waiting = 1530
total_duration = ramp_up + steady_state + ramp_down + waiting
return total_duration

ramp_up = float(row_data[2])
steady_state = float(row_data[4])
ramp_down = float(row_data[3])
waiting = float(row_data[5])

total_duration = ramp_up + steady_state + ramp_down + waiting
return total_duration

def get_pulse_duration_no_waiting(self, row: int) -> float:
"""Returns the total duration (without the waiting time) of a pulse in seconds for a given row in the file.

Args:
row (int): the row index in the scenario file

Returns:
float: the total duration of the pulse in seconds
"""
row_data = self.data[row]
pulse_type = row_data[0]
if pulse_type == "RISP": # hard coded because it's zero in the files
waiting_time = 1530
else:
waiting_time = float(row_data[5])

duration = self.get_pulse_duration(row) - waiting_time
return duration

def get_time_till_row(self, row:int) -> float:
"""Returns the time that has elapsed in scenario up until start of current row.

Args:
row (int): the row index in the scenario file

Returns:
float: the time that has elapsed in scenario until and not including input row.
"""
time_elapsed = 0
for prev_row_id in range(0,row):
nb_pulses = int(self.data[prev_row_id][1])
time_elapsed += nb_pulses * self.get_pulse_duration(prev_row_id)
return time_elapsed

def get_maximum_time(self) -> float:
"""Returns the maximum time in seconds for the scenario file.

Returns:
float: the maximum time in seconds
"""
max_time = 0
for i, row in enumerate(self.data):
nb_pulses = int(row[1])
max_time += nb_pulses * self.get_pulse_duration(i)
return max_time
229 changes: 229 additions & 0 deletions src/hisp/scenario.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import pandas as pd
from typing import List
import warnings


class Pulse:
pulse_type: str
nb_pulses: int
ramp_up: float
steady_state: float
ramp_down: float
waiting: float

def __init__(
self,
pulse_type: str,
nb_pulses: int,
ramp_up: float,
steady_state: float,
ramp_down: float,
waiting: float,
):
self.pulse_type = pulse_type
self.nb_pulses = nb_pulses
self.ramp_up = ramp_up
self.steady_state = steady_state
self.ramp_down = ramp_down
self.waiting = waiting

@property
def total_duration(self) -> float:
all_zeros = (
self.ramp_up == 0
and self.steady_state == 0
and self.ramp_down == 0
and self.waiting == 0
)
if self.pulse_type == "RISP" and all_zeros:
msg = "RISP pulse has all zeros for ramp_up, steady_state, ramp_down, waiting. "
msg += "Setting hardcoded values. Please check the values in the scenario file."
warnings.warn(msg, UserWarning)

self.ramp_up = 10
self.steady_state = 250
self.ramp_down = 10
self.waiting = 1530

return self.ramp_up + self.steady_state + self.ramp_down + self.waiting

@property
def duration_no_waiting(self) -> float:
return self.total_duration - self.waiting


class Scenario:
def __init__(self, pulses: List[Pulse] = None):
"""Initializes a Scenario object containing several pulses.

Args:
pulses: The list of pulses in the scenario. Each pulse is a Pulse object.
"""
self._pulses = pulses if pulses is not None else []

@property
def pulses(self) -> List[Pulse]:
return self._pulses

def to_txt_file(self, filename: str):
df = pd.DataFrame(
[
{
"pulse_type": pulse.pulse_type,
"nb_pulses": pulse.nb_pulses,
"ramp_up": pulse.ramp_up,
"steady_state": pulse.steady_state,
"ramp_down": pulse.ramp_down,
"waiting": pulse.waiting,
}
for pulse in self.pulses
]
)
df.to_csv(filename, index=False)

@staticmethod
def from_txt_file(filename: str, old_format=False) -> "Scenario":
if old_format:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hopefully this will become obsolete when we expand to new users

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

pulses = []
with open(filename, "r") as f:
for line in f:
# skip first line
if line.startswith("#"):
continue

# assume this is the format
pulse_type, nb_pulses, ramp_up, steady_state, ramp_down, waiting = (
line.split()
)
pulses.append(
Pulse(
pulse_type=pulse_type,
nb_pulses=int(nb_pulses),
ramp_up=float(ramp_up),
steady_state=float(steady_state),
ramp_down=float(ramp_down),
waiting=float(waiting),
)
)
return Scenario(pulses)
df = pd.read_csv(filename)
pulses = [
Pulse(
pulse_type=row["pulse_type"],
nb_pulses=int(row["nb_pulses"]),
ramp_up=float(row["ramp_up"]),
steady_state=float(row["steady_state"]),
ramp_down=float(row["ramp_down"]),
waiting=float(row["waiting"]),
)
for _, row in df.iterrows()
]
return Scenario(pulses)

def get_row(self, t: float) -> int:
"""Returns the row of the scenario file that corresponds to the time t.

Args:
t: the time in seconds

Returns:
int: the row index of the scenario file corresponding to the time t
"""
current_time = 0
for i, pulse in enumerate(self.pulses):
phase_duration = pulse.nb_pulses * pulse.total_duration
if t <= current_time + phase_duration:
return i
else:
current_time += phase_duration

raise ValueError(
f"Time t {t} is out of bounds of the scenario file. Maximum time is {self.get_maximum_time()}"
)

def get_pulse(self, t: float) -> Pulse:
"""Returns the pulse at time t.

Args:
t: the time in seconds

Returns:
Pulse: the pulse at time t
"""
row_idx = self.get_row(t)
return self.pulses[row_idx]

def get_pulse_type(self, t: float) -> str:
"""Returns the pulse type as a string at time t.

Args:
t: time in seconds

Returns:
pulse type (eg. FP, ICWC, RISP, GDC, BAKE)
"""
return self.get_pulse(t).pulse_type

def get_maximum_time(self) -> float:
"""Returns the maximum time of the scenario in seconds.

Returns:
the maximum time of the scenario in seconds
"""
return sum([pulse.nb_pulses * pulse.total_duration for pulse in self.pulses])

def get_time_start_current_pulse(self, t: float):
"""Returns the time (s) at which the current pulse started.

Args:
t: the time in seconds

Returns:
the time at which the current pulse started
"""
current_pulse = self.get_pulse(t)
pulse_index = self.pulses.index(current_pulse)
return sum(
[
pulse.nb_pulses * pulse.total_duration
for pulse in self.pulses[:pulse_index]
]
)

# TODO this is the same as get_time_start_current_pulse, remove
def get_time_till_row(self, row: int) -> float:
"""Returns the time (s) until the row in the scenario file.

Args:
row: the row index in the scenario file

Returns:
the time until the row in the scenario file
"""
return sum(
[pulse.nb_pulses * pulse.total_duration for pulse in self.pulses[:row]]
)

# TODO remove
def get_pulse_duration_no_waiting(self, row: int) -> float:
"""Returns the total duration (without the waiting time) of a pulse in seconds for a given row in the file.

Args:
row: the row index in the scenario file

Returns:
the total duration of the pulse in seconds
"""
return self.pulses[row].duration_no_waiting

# TODO remove
def get_pulse_duration(self, row: int) -> float:
"""Returns the total duration of a pulse in seconds for a given row in the file.

Args:
row: the row index in the scenario file

Returns:
the total duration of the pulse in seconds
"""
return self.pulses[row].total_duration
2 changes: 0 additions & 2 deletions test/one_line_scenario.txt

This file was deleted.

Loading