In [1]:
# ! module load mambaforge
# ! mamba create -n wind_forecasting_env python=3.12
# ! mamba activate wind_forecasting_env
# ! conda install -c conda-forge jupyterlab mpi4py impi_rt
# ! pip install ./OpenOA # have to change pyproject.toml to allow for python 3.12.7
# ! pip install floris polars windrose netCDF4 statsmodels h5pyd seaborn pyarrow memory_profiler

#%load_ext memory_profiler
from data_loader import DataLoader
from data_filter import DataFilter
from data_inspector import DataInspector
from openoa.utils import qa, plot, filters, power_curve, imputing
import polars.selectors as cs
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sys import platform
import os
import re
import pandas as pd
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# import datetime
# t2 = datetime.datetime(2022, 3, 1, 12, 0, 55)

[ahenry-39583s:82278] shmem: mmap: an error occurred while determining whether or not /var/folders/15/zl_rr3g10_b5j7bk42pk4dgr4qydb3/T//ompi.ahenry-39583s.159331683/jf.0/3413573632/sm_segment.ahenry-39583s.159331683.cb770000.0 could be created.
2024-10-21 12:58:40,397 - INFO - Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2024-10-21 12:58:40,397 - INFO - NumExpr defaulting to 8 threads.
  from .autonotebook import tqdm as notebook_tqdm
  _exp = _sv(version)



## Print NetCDF Data Structure, Load Data, Transform Datetime Columns

In [2]:
PLOT = False
RELOAD_DATA = False

if platform == "darwin":
    DATA_DIR = "/Users/ahenry/Documents/toolboxes/wind_forecasting/examples/data"
    # PL_SAVE_PATH = "/Users/ahenry/Documents/toolboxes/wind_forecasting/examples/data/kp.turbine.zo2.b0.raw.parquet"
    # FILE_SIGNATURE = "kp.turbine.z02.b0.*.*.*.nc"
    PL_SAVE_PATH = "/Users/ahenry/Documents/toolboxes/wind_forecasting/examples/data/kp.turbine.zo2.b0.raw.parquet"
    FILE_SIGNATURE = "kp.turbine.z02.b0.20220301.*.*.nc"
    MULTIPROCESSOR = "cf"
    TURBINE_INPUT_FILEPATH = "/Users/ahenry/Documents/toolboxes/wind_forecasting/examples/inputs/ge_282_127.yaml"
    FARM_INPUT_FILEPATH = "/Users/ahenry/Documents/toolboxes/wind_forecasting/examples/inputs/gch_KP_v4.yaml"
    FEATURES = ["time", "turbine_id", "turbine_status", "wind_direction", "wind_speed", "power_output", "nacelle_direction"]
    WIDE_FORMAT = False
    COLUMN_MAPPING = {"time": "date",
                            "turbine_id": "turbine_id",
                            "turbine_status": "WTUR.TurSt",
                            "wind_direction": "WMET.HorWdDir",
                            "wind_speed": "WMET.HorWdSpd",
                            "power_output": "WTUR.W",
                            "nacelle_direction": "WNAC.Dir"
                            }
elif platform == "linux":
    DATA_DIR = "/pl/active/paolab/awaken_data/kp.turbine.z02.b0/"
    PL_SAVE_PATH = "/scratch/alpine/aohe7145/awaken_data/kp.turbine.zo2.b0.raw.parquet"
    FILE_SIGNATURE = "kp.turbine.z02.b0.*.*.*.nc"
    MULTIPROCESSOR = "mpi"
    TURBINE_INPUT_FILEPATH = "/projects/aohe7145/toolboxes/wind-forecasting/examples/inputs/ge_282_127.yaml"
    FARM_INPUT_FILEPATH = "/projects/aohe7145/toolboxes/wind-forecasting/examples/inputs/gch_KP_v4.yaml"
    FEATURES = ["time", "turbine_id", "turbine_status", "wind_direction", "wind_speed", "power_output", "nacelle_direction"]
    WIDE_FORMAT = False
    COLUMN_MAPPING = {"time": "date",
                            "turbine_id": "turbine_id",
                            "turbine_status": "WTUR.TurSt",
                            "wind_direction": "WMET.HorWdDir",
                            "wind_speed": "WMET.HorWdSpd",
                            "power_output": "WTUR.W",
                            "nacelle_direction": "WNAC.Dir"
                            }

DT = 5

