In [1]:
from tsmoothie.smoother import *
from tslearn.clustering import TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesScalerMinMax
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from pyts.image import GramianAngularField, RecurrencePlot, MarkovTransitionField

In [2]:
# Labeled EUR/USD price table; the preview below shows hourly timestamps and a
# `Label` column holding -1 / 0 / 1.
labeled_data_path = 'data/FTH_eurusd.csv'

# NOTE(review): the preview contains an `Unnamed: 0` column, i.e. the row index
# stored in the CSV is loaded as an ordinary data column.
df = pd.read_csv(labeled_data_path)

# Bare expression so the notebook renders the frame as this cell's output.
df

Unnamed: 0,Date,Open,High,Low,Close,Volume,Future_Close,Return,Label
0,2020-01-01 22:00,1.12106,1.12166,1.12106,1.12143,1215,1.11713,-0.003834,-1
1,2020-01-01 23:00,1.12143,1.12218,1.12142,1.12188,1407,1.11708,-0.004279,-1
2,2020-01-02 0:00,1.12188,1.12190,1.12157,1.12183,1790,1.11754,-0.003824,-1
3,2020-01-02 1:00,1.12182,1.12244,1.12180,1.12209,3135,1.11735,-0.004224,-1
4,2020-01-02 2:00,1.12210,1.12245,1.12184,1.12222,2121,1.11725,-0.004429,-1
...,...,...,...,...,...,...,...,...,...
33684,2025-07-09 22:00,1.17212,1.17249,1.17211,1.17243,527,1.17055,-0.001604,0
33685,2025-07-09 23:00,1.17244,1.17396,1.17228,1.17378,998,1.17018,-0.003067,-1
33686,2025-07-10 0:00,1.17375,1.17494,1.17354,1.17436,4514,1.16746,-0.005876,-1
33687,2025-07-10 1:00,1.17436,1.17443,1.17378,1.17406,2959,1.16750,-0.005587,-1


In [3]:
# Number of consecutive rows (Close prices) that make up one sample.
WINDOW_SIZE = 24

# Extract both columns once instead of indexing the DataFrame on every iteration.
close_values = df['Close'].values
label_values = df['Label'].values

X = []
y = []

# A window starting at row i covers rows i .. i + WINDOW_SIZE - 1 and is labeled
# with the `Label` of its last row. The last valid start is
# len(df) - WINDOW_SIZE, so the range needs the "+ 1" to keep the window that
# ends on the final row of the table.
for i in range(len(df) - WINDOW_SIZE + 1):
    window = close_values[i:i + WINDOW_SIZE]
    label = label_values[i + WINDOW_SIZE - 1]

    X.append(window)
    y.append(label)

# One row per window in X, one label per window in y.
X = np.array(X)
y = np.array(y)

In [8]:
# pyts image encoders, one per feature projection used below.
# Gramian Angular Field, 'summation' variant (GASF); image_size=WINDOW_SIZE asks
# for images whose side equals the window length.
gasf = GramianAngularField(method='summation', image_size=WINDOW_SIZE)
# Gramian Angular Field, 'difference' variant (GADF), same image size.
gADF = GramianAngularField(method='difference', image_size=WINDOW_SIZE)
# Recurrence plot. NOTE(review): per the pyts documentation threshold=None leaves
# the plot un-binarized — confirm against the installed pyts version.
rp = RecurrencePlot(threshold=None)
# The Markov Transition Field projection is currently disabled.
#mtf = MarkovTransitionField(image_size=X.shape[1], n_bins=10)

In [9]:
# Numeric label -> name of the class sub-folder the images are stored in.
label_map = {
    -1: 'Downward',
    0: 'Sideway',
    1: 'Upward',
}

# One output root per feature projection.
gasf_dir = 'datasets/FTH/EURUSD/GASF'
gADF_dir = 'datasets/FTH/EURUSD/GADF'
rp_dir = 'datasets/FTH/EURUSD/RP'

