In [1]:
import os
import pathlib

import numpy as np
import pandas as pd
import tensorflow as tf
import tqdm

In [2]:
# Directories for reading and writing.
# Every path is derived from one project root instead of repeating the absolute path.
# Set the ET_FSO_PROJECT_DIR environment variable to run the notebook from another machine
# or checkout; without it, the original location is used.
project_dir = pathlib.Path(os.environ.get(
    "ET_FSO_PROJECT_DIR",
    r"C:\Users\hemert\OneDrive - Stichting Deltares\Desktop\Projects\ET_FSO\Snellius",
))
data_dir = project_dir / "Data"
model_dir = project_dir / "VAE Model"
write_train_dir = data_dir / "tfrecords_train"
write_val_dir = data_dir / "tfrecords_val"

In [3]:
# Load relevant data to convert to tfrecord files.
# generator_data: model input rows; tf_dist: per-row distribution columns (plus
# transfer_function/min/max, dropped later); index2word: first row of its frame as strings.
generator_data = pd.read_feather(
    data_dir / "generator_data_simple_10numerics_wrecalc_allBasins_no_extremes.feather"
)
index2word = (
    pd.read_feather(data_dir / "index2word_10numerics.feather")
    .iloc[0]
    .astype(str)
    .to_list()
)
tf_dist = pd.read_feather(
    data_dir
    / "functions_simple_10_numerics_Distribution_indiv_scale_wrecalc_allBasins_no_extremes.feather"
)

In [5]:
# Scale distribution values
def min_max_scale(x, decimals=11):
    """Linearly rescale ``x`` so its minimum maps to 0 and its maximum to 1.

    Args:
        x: pandas Series or numpy array of numbers.
        decimals: number of decimals passed to ``np.round`` (default 11, as before).

    Returns:
        Same container type as ``x`` (``np.round`` preserves it), rounded to ``decimals``.
        A constant input (max == min) maps to all zeros instead of the all-NaN
        result a 0/0 division would give.
    """
    x_min = np.min(x)
    x_range = np.max(x) - x_min
    if x_range == 0:
        # Avoid 0/0 -> NaN. Multiplying by 0.0 yields a float result and keeps NaNs in x as NaN.
        return np.round((x - x_min) * 0.0, decimals)
    return np.round((x - x_min) / x_range, decimals)

to_drop = ["transfer_function", "min", "max"]
# Min-max scale every remaining column independently (DataFrame.apply works column-wise).
dist_scaled = tf_dist.drop(to_drop, axis=1).apply(min_max_scale)

# Train/val split.
# tf_dist, dist_scaled and generator_data are all indexed with the same labels below,
# so their rows must be aligned.
assert tf_dist.index.equals(generator_data.index), "tf_dist and generator_data rows are not aligned"

# Legacy global seed kept on purpose: it reproduces the split stored in train_ind_no_extremes.npy.
np.random.seed(0)
train_ind = np.random.choice(tf_dist.index, int(0.8*tf_dist.shape[0]), replace=False)
np.save(model_dir.joinpath("train_ind_no_extremes.npy"), train_ind)

# train_ind holds index *labels*. Select with .loc so training rows agree with the label-based
# .drop() used for validation rows. Positional .iloc only matches .drop for a default RangeIndex.
x_train_tf = np.expand_dims(generator_data.loc[train_ind].values, axis=-1)
x_val_tf = np.expand_dims(generator_data.drop(train_ind).values, axis=-1)

# Targets are the same arrays as the inputs. Copy them rather than re-extracting from the frame.
y_train_tf = x_train_tf.copy()
y_val_tf = x_val_tf.copy()

train_dist = dist_scaled.loc[train_ind].values
val_dist = dist_scaled.drop(train_ind).values

# Every row lands in exactly one split, and inputs/dists stay paired.
assert len(x_train_tf) + len(x_val_tf) == len(generator_data)
assert len(train_dist) == len(x_train_tf) and len(val_dist) == len(x_val_tf)

In [18]:
def get_files(data_path):
    """Return all ``*.tfrecords`` files directly inside ``data_path``, sorted.

    Args:
        data_path: directory as a ``str`` or ``pathlib.Path`` (anything ``str()`` renders
            as a path), matched with ``tf.io.gfile.glob``.

    Returns:
        Sorted list of matching paths. Sorting makes the shard order, and therefore the
        order a ``TFRecordDataset`` reads them in, independent of the glob's return order.
    """
    files = tf.io.gfile.glob(str(data_path) + "/" + "*.tfrecords")
    return sorted(files)

