In [1]:
from typing import List, Union
from numbers import Number
from collections import Counter

import numpy as np
import pandas as pd

from cyeva.utils import (
    convert_to_ndarray,
    fix_zero_division,
    result_round_digit,
    source_round_digit,
    assert_length,
    drop_nan,
)
from cyeva.config.directions.wind import DIRECTION8_CENTER_ANGLE, DIRECTION16_CENTER_ANGLE
from cyeva.config.levels.wind import (
    GENERAL_WIND_SPEED_LEVELS,
)
from cyeva.core.statistic import (
    # calc_differ_accuracy_rate,
    calc_mae,
    calc_rmse,
    calc_rss,
    calc_chi_square,
    calc_linregress,
)
from cyeva.core.binarize import threshold_binarize
from cyeva.core import Comparison

In [2]:
def get_least_angle_deflection(
    angle1: Union[Number, np.ndarray], angle2: Union[Number, np.ndarray]
) -> Union[Number, np.ndarray]:
    """Calculate the least angle deflection between two angles.

    Args:
        angle1 (Union[Number, np.ndarray]): One angle(degree).
        angle2 (Union[Number, np.ndarray]): Another angle(degree).

    Returns:
        Union[Number, np.ndarray]: The least deflection between two angles.
    """

    angle1 %= 360
    angle2 %= 360

    deflection1 = abs(angle1 - angle2)
    deflection2 = 360 - abs(angle1 - angle2)

    if isinstance(angle1, Number) and isinstance(angle2, Number):
        return deflection1 if deflection1 < deflection2 else deflection2
    else:
        index1 = np.where(deflection1 <= deflection2)
        index2 = np.where(deflection2 <= deflection1)

        deflection = np.full_like(deflection1, 0)

        deflection[index1] = deflection1[index1]
        deflection[index2] = deflection2[index2]

        return deflection

In [3]:
angle1 = np.array([100, 1, 89, 270, 270, 0, 0])
angle2 = np.array([105, 300, 269, 89, 91, 0, 359])

In [4]:
get_least_angle_deflection(angle1, angle2)

array([  5,  61, 180, 179, 179,   0,   1])

In [5]:
get_least_angle_deflection(angle1[4], angle2[4])

179

In [6]:
def identify_direction(
    angle: Union[Number, np.ndarray], dnum: int = 8
) -> Union[int, np.ndarray]:
    """Identify 8 cardinal directions by angle.

    Args:
        angle (Number): The wind direction in degree.
        dnum (int): The wind directions number.

    Returns:
        Union[int, np.ndarray]: Direction ID of the cardinal direction.
    """
    if isinstance(angle, List):
        angle = np.array(angle)

    if dnum == 8:
        DIRECTION_CENTER_ANGLE = DIRECTION8_CENTER_ANGLE
        threshold = 22.5
    elif dnum == 16:
        DIRECTION_CENTER_ANGLE = DIRECTION16_CENTER_ANGLE
        threshold = 11.25

    angle %= 360

    if isinstance(angle, Number):
        for dir_id, center_angle in DIRECTION_CENTER_ANGLE.items():
            if get_least_angle_deflection(angle, center_angle) <= threshold:
                break

        return dir_id

    elif isinstance(angle, np.ndarray):
        dir_ids = np.full_like(angle, -1)
        for dir_id, center_angle in DIRECTION_CENTER_ANGLE.items():
            least_angle_defl = get_least_angle_deflection(angle, center_angle)
            dir_ids[least_angle_defl <= threshold] = dir_id

        return dir_ids

In [7]:
identify_direction(300, dnum=8)

7

In [8]:
identify_direction(300, dnum=16)

13

In [15]:
identify_direction(np.array([270, 358]), dnum=16)

array([12,  0])

In [24]:
def identify_speed_level(speed: Union[Number, np.ndarray]) -> Union[int, np.ndarray]:
    """Identify wind level by speed.

    Args:
        speed (Union[Number, np.ndarray]): Wind speed in m/s.

    Returns:
        Union[int, np.ndarray]: Wind level.
    """
    if isinstance(speed, np.ndarray):
        spd_levs = np.full_like(speed, np.nan)
    for lev, attr in GENERAL_WIND_SPEED_LEVELS.items():
        minimum = attr["min"]
        maximum = attr["max"]
        if isinstance(speed, Number):
            if lev == 0:
                if minimum <= speed <= maximum:
                    return lev
            else:
                if minimum < speed <= maximum:
                    return lev
        elif isinstance(speed, np.ndarray):
            if lev == 0:
                spd_levs[(speed >= minimum) & (speed <= maximum)] = lev
            else:
                spd_levs[(speed > minimum) & (speed <= maximum)] = lev

    return spd_levs

In [25]:
identify_speed_level(np.array([1,2,3,4,10,20,28.6]))

array([ 1.,  2.,  2.,  3.,  5.,  8., 11.])

