In [1]:
from typing import *

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import os


In [2]:
# Directory that holds the per-file CSV inputs.
MINUTES_DIR = "Data/Minutes"
# os.listdir() returns entries in arbitrary order, so sort before slicing;
# otherwise "the first five files" differs between machines and runs.
FILES = [os.path.join(MINUTES_DIR, path) for path in sorted(os.listdir(MINUTES_DIR))[:5]]
# (train CSV path, test CSV path) written by DataPreparer.
DATA_PATH = ("temp/train.csv", "temp/test.csv")
# Number of consecutive values in each input window.
SEQ_LEN = 64


In [None]:
class DataSetTooSmall(Exception):
	"""Signals that a loaded input file has too few rows to build sequences from."""


In [7]:
class DataPreparer:
	"""Build windowed train/test CSVs from per-file currency-pair price tables.

	Each input CSV must provide ``base_currency``, ``quote_currency`` and ``c``
	columns (``c`` is presumably the close price -- confirm against the data
	source). For every currency pair in a file the ``c`` series is cut into
	overlapping windows of ``seq_len`` values; each window is labelled 1 when
	the value that follows it is strictly higher than the window's last value,
	otherwise 0. The samples of each file are split into train and test parts
	and appended to the two output CSVs.
	"""

	def __init__(
			self,
			seq_len,
			files: List[str],
			train_output_path: str,
			test_output_path: str,
			test_split_ratio: float = 0.1,
			x_column_headers: Optional[List[str]] = None,
			y_column_header: str = "y"
	):
		"""
		:param seq_len: number of consecutive values in each input window (>= 1).
		:param files: paths of the input CSV files to process.
		:param train_output_path: CSV file that receives the training samples.
		:param test_output_path: CSV file that receives the test samples.
		:param test_split_ratio: forwarded to ``train_test_split`` as ``test_size``.
		:param x_column_headers: one header per window position; defaults to
			``"0" .. str(seq_len - 1)``.
		:param y_column_header: header of the label column.
		:raises ValueError: if ``seq_len`` is below 1 or the number of
			``x_column_headers`` does not equal ``seq_len``.
		"""
		if seq_len < 1:
			raise ValueError(f"seq_len must be at least 1, got {seq_len}")
		if x_column_headers is None:
			x_column_headers = [str(i) for i in range(seq_len)]
		elif len(x_column_headers) != seq_len:
			raise ValueError(
				f"expected {seq_len} x column headers, got {len(x_column_headers)}"
			)
		self.__seq_len = seq_len
		self.__files = files
		self.__train_output_path, self.__test_output_path = train_output_path, test_output_path
		self.__test_split_ratio = test_split_ratio
		# Copy so later changes to the caller's list cannot alter the CSV layout,
		# and so any sequence type can be concatenated with the label header.
		self.__x_column_headers = list(x_column_headers)
		self.__y_column_header = y_column_header

	def __get_currency_pairs(self, df: pd.DataFrame) -> List[Tuple[str, str]]:
		"""Return the distinct (base, quote) pairs in ``df`` in order of first appearance."""
		# drop_duplicates keeps a deterministic order and needs no string
		# round-trip, so currency codes containing "/" are handled too.
		unique_pairs = df[["base_currency", "quote_currency"]].drop_duplicates()
		return list(unique_pairs.itertuples(index=False, name=None))

	def __prepare_for_pair(self, sequence: np.ndarray, seq_len: int):
		"""Cut one series into windows and label each window with the next move.

		Window ``i`` is ``sequence[i:i + seq_len]``. Its label is 1.0 when
		``sequence[i + seq_len]`` is strictly greater than the window's last
		value ``sequence[i + seq_len - 1]``, else 0.0.

		:return: ``(X, y)`` with shapes ``(n, seq_len)`` and ``(n,)`` where
			``n = max(len(sequence) - seq_len, 0)``.
		"""
		sequence = np.asarray(sequence, dtype=float)
		# A series no longer than one window has no "next value" to predict.
		data_len = max(sequence.shape[0] - seq_len, 0)
		if data_len == 0:
			return np.zeros((0, seq_len)), np.zeros((0,))
		# Row i of window_idx holds the positions i, i + 1, ..., i + seq_len - 1.
		window_idx = np.arange(data_len)[:, None] + np.arange(seq_len)[None, :]
		X = sequence[window_idx]
		# The target is the value right after each window compared with the
		# window's last value. Comparing sequence[i] with sequence[i - 1] would
		# derive the label from data at the start of the window and wrap
		# around to the end of the series for i == 0.
		y = (sequence[seq_len:] > sequence[seq_len - 1:-1]).astype(float)
		return X, y

	def __prepare_data(self, data: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
		"""Build the windows and labels of every currency pair found in ``data``.

		Pairs whose series is not longer than ``seq_len`` contribute no samples.

		:return: ``(X, y)`` stacked over all pairs; both are empty
			(``(0, seq_len)`` and ``(0,)``) when no pair is long enough.
		"""
		X_parts = []
		y_parts = []
		for base_currency, quote_currency in self.__get_currency_pairs(data):
			pair_mask = (
				(data["base_currency"] == base_currency)
				& (data["quote_currency"] == quote_currency)
			)
			pair_sequence = data.loc[pair_mask, "c"].to_numpy()
			if pair_sequence.shape[0] <= self.__seq_len:
				continue
			pair_X, pair_y = self.__prepare_for_pair(pair_sequence, self.__seq_len)
			X_parts.append(pair_X)
			y_parts.append(pair_y)

		if not X_parts:
			return np.zeros((0, self.__seq_len)), np.zeros((0,))
		return np.concatenate(X_parts), np.concatenate(y_parts)

	def __split_data(self, X, y) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
		"""Split into ``(X_train, X_test, y_train, y_test)`` with a fixed random seed."""
		return train_test_split(X, y, test_size=self.__test_split_ratio, random_state=42)

	def __load_file(self, file_name: str) -> pd.DataFrame:
		"""Read one input CSV, using its first column as the index."""
		return pd.read_csv(file_name, index_col=0)

	def __process_file(self, file: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
		"""Turn one input file into a ``(train_df, test_df)`` pair.

		:raises DataSetTooSmall: if the file cannot yield at least two samples,
			the minimum needed for a non-empty train part and test part.
		"""
		raw_data = self.__load_file(file)
		# A window plus the value that follows it needs seq_len + 1 rows.
		if len(raw_data) <= self.__seq_len:
			raise DataSetTooSmall()
		X, y = self.__prepare_data(raw_data)
		if X.shape[0] < 2:
			raise DataSetTooSmall()
		X_train, X_test, y_train, y_test = self.__split_data(X, y)
		train_df = pd.DataFrame(X_train, columns=self.__x_column_headers)
		test_df = pd.DataFrame(X_test, columns=self.__x_column_headers)
		train_df[self.__y_column_header] = y_train
		test_df[self.__y_column_header] = y_test
		return train_df, test_df

	def __create_dfs(self):
		"""Create (or overwrite) both output CSVs containing only the header row."""
		for path in (self.__train_output_path, self.__test_output_path):
			parent_dir = os.path.dirname(path)
			if parent_dir:
				# The output directory may not exist yet on a fresh checkout.
				os.makedirs(parent_dir, exist_ok=True)
			header_only = pd.DataFrame(columns=self.__x_column_headers + [self.__y_column_header])
			header_only.to_csv(path)

	def __append_dfs(self, train_df: pd.DataFrame, test_df: pd.DataFrame):
		"""Append the rows of one file to the output CSVs without repeating the header."""
		# The frame index is written as the first CSV column and restarts at 0
		# for every input file, matching the header written by __create_dfs.
		train_df.to_csv(self.__train_output_path, mode="a", header=False)
		test_df.to_csv(self.__test_output_path, mode="a", header=False)

	def start(self):
		"""Process every configured file and write the train/test CSVs.

		Files that are too small to produce samples are skipped. Progress is
		printed after each file, including skipped ones.
		"""
		self.__create_dfs()
		for i, file in enumerate(self.__files):
			try:
				train_df, test_df = self.__process_file(file)
			except DataSetTooSmall:
				# Deliberate best-effort: a file that is too small is skipped.
				pass
			else:
				self.__append_dfs(train_df, test_df)
			print(f"Done: {100*(i+1)/len(self.__files) :.2f}%", end="\r")


In [8]:
# Build the train/test CSVs from the configured input files.
processor = DataPreparer(
	seq_len=SEQ_LEN,
	files=FILES,
	train_output_path=DATA_PATH[0],
	test_output_path=DATA_PATH[1],
)
processor.start()


Done: 20.00%

Done: 40.00%

Done: 60.00%

Done: 80.00%

Done: 100.00%