In [None]:
import os
import tensorflow as tf
from trader.model import TraderNet
from datetime import datetime
from tensorflow.keras.callbacks import ModelCheckpoint, ProgbarLogger

In [None]:
# Number of examples per batch (used for both training and validation batches).
BATCH_SIZE=256
# Number of examples reserved for validation by preprocess().
VALIDATION_SIZE=1000
# Optional cap on the number of training examples; None (falsy) disables the cap.
TRAIN_LIMIT=None

# Passed as TraderNet(levels=...) in the training cell; the meaning of each
# entry is defined by trader.model, which is not visible here.
levels = [3, 3, 5, 2]

# Filesystem locations: TFRecord shards are read from TFRECORD_DIR; checkpoints
# and TensorBoard logs are written under ARTIFACTS_DIR.
TFRECORD_DIR = '/mnt/data/dest/tfrecords'
# NOTE(review): SRC_DIR is not referenced by any cell visible here.
SRC_DIR = '/mnt/data/src'
ARTIFACTS_DIR = '/mnt/artifacts'

# Hide xorg GPU
# Only CUDA device index 1 is exposed to this process (presumably device 0
# drives the X server - confirm on the target machine).
os.environ['CUDA_VISIBLE_DEVICES'] = '1'

# Names of the per-example sequence features that read_tfrecords() stacks into
# the model input; the commented-out line below is the wider alternative set.
feature_keys = ['close', 'volume', 'position']
#feature_keys = ['high', 'low', 'open', 'close', 'volume', 'position']

In [None]:
@tf.function
def standardize(*t, axis=0):
    """Standardize the features of a ``(features, label)`` pair along ``axis``.

    The features are shifted by their mean and divided by their standard
    deviation, both computed along ``axis``. The standard deviation is floored
    at ``1 / sqrt(n)``, where ``n`` is the size of ``features`` along ``axis``,
    so that near-constant inputs do not cause a division by (almost) zero.

    Args:
        *t: Exactly two positional values: the feature tensor and its label.
        axis: Axis along which the moments are computed.

    Returns:
        A tuple ``(standardized_features, label)``; the features are cast to
        ``tf.float16`` and the label is passed through unchanged.
    """
    features, label = t

    mean, var = tf.nn.moments(features, axes=axis, keepdims=True)

    # Size of the reduced axis, as a float so it can be used in the floor below.
    n_items = tf.cast(tf.shape(features, out_type=tf.int32)[axis], tf.float32)
    std_floor = 1.0 / tf.math.sqrt(n_items)

    std = tf.math.maximum(tf.math.sqrt(var), std_floor)
    standardized = (features - mean) / std

    return tf.cast(standardized, tf.float16), label