def get_dataset(files):
    """Build a ``tf.data`` pipeline that reads every record in ``files`` and decodes it with ``tf_parse``."""
    return tf.data.TFRecordDataset(files).map(tf_parse)

In [19]:
def _bytes_feature(value):
    """Wrap a single string / bytes scalar in a ``tf.train.Feature`` holding a BytesList.

    Eager tensors are unwrapped to their numpy value first, because BytesList
    cannot unpack a string from an EagerTensor.
    """
    eager_tensor_type = type(tf.constant(0))
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[raw]))


def _float_feature(value):
    """Wrap a single float / double scalar in a ``tf.train.Feature`` holding a FloatList."""
    float_list = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=float_list)


def _int64_feature(value):
    """Wrap a single bool / enum / int / uint scalar in a ``tf.train.Feature`` holding an Int64List."""
    int_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int_list)


def serialize_array(array):
    """Serialize ``array`` into a scalar string tensor via ``tf.io.serialize_tensor``.

    The result can be decoded with ``tf.io.parse_tensor`` when the original dtype is supplied.
    """
    return tf.io.serialize_tensor(array)

In [20]:
def parse_combined_data(x, y, dist):
    """Pack one sample into a ``tf.train.Example`` (despite the name, this builds rather than parses).

    Each of ``x``, ``y`` and ``dist`` is serialized with ``serialize_array`` and stored as a
    bytes feature under the keys 'x', 'y' and 'dist'. ``tf_parse`` reads these same keys back.
    """
    feature = {
        key: _bytes_feature(serialize_array(value))
        for key, value in (('x', x), ('y', y), ('dist', dist))
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))