In [38]:
def get_least_dir_deflection(
    dir1: Union[int, np.ndarray], dir2: Union[int, np.ndarray], circle_num: int = 8
) -> Union[int, np.ndarray]:
    """Get least deflection from two directions.

    Args:
        dir1 (Union[int, np.ndarray]): One direction number(0-7 if circle_num is 8)
        dir2 (Union[int, np.ndarray]): Another direction number(0-7 if circle_num is 8)
        circle_num (int, optional): Circulation knot num. Defaults to 8.

    Returns:
        Union[int, np.ndarray]: The least deflection of directions.
    """
    if isinstance(dir1, Number) and isinstance(dir2, Number):
        deflection1 = abs(dir1 - dir2)
        deflection2 = circle_num - abs(dir1 - dir2)

        return int(deflection1) if deflection1 < deflection2 else int(deflection2)
    else:
        deflection1 = np.abs(dir1 - dir2)
        deflection2 = circle_num - np.abs(dir1 - dir2)

        index1 = np.where(deflection1 <= deflection2)
        index2 = np.where(deflection2 <= deflection1)

        deflection = np.full_like(deflection1, 0)

        deflection[index1] = deflection1[index1]
        deflection[index2] = deflection2[index2]

        return deflection

In [39]:
get_least_dir_deflection(np.array([0,2,3]),5)

array([3, 3, 2])

In [42]:
def get_least_lev_diff(
    lev1: Union[int, np.ndarray],
    lev2: Union[int, np.ndarray],
) -> int:
    """Calculate least level difference.

    Args:
        lev1 (int): One level value.
        lev2 (int): Another level value.

    Returns:
        int: Least level difference.
    """
    return np.abs(lev1 - lev2)

In [83]:
def calc_wind_dir_score(observation, forecast, dnum=8):
    """计算风向评分

    Args:
        observation (list | ndarray): 观测风向角度
        forecast (list | ndarray): 预报风向角度

    Returns:
        float: 风向预报评分
    """

    if dnum == 8:
        obs_d8 = identify_direction(observation)
        fct_d8 = identify_direction(forecast)
        

        dir_deflection = get_least_lev_diff(obs_d8, fct_d8)

        score_series = np.full_like(dir_deflection, 0).astype(float)
        score_series[np.isclose(dir_deflection, 1)] = 0.6
        score_series[dir_deflection < 1] = 1

    elif dnum == 16:
        obs_d16 = identify_direction(observation, dnum=16)
        fct_d16 = identify_direction(forecast, dnum=16)

        dir_deflection = get_least_lev_diff(obs_d16, fct_d16)

        score_series = np.full_like(dir_deflection, 0).astype(float)
        score_series[np.isclose(dir_deflection, 2)] = 0.6
        score_series[np.isclose(dir_deflection, 1)] = 0.8
        score_series[dir_deflection < 1] = 1
    
    try:
        return np.sum(score_series) / len(score_series)
    except TypeError:
        return np.sum(score_series)

In [84]:
calc_wind_dir_score(90,75, dnum=8)

1.0

In [85]:
calc_wind_dir_score(90,75, dnum=16)

0.8

In [88]:
calc_wind_dir_score(np.array([90, np.nan]),np.array([75, 90]), dnum=16)

0.4

In [116]:
def calc_wind_dir_accuracy_ratio(
    observation: Union[Number, np.ndarray],
    forecast: Union[Number, np.ndarray],
    mode="degree",
    threshold=22.5,
):
    """计算风向准确率

    Args:
        observation (list | ndarray): 观测风向角度
        forecast (list | ndarray): 预报风向角度
        mode (str): 计算模式，可选选项有'degree', 'drange8'
                    * degree 表示以角度偏转为指标进行计算
                    * drange8 表示以8方位角为指标进行计算
                    * drange16 表示以16方位角为指标进行计算
        threshold (Number): 仅 mode == degree 模式时有效，即判断预报准确的角度阈值

    Returns:
        float: 风向准确率(%)
    """
    try:
        couples = zip(observation, forecast)
    except TypeError:
        couples = zip([observation], [forecast])

    if mode == "degree":
        angle_deflection = np.array(
            [get_least_angle_deflection(obs, fct) for obs, fct in couples]
        )

        count_series = np.full_like(angle_deflection, 0).astype(float)
        count_series[angle_deflection <= threshold] = 1

        return np.sum(count_series) / len(count_series) * 100

    elif mode == "drange8":
        obs_d8 = identify_direction(observation)
        fct_d8 = identify_direction(forecast)

        cross = obs_d8 == fct_d8
        try:
            total = len(cross)
        except TypeError:
            cross = [cross]
            total = 1
        counter = Counter(cross)
        correct = counter[True]

        return correct / total * 100

    elif mode == "drange16":
        obs_d16 = identify_direction(observation, dnum=16)
        fct_d16 = identify_direction(forecast, dnum=16)

        cross = obs_d16 == fct_d16
        try:
            total = len(cross)
        except TypeError:
            cross = [cross]
            total = 1
        counter = Counter(cross)
        correct = counter[True]

        return correct / total * 100

In [131]:
calc_wind_dir_accuracy_ratio(np.array([10,100,100]), np.array([40,120,110]), mode='drange8')

33.33333333333333

In [130]:
calc_wind_dir_accuracy_ratio(100, 101.25, mode='drange16')

100.0

In [129]:
(90 + 112.5) / 2

101.25