Skip to content

Commit

Permalink
Revert "Revert "KVS storage methods (#8249)"" (#8296)
Browse files Browse the repository at this point in the history
This reverts commit 406b501.
  • Loading branch information
johannkm committed Jun 13, 2022
1 parent b9ac2a3 commit 5db2182
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 0 deletions.
16 changes: 16 additions & 0 deletions python_modules/dagster/dagster/core/storage/runs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,19 @@ def update_backfill(self, partition_backfill: PartitionBackfill):

def alembic_version(self):
    """Return the storage's Alembic version; this implementation has none and returns None."""
    return None

# Key Value Storage
#
# Stores arbitrary key-value pairs. Currently used for the cursor for the auto-reexecution
# daemon. Bundled into run storage for convenience.

def supports_kvs(self):
    """Report whether this storage supports the key-value store methods.

    This implementation returns True.
    """
    return True

@abstractmethod
def kvs_get(self, keys: Set[str]) -> Dict[str, str]:
    """Retrieve the values for the given keys in the current deployment.

    Args:
        keys: The keys to look up.

    Returns:
        A dict mapping keys to their stored values.
    """

@abstractmethod
def kvs_set(self, pairs: Dict[str, str]) -> None:
    """Set the values for the given keys in the current deployment.

    Args:
        pairs: Mapping of each key to the value that should be stored for it.
    """
9 changes: 9 additions & 0 deletions python_modules/dagster/dagster/core/storage/runs/in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,12 @@ def add_backfill(self, partition_backfill: PartitionBackfill):
def update_backfill(self, partition_backfill: PartitionBackfill):
    """Record ``partition_backfill`` in ``self._bulk_actions`` under its ``backfill_id``."""
    check.inst_param(partition_backfill, "partition_backfill", PartitionBackfill)
    backfill_id = partition_backfill.backfill_id
    self._bulk_actions[backfill_id] = partition_backfill

def supports_kvs(self):
    """Report that this storage does not support the key-value store methods."""
    return False

def kvs_get(self, keys: Set[str]) -> Dict[str, str]:
    """Not implemented for this storage.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

def kvs_set(self, pairs: Dict[str, str]) -> None:
    """Not implemented for this storage.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
BulkActionsTable,
DaemonHeartbeatsTable,
InstanceInfo,
KeyValueStoreTable,
RunTagsTable,
RunsTable,
SecondaryIndexMigrationTable,
Expand Down Expand Up @@ -1067,6 +1068,32 @@ def update_backfill(self, partition_backfill: PartitionBackfill):
)
)

def supports_kvs(self):
    """Report that this storage supports the key-value store methods."""
    return True

def kvs_get(self, keys: Set[str]) -> Dict[str, str]:
    """Fetch the stored values for ``keys`` from the key-value store table.

    Args:
        keys: The keys to look up.

    Returns:
        A dict with one ``key: value`` entry for each returned row whose key is in ``keys``.
    """
    check.set_param(keys, "keys", of_type=str)

    query = db.select(KeyValueStoreTable.columns).where(KeyValueStoreTable.c.key.in_(keys))

    with self.connect() as conn:
        # Consume the result while the connection is still open.
        found = {}
        for row in conn.execute(query):
            found[row.key] = row.value
        return found

def kvs_set(self, pairs: Dict[str, str]) -> None:
    """Insert or overwrite the given key-value pairs in the key-value store table.

    Args:
        pairs: Mapping of each key to the value to store. Keys that already have a
            row are overwritten; keys without a row are inserted.
    """
    check.dict_param(pairs, "pairs", key_type=str, value_type=str)
    if not pairs:
        # Nothing to write; avoid issuing an INSERT with an empty values list.
        return

    db_values = [{"key": k, "value": v} for k, v in pairs.items()]

    with self.connect() as conn:
        try:
            # Fast path: if every key is new, a single multi-row insert is enough.
            conn.execute(KeyValueStoreTable.insert().values(db_values))
        except db.exc.IntegrityError:
            # At least one key already exists. Look up which keys are present so that
            # existing keys are updated AND the remaining keys are still inserted;
            # updating alone would silently drop the new keys of a mixed batch.
            existing_rows = conn.execute(
                db.select(KeyValueStoreTable.columns).where(
                    KeyValueStoreTable.c.key.in_(pairs.keys())
                )
            )
            existing_keys = {row.key for row in existing_rows}

            to_update = {k: v for k, v in pairs.items() if k in existing_keys}
            to_insert = [
                {"key": k, "value": v} for k, v in pairs.items() if k not in existing_keys
            ]

            if to_update:
                # One UPDATE whose CASE expression maps each existing key to its new value.
                conn.execute(
                    KeyValueStoreTable.update()  # pylint: disable=no-value-for-parameter
                    .where(KeyValueStoreTable.c.key.in_(to_update.keys()))
                    .values(value=db.sql.case(to_update, value=KeyValueStoreTable.c.key))
                )
            if to_insert:
                conn.execute(KeyValueStoreTable.insert().values(to_insert))


