In [1]:
import datetime as dt
import os

import numpy as np
import pandas as pd
import tensorflow as tf

from camels_aus.repository import CamelsAus
from model.tf.hydro import ProductionStorage

2024-08-28 10:49:02.275919: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Number of consecutive time steps in each input sequence. The dataset classes
# below read this name as a global inside their create_datasets methods.
WINDOW_SIZE = 7

# Folder holding the CAMELS-AUS text files (relative path, resolved against the
# current working directory).
camels_dir = '../data/camels/aus'
# Notebook-level repository handle. Note that CamelsDataset.__init__ builds and
# loads its own CamelsAus instance rather than reusing this one.
repo = CamelsAus()
repo.load_from_text_files(camels_dir)

In [11]:
class CamelsDataset(object):
    """Windowed CAMELS-AUS samples for a selection of stations.

    The constructor loads the CAMELS-AUS repository, keeps the time-series,
    static-attribute and target tables for the selected stations, and
    ``get_datasets`` turns them into batched ``tf.data`` train/test pipelines.

    Stations are selected either with an explicit ``station_list`` or with the
    pair ``state_outlet`` + ``map_zone``.
    """

    # Columns of the daily data used as sequence inputs.
    ts_vars = ['precipitation_AWAP', 'et_morton_actual_SILO',
               'tmax_awap', 'tmin_awap', 'streamflow_mmd']
    # Location attributes; used for station selection and dropped afterwards.
    location_vars = ['state_outlet', 'map_zone']
    # Streamflow attributes used as static (per-station) inputs.
    streamflow_vars = ['q_mean', 'stream_elas', 'runoff_ratio', 'high_q_freq',
                       'high_q_dur', 'low_q_freq', 'zero_q_freq']
    # Columns of the daily data used as prediction targets.
    target_vars = ['streamflow_mmd']
    # Time range selected from the daily data.
    ts_slice = slice(dt.datetime(1980, 1, 1), dt.datetime(2015, 1, 1))

    def __init__(self, data_dir, state_outlet=None, map_zone=None,
                 station_list=None, ts_vars=None, location_vars=None,
                 streamflow_vars=None, target_vars=None) -> None:
        """Load the repository and keep the tables for the selected stations.

        Args:
            data_dir: Folder passed to ``CamelsAus.load_from_text_files``.
            state_outlet: Value matched against the ``state_outlet`` attribute
                (used together with ``map_zone`` when no ``station_list``).
            map_zone: Value matched against the ``map_zone`` attribute.
            station_list: Explicit station ids; takes precedence over the
                ``state_outlet``/``map_zone`` pair.
            ts_vars, location_vars, streamflow_vars, target_vars: Optional
                per-instance overrides of the class-level column lists.

        Raises:
            ValueError: If neither ``station_list`` nor both ``state_outlet``
                and ``map_zone`` are given.
        """
        super().__init__()

        # Fail fast: validate the selection before the repository is loaded.
        if station_list is None and (state_outlet is None or map_zone is None):
            raise ValueError('station_list or state_outlet and map_zone must be provided')

        self.data_dir = data_dir

        # Reassign columns
        if ts_vars is not None:
            self.ts_vars = ts_vars
        if location_vars is not None:
            self.location_vars = location_vars
        if streamflow_vars is not None:
            self.streamflow_vars = streamflow_vars
        if target_vars is not None:
            self.target_vars = target_vars

        # Load the data
        self.repo = CamelsAus()
        self.repo.load_from_text_files(data_dir)

        # Timeseries data. A column may be both an input and a target (e.g.
        # 'streamflow_mmd'), so request each variable only once.
        daily_vars = list(dict.fromkeys(self.ts_vars + self.target_vars))
        daily = (self.repo.daily_data.sel(time=self.ts_slice)[daily_vars]
                 .to_dataframe().reset_index())
        ts_frame = daily[self.ts_vars + ['station_id', 'time']]
        target_frame = daily[self.target_vars + ['station_id', 'time']]

        # Static data
        location_data = self.repo.location_attributes.to_dataframe()[self.location_vars]
        streamflow_data = self.repo.streamflow_attributes.to_dataframe()[self.streamflow_vars]
        static_frame = pd.concat([location_data, streamflow_data], axis=1).reset_index()

        # Restrict the static table first, then apply the same station set to
        # the inputs AND the targets so the three tables stay consistent.
        if station_list is not None:
            static_frame = static_frame[static_frame['station_id'].isin(station_list)]
        else:
            in_region = ((static_frame['state_outlet'] == state_outlet)
                         & (static_frame['map_zone'] == map_zone))
            static_frame = static_frame[in_region]
            station_list = static_frame.station_id.unique()
        ts_frame = ts_frame[ts_frame['station_id'].isin(station_list)]
        target_frame = target_frame[target_frame['station_id'].isin(station_list)]

        # The selection columns are not model inputs. drop() returns a new
        # frame (no in-place edit of a filtered slice) and tolerates custom
        # location_vars that do not contain these columns.
        static_frame = static_frame.drop(columns=['state_outlet', 'map_zone'],
                                         errors='ignore')

        # Timestamps (unique values, in order of first appearance)
        self._ts = ts_frame.time.unique()

        # Sort data
        self.ts_data = ts_frame.sort_values(['time', 'station_id'])
        self.static_data = static_frame.sort_values('station_id')
        self.targets = target_frame.sort_values(['time', 'station_id'])

    def create_datasets(self, ts_data, static_data, target_data):
        """Build the windowed arrays station by station.

        Populates ``self.ts_arr``, ``self.static_arr``, ``self.target_arr`` and
        ``self.station_names``. The arrays are station-major: all windows of
        the first station, then all windows of the next one, and so on. The
        window length is read from the module-level ``WINDOW_SIZE``.

        Args:
            ts_data: Input table with ``station_id`` and ``time`` columns.
            static_data: Static table with a ``station_id`` column.
            target_data: Target table with ``station_id`` and ``time`` columns.

        Raises:
            ValueError: If ``static_data`` contains no station.
        """
        station_ids = static_data.station_id.unique()
        if len(station_ids) == 0:
            raise ValueError('No stations selected: cannot build datasets')

        ts_arr = []
        static_arr = []
        target_arr = []
        station_names = []

        for station_id in station_ids:
            station_ts = ts_data[ts_data['station_id'] == station_id].drop(columns=['station_id', 'time'])
            station_static = static_data[static_data['station_id'] == station_id].drop(columns=['station_id'])
            station_targets = target_data[target_data['station_id'] == station_id].drop(columns=['station_id', 'time'])

            station_ts_data, station_targets = self.create_sequences(
                station_ts.values, station_targets.values, WINDOW_SIZE)
            n_windows = station_ts_data.shape[0]
            # One copy of the static row and of the station id per window.
            station_static_data = np.repeat(station_static.values, n_windows, axis=0)
            station_names_data = np.repeat([station_id], n_windows, axis=0)

            ts_arr.append(station_ts_data)
            static_arr.append(station_static_data)
            target_arr.append(station_targets)
            station_names.append(station_names_data)

        self.ts_arr = np.concatenate(ts_arr)
        self.static_arr = np.concatenate(static_arr)
        self.target_arr = np.concatenate(target_arr)
        self.station_names = np.concatenate(station_names)

    def create_sequences(self, x, y, window_size):
        """Pair every window of ``window_size`` rows of ``x`` with the next row of ``y``.

        Window ``k`` holds ``x[k:k + window_size]`` and its target is
        ``y[k + window_size]``.

        Returns:
            Tuple ``(sequences, targets)`` of stacked arrays, both with
            ``len(x) - window_size`` entries along the first axis.

        Raises:
            ValueError: If ``x`` has no more than ``window_size`` rows, so not
                even one (window, target) pair exists.
        """
        n_steps = len(x)
        if n_steps <= window_size:
            raise ValueError(
                f'Need more than {window_size} time steps to build a sequence, got {n_steps}')
        starts = range(n_steps - window_size)
        sequences = np.stack([x[start:start + window_size] for start in starts])
        targets = np.stack([y[start + window_size] for start in starts])
        return sequences, targets

    @staticmethod
    def _chronological_train_mask(station_names, test_size):
        """Boolean mask selecting the leading ``1 - test_size`` share of each station's records."""
        station_names = np.asarray(station_names)
        train_mask = np.zeros(station_names.shape[0], dtype=bool)
        for station in np.unique(station_names):
            rows = np.flatnonzero(station_names == station)
            n_train = int(len(rows) * (1 - test_size))
            train_mask[rows[:n_train]] = True
        return train_mask

    def get_datasets(self, test_size=0.2, batch_size=32):
        """Create batched train and test ``tf.data`` datasets.

        Every station contributes the first ``1 - test_size`` share of its
        windows to the training set and the remainder to the test set. The
        windowed arrays are station-major, so a single positional cut through
        them would instead assign whole stations to one side or the other.

        Args:
            test_size: Fraction of each station's windows held out for testing.
            batch_size: Batch size of both returned datasets.

        Returns:
            Tuple ``(train_dataset, test_dataset)``; elements are
            ``(station_name, ts_window, static_features, target)``. Only the
            training dataset is shuffled.

        Raises:
            ValueError: If ``test_size`` is outside ``[0, 1]``.
        """
        if not 0 <= test_size <= 1:
            raise ValueError('test_size must be between 0 and 1')

        # Create the datasets
        self.create_datasets(self.ts_data, self.static_data, self.targets)

        # Split the data per station (windows are chronological within a station)
        train_mask = self._chronological_train_mask(self.station_names, test_size)

        def to_dataset(mask):
            # Convert the selected rows to tensors and slice them row-wise.
            return tf.data.Dataset.from_tensor_slices((
                tf.convert_to_tensor(self.station_names[mask], dtype=tf.string),
                tf.convert_to_tensor(self.ts_arr[mask], dtype=tf.float32),
                tf.convert_to_tensor(self.static_arr[mask], dtype=tf.float32),
                tf.convert_to_tensor(self.target_arr[mask], dtype=tf.float32),
            ))

        # Size the shuffle buffer to the training set so the shuffle is always
        # a full one, whatever the number of selected stations.
        n_train = int(train_mask.sum())
        train_dataset = to_dataset(train_mask).shuffle(max(n_train, 1)).batch(batch_size)
        test_dataset = to_dataset(~train_mask).batch(batch_size)

        return train_dataset, test_dataset


