Skip to content

Commit

Permalink
stop using python semver parsing for mysql versions
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 27, 2023
1 parent 3e4befb commit 8b66169
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -195,7 +195,7 @@ def end_watch(self, run_id, handler):

@property
def supports_intersect(self):
return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(MINIMUM_MYSQL_INTERSECT_VERSION)

@property
def event_watcher(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from dagster._utils import utc_datetime_from_timestamp
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -154,11 +154,11 @@ def supports_bucket_queries(self):
if not self._mysql_version:
return False

return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BUCKET_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(MINIMUM_MYSQL_BUCKET_VERSION)

@property
def supports_intersect(self):
return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_INTERSECT_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(MINIMUM_MYSQL_INTERSECT_VERSION)

def add_daemon_heartbeat(self, daemon_heartbeat):
with self.connect() as conn:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from packaging.version import parse

from ..utils import (
create_mysql_connection,
mysql_alembic_config,
mysql_url_from_config,
parse_mysql_version,
retry_mysql_connection_fn,
retry_mysql_creation_fn,
)
Expand Down Expand Up @@ -117,7 +117,7 @@ def supports_batch_queries(self):
if not self._mysql_version:
return False

return parse(self._mysql_version) >= parse(MINIMUM_MYSQL_BATCH_VERSION)
return parse_mysql_version(self._mysql_version) >= parse_mysql_version(MINIMUM_MYSQL_BATCH_VERSION)

def get_server_version(self):
rows = self.execute("select version()")
Expand Down
19 changes: 19 additions & 0 deletions python_modules/libraries/dagster-mysql/dagster_mysql/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
import time
from contextlib import contextmanager
from urllib.parse import (
Expand Down Expand Up @@ -45,6 +46,24 @@ def get_conn_string(username, password, hostname, db_name, port="3306"):
port=port,
)

def parse_mysql_version(version: str) -> tuple[int]:
"""Parse MySQL version into a tuple of ints.
Args:
version (str): MySQL version string.
Returns:
tuple: Tuple of ints representing the MySQL version.
"""
parsed = []
for part in re.split(r'\D+', version):
if len(part) == 0:
continue
try:
parsed.append(int(part))
except ValueError:
continue
return tuple(parsed)

def retry_mysql_creation_fn(fn, retry_limit=5, retry_wait=0.2):
# Retry logic to recover from the case where two processes are creating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,21 @@ def test_load_from_config(self, conn_string):
from_url_instance._run_storage.mysql_url
== from_env_instance._run_storage.mysql_url
)


def test_mysql_version(conn_string):
class FakeNonBucketing(MySQLRunStorage):
def get_server_version(self):
# override the server version to make sure the parsing works
return "5.7.38-log"

storage = FakeNonBucketing(conn_string)
assert not storage.supports_bucket_queries

class FakeBucketing(MySQLRunStorage):
def get_server_version(self):
# override the server version to make sure the parsing works
return "8.0.31-google"

storage = FakeBucketing(conn_string)
assert storage.supports_bucket_queries

0 comments on commit 8b66169

Please sign in to comment.