In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import yaml
from enum import Enum
from datetime import datetime
from sklearn.metrics import confusion_matrix, recall_score, precision_score, f1_score
import warnings
import matplotlib.pyplot as plt

## Defining the Data Structures for easy manipulation of data

In [2]:
class SensorType(str, Enum):
    WindSpeed = "WindSpeed"
    WindDirection = "WindDirection"
    RelativeHumidity = "RelativeHumidity"
    Pressure = "Pressure"
    Temperature = "Temperature"
    Other = "Other"

class SignalType(str, Enum):
    Count = "Count"
    StdDev = "StdDev"
    Mean = "Mean"
    Max = "Max"
    Min = "Min"
    Other = "Other"

In [3]:
class Measurement:
    def __init__(self, sensor_type, signal_type, name, height, ts_df = None, column_name = None):
        self.name = name
        self.column_name = column_name
        self.height = height
        self.ts_df = ts_df
        self.sensor_type = sensor_type
        self.signal_type = signal_type

    def set_ts(self, ts_df):
        self.ts_df = pd.concat([
            ts_df,
            pd.DataFrame(0, index=ts_df.index, columns=[0])],
            axis=1
        )

        self.ts_df.columns = ["measurement", "anomalous"]

    def get_measurement_with_anomaly(self):
      df = self.ts_df.copy()
      df.columns = [self.name, f"{self.name}_anomalous"]
      return df

    def get_historical_average(self):
        return self.ts_df.measurement.mean()

    def get_historical_std(self):
        return self.ts_df.measurement.std()

    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return (self.height < other.height)
        return NotImplemented

    def __le__(self, other):
        if self.__class__ is other.__class__:
            return (self.height <= other.height)
        return NotImplemented

    def __gt__(self, other):
        if self.__class__ is other.__class__:
            return (self.height > other.height)
        return NotImplemented

    def __ge__(self, other):
        if self.__class__ is other.__class__:
            return (self.height >= other.height)
        return NotImplemented

    def __eq__(self, other):
        if self.__class__ is other.__class__:
            return (self.height == other.height)
        return NotImplemented

    def __ne__(self, other):
        if self.__class__ is other.__class__:
            return not(self.__eq__(self, other))
        return NotImplemented

    def __str__(self):
        return f"Name: {self.name}, Height: {self.height}, Sensor Type: {self.sensor_type}, Signal Type: {self.signal_type}"

    def __repr__(self):
        return f"<Measurement: name={self.name}, height={self.height}, sensor_type={self.sensor_type}, signal_type:{self.signal_type}>"

