Skip to content

Commit

Permalink
use independent cursors on concurrent database calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume De Saint Martin committed May 14, 2020
1 parent d7bcb88 commit 38cd926
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
3 changes: 2 additions & 1 deletion octobot_backtesting/data/database.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ cdef class DataBase:

cdef object logger
cdef object connection
cdef object cursor
cdef list _cursor_pool
cdef int _current_cursor_index

cdef list tables

Expand Down
65 changes: 47 additions & 18 deletions octobot_backtesting/data/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from sqlite3 import OperationalError, DatabaseError
from contextlib import asynccontextmanager
from sqlite3 import OperationalError, DatabaseError, Cursor

import aiosqlite

Expand All @@ -38,21 +39,43 @@ def __init__(self, file_name):
self.cache = {}

self.connection = None
self.cursor = None

# should never be used directly, use async with self.aio_cursor() as cursor: instead
self._cursor_pool = []
self._current_cursor_index = 0

async def initialize(self):
try:
self.connection = await aiosqlite.connect(self.file_name)
self.cursor = await self.connection.cursor()
await self._add_cursor_in_pool()
await self.__init_tables_list()
except (OperationalError, DatabaseError) as e:
raise DataBaseNotExists(e)

async def create_index(self, table, columns):
await self.__execute_index_creation(table, '_'.join(columns), ', '.join(columns))

async def _add_cursor_in_pool(self):
self._cursor_pool.append(await self.connection.cursor())

@asynccontextmanager
async def aio_cursor(self) -> Cursor:
"""
Use this as a context manager to get a free database cursor
:yield: A free cursor
:return: None
"""
if self._current_cursor_index != 0 and len(self._cursor_pool) <= self._current_cursor_index:
await self._add_cursor_in_pool()
self._current_cursor_index += 1
try:
yield self._cursor_pool[self._current_cursor_index - 1]
finally:
self._current_cursor_index -= 1

async def __execute_index_creation(self, table, name, columns):
await self.cursor.execute(f"CREATE INDEX index_{table.value}_{name} ON {table.value} ({columns})")
async with self.aio_cursor() as cursor:
await cursor.execute(f"CREATE INDEX index_{table.value}_{name} ON {table.value} ({columns})")

async def insert(self, table, timestamp, **kwargs):
if table.value not in self.tables:
Expand Down Expand Up @@ -81,7 +104,8 @@ def __insert_values(self, timestamp, inserting_values) -> str:
return f"({timestamp}, {inserting_values})"

async def __execute_insert(self, table, insert_items) -> None:
await self.cursor.execute(f"INSERT INTO {table.value} VALUES {insert_items}")
async with self.aio_cursor() as cursor:
await cursor.execute(f"INSERT INTO {table.value} VALUES {insert_items}")

# Save (commit) the changes
await self.connection.commit()
Expand Down Expand Up @@ -161,30 +185,34 @@ def __selected_columns(self, columns=None):
async def __execute_select(self, table, select_items="*", where_clauses="", additional_clauses="", group_by="",
size=DEFAULT_SIZE):
try:
await self.cursor.execute(f"SELECT {select_items} FROM {table.value} "
f"{'WHERE' if where_clauses else ''} {where_clauses} "
f"{additional_clauses} {group_by}")
return await self.cursor.fetchall() if size == self.DEFAULT_SIZE else await self.cursor.fetchmany(size)
async with self.aio_cursor() as cursor:
await cursor.execute(f"SELECT {select_items} FROM {table.value} "
f"{'WHERE' if where_clauses else ''} {where_clauses} "
f"{additional_clauses} {group_by}")
return await cursor.fetchall() if size == self.DEFAULT_SIZE else await cursor.fetchmany(size)
except OperationalError as e:
if not await self.check_table_exists(table):
raise DataBaseNotExists(e)
self.logger.error(f"An error occurred when executing select : {e}")
return []

async def check_table_exists(self, table) -> bool:
await self.cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table.value}'")
return await self.cursor.fetchall() != []
async with self.aio_cursor() as cursor:
await cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table.value}'")
return await cursor.fetchall() != []

async def check_table_not_empty(self, table) -> bool:
await self.cursor.execute(f"SELECT count(*) FROM '{table.value}'")
row_count = await self.cursor.fetchone()
return row_count[0] != 0
async with self.aio_cursor() as cursor:
await cursor.execute(f"SELECT count(*) FROM '{table.value}'")
row_count = await cursor.fetchone()
return row_count[0] != 0

async def __create_table(self, table, with_index_on_timestamp=True, **kwargs) -> None:
try:
columns: list = list(kwargs.keys())
await self.cursor.execute(
f"CREATE TABLE {table.value} ({self.TIMESTAMP_COLUMN} datetime, {' text, '.join([col for col in columns])})")
async with self.aio_cursor() as cursor:
await cursor.execute(
f"CREATE TABLE {table.value} ({self.TIMESTAMP_COLUMN} datetime, {' text, '.join([col for col in columns])})")

if with_index_on_timestamp:
await self.create_index(table, [self.TIMESTAMP_COLUMN])
Expand All @@ -198,8 +226,9 @@ async def __create_table(self, table, with_index_on_timestamp=True, **kwargs) ->
self.tables.append(table.value)

async def __init_tables_list(self):
await self.cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table'")
self.tables = await self.cursor.fetchall()
async with self.aio_cursor() as cursor:
await cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table'")
self.tables = await cursor.fetchall()

async def stop(self):
if self.connection is not None:
Expand Down

0 comments on commit 38cd926

Please sign in to comment.