-
Notifications
You must be signed in to change notification settings - Fork 0
/
OHLCV.py
150 lines (106 loc) · 4.38 KB
/
OHLCV.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import time
import random
import itertools
import multiprocessing
from tqdm import tqdm
import ccxt
import pandas as pd
from ta.utils import dropna
from settings import DOWNLOAD_FOLDER, symbols, timeframes
class OHLCV:
    """Download, cache and load OHLCV candles for one symbol/timeframe pair.

    Candles are fetched through a ccxt exchange object and cached as a pandas
    pickle at ``DOWNLOAD_FOLDER/ohlcv_<symbol>_<timeframe>.pkl`` (any ``/`` in
    the symbol is replaced with ``_``). ``update`` refreshes the cache
    incrementally; ``get_data`` returns a cleaned, timestamp-indexed frame.
    """

    def __init__(self, exchange: ccxt.Exchange, symbol: str, timeframe: str, update: bool = False):
        """OHLCV data handler.

        Args:
            exchange (ccxt.Exchange): ccxt exchange object.
            symbol (str): symbol to fetch.
            timeframe (str): timeframe to fetch.
            update (bool, optional): if True, refresh the cached file right
                away (network and disk I/O happen inside the constructor).
                Defaults to False.
        """
        self.exchange = exchange
        self.symbol = symbol
        self.timeframe = timeframe
        self.filename = f"ohlcv_{self.symbol.replace('/', '_')}_{self.timeframe}.pkl"
        self.filepath = os.path.join(DOWNLOAD_FOLDER, self.filename)
        if update:
            self.update()

    def __fetch_ohlcv(self, since: int = 0) -> list:
        """Fetch a single batch of at most 1000 candles starting at ``since``.

        ``ccxt.NetworkError`` is retried without limit. After each failure the
        call sleeps a random 1..N seconds, where N starts at 10 and grows by
        10 per failed attempt.

        Args:
            since (int, optional): timestamp in milliseconds to start from.
                Defaults to 0.

        Returns:
            list: raw candles as returned by ``exchange.fetch_ohlcv``.
        """
        sleep_timer = 10
        while True:
            try:
                return self.exchange.fetch_ohlcv(
                    symbol=self.symbol,
                    timeframe=self.timeframe,
                    since=since,
                    limit=1000,
                )
            except ccxt.NetworkError:
                # Randomised back-off whose upper bound widens on every failure.
                time.sleep(random.randint(1, sleep_timer))
                sleep_timer += 10

    def __fetch_candles(self, since: int = 0) -> list:
        """Fetch every candle from ``since`` up to the most recent one.

        Batches are requested repeatedly, each one starting 1 ms after the
        last candle received, until the exchange returns an empty batch.

        Args:
            since (int, optional): timestamp in milliseconds to start fetching
                from. Defaults to 0.

        Returns:
            list: raw candles, excluding the last one received.
        """
        candles = []
        with tqdm(desc=f"Fetching {self.symbol:>9} {self.timeframe:>3} candles") as pbar:
            while True:
                candles_new = self.__fetch_ohlcv(since=since)
                if not candles_new:
                    # Drop the last candle since it is not complete (the newest
                    # candle is assumed to still be forming on the exchange).
                    # The ``with`` block closes the progress bar on return.
                    return candles[:-1]
                candles += candles_new
                # Element 0 of a candle is its timestamp (see __parse_candles).
                since = candles[-1][0] + 1
                pbar.update(len(candles_new))

    def __parse_candles(self, candles: list) -> pd.DataFrame:
        """Parse raw candles into a DataFrame.

        Args:
            candles (list): list of ``[timestamp, open, high, low, close,
                volume]`` rows.

        Returns:
            pd.DataFrame: one row per candle with those six columns.
        """
        columns = ["timestamp", "open", "high", "low", "close", "volume"]
        return pd.DataFrame(candles, columns=columns)

    def __save(self, df: pd.DataFrame) -> None:
        """Write ``df`` to ``self.filepath``, creating its folder if needed.

        The frame is first pickled to a sibling ``.tmp`` file and then moved
        over the target with ``os.replace``. An interrupted write therefore
        cannot leave a truncated cache file behind.
        """
        os.makedirs(os.path.dirname(self.filepath) or ".", exist_ok=True)
        tmp_path = self.filepath + ".tmp"
        df.to_pickle(tmp_path)
        os.replace(tmp_path, self.filepath)

    def update(self):
        """Update the raw data file.

        If a non-empty cache file exists, only candles newer than its last
        stored timestamp are fetched and appended. The file is rewritten only
        when something new arrived. Otherwise the full history is fetched
        from timestamp 0 and written, even when it is empty, so the file
        always exists afterwards.
        """
        # Keep the try minimal: only a missing cache file means "download all".
        try:
            df = pd.read_pickle(self.filepath)
        except FileNotFoundError:
            df = None

        if df is None or df.empty:
            # No usable local data. An empty cached frame would otherwise
            # crash on ``iloc[-1]`` below.
            df = self.__parse_candles(self.__fetch_candles())
            self.__save(df)
            return

        # Resume 1 ms after the newest stored candle.
        since = int(df["timestamp"].iloc[-1]) + 1
        df_new = self.__parse_candles(self.__fetch_candles(since))
        if df_new.shape[0] > 0:
            df = pd.concat([df, df_new], ignore_index=True)
            self.__save(df)

    @staticmethod
    def __clean(df: pd.DataFrame) -> pd.DataFrame:
        """Clean raw candles and index them by time.

        Rows are filtered with ``ta.utils.dropna``. The millisecond
        ``timestamp`` column is then converted to datetimes and made the
        index.

        NOTE(review): confirm ``ta.utils.dropna``'s filtering rules. It may
        drop more than just NaN rows (e.g. zero values).
        """
        df = dropna(df)
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
        df.set_index("timestamp", inplace=True)
        return df

    def get_data(self, update: bool = False) -> pd.DataFrame:
        """Return the cleaned data.

        The cache file is refreshed first when ``update`` is True. It is
        downloaded when it does not exist yet.

        Args:
            update (bool, optional): whether to update the data. Defaults to False.

        Returns:
            pd.DataFrame: cleaned data indexed by candle timestamp.
        """
        if update:
            self.update()
        elif not os.path.isfile(self.filepath):
            print(f"File {self.filepath} not found. Downloading...")
            self.update()
        df = pd.read_pickle(self.filepath)
        return self.__clean(df)
if __name__ == "__main__":
    exchange = ccxt.binance()
    # One task per (symbol, timeframe) pair. The trailing True is the
    # ``update`` argument, so each OHLCV constructor downloads/refreshes its
    # cache file as a side effect.
    args = list(itertools.product([exchange], symbols, timeframes, [True]))
    # multiprocessing.Pool rejects 0 processes, which would happen if
    # ``symbols`` or ``timeframes`` were empty. Always request at least one.
    with multiprocessing.Pool(max(1, len(args))) as pool:
        pool.starmap(OHLCV, args)