In [30]:
def write_data(x, y, dist,
               filename, max_files, out_dir):
    '''Write paired (x, y, dist) samples to sharded TFRecord files.

    Sample ``k`` is serialized from ``x[k]``, ``y[k]`` and ``dist[k]`` with
    ``parse_combined_data``. Samples are written in order, ``max_files`` per shard,
    and the last shard holds any remainder. Shard ``i`` of ``n`` holding ``size``
    samples is written to ``"{out_dir}{i}_{n}{filename}_{size}.tfrecords"``.
    ``out_dir`` is prepended verbatim, so it must end with a path separator.

    Args:
        x, y, dist: indexable sequences (e.g. numpy arrays) of equal length.
        filename: tag embedded in every shard name.
        max_files: maximum number of samples (not files) per shard.
        out_dir: output directory including a trailing separator; created if missing.

    Returns:
        Number of samples written.

    Raises:
        ValueError: if ``max_files`` < 1 or the three inputs differ in length.
    '''
    n_samples = len(x)
    if len(y) != n_samples or len(dist) != n_samples:
        raise ValueError(
            f"x, y and dist must have the same length, got {n_samples}, {len(y)} and {len(dist)}")
    if max_files < 1:
        raise ValueError(f"max_files must be >= 1, got {max_files}")

    # Ceiling division: one extra shard for a partial remainder, zero shards for no data.
    splits = -(-n_samples // max_files)

    # Implicit string concatenation, so no source indentation leaks into the message.
    print(f"\nUsing {splits} shard(s) for {n_samples} samples, "
          f"with up to {max_files} samples per shard")

    # Make sure the target directory exists before any writer opens a file in it.
    out_parent = os.path.dirname(out_dir)
    if out_parent:
        tf.io.gfile.makedirs(out_parent)

    file_count = 0
    for i in tqdm.tqdm(range(splits)):
        start = i * max_files
        stop = min(start + max_files, n_samples)
        # The name format, including the missing separator before `filename`, is unchanged
        # so re-runs overwrite existing shards instead of adding differently named duplicates.
        current_shard_name = "{}{}_{}{}_{}.tfrecords".format(
            out_dir, i + 1, splits, filename, stop - start)

        # The context manager closes the writer even if serializing a sample raises.
        with tf.io.TFRecordWriter(current_shard_name) as writer:
            for index in range(start, stop):
                out = parse_combined_data(x=x[index], y=y[index], dist=dist[index])
                writer.write(out.SerializeToString())
                file_count += 1

    print(f"\nWrote {file_count} elements to TFRecord")
    return file_count

In [31]:
# Write training data to tfrecords: 1000 samples per shard into write_train_dir.
write_data(
    x=x_train_tf,
    y=y_train_tf,
    dist=train_dist,
    filename='train-tfrecords-v1',
    max_files=1000,
    out_dir=f"{write_train_dir}/",
)


Using 3598 shard(s) for 3597769 files,            with up to 1000 samples per shard


100%|██████████████████████████████████████████████████████████████████████████████| 3598/3598 [52:40<00:00,  1.14it/s]


Wrote 3597769 elements to TFRecord





3597769

In [34]:
# Write validation data to tfrecords: 1000 samples per shard into write_val_dir.
write_data(
    x=x_val_tf,
    y=y_val_tf,
    dist=val_dist,
    filename='val-tfrecords-v1',
    max_files=1000,
    out_dir=f"{write_val_dir}/",
)


Using 900 shard(s) for 899443 files,            with up to 1000 samples per shard


100%|████████████████████████████████████████████████████████████████████████████████| 900/900 [14:04<00:00,  1.07it/s]


Wrote 899443 elements to TFRecord





899443

In [None]:
def get_files(data_path):
    """Return all ``*.tfrecords`` files directly inside ``data_path``, sorted.

    Args:
        data_path: directory as a ``str`` or ``pathlib.Path`` (anything ``str()`` renders
            as a path), matched with ``tf.io.gfile.glob``.

    Returns:
        Sorted list of matching paths. Sorting makes the shard order, and therefore the
        order a ``TFRecordDataset`` reads them in, independent of the glob's return order.
    """
    files = tf.io.gfile.glob(str(data_path) + "/" + "*.tfrecords")
    return sorted(files)

def get_dataset(files):
    """Build a ``tf.data`` pipeline that reads every record in ``files`` and decodes it with ``tf_parse``."""
    return tf.data.TFRecordDataset(files).map(tf_parse)

In [None]:
def tf_parse(eg):
    """Decode one serialized ``tf.train.Example`` holding the 'x', 'y' and 'dist' bytes features.

    ``get_dataset`` maps this over a ``TFRecordDataset`` before any batching, so ``eg`` is a
    single scalar string. ``tf.io.parse_single_example`` handles that directly, with no
    need to add and then strip a batch dimension.

    Returns:
        ``((x, dist), (y, dist))`` where ``x`` and ``y`` are cast to float64.
    """
    example = tf.io.parse_single_example(
        eg,
        {
            'x': tf.io.FixedLenFeature([], tf.string),
            'y': tf.io.FixedLenFeature([], tf.string),
            'dist': tf.io.FixedLenFeature([], tf.string),
        },
    )
    # out_type must equal the dtype the arrays were serialized with, or parse_tensor fails.
    # NOTE(review): int32 assumes generator_data's columns are int32, and float64 assumes the
    # scaled distribution columns are float64 -- confirm against the written data.
    x_tf = tf.io.parse_tensor(example["x"], out_type=tf.int32)
    y_tf = tf.io.parse_tensor(example["y"], out_type=tf.int32)
    dist = tf.io.parse_tensor(example["dist"], out_type=tf.float64)

    # Restore static shapes lost by serialization so downstream Keras layers can build.
    # NOTE(review): 32 presumably equals generator_data's column count and 9 the number of
    # scaled distribution columns -- confirm.
    x_tf = tf.cast(tf.ensure_shape(x_tf, (32, None)), tf.float64)
    y_tf = tf.cast(tf.ensure_shape(y_tf, (32, None)), tf.float64)
    dist = tf.ensure_shape(dist, (9,))
    return (x_tf, dist), (y_tf, dist)

In [None]:
# Load the tfrecord shards back in as a sanity check.
# train_data_dir / val_data_dir / batch_size were previously undefined (NameError on a fresh
# kernel). Point them at the directories the write cells used and make the batch size explicit.
train_data_dir = write_train_dir
val_data_dir = write_val_dir
batch_size = 64  # TODO(review): placeholder -- set to the batch size used for training
# A shuffle buffer of 1 returns elements in their original order; a real buffer is needed to shuffle.
shuffle_buffer = 10_000

train_files = get_files(str(train_data_dir))
val_files = get_files(str(val_data_dir))

train_dataset = get_dataset(train_files).shuffle(shuffle_buffer).batch(batch_size, drop_remainder=True)
# Validation data is read in file order; shuffling it is unnecessary.
val_dataset = get_dataset(val_files).batch(batch_size, drop_remainder=True)