In [None]:
def read_tfrecords():
    """Build a ``tf.data`` dataset of ``(features, label)`` pairs from TFRecords.

    Every non-hidden file in ``TFRECORD_DIR`` whose name contains ``'part-r-'``
    is read. The file list is sorted so that the record order is the same on
    every run (``os.listdir`` returns entries in arbitrary order).

    For each record, the sequences named in ``feature_keys`` are individually
    L2-normalized, stacked and transposed, giving a tensor of shape
    ``(sequence_length, len(feature_keys))``. The label is cast to ``tf.uint8``.

    Returns:
        A ``tf.data.Dataset`` yielding ``(features, label)`` tuples.

    Raises:
        FileNotFoundError: If ``TFRECORD_DIR`` does not exist (raised by
            ``os.listdir``) or contains no matching TFRecord file.
    """
    filenames = sorted(
        os.path.join(TFRECORD_DIR, name)
        for name in os.listdir(TFRECORD_DIR)
        if 'part-r-' in name and not name.startswith('.')
    )
    if not filenames:
        # Fail here rather than handing an empty dataset to model.fit().
        raise FileNotFoundError(
            "No 'part-r-' TFRecord files found in {}".format(TFRECORD_DIR)
        )

    # Variable-length float sequence; absent values are treated as empty.
    feature_col = tf.io.FixedLenSequenceFeature([], tf.float32, default_value=0.0, allow_missing=True)

    # Create a description of the features.
    feature_description = {
        'label': tf.io.FixedLenFeature([], tf.int64, default_value=2),
        'change': tf.io.FixedLenFeature([], tf.float32, default_value=2),
        'weight': tf.io.FixedLenFeature([], tf.float32, default_value=2),
        'high': feature_col,
        'low': feature_col,
        'open': feature_col,
        'close': feature_col,
        'volume': feature_col,
        'position': feature_col,
    }

    def _parse_function(example_proto):
        """Decode one serialized example into a dict of tensors."""
        return tf.io.parse_single_example(example_proto, feature_description)

    def _fold_function(x):
        """Turn a parsed example dict into a ``(features, label)`` pair."""
        label = tf.cast(x['label'], tf.uint8)

        # Each selected sequence is normalized on its own before stacking.
        feature_list = [tf.math.l2_normalize(x[k]) for k in feature_keys]

        # stack -> (num_features, length); transpose -> (length, num_features)
        features = tf.stack(feature_list)
        features = tf.transpose(features)

        return (features, label)

    ds = tf.data.TFRecordDataset(filenames)
    ds = ds.map(_parse_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    ds = ds.map(_fold_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    print(ds)
    return ds

In [None]:
def get_checkpoint_callback():
    """Create a ``ModelCheckpoint`` callback that saves weights every epoch.

    Checkpoints are written to
    ``ARTIFACTS_DIR/checkpoint/<YYYYmmdd-HHMMSS>/trader_<epoch>.hdf5``, where
    the timestamp is taken when this function is called. The directory is
    created if it does not exist.

    Returns:
        The configured ``ModelCheckpoint`` callback (weights only, one file
        per epoch).
    """
    date_prefix = datetime.now().strftime("%Y%m%d-%H%M%S")

    checkpoint_dir = os.path.join(ARTIFACTS_DIR, 'checkpoint', date_prefix)
    os.makedirs(checkpoint_dir, exist_ok=True)

    # '{epoch:02d}' is filled in by Keras when each checkpoint is written.
    chkpt_fmt = os.path.join(checkpoint_dir, 'trader_{epoch:02d}.hdf5')

    return ModelCheckpoint(
        filepath=chkpt_fmt,
        save_freq='epoch',
        save_weights_only=True
    )

def get_tensorboard_callback():
    """Create a ``TensorBoard`` callback logging to a timestamped directory.

    Logs go to ``ARTIFACTS_DIR/tblogs/<YYYYmmdd-HHMMSS>``, with the timestamp
    taken when this function is called. The directory is created if needed.

    Returns:
        The configured ``tf.keras.callbacks.TensorBoard`` callback.
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    tb_dir = os.path.join(ARTIFACTS_DIR, 'tblogs', stamp)
    os.makedirs(tb_dir, exist_ok=True)

    tb_options = dict(
        log_dir=tb_dir,
        write_graph=True,
        histogram_freq=1,
        embeddings_freq=1,
        update_freq=2000,
    )
    return tf.keras.callbacks.TensorBoard(**tb_options)


def get_lr_callback(monitor='loss', factor=0.2, patience=5, min_lr=1e-5):
    """Create a ``ReduceLROnPlateau`` callback.

    The previous hard-coded ``min_lr=0.001`` was equal to the optimizer's
    initial learning rate in the training cell (``Adam(learning_rate=0.001)``).
    ``ReduceLROnPlateau`` never lowers the rate below ``min_lr``, so the
    callback could not change anything. The default floor is now below the
    initial rate so the reduction can take effect.

    NOTE(review): ``get_stopping_callback`` stops after 3 non-improving epochs
    on the same metric, while this callback waits ``patience=5`` epochs.
    Confirm the two patience values are intended to interact this way.

    Args:
        monitor: Name of the logged quantity to watch.
        factor: Multiplier applied to the learning rate on each reduction.
        patience: Epochs without improvement before the rate is reduced.
        min_lr: Lower bound for the learning rate; must be below the
            optimizer's initial rate for the callback to have any effect.

    Returns:
        The configured ``tf.keras.callbacks.ReduceLROnPlateau`` callback.
    """
    return tf.keras.callbacks.ReduceLROnPlateau(
        monitor=monitor,
        factor=factor,
        patience=patience,
        min_lr=min_lr
    )

def get_stopping_callback():
    """Create an ``EarlyStopping`` callback watching the training loss.

    Returns:
        A ``tf.keras.callbacks.EarlyStopping`` callback that monitors
        ``'loss'`` with a patience of 3 epochs.
    """
    stopper = tf.keras.callbacks.EarlyStopping(
        monitor='loss',
        patience=3,
    )
    return stopper


In [None]:
def preprocess(ds):
    """Split a dataset into batched training and validation pipelines.

    The first ``VALIDATION_SIZE`` elements of ``ds`` become the validation set
    and the remainder the training set. The split is made BEFORE shuffling:
    ``Dataset.shuffle`` produces a new order on every iteration, so calling
    ``take``/``skip`` on an already shuffled dataset draws the two subsets from
    different permutations and leaks validation examples into training.

    NOTE(review): validation now consists of the first ``VALIDATION_SIZE``
    records in file order; confirm that prefix is representative.

    When ``TRAIN_LIMIT`` is set, training is restricted to that many examples
    and both pipelines repeat indefinitely (the caller bounds them with
    ``steps_per_epoch`` / ``validation_steps``).

    Args:
        ds: Unbatched ``tf.data.Dataset`` of ``(features, label)`` pairs.

    Returns:
        A tuple ``(train, validate)`` of datasets batched with ``BATCH_SIZE``
        (incomplete final batches are dropped) and prefetched.
    """
    validate = ds.take(VALIDATION_SIZE)
    train = ds.skip(VALIDATION_SIZE)

    if TRAIN_LIMIT:
        train = train.take(TRAIN_LIMIT)

    # Only the training examples are shuffled (re-shuffled every epoch).
    train = train.shuffle(1024)

    validate = validate.batch(BATCH_SIZE, drop_remainder=True)

    if TRAIN_LIMIT:
        train = train.repeat()
        validate = validate.repeat()

    # Prefetch goes last so whole batches are prepared ahead of the consumer.
    train = train.batch(BATCH_SIZE, drop_remainder=True)
    train = train.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    validate = validate.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return train, validate

In [None]:
if __name__ == '__main__':

    # Step counts handed to fit(). epoch_steps stays None unless TRAIN_LIMIT
    # is set, in which case the (repeating) training set is bounded by it.
    validation_steps = VALIDATION_SIZE // BATCH_SIZE
    epoch_steps = TRAIN_LIMIT // BATCH_SIZE if TRAIN_LIMIT else None

    # Input pipeline: parse the TFRecords, then split and batch them.
    ds = read_tfrecords()
    train, validate = preprocess(ds)

    # Symbolic input, one column per entry in feature_keys.
    # NOTE(review): assumes every example has 180 timesteps - confirm against the data.
    inputs = tf.keras.layers.Input(
        shape=[180, len(feature_keys)],
        name='input',
        dtype=tf.float32,
    )

    # Model: calling it once on the symbolic input builds it before summary().
    model = TraderNet(levels=levels, use_head=True, use_tail=True)
    outputs = model(inputs)
    model.summary()

    # Metrics / loss / optimizer
    metrics = [
        tf.keras.metrics.SparseCategoricalAccuracy(),
        tf.keras.metrics.SparseTopKCategoricalAccuracy(k=2),
    ]
    loss = tf.keras.losses.SparseCategoricalCrossentropy()
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    # Checkpointing, TensorBoard logging, LR scheduling and early stopping.
    callbacks = [
        get_checkpoint_callback(),
        get_tensorboard_callback(),
        get_lr_callback(),
        get_stopping_callback(),
    ]

    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    fit_options = dict(
        epochs=32,
        steps_per_epoch=epoch_steps,
        validation_data=validate,
        validation_steps=validation_steps,
        callbacks=callbacks,
    )
    history = model.fit(train, **fit_options)
