# core

> Fill in a module description here

In [None]:
# | default_exp operation_profile_structure
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
# | hide
from nbdev.showdoc import *
from typing import Union

In [None]:
# | export
import ship_performance_service_interface_lib.sps_interface_pb2 as proto
import numpy as np
from typing import Union, Optional, Dict, Any

Numeric = Union[float, np.ndarray]


class Location:
    longitude: Numeric = None
    latitude: Numeric = None
    """Wrapper class for the location"""

    def __init__(
        self,
        longitude: Optional[Numeric] = None,
        latitude: Optional[Numeric] = None,
        proto_object: Optional[proto.Location] = None,
    ):
        if proto_object is not None:
            self._proto = proto_object
            self.longitude = np.array([proto_object.longitude])
            self.latitude = np.array([proto_object.latitude])
        else:
            self._proto = proto.Location()
            self.longitude = (
                np.array([longitude])
                if isinstance(longitude, (float, int))
                else longitude
            )
            self.latitude = (
                np.array([latitude]) if isinstance(latitude, (float, int)) else latitude
            )

    def to_proto(self) -> proto.Location:
        """Converts the object to a protobuf object"""
        if len(self.longitude) > 1:
            raise ValueError("Cannot convert to proto if the object contains arrays")
        return proto.Location(longitude=self.longitude[0], latitude=self.latitude[0])

    def to_dict(self):
        """Converts the object to a dictionary"""
        return {
            "longitude": self.longitude,
            "latitude": self.latitude,
        }

    def to_dict_scalar(self):
        """Converts the object to a dictionary with scalar values. If the object contains arrays,
        the first value is returned"""
        return {
            "longitude": self.longitude[0] if self.longitude is not None else None,
            "latitude": self.latitude[0] if self.latitude is not None else None,
        }


