Skip to content

Commit

Permalink
Add kvs table to oss (#8213)
Browse files Browse the repository at this point in the history
KVS storage for bringing the automatic re-execution daemon to OSS. This will be a bit odd: in OSS we need to put this in run storage, while in cloud it lives in cloud storage. There are a couple of ways to handle that; I think we can just make the KVS methods on cloud run storage raise an error.
  • Loading branch information
johannkm committed Jun 9, 2022
1 parent d14ae4c commit e34d9ca
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""add kvs table
Revision ID: 5e139331e376
Revises: 6860f830e40c
Create Date: 2022-06-06 15:48:51.559562
"""
import sqlalchemy as db
from alembic import op

from dagster.core.storage.migration.utils import has_column, has_index, has_table

# revision identifiers, used by Alembic.
revision = "5e139331e376"
down_revision = "6860f830e40c"
branch_labels = None
depends_on = None


def upgrade():
    """Create the ``kvs`` table and its unique index on ``key`` if they are missing.

    Each step is guarded by an existence check, so running the migration
    against a schema that already has the table and/or index skips that step.
    """
    table_name = "kvs"
    index_name = "idx_kvs_keys_unique"

    if not has_table(table_name):
        key_column = db.Column("key", db.Text, nullable=False)
        value_column = db.Column("value", db.Text)
        op.create_table(table_name, key_column, value_column)

    index_exists = has_index(table_name, index_name)
    if index_exists:
        return

    # mysql_length sets the indexed prefix length for the ``key`` column on MySQL.
    op.create_index(
        index_name,
        table_name,
        ["key"],
        unique=True,
        mysql_length={"key": 64},
    )


def downgrade():
    """Drop the ``idx_kvs_keys_unique`` index and then the ``kvs`` table, if present.

    Each step is guarded by an existence check, so downgrading a schema that
    never had the table or index skips that step.
    """
    if has_index("kvs", "idx_kvs_keys_unique"):
        # Pass the owning table explicitly: backends whose DROP INDEX statement
        # names the table (e.g. MySQL, SQL Server) cannot drop the index without it.
        op.drop_index("idx_kvs_keys_unique", table_name="kvs")

    if has_table("kvs"):
        op.drop_table("kvs")
8 changes: 8 additions & 0 deletions python_modules/dagster/dagster/core/storage/runs/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@
db.Column("run_storage_id", db.Text),
)

# Key/value store table ("kvs") registered on the run storage metadata.
# ``key`` is required (nullable=False); ``value`` may be NULL.
KeyValueStoreTable = db.Table(
    "kvs",
    RunStorageSqlMetadata,
    db.Column("key", db.Text, nullable=False),
    db.Column("value", db.Text),
)

db.Index("idx_run_tags", RunTagsTable.c.key, RunTagsTable.c.value, mysql_length=64)
db.Index("idx_run_partitions", RunsTable.c.partition_set, RunsTable.c.partition, mysql_length=64)
db.Index("idx_bulk_actions", BulkActionsTable.c.key, mysql_length=32)
Expand All @@ -114,3 +121,4 @@
"create_timestamp": 8,
},
)
# Unique index on kvs.key; mysql_length sets the indexed prefix length on MySQL.
db.Index("idx_kvs_keys_unique", KeyValueStoreTable.c.key, unique=True, mysql_length=64)
Original file line number Diff line number Diff line change
Expand Up @@ -992,3 +992,24 @@ def test_add_bulk_actions_columns():
assert "idx_bulk_actions_selector_id" not in get_sqlite3_indexes(
db_path, "bulk_actions"
)


def test_add_kvs_table():
    """Check that upgrade adds the kvs table and index on sqlite, and downgrade removes them.

    Starts from a copy of the ``snapshot_0_14_16_bulk_actions_columns`` sqlite
    snapshot, which is expected not to contain the ``kvs`` table yet.
    """
    src_dir = file_relative_path(__file__, "snapshot_0_14_16_bulk_actions_columns/sqlite")

    with copy_directory(src_dir) as test_dir:
        db_path = os.path.join(test_dir, "history", "runs.db")

        with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
            # Before the migration: neither the table nor its index exists.
            assert "kvs" not in get_sqlite3_tables(db_path)
            assert get_sqlite3_indexes(db_path, "kvs") == []

            instance.upgrade()

            assert "kvs" in get_sqlite3_tables(db_path)
            assert get_sqlite3_indexes(db_path, "kvs") == ["idx_kvs_keys_unique"]

            # Roll back to the revision preceding the kvs migration.
            instance._run_storage._alembic_downgrade(rev="6860f830e40c")

            assert "kvs" not in get_sqlite3_tables(db_path)
            assert get_sqlite3_indexes(db_path, "kvs") == []
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def get_indexes(instance, table_name: str):
return set(c["name"] for c in inspect(instance.run_storage._engine).get_indexes(table_name))


def get_tables(instance):
    """Return the names of the tables visible through the instance's run storage engine.

    Uses the SQLAlchemy inspector (as the sibling ``get_indexes`` helper does)
    instead of ``Engine.table_names()``, which is deprecated in SQLAlchemy 1.4
    and removed in 2.0.
    """
    return inspect(instance.run_storage._engine).get_table_names()


def _reconstruct_from_file(hostname, conn_string, path, _username="root", _password="test"):
engine = create_engine(conn_string)
engine.execute("drop schema test;")
Expand Down Expand Up @@ -208,3 +212,29 @@ def test_add_bulk_actions_columns(hostname, conn_string):
instance.upgrade()
assert new_columns <= get_columns(instance, "bulk_actions")
assert new_indexes <= get_indexes(instance, "bulk_actions")


def test_add_kvs_table(hostname, conn_string):
    """Check that upgrading an older schema snapshot creates the kvs table and its index."""
    # use an old snapshot
    snapshot_path = file_relative_path(
        __file__, "snapshot_0_14_6_post_schema_pre_data_migration.sql"
    )
    _reconstruct_from_file(hostname, conn_string, snapshot_path)

    with tempfile.TemporaryDirectory() as tempdir:
        # Render the dagster.yaml template with this run's hostname into the temp dir.
        with open(
            file_relative_path(__file__, "dagster.yaml"), "r", encoding="utf8"
        ) as src:
            with open(os.path.join(tempdir, "dagster.yaml"), "w", encoding="utf8") as dst:
                rendered = src.read().format(hostname=hostname)
                dst.write(rendered)

        with DagsterInstance.from_config(tempdir) as instance:
            assert "kvs" not in get_tables(instance)

            instance.upgrade()

            assert "kvs" in get_tables(instance)
            assert "idx_kvs_keys_unique" in get_indexes(instance, "kvs")
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def get_indexes(instance, table_name: str):
return set(c["name"] for c in inspect(instance.run_storage._engine).get_indexes(table_name))


def get_tables(instance):
    """Return the names of the tables visible through the instance's run storage engine.

    Uses the SQLAlchemy inspector (as the sibling ``get_indexes`` helper does)
    instead of ``Engine.table_names()``, which is deprecated in SQLAlchemy 1.4
    and removed in 2.0.
    """
    return inspect(instance.run_storage._engine).get_table_names()


def test_0_7_6_postgres_pre_add_pipeline_snapshot(hostname, conn_string):
_reconstruct_from_file(
hostname,
Expand Down Expand Up @@ -611,3 +615,32 @@ def test_add_bulk_actions_columns(hostname, conn_string):
instance.upgrade()
assert new_columns <= get_columns(instance, "bulk_actions")
assert new_indexes <= get_indexes(instance, "bulk_actions")


def test_add_kvs_table(hostname, conn_string):
    """Check that upgrading an older postgres dump creates the kvs table and its index."""
    # use an old snapshot
    dump_path = file_relative_path(
        __file__,
        "snapshot_0_14_6_post_schema_pre_data_migration/postgres/pg_dump.txt",
    )
    _reconstruct_from_file(hostname, conn_string, dump_path)

    with tempfile.TemporaryDirectory() as tempdir:
        # Render the dagster.yaml template with this run's hostname into the temp dir.
        with open(
            file_relative_path(__file__, "dagster.yaml"), "r", encoding="utf8"
        ) as src:
            with open(os.path.join(tempdir, "dagster.yaml"), "w", encoding="utf8") as dst:
                rendered = src.read().format(hostname=hostname)
                dst.write(rendered)

        with DagsterInstance.from_config(tempdir) as instance:
            assert "kvs" not in get_tables(instance)

            instance.upgrade()

            assert "kvs" in get_tables(instance)
            assert "idx_kvs_keys_unique" in get_indexes(instance, "kvs")

0 comments on commit e34d9ca

Please sign in to comment.