Skip to content

Commit

Permalink
v0.2.17 CockroachDB update
Browse files Browse the repository at this point in the history
  • Loading branch information
bmeares committed Apr 7, 2021
1 parent 894841b commit 951af42
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 49 deletions.
8 changes: 7 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
This changelog was not introduced until v0.2.12, so many of the critical releases of Meerschaum have already been published.

### v0.2.17
- **Added CockroachDB as a supported database flavor.**
- **Added CockroachDB as a supported database flavor.**
CockroachDB may be a data source or a Meerschaum backend. There may be some performance tuning to do, but for now, it is functional. For example, I may implement bulk insert for CockroachDB like what is done for PostgreSQL and TimescaleDB.
- **Only attempt to install readline once in Meerschaum portable.**
The first Meerschaum portable launch will attempt to install readline, but even in case of failure, it won't try to reinstall during subsequent launches or reloads.
- **Refactored SQLAlchemy configuration.**
Under `system:connectors:sql`, the key `create_engine` has been added to house all the `sqlalchemy` configuration settings. **WARNING:** You might need to run `delete config system` to refresh this portion of the config file in case any old settings break things.
- **Dependency conflict resolution.**
- **As always, more bugfixes :)**

### v0.2.16
- **Hypertable improvements and bugfixes.**
Expand Down
9 changes: 8 additions & 1 deletion meerschaum/Pipe/_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,11 @@ def get_rowcount(
"""
connector = self.instance_connector
if remote: connector = self.connector
return connector.get_pipe_rowcount(self, begin=begin, end=end, remote=remote, params=params, debug=debug)
try:
return connector.get_pipe_rowcount(self, begin=begin, end=end, remote=remote, params=params, debug=debug)
except AttributeError:
if remote:
return None
from meerschaum.utils.warnings import warn
warn(f"Failed to get a rowcount for pipe '{self}'.")
return None
44 changes: 43 additions & 1 deletion meerschaum/actions/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def show(
'connectors' : _show_connectors,
'arguments' : _show_arguments,
'data' : _show_data,
'rowcounts' : _show_rowcounts,
'plugins' : _show_plugins,
'packages' : _show_packages,
'help' : _show_help,
Expand Down Expand Up @@ -294,8 +295,47 @@ def _show_data(
df.plot()
return True, "Success"

def _show_rowcounts(
action : Optional[List[str]] = None,
debug : bool = False,
**kw : Any
) -> SuccessTuple:
"""
Show the rowcounts for pipes.
To see remote rowcounts (execute `COUNT(*)` on the source server),
execute `show rowcounts remote`.
"""
from meerschaum.utils.misc import print_options
from meerschaum.utils.pool import get_pool
from meerschaum import get_pipes

if action is None: action = []
remote = action and action[0] == 'remote'

pipes = get_pipes(as_list=True, debug=debug, **kw)
pool = get_pool()
def _get_rc(_pipe):
return _pipe.get_rowcount(remote=remote, debug=debug)

rowcounts = pool.map(_get_rc, pipes)

rc_dict = {}
for i, p in enumerate(pipes):
rc_dict[p] = rowcounts[i]

msgs = []
for p, rc in rc_dict.items():
msgs.append(f'{p}\n{rc}')

header = "Remote rowcounts:" if remote else "Pipe rowcounts:"

print_options(msgs, header=header, **kw)

return True, "Success"

def _show_plugins(
action : Sequence[str] = [],
action : Optional[List[str]] = None,
repository : Optional[str] = None,
debug : bool = False,
**kw : Any
Expand All @@ -310,6 +350,8 @@ def _show_plugins(
from meerschaum._internal.User import User
repo_connector = parse_repo_keys(repository)

if action is None: action = []

if action == [''] or len(action) == 0:
_to_print = get_plugins_names()
header = "Installed plugins:"
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/api/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ async def shutdown():
if get_connector().type == 'sql':
try:
await get_connector().db.disconnect()
except AssertionError:
except:
pass

25 changes: 14 additions & 11 deletions meerschaum/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,21 @@ def _apply_environment_config(env_var):
if environment_runtime in os.environ:
if os.environ[environment_runtime] == 'portable':
import platform
from meerschaum.config._paths import PORTABLE_CHECK_READLINE_PATH
from meerschaum.utils.packages import attempt_import, pip_install
rl_name = "gnureadline" if platform.system() != 'Windows' else "pyreadline"
try:
rl = attempt_import(
rl_name,
lazy = False,
install = True,
venv = None,
)
except ImportError:
if not pip_install(rl_name, args=['--upgrade', '--ignore-installed'], venv=None):
print(f"Unable to import {rl_name}!")
if not PORTABLE_CHECK_READLINE_PATH.exists():
rl_name = "gnureadline" if platform.system() != 'Windows' else "pyreadline"
try:
rl = attempt_import(
rl_name,
lazy = False,
install = True,
venv = None,
)
except ImportError:
if not pip_install(rl_name, args=['--upgrade', '--ignore-installed'], venv=None):
print(f"Unable to import {rl_name}!")
PORTABLE_CHECK_READLINE_PATH.touch()


### If interactive REPL, print welcome header.
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/config/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
default_meerschaum_config = {
'instance' : 'sql:main',
'api_instance' : 'MRSM{meerschaum:instance}',
'default_repository' : 'api:mrsm',
'default_repository' : 'MRSM{meerschaum:instance}',
'connectors' : {
'sql' : {
# 'default' : connector_attributes['sql']['flavors']['timescaledb']['defaults'],
Expand Down
2 changes: 2 additions & 0 deletions meerschaum/config/_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
'GRAFANA_DASHBOARD_PATH' : ('{GRAFANA_RESOURCES_PATH}', 'provisioning', 'dashboards', 'dashboard.yaml'),
'MOSQUITTO_RESOURCES_PATH' : ('{STACK_RESOURCES_PATH}', 'mosquitto', 'resources'),
'MOSQUITTO_CONFIG_PATH' : ('{MOSQUITTO_RESOURCES_PATH}', 'mosquitto.conf'),

'PORTABLE_CHECK_READLINE_PATH' : ('{SHELL_RESOURCES_PATH}', '.readline_attempted_install'),
}

def set_root(root : Union[Path, str]):
Expand Down
13 changes: 11 additions & 2 deletions meerschaum/connectors/sql/SQLConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
Interface with SQL servers using sqlalchemy.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional

from meerschaum.connectors import Connector
from meerschaum.utils.warnings import error

Expand Down Expand Up @@ -134,14 +137,20 @@ def metadata(self):
return self._metadata

@property
def db(self):
def db(self) -> Optional[databases.Database]:
from meerschaum.utils.packages import attempt_import
databases = attempt_import('databases', lazy=False, install=True)
url = self.DATABASE_URL
if 'mysql' in url:
url = url.replace('+pymysql', '')
if '_db' not in self.__dict__:
self._db = databases.Database(url)
try:
self._db = databases.Database(url)
except KeyError:
### Likely encountered an unsupported flavor.
from meerschaum.utils.warnings import warn
self._db = None
# warn(f"Connector '{self}' doesn't appear supported by the databases module. If the flavor '{self.flavor}' is supported by SQLAlchemy, then the connector may work but won't be able to wait for a valid connection.", stack=False)
return self._db

def __getstate__(self): return self.__dict__
Expand Down
23 changes: 14 additions & 9 deletions meerschaum/connectors/sql/_create_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
'database',
}
default_create_engine_args = {
'method',
# 'method',
'pool_size',
'max_overflow',
'pool_recycle',
Expand Down Expand Up @@ -89,6 +89,7 @@
'port' : 26257,
'database' : 'defaultdb',
'username' : 'root',
'password' : 'admin',
},
},
}
Expand All @@ -105,6 +106,7 @@ def create_engine(
returns: sqlalchemy engine
"""
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.warnings import error, warn
sqlalchemy = attempt_import('sqlalchemy')
import urllib
if self.flavor in ('timescaledb', 'postgresql', 'cockroachdb'):
Expand All @@ -120,7 +122,6 @@ def create_engine(

### Verify that everything is in order.
if self.flavor not in flavor_configs:
from meerschaum.utils.warnings import error
error(f"Cannot create a connector with the flavor '{self.flavor}'.")

_engine = flavor_configs[self.flavor].get('engine', None)
Expand All @@ -133,7 +134,10 @@ def create_engine(
### self.sys_config was deepcopied and can be updated safely
if self.flavor == "sqlite":
engine_str = f"sqlite:///{self.database}"
self.sys_config['connect_args'].update({"check_same_thread" : False})
if 'create_engine' not in self.sys_config: self.sys_config['create_engine'] = {}
if 'connect_args' not in self.sys_config['create_engine']:
self.sys_config['create_engine']['connect_args'] = {}
self.sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
aiosqlite = attempt_import('aiosqlite', debug=self._debug, lazy=False)
else:
engine_str = (
Expand All @@ -142,13 +146,14 @@ def create_engine(
"@" + _host + ((":" + str(_port)) if _port is not None else '') +
(("/" + _database) if _database is not None else '')
)
if debug: dprint(f"{engine_str}" + '\n' + f"{self.sys_config['connect_args']}")
if debug: dprint(f"{engine_str}" + '\n' + f"{self.sys_config.get('create_engine', {}).get('connect_args', {})}")

_kw_copy = kw.copy()
_kw_copy.update(
{ k: v for k, v in self.sys_config.get('create_engine', {}).items()
if k in flavor_configs[self.flavor]['create_engine'] }
)
_create_engine_args = {
k: v for k, v in self.sys_config.get('create_engine', {}).items()
if k in flavor_configs[self.flavor].get('create_engine', {})
}
_create_engine_args.update(_kw_copy)

try:
engine = sqlalchemy.create_engine(
Expand All @@ -164,7 +169,7 @@ def create_engine(
self.sys_config['poolclass'].split('.')[-1]
),
echo = debug,
**_kw_copy
**_create_engine_args
)
except Exception as e:
warn(e)
Expand Down
37 changes: 23 additions & 14 deletions meerschaum/connectors/sql/_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def create_indices(
elif self.flavor in ('mysql', 'mariadb'):
dt_query = f"CREATE INDEX ON {_pipe_name} ({_datetime_name})"
else: ### mssql, sqlite, etc.
dt_query = f"CREATE INDEX {_datetime.lower().strip()}_index ON {_pipe_name} ({_datetime_name})"
dt_query = f"CREATE INDEX {sql_item_name(_datetime + '_index', self.flavor)} ON {_pipe_name} ({_datetime_name})"

index_queries[_datetime] = dt_query

Expand All @@ -262,7 +262,7 @@ def create_indices(
elif self.flavor in ('mysql', 'mariadb'):
id_query = f"CREATE INDEX ON {_pipe_name} ({_id_name})"
else: ### mssql, sqlite, etc.
id_query = f"CREATE INDEX {_id.lower().strip()}_index ON {_pipe_name} ({_id_name})"
id_query = f"CREATE INDEX {sql_item_name(_id + '_index', self.flavor)} ON {_pipe_name} ({_id_name})"

if id_query is not None:
index_queries[_id] = id_query
Expand Down Expand Up @@ -652,16 +652,17 @@ def pipe_exists(
from meerschaum.connectors.sql._tools import sql_item_name
from meerschaum.utils.debug import dprint
### default: select no rows. NOTE: this might not work for Oracle
q = f"SELECT COUNT(*) FROM {pipe}"
_pipe_name = sql_item_name(str(pipe), self.flavor)
q = f"SELECT COUNT(*) FROM {_pipe_name} WHERE 1 = 0"
if self.flavor in ('timescaledb', 'postgresql'):
q = f"SELECT to_regclass('{sql_item_name(str(pipe), self.flavor)}')"
q = f"SELECT to_regclass('{_pipe_name}')"
elif self.flavor == 'mssql':
q = f"SELECT OBJECT_ID('{pipe}')"
q = f"SELECT OBJECT_ID('{_pipe_name}')"
elif self.flavor in ('mysql', 'mariadb'):
q = f"SHOW TABLES LIKE '{pipe}'"
q = f"SHOW TABLES LIKE '{_pipe_name}'"
elif self.flavor == 'sqlite':
q = f"SELECT name FROM sqlite_master WHERE name='{pipe}'"
exists = self.value(q, debug=debug) is not None
exists = self.value(q, debug=debug, silent=True) is not None
if debug: dprint(f"Pipe '{pipe}' " + ('exists.' if exists else 'does not exist.'))
return exists

Expand All @@ -687,26 +688,34 @@ def get_pipe_rowcount(
if 'definition' not in pipe.parameters['fetch']:
error(msg)
return None
if 'datetime' not in pipe.columns: error(f"Pipe '{pipe}' must have a 'datetime' column declared (columns:datetime)")
src = f"SELECT * FROM {sql_item_name(pipe, self.flavor)}" if not remote else pipe.parameters['fetch']['definition']
# if 'datetime' not in pipe.columns: error(f"Pipe '{pipe}' must have a 'datetime' column declared (columns:datetime)")

_pipe_name = sql_item_name(str(pipe), self.flavor)
_datetime_name = sql_item_name(pipe.get_columns('datetime'), self.flavor)

src = f"SELECT {_datetime_name} FROM {_pipe_name}" if not remote else pipe.parameters['fetch']['definition']
query = f"""
WITH src AS ({src})
SELECT COUNT({pipe.columns['datetime']})
SELECT COUNT({_datetime_name})
FROM src
"""
if begin is not None or end is not None: query += "WHERE"
if begin is not None:
query += f"""
{pipe.columns['datetime']} > {dateadd_str(flavor=self.flavor, datepart='minute', number=(0), begin=begin)}
{_datetime_name} > {dateadd_str(flavor=self.flavor, datepart='minute', number=(0), begin=begin)}
"""
if end is not None and begin is not None: query += "AND"
if end is not None:
query += f"""
{pipe.columns['datetime']} <= {dateadd_str(flavor=self.flavor, datepart='minute', number=(0), begin=end)}
{_datetime_name} <= {dateadd_str(flavor=self.flavor, datepart='minute', number=(0), begin=end)}
"""
if params is not None:
from meerschaum.utils.misc import build_where
query += build_where(params, self).replace('WHERE', ('AND' if (begin is not None or end is not None) else "WHERE"))
from meerschaum.connectors.sql._tools import build_where
query += build_where(params, self).replace('WHERE', (
'AND' if (begin is not None or end is not None)
else 'WHERE'
)
)

result = self.value(query, debug=debug)
try:
Expand Down
7 changes: 4 additions & 3 deletions meerschaum/connectors/sql/_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def read(
except Exception as e:
import inspect
if debug: dprint(f"Failed to execute query:\n\n{query_or_table}\n\n")
if not silent: warn(str(e))
if not silent: warn(str(e), stacklevel=3)

return None

Expand Down Expand Up @@ -288,8 +288,9 @@ def to_sql(
if self.flavor in bulk_flavors:
method = psql_insert_copy
else:
method = self.sys_config['method']
chunksize = chunksize if chunksize != -1 else self.sys_config['chunksize']
### Should default to 'multi'.
method = self.sys_config.get('create_engine', {}).get('method', None)
chunksize = chunksize if chunksize != -1 else self.sys_config.get('chunksize', None)

success, msg = False, "Default to_sql message"
start = time.time()
Expand Down
7 changes: 4 additions & 3 deletions meerschaum/connectors/sql/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ def pg_capital(s : str) -> str:
if '"' in s: return s
needs_quotes = False
for c in str(s):
if c.isupper():
needs_quotes = True
break
if ord(c) < ord('a') or ord(c) > ord('z'):
if not c.isdigit():
needs_quotes = True
break
if needs_quotes:
return '"' + s + '"'
return s
Expand Down
11 changes: 10 additions & 1 deletion meerschaum/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,17 @@ async def retry_connect(
dprint(f"Trying to connect to '{connector}'...")
dprint(f"Attempt ({retries + 1} / {max_retries})")
if connector.type == 'sql':
async def _connect(_connector):
_db = _connector.db
if _db is None:
_connect_attempt = _connector.val("SELECT 1 AS connect_attempt")
if _connect_attempt is None:
raise Exception("Failed to connect.")
else:
await _db.connect()
try:
await connector.db.connect()
# await connector.db.connect()
_connect(connector)
connected = True
except Exception as e:
print(e)
Expand Down
Loading

0 comments on commit 951af42

Please sign in to comment.