class Weather:
    """Wrapper class for the weather"""

    significant_wave_height_m: Numeric = None
    mean_wave_period_s: Numeric = None
    wave_direction_deg: Numeric = None
    wind_speed_m_per_s: Numeric = None
    wind_direction_deg: Numeric = None
    ocean_current_speed_m_per_s: Numeric = None
    ocean_current_direction_deg: Numeric = None
    air_temperature_deg_c: Numeric = None
    sea_water_temperature_deg_c: Numeric = None
    _proto: proto.Weather = None

    def __init__(
        self,
        significant_wave_height_m: Numeric = None,
        mean_wave_period_s: Numeric = None,
        wave_direction_deg: Numeric = None,
        wind_speed_m_per_s: Numeric = None,
        wind_direction_deg: Numeric = None,
        ocean_current_speed_m_per_s: Numeric = None,
        ocean_current_direction_deg: Numeric = None,
        air_temperature_deg_c: Numeric = None,
        sea_water_temperature_deg_c: Numeric = None,
        proto_object: proto.Weather = None,
    ):
        if proto_object is not None:
            self._proto = proto_object
            for descriptor, value in proto_object.ListFields():
                setattr(self, descriptor.name, np.array([value]))
        else:
            self._proto = proto.Weather()
            if sea_water_temperature_deg_c is not None:
                self.sea_water_temperature_deg_c = np.atleast_1d(
                    sea_water_temperature_deg_c
                )
            if significant_wave_height_m is not None:
                self.significant_wave_height_m = np.atleast_1d(
                    significant_wave_height_m
                )
            if mean_wave_period_s is not None:
                self.mean_wave_period_s = np.atleast_1d(mean_wave_period_s)
            if wave_direction_deg is not None:
                self.wave_direction_deg = np.atleast_1d(wave_direction_deg)
            if wind_speed_m_per_s is not None:
                self.wind_speed_m_per_s = np.atleast_1d(wind_speed_m_per_s)
            if wind_direction_deg is not None:
                self.wind_direction_deg = np.atleast_1d(wind_direction_deg)
            if ocean_current_speed_m_per_s is not None:
                self.ocean_current_speed_m_per_s = np.atleast_1d(
                    ocean_current_speed_m_per_s
                )
            if ocean_current_direction_deg is not None:
                self.ocean_current_direction_deg = np.atleast_1d(
                    ocean_current_direction_deg
                )
            if air_temperature_deg_c is not None:
                self.air_temperature_deg_c = np.atleast_1d(air_temperature_deg_c)
            if sea_water_temperature_deg_c is not None:
                self.sea_water_temperature_deg_c = np.atleast_1d(
                    sea_water_temperature_deg_c
                )

    def get_weather_at_index(self, index):
        """Returns a new weather object with the values at the given index"""
        weather_index = Weather()
        if self.significant_wave_height_m is not None:
            weather_index.significant_wave_height_m = float(
                self.significant_wave_height_m[index]
            )
        if self.mean_wave_period_s is not None:
            weather_index.mean_wave_period_s = float(self.mean_wave_period_s[index])
        if self.wave_direction_deg is not None:
            weather_index.wave_direction_deg = float(self.wave_direction_deg[index])
        if self.wind_speed_m_per_s is not None:
            weather_index.wind_speed_m_per_s = float(self.wind_speed_m_per_s[index])
        if self.wind_direction_deg is not None:
            weather_index.wind_direction_deg = float(self.wind_direction_deg[index])
        if self.ocean_current_speed_m_per_s is not None:
            weather_index.ocean_current_speed_m_per_s = float(
                self.ocean_current_speed_m_per_s[index]
            )
        if self.ocean_current_direction_deg is not None:
            weather_index.ocean_current_direction_deg = float(
                self.ocean_current_direction_deg[index]
            )
        if self.air_temperature_deg_c is not None:
            weather_index.air_temperature_deg_c = float(
                self.air_temperature_deg_c[index]
            )
        if self.sea_water_temperature_deg_c is not None:
            weather_index.sea_water_temperature_deg_c = float(
                self.sea_water_temperature_deg_c[index]
            )
        return weather_index

    def to_proto(self) -> proto.Weather:
        """Converts the weather to a protobuf object"""
        if len(self.significant_wave_height_m) > 1:
            raise ValueError(
                "Only one significant_wave_height_m is allowed for conversion to proto"
            )
        return proto.Weather(
            significant_wave_height_m=self.significant_wave_height_m[0]
            if self.significant_wave_height_m is not None
            else None,
            mean_wave_period_s=self.mean_wave_period_s[0]
            if self.mean_wave_period_s is not None
            else None,
            wave_direction_deg=self.wave_direction_deg[0]
            if self.wave_direction_deg is not None
            else None,
            wind_speed_m_per_s=self.wind_speed_m_per_s[0]
            if self.wind_speed_m_per_s is not None
            else None,
            wind_direction_deg=self.wind_direction_deg[0]
            if self.wind_direction_deg is not None
            else None,
            ocean_current_speed_m_per_s=self.ocean_current_speed_m_per_s[0]
            if self.ocean_current_speed_m_per_s is not None
            else None,
            ocean_current_direction_deg=self.ocean_current_direction_deg[0]
            if self.ocean_current_direction_deg is not None
            else None,
            air_temperature_deg_c=self.air_temperature_deg_c[0]
            if self.air_temperature_deg_c is not None
            else None,
            sea_water_temperature_deg_c=self.sea_water_temperature_deg_c[0]
            if self.sea_water_temperature_deg_c is not None
            else None,
        )

    def to_dict(self) -> dict:
        """Converts the weather to a dictionary"""
        return {
            "significant_wave_height_m": getattr(
                self, "significant_wave_height_m", None
            ),
            "mean_wave_period_s": getattr(self, "mean_wave_period_s", None),
            "wave_direction_deg": getattr(self, "wave_direction_deg", None),
            "wind_speed_m_per_s": getattr(self, "wind_speed_m_per_s", None),
            "wind_direction_deg": getattr(self, "wind_direction_deg", None),
            "ocean_current_speed_m_per_s": getattr(
                self, "ocean_current_speed_m_per_s", None
            ),
            "ocean_current_direction_deg": getattr(
                self, "ocean_current_direction_deg", None
            ),
            "air_temperature_deg_c": getattr(self, "air_temperature_deg_c", None),
            "sea_water_temperature_deg_c": getattr(
                self, "sea_water_temperature_deg_c", None
            ),
        }

    def to_dict_scalar(self) -> Dict[str, float]:
        """Converts the weather to a dictionary with scalar values. If the weather has timeseries,
        the first value is returned"""
        return {
            "significant_wave_height_m": self.significant_wave_height_m[0]
            if self.significant_wave_height_m is not None
            else None,
            "mean_wave_period_s": self.mean_wave_period_s[0]
            if self.mean_wave_period_s is not None
            else None,
            "wave_direction_deg": self.wave_direction_deg[0]
            if self.wave_direction_deg is not None
            else None,
            "wind_speed_m_per_s": self.wind_speed_m_per_s[0]
            if self.wind_speed_m_per_s is not None
            else None,
            "wind_direction_deg": self.wind_direction_deg[0]
            if self.wind_direction_deg is not None
            else None,
            "ocean_current_speed_m_per_s": self.ocean_current_speed_m_per_s[0]
            if self.ocean_current_speed_m_per_s is not None
            else None,
            "ocean_current_direction_deg": self.ocean_current_direction_deg[0]
            if self.ocean_current_direction_deg is not None
            else None,
            "air_temperature_deg_c": self.air_temperature_deg_c[0]
            if self.air_temperature_deg_c is not None
            else None,
            "sea_water_temperature_deg_c": self.sea_water_temperature_deg_c[0]
            if self.sea_water_temperature_deg_c is not None
            else None,
        }