# Create <feature root>/<class name>/ for every projection; folders that
# already exist are left untouched (exist_ok=True).
for label_name in label_map.values():
    for _feature_root in (gasf_dir, gADF_dir, rp_dir):
        os.makedirs(os.path.join(_feature_root, label_name), exist_ok=True)

In [None]:
# # NOTE: known memory usage issue when this cell runs.
# # Run once to generate the images, then leave the cell commented out.
# from tqdm import tqdm

# transformers = [
#     (gasf, gasf_dir, "GASF"),
#     (gADF, gADF_dir, "GADF"),
#     (rp, rp_dir, "RP"), 
# ]

# for transformer, save_dir, name in transformers:
#     print(f"\nRunning on feature projection ({name}): ")

#     for i, (series, label) in enumerate(tqdm(zip(X, y), total=len(X)), start=1):
#         image = transformer.fit_transform(series.reshape(1, -1))[0]
#         label_name = label_map[label]
#         image_path = os.path.join(save_dir, label_name, f'{label_name}_{i}.png')

#         plt.imsave(image_path, image, cmap='rainbow')
#         plt.close('all')


Running on feature projection (RP): 


100%|██████████| 33665/33665 [01:41<00:00, 331.85it/s]


In [30]:
from torch.utils.data import Dataset
from glob import glob
from PIL import Image
import os 
import torch

