In [1]:
import os
import sys
import time
os.environ['TF_ENABLE_ONEDNN_OPTS'] = str(0)

import numpy as np
import pandas as pd
import sklearn
import tensorflow as tf
from tensorflow import keras  # tf.keras
import matplotlib as mpl
import matplotlib.pyplot as plt

In [2]:
print("python", sys.version)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

python 3.10.9 | packaged by Anaconda, Inc. | (main, Mar  1 2023, 18:18:15) [MSC v.1916 64 bit (AMD64)]
matplotlib 3.7.0
numpy 1.23.5
pandas 1.5.3
sklearn 1.2.1
tensorflow 2.12.0
keras.api._v2.keras 2.12.0


In [3]:
assert sys.version_info >= (3, 5) # Python ≥3.5 required
assert tf.__version__ >= "2.0"    # TensorFlow ≥2.0 required

In [4]:
from train_params_test import *
from train_utility import *
from trans import *

In [5]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    print(gpus)
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
            # logical_gpus = tf.config.experimental.list_logical_devices('GPU')
            # print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

mirrored_strategy = None
if len(gpus) > 1: 
    mirrored_strategy = tf.distribute.MirroredStrategy()
    Learning_Rate = Learning_Rate * len(gpus) * 3 / 4

In [6]:
from datetime import datetime, timedelta
import sys 
sys.path.append('..')
import json

In [7]:
def upgrade_file(path):
    with open(path, "+tw") as f:
        f.write("# Sorry, the content is removed.")
        f.write("\n# Please ask Mike for the content.")

In [8]:
folders = [dir_data, dir_datasets, dir_candles, dir_Checkpoint, dir_CSVLogs]
for folder in folders:
    if not os.path.isdir(folder):
        os.mkdir(folder)

FileNotFoundError: [WinError 3] The system cannot find the path specified: '/mnt/data/Trading/'

In [None]:
#==================== Load candle data into 'table' with shape of (time, markets, 10 fields) ====================
Candles = np.load( os.path.join( dir_candles, "table-" + CandleFile + ".npy") )
Candles = np.swapaxes(Candles, 0, 1)
print("Candles: {}".format(Candles.shape))

In [None]:
market = 5
Show_Price_Volume_10(Candles[:, market, :], 1, 1, 5000)

In [None]:
Event_Free_Learning_Scheme_10(Candles[:, market, :], 3, 30, 5000)

In [None]:
#==================== Delete 7 candle fields from 'Candles'. ====================
# Candles.shape becomes (time, markets, ['ClosePrice', 'BaseVolume', 'BuyerBaseVolume'] )

CandleMarks = Candles[:, :, 9] # keep it for later use
Candles = np.delete(Candles, [0, 1, 2, 5, 6, 8, 9], axis = 2) # delete Open, High, Low, qVolume, #Trades, bQVolume, CandleMarks
assert (~np.isfinite(Candles)).any() == False

table_markets = []
with open( os.path.join( dir_candles, "reports-" + CandleFile + ".json"), "r") as f:
    reports = json.loads(f.read())
print(reports[:2])

markets = [ s[0: s.find(':')] for s in reports if 'Success' in s ]
assert Candles.shape[1] == len(markets)
print(Candles.shape, len(markets), markets[:2])

In [None]:
start_ts, interval_s, timestamps_abs = get_timestamps(CandleFile, Candles)
print(start_ts, interval_s, timestamps_abs.shape, timestamps_abs[:3])

In [None]:
#==================== Define Data ====================

Data = Candles[:, :, :]   # (time:, all markets, 20 fields)

In [None]:
#===================== Find marketrank ==================

check = np.array([ np.argmax(Data[:, m, 0]>0) / Data.shape[0] * 100 for m in range(len(markets)) ])
permute = np.argsort(check)
marketrank = [ (markets[m], 100 - round(np.argmax(Data[:, m, 0]>0) / Data.shape[0] * 100)) for m in permute ]
# marketrank = [ markets[m] for m in permute ]

batch = 10
for i in range(0, len(markets), batch):
    print(marketrank[i: i+batch])

In [None]:
enFields = ['ClosePrice', 'BaseVolume', 'BuyerBaseVolume']

x_indices, y_indices, chosen_markets, chosen_fields = \
get_formation_params(
    enFields, markets, marketrank,
    min_true_candle_percent_x, chosen_fields_x_names, min_true_candle_percent_y, chosen_fields_y_names
)