class OperationPoint:
    """Operation point class"""

    timestamp_seconds: Numeric = None
    speed_kn: Numeric = None
    power_limit_kw: Numeric = None
    heading_deg: Numeric = None
    auxiliary_power: Numeric = None
    weather: Weather = None
    location: Location = None
    _proto: proto.OperationPoint = None

    def __init__(
        self,
        timestamp_seconds: Numeric = None,
        speed_kn: Numeric = None,
        power_limit_kw: Numeric = None,
        heading_deg: Numeric = None,
        auxiliary_power: Numeric = None,
        weather: Weather = None,
        location: Location = None,
        proto_object: Optional[proto.OperationPoint] = None,
    ):
        if proto_object is not None:
            self._proto = proto_object
            for descriptor, value in proto_object.ListFields():
                if descriptor.name == "weather":
                    self.weather = Weather(proto_object=value)
                elif descriptor.name == "location":
                    self.location = Location(proto_object=value)
                else:
                    value = (
                        np.array([value]) if isinstance(value, (float, int)) else value
                    )
                    setattr(self, descriptor.name, value)
        else:
            self._proto = proto.OperationPoint()
            self.timestamp_seconds = (
                np.array([timestamp_seconds])
                if isinstance(timestamp_seconds, (float, int))
                else timestamp_seconds
            )
            self.speed_kn = (
                np.array([speed_kn]) if isinstance(speed_kn, (float, int)) else speed_kn
            )
            self.power_limit_kw = (
                np.array([power_limit_kw])
                if isinstance(power_limit_kw, (float, int))
                else power_limit_kw
            )
            self.heading_deg = (
                np.array([heading_deg])
                if isinstance(heading_deg, (float, int))
                else heading_deg
            )
            self.auxiliary_power = (
                np.array([auxiliary_power])
                if isinstance(auxiliary_power, (float, int))
                else auxiliary_power
            )
            self.weather = weather
            self.location = location

    def to_proto(self) -> proto.OperationPoint:
        """Converts the operation point to a protobuf object"""
        if len(self.timestamp_seconds) > 1:
            raise ValueError(
                "Only one timestamp_seconds is allowed for conversion to proto"
            )
        return proto.OperationPoint(
            timestamp_seconds=self.timestamp_seconds[0]
            if self.timestamp_seconds is not None
            else None,
            speed_kn=self.speed_kn[0] if self.speed_kn is not None else None,
            power_limit_kw=self.power_limit_kw[0]
            if self.power_limit_kw is not None
            else None,
            heading_deg=self.heading_deg[0] if self.heading_deg is not None else None,
            auxiliary_power=self.auxiliary_power[0]
            if self.auxiliary_power is not None
            else None,
            weather=self.weather.to_proto() if self.weather is not None else None,
            location=self.location.to_proto() if self.location is not None else None,
        )

    def to_dict(self) -> dict:
        """Converts the operation point to a dictionary"""
        return {
            "timestamp_seconds": getattr(self, "timestamp_seconds", None),
            "speed_kn": getattr(self, "speed_kn", None),
            "power_limit_kw": getattr(self, "power_limit_kw", None),
            "heading_deg": getattr(self, "heading_deg", None),
            "auxiliary_power": getattr(self, "auxiliary_power", None),
            "weather": self.weather.to_dict() if self.weather is not None else None,
            "location": self.location.to_dict() if self.location is not None else None,
        }

    def to_dict_scalar(self) -> Dict[str, Any]:
        """Converts the operation point to a dictionary with scalar values.
        If the values are arrays, the first value is used."""
        return {
            "timestamp_seconds": self.timestamp_seconds[0]
            if self.timestamp_seconds is not None
            else None,
            "speed_kn": self.speed_kn[0] if self.speed_kn is not None else None,
            "power_limit_kw": self.power_limit_kw[0]
            if self.power_limit_kw is not None
            else None,
            "heading_deg": self.heading_deg[0]
            if self.heading_deg is not None
            else None,
            "auxiliary_power": self.auxiliary_power[0]
            if self.auxiliary_power is not None
            else None,
            "weather": self.weather.to_dict_scalar()
            if self.weather is not None
            else None,
            "location": self.location.to_dict_scalar()
            if self.location is not None
            else None,
        }