class MultiFeatureFusionDataset(Dataset):
    """Dataset yielding the GASF, GADF and RP image of the same sample together.

    Files are looked up as ``<root_dir>/<GASF|GADF|RP>/<class_name>/<name>.png``.
    The GASF folder drives the listing: a sample is kept only when a file with
    the same name also exists under both ``GADF`` and ``RP``; incomplete
    triples are skipped silently.

    Args:
        root_dir: Folder containing the ``GASF``, ``GADF`` and ``RP`` sub-folders.
        class_to_idx: Mapping of class folder name to the label value stored
            with each sample.
        transform: Optional callable applied to each of the three images
            separately in ``__getitem__``.

    Raises:
        RuntimeError: If no complete triple is found under ``root_dir``.
    """

    def __init__(self, root_dir, class_to_idx, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.class_to_idx = class_to_idx
        # List of dicts with keys 'gasf', 'gadf', 'rp' (file paths) and 'label'.
        self.data_list = self._make_data_list()

        if not self.data_list:
            raise RuntimeError(f"Error :: Not found a data in Path: {root_dir}. Check the directory structure.")

    def _make_data_list(self):
        """Collect one record per PNG that is present in all three feature folders.

        Records follow the iteration order of ``class_to_idx``; within a class
        the GASF paths are sorted as strings (lexicographic, not numeric).
        """
        records = []

        for class_name, class_idx in self.class_to_idx.items():
            gasf_pattern = os.path.join(self.root_dir, 'GASF', class_name, '*.png')

            for gasf_path in sorted(glob(gasf_pattern)):
                png_name = os.path.basename(gasf_path)
                gadf_path = os.path.join(self.root_dir, 'GADF', class_name, png_name)
                rp_path = os.path.join(self.root_dir, 'RP', class_name, png_name)

                # Skip samples that lack either counterpart image.
                if not (os.path.exists(gadf_path) and os.path.exists(rp_path)):
                    continue

                records.append({
                    'gasf': gasf_path,
                    'gadf': gadf_path,
                    'rp': rp_path,
                    'label': class_idx
                })

        return records

    def __len__(self):
        """Return the number of complete GASF/GADF/RP triples."""
        return len(self.data_list)

    def __getitem__(self, index):
        """Load the sample at ``index``.

        Returns:
            Tuple ``(gasf, gadf, rp, label)``: the three images converted to
            RGB (passed through ``transform`` when one is set) and the label
            as a ``torch.long`` tensor.
        """
        record = self.data_list[index]

        # Open all three images first, in the fixed order gasf -> gadf -> rp.
        images = [
            Image.open(record[feature]).convert('RGB')
            for feature in ('gasf', 'gadf', 'rp')
        ]

        if self.transform:
            # The transform is invoked once per image, in the same order.
            images = [self.transform(image) for image in images]

        img_gasf, img_gadf, img_rp = images
        target = torch.tensor(record['label'], dtype=torch.long)
        return img_gasf, img_gadf, img_rp, target

In [31]:
class DoubleFeatureFusionDataset(Dataset):
    """Dataset yielding two image projections of the same sample together.

    Files are looked up as ``<root_dir>/<f1 or f2>/<class_name>/<name>.png``.
    The ``f1`` folder drives the listing: a sample is kept only when a file
    with the same name also exists under ``f2``; unpaired files are skipped
    silently.

    Args:
        root_dir: Folder containing the two feature sub-folders.
        f1: Name of the first feature sub-folder.
        f2: Name of the second feature sub-folder.
        class_to_idx: Mapping of class folder name to the label value stored
            with each sample.
        transform: Optional callable applied to each of the two images
            separately in ``__getitem__``.

    Raises:
        RuntimeError: If no complete pair is found under ``root_dir``.
    """

    def __init__(self, root_dir, f1, f2, class_to_idx, transform=None):
        self.root_dir = root_dir
        self.f1 = f1
        self.f2 = f2
        self.transform = transform
        self.class_to_idx = class_to_idx
        # List of dicts with keys 'f1', 'f2' (file paths) and 'label'.
        self.data_list = self._make_data_list()

        if not self.data_list:
            raise RuntimeError(f"Error :: Not found a data in Path: {root_dir}. Check the directory structure.")

    def _make_data_list(self):
        """Collect one record per PNG that is present in both feature folders.

        Records follow the iteration order of ``class_to_idx``; within a class
        the ``f1`` paths are sorted as strings (lexicographic, not numeric).
        """
        records = []

        for class_name in self.class_to_idx:
            label_value = self.class_to_idx[class_name]
            first_pattern = os.path.join(self.root_dir, self.f1, class_name, '*.png')

            for first_path in sorted(glob(first_pattern)):
                second_path = os.path.join(
                    self.root_dir, self.f2, class_name, os.path.basename(first_path)
                )

                # Skip samples whose second-feature image is missing.
                if not os.path.exists(second_path):
                    continue

                records.append({
                    'f1': first_path,
                    'f2': second_path,
                    'label': label_value
                })

        return records

    def __len__(self):
        """Return the number of complete feature pairs."""
        return len(self.data_list)

    def __getitem__(self, index):
        """Load the sample at ``index``.

        Returns:
            Tuple ``(img_f1, img_f2, label)``: both images converted to RGB
            (passed through ``transform`` when one is set) and the label as a
            ``torch.long`` tensor.
        """
        record = self.data_list[index]

        # Open both images first, in the fixed order f1 -> f2.
        images = [Image.open(record[key]).convert('RGB') for key in ('f1', 'f2')]

        if self.transform:
            # The transform is invoked once per image, in the same order.
            images = [self.transform(image) for image in images]

        img_f1, img_f2 = images
        target = torch.tensor(record['label'], dtype=torch.long)
        return img_f1, img_f2, target

In [57]:
from torchvision import transforms
# Per-image transform handed to the dataset. The dataset calls it once for each
# image in __getitem__, so RandomErasing is invoked separately per projection
# (presumably erasing different regions in each — confirm this is intended).
# The serialize_* functions in this notebook read dataset.data_list and open the
# files themselves, so this transform does not affect the NPZ output.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomErasing()
])
# NOTE(review): this root ('EURUSD_sample') differs from the 'datasets/FTH/EURUSD'
# folders created earlier in the notebook — presumably a subset prepared outside
# this notebook; confirm.
eurusd_data_root = 'datasets/FTH/EURUSD_sample'
# Class folder name -> label value stored with every sample.
# NOTE(review): the dataset returns these values unchanged as torch.long targets.
# Losses such as torch.nn.CrossEntropyLoss expect class indices in [0, C-1], so
# -1 would need remapping before training — confirm how downstream code uses it.
class_to_idx = {
    'Downward': -1,
    'Sideway': 0,
    'Upward': 1
}
# Raises RuntimeError if no complete GASF/GADF/RP triple exists under the root.
eurusd_dataset = MultiFeatureFusionDataset(
    root_dir=eurusd_data_root,
    transform=transform,
    class_to_idx=class_to_idx
)

