Skip to content

Commit

Permalink
Improved database management
Browse files Browse the repository at this point in the history
  • Loading branch information
Herklos committed Oct 7, 2019
1 parent c1d2078 commit cabe277
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 39 deletions.
42 changes: 31 additions & 11 deletions octobot_backtesting/collectors/exchanges/exchange_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,26 @@ async def initialize(self):
symbols=json.dumps(self.symbols),
time_frames=json.dumps([tf.value for tf in self.time_frames]))

async def save_ticker(self, timestamp, exchange, symbol, ticker):
await self.database.insert(ExchangeDataTables.TICKER, timestamp, exchange_name=exchange, symbol=symbol,
ticker=json.dumps(ticker))
async def save_ticker(self, timestamp, exchange, symbol, ticker, multiple=False):
if not multiple:
await self.database.insert(ExchangeDataTables.TICKER, timestamp,
exchange_name=exchange, symbol=symbol, recent_trades=json.dumps(ticker))
else:
await self.database.insert_all(ExchangeDataTables.TICKER, timestamp,
exchange_name=exchange, symbol=symbol, recent_trades=[json.dumps(t)
for t in
ticker])

async def save_order_book(self, timestamp, exchange, symbol, asks, bids):
await self.database.insert(ExchangeDataTables.ORDER_BOOK, timestamp,
exchange_name=exchange, symbol=symbol, asks=asks, bids=bids)
async def save_order_book(self, timestamp, exchange, symbol, asks, bids, multiple=False):
if not multiple:
await self.database.insert(ExchangeDataTables.ORDER_BOOK, timestamp,
exchange_name=exchange, symbol=symbol,
asks=json.dumps(asks), bids=json.dumps(bids))
else:
await self.database.insert_all(ExchangeDataTables.ORDER_BOOK, timestamp,
exchange_name=exchange, symbol=symbol,
asks=[json.dumps(a) for a in asks],
bids=[json.dumps(b) for b in bids])

async def save_recent_trades(self, timestamp, exchange, symbol, recent_trades, multiple=False):
if not multiple:
Expand All @@ -79,12 +92,19 @@ async def save_ohlcv(self, timestamp, exchange, symbol, time_frame, candle, mult
if not multiple:
await self.database.insert(ExchangeDataTables.OHLCV, timestamp,
exchange_name=exchange, symbol=symbol, time_frame=time_frame.value,
candle=candle)
candle=json.dumps(candle))
else:
await self.database.insert_all(ExchangeDataTables.OHLCV, timestamp=timestamp,
exchange_name=exchange, symbol=symbol, time_frame=time_frame.value,
candle=candle)
candle=[json.dumps(rt) for rt in candle])

async def save_kline(self, timestamp, exchange, symbol, time_frame, kline, multiple=False):
if not multiple:
await self.database.insert(ExchangeDataTables.KLINE, timestamp,
exchange_name=exchange, symbol=symbol, time_frame=time_frame.value,
candle=json.dumps(kline))
else:
await self.database.insert_all(ExchangeDataTables.KLINE, timestamp=timestamp,
exchange_name=exchange, symbol=symbol, time_frame=time_frame.value,
candle=[json.dumps(kl) for kl in kline])

async def save_kline(self, timestamp, exchange, symbol, time_frame, kline):
await self.database.insert(ExchangeDataTables.KLINE, timestamp,
exchange_name=exchange, symbol=symbol, time_frame=time_frame.value, kline=kline)
4 changes: 4 additions & 0 deletions octobot_backtesting/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.


class DataBaseNotExists(Exception):
pass
3 changes: 3 additions & 0 deletions octobot_backtesting/data/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import aiosqlite

from octobot_backtesting.data import DataBaseNotExists
from octobot_backtesting.enums import DataBaseOrderBy
from octobot_commons.logging.logging_util import get_logger

Expand Down Expand Up @@ -126,6 +127,8 @@ async def __execute_select(self, table, select_items="*", where_clauses="", addi
f"{additional_clauses}")
return await self.cursor.fetchall() if size == self.DEFAULT_SIZE else await self.cursor.fetchmany(size)
except OperationalError as e:
if not await self.__check_table_exists(table):
raise DataBaseNotExists(e)
self.logger.error(f"An error occurred when executing select : {e}")
return []