GET_PIPELINE_SNAPSHOT_QUERY_ID = "get-pipeline-snapshot"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1480,3 +1480,23 @@ def my_job():
record = records[0]
assert record.start_time == freeze_datetime.timestamp()
assert record.end_time == freeze_datetime.timestamp()

def test_kvs(self, storage):
    """Exercise kvs_set/kvs_get: single insert, overwrite, multi-key insert, multi-key overwrite."""
    # Storages that report no key-value support are skipped.
    if not storage.supports_kvs():
        pytest.skip("storage cannot kvs")

    # A new key can be written and read back.
    storage.kvs_set({"key": "value"})
    assert storage.kvs_get({"key"}) == {"key": "value"}

    # Writing the same key again replaces its value.
    storage.kvs_set({"key": "new-value"})
    assert storage.kvs_get({"key"}) == {"key": "new-value"}

    # Several new keys can be written in one call without disturbing the existing key.
    storage.kvs_set({"foo": "foo", "bar": "bar"})
    assert storage.kvs_get({"foo", "bar", "key"}) == {
        "key": "new-value",
        "foo": "foo",
        "bar": "bar",
    }

    # Several existing keys can be overwritten in one call.
    storage.kvs_set({"foo": "1", "bar": "2", "key": "3"})
    assert storage.kvs_get({"foo", "bar", "key"}) == {"foo": "1", "bar": "2", "key": "3"}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Dict

import sqlalchemy as db

import dagster._check as check
Expand All @@ -8,6 +10,7 @@
RunStorageSqlMetadata,
SqlRunStorage,
)
from dagster.core.storage.runs.schema import KeyValueStoreTable
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
Expand Down Expand Up @@ -149,6 +152,18 @@ def add_daemon_heartbeat(self, daemon_heartbeat):
)
)

def kvs_set(self, pairs: Dict[str, str]) -> None:
    """Upsert the given key-value pairs using the MySQL dialect's insert construct.

    Args:
        pairs: Mapping of each key to the value to store. On a duplicate key the
            stored value is replaced with the newly supplied one.
    """
    check.dict_param(pairs, "pairs", key_type=str, value_type=str)
    if not pairs:
        # Nothing to write; avoid issuing an INSERT with an empty values list.
        return

    db_values = [{"key": k, "value": v} for k, v in pairs.items()]

    with self.connect() as conn:
        insert_stmt = db.dialects.mysql.insert(KeyValueStoreTable).values(db_values)
        conn.execute(
            # `inserted` refers to the row that the INSERT attempted to write.
            insert_stmt.on_duplicate_key_update(
                value=insert_stmt.inserted.value,
            )
        )

def alembic_version(self):
alembic_config = mysql_alembic_config(__file__)
with self.connect() as conn:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Dict

import sqlalchemy as db

import dagster._check as check
Expand All @@ -8,6 +10,7 @@
RunStorageSqlMetadata,
SqlRunStorage,
)
from dagster.core.storage.runs.schema import KeyValueStoreTable
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
Expand Down Expand Up @@ -159,6 +162,23 @@ def add_daemon_heartbeat(self, daemon_heartbeat):
)
)

def kvs_set(self, pairs: Dict[str, str]) -> None:
    """Upsert the given key-value pairs using the Postgres dialect's insert construct.

    Args:
        pairs: Mapping of each key to the value to store. On a conflict on the key
            column the stored value is replaced with the newly supplied one.
    """
    check.dict_param(pairs, "pairs", key_type=str, value_type=str)
    if not pairs:
        # Nothing to write; avoid issuing an INSERT with an empty values list.
        return

    # pg specific on_conflict_do_update
    insert_stmt = db.dialects.postgresql.insert(KeyValueStoreTable).values(
        [{"key": k, "value": v} for k, v in pairs.items()]
    )
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=[
            KeyValueStoreTable.c.key,
        ],
        # `excluded` refers to the row that the INSERT attempted to write.
        set_={"value": insert_stmt.excluded.value},
    )

    with self.connect() as conn:
        conn.execute(upsert_stmt)

def alembic_version(self):
alembic_config = pg_alembic_config(__file__)
with self.connect() as conn:
Expand Down

0 comments on commit 5db2182

Please sign in to comment.