# Target file for the three-feature NPZ archive.
eurusd_npz_path = 'datasets/FTH/Serialized_FTH_EURUSD_sample.npz'

In [58]:
from tqdm import tqdm
import numpy as np

def serialize_multifeaturefusion(dataset, output_npz_path):
    """Pack every GASF/GADF/RP image triple of ``dataset`` into one compressed NPZ file.

    Only ``dataset.data_list`` is read. Each record must provide image paths
    under the keys ``'gasf'``, ``'gadf'`` and ``'rp'`` plus a ``'label'``.
    Images are opened straight from disk and converted to RGB, so the
    dataset's ``transform`` is not applied.

    The archive contains four arrays: ``gasf``, ``gadf`` and ``rp`` (uint8,
    stacked along a new first axis, one entry per sample) and ``labels``
    (int64).

    Args:
        dataset: Object exposing ``data_list``, a list of record dicts.
        output_npz_path: Target file (str or path-like). ``.npz`` is appended
            when missing, which is what ``np.savez_compressed`` does itself;
            doing it up front keeps the existence check and the final message
            in sync with the file that is actually written.

    Returns:
        None. Prints a message and returns early when ``data_list`` is empty
        or the target file already exists.

    Raises:
        ValueError: If the images of one feature do not all share one shape.
    """
    data_list = dataset.data_list

    if not data_list:
        print("Warning: There is no data to serialize.")
        return

    # Resolve the real output name before checking for an existing archive.
    output_npz_path = os.fspath(output_npz_path)
    if not output_npz_path.endswith('.npz'):
        output_npz_path += '.npz'

    if os.path.exists(output_npz_path):
        print("Already exists the serialized dataset for NPZ.")
        return

    # Create the target folder now so a missing directory cannot fail the save
    # after all images have been loaded.
    parent_dir = os.path.dirname(output_npz_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    feature_keys = ('gasf', 'gadf', 'rp')
    arrays = {key: [] for key in feature_keys}
    labels = []

    print(f"Start to serialize {len(data_list)} samples for multi-features fusion...")

    for item in tqdm(data_list, desc="Serializing"):
        for key in feature_keys:
            # The context manager releases the file handle as soon as the
            # pixel data has been copied into the array.
            with Image.open(item[key]) as img:
                arrays[key].append(np.array(img.convert('RGB'), dtype=np.uint8))
        labels.append(item['label'])

    # np.stack refuses differently sized images instead of building a ragged
    # object array that could not be loaded back without pickling.
    np.savez_compressed(
        output_npz_path,
        gasf=np.stack(arrays['gasf']),
        gadf=np.stack(arrays['gadf']),
        rp=np.stack(arrays['rp']),
        labels=np.array(labels, dtype=np.int64)
    )

    print(f"Complete to serialize! Path to save the file: {output_npz_path}")

In [59]:
def serialize_doublefeaturefusion(dataset, output_npz_path, feature_1, feature_2):
    """Pack two image features of every sample in ``dataset`` into one compressed NPZ file.

    Only ``dataset.data_list`` is read. ``feature_1`` and ``feature_2`` are
    the record keys holding the image paths (for example ``'gasf'`` and
    ``'gadf'``); each record must also provide a ``'label'``. Images are
    opened straight from disk and converted to RGB, so the dataset's
    ``transform`` is not applied.

    The archive contains three arrays: ``f1`` (images found under
    ``feature_1``), ``f2`` (images found under ``feature_2``), both uint8 and
    stacked along a new first axis, and ``labels`` (int64). The feature names
    themselves are not stored in the archive.

    Args:
        dataset: Object exposing ``data_list``, a list of record dicts.
        output_npz_path: Target file (str or path-like). ``.npz`` is appended
            when missing, which is what ``np.savez_compressed`` does itself;
            doing it up front keeps the existence check and the final message
            in sync with the file that is actually written.
        feature_1: Record key of the first image path.
        feature_2: Record key of the second image path.

    Returns:
        None. Prints a message and returns early when ``data_list`` is empty
        or the target file already exists.

    Raises:
        KeyError: If the first record lacks ``feature_1``, ``feature_2`` or
            ``'label'``.
        ValueError: If the images of one feature do not all share one shape.
    """
    data_list = dataset.data_list

    if not data_list:
        print("Warning: There is no data to serialize.")
        return

    # Resolve the real output name before checking for an existing archive.
    output_npz_path = os.fspath(output_npz_path)
    if not output_npz_path.endswith('.npz'):
        output_npz_path += '.npz'

    if os.path.exists(output_npz_path):
        print("Already exists the serialized dataset for NPZ.")
        return

    # Fail fast with a readable message when the requested keys do not match
    # the record layout, instead of a bare KeyError inside the loop.
    missing_keys = [key for key in (feature_1, feature_2, 'label') if key not in data_list[0]]
    if missing_keys:
        raise KeyError(
            f"Records have no key(s) {missing_keys}; available keys: {sorted(data_list[0])}"
        )

    # Create the target folder now so a missing directory cannot fail the save
    # after all images have been loaded.
    parent_dir = os.path.dirname(output_npz_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    f1_arrays = []
    f2_arrays = []
    labels = []

    print(f"Start to serialize {len(data_list)} samples for double-features fusion...")

    for item in tqdm(data_list, desc="Serializing"):
        # The context managers release the file handles as soon as the pixel
        # data has been copied into the arrays.
        with Image.open(item[feature_1]) as img:
            f1_arrays.append(np.array(img.convert('RGB'), dtype=np.uint8))
        with Image.open(item[feature_2]) as img:
            f2_arrays.append(np.array(img.convert('RGB'), dtype=np.uint8))
        labels.append(item['label'])

    # np.stack refuses differently sized images instead of building a ragged
    # object array that could not be loaded back without pickling.
    np.savez_compressed(
        output_npz_path,
        f1=np.stack(f1_arrays),
        f2=np.stack(f2_arrays),
        labels=np.array(labels, dtype=np.int64)
    )

    print(f"Complete to serialize! Path to save the file: {output_npz_path}")

In [60]:
# Write the three-feature NPZ archive. The call only prints a message and
# returns when the dataset has no records or the target file already exists.
serialize_multifeaturefusion(eurusd_dataset, eurusd_npz_path)

Start to serialize 9622 samples for multi-features fusion...


Serializing: 100%|██████████| 9622/9622 [00:32<00:00, 299.54it/s]


Complete to serialize! Path to save the file: datasets/FTH/Serialized_FTH_EURUSD_sample.npz


In [64]:
# Folder that receives the archive.
npz_path = 'datasets/FTH'

# Pair of features to serialize; each name must be a key of the records in
# eurusd_dataset.data_list.
feature_name1 = 'gasf'
feature_name2 = 'gadf'

# The feature names are embedded in the file name.
file_name = f'Serialized_FTH_EURUSD_sample_{feature_name1}_{feature_name2}.npz'
npz_path_to_save = os.path.join(npz_path, file_name)

serialize_doublefeaturefusion(
    eurusd_dataset,
    npz_path_to_save,
    feature_name1,
    feature_name2,
)

Start to serialize 9622 samples for double-features fusion...


Serializing: 100%|██████████| 9622/9622 [00:05<00:00, 1722.15it/s]


Complete to serialize! Path to save the file: datasets/FTH\Serialized_FTH_EURUSD_sample_gasf_gadf.npz