In [8]:
class MetMast:
    def __init__(self, file_path = None, mast_config_path = None, start_date = None, end_date = None):
        if file_path:
            self.read_file(file_path)
            if not (start_date is None or end_date is None):
                self.ts_df = self.ts_df.loc[start_date:end_date]
        if mast_config_path:
            self.load_mast_configuration(mast_config_path)

        if not (file_path is None or mast_config_path is None):
            self.set_measurement_ts()

    def read_file(self, path):
        ts = pd.read_csv(path, sep="\t")
        # ts = pd.read_csv(path)
        self.ts_df = self._treat_ts(ts)

    def _treat_ts(self, ts_df):
        """
        This function takes as input the dataframe read from a file exported from
        WindFarmer - Analyst(WFA). It parses the date time column and substitute
        the 9999 values by np.nan. 9999 for WFA is the absecence of data.
        """
        ts_df["Date/Time"] = pd.to_datetime(ts_df["Date/Time"], format="%d%m%y %H%M")
        # ts_df["Date/Time"] = pd.to_datetime(ts_df["Date/Time"])
        ts_df = ts_df.sort_values(by="Date/Time")
        ts_df = ts_df.set_index("Date/Time")
        ts_df = ts_df.replace(9999.0, np.nan)
        return ts_df

    def load_mast_configuration(self, path):
        self.measurements = []
        with open(path, "r") as file:
            config = yaml.safe_load(file)
            self.name = config["name"]
            self.classification = config["classification"]
            self.position = config["position"]
            self.mast_height = config["mast_height"]
            for sensor in config["mast_sensors"]:
                for signal in sensor["signals"]:
                    measurement = Measurement(
                        sensor_type=SensorType(sensor["type"]),
                        signal_type=SignalType(signal),
                        name=sensor["name"],
                        height=sensor["height"]
                    )

                    self.measurements.append(measurement)

        list.sort(self.measurements, reverse=True)

    def get_measurement_by_name(self, name):
        _, sensor_name, signal_type = name.split("~")
        for measurement in self.measurements:
            if sensor_name == measurement.name and signal_type == measurement.signal_type:
                measurement.column_name = name
                return measurement

        return None

    def _get_measurements_by_type(self, sensor_type):
        for measurement in self.measurements:
            if measurement.sensor_type == sensor_type:
                yield measurement

    def set_measurement_ts(self):
        for column in self.ts_df:
            measurement = self.get_measurement_by_name(column)
            measurement.set_ts(self.ts_df[column])

    def get_speed_measurements(self):
        return self._get_measurements_by_type(SensorType.WindSpeed)

    def get_direction_measurements(self):
        return self._get_measurements_by_type(SensorType.WindDirection)

    def get_humidity_measurements(self):
        return self._get_measurements_by_type(SensorType.RelativeHumidity)

    def get_pressure_measurements(self):
        return self._get_measurements_by_type(SensorType.Pressure)

    def get_temperature_measurements(self):
        return self._get_measurements_by_type(SensorType.Temperature)

    def get_other_measurements(self):
        return self._get_measurements_by_type(SensorType.Other)

    def get_all_measurements(self):
        def _get_measurements():
            for measurement in self.measurements:
                yield measurement.get_measurement_with_anomaly()
        df = pd.concat(list(_get_measurements()), axis=1)
        return df

## Defining functions to detect anomalies in the sensors

In [5]:
def check_for_repeated_values(ts_df, threshold):
    ts_df["group"] = (ts_df["measurement"].diff() != 0).astype(int).cumsum()
    ts_df["count"] = ts_df.groupby("group").cumcount() + 1

    ts_df["anomalous"] = (
        (ts_df['count'] >= threshold) |
        (ts_df["anomalous"] == 1)  # if it already is anomalous, don't change
    ).astype(int)

    # This is to account for the first (threshold - 1) registers where
    # considering only the code above, will be ignored
    for _ in range(1, threshold):
        ts_df["count"] = ts_df["count"].shift(-1)
        ts_df['anomalous'] = (
            (ts_df['count'] >= threshold) |
            (ts_df["anomalous"] == 1)  # if it already is anomalous, don't change
        ).astype(int)

    ts_df = ts_df.drop(["group", "count"], axis=1)

    return ts_df

def check_for_repeated_specific_value(ts_df, threshold, value):
    ts_df["group"] = (ts_df["measurement"].diff() != 0).astype(int).cumsum()
    ts_df["count"] = ts_df.groupby("group").cumcount() + 1
    ts_df["anomalous"] = (
        (
            (ts_df["measurement"] == value) &
            (ts_df['count'] >= threshold)
        ) |
        (ts_df["anomalous"] == 1)  # if it already is anomalous, don't change
    ).astype(int)

    # This is to account for the first (threshold - 1) registers where
    # considering only the code above, will be ignored
    for _ in range(1, threshold):
        ts_df["count"] = ts_df["count"].shift(-1)
        ts_df['anomalous'] = (
            (
                (ts_df["measurement"] == value) &
                (ts_df['count'] >= threshold)
            ) |
            (ts_df["anomalous"] == 1)  # if it already is anomalous, don't change
        ).astype(int)

    ts_df = ts_df.drop(["group", "count"], axis=1)

    return ts_df

def check_for_values_within_range(ts_df, min_threshold, max_threshold):
    ts_df["anomalous"] = (
        (ts_df['measurement'] >= max_threshold) |
        (ts_df['measurement'] < min_threshold) |
        (ts_df["anomalous"] == 1)  # if it already is anomalous, don't change
    ).astype(int)

    return ts_df

