Skip to content

Commit

Permalink
Merge pull request #19 from demml/mraess/hardwareMetricsDataDump
Browse files Browse the repository at this point in the history
initial commit hw metrics queue
  • Loading branch information
mraess committed May 16, 2024
2 parents 34e94dc + 460595d commit cded397
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
4 changes: 2 additions & 2 deletions opsml/projects/_hw_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,11 @@ def __init__(self, interval: float = 15):
self.memory_logger = MemoryMetricsLogger(interval, False)
self.network_logger = NetworkMetricsLogger(interval)

def get_metrics(self) -> None:
def get_metrics(self) -> HardwareMetrics:
metrics = HardwareMetrics(
cpu=self.cpu_logger.get_metrics(),
memory=self.memory_logger.get_metrics(),
network=self.network_logger.get_metrics(),
)

logger.info("Hardware metrics: {}", metrics.model_dump())
return metrics
58 changes: 52 additions & 6 deletions opsml/projects/_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import concurrent
import time
import uuid
from queue import Empty, Queue
from typing import Dict, Optional, Union, cast

from opsml.cards import RunCard
from opsml.helpers.logging import ArtifactLogger
from opsml.projects._hw_metrics import HardwareMetricsLogger
from opsml.projects._hw_metrics import HardwareMetrics, HardwareMetricsLogger
from opsml.projects.active_run import ActiveRun, RunInfo
from opsml.projects.types import _DEFAULT_INTERVAL, ProjectInfo, Tags
from opsml.registry import CardRegistries
Expand All @@ -19,18 +20,45 @@
logger = ArtifactLogger.get_logger()


def run_hardware_logger(interval: int, run: "ActiveRun") -> bool:
def put_hw_metrics(interval: int, run: "ActiveRun", queue: Queue[HardwareMetrics]) -> bool:
hw_logger = HardwareMetricsLogger(interval=interval)

while run.active:
hw_logger.get_metrics()
while run.active: # producer function for hw output
metrics = hw_logger.get_metrics()

# add to the queue
queue.put(metrics, block=False)
logger.info("Metrics in queue: {}", metrics)
time.sleep(interval)

logger.info("Hardware logger stopped")

return False


def get_hw_metrics(interval: int, run: "ActiveRun", queue: Queue[HardwareMetrics]) -> None:
"""Pull hardware metrics from the queue and log them.
Args:
interval:
Interval to log hardware metrics
run:
ActiveRun
queue:
Queue[HardwareMetrics]
"""
while run.active: # consumer function for hw output
try:
metrics_unit = queue.get(timeout=1)
# report
logger.info("Got metrics: {}", metrics_unit.model_dump())

except Empty:
pass

time.sleep(interval + 0.5)


class ActiveRunException(Exception): ...


Expand Down Expand Up @@ -61,6 +89,16 @@ def __init__(self, project_info: ProjectInfo, registries: CardRegistries):
self.run_id = None
self._run_exists = False

self._thread_executor = None

@property
def thread_executor(self) -> Optional[concurrent.futures.ThreadPoolExecutor]:
return self._thread_executor

@thread_executor.setter
def thread_executor(self, value: concurrent.futures.ThreadPoolExecutor) -> None:
self._thread_executor = value

@property
def project_id(self) -> int:
assert self._project_info.project_id is not None, "project_id should not be None"
Expand Down Expand Up @@ -133,8 +171,11 @@ def _log_hardware_metrics(self, interval: int) -> None:
assert self.active_run is not None, "active_run should not be None"

# run hardware logger in background thread
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
executor.submit(run_hardware_logger, interval, self.active_run)
queue: Queue[HardwareMetrics] = Queue()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
executor.submit(put_hw_metrics, interval, self.active_run, queue)
executor.submit(get_hw_metrics, interval, self.active_run, queue)
self.thread_executor = executor

def start_run(
self,
Expand Down Expand Up @@ -193,3 +234,8 @@ def end_run(self) -> None:
self.active_run = None
self.run_id = None
self._run_exists = False

# check if thread executor is still running
if self.thread_executor is not None:
self.thread_executor.shutdown(wait=True, cancel_futures=True)
self._thread_executor = None
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2335,7 +2335,12 @@ def deeplabv3_resnet50():
from PIL import Image
from torchvision import transforms

model = torch.hub.load("pytorch/vision:v0.8.0", "deeplabv3_resnet50", pretrained=True)
model = torch.hub.load(
"pytorch/vision:v0.8.0",
"deeplabv3_resnet50",
pretrained=True,
skip_validation=True,
)
model.eval()

input_image = Image.open("tests/assets/deeplab.jpg")
Expand Down
1 change: 0 additions & 1 deletion tests/test_projects/test_opsml_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,5 @@ def test_opsml_project_hardware(db_registries: CardRegistries) -> None:

with project.run(log_hardware=True, hardware_interval=10) as run:
# Create metrics / params / cards
run = cast(ActiveRun, run)
run.log_metric(key="m1", value=1.1)
run.log_parameter(key="m1", value="apple")

0 comments on commit cded397

Please sign in to comment.