Skip to content

Commit

Permalink
Preserve updated content in migration 145 downgrade
Browse files Browse the repository at this point in the history
Also add UNIQUE constraint to `("workflow_step_id", "name")` to the
new `workflow_step_input` table.
  • Loading branch information
nsoranzo committed Nov 16, 2018
1 parent 00af4e2 commit f2c17e5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 53 deletions.
6 changes: 4 additions & 2 deletions lib/galaxy/model/mapping.py
Expand Up @@ -920,7 +920,9 @@
Column("value_from_type", TEXT),
Column("default_value", JSONType),
Column("default_value_set", Boolean, default=False),
Column("runtime_value", Boolean, default=False))
Column("runtime_value", Boolean, default=False),
UniqueConstraint("workflow_step_id", "name"),
)


model.WorkflowRequestStepState.table = Table(
Expand Down Expand Up @@ -2236,7 +2238,7 @@ def simple_mapping(model, **kwds):
workflow_step=relation(model.WorkflowStep,
backref=backref("inputs", uselist=True),
cascade="all",
primaryjoin=(model.WorkflowStep.table.c.id == model.WorkflowStepInput.table.c.workflow_step_id))
primaryjoin=(model.WorkflowStepInput.table.c.workflow_step_id == model.WorkflowStep.table.c.id))
))

mapper(model.WorkflowOutput, model.WorkflowOutput.table, properties=dict(
Expand Down
120 changes: 69 additions & 51 deletions lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py
Expand Up @@ -5,100 +5,118 @@

import logging

from sqlalchemy import Boolean, Column, ForeignKey, Integer, MetaData, Table, TEXT
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
Integer,
MetaData,
Table,
TEXT,
UniqueConstraint
)

from galaxy.model.custom_types import JSONType

log = logging.getLogger(__name__)
metadata = MetaData()


def get_new_tables():

WorkflowStepInput_table = Table(
"workflow_step_input", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("name", TEXT),
Column("merge_type", TEXT),
Column("scatter_type", TEXT),
Column("value_from", JSONType),
Column("value_from_type", TEXT),
Column("default_value", JSONType),
Column("default_value_set", Boolean, default=False),
Column("runtime_value", Boolean, default=False),
)

WorkflowStepConnection_table = Table(
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
Column("output_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)

return [
WorkflowStepInput_table, WorkflowStepConnection_table
]
WorkflowStepInput_table = Table(
"workflow_step_input", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("name", TEXT),
Column("merge_type", TEXT),
Column("scatter_type", TEXT),
Column("value_from", JSONType),
Column("value_from_type", TEXT),
Column("default_value", JSONType),
Column("default_value_set", Boolean, default=False),
Column("runtime_value", Boolean, default=False),
UniqueConstraint("workflow_step_id", "name"),
)


def upgrade(migrate_engine):
metadata.bind = migrate_engine
print(__doc__)
metadata.reflect()

LegacyWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True)
for index in LegacyWorkflowStepConnection_table.indexes:
OldWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True)
for index in OldWorkflowStepConnection_table.indexes:
index.drop()
LegacyWorkflowStepConnection_table.rename("workflow_step_connection_premigrate145")
OldWorkflowStepConnection_table.rename("workflow_step_connection_preupgrade145")
# Try to deregister that table to work around some caching problems it seems.
LegacyWorkflowStepConnection_table.deregister()
OldWorkflowStepConnection_table.deregister()
metadata._remove_table("workflow_step_connection", metadata.schema)

metadata.reflect()
tables = get_new_tables()
for table in tables:
__create(table)

NewWorkflowStepConnection_table = Table(
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
Column("output_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)
for table in (WorkflowStepInput_table, NewWorkflowStepConnection_table):
_create(table)

insert_step_inputs_cmd = \
"INSERT INTO workflow_step_input (workflow_step_id, name) " + \
"SELECT input_step_id, input_name FROM workflow_step_connection_premigrate145"

"SELECT input_step_id, input_name FROM workflow_step_connection_preupgrade145"
migrate_engine.execute(insert_step_inputs_cmd)

# TODO: verify order here.
insert_step_connections_cmd = \
"INSERT INTO workflow_step_connection (output_step_id, input_step_input_id, output_name, input_subworkflow_step_id) " + \
"SELECT wsc.output_step_id, wsi.id, wsc.output_name, wsc.input_subworkflow_step_id " + \
"FROM workflow_step_connection_premigrate145 AS wsc LEFT OUTER JOIN workflow_step_input AS wsi ON wsc.input_step_id = wsi.workflow_step_id AND wsc.input_name = wsi.name ORDER BY wsc.id"

"FROM workflow_step_connection_preupgrade145 AS wsc JOIN workflow_step_input AS wsi ON wsc.input_step_id = wsi.workflow_step_id AND wsc.input_name = wsi.name ORDER BY wsc.id"
migrate_engine.execute(insert_step_connections_cmd)
_drop(OldWorkflowStepConnection_table)


def downgrade(migrate_engine):
metadata.bind = migrate_engine

tables = get_new_tables()
for table in tables:
__drop(table)

NewWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True)
for index in NewWorkflowStepConnection_table.indexes:
index.drop()
NewWorkflowStepConnection_table.rename("workflow_step_connection_predowngrade145")
# Try to deregister that table to work around some caching problems it seems.
NewWorkflowStepConnection_table.deregister()
metadata._remove_table("workflow_step_connection", metadata.schema)
metadata.reflect()

# Drop new workflow invocation step and job association table and restore legacy data.
LegacyWorkflowStepConnection_table = Table("workflow_step_connection_premigrate145", metadata, autoload=True)
LegacyWorkflowStepConnection_table.rename("workflow_step_connection")
OldWorkflowStepConnection_table = Table(
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("output_name", TEXT),
Column("input_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)
_create(OldWorkflowStepConnection_table)

insert_step_connections_cmd = \
"INSERT INTO workflow_step_connection (output_step_id, input_step_id, output_name, input_name, input_subworkflow_step_id) " + \
"SELECT wsc.output_step_id, wsi.workflow_step_id, wsc.output_name, wsi.name, wsc.input_subworkflow_step_id " + \
"FROM workflow_step_connection_predowngrade145 AS wsc JOIN workflow_step_input AS wsi ON wsc.input_step_input_id = wsi.id ORDER BY wsc.id"
migrate_engine.execute(insert_step_connections_cmd)

for table in (WorkflowStepInput_table, NewWorkflowStepConnection_table):
_drop(table)


def __create(table):
def _create(table):
try:
table.create()
except Exception:
log.exception("Creating %s table failed.", table.name)


def __drop(table):
def _drop(table):
try:
table.drop()
except Exception:
Expand Down

0 comments on commit f2c17e5

Please sign in to comment.