In [None]:
import numpy as np
import os
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import gc

In [None]:
# Seeded NumPy Generator: every random draw made through `rng` in this
# notebook is repeatable from a fresh kernel.
SEED = 42
rng = np.random.default_rng(seed=SEED)

In [None]:
# Gather the shard file names for the features (X) and the labels (y).
# Names are sorted so both lists come back in a deterministic order.
def _npz_names(directory):
    """Return the sorted entries of `directory` whose names end in "npz"."""
    return sorted(name for name in os.listdir(directory) if name.endswith("npz"))


X_files = _npz_names("../data/array_data/X/")
y_files = _npz_names("../data/array_data/y/")

In [None]:
%%time
# merge X arrays
X_arrays = []
for i in X_files:
    X_data = np.load(f"../data/array_data/X/{i}")["arr_0"]
    X_arrays.append(X_data)
X = np.concatenate(X_arrays, axis=0)

In [None]:
%%time
# merge y arrays
y_arrays = []
for i in y_files:
    y_data = np.load(f"../data/array_data/y/{i}")["arr_0"]
    y_arrays.append(y_data)
y = np.concatenate(y_arrays, axis=0)

In [None]:
# Report the dimensions of the merged feature and label arrays, one per line.
for merged in (X, y):
    print(merged.shape)

In [None]:
# Disabled cell: joint shuffle of X and y with a single random permutation of
# the row indices drawn from `rng`, so rows of X and y stay paired.
# NOTE(review): the train_test_split calls later in this notebook do not pass
# shuffle=False, and scikit-learn shuffles by default, so this step looks
# redundant — confirm before re-enabling or deleting.
# %%time
# # shuffle X and y
# shuf_choice = rng.choice(X.shape[0],X.shape[0],replace=False)
# X = X[shuf_choice]
# y = y[shuf_choice]

In [None]:
# Disabled cell: releases the permutation index array created by the (also
# disabled) shuffle cell above; only meaningful if that cell is re-enabled.
# # free memory
# del shuf_choice
# gc.collect()

In [None]:
%%time
# convert sequence values to integers
X = X.view(np.int32)

In [None]:
%%time
# normalize sequence values to range [0,1]
min_val, max_val = np.min(X), np.max(X)

In [None]:
# Cut X along its first axis into 1000 near-equal chunks so the rescaling
# below can proceed one chunk at a time.
N_BATCHES = 1000
batched_X = np.array_split(X, N_BATCHES)

In [None]:
# Drop the notebook-level name `X` and run a garbage-collection pass.
# NOTE(review): the chunks in `batched_X` are presumably views onto the same
# buffer (np.array_split slices its input), in which case this frees little
# memory until the chunks themselves are released — confirm.
# NOTE(review): `X` is used again by the reshape cell further down, so it has
# to be rebuilt from the normalized chunks before that cell runs.
del X
gc.collect()

In [None]:
# Min-max scale every chunk into [0, 1] and reassemble the result as `X`.
# `X` was deleted after chunking, yet the reshape and split cells below read
# it, so the scaled data has to be bound back to that name here.
val_range = max_val - min_val  # loop-invariant, computed once

# Pre-allocate the final array and fill it chunk by chunk, so the scaled data
# only ever exists in memory once.
total_rows = sum(chunk.shape[0] for chunk in batched_X)
X = np.empty((total_rows,) + batched_X[0].shape[1:], dtype=np.float64)

batched_X_norm = []  # per-chunk views into X (no extra copy of the data)
row_start = 0
for chunk in tqdm(batched_X, total=len(batched_X)):
    row_stop = row_start + chunk.shape[0]
    # True division turns the shifted chunk into floats, written straight
    # into the matching rows of X.
    np.divide(chunk - min_val, val_range, out=X[row_start:row_stop])
    batched_X_norm.append(X[row_start:row_stop])
    row_start = row_stop

# The un-scaled chunks are no longer needed; release them.
del batched_X
gc.collect()

In [None]:
# Append a trailing length-1 axis to both arrays for modelling.
X = X[..., np.newaxis]
y = y[..., np.newaxis]

In [None]:
%%time
# split into training, validation, and testing sets
X_train, X_tv, y_train, y_tv = train_test_split(X,y,test_size=0.05,random_state=42)
X_test, X_val, y_test, y_val = train_test_split(X_tv,y_tv,test_size=0.5,random_state=42)

In [None]:
# Report the shape of every split, in the order:
# X_train, y_train, X_val, y_val, X_test, y_test.
for split in (X_train, y_train, X_val, y_val, X_test, y_test):
    print(split.shape)

In [None]:
# display random X and y value and sense check values
display_idx = rng.choice(X_train.shape[0])

# Flatten the chosen sample to 1-D whatever its sequence length is
# (previously hard-coded to 128).
X_sample = X_train[display_idx].reshape(-1)
y_sample = y_train[display_idx].reshape(-1)
for label, sample in {"X": X_sample, "y": y_sample}.items():
    print(f"{label}_sample:")
    print(sample)
    print("")

# Undo the min-max scaling to recover the integer code of each character.
# The scaled values are floats, so the round trip can land a hair below the
# true integer; round to the nearest integer before chr() rather than
# truncating, which would silently decode the previous character.
codes = np.rint(X_sample * (max_val - min_val) + min_val).astype(np.int64)
X_decode = np.array([chr(code) for code in codes.tolist()])

# Print, one line per label id 1..5, the characters carrying that label.
# NOTE(review): label 0 is skipped — presumably the "no label" class; confirm.
for label_id in range(1, 6):
    print("".join(X_decode[y_sample == label_id]))

In [None]:
%%time
# calculate accuracy threshold for val and test datasets
# note thresholds and min/max values pre-normalization
lines = ["",f"Min val: {min_val}",f"Min val: {max_val}"]
for k,v in {"y_val":y_val,"y_test":y_test}.items():
    total_non_zero = np.where(v!=0)[0].shape[0]
    accuracy_threshold = np.round(100-(total_non_zero/v.shape[0]),2)
    acc = f"Accuracy threshold for {k}: {accuracy_threshold}%"
    lines.append(acc)
    print(acc)
    print("")
with open("meta_mini", 'w') as f:
    f.writelines('\n'.join(lines))

In [None]:
# Disabled cell: would write each of the six split arrays to its own
# compressed .npz file under ../data/array_data_mini/.
# NOTE(review): while this stays commented out, the splits computed above
# exist only in kernel memory and are lost on restart — confirm that is
# intended.
# %%time
# np.savez_compressed(f"../data/array_data_mini/X_train.npz",X_train)
# np.savez_compressed(f"../data/array_data_mini/y_train.npz",y_train)
# np.savez_compressed(f"../data/array_data_mini/X_val.npz",X_val)
# np.savez_compressed(f"../data/array_data_mini/y_val.npz",y_val)
# np.savez_compressed(f"../data/array_data_mini/X_test.npz",X_test)
# np.savez_compressed(f"../data/array_data_mini/y_test.npz",y_test)