Skip to content

Commit

Permalink
add dialect-specific update logic (#7572)
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Apr 27, 2022
1 parent 8ecaadf commit 7ed2dbd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pendulum
import sqlalchemy as db

from dagster import check
from dagster.core.storage.schedules import ScheduleStorageSqlMetadata, SqlScheduleStorage
from dagster.core.storage.schedules.schema import InstigatorsTable
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.serdes import ConfigurableClass, ConfigurableClassData
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple

from ..utils import (
MYSQL_POOL_RECYCLE,
Expand Down Expand Up @@ -105,3 +107,22 @@ def connect(self, run_id=None): # pylint: disable=arguments-differ, unused-argu
def upgrade(self):
alembic_config = mysql_alembic_config(__file__)
run_alembic_upgrade(alembic_config, self._engine)

def _add_or_update_instigators_table(self, conn, state):
selector_id = state.selector_id
conn.execute(
db.dialects.mysql.insert(InstigatorsTable)
.values(
selector_id=selector_id,
repository_selector_id=state.repository_selector_id,
status=state.status.value,
instigator_type=state.instigator_type.value,
instigator_body=serialize_dagster_namedtuple(state),
)
.on_duplicate_key_update(
status=state.status.value,
instigator_type=state.instigator_type.value,
instigator_body=serialize_dagster_namedtuple(state),
update_timestamp=pendulum.now("UTC"),
)
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pendulum
import sqlalchemy as db

from dagster import check
from dagster.core.storage.schedules import ScheduleStorageSqlMetadata, SqlScheduleStorage
from dagster.core.storage.schedules.schema import InstigatorsTable
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.serdes import ConfigurableClass, ConfigurableClassData
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple

from ..utils import (
create_pg_connection,
Expand Down Expand Up @@ -110,3 +112,25 @@ def upgrade(self):
alembic_config = pg_alembic_config(__file__)
with self.connect() as conn:
run_alembic_upgrade(alembic_config, conn)

def _add_or_update_instigators_table(self, conn, state):
selector_id = state.selector_id
conn.execute(
db.dialects.postgresql.insert(InstigatorsTable)
.values( # pylint: disable=no-value-for-parameter
selector_id=selector_id,
repository_selector_id=state.repository_selector_id,
status=state.status.value,
instigator_type=state.instigator_type.value,
instigator_body=serialize_dagster_namedtuple(state),
)
.on_conflict_do_update(
index_elements=[InstigatorsTable.c.selector_id],
set_={
"status": state.status.value,
"instigator_type": state.instigator_type.value,
"instigator_body": serialize_dagster_namedtuple(state),
"update_timestamp": pendulum.now("UTC"),
},
)
)

0 comments on commit 7ed2dbd

Please sign in to comment.