In [12]:
# Select the stations whose attributes are state_outlet == 'WA' and
# map_zone == 50. An explicit list of ids (e.g. station_list=['314213'])
# can be passed instead of the state/zone pair.
camels_ds = CamelsDataset(
    data_dir=camels_dir,
    state_outlet='WA',
    map_zone=50,
)
train_ds, test_ds = camels_ds.get_datasets(batch_size=256)

In [13]:
class HybridDataset(CamelsDataset):
    """CamelsDataset whose sequence inputs are extended with GR4J production-storage outputs.

    For every station the ``x1`` parameter is looked up in a GR4J results
    table, a ``ProductionStorage`` is built from it and applied to the
    station's input series, and its output is appended to the inputs as extra
    feature columns before windowing.

    ``location_vars``, ``streamflow_vars``, ``target_vars`` and ``ts_slice``
    are inherited unchanged from ``CamelsDataset``.
    """

    # Sequence inputs; unlike the parent class, 'streamflow_mmd' is not an input.
    ts_vars = ['precipitation_AWAP', 'et_morton_actual_SILO',
               'tmax_awap', 'tmin_awap']

    def __init__(self, data_dir, gr4j_logfile, state_outlet=None, map_zone=None, station_list=None) -> None:
        """Load the CAMELS-AUS tables and the GR4J results table.

        Args:
            data_dir: Folder with the CAMELS-AUS text files.
            gr4j_logfile: CSV file with (at least) the columns ``station_id``
                and ``x1``.
            state_outlet, map_zone, station_list: Station selection, forwarded
                to ``CamelsDataset.__init__``.
        """
        super().__init__(data_dir, state_outlet=state_outlet,
                         map_zone=map_zone, station_list=station_list)
        # Read the ids as text: with dtype inference an all-numeric id column
        # becomes integers and would never compare equal to the string ids
        # used for the stations elsewhere in this notebook.
        self.gr4j_logs = pd.read_csv(gr4j_logfile, dtype={'station_id': str})

    def _hybrid_features(self, station_id, station_ts):
        """Return the ProductionStorage output for one station's input series as a numpy array."""
        x1_values = self.gr4j_logs.loc[self.gr4j_logs['station_id'] == station_id, 'x1'].values
        if len(x1_values) == 0:
            # Same exception type as indexing the empty lookup, with a usable message.
            raise IndexError(f'No GR4J x1 parameter found for station {station_id!r}')
        # Initialize GR4J Production storage with the first matching x1 value
        prod = ProductionStorage(x1_values[0])
        # Only the first element of the call result is used as features.
        return prod(tf.convert_to_tensor(station_ts), include_x=False, scale=False)[0].numpy()

    def create_datasets(self, ts_data, static_data, target_data):
        """Build the windowed arrays station by station, including the GR4J features.

        Populates ``self.ts_arr``, ``self.static_arr``, ``self.target_arr`` and
        ``self.station_names`` (station-major order). The window length is
        read from the module-level ``WINDOW_SIZE``.

        Raises:
            ValueError: If ``static_data`` contains no station.
            IndexError: If a station has no row in the GR4J results table.
        """
        station_ids = static_data.station_id.unique()
        if len(station_ids) == 0:
            raise ValueError('No stations selected: cannot build datasets')

        ts_arr = []
        static_arr = []
        target_arr = []
        station_names = []

        for station_id in station_ids:
            station_ts = ts_data[ts_data['station_id'] == station_id].drop(columns=['station_id', 'time']).values
            station_static = static_data[static_data['station_id'] == station_id].drop(columns=['station_id']).values
            station_targets = target_data[target_data['station_id'] == station_id].drop(columns=['station_id', 'time']).values

            # Append the GR4J-derived columns to the raw inputs (feature axis).
            station_hybrid_feat = self._hybrid_features(station_id, station_ts)
            station_ts = np.concatenate([station_ts, station_hybrid_feat], axis=1)

            station_ts_data, station_targets = self.create_sequences(station_ts, station_targets, WINDOW_SIZE)
            n_windows = station_ts_data.shape[0]
            # One copy of the static row and of the station id per window.
            station_static_data = np.repeat(station_static, n_windows, axis=0)
            station_names_data = np.repeat(station_id, n_windows, axis=0)

            ts_arr.append(station_ts_data)
            static_arr.append(station_static_data)
            target_arr.append(station_targets)
            station_names.append(station_names_data)

        self.ts_arr = np.concatenate(ts_arr)
        self.static_arr = np.concatenate(static_arr)
        self.target_arr = np.concatenate(target_arr)
        self.station_names = np.concatenate(station_names)