In [None]:
# | hide
import nbdev

nbdev.nbdev_export()

In [None]:
import time
import ship_performance_service_interface_lib.sps_interface_pb2 as proto
from operation_profile_lib.operation_profile_structure import OperationPoint

In [None]:
location = proto.Location()

operation_point_proto = proto.OperationPoint()
operation_point_proto.speed_kn = 6.7
operation_point_proto.power_limit_kw = 10000
operation_point_proto.auxiliary_power = 4.6
operation_point_proto.weather.significant_wave_height_m = 2.5
operation_point_proto.weather.mean_wave_period_s = 3.4
operation_point_proto.weather.wind_direction_deg = 34
operation_point_proto.weather.wind_speed_m_per_s = 5.6
operation_point_proto.location.longitude = 5.6
operation_point_proto.location.latitude = 4.2
operation_point_proto.timestamp_seconds = int(time.time())
operation_point_proto.weather.sea_water_temperature_deg_c = 20
operation_point_proto.weather.air_temperature_deg_c = 15
print(operation_point_proto)

timestamp_seconds: 1678789019
speed_kn: 6.7
power_limit_kw: 10000
auxiliary_power: 4.6
weather {
  significant_wave_height_m: 2.5
  mean_wave_period_s: 3.4
  wind_speed_m_per_s: 5.6
  wind_direction_deg: 34
  air_temperature_deg_c: 15
  sea_water_temperature_deg_c: 20
}
location {
  longitude: 5.6
  latitude: 4.2
}



In [None]:
from random import random

data = dict(
    significant_wave_height_m=random(),
    mean_wave_period_s=random(),
    wave_direction_deg=random(),
    wind_direction_deg=random(),
    wind_speed_m_per_s=random(),
    ocean_current_direction_deg=random(),
    ocean_current_speed_m_per_s=random(),
    sea_water_temperature_deg_c=random(),
    air_temperature_deg_c=random(),
)

weather = Weather(**data)
for key, value in data.items():
    assert getattr(weather, key) == value

In [None]:
operation_point = OperationPoint(proto_object=operation_point_proto)

In [None]:
print(operation_point.weather.to_dict())
print(operation_point.location.to_dict())

{'significant_wave_height_m': array([2.5]), 'mean_wave_period_s': array([3.4]), 'wave_direction_deg': None, 'wind_speed_m_per_s': array([5.6]), 'wind_direction_deg': array([34.]), 'ocean_current_speed_m_per_s': None, 'ocean_current_direction_deg': None, 'air_temperature_deg_c': array([15.]), 'sea_water_temperature_deg_c': array([20.])}
{'longitude': array([5.6]), 'latitude': array([4.2])}


In [None]:
operation_point.to_dict_scalar()

{'timestamp_seconds': 1678789019,
 'speed_kn': 6.7,
 'power_limit_kw': 10000.0,
 'heading_deg': None,
 'auxiliary_power': 4.6,
 'weather': {'significant_wave_height_m': 2.5,
  'mean_wave_period_s': 3.4,
  'wave_direction_deg': None,
  'wind_speed_m_per_s': 5.6,
  'wind_direction_deg': 34.0,
  'ocean_current_speed_m_per_s': None,
  'ocean_current_direction_deg': None,
  'air_temperature_deg_c': 15.0,
  'sea_water_temperature_deg_c': 20.0},
 'location': {'longitude': 5.6, 'latitude': 4.2}}

In [None]:
operation_point.to_proto()

timestamp_seconds: 1678789019
speed_kn: 6.7
power_limit_kw: 10000
auxiliary_power: 4.6
weather {
  significant_wave_height_m: 2.5
  mean_wave_period_s: 3.4
  wind_speed_m_per_s: 5.6
  wind_direction_deg: 34
  air_temperature_deg_c: 15
  sea_water_temperature_deg_c: 20
}
location {
  longitude: 5.6
  latitude: 4.2
}