def check_for_speed_profile_inversion(speeds):
    df = pd.DataFrame()

    # Confirms that the speeds are ordered from the highest to the lowest
    list.sort(speeds, reverse=True)

    row_ascendent = lambda row: np.all(np.diff(row) >= 0)

    for i in range(0, len(speeds)):
        df = pd.concat([df, speeds[i].ts_df["measurement"]], axis=1)

    is_speed_profile_inversion = df.apply(row_ascendent, axis=1)

    return is_speed_profile_inversion

def speed1_greater_than_speed2(speed1_df, speed2_df):
    df = pd.concat([speed1_df, speed2_df], axis=1)

    row_ascendent = lambda row: np.all(np.diff(row) >= 0)

    return df.apply(row_ascendent, axis=1)

def associate_anomaly_flags(df, exclusions):
    sensors = df
    sensor_exclusions = exclusions[exclusions["Sensor"].isin(sensors.columns)]

    sensors_with_class = sensors.copy()

    for col in sensors.columns:
        sensors_with_class.loc[:, f"{col}_anomalous"] = 0 # Specific anomaly

    sensors_with_class["anomalous"] = 0 # Overall anomaly
    for exclusion in sensor_exclusions.itertuples():
        exclusion_range = (
            (sensors_with_class.index >= pd.to_datetime(exclusion.Start)) &
            (sensors_with_class.index <= pd.to_datetime(exclusion.End))
        )
        sensors_with_class.loc[exclusion_range, "anomalous"] = 1
        sensors_with_class.loc[
            exclusion_range, f"{exclusion.Sensor}_anomalous"
        ] = 1

    return sensors_with_class

## Load data to be cleaned

In [9]:
root = Path(r"./")
files_path = root
config_file_path = root / "kegnes_configuration.yml"

exclusions_path = root / "kegnes_all_exclusions.tsv"
exclusions = pd.read_csv(exclusions_path, sep="\t", names=["Mast", "Sensor", "Reason", "Start", "End", "Created"])

overall_preditction_dataset = pd.Series([])
overall_true_dataset = pd.Series([])

PRESSURE_THRESHOLD = 0.1
PRESSURE_REPEATED_MEASUREMENTS = 144
TEMPERATURE_REPEATED_MEASUREMENTS = 144
DIRECTION_REPEATED_MEASUREMENTS = 3
SPEED_REPEATED_MEASUREMENTS = 3
SPEED_CUT_IN = 3


mast = MetMast(root / "kegnes_all_data.tsv", config_file_path)

pressures = list(mast.get_pressure_measurements())
pressure_count = len(pressures)
temperatures = list(mast.get_temperature_measurements())
temperature_count = len(temperatures)
directions = list(mast.get_direction_measurements())
direction_count = len(directions)
speeds = list(mast.get_speed_measurements())
speed_count = len(speeds)
full_profile_inversion = check_for_speed_profile_inversion(speeds)

for i, pressure in enumerate(pressures):
    historical_average = pressure.get_historical_average()

    pressure_df = pressure.ts_df.copy()
    pressure_df.anomalous = 0

    min_threshold = historical_average * (1 - PRESSURE_THRESHOLD)
    max_threshold = historical_average * (1 + PRESSURE_THRESHOLD)

    # Check for consecutive measurements
    pressure_df = check_for_repeated_values(
        pressure_df, PRESSURE_REPEATED_MEASUREMENTS
    )

    # Check if measurements are within historical average threshold
    pressure_df = check_for_values_within_range(
        pressure_df, min_threshold, max_threshold
    )

    pressure.ts_df = pressure_df

for i, temperature in enumerate(temperatures):
    historical_average = temperature.get_historical_average()

    temperature_df = temperature.ts_df.copy()
    temperature_df.anomalous = 0

    # Check for consecutive measurements
    temperature_df = check_for_repeated_values(
        temperature_df, TEMPERATURE_REPEATED_MEASUREMENTS
    )

    temperature.ts_df = temperature_df

for i, direction in enumerate(directions):
    historical_average = direction.get_historical_average()

    direction_df = direction.ts_df.copy()
    direction_df.anomalous = 0

    # Check for consecutive measurements
    direction_df = check_for_repeated_values(
        direction_df, DIRECTION_REPEATED_MEASUREMENTS
    )

    direction.ts_df = direction_df