In [None]:
len_usdt = len("USDT")
chosen_market_names = [markets[market][:-len_usdt] for market in chosen_markets]
batch = 10
for i in range(0, len(chosen_market_names), batch):
    print(chosen_market_names[i: i+batch])
print(len(chosen_market_names))

In [None]:
#==================== Generate event-free data into Data ====================
# Data loses heading items.
# Do it before: Permute Data in time

alpha = 3; beta = 3 # beta is used in 'get_eFree_with_plot'. Ugly coupling.
event_free_data_loss = 3 * ( alpha * SmallSigma + LargeSigma)
eFree = np.zeros( (Data.shape[0] - event_free_data_loss, len(chosen_markets), len(chosen_fields)), dtype = np.float32 )

for market in chosen_markets:
    for field in chosen_fields:
        sSigma = SmallSigma
        if enFields[field] == 'BaseVolume': sSigma = SmallSigma * alpha
        P, maP, logP, log_maP, event, eventFree = \
        get_eFree_with_plot(markets[market], enFields[field], Data[:, market, field], sSigma, 
                            LargeSigma, Data.shape[0] - event_free_data_loss, noPlot=eFreeNoPlot, noLog=eFreeNoLog)
        Data[event_free_data_loss:, market, field] = eventFree

Data = Data[event_free_data_loss: ]

print(Data.shape)

In [None]:
Time = get_time_features(timestamps_abs)
size_time = Time.shape[1]

Time = Time[event_free_data_loss: ]
assert Data.shape[0] == Time.shape[0]
print(Candles.shape, Time.shape)

In [None]:
Standard = None

if Standardization:
    Data, Standard = standardize(Data, chosen_markets, chosen_fields)

In [None]:
fig = plt.figure(figsize=(16,3))
ax = fig.add_subplot(111)
ax.set_title("Features are custom-standardized" if Standardization else "Features are not standardized")
for market in chosen_markets:
    for field in chosen_fields:
        ax.plot(Data[:, market, field], label = "{} @ {}".format(enFields[field], markets[market][:-4])) # -4: 'USDT'
ax.legend(loc = 'upper left')
plt.show()

## Start testing

In [None]:
parse_csv_line_to_tensors(b'0., 1., 2., 3., 4., 5., 6., 7., 222., 333., 8., 9., 10., 11., 444., 555.', 2, 4, 2, 2, True, True, 1) # nx, size_x, ny, size_y, time_x, time_y, sixe_time

In [None]:
parse_csv_line_to_tensors(b'0., 1., 2., 3., 4., 5., 6., 7., 222., 333., 8., 9., 10., 11.', 2, 4, 2, 2, True, False, 1) # nx, size_x, ny, size_y, time_x, time_y, sixe_time

In [None]:
parse_csv_line_to_tensors(b'0., 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11.', 2, 4, 2, 2, False, False, 1) # nx, size_x, ny, size_y, time_x, time_y, sixe_time

In [None]:
parse_csv_line_to_tensors_for_transformer(b'0., 1., 2., 3., 4., 5., 6., 7., 222., 333., 8., 9., 10., 11., 444.', 2, 4, 1, 4, True, True, 1) # nx, size_x, ny, size_y, time_x, time_y, sixe_time

In [None]:
parse_csv_line_to_tensors_for_transformer(b'0., 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11.', 2, 4, 1, 4, False, False, 1) # nx, size_x, ny, size_y, time_x, time_y, sixe_time

In [None]:
# defind test data

n_times = 1000; n_markets = 2; n_fields = 2
data = [ [ [ time * n_markets * n_fields + market * n_fields + field for field in range(n_fields) ] for market in range(n_markets) ] for time in range(n_times)]
data = np.array(data, dtype=float)
times_test = np.array( range(data.shape[0]) ) + 100000
print(data.shape, times_test.shape)   # time, market, field
print(data[:2, :, :])

In [None]:
print(data.shape, times_test.shape)   # time, market, field

In [None]:
nx_test = 2
ny_test = 2
ns_test = 10
batchSize = 2

sample_anchors = range(0, data.shape[0] - nx_test - ny_test, ns_test)
print(data.shape[0], len(sample_anchors), sample_anchors)

