In [1]:
import xarray as xr
from ocf_blosc2 import Blosc2
from tqdm import tqdm
import h5py
from datetime import datetime, timedelta
import pandas as pd

In [23]:
month_to_times = {
    1: (8, 16),
    2: (8, 17),
    3: (7, 18),
    4: (7, 19),
    5: (6, 20),
    6: (5, 20),
    7: (5, 20),
    8: (6, 20),
    9: (7, 19),
    10: (7, 18),
    11: (7, 16),
    12: (8, 16)
}
import h5py
import pandas as pd
type = "satellite-nonhrv"
NWP_FEATURES = ["t_500", "clcl", "alb_rad", "tot_prec", "ww", "relhum_2m", "h_snow", "aswdir_s", "td_2m", "omega_1000"]
for month in tqdm(range(1, 2)):
    print('opening dataset')
    hrv = xr.open_dataset(
        f"/data/{type}/2021/{month}.zarr.zip",
        engine="zarr",
        consolidated=True,
        chunks={"time": "auto"}
    )
    print('opening pv data parquet')
    try:
        df = pd.read_parquet(f"/data/pv/2021/{month}.parquet",).drop("generation_wh", axis=1)
    except KeyError:
         df = pd.read_parquet(f"/data/pv/2021/{month}.parquet",)
    print(df.keys())
    start, stop = month_to_times[month]
    start -= 1 # all data types look backward an hour
    if type == "weather":
        stop += 4 # ONLY FOR WEATHER, since we look forward 4 hrs
    print('filtering')
    filtered_dataset_lazy = hrv.where((hrv['time'].dt.hour >= start) & (hrv['time'].dt.hour <= stop), drop=True)
    # we should check to see that each timestep has the data we need
    filter_count = 0
    if type == "weather":
        for time in tqdm(filtered_dataset_lazy['time']):
                if time.dt.hour < start + 1 or time.dt.hour > stop - 4:
                    continue
                time = pd.to_datetime(time.values)
                check = filtered_dataset_lazy.sel(time=slice(str(time - timedelta(hours=1)), str(time + timedelta(hours=4)))).sizes['time']
                if check != 6:
                    df = df.drop(str(time), level='timestamp')
                    filter_count += 1
    elif type == "satellite-nonhrv":
        print(filtered_dataset_lazy['time'])
        for time in tqdm(filtered_dataset_lazy['time']):
                if time.dt.hour < start + 1:
                    continue
                if time.dt.minute != 0:
                    continue
                time = pd.to_datetime(time.values)
                check = filtered_dataset_lazy.sel(time=slice(str(time - timedelta(hours=1)), str(time - timedelta(minutes=55))))
                r = check.sizes['time']
                if r != 12:
                    filtered_dataset_lazy = filtered_dataset_lazy.where((hrv['time'] == str(time)), drop=True)
                    filter_count += 1
    print(f'chunking after filtering {filter_count}')
    filtered_dataset_lazy = filtered_dataset_lazy.chunk("auto")
    print(filtered_dataset_lazy['time'])
    print('writing')
    break
    filtered_dataset_lazy = filtered_dataset_lazy[NWP_FEATURES]
    output_path = f"/data/{type}_proc/2021/{month}.hdf5"
    with h5py.File(output_path, 'w') as hdf_file:
        for var in list(filtered_dataset_lazy.indexes):
            print(f"outputing var {var}")
            data = filtered_dataset_lazy[var].values
            if var == 'channel':
                continue
            if var == "time":
                # special handling is required
                data = list(map(lambda x: pd.to_datetime(x).timestamp(), data))
                print("number of times:", len(data))
            hdf_file.create_dataset(var, data=data, compression='lzf')
        for var in filtered_dataset_lazy.data_vars:
            print(f"outputing var {var}")
            data = filtered_dataset_lazy[var].values
            hdf_file.create_dataset(var, data=data, compression='lzf')
    df.to_parquet(f"/data/pv/2021/{month}.parquet")



  0%|          | 0/1 [00:00<?, ?it/s]

opening dataset


opening pv data parquet
Index(['power'], dtype='object')
filtering
<xarray.DataArray 'time' (time: 3688)> Size: 30kB
array(['2021-01-01T07:00:00.000000000', '2021-01-01T07:05:00.000000000',
       '2021-01-01T07:10:00.000000000', ..., '2021-01-31T16:45:00.000000000',
       '2021-01-31T16:50:00.000000000', '2021-01-31T16:55:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 30kB 2021-01-01T07:00:00 ... 2021-01-31T16...


100%|██████████| 3688/3688 [00:15<00:00, 242.25it/s]
  0%|          | 0/1 [00:20<?, ?it/s]

chunking after filtering 277
<xarray.DataArray 'time' (time: 0)> Size: 0B
array([], dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 0B 
writing