# turbine_ids =  ["wt028", "wt033", "wt073"]
#
data_loader = DataLoader(data_dir=DATA_DIR, file_signature=FILE_SIGNATURE, multiprocessor=MULTIPROCESSOR, save_path=PL_SAVE_PATH, dt=DT,
                         desired_feature_types=FEATURES,
                         ffill_limit=int(60 * 60 * 10 // DT))

In [3]:
data_loader.print_netcdf_structure(data_loader.file_paths[0])

2024-10-21 12:58:41,683 - INFO - 📊 NetCDF File: kp.turbine.z02.b0.20220301.000000.wt038.nc
2024-10-21 12:58:41,683 - INFO - 
🌐 Global Attributes:
2024-10-21 12:58:41,683 - INFO - 
📏 Dimensions:
2024-10-21 12:58:41,684 - INFO -   date: 251127
2024-10-21 12:58:41,684 - INFO -   string7: 7
2024-10-21 12:58:41,684 - INFO - 
🔢 Variables:
2024-10-21 12:58:41,684 - INFO -   Flag:
2024-10-21 12:58:41,684 - INFO -     Dimensions: ('date', 'string7')
2024-10-21 12:58:41,684 - INFO -     Shape: (251127, 7)
2024-10-21 12:58:41,685 - INFO -     Data type: |S1
2024-10-21 12:58:41,685 - INFO -     Attributes:
2024-10-21 12:58:41,685 - INFO -       _Encoding: utf-8
2024-10-21 12:58:41,685 - INFO -   date:
2024-10-21 12:58:41,685 - INFO -     Dimensions: ('date',)
2024-10-21 12:58:41,685 - INFO -     Shape: (251127,)
2024-10-21 12:58:41,686 - INFO -     Data type: int32
2024-10-21 12:58:41,686 - INFO -     Attributes:
2024-10-21 12:58:41,686 - INFO -       units: milliseconds since 2022-03-01 00:00:01.

In [4]:
if not RELOAD_DATA and os.path.exists(data_loader.save_path):
     # Note that the order of the columns in the provided schema must match the order of the columns in the CSV being read.
    schema = pl.Schema(dict(sorted(({**{"time": pl.Datetime(time_unit="ms")},
                **{
                    f"{feat}_{tid}": pl.Float64
                    for feat in FEATURES 
                    for tid in [f"wt{d+1:03d}" for d in range(88)]}
                }).items())))
    logging.info("🔄 Loading existing Parquet file")
    df_query = pl.scan_parquet(source=data_loader.save_path)
    logging.info("✅ Loaded existing Parquet file successfully")
    data_loader.available_features = sorted(df_query.collect_schema().names())
    data_loader.turbine_ids = sorted(set(col.split("_")[-1] for col in data_loader.available_features if "wt" in col))
else:
    logging.info("🔄 Processing new data files")
    df_query = data_loader.read_multi_files()
        
    if df_query is not None:
        # Perform any additional operations on df_query if needed
        logging.info("✅ Data processing completed successfully")
    else:
        logging.warning("⚠️  No data was processed")

2024-10-21 12:58:41,712 - INFO - 🔄 Loading existing Parquet file
2024-10-21 12:58:41,712 - INFO - ✅ Loaded existing Parquet file successfully


## Plot Wind Farm, Data Distributions

In [5]:
data_inspector = DataInspector(turbine_input_filepath=TURBINE_INPUT_FILEPATH, farm_input_filepath=FARM_INPUT_FILEPATH)

In [6]:
if PLOT:
    data_inspector.plot_wind_farm()

In [7]:
if PLOT:
    data_inspector.plot_wind_speed_power(df_query, turbine_ids=["wt073"])

In [8]:
if PLOT:
    data_inspector.plot_wind_speed_weibull(df_query, turbine_ids="all")

In [9]:
if PLOT:
    data_inspector.plot_wind_rose(df_query, turbine_ids="all")

In [10]:
if PLOT:
    data_inspector.plot_correlation(df_query, 
    DataInspector.get_features(df_query, feature_types=["wind_speed", "wind_direction", "nacelle_direction"], turbine_ids=["wt073"]))

In [11]:
if PLOT:
    data_inspector.plot_boxplot_wind_speed_direction(df_query, turbine_ids=["wt073"])

In [12]:
if PLOT:
    data_inspector.plot_time_series(df_query, turbine_ids=["wt073"])

## OpenOA Data Preparation & Inspection

In [13]:
ws_cols = DataInspector.get_features(df_query, "wind_speed")
wd_cols = DataInspector.get_features(df_query, "wind_direction")
pwr_cols = DataInspector.get_features(df_query, "power_output")

In [14]:
print(f"Features of interest = {data_loader.desired_feature_types}")
print(f"Available features = {data_loader.available_features}")
# qa.describe(DataInspector.collect_data(df=df_query))

Features of interest = ['time', 'turbine_id', 'turbine_status', 'wind_direction', 'wind_speed', 'power_output', 'nacelle_direction']
Available features = ['nacelle_direction_wt001', 'nacelle_direction_wt002', 'nacelle_direction_wt003', 'nacelle_direction_wt004', 'nacelle_direction_wt005', 'nacelle_direction_wt006', 'nacelle_direction_wt007', 'nacelle_direction_wt008', 'nacelle_direction_wt009', 'nacelle_direction_wt010', 'nacelle_direction_wt011', 'nacelle_direction_wt012', 'nacelle_direction_wt013', 'nacelle_direction_wt014', 'nacelle_direction_wt015', 'nacelle_direction_wt016', 'nacelle_direction_wt017', 'nacelle_direction_wt018', 'nacelle_direction_wt019', 'nacelle_direction_wt020', 'nacelle_direction_wt021', 'nacelle_direction_wt022', 'nacelle_direction_wt023', 'nacelle_direction_wt024', 'nacelle_direction_wt025', 'nacelle_direction_wt026', 'nacelle_direction_wt027', 'nacelle_direction_wt028', 'nacelle_direction_wt029', 'nacelle_direction_wt030', 'nacelle_direction_wt031', 'nacelle

In [15]:
if PLOT:
    plot.column_histograms(DataInspector.collect_data(df=df_query, 
    feature_types=DataInspector.get_features(df_query, ["wind_speed", "wind_direction", "power_output", "nacelle_direction"])))

In [16]:
data_filter = DataFilter(turbine_availability_col=None, turbine_status_col="turbine_status")

In [17]:
# df_query.filter(pl.col("time") <= t2).collect()

### Nullify Inoperational Turbine Rows

In [18]:
# check if wind speed/dir measurements from inoperational turbines differ from fully operational 
# df_query = data_filter.filter_inoperational(df_query, status_codes=[1], include_nan=False)
status_codes = [1]
mask = lambda tid: pl.col(f"turbine_status_{tid}").is_in(status_codes) | pl.col(f"turbine_status_{tid}").is_null()
features = ws_cols + pwr_cols
DataInspector.print_pc_unfiltered_vals(df_query, features, mask)

if PLOT:
    data_inspector.plot_filtered_vs_unfiltered(df_query, mask, ws_cols + wd_cols, ["wind_speed", "wind_direction"], ["Wind Speed [m/s]", "Wind Direction [deg]"])


Feature wind_speed_wt001 has 98.65165938485576 % unfiltered values.
Feature wind_speed_wt002 has 98.31023407887504 % unfiltered values.
Feature wind_speed_wt003 has 99.01623216920807 % unfiltered values.
Feature wind_speed_wt004 has 99.05384682156188 % unfiltered values.
Feature wind_speed_wt005 has 98.39993055756489 % unfiltered values.
Feature wind_speed_wt006 has 98.39993055756489 % unfiltered values.
Feature wind_speed_wt007 has 98.35363560082173 % unfiltered values.
Feature wind_speed_wt008 has 98.39993055756489 % unfiltered values.
Feature wind_speed_wt009 has 98.05271838199127 % unfiltered values.
Feature wind_speed_wt010 has 98.42307803593646 % unfiltered values.
Feature wind_speed_wt011 has 98.42018460114002 % unfiltered values.
Feature wind_speed_wt012 has 98.42307803593646 % unfiltered values.
Feature wind_speed_wt013 has 98.42307803593646 % unfiltered values.
Feature wind_speed_wt014 has 98.4259714707329 % unfiltered values.
Feature wind_speed_wt015 has 98.72110181997049 % 

In [19]:
# df_query.select(f"power_output_wt073").collect(streaming=True).to_pandas().isna().sum() 
# df_query.head().collect()
# df_query.filter(pl.col("time") <= t2).collect()

In [20]:
# loop through each turbine's wind speed and wind direction columns, and compare the distribution of data with and without the inoperational turbines
# fill out_of_range measurements with Null st they are marked for interpolation via impute or linear/forward fill interpolation later
threshold = 0.01
df_query = data_filter.conditional_filter(df_query, threshold, mask, ws_cols + wd_cols)

JS Score for feature wind_speed_wt001 = 0.00015671430646730966
JS Score for feature wind_speed_wt002 = 4.608515769346186e-05
JS Score for feature wind_speed_wt003 = 4.143653535439009e-05
JS Score for feature wind_speed_wt004 = 3.283173419992098e-05
JS Score for feature wind_speed_wt005 = 0.00012532011997538528
JS Score for feature wind_speed_wt006 = 0.00014642942392636445
JS Score for feature wind_speed_wt007 = 0.0001311093208527302
JS Score for feature wind_speed_wt008 = 0.00011184293957314557
JS Score for feature wind_speed_wt009 = 0.0001316968930053437
JS Score for feature wind_speed_wt010 = 0.00012738030046860162
JS Score for feature wind_speed_wt011 = 0.00012893939756853954
JS Score for feature wind_speed_wt012 = 0.000136975051904332
JS Score for feature wind_speed_wt013 = 0.0001373661733600891
JS Score for feature wind_speed_wt014 = 0.0001228577973742403
JS Score for feature wind_speed_wt015 = 0.00015533706637040003
JS Score for feature wind_speed_wt016 = 0.0001071980885107725
JS

  p = h / data.shape[0]



JS Score for feature wind_speed_wt023 = 8.739787598142877e-05
JS Score for feature wind_speed_wt024 = 7.252250178122654e-05
JS Score for feature wind_speed_wt025 = 5.165058243299295e-05
JS Score for feature wind_speed_wt026 = 6.440430727692676e-05
JS Score for feature wind_speed_wt027 = 8.84750651921394e-05
JS Score for feature wind_speed_wt028 = 0.0001353955178870178
JS Score for feature wind_speed_wt029 = 7.018718945099268e-05
JS Score for feature wind_speed_wt030 = nan
JS Score for feature wind_speed_wt031 = 0.00011446198650210254
JS Score for feature wind_speed_wt032 = 7.017691672105305e-05
JS Score for feature wind_speed_wt033 = 0.0005970293653245666
JS Score for feature wind_speed_wt034 = 0.0006825769236926643
JS Score for feature wind_speed_wt035 = 0.0005473455829780665
JS Score for feature wind_speed_wt036 = 7.553529011145488e-05
JS Score for feature wind_speed_wt037 = 0.0009494808614832586
JS Score for feature wind_speed_wt038 = 7.672156301249733e-05
JS Score for feature wind_

In [21]:
df_query.head().collect()
# df_query.filter(pl.col("time") <= t2).collect()

nacelle_direction_wt001,nacelle_direction_wt002,nacelle_direction_wt003,nacelle_direction_wt004,nacelle_direction_wt005,nacelle_direction_wt006,nacelle_direction_wt007,nacelle_direction_wt008,nacelle_direction_wt009,nacelle_direction_wt010,nacelle_direction_wt011,nacelle_direction_wt012,nacelle_direction_wt013,nacelle_direction_wt014,nacelle_direction_wt015,nacelle_direction_wt016,nacelle_direction_wt017,nacelle_direction_wt018,nacelle_direction_wt019,nacelle_direction_wt020,nacelle_direction_wt021,nacelle_direction_wt022,nacelle_direction_wt023,nacelle_direction_wt024,nacelle_direction_wt025,nacelle_direction_wt026,nacelle_direction_wt027,nacelle_direction_wt028,nacelle_direction_wt029,nacelle_direction_wt030,nacelle_direction_wt031,nacelle_direction_wt032,nacelle_direction_wt033,nacelle_direction_wt034,nacelle_direction_wt035,nacelle_direction_wt036,nacelle_direction_wt037,…,wind_speed_wt052,wind_speed_wt053,wind_speed_wt054,wind_speed_wt055,wind_speed_wt056,wind_speed_wt057,wind_speed_wt058,wind_speed_wt059,wind_speed_wt060,wind_speed_wt061,wind_speed_wt062,wind_speed_wt063,wind_speed_wt064,wind_speed_wt065,wind_speed_wt066,wind_speed_wt067,wind_speed_wt068,wind_speed_wt069,wind_speed_wt070,wind_speed_wt071,wind_speed_wt072,wind_speed_wt073,wind_speed_wt074,wind_speed_wt075,wind_speed_wt076,wind_speed_wt077,wind_speed_wt078,wind_speed_wt079,wind_speed_wt080,wind_speed_wt081,wind_speed_wt082,wind_speed_wt083,wind_speed_wt084,wind_speed_wt085,wind_speed_wt086,wind_speed_wt087,wind_speed_wt088
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,…,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,209.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,6.220496,4.500181,5.450901,5.352233,9.499508,7.855479,8.870214,5.862917,6.421623,6.142904,6.54178,7.106257,6.367146,6.741147,7.10255,6.8440237,6.4179883,6.374405,5.521564,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,5.110387,7.019245,6.029548,5.080723,9.330383,7.054394,9.076818,6.197019,6.078082,6.42344,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.119235,7.249273,6.434347,6.367146,6.004409,7.647892,6.151917,5.180297,8.077567,7.275337,8.020494,6.365332,5.41739,6.378035,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,5.72381,7.409687,,5.78794,5.072004,5.716692,6.492573,3.442897,7.353647,6.958279
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,5.950606,6.832987,6.323628,6.461628,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.358075,5.229326,6.842185,6.292836,4.52929,2.549927,6.276546,5.986466,,5.120864,4.737289,5.445607,6.060098,2.788936,6.673342,6.570972
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,6.552724,8.437213,6.031344,6.5581975,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.636747,6.112278,6.534485,6.191604,5.162805,3.113147,5.4209156,7.046991,,5.120864,4.737289,5.445607,6.060098,2.788936,6.283785,6.552724


### Wind Speed Range Filter

In [22]:

# check for wind speed values that are outside of the acceptable range
ws = DataInspector.collect_data(df=df_query, feature_types="wind_speed")
out_of_range = (filters.range_flag(ws, lower=0, upper=70) & ~ws.isna()).values # range flag includes formerly null values as nan
del ws
# qa.describe(DataInspector.collect_data(df=df_query, feature_types="wind_speed", mask=np.any(out_of_range, axis=1)))

# check if wind speed/dir measurements from inoperational turbines differ from fully operational 
mask = lambda tid: ~out_of_range[:, data_loader.turbine_ids.index(tid)]
features = ws_cols
DataInspector.print_pc_unfiltered_vals(df_query, features, mask)

if PLOT:
    data_inspector.plot_filtered_vs_unfiltered(df_query, mask, ws_cols, ["wind_speed"], ["Wind Speed [m/s]"])



Feature wind_speed_wt001 has 100.0 % unfiltered values.
Feature wind_speed_wt002 has 100.0 % unfiltered values.
Feature wind_speed_wt003 has 100.0 % unfiltered values.
Feature wind_speed_wt004 has 100.0 % unfiltered values.
Feature wind_speed_wt005 has 100.0 % unfiltered values.
Feature wind_speed_wt006 has 100.0 % unfiltered values.
Feature wind_speed_wt007 has 100.0 % unfiltered values.
Feature wind_speed_wt008 has 100.0 % unfiltered values.
Feature wind_speed_wt009 has 100.0 % unfiltered values.
Feature wind_speed_wt010 has 100.0 % unfiltered values.
Feature wind_speed_wt011 has 100.0 % unfiltered values.
Feature wind_speed_wt012 has 100.0 % unfiltered values.
Feature wind_speed_wt013 has 100.0 % unfiltered values.
Feature wind_speed_wt014 has 100.0 % unfiltered values.
Feature wind_speed_wt015 has 100.0 % unfiltered values.
Feature wind_speed_wt016 has 100.0 % unfiltered values.
Feature wind_speed_wt017 has 100.0 % unfiltered values.
Feature wind_speed_wt018 has 100.0 % unfiltered 

In [23]:
# out_of_range
# df_query.select(f"power_output_wt073").collect(streaming=True).to_pandas().isna().sum() 

In [24]:
# loop through each turbine's wind speed and wind direction columns, and compare the distribution of data with and without the inoperational turbines
# fill out_of_range measurements with Null st they are marked for interpolation via impute or linear/forward fill interpolation later
threshold = 0.01
df_query = data_filter.conditional_filter(df_query, threshold, mask, ws_cols)
# df_query = df_query.with_columns(
#                 [pl.when(~out_of_range[:, data_loader.turbine_ids.index(feat.split("_")[-1])]).then(pl.col(feat)).alias(feat)
#                 for feat in ws_cols]
#                 )

JS Score for feature wind_speed_wt001 = 0.0
JS Score for feature wind_speed_wt002 = 0.0
JS Score for feature wind_speed_wt003 = 0.0
JS Score for feature wind_speed_wt004 = 0.0
JS Score for feature wind_speed_wt005 = 0.0
JS Score for feature wind_speed_wt006 = 0.0
JS Score for feature wind_speed_wt007 = 0.0
JS Score for feature wind_speed_wt008 = 0.0
JS Score for feature wind_speed_wt009 = 0.0
JS Score for feature wind_speed_wt010 = 0.0
JS Score for feature wind_speed_wt011 = 0.0
JS Score for feature wind_speed_wt012 = 0.0
JS Score for feature wind_speed_wt013 = 0.0
JS Score for feature wind_speed_wt014 = 0.0
JS Score for feature wind_speed_wt015 = 0.0
JS Score for feature wind_speed_wt016 = 0.0
JS Score for feature wind_speed_wt017 = 0.0
JS Score for feature wind_speed_wt018 = 0.0
JS Score for feature wind_speed_wt019 = 0.0
JS Score for feature wind_speed_wt020 = 0.0
JS Score for feature wind_speed_wt021 = 0.0
JS Score for feature wind_speed_wt022 = 0.0
JS Score for feature wind_speed_

In [25]:
del out_of_range 
df_query.head().collect()

nacelle_direction_wt001,nacelle_direction_wt002,nacelle_direction_wt003,nacelle_direction_wt004,nacelle_direction_wt005,nacelle_direction_wt006,nacelle_direction_wt007,nacelle_direction_wt008,nacelle_direction_wt009,nacelle_direction_wt010,nacelle_direction_wt011,nacelle_direction_wt012,nacelle_direction_wt013,nacelle_direction_wt014,nacelle_direction_wt015,nacelle_direction_wt016,nacelle_direction_wt017,nacelle_direction_wt018,nacelle_direction_wt019,nacelle_direction_wt020,nacelle_direction_wt021,nacelle_direction_wt022,nacelle_direction_wt023,nacelle_direction_wt024,nacelle_direction_wt025,nacelle_direction_wt026,nacelle_direction_wt027,nacelle_direction_wt028,nacelle_direction_wt029,nacelle_direction_wt030,nacelle_direction_wt031,nacelle_direction_wt032,nacelle_direction_wt033,nacelle_direction_wt034,nacelle_direction_wt035,nacelle_direction_wt036,nacelle_direction_wt037,…,wind_speed_wt052,wind_speed_wt053,wind_speed_wt054,wind_speed_wt055,wind_speed_wt056,wind_speed_wt057,wind_speed_wt058,wind_speed_wt059,wind_speed_wt060,wind_speed_wt061,wind_speed_wt062,wind_speed_wt063,wind_speed_wt064,wind_speed_wt065,wind_speed_wt066,wind_speed_wt067,wind_speed_wt068,wind_speed_wt069,wind_speed_wt070,wind_speed_wt071,wind_speed_wt072,wind_speed_wt073,wind_speed_wt074,wind_speed_wt075,wind_speed_wt076,wind_speed_wt077,wind_speed_wt078,wind_speed_wt079,wind_speed_wt080,wind_speed_wt081,wind_speed_wt082,wind_speed_wt083,wind_speed_wt084,wind_speed_wt085,wind_speed_wt086,wind_speed_wt087,wind_speed_wt088
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,…,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,209.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,6.220496,4.500181,5.450901,5.352233,9.499508,7.855479,8.870214,5.862917,6.421623,6.142904,6.54178,7.106257,6.367146,6.741147,7.10255,6.8440237,6.4179883,6.374405,5.521564,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,5.110387,7.019245,6.029548,5.080723,9.330383,7.054394,9.076818,6.197019,6.078082,6.42344,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.119235,7.249273,6.434347,6.367146,6.004409,7.647892,6.151917,5.180297,8.077567,7.275337,8.020494,6.365332,5.41739,6.378035,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,5.72381,7.409687,,5.78794,5.072004,5.716692,6.492573,3.442897,7.353647,6.958279
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,5.950606,6.832987,6.323628,6.461628,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.358075,5.229326,6.842185,6.292836,4.52929,2.549927,6.276546,5.986466,,5.120864,4.737289,5.445607,6.060098,2.788936,6.673342,6.570972
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,6.552724,8.437213,6.031344,6.5581975,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.636747,6.112278,6.534485,6.191604,5.162805,3.113147,5.4209156,7.046991,,5.120864,4.737289,5.445607,6.060098,2.788936,6.283785,6.552724


### Power Curve Window Range Filter

In [26]:
# apply a window range filter to remove data with power values outside of the window from 20 to 3000 kW for wind speeds between 5 and 40 m/s.
# identifies when turbine is shut down, filtering for normal turbine operation

out_of_window = np.stack([(filters.window_range_flag(window_col=DataInspector.collect_data(df=df_query, 
                                                                                    feature_types=["wind_speed"], 
                                                                                    turbine_ids=[tid])[f"wind_speed_{tid}"],
                                                    window_start=5., window_end=40., 
                                                    value_col=DataInspector.collect_data(df=df_query, 
                                                                                    feature_types=["power_output"], 
                                                                                    turbine_ids=[tid])[f"power_output_{tid}"],
                                                    value_min=20., value_max=3000.)
                        & df_query.select(no_nulls=pl.all_horizontal(pl.col(f"wind_speed_{tid}").is_not_null(), pl.col(f"power_output_{tid}").is_not_null()))\
                                  .collect(streaming=True).to_pandas()["no_nulls"]
                                #   & ~DataInspector.collect_data(df=df_query, feature_types="wind_speed", turbine_ids=tid).isna()
                                #   & ~DataInspector.collect_data(df=df_query, feature_types="power_output", turbine_ids=tid).isna()
                                  ).values for tid in data_loader.turbine_ids], axis=1)


# check if wind speed/dir measurements from inoperational turbines differ from fully operational 
mask = lambda tid: ~out_of_window[:, data_loader.turbine_ids.index(tid)]
features = ws_cols + pwr_cols
DataInspector.print_pc_unfiltered_vals(df_query, features, mask)

if PLOT:
    data_inspector.plot_filtered_vs_unfiltered(df_query, mask, features, ["wind_speed", "power_output"], ["Wind Speed [m/s]", "Power Output [W]"])



Feature wind_speed_wt001 has 99.9450247388675 % unfiltered values.
Feature wind_speed_wt002 has 98.83394577703191 % unfiltered values.
Feature wind_speed_wt003 has 99.83218078180607 % unfiltered values.
Feature wind_speed_wt004 has 99.99131969561066 % unfiltered values.
Feature wind_speed_wt005 has 99.98842626081421 % unfiltered values.
Feature wind_speed_wt006 has 99.78877925985938 % unfiltered values.
Feature wind_speed_wt007 has 99.78009895547004 % unfiltered values.
Feature wind_speed_wt008 has 99.90162321692081 % unfiltered values.
Feature wind_speed_wt009 has 99.76273834669136 % unfiltered values.
Feature wind_speed_wt010 has 99.83218078180607 % unfiltered values.
Feature wind_speed_wt011 has 99.98263939122131 % unfiltered values.
Feature wind_speed_wt012 has 100.0 % unfiltered values.
Feature wind_speed_wt013 has 99.98842626081421 % unfiltered values.
Feature wind_speed_wt014 has 99.98842626081421 % unfiltered values.
Feature wind_speed_wt015 has 100.0 % unfiltered values.
Featu

In [27]:
# df_query.select(f"power_output_wt033").collect(streaming=True).to_pandas().isna().sum() 
# DataInspector.collect_data(df_query, feature_types=["time", "wind_speed", "power_output"], turbine_ids=["wt073"], mask=out_of_window[:, 2])
# out_of_window

In [28]:

if PLOT:
    # plot values that are outside of power-wind speed range
    plot.plot_power_curve(
        DataInspector.collect_data(df=df_query, feature_types="wind_speed").to_numpy().flatten(),
        DataInspector.collect_data(df=df_query, feature_types="power_output").to_numpy().flatten(),
        flag=out_of_window.flatten(),
        flag_labels=("Outside Acceptable Window", "Acceptable Power Curve Points"),
        xlim=(-1, 15),
        ylim=(-100, 3000),
        legend=True,
        scatter_kwargs=dict(alpha=0.4, s=10)
    )


In [29]:
# fill cells corresponding to values that are outside of power-wind speed window range with Null st they are marked for interpolation via impute or linear/forward fill interpolation later
# loop through each turbine's wind speed and wind direction columns, and compare the distribution of data with and without the inoperational turbines
threshold = 0.01
df_query = data_filter.conditional_filter(df_query, threshold, mask, features)

# df_query = df_query.with_columns(
#                 [pl.when(~out_of_window[:, data_loader.turbine_ids.index(feat.split("_")[-1])]).then(pl.col(feat)).alias(feat)
#                 for feat in ws_cols + pwr_cols]
# )

JS Score for feature wind_speed_wt001 = 4.3667841666643625e-07
JS Score for feature wind_speed_wt002 = 2.3942130738330442e-05
JS Score for feature wind_speed_wt003 = 1.8155470002475658e-06
JS Score for feature wind_speed_wt004 = 2.7379568944596934e-08
JS Score for feature wind_speed_wt005 = 4.492101960604429e-08
JS Score for feature wind_speed_wt006 = 2.1196534396149684e-06
JS Score for feature wind_speed_wt007 = 1.9394698493446366e-06
JS Score for feature wind_speed_wt008 = 1.0594498239605552e-06
JS Score for feature wind_speed_wt009 = 4.048197544977042e-06
JS Score for feature wind_speed_wt010 = 3.204752624482732e-06
JS Score for feature wind_speed_wt011 = 1.0015808686733599e-07
JS Score for feature wind_speed_wt012 = 0.0
JS Score for feature wind_speed_wt013 = 5.199934361688572e-08
JS Score for feature wind_speed_wt014 = 3.653819798581125e-08
JS Score for feature wind_speed_wt015 = 0.0
JS Score for feature wind_speed_wt016 = 2.5600710371683503e-05
JS Score for feature wind_speed_wt0

In [30]:
del out_of_window
df_query.head().collect()

nacelle_direction_wt001,nacelle_direction_wt002,nacelle_direction_wt003,nacelle_direction_wt004,nacelle_direction_wt005,nacelle_direction_wt006,nacelle_direction_wt007,nacelle_direction_wt008,nacelle_direction_wt009,nacelle_direction_wt010,nacelle_direction_wt011,nacelle_direction_wt012,nacelle_direction_wt013,nacelle_direction_wt014,nacelle_direction_wt015,nacelle_direction_wt016,nacelle_direction_wt017,nacelle_direction_wt018,nacelle_direction_wt019,nacelle_direction_wt020,nacelle_direction_wt021,nacelle_direction_wt022,nacelle_direction_wt023,nacelle_direction_wt024,nacelle_direction_wt025,nacelle_direction_wt026,nacelle_direction_wt027,nacelle_direction_wt028,nacelle_direction_wt029,nacelle_direction_wt030,nacelle_direction_wt031,nacelle_direction_wt032,nacelle_direction_wt033,nacelle_direction_wt034,nacelle_direction_wt035,nacelle_direction_wt036,nacelle_direction_wt037,…,wind_speed_wt052,wind_speed_wt053,wind_speed_wt054,wind_speed_wt055,wind_speed_wt056,wind_speed_wt057,wind_speed_wt058,wind_speed_wt059,wind_speed_wt060,wind_speed_wt061,wind_speed_wt062,wind_speed_wt063,wind_speed_wt064,wind_speed_wt065,wind_speed_wt066,wind_speed_wt067,wind_speed_wt068,wind_speed_wt069,wind_speed_wt070,wind_speed_wt071,wind_speed_wt072,wind_speed_wt073,wind_speed_wt074,wind_speed_wt075,wind_speed_wt076,wind_speed_wt077,wind_speed_wt078,wind_speed_wt079,wind_speed_wt080,wind_speed_wt081,wind_speed_wt082,wind_speed_wt083,wind_speed_wt084,wind_speed_wt085,wind_speed_wt086,wind_speed_wt087,wind_speed_wt088
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,…,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,209.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,6.220496,4.500181,5.450901,5.352233,9.499508,7.855479,8.870214,5.862917,6.421623,6.142904,6.54178,7.106257,6.367146,6.741147,7.10255,6.8440237,6.4179883,6.374405,5.521564,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,5.110387,7.019245,6.029548,5.080723,9.330383,7.054394,9.076818,6.197019,6.078082,6.42344,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.119235,7.249273,6.434347,6.367146,6.004409,7.647892,6.151917,5.180297,8.077567,7.275337,8.020494,6.365332,5.41739,6.378035,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,5.72381,7.409687,,5.78794,5.072004,5.716692,6.492573,3.442897,7.353647,6.958279
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,5.950606,6.832987,6.323628,6.461628,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.358075,5.229326,6.842185,6.292836,4.52929,2.549927,6.276546,5.986466,,5.120864,4.737289,5.445607,6.060098,2.788936,6.673342,6.570972
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,6.552724,8.437213,6.031344,6.5581975,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.636747,6.112278,6.534485,6.191604,5.162805,3.113147,5.4209156,7.046991,,5.120864,4.737289,5.445607,6.060098,2.788936,6.283785,6.552724


In [31]:
#print(df_query.collect(streaming=True).shape)

### Power Curve Bin Filter

In [32]:
# apply a bin filter to remove data with power values outside of an envelope around median power curve at each wind speed

bin_outliers = np.stack([(filters.bin_filter(
                                  bin_col=f"power_output_{tid}", 
                                  value_col=f"wind_speed_{tid}", 
                                  bin_width=50, threshold=3,
                                  center_type="median", 
                                  bin_min=20., bin_max=0.90*(df_query.select(f"power_output_{tid}").max().collect(streaming=True).item() or 3000.),
                                  threshold_type="scalar", direction="below",
                                  data=DataInspector.collect_data(df=df_query, 
                                                                  feature_types=["wind_speed", "power_output"], 
                                                                  turbine_ids=[tid])
                                  )
                                & df_query.select(no_nulls=pl.all_horizontal(pl.col(f"wind_speed_{tid}").is_not_null(), pl.col(f"power_output_{tid}").is_not_null()))\
                                  .collect(streaming=True).to_pandas()["no_nulls"]
                                #   & ~DataInspector.collect_data(df=df_query, feature_types="wind_speed", turbine_ids=tid).isna()
                                #   & ~DataInspector.collect_data(df=df_query, feature_types="power_output", turbine_ids=tid).isna()
                                  ).values for tid in data_loader.turbine_ids], axis=1)
# qa.describe(DataInspector.collect_data(df=df_query, feature_types=["wind_speed", "power_output"], mask=bin_outliers))

# check if wind speed/dir measurements from inoperational turbines differ from fully operational 
mask = lambda tid: ~bin_outliers[:, data_loader.turbine_ids.index(tid)]

features = ws_cols + pwr_cols
DataInspector.print_pc_unfiltered_vals(df_query, features, mask)

if PLOT:
    data_inspector.plot_filtered_vs_unfiltered(df_query, mask, features, ["wind_speed", "power_output"], ["Wind Speed [m/s]", "Power Output [W]"])

  return function_base._ureduce(a, func=_nanmedian, keepdims=keepdims,



Feature wind_speed_wt001 has 100.0 % unfiltered values.
Feature wind_speed_wt002 has 99.99710656520355 % unfiltered values.
Feature wind_speed_wt003 has 100.0 % unfiltered values.
Feature wind_speed_wt004 has 100.0 % unfiltered values.
Feature wind_speed_wt005 has 100.0 % unfiltered values.
Feature wind_speed_wt006 has 99.9942131304071 % unfiltered values.
Feature wind_speed_wt007 has 100.0 % unfiltered values.
Feature wind_speed_wt008 has 99.99710656520355 % unfiltered values.
Feature wind_speed_wt009 has 100.0 % unfiltered values.
Feature wind_speed_wt010 has 99.9942131304071 % unfiltered values.
Feature wind_speed_wt011 has 99.99131969561066 % unfiltered values.
Feature wind_speed_wt012 has 99.98842626081421 % unfiltered values.
Feature wind_speed_wt013 has 99.98263939122131 % unfiltered values.
Feature wind_speed_wt014 has 100.0 % unfiltered values.
Feature wind_speed_wt015 has 99.9942131304071 % unfiltered values.
Feature wind_speed_wt016 has 99.94213130407105 % unfiltered values.

In [33]:
bin_outliers

array([[False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       ...,
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]])

In [34]:
if PLOT:
    # plot values outside the power-wind speed bin filter
    plot.plot_power_curve(
        DataInspector.collect_data(df=df_query, feature_types="wind_speed").to_numpy().flatten(),
        DataInspector.collect_data(df=df_query, feature_types="power_output").to_numpy().flatten(),
        flag=bin_outliers.flatten(),
        flag_labels=("Anomylous Data", "Normal Wind Speed Sensor Operation"),
        xlim=(-1, 15),
        ylim=(-100, 3000),
        legend=True,
        scatter_kwargs=dict(alpha=0.4, s=10)
    )

In [35]:
# fill cells corresponding to values that are outside of power-wind speed bins with Null st they are marked for interpolation via impute or linear/forward fill interpolation later
# loop through each turbine's wind speed and wind direction columns, and compare the distribution of data with and without the inoperational turbines
threshold = 0.01
df_query = data_filter.conditional_filter(df_query, threshold, mask, features)
# df_query = df_query.with_columns(
#                 [pl.when(~bin_outliers[:, data_loader.turbine_ids.index(feat.split("_")[-1])]).then(pl.col(feat)).alias(feat)
#                 for feat in ws_cols + pwr_cols]
# )

JS Score for feature wind_speed_wt001 = 0.0
JS Score for feature wind_speed_wt002 = 1.9498872049652695e-08
JS Score for feature wind_speed_wt003 = 0.0
JS Score for feature wind_speed_wt004 = 0.0
JS Score for feature wind_speed_wt005 = 0.0
JS Score for feature wind_speed_wt006 = 2.1501981064669655e-08
JS Score for feature wind_speed_wt007 = 0.0
JS Score for feature wind_speed_wt008 = 2.123370209423056e-08
JS Score for feature wind_speed_wt009 = 0.0
JS Score for feature wind_speed_wt010 = 1.2491687101722607e-08
JS Score for feature wind_speed_wt011 = 5.389659893933843e-08
JS Score for feature wind_speed_wt012 = 3.898218945461529e-08
JS Score for feature wind_speed_wt013 = 1.3791805035160875e-07
JS Score for feature wind_speed_wt014 = 0.0
JS Score for feature wind_speed_wt015 = 3.076155874543768e-08
JS Score for feature wind_speed_wt016 = 6.275368498167723e-07
JS Score for feature wind_speed_wt017 = 1.9076425224766064e-07
JS Score for feature wind_speed_wt018 = 1.7722082764400585e-07
JS S

  p = h / data.shape[0]



JS Score for feature wind_speed_wt030 = nan
JS Score for feature wind_speed_wt031 = 5.210330751046058e-09
JS Score for feature wind_speed_wt032 = 3.0196385497882097e-08
JS Score for feature wind_speed_wt033 = 4.619481138228885e-08
JS Score for feature wind_speed_wt034 = 0.0
JS Score for feature wind_speed_wt035 = 0.0
JS Score for feature wind_speed_wt036 = 0.0
JS Score for feature wind_speed_wt037 = 0.0
JS Score for feature wind_speed_wt038 = 0.0
JS Score for feature wind_speed_wt039 = 5.733678702261442e-09
JS Score for feature wind_speed_wt040 = 0.0
JS Score for feature wind_speed_wt041 = 0.0
JS Score for feature wind_speed_wt042 = 2.0190491029588448e-08
JS Score for feature wind_speed_wt043 = 5.5940603891814494e-08
JS Score for feature wind_speed_wt044 = 1.1516870749246968e-07
JS Score for feature wind_speed_wt045 = 5.167666847905582e-07
JS Score for feature wind_speed_wt046 = 1.1892905575854117e-07
JS Score for feature wind_speed_wt047 = 6.473303123171757e-08
JS Score for feature wi

In [36]:
df_query.head().collect()

nacelle_direction_wt001,nacelle_direction_wt002,nacelle_direction_wt003,nacelle_direction_wt004,nacelle_direction_wt005,nacelle_direction_wt006,nacelle_direction_wt007,nacelle_direction_wt008,nacelle_direction_wt009,nacelle_direction_wt010,nacelle_direction_wt011,nacelle_direction_wt012,nacelle_direction_wt013,nacelle_direction_wt014,nacelle_direction_wt015,nacelle_direction_wt016,nacelle_direction_wt017,nacelle_direction_wt018,nacelle_direction_wt019,nacelle_direction_wt020,nacelle_direction_wt021,nacelle_direction_wt022,nacelle_direction_wt023,nacelle_direction_wt024,nacelle_direction_wt025,nacelle_direction_wt026,nacelle_direction_wt027,nacelle_direction_wt028,nacelle_direction_wt029,nacelle_direction_wt030,nacelle_direction_wt031,nacelle_direction_wt032,nacelle_direction_wt033,nacelle_direction_wt034,nacelle_direction_wt035,nacelle_direction_wt036,nacelle_direction_wt037,…,wind_speed_wt052,wind_speed_wt053,wind_speed_wt054,wind_speed_wt055,wind_speed_wt056,wind_speed_wt057,wind_speed_wt058,wind_speed_wt059,wind_speed_wt060,wind_speed_wt061,wind_speed_wt062,wind_speed_wt063,wind_speed_wt064,wind_speed_wt065,wind_speed_wt066,wind_speed_wt067,wind_speed_wt068,wind_speed_wt069,wind_speed_wt070,wind_speed_wt071,wind_speed_wt072,wind_speed_wt073,wind_speed_wt074,wind_speed_wt075,wind_speed_wt076,wind_speed_wt077,wind_speed_wt078,wind_speed_wt079,wind_speed_wt080,wind_speed_wt081,wind_speed_wt082,wind_speed_wt083,wind_speed_wt084,wind_speed_wt085,wind_speed_wt086,wind_speed_wt087,wind_speed_wt088
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,…,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,209.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,6.220496,4.500181,5.450901,5.352233,9.499508,7.855479,8.870214,5.862917,6.421623,6.142904,6.54178,7.106257,6.367146,6.741147,7.10255,6.8440237,6.4179883,6.374405,5.521564,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,5.110387,7.019245,6.029548,5.080723,9.330383,7.054394,9.076818,6.197019,6.078082,6.42344,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.119235,7.249273,6.434347,6.367146,6.004409,7.647892,6.151917,5.180297,8.077567,7.275337,8.020494,6.365332,5.41739,6.378035,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,5.72381,7.409687,,5.78794,5.072004,5.716692,6.492573,3.442897,7.353647,6.958279
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,5.950606,6.832987,6.323628,6.461628,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.358075,5.229326,6.842185,6.292836,4.52929,2.549927,6.276546,5.986466,,5.120864,4.737289,5.445607,6.060098,2.788936,6.673342,6.570972
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,6.552724,8.437213,6.031344,6.5581975,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.636747,6.112278,6.534485,6.191604,5.162805,3.113147,5.4209156,7.046991,,5.120864,4.737289,5.445607,6.060098,2.788936,6.283785,6.552724


### Power Curve Fitting

In [37]:
if PLOT:
    # Fit the power curves
    iec_curve = power_curve.IEC(
        windspeed_col="wind_speed", power_col="power_output",
        data=DataInspector.unpivot_dataframe(df_query).select("wind_speed", "power_output").filter(pl.all_horizontal(pl.all().is_not_null())).collect(streaming=True).to_pandas(),
        )

    l5p_curve = power_curve.logistic_5_parametric(
        windspeed_col="wind_speed", power_col="power_output",
        data=DataInspector.unpivot_dataframe(df_query).select("wind_speed", "power_output").filter(pl.all_horizontal(pl.all().is_not_null())).collect(streaming=True).to_pandas(),
        )

    spline_curve = power_curve.gam(
        windspeed_col="wind_speed", power_col="power_output",
        data=DataInspector.unpivot_dataframe(df_query).select("wind_speed", "power_output").filter(pl.all_horizontal(pl.all().is_not_null())).collect(streaming=True).to_pandas(), 
        n_splines=20)

In [38]:
if PLOT:
    fig, ax = plot.plot_power_curve(
        DataInspector.collect_data(df=df_query, feature_types="wind_speed").to_numpy().flatten(),
        DataInspector.collect_data(df=df_query, feature_types="power_output").to_numpy().flatten(),
        flag=np.zeros(DataInspector.collect_data(df=df_query, feature_types="wind_speed").shape[0], dtype=bool),
        flag_labels=("", "Filtered Power Curve"),
        xlim=(-1, 15),  # optional input for refining plots
        ylim=(-100, 3000),  # optional input for refining plots
        legend=False,  # optional flag for adding a legend
        scatter_kwargs=dict(alpha=0.4, s=10),  # optional input for refining plots
        return_fig=True,
    )

    x = np.linspace(0, 20, 100)
    ax.plot(x, iec_curve(x), color="red", label = "IEC", linewidth = 3)
    ax.plot(x, spline_curve(x), color="C1", label = "Spline", linewidth = 3)
    ax.plot(x, l5p_curve(x), color="C2", label = "L5P", linewidth = 3)

    ax.legend()

    fig.tight_layout()
    plt.show()

### Unresponsive Sensor Filter

In [39]:
# find stuck sensor measurements for each turbine and set them to null
# TODO question does unresponsive flag include nan values
# frozen_thresholds = [(data_loader.ffill_limit * i) + 1 for i in range(1, 19)]
# print(frozen_thresholds)
# ws_pcs = []
# wd_pcs = []
# pwr_pcs = []
# for thr in frozen_thresholds:
#     ws_frozen_sensor = filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="wind_speed"), threshold=thr).values
#     wd_frozen_sensor = filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="wind_direction"), threshold=thr).values
#     pwr_frozen_sensor = filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="power_output"), threshold=thr).values

#     # check if wind speed/dir measurements from inoperational turbines differ from fully operational
#     print(f"For a threshold of {thr} for the frozen sensor filters:")
#     ws_pcs.append(DataInspector.print_pc_unfiltered_vals(df_query, ws_cols, lambda tid: ws_frozen_sensor[:, data_loader.turbine_ids.index(tid)]))
#     wd_pcs.append(DataInspector.print_pc_unfiltered_vals(df_query, wd_cols, lambda tid: wd_frozen_sensor[:, data_loader.turbine_ids.index(tid)]))
#     pwr_pcs.append(DataInspector.print_pc_unfiltered_vals(df_query, pwr_cols, lambda tid: pwr_frozen_sensor[:, data_loader.turbine_ids.index(tid)]))

# fig, ax = plt.subplots(3, 1, sharex=True)
# for t_idx in range(len(data_loader.turbine_ids)):
#     ax[0].scatter(x=frozen_thresholds, y=[x[1][t_idx] for x in ws_pcs], label=data_loader.turbine_ids[t_idx])
#     ax[1].scatter(x=frozen_thresholds, y=[x[1][t_idx] for x in wd_pcs], label=data_loader.turbine_ids[t_idx])
#     ax[2].scatter(x=frozen_thresholds, y=[x[1][t_idx] for x in pwr_pcs],label=data_loader.turbine_ids[t_idx])

# h, l = ax[0].get_legend_handles_labels()
# ax[0].legend(h[:len(data_loader.turbine_ids)], l[:len(data_loader.turbine_ids)])
# ax[0].set_title("Percentage of Unfrozen Wind Speed Measurements")
# ax[1].set_title("Percentage of Unfrozen Wind Direction Measurements")
# ax[2].set_title("Percentage of Unfrozen Power Output Measurements")
# qa.describe(pl.concat([DataInspector.collect_data(df=df_query, feature_types=feature_type, mask=mask, to_pandas=False)
#                         for mask, feature_type in zip([ws_frozen_sensor, wd_frozen_sensor, pwr_frozen_sensor], ["wind_speed", "wind_direction", "power_output"])], 
#                             how="horizontal")\
#                                .to_pandas())

# TODO ASK ERIC if necessary
if False:
    thr = data_loader.ffill_limit + 1
    # frozen_sensor = (filters.unresponsive_flag(data=df_query.select(features).collect(streaming=True).to_pandas(), threshold=thr)
                                    # & df_query.select([pl.col(feat).is_not_null().alias(feat) for feat in features])\
                                    #   .collect(streaming=True).to_pandas()
                                    #   ).values
    features = ws_cols + wd_cols
    # frozen_sensor = np.stack([(filters.unresponsive_flag(data=df_query.select(feat).collect(streaming=True).to_pandas(), threshold=thr)
    #                                 & df_query.select(pl.col(feat).is_not_null().alias(feat))\
    #                                 .collect(streaming=True).to_pandas()
    #                                 ).values for feat in features], axis=1)
    # find values of wind speed/direction, where there are duplicate values with nulls inbetween
    mask = lambda tid: ~frozen_sensor[:, data_loader.turbine_ids.index(tid)]

# df_query.select([pl.col(feat).is_not_null().name.suffix("_not_null") for feat in ws_cols])\
#                                   .collect(streaming=True).to_pandas()
# 
# ws_frozen_sensor = (filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="wind_speed"), threshold=thr)
#                                 & df_query.select([pl.col(feat).is_not_null().name.suffix("_not_null") for feat in ws_cols])\
#                                   .collect(streaming=True).to_pandas()
#                                   ).values
# ws_frozen_sensor
# wd_frozen_sensor = np.stack([(filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="wind_direction"), threshold=thr)
#                                 & df_query.select(no_nulls=pl.col(f"wind_speed_{tid}").is_not_null())\
#                                   .collect(streaming=True).to_pandas()["no_nulls"]
#                                   ).values for tid in data_loader.turbine_ids], axis=1)
# pwr_frozen_sensor = np.stack([(filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="power_output"), threshold=thr)
#                                 & df_query.select(no_nulls=pl.col(f"wind_speed_{tid}").is_not_null())\
#                                   .collect(streaming=True).to_pandas()["no_nulls"]
#                                   ).values for tid in data_loader.turbine_ids], axis=1)

# wd_frozen_sensor = filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="wind_direction"), threshold=thr).values
# pwr_frozen_sensor = filters.unresponsive_flag(data=DataInspector.collect_data(df=df_query, feature_types="power_output"), threshold=thr).values

# ws_mask = lambda tid: ~ws_frozen_sensor[:, data_loader.turbine_ids.index(tid)]
# wd_mask = lambda tid: ~wd_frozen_sensor[:, data_loader.turbine_ids.index(tid)]
# pwr_mask = lambda tid: ~pwr_frozen_sensor[:, data_loader.turbine_ids.index(tid)]


In [40]:
if False:
    frozen_sensor

In [41]:
if PLOT and False:
    plot.plot_power_curve(
        DataInspector.collect_data(df=df_query, feature_types="wind_speed"),
        DataInspector.collect_data(df=df_query, feature_types="power_output"),
        flag=ws_frozen_sensor,
        flag_labels=(f"Wind Speed Unresponsive Sensors (n={ws_frozen_sensor.sum():,.0f})", "Normal Turbine Operations"),
        xlim=(-1, 15),  # optional input for refining plots
        ylim=(-100, 3000),  # optional input for refining plots
        legend=True,  # optional flag for adding a legend
        scatter_kwargs=dict(alpha=0.4, s=10)  # optional input for refining plots
    )

    plot.plot_power_curve(
        DataInspector.collect_data(df=df_query, feature_types="wind_speed"),
        DataInspector.collect_data(df=df_query, feature_types="power_output"),
        flag=wd_frozen_sensor,
        flag_labels=(f"Wind Direction Unresponsive Sensors (n={wd_frozen_sensor.sum():,.0f})", "Normal Turbine Operations"),
        xlim=(-1, 15),  # optional input for refining plots
        ylim=(-100, 3000),  # optional input for refining plots
        legend=True,  # optional flag for adding a legend
        scatter_kwargs=dict(alpha=0.4, s=10)  # optional input for refining plots
    )

    plot.plot_power_curve(
        DataInspector.collect_data(df=df_query, feature_types="wind_speed"),
        DataInspector.collect_data(df=df_query, feature_types="power_output"),
        flag=pwr_frozen_sensor,
        flag_labels=(f"Power Output Unresponsive Sensors (n={pwr_frozen_sensor.sum():,.0f})", "Normal Turbine Operations"),
        xlim=(-1, 15),  # optional input for refining plots
        ylim=(-100, 3000),  # optional input for refining plots
        legend=True,  # optional flag for adding a legend
        scatter_kwargs=dict(alpha=0.4, s=10)  # optional input for refining plots
    )

In [42]:
# change the values corresponding to frozen sensor measurements to null or interpolate (instead of dropping full row, since other sensors could be functioning properly)
# fill stuck sensor measurements with Null st they are marked for interpolation later,
if False:
    threshold = 0.01
    df_query = data_filter.conditional_filter(df_query, threshold, mask, features)
# df_query = df_query.with_columns(
#                 [pl.when(~frozen_mask[:, data_loader.turbine_ids.index(feat.split("_")[-1])]).then(pl.col(feat)).alias(feat)
#                 for features, frozen_mask in zip(
#                     [ws_cols, wd_cols, pwr_cols], 
#                     [ws_frozen_sensor, wd_frozen_sensor, pwr_frozen_sensor])
#                 for feat in features]
# )

In [43]:
if False:
    del frozen_sensor

### Assess and Impute/Interpolate Turbine Missing Data from Correlated Measurements OR Split Dataset during Time Stamps for which many Turbines have Missing Data

In [44]:
df_query.head().collect()

nacelle_direction_wt001,nacelle_direction_wt002,nacelle_direction_wt003,nacelle_direction_wt004,nacelle_direction_wt005,nacelle_direction_wt006,nacelle_direction_wt007,nacelle_direction_wt008,nacelle_direction_wt009,nacelle_direction_wt010,nacelle_direction_wt011,nacelle_direction_wt012,nacelle_direction_wt013,nacelle_direction_wt014,nacelle_direction_wt015,nacelle_direction_wt016,nacelle_direction_wt017,nacelle_direction_wt018,nacelle_direction_wt019,nacelle_direction_wt020,nacelle_direction_wt021,nacelle_direction_wt022,nacelle_direction_wt023,nacelle_direction_wt024,nacelle_direction_wt025,nacelle_direction_wt026,nacelle_direction_wt027,nacelle_direction_wt028,nacelle_direction_wt029,nacelle_direction_wt030,nacelle_direction_wt031,nacelle_direction_wt032,nacelle_direction_wt033,nacelle_direction_wt034,nacelle_direction_wt035,nacelle_direction_wt036,nacelle_direction_wt037,…,wind_speed_wt052,wind_speed_wt053,wind_speed_wt054,wind_speed_wt055,wind_speed_wt056,wind_speed_wt057,wind_speed_wt058,wind_speed_wt059,wind_speed_wt060,wind_speed_wt061,wind_speed_wt062,wind_speed_wt063,wind_speed_wt064,wind_speed_wt065,wind_speed_wt066,wind_speed_wt067,wind_speed_wt068,wind_speed_wt069,wind_speed_wt070,wind_speed_wt071,wind_speed_wt072,wind_speed_wt073,wind_speed_wt074,wind_speed_wt075,wind_speed_wt076,wind_speed_wt077,wind_speed_wt078,wind_speed_wt079,wind_speed_wt080,wind_speed_wt081,wind_speed_wt082,wind_speed_wt083,wind_speed_wt084,wind_speed_wt085,wind_speed_wt086,wind_speed_wt087,wind_speed_wt088
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,…,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,209.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,6.220496,4.500181,5.450901,5.352233,9.499508,7.855479,8.870214,5.862917,6.421623,6.142904,6.54178,7.106257,6.367146,6.741147,7.10255,6.8440237,6.4179883,6.374405,5.521564,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.021093,8.270353,6.963817,6.445257,5.110387,7.019245,6.029548,5.080723,9.330383,7.054394,9.076818,6.197019,6.078082,6.42344,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,6.0583,6.908477,,5.02322,5.197799,5.468552,6.799896,3.756544,7.152627,6.997062
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,7.119235,7.249273,6.434347,6.367146,6.004409,7.647892,6.151917,5.180297,8.077567,7.275337,8.020494,6.365332,5.41739,6.378035,6.301889,7.364848,6.890049,6.483469,6.737479,7.072907,6.902947,6.519901,5.645601,6.569147,6.168147,5.405053,2.6379,5.72381,7.409687,,5.78794,5.072004,5.716692,6.492573,3.442897,7.353647,6.958279
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,5.950606,6.832987,6.323628,6.461628,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.358075,5.229326,6.842185,6.292836,4.52929,2.549927,6.276546,5.986466,,5.120864,4.737289,5.445607,6.060098,2.788936,6.673342,6.570972
216.0,233.0,216.0,214.0,212.0,207.0,214.0,216.0,204.0,229.0,213.0,226.0,214.0,216.0,217.0,210.0,206.0,221.0,224.0,200.0,209.0,208.0,205.0,211.0,213.0,198.0,220.0,212.0,215.0,337.0,222.0,208.0,220.0,220.0,217.0,216.0,218.0,…,6.552724,8.437213,6.031344,6.5581975,5.526871,7.984393,5.952399,5.399766,8.241659,8.220631,8.350815,6.336315,5.802208,5.030184,6.263882,6.529016,7.104403,6.834826,6.65504,6.851384,5.768331,6.636747,6.112278,6.534485,6.191604,5.162805,3.113147,5.4209156,7.046991,,5.120864,4.737289,5.445607,6.060098,2.788936,6.283785,6.552724


In [45]:
if PLOT:
    data_inspector.plot_time_series(df_query, turbine_ids="all")

In [46]:
def add_df_continuity_columns(df, mask):
  # change first value of continuous_shifted to false such that add_df_agg_continuity_columns catches it as a start time for a period 
  return df\
        .filter(mask)\
        .with_columns(dt=pl.col("time").diff())\
        .with_columns(dt=pl.when(pl.int_range(0, pl.len()) == 0).then(np.timedelta64(data_loader.dt, "s")).otherwise(pl.col("dt")))\
        .select("time", "dt", cs.ends_with("num_missing"), cs.ends_with("is_missing"))\
        .with_columns(continuous=pl.col("dt")==np.timedelta64(data_loader.dt, "s"))\
        .with_columns(continuous_shifted=pl.col("continuous").shift(-1, fill_value=True))

def add_df_agg_continuity_columns(df):
  # if the continuous flag is True, but the value in the row before it False
  df = df.filter(pl.col("continuous") | (pl.col("continuous") & ~pl.col("continuous_shifted")) |  (~pl.col("continuous") & pl.col("continuous_shifted")))
  start_time_cond = ((pl.col("continuous") & ~pl.col("continuous_shifted"))).shift() | (pl.int_range(0, pl.len()) == 0)
  end_time_cond = (~pl.col("continuous") & pl.col("continuous_shifted"))
  return pl.concat([df.filter(start_time_cond).select(start_time=pl.col("time")), 
                    df.with_columns(end_time=pl.col("time").shift(1)).filter(end_time_cond).select("end_time")], how="horizontal")\
            .with_columns(duration=(pl.col("end_time") - pl.col("start_time")))\
            .sort("start_time")\
            .drop_nulls()

def get_continuity_group_index(df):
  # Create the condition for the group
  group_number = None

  # Create conditions to assign group numbers based on time ranges
  for i, (start, end) in enumerate(df.collect().select("start_time", "end_time").iter_rows()):
      # print(i, start, end, duration)
      time_cond = pl.col("time").is_between(start, end)
      if group_number is None:
          group_number = pl.when(time_cond).then(pl.lit(i))
      else:
          group_number = group_number.when(time_cond).then(pl.lit(i))
  
  # If no group is matched, assign a default value (e.g., -1) 
  group_number = group_number.when(pl.col("time") > end).then(pl.lit(i + 1))
  group_number = group_number.otherwise(pl.lit(-1))
  return group_number

def group_df_by_continuity(df, agg_df):
  group_number = get_continuity_group_index(agg_df)

  return pl.concat([agg_df, df.with_columns(group_number.alias("continuity_group"))\
          .filter(pl.col("continuity_group") != -1)\
          .group_by("continuity_group")\
          .agg(cs.ends_with("is_missing").sum())\
          .with_columns([pl.sum_horizontal(cs.starts_with(col) & cs.ends_with("is_missing")).alias(f"{col}_is_missing") for col in missing_data_cols])\
          .sort("continuity_group")], how="horizontal")

In [107]:
# if there is a short or long gap for some turbines, impute them using the imputing.impute_all_assets_by_correlation function
#       else if there is a short or long gap for many turbines, split the dataset
missing_col_thr = max(1, int(len(data_loader.turbine_ids) * 0.05))
missing_duration_thr = np.timedelta64(5, "m")
missing_data_cols = ["wind_speed", "wind_direction", "nacelle_direction"]

# check for any periods of time for which more than 'missing_col_thr' features have missing data

df_query2 = df_query\
        .with_columns([cs.starts_with(col).is_null().name.suffix("_is_missing") for col in missing_data_cols])\
        .with_columns(**{f"{col}_num_missing": pl.sum_horizontal((cs.starts_with(col) & cs.contains("is_missing"))) for col in missing_data_cols})

# subset of data, indexed by time, which has <= the threshold number of missing columns
df_query_not_missing_times = add_df_continuity_columns(df_query2, mask=pl.sum_horizontal(cs.ends_with("num_missing")) <= missing_col_thr)

# subset of data, indexed by time, which has > the threshold number of missing columns
df_query_missing_times = add_df_continuity_columns(df_query2, mask=pl.sum_horizontal(cs.ends_with("num_missing")) > missing_col_thr)

# start times, end times, and durations of each of the continuous subsets of data in df_query_missing_times 
df_query_missing = add_df_agg_continuity_columns(df_query_missing_times)

# start times, end times, and durations of each of the continuous subsets of data in df_query_not_missing_times 
# AND of each of the coninuous subsets of data in df_query_missing_times that are under the threshold duration time 
df_query_not_missing = pl.concat([add_df_agg_continuity_columns(df_query_not_missing_times), 
                                  df_query_missing.filter(pl.col("duration") <= missing_duration_thr)])\
                          .sort("start_time")

df_query_missing = df_query_missing.filter(pl.col("duration") > missing_duration_thr)

if df_query_not_missing.select(pl.len()).collect().item() == 0:
  raise Exception("Parameters 'missing_col_thr' or 'missing_duration_thr' are too stringent, can't find any eligible durations of time.")

In [55]:
df_query_not_missing.head().collect()

start_time,end_time,duration
datetime[μs],datetime[μs],duration[μs]
2022-03-01 00:00:00,2022-03-01 12:00:30,12h 30s
2022-03-01 12:00:35,2022-03-01 12:00:50,15s
2022-03-01 12:00:55,2022-03-01 12:01:00,5s
2022-03-01 12:01:05,2022-03-01 12:01:40,35s
2022-03-01 12:01:45,2022-03-01 14:12:30,2h 10m 45s


In [154]:
# end_times_to_merge = df_query_not_missing.filter(((pl.col("end_time") + np.timedelta64(data_loader.dt, "s")) == (pl.col("start_time").shift(-1)))).collect().select("end_time").to_numpy()
from datetime import timedelta
all_start_times = df_query_not_missing.select(pl.col("start_time")).collect()
all_end_times = df_query_not_missing.select(pl.col("end_time")).collect()

data = {"start_time":[], "end_time": []}
i = 0
#TODO loop through and merge as long as the shifted -1 end time + dt == the start time

start_time_idx = 0
end_time_idx = 0
for 1:
    end_time = all_end_times.item(end_time_idx, "end_time") 
    if (i + 1 < all_start_times.select(pl.len()).item()):
        break
    
    if (end_time + timedelta(seconds=data_loader.dt)  == all_start_times.item(i + 1, "start_time")):
        continue
    
    data["start_time"].append(all_start_times.item(start_time_idx, "start_time"))
    data["end_time"].append(end_time)

    start_time_idx = i + 1

pl.DataFrame(data)

start_time,end_time
datetime[μs],datetime[μs]
2022-03-01 00:00:00,2022-03-01 14:12:30
2022-03-01 14:21:25,2022-03-01 23:59:55
2022-03-02 21:11:00,2022-03-02 21:32:40


In [152]:
all_start_times.item(i+1, "start_time")

datetime.datetime(2022, 3, 1, 12, 0, 35)

In [None]:
# merge rows with end times corresponding to start times of the next row into the next row, until no more rows need to be merged
while 1:
  # get the end times which should be merged with subsequent rows
  end_times_to_merge = df_query_not_missing.filter(((pl.col("end_time") + np.timedelta64(data_loader.dt, "s")) == (pl.col("start_time").shift(-1)))).collect().select("end_time")
  
  if end_times_to_merge.select(pl.len()).item() == 0:
    break
  
  df_query_not_missing = df_query_not_missing.select(pl.when(pl.col("end_time").is_in(end_times_to_merge)).then(pl.col("start_time")).alias("start_time"),
                                                     pl.when(pl.col("end_time").is_in(end_times_to_merge)).then(pl.col("end_time").shift(-1)).alias("end_time"))\
                                            .drop_nulls()

df_query_not_missing = df_query_not_missing.with_columns((pl.col("end_time") - pl.col("start_time")).alias("duration")).drop_nulls()
  
del end_times_to_merge

# df_query_not_missing = group_df_by_continuity(pl.concat([df_query_not_missing_times, df_query_missing_times]), df_query_not_missing)
df_query_missing = group_df_by_continuity(df_query2, df_query_missing)
df_query_not_missing = group_df_by_continuity(df_query2, df_query_not_missing)

In [50]:
# df_query = df_query2
df_query_missing.head().collect()
df_query_not_missing.head().collect()

start_time,end_time,duration,continuity_group,wind_speed_wt001_is_missing,wind_speed_wt002_is_missing,wind_speed_wt003_is_missing,wind_speed_wt004_is_missing,wind_speed_wt005_is_missing,wind_speed_wt006_is_missing,wind_speed_wt007_is_missing,wind_speed_wt008_is_missing,wind_speed_wt009_is_missing,wind_speed_wt010_is_missing,wind_speed_wt011_is_missing,wind_speed_wt012_is_missing,wind_speed_wt013_is_missing,wind_speed_wt014_is_missing,wind_speed_wt015_is_missing,wind_speed_wt016_is_missing,wind_speed_wt017_is_missing,wind_speed_wt018_is_missing,wind_speed_wt019_is_missing,wind_speed_wt020_is_missing,wind_speed_wt021_is_missing,wind_speed_wt022_is_missing,wind_speed_wt023_is_missing,wind_speed_wt024_is_missing,wind_speed_wt025_is_missing,wind_speed_wt026_is_missing,wind_speed_wt027_is_missing,wind_speed_wt028_is_missing,wind_speed_wt029_is_missing,wind_speed_wt030_is_missing,wind_speed_wt031_is_missing,wind_speed_wt032_is_missing,wind_speed_wt033_is_missing,…,nacelle_direction_wt055_is_missing,nacelle_direction_wt056_is_missing,nacelle_direction_wt057_is_missing,nacelle_direction_wt058_is_missing,nacelle_direction_wt059_is_missing,nacelle_direction_wt060_is_missing,nacelle_direction_wt061_is_missing,nacelle_direction_wt062_is_missing,nacelle_direction_wt063_is_missing,nacelle_direction_wt064_is_missing,nacelle_direction_wt065_is_missing,nacelle_direction_wt066_is_missing,nacelle_direction_wt067_is_missing,nacelle_direction_wt068_is_missing,nacelle_direction_wt069_is_missing,nacelle_direction_wt070_is_missing,nacelle_direction_wt071_is_missing,nacelle_direction_wt072_is_missing,nacelle_direction_wt073_is_missing,nacelle_direction_wt074_is_missing,nacelle_direction_wt075_is_missing,nacelle_direction_wt076_is_missing,nacelle_direction_wt077_is_missing,nacelle_direction_wt078_is_missing,nacelle_direction_wt079_is_missing,nacelle_direction_wt080_is_missing,nacelle_direction_wt081_is_missing,nacelle_direction_wt082_is_missing,nacelle_direction_wt083_is_missing,nacelle_direction_wt084_is_missing,nacelle_direction_wt085_is_missing,nacelle_direction_wt086_is_missing,nacelle_direction_wt087_is_missing,nacelle_direction_wt088_is_missing,wind_speed_is_missing,wind_direction_is_missing,nacelle_direction_is_missing
datetime[μs],datetime[μs],duration[μs],i32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,…,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
2022-03-01 00:00:00,2022-03-01 12:00:50,12h 50s,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,8651,0,0,0,…,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,17306,8655,0
2022-03-01 12:00:35,2022-03-01 12:01:00,25s,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,…,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,2,0
2022-03-01 12:00:55,2022-03-01 12:01:40,45s,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,8,0,0,0,…,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,24,16,0
2022-03-01 12:01:05,2022-03-01 14:12:30,2h 11m 25s,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1570,0,0,0,…,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3140,1570,0
,,,4,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,24330,0,0,0,…,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,64031,39701,0


In [None]:
if PLOT:
  # Plot number of missing wind dir/wind speed data for each wind turbine (missing duration on x axis, turbine id on y axis, color for wind direction/wind speed)
  from matplotlib import colormaps
  from matplotlib.ticker import MaxNLocator
  fig, ax = plt.subplots(1, 1)
  for feature_type, marker in zip(missing_data_cols, ["o", "^"]):
    for turbine_id, color in zip(data_loader.turbine_ids, colormaps["tab20c"](np.linspace(0, 1, len(data_loader.turbine_ids)))):
      df = df_query_missing.select("duration", f"{feature_type}_{turbine_id}_is_missing").collect(streaming=True).to_pandas()
      ax.scatter(x=df["duration"].dt.seconds / 3600,
                  y=df[f"{feature_type}_{turbine_id}_is_missing"].astype(int),  
      marker=marker, label=turbine_id, s=400, color=color)
  ax.set_title("Occurence of Missing Wind Speed (circle) and Wind Direction (triangle) Values vs. Missing Duration, for each Turbine")
  ax.set_xlabel("Duration of Missing Values [hrs]")
  ax.set_ylabel("Number of Missing Values over this Duration")
  h, l = ax.get_legend_handles_labels()
  ax.legend(h[:len(data_loader.turbine_ids)], l[:len(data_loader.turbine_ids)])
  ax.yaxis.set_major_locator(MaxNLocator(integer=True))

  # Plot missing duration on x axis, number of missing turbines on y-axis, marker for wind speed vs wind direction,
  fig, ax = plt.subplots(1, 1)
  for feature_type, marker in zip(missing_data_cols, ["o", "^"]):
    df = df_query_missing.select("duration", (cs.contains(feature_type) & cs.ends_with("is_missing")))\
                            .with_columns(pl.sum_horizontal([f"{feature_type}_{tid}_is_missing" for tid in data_loader.turbine_ids]).alias(f"{feature_type}_is_missing")).collect(streaming=True).to_pandas()
    ax.scatter(x=df["duration"].dt.seconds / 3600,
                y=df[f"{feature_type}_is_missing"].astype(int),  
    marker=marker, label=feature_type, s=400)
  ax.set_title("Occurence of Missing Wind Speed (circle) and Wind Direction (triangle) Values vs. Missing Duration, for all Turbines")
  ax.set_xlabel("Duration of Missing Values [hrs]")
  ax.set_ylabel("Number of Missing Values over this Duration")
  h, l = ax.get_legend_handles_labels()
  ax.legend(h[:len(missing_data_cols)], l[:len(missing_data_cols)], ncol=8)
  ax.yaxis.set_major_locator(MaxNLocator(integer=True))

In [None]:
# if more than 'missing_col_thr' columns are missing data for more than 'missing_timesteps_thr', split the dataset at the point of temporal discontinuity
df_query = [df.lazy() for df in df_query.with_columns(get_continuity_group_index(df_query_not_missing).alias("continuity_group"))\
                          .filter(pl.col("continuity_group") != -1)\
                          .drop(cs.contains("is_missing") | cs.contains("num_missing"))
                          .collect(streaming=True)\
                          .sort("time")\
                          .partition_by("continuity_group")]

# check that each split dataframe a) is continuous in time AND b) has <= than the threshold number of missing columns OR for less than the threshold time span
# for df in df_query:
#     assert df.select((pl.col("time").diff(null_behavior="drop") == np.timedelta64(data_loader.dt, "s")).all()).collect(streaming=True).item()
#     assert (df.select((pl.sum_horizontal([(cs.numeric() & cs.contains(col)).is_null() for col in missing_data_cols]) <= missing_col_thr)).collect(streaming=True)
#             |  ((df.select("time").max().collect(streaming=True).item() - df.select("time").min().collect(streaming=True).item()) < missing_duration_thr))

# max_len = 0
# max_idx = -1
# for i, df in enumerate(split_df_query):
#     # print(i, df.collect(streaming=True).shape[0])
#     if (new_len := df.collect(streaming=True).shape[0]) > max_len:
#         max_len = new_len
#         max_idx = i

# print(max_idx, max_len)

In [None]:
# else, for each of those split datasets, impute the values using the imputing.impute_all_assets_by_correlation function 
# TODO add to resolve_missing_data function parallelize, convert to logging

imputed_df_idx = []
interpolated_df_idx = []
ffill_df_idx = []

for i, df in enumerate(df_query):
    for col in missing_data_cols:
        unpivot_df = data_inspector.unpivot_dataframe(df)
        imputed = False
        for other_col in [col] + [c for c in ["wind_speed", "wind_direction", "power_output", "nacelle_direction"] if c != col]:
            try:
                imputed_vals = imputing.impute_all_assets_by_correlation(data=unpivot_df.collect().to_pandas().set_index(["turbine_id", "time"]),
                                                            impute_col=col, reference_col=other_col,
                                                            asset_id_col="turbine_id", method="linear").to_numpy()
                df_query[i] = DataInspector.pivot_dataframe(
                    unpivot_df.with_columns({col: imputed_vals}))
                # print(f"\nSuccessfully imputed column {col} in DataFrame {i}.")
                imputed_df_idx.append((i, col))
                imputed = True
                break
            except ValueError as e:
                continue
            
        if not imputed:
            # allow interpolation from its own colums, and others if that is not possible
            df_query[i] = DataInspector.pivot_dataframe(
                unpivot_df.with_columns({col: pl.col(col).interpolate()})
            )
            # print(f"\nSuccessfully interpolated column {col} in DataFrame {i} using other column {other_col}.")
            interpolated_df_idx.append((i, col))
        
del unpivot_df

for i, col in imputed_df_idx: 
    print(f"\nSuccessfully imputed column {col} in DataFrame {i}.")

for i, col in interpolated_df_idx: 
    print(f"\nSuccessfully interpolated column {col} in DataFrame {i}.")

# for i, col in ffill_df_idx:
#     print(f"\nSuccesfully forward/backward filled column {col} in DataFrame {i}.") 

filled_df_idx = []
missing_data_cols = ["wind_speed", "wind_direction", "nacelle_direction"]
for i, df in enumerate(df_query):
    # fill any remaining null values with forward fill or backward fill
    
    if df_query[i].select(pl.any_horizontal([cs.starts_with(col).is_null() for col in missing_data_cols]))\
                  .select(pl.len()).collect().item():

        df_query[i] = df_query[i].with_columns([cs.starts_with(col).fill_null(strategy="forward") for col in missing_data_cols])\
                                 .with_columns([cs.starts_with(col).fill_null(strategy="backward") for col in missing_data_cols])

        for col in [c for c in data_loader.available_features if any(cc in c for cc in missing_data_cols)]:
             if df_query[i].select(pl.col(col).is_null())\
                  .select(pl.len()).collect().item():
                filled_df_idx.append((i, col))

for i, col in filled_df_idx: 
    print(f"\nSuccessfully interpolated column {col} in DataFrame {i}.")

In [None]:
for i, col in filled_df_idx: 
    print(f"\nSuccessfully interpolated column {col} in DataFrame {i}.")
df_query[0].collect()

In [310]:
# remerge into one dataframe for nacelle calibration and normalization (may not need to if integrated in models)
# df_query = data_filter.resolve_missing_data(df_query, features=["wind_speed", "wind_direction", "power_output"], how="forward_fill")
# df_query = df_query.pivot(on="turbine_id", index="time", values=["power_output", "nacelle_direction", "wind_speed", "wind_direction", "turbine_status"]).lazy()
df_query = pl.concat([df.with_columns(continuity_group=pl.lit(i)) for i, df in enumerate(df_query)], how="vertical").sort("time")

## Nacelle Calibration

### Find and correct wind direction offsets from median wind plant wind direction for each turbine

In [None]:
# turbine_ids = df_query.select("turbine_id").unique().collect(streaming=True).to_numpy()[:, 0]

# add the 3 degrees back to the wind direction signal
offset = 3.0
# ERIC WHY ADD OFFSET TODO
df_query2 = df_query.with_columns((cs.starts_with("wind_direction") + offset % 360.0))

wd_median = np.median(df_query2.select(cs.starts_with("wind_direction")).collect().to_numpy(), axis=1)
wd_median = np.degrees(np.arctan2(np.sin(np.radians(wd_median)), np.cos(np.radians(wd_median))))
wd_median = df_query2.select("time", cs.starts_with("wind_direction"), cs.starts_with("power_output")).with_columns(wd_median=wd_median)

yaw_median = np.median(df_query2.select(cs.starts_with("nacelle_direction")).collect().to_numpy(), axis=1)
yaw_median = np.degrees(np.arctan2(np.sin(np.radians(yaw_median)), np.cos(np.radians(yaw_median))))
yaw_median = df_query2.select("time", cs.starts_with("nacelle_direction"), cs.starts_with("power_output")).with_columns(yaw_median=yaw_median)

fig, ax = plt.subplots(1, 1)
for turbine_id in turbine_ids:
    df = df_query2.filter(pl.col(f"power_output_{turbine_id}") >= 0).select("time", f"wind_direction_{turbine_id}").collect()
                        
    ax.plot(df.select("time").to_numpy().flatten(), 
            DataFilter.wrap_180(
                        (df.select(f"wind_direction_{turbine_id}")
                        - wd_median.filter(pl.col(f"power_output_{turbine_id}") >= 0).select(f"wd_median").collect()).to_numpy().flatten()),
                        label=f"{turbine_id}")

ax.legend()
ax.set_xlabel("Time (s)")
ax.set_ylabel("Wind Direction - Median Wind Direction (deg)")

ax.set_title("Original")

In [None]:
df_offsets = {"turbine_id": [], "northing_bias": []}

# remove biases from median direction
for turbine_id in data_loader.turbine_ids:
    df = df_query2.filter(pl.col(f"power_output_{turbine_id}") >= 0)\
                  .select("time", f"wind_direction_{turbine_id}", f"nacelle_direction_{turbine_id}").collect()
                   
    wd_bias = DataFilter.wrap_180(DataFilter.circ_mean(
        (df.select(f"wind_direction_{turbine_id}")
                                    - wd_median.filter(pl.col(f"power_output_{turbine_id}") >= 0).select(f"wd_median").collect()).to_numpy().flatten()))
    yaw_bias = DataFilter.wrap_180(DataFilter.circ_mean(
        (df.select(f"nacelle_direction_{turbine_id}")
                                    - yaw_median.filter(pl.col(f"power_output_{turbine_id}") >= 0).select(f"yaw_median").collect()).to_numpy().flatten()))

    df_offsets["turbine_id"].append(turbine_id)
    df_offsets["northing_bias"].append(np.round(0.5 * (wd_bias + yaw_bias), 2))
    
    # TODO QUESTION ERIC how to know if some turbines need exceptional cases??
    offset = -0.5 * (wd_bias + yaw_bias)
    df = df.with_columns((pl.col(f"wind_direction_{turbine_id}") + offset) % 360.0, (pl.col(f"nacelle_direction_{turbine_id}") + offset) % 360.0)

    print(f"Turbine {turbine_id} bias from median wind direction: {df_offsets['northing_bias'][-1]} deg.")

df_offsets = pl.DataFrame(df_offsets)
# handle special case of turbine 39 with a couple change points
"""
tid = "wd_040"
df = DataInspector.collect_data(df=df_query2, 
                        features=["time", "turbine_id", "wind_direction", "power_output", "nacelle_direction"], 
                        mask=((pl.col("turbine_id") == tid) & (pl.col("power_output") >= 0)))
wd_bias_1 = DataFilter.wrap_180(DataFilter.circ_mean(df.loc[
    (df['time'] <= "2021-06-09 19:30"), "wind_direction"].values \
        - wd_median.loc[
            (wd_median['time'] <= "2021-06-09 19:30") 
        & (wd_median[f"power_output_{tid}"] >= 0), "wind_direction"].values))

wd_bias_2 = DataFilter.wrap_180(DataFilter.circ_mean(df.loc[
    (df['time'] >= "2021-06-09 19:40")
    & (df['time'] <= "2021-09-14 19:50"), "wind_direction"].values \
        - wd_median.loc[
            (wd_median['time'] >= "2021-06-09 19:30")
            & (wd_median['time'] <= "2021-09-14 19:50")   
        & (wd_median[f"power_output_{tid}"] >= 0), "wind_direction"].values))

wd_bias_3 = DataFilter.wrap_180(DataFilter.circ_mean(df.loc[
    (df['time'] >= "2021-09-14 20:00"), "wind_direction"].values \
        - wd_median.loc[
            (wd_median['time'] >= "2021-09-14 20:00")
        & (wd_median[f"power_output_{tid}"] >= 0), "wind_direction"].values))

yaw_bias_1 = DataFilter.wrap_180(DataFilter.circ_mean(df.loc[
    (df['time'] <= "2021-06-09 19:30"), "nacelle_direction"].values \
        - yaw_median.loc[
            (yaw_median['time'] <= "2021-06-09 19:30") 
        & (yaw_median[f"power_output_{tid}"] >= 0), "wind_direction"].values))

yaw_bias_2 = DataFilter.wrap_180(DataFilter.circ_mean(df.loc[
    (df['time'] >= "2021-06-09 19:40")
    & (df['time'] <= "2021-09-14 19:50"), "nacelle_direction"].values \
        - yaw_median.loc[
            (yaw_median['time'] >= "2021-06-09 19:30")
            & (yaw_median['time'] <= "2021-09-14 19:50")   
        & (yaw_median[f"power_output_{tid}"] >= 0), "wind_direction"].values))

yaw_bias_3 = DataFilter.wrap_180(DataFilter.circ_mean(df.loc[
    (df['time'] >= "2021-09-14 20:00"), "nacelle_direction"].values \
        - yaw_median.loc[
            (yaw_median['time'] >= "2021-09-14 20:00")
        & (yaw_median[f"power_output_{tid}"] >= 0), "wind_direction"].values))

cond = (df['time'] <= "2021-06-09 19:30")
df.loc[cond, "wind_direction"] = (df.loc[cond, "wind_direction"] - 0.5 * (wd_bias_1 + yaw_bias_1)) % 360
df.loc[cond, "nacelle_direction"] = (df[cond, "nacelle_direction"] - 0.5 * (wd_bias_1 + yaw_bias_1)) % 360

cond = (df['time'] >= "2021-06-09 19:40") & (df['time'] <= "2021-09-14 19:50")
df.loc[cond, "wind_direction"] = (df.loc[cond, "wind_direction"] - 0.5 * (wd_bias_2 + yaw_bias_2)) % 360
df.loc[cond, "nacelle_direction"] = (df.loc[cond, "nacelle_direction"] - 0.5 * (wd_bias_2 + yaw_bias_2)) % 360

cond = (df['time'] >= "2021-09-14 20:00")
df.loc[cond, "wind_direction"] = (df.loc[cond, "wind_direction"] - 0.5 * (wd_bias_3 + yaw_bias_3)) % 360
df.loc[cond, "nacelle_direction"] = (df.loc[cond, "nacelle_direction"] - 0.5 * (wd_bias_3 + yaw_bias_3)) % 360

print("Biases from median wind direction for turbine 39:")

print(f"wd_bias_1: {wd_bias_1}")
print(f"wd_bias_2: {wd_bias_2}")
print(f"wd_bias_3: {wd_bias_3}")

print(f"yaw_bias_1: {yaw_bias_1}")
print(f"yaw_bias_2: {yaw_bias_2}")
print(f"yaw_bias_3: {yaw_bias_3}")

plt.figure()
for turbine_id in turbine_ids:
    plt.plot(df["time"], 
    DataFilter.wrap_180(df["wind_direction"].values - wd_median.loc[wd_median[f"power_output_{turbine_id}"] >= 0, "wind_direction"].values))

plt.xlabel("Time (s)")
plt.ylabel("Wind Direction - Median Wind Direction (deg)")
plt.title("Corrected")

# specific time of changepoints for turbine 39: 6/9 19:35:55; 9/14 19:55:02
"""
# make sure we have corrected the bias between wind direction and yaw position by adding 3 deg. to the wind direction
bias = 0
for turbine_id in turbine_ids:
    df = df_query2.filter(pl.col(f"power_output_{turbine_id}") >= 0)\
                  .select("time", f"wind_direction_{turbine_id}", f"nacelle_direction_{turbine_id}")
                  
    bias += DataFilter.wrap_180(DataFilter.circ_mean(df.select(pl.col(f"wind_direction_{turbine_id}") - pl.col(f"nacelle_direction_{turbine_id}")).collect().to_numpy().flatten()))
    
print(f"Average Bias = {bias / len(turbine_ids)}")

### Find offset to true North using wake loss profiles

In [355]:
# Optimization function for finding waked direction
def gauss_corr(gauss_params, power_ratio):
    xs = np.array(range(-int((len(power_ratio) - 1) / 2), int((len(power_ratio) + 1) / 2), 1))
    gauss = -1 * gauss_params[2] * np.exp(-0.5 * ((xs - gauss_params[0]) / gauss_params[1])**2) + 1.
    return -1 * np.corrcoef(gauss, power_ratio)[0, 1]

In [None]:
# TODO Find offsets between direction of alignment between pairs of turbines 
# and direction of peak wake losses. Use the average offset found this way 
# to identify the Northing correction that should be applied to all turbines 
# in the wind farm.
from scipy.stats import norm
from scipy.optimize import minimize

from floris import FlorisModel
fi = FlorisModel(data_inspector.farm_input_filepath)

def compute_offsets(df, turbine_pairs:list[tuple[int, int]]=None):
    p_min = 100
    p_max = 2500

    prat_hfwdth = 30

    prat_turbine_pairs = turbine_pairs or [(61,60), (51,50), (43,42), (41,40), (18,19), (34,33), (17,16), (21,22), (87,86), (62,63), (32,33), (59,60), (42,43)]

    dir_offsets = []

    for i in range(len(prat_turbine_pairs)):
        i_up = prat_turbine_pairs[i][0]
        i_down = prat_turbine_pairs[i][1]

        dir_align = np.degrees(np.arctan2(fi.layout_x[i_up] - fi.layout_x[i_down], fi.layout_y[i_up] - fi.layout_y[i_down])) % 360

        # df_sub = df_10min.loc[(df_10min['pow_%03d' % i_up] >= p_min) & (df_10min['pow_%03d' % i_up] <= p_max) & (df_10min['pow_%03d' % i_down] >= 0)]
        tid_up =  f'wt{i_up:03d}'
        tid_down =  f'wt{i_down:03d}'

        if not (any(tid_up in feat for feat in data_loader.available_features) and any(tid_down in feat for feat in data_loader.available_features)):
            continue

        df_sub = df.filter((pl.col(f"power_output_{tid_up}") >= p_min) 
                                & (pl.col(f"power_output_{tid_up}") <= p_max) 
                                & (pl.col(f"power_output_{tid_down}") >= 0))
        
        # df_sub.loc[df_sub['wd_%03d' % i_up] >= 359.5,'wd_%03d' % i_up] = df_sub.loc[df_sub['wd_%03d' % i_up] >= 359.5,'wd_%03d' % i_up] - 360.0
        df_sub = df_sub.with_columns(pl.when((pl.col(f"wind_direction_{tid_up}") >= 359.5))\
                                        .then(pl.col(f"wind_direction_{tid_up}") - 360.0)\
                                        .otherwise(pl.col(f"wind_direction_{tid_up}")))

        # df_sub["wd_round"] = df_sub[f'wd_{i_up:03d}'].round()
        df_sub = df_sub.with_columns(pl.col(f"wind_direction_{tid_up}").round().alias(f"wd_round_{tid_up}"))

        df_sub = df_sub.group_by(f"wd_round_{tid_up}").mean().collect()

        p_ratio = df_sub.select(pl.col(f"power_output_{tid_down}") / pl.col(f"power_output_{tid_up}")).collect().to_numpy().flatten()

        plt.figure()
        _, ax = plt.subplots(1,1)
        ax.plot(p_ratio, label="_nolegend_")
        ax.plot(dir_align * np.ones(2),[0,1.25], 'k--', label="Direction of Alignment")
        ax.grid()

        nadir = np.argmin(p_ratio[np.arange(int(np.round(dir_align)) - prat_hfwdth, int(np.round(dir_align)) + prat_hfwdth + 1) % 360])
        nadir = nadir + int(np.round(dir_align)) - prat_hfwdth

        opt_gauss_params = minimize(gauss_corr, [0, 5.0, 1.0], args=(p_ratio[np.arange(nadir - prat_hfwdth, nadir + prat_hfwdth + 1) % 360]),method='SLSQP')

        xs = np.array(range(-int((60 - 1) / 2),int((60 + 1) / 2),1))
        gauss = -1 * opt_gauss_params.x[2] * np.exp(-0.5 * ((xs - opt_gauss_params.x[0]) / opt_gauss_params.x[1])**2) + 1.

        ax.plot(xs + nadir, gauss,'k',label="_nolegend_")
        ax.plot(2 * [nadir + opt_gauss_params.x[0]], [0,1.25], 'r--',label="Direction of Measured Wake Center")
        ax.title(f"Turbine Pair: ({i_up}, {i_down})")
        ax.legend()
        ax.xlabel("Wind Direction [deg]")
        ax.ylabel("Power Ratio [-]")
        
        dir_offset = DataFilter.wrap_180(nadir + opt_gauss_params.x[0] - dir_align)
        print(dir_offset)

        dir_offsets = dir_offsets + [dir_offset]

    if dir_offsets:
        print(f"Mean offset = {np.mean(dir_offsets)}")
        print(f"Std. Dev. = {np.std(dir_offsets)}")
        print(f"Min. = {np.min(dir_offsets)}")
        print(f"Max. = {np.max(dir_offsets)}")
    else:
        print("No available turbine pairs!")

compute_offsets(df_query2)

In [None]:
# Apply Northing offset to each turbine
for turbine_id in data_loader.turbine_ids:
    df_query2.with_columns((pl.col(f"wind_direction_{turbine_id}") - np.mean(dir_offsets)) % 360)
    df_query2.with_columns((pl.col(f"nacelle_direction_{turbine_id}") - np.mean(dir_offsets)) % 360)

In [None]:
# TODO Determine final wind direction correction for each turbine
df_offsets = df_offsets.with_columns(DataFilter.wrap_180(pl.col("northing_bias") + np.mean(dir_offsets)).round(2))

# df_offsets = df_offsets.set_index("Turbine ID")

print("wd_bias_1: "+str(np.round(0.5*(wd_bias_1+yaw_bias_1) + np.mean(dir_offsets),2)))
print("wd_bias_2: "+str(np.round(0.5*(wd_bias_2+yaw_bias_2) + np.mean(dir_offsets),2)))
print("wd_bias_3: "+str(np.round(0.5*(wd_bias_3+yaw_bias_3) + np.mean(dir_offsets),2)))

df_offsets.head()

In [None]:
# verify that Northing calibration worked properly
compute_offsets(df_query2)

## Normalization