Expand Down
5 changes: 5 additions & 0 deletions octobot_backtesting/importers/exchanges/exchange_importer.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ cdef class ExchangeDataImporter(DataImporter):
cdef public list time_frames

cdef tuple __get_operations_from_timestamps(self, float superior_timestamp, float inferior_timestamp)
cdef list import_ohlcvs(self, list ohlcvs)
cdef list import_tickers(self, list tickers)
cdef list import_klines(self, list klines)
cdef list import_order_books(self, list order_books)
cdef list import_recent_trades(self, list recent_trades)
93 changes: 65 additions & 28 deletions octobot_backtesting/importers/exchanges/exchange_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# License along with this library.
import json

from octobot_backtesting.data import DataBaseNotExists
from octobot_backtesting.data.database import DataBase
from octobot_backtesting.enums import ExchangeDataTables, DataBaseOperations, DataBaseOrderBy, DataTables
from octobot_backtesting.importers.data_importer import DataImporter
Expand Down Expand Up @@ -59,7 +60,7 @@ async def get_data_timestamp_interval(self):
max_timestamp = (await self.database.select(table, size=1, sort=DataBaseOrderBy.DESC.value))[0][0]
if not maximum_timestamp or maximum_timestamp > max_timestamp:
maximum_timestamp = max_timestamp
except IndexError:
except (IndexError, DataBaseNotExists):
pass
return minimum_timestamp, maximum_timestamp