In [14]:
# CSV with the GR4J calibration results (HybridDataset reads its 'station_id'
# and 'x1' columns). Set the GR4J_LOGFILE environment variable to point at your
# own copy; the fallback below is the original, machine-specific location.
gr4j_logfile = os.environ.get(
    'GR4J_LOGFILE',
    '/Users/akap5486/Library/CloudStorage/OneDrive-UNSW/Shared/Projects/01_PhD/04_DeepGR4J_QNN/deepgr4j-extremes/results/gr4j/result.csv',
)
# Same station selection as for camels_ds; an explicit list of ids
# (e.g. station_list=['314213']) can be passed instead.
hybrid_ds = HybridDataset(
    data_dir=camels_dir,
    gr4j_logfile=gr4j_logfile,
    state_outlet='WA',
    map_zone=50,
)

In [15]:
# Build the windowed arrays (with the GR4J-derived input columns) and wrap them
# as batched train/test datasets. This rebinds train_ds and test_ds, which an
# earlier cell assigned from camels_ds.
train_ds, test_ds = hybrid_ds.get_datasets(batch_size=256)

In [None]:
# Shapes of the windowed inputs, the static attributes and the targets that
# get_datasets() stored on hybrid_ds.
hybrid_ds.ts_arr.shape, hybrid_ds.static_arr.shape, hybrid_ds.target_arr.shape  

((166114, 7, 8), (166114, 7), (166114, 1))

In [None]:
# Peek at a single training batch and print the shape of each of its four
# components (station names, input windows, static attributes, targets).
for batch in train_ds.take(1):
    print(*(component.shape for component in batch[:4]))

(256,) (256, 7, 8) (256, 7) (256, 1)