x_indices = ( (0, 1), (0, 1) )    # (market, field)
y_indices = ( (0, 1), (0, 1) )    # (market, field)
print(data[0:2][:, x_indices[0]][:, :, x_indices[1]])
print(data[2:4][:, y_indices[0]][:, :, y_indices[1]])

size_x = get_timepoint_size(x_indices)
size_y = get_timepoint_size(y_indices)
size_time = 1
print(size_x, size_y, size_time)

In [None]:
name_plus = CandleFile+'_o'
name_prefix = os.path.join(dir_datasets, name_plus)

reuse_files = False

if reuse_files:
    import re
    filenames = [ os.path.join(dir_datasets, x) for x in os.listdir(dir_datasets) if re.match(name_plus, x)]
else:
    os.system("rm {}/*{}*".format(dir_datasets, name_plus))
    filenames = divide_to_multiple_csv_files(data, True, True, times_test, sample_anchors, name_prefix, nx_test, x_indices, ny_test, y_indices, header=None, n_parts=10)

print(filenames)

In [None]:
filename_dataset = tf.data.Dataset.list_files(filenames, shuffle=None) # no way to prevent shuffle.
print(filename_dataset.cardinality().numpy())
for element in filename_dataset:
    print(element.numpy())

In [None]:
ds = tf.data.TextLineDataset(filenames[0])
for line in ds.take(20):
    print(line.numpy())

In [None]:
for element in dataset:
    print(element)
    break

# should print: (None, nx_test, size_x + size_time), (None, ny_test * size_y)

In [None]:
dataset = csv_reader_to_dataset(filenames, nx_test, size_x, ny_test, size_y, True, True, size_time,
                             n_parse_threads=5, batch_size=batchSize, shuffle_buffer_size=1000, n_readers=5)

In [None]:
# Check elements: NaN, -inf, +inf

assert (~np.isfinite(Candles)).any() == False

In [None]:
nLatest = 500
P, maP, logP, log_maP, event, eventFree = Get_eFree(Candles[:, 0, 0], 1, 30, nLatest)
assert maP.shape[0] == nLatest; assert logP.shape[0] == nLatest; assert log_maP.shape[0] == nLatest; assert event.shape[0] == nLatest; assert eventFree.shape[0] == nLatest


In [None]:
n_readers = 5
dataset = filename_dataset.interleave(
    lambda filename: tf.data.TextLineDataset(filename),
    cycle_length=n_readers, num_parallel_calls=tf.data.AUTOTUNE) # no way to prevent shuffle?

for line in dataset.take(15):
    print(line.numpy())

## End testing

In [None]:
sample_anchores_t, sample_anchores_v = get_sample_anchores(Data, Nx, Ny, Ns)

print(len(sample_anchores_t), len(sample_anchores_v))

In [None]:
Dataset_train, Dataset_valid, dx, dy = get_datasets(
    Reuse_files,
    CandleFile, dir_datasets, Data, Time_into_X, Time_into_Y, Time, 
    sample_anchores_t, sample_anchores_v,
    Nx, x_indices, Ny, y_indices, nFiles_t, nFiles_v, n_readers, size_time,
    BatchSize, shuffle_batch, Transformer, nPrefetch
)

In [None]:
model = None

if mirrored_strategy is None:
    model = build_model(
        dx, dy, Num_Layers, Num_Heads, Factor_FF, repComplexity, Dropout_Rate,
        HuberThreshold, CancleLossWeight, TrendLossWeight
    )
else:
    with mirrored_strategy.scope():
        model = build_model(
            dx, dy, Num_Layers, Num_Heads, Factor_FF, repComplexity, Dropout_Rate,
            HuberThreshold, CancleLossWeight, TrendLossWeight
    )

In [None]:
callbacks = get_callbacks(
    checkpoint_filepath, Checkpoint_Monitor, 
    csvLogger_filepath, 
    EarlyStopping_Min_Monitor, EarlyStopping_Patience
)

try:
    model.load_weights(checkpoint_filepath)
except:
    print("Failed to load a checkpoint")
    pass

In [None]:
model.fit(
    Dataset_train, # x and y_true
    validation_data=Dataset_valid, 
    epochs=20, #Epochs_Initial,
    callbacks=callbacks
)

In [None]:
columns = ('loss', 'val_loss', 'trans_mTA', 'val_trans_mTA', 'trans_1_loss', 'val_trans_1_loss')
plot_csv_log_history(csvLogger_filepath, columns)