Expand All @@ -75,61 +76,97 @@ def __get_operations_from_timestamps(self, superior_timestamp, inferior_timestam

return timestamps, operations

def import_ohlcvs(self, ohlcvs):
for ohlcv in ohlcvs:
ohlcv[-1] = json.loads(ohlcv[-1])
return ohlcvs

async def get_ohlcv(self, exchange_name=None, symbol=None, time_frame=None, limit=DataBase.DEFAULT_SIZE):
return await self.database.select(ExchangeDataTables.OHLCV, size=limit,
exchange_name=exchange_name, symbol=symbol, time_frame=time_frame)
return self.import_ohlcvs(await self.database.select(ExchangeDataTables.OHLCV, size=limit,
exchange_name=exchange_name, symbol=symbol,
time_frame=time_frame))

async def get_ohlcv_from_timestamps(self, exchange_name=None, symbol=None, time_frame=None,
limit=DataBase.DEFAULT_SIZE,
inferior_timestamp=-1, superior_timestamp=-1):
timestamps, operations = self.__get_operations_from_timestamps(superior_timestamp, inferior_timestamp)
return await self.database.select_from_timestamp(ExchangeDataTables.OHLCV, size=limit,
exchange_name=exchange_name, symbol=symbol,
time_frame=time_frame,
timestamps=timestamps, operations=operations)
return self.import_ohlcvs(await self.database.select_from_timestamp(ExchangeDataTables.OHLCV, size=limit,
exchange_name=exchange_name, symbol=symbol,
time_frame=time_frame,
timestamps=timestamps,
operations=operations))

def import_tickers(self, tickers):
for ticker in tickers:
ticker[-1] = json.loads(ticker[-1])
return tickers

async def get_ticker(self, exchange_name=None, symbol=None, limit=DataBase.DEFAULT_SIZE):
return await self.database.select(ExchangeDataTables.TICKER, size=limit,
exchange_name=exchange_name, symbol=symbol)
return self.import_tickers(await self.database.select(ExchangeDataTables.TICKER, size=limit,
exchange_name=exchange_name, symbol=symbol))

async def get_ticker_from_timestamps(self, exchange_name=None, symbol=None, limit=DataBase.DEFAULT_SIZE,
inferior_timestamp=-1, superior_timestamp=-1):
timestamps, operations = self.__get_operations_from_timestamps(superior_timestamp, inferior_timestamp)
return await self.database.select_from_timestamp(ExchangeDataTables.TICKER, size=limit,
exchange_name=exchange_name, symbol=symbol,
timestamps=timestamps, operations=operations)
return self.import_tickers(await self.database.select_from_timestamp(ExchangeDataTables.TICKER, size=limit,
exchange_name=exchange_name, symbol=symbol,
timestamps=timestamps,
operations=operations))

def import_order_books(self, order_books):
for order_book in order_books:
order_book[-1] = json.loads(order_book[-1])
order_book[-2] = json.loads(order_book[-2])
return order_books

async def get_order_book(self, exchange_name=None, symbol=None, limit=DataBase.DEFAULT_SIZE):
return await self.database.select(ExchangeDataTables.ORDER_BOOK, size=limit,
exchange_name=exchange_name, symbol=symbol)
return self.import_order_books(await self.database.select(ExchangeDataTables.ORDER_BOOK, size=limit,
exchange_name=exchange_name, symbol=symbol))

async def get_order_book_from_timestamps(self, exchange_name=None, symbol=None, limit=DataBase.DEFAULT_SIZE,
inferior_timestamp=-1, superior_timestamp=-1):
timestamps, operations = self.__get_operations_from_timestamps(superior_timestamp, inferior_timestamp)
return await self.database.select_from_timestamp(ExchangeDataTables.ORDER_BOOK, size=limit,
exchange_name=exchange_name, symbol=symbol,
timestamps=timestamps, operations=operations)
return self.import_order_books(
await self.database.select_from_timestamp(ExchangeDataTables.ORDER_BOOK, size=limit,
exchange_name=exchange_name, symbol=symbol,
timestamps=timestamps, operations=operations))

def import_recent_trades(self, recent_trades):
for recent_trade in recent_trades:
recent_trade[-1] = json.loads(recent_trade[-1])
return recent_trades

async def get_recent_trades(self, exchange_name=None, symbol=None, limit=DataBase.DEFAULT_SIZE):
return await self.database.select(ExchangeDataTables.RECENT_TRADES, size=limit,
exchange_name=exchange_name, symbol=symbol)
return self.import_recent_trades(await self.database.select(ExchangeDataTables.RECENT_TRADES, size=limit,
exchange_name=exchange_name, symbol=symbol))

async def get_recent_trades_from_timestamps(self, exchange_name=None, symbol=None, limit=DataBase.DEFAULT_SIZE,
inferior_timestamp=-1, superior_timestamp=-1):
timestamps, operations = self.__get_operations_from_timestamps(superior_timestamp, inferior_timestamp)
return await self.database.select_from_timestamp(ExchangeDataTables.RECENT_TRADES, size=limit,
exchange_name=exchange_name, symbol=symbol,
timestamps=timestamps, operations=operations)
return self.import_recent_trades(await
self.database.select_from_timestamp(ExchangeDataTables.RECENT_TRADES,
size=limit,
exchange_name=exchange_name,
symbol=symbol,
timestamps=timestamps,
operations=operations))

def import_klines(self, klines):
for kline in klines:
kline[-1] = json.loads(kline[-1])
return klines

async def get_kline(self, exchange_name=None, symbol=None, time_frame=None, limit=DataBase.DEFAULT_SIZE):
return await self.database.select(ExchangeDataTables.KLINE, size=limit,
exchange_name=exchange_name, symbol=symbol, time_frame=time_frame)
return self.import_klines(await self.database.select(ExchangeDataTables.KLINE, size=limit,
exchange_name=exchange_name, symbol=symbol,
time_frame=time_frame))

async def get_kline_from_timestamps(self, exchange_name=None, symbol=None, time_frame=None,
limit=DataBase.DEFAULT_SIZE,
inferior_timestamp=-1, superior_timestamp=-1):
timestamps, operations = self.__get_operations_from_timestamps(superior_timestamp, inferior_timestamp)
return await self.database.select_from_timestamp(ExchangeDataTables.KLINE, size=limit,
exchange_name=exchange_name, symbol=symbol,
time_frame=time_frame,
timestamps=timestamps, operations=operations)
return self.import_klines(await self.database.select_from_timestamp(ExchangeDataTables.KLINE, size=limit,
exchange_name=exchange_name, symbol=symbol,
time_frame=time_frame,
timestamps=timestamps,
operations=operations))

0 comments on commit cabe277

Please sign in to comment.