for i, speed in enumerate(speeds):
    historical_average = speed.get_historical_average()

    speed_df = speed.ts_df.copy()
    speed_df.anomalous = 0

    # Check for consecutive measurements
    speed_df = check_for_repeated_values(
        speed_df, SPEED_REPEATED_MEASUREMENTS
    )

    # Check against other speed sensors
    if speed_count > 1:
        current_is_higher_than_above = pd.Series(False, index=speed_df.index)
        # Check higher sensor if measurement is greater
        for j in range(i - 1, -1, -1):
            current_is_higher_than_above = (
                current_is_higher_than_above |
                (speed_df["measurement"] > speeds[j].ts_df["measurement"])
            )

        # Check lower sensor if measurement is lower
        current_is_lower_than_below = pd.Series(False, index=speed_df.index)
        for j in range(i + 1, speed_count):
            current_is_lower_than_below = (
                current_is_lower_than_below |
                (speed_df["measurement"] < speeds[j].ts_df["measurement"])
            )

        # Composes criterias: current measurement is higher than above sensor
        # and is lower than below sensor and it is not a full profile inversion
        inversion_df = pd.DataFrame((
            (current_is_higher_than_above | current_is_lower_than_below) &
            ~full_profile_inversion
        ).astype(int), columns=["measurement"])
        inversion_df["anomalous"] = 0

        # Will account as anomalous only if composed criteriea repeates itself
        # for 5 times
        inversion_df = check_for_repeated_specific_value(inversion_df, 6, 1)

        speed_df["anomalous"] = (
            (
                # Considers only wind speeds above certain cut-in
                (speed_df["measurement"] >= SPEED_CUT_IN) &
                (inversion_df["anomalous"] == 1)
            ) |
            (speed_df["anomalous"] == 1)  # if it already is anomalous, don't change
        ).astype(int)

    speed.ts_df = speed_df

prediction_dataset = mast.get_all_measurements()
prediction_dataset["anomalous"] = (prediction_dataset.filter(regex="_anomalous")).any(axis=1)
prediction_dataset["anomalous"] = prediction_dataset["anomalous"].astype(int)
prediction_dataset

Unnamed: 0_level_0,WS23,WS23_anomalous,WD23,WD23_anomalous,RH22,RH22_anomalous,T22,T22_anomalous,WS11,WS11_anomalous,...,WD10_anomalous,WS03,WS03_anomalous,RH2,RH2_anomalous,T2,T2_anomalous,P,P_anomalous,anomalous
Date/Time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1991-01-01 00:05:00,6.84,0,,0,95.0,0,3.8,0,6.72,0,...,0,6.68,0,94.0,0,4.0,0,1006.9,0,0
1991-01-01 00:15:00,7.36,0,,0,95.0,0,3.8,0,7.27,0,...,0,7.28,0,94.0,0,4.0,0,1006.8,0,0
1991-01-01 00:25:00,8.12,0,,0,95.0,0,3.9,0,7.93,0,...,0,7.98,0,94.0,0,4.1,0,1006.3,0,0
1991-01-01 00:35:00,8.19,0,,0,95.0,0,3.9,0,8.07,0,...,0,8.14,0,94.0,0,4.0,0,1005.6,0,0
1991-01-01 00:45:00,7.49,0,,0,95.0,0,3.8,0,7.47,0,...,0,7.45,0,94.0,0,4.0,0,1005.6,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2001-12-31 23:15:00,7.07,0,285.0,0,91.0,0,0.1,0,6.65,0,...,0,5.36,0,91.0,0,0.1,0,1024.4,0,0
2001-12-31 23:25:00,6.67,0,288.0,0,92.0,0,0.1,0,6.05,0,...,0,4.87,0,92.0,0,0.0,0,1024.5,0,0
2001-12-31 23:35:00,5.51,0,296.0,0,92.0,0,0.1,0,4.49,0,...,0,3.61,0,92.0,0,0.0,0,1024.8,0,0
2001-12-31 23:45:00,5.84,0,293.0,0,92.0,0,0.1,0,4.82,0,...,0,3.91,0,91.0,0,-0.1,0,1024.8,0,0
