Metadata migration Closes #789 #1277

Merged
merged 1 commit into from Sep 9, 2016
Jump to file or symbol
Failed to load files and symbols.
+1,495 −45
Split
View
@@ -83,4 +83,43 @@ The following workflow options have changed names:
## Database Migrations
-... to be determined ...
+Cromwell's database structure has changed significantly between 0.19 and 0.21.
+All pre-existing data has to be transformed/moved to new tables in order to be usable.
+The migration process can be split into 2 steps:
+
+### Restart Migration
+
+Cromwell 0.21 will migrate workflows that were in **Running** state to the new database structure in order to attempt to resume them once the server restarts.
+This ensures that even if Cromwell is stopped while workflows are still running, they aren't lost.
+No particular action/configuration is required for this step.
+
+### Metadata Migration (MySQL Only)
+
+In order to keep metadata from previous (and current) workflow runs, all the data has to be moved to a new centralized table.
+Depending on the number and shape of the workflows in your database **this step can be significantly time and space consuming**.
+It is not possible to give an accurate estimation due to the multiple variables in play like number of workflows, complexity (number of tasks, scatters, attempts per task, etc...), hardware performance (of the database, of the machine running cromwell), backends used, etc...
+
+However, a good rule of thumb is to make sure that **your database has enough disk space to grow by a factor of 10**.
+This is due to the fact that data — in particular all inputs and outputs — is de-normalized during the migration, which means that the more complex / large your outputs are (large arrays, etc...), the more your database will grow.
+Also be aware that **the migration can take several hours for substantially large databases.**
+
+#### Important Notes
+
+* For better performance, make sure the flag `rewriteBatchedStatements` is set to `true`. This can be done by appending it to your database connection URL.
+
+e.g:
+
+    jdbc:mysql://localhost:3306/cromwell_db?rewriteBatchedStatements=true
+
+See the [mysql doc](https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html) for more information.
+
+* Because of the nature of WDL inputs and outputs, as well as the way they were stored up until Cromwell 0.19, it is necessary to pull them out of the database, process them one by one in Cromwell, and re-insert them.
+For that reason, if your database contains workflows with very large inputs or outputs (for example `Array`s of several thousands of elements, large matrices, etc...),
+you might want to tune the `migration-read-batch-size` and `migration-write-batch-size` configuration fields (see the `database` section in `application.conf`).
+
+ * `migration-read-batch-size` sets the number of rows that should be retrieved at the same time from the database. Once every row is processed, the next set is retrieved until there are no more.
+If your workflows/tasks have very large inputs or outputs, a number too large here could cause out of memory errors, or extended waiting times to pull the data from the database.
+On the other hand, a number too small could decrease performance by causing more round-trips to the database with the associated overhead.
+
+ * `migration-write-batch-size` sets the number of insert statements that are buffered before being committed in a transaction. This decreases the number of queries to the database, while making sure the batch never gets too big.
+You might consider decreasing this field if your workflows/tasks have very large `String`s as input or output (for example several MB of text).
View
@@ -9,13 +9,19 @@ lazy val gcsFileSystem = (project in file("filesystems/gcs"))
.settings(gcsFileSystemSettings:_*)
.withTestSettings
+lazy val databaseCore = (project in file("database/core"))
+ .settings(databaseCoreSettings:_*)
+ .withTestSettings
+
lazy val databaseSql = (project in file("database/sql"))
.settings(databaseSqlSettings:_*)
+ .dependsOn(databaseCore)
.withTestSettings
lazy val databaseMigration = (project in file("database/migration"))
.settings(databaseMigrationSettings: _*)
.dependsOn(core)
+ .dependsOn(databaseCore)
.withTestSettings
lazy val services = (project in file("services"))
@@ -82,6 +88,7 @@ lazy val root = (project in file("."))
// Full list of all sub-projects to build with the root (ex: include in `sbt test`)
.aggregate(core)
.aggregate(gcsFileSystem)
+ .aggregate(databaseCore)
.aggregate(databaseSql)
.aggregate(databaseMigration)
.aggregate(services)
@@ -377,6 +377,11 @@ database {
password = "pass"
connectionTimeout = 5000
}
+
+ migration {
+ read-batch-size = 100000
+ write-batch-size = 100000
+ }
}
}
}
@@ -1,6 +1,6 @@
package cromwell.core.simpleton
-import wdl4s.values.{WdlArray, WdlMap, WdlObjectLike, WdlPrimitive, WdlValue}
+import wdl4s.values._
case class WdlValueSimpleton(simpletonKey: String, simpletonValue: WdlPrimitive)
@@ -29,6 +29,7 @@ object WdlValueSimpleton {
case WdlArray(_, arrayValue) => arrayValue.zipWithIndex flatMap { case (arrayItem, index) => arrayItem.simplify(s"$name[$index]") }
case WdlMap(_, mapValue) => mapValue flatMap { case (key, value) => value.simplify(s"$name:${key.valueString.escapeMeta}") }
case wdlObject: WdlObjectLike => wdlObject.value flatMap { case (key, value) => value.simplify(s"$name:${key.escapeMeta}") }
+ case other => throw new Exception(s"Cannot simplify wdl value $other of type ${other.wdlType}")
}
}
@@ -0,0 +1,13 @@
+package cromwell.database.core
+
+import com.typesafe.config.ConfigFactory
+import lenthall.config.ScalaConfig._
+
+/**
+ * Resolves the `database` section of the root Typesafe config.
+ * If an optional `database.config` entry names a nested block, that block becomes the
+ * default database configuration; otherwise the `database` block itself is used.
+ */
+object SqlConfiguration {
+ lazy val rootConfig = ConfigFactory.load()
+ private lazy val rootDatabaseConfig = rootConfig.getConfig("database")
+ // Optional `database.config` pointer to a named sub-configuration (e.g. a test profile).
+ private lazy val databaseConfigName = rootDatabaseConfig.getStringOption("config")
+ // Falls back to the `database` block itself when no named sub-config is declared.
+ lazy val defaultDatabaseConfig = databaseConfigName.map(getDatabaseConfig).getOrElse(rootDatabaseConfig)
+
+ // Looks up a named configuration block nested under `database`.
+ def getDatabaseConfig(path: String) = rootDatabaseConfig.getConfig(path)
+}
@@ -44,6 +44,8 @@
<include file="changesets/job_store_simpletons.xml" relativeToChangelogFile="true" />
<include file="changesets/restart_and_recover_migration.xml" relativeToChangelogFile="true"/>
<include file="changesets/summary_status_table.xml" relativeToChangelogFile="true"/>
+ <include file="changesets/standardize_column_names.xml" relativeToChangelogFile="true" />
+ <include file="changesets/metadata_migration.xml" relativeToChangelogFile="true" />
</databaseChangeLog>
<!--
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">
+
+ <changeSet dbms="mysql" author="tjeandet" id="setup_metadata_migration">
+        <!-- Temporary table of the call executions to migrate (scatter parents and collector rows are filtered out by the SQL) -->
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/CreateAndLoadTmpExecutionMigration.sql" />
+ </changeSet>
+
+ <!--Symbol Table-->
+ <!--Input Symbol Table-->
+ <changeSet dbms="mysql" id="create_tmp_symbol_for_inputs" author="tjeandet">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/CreateTmpSymbolTable.sql" />
+ </changeSet>
+ <changeSet dbms="mysql" id="load_tmp_symbol_for_inputs" author="tjeandet">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/LoadInputSymbols.sql" />
+ </changeSet>
+ <changeSet dbms="mysql" author="tjeandet" id="input_symbol_table_migration">
+ <customChange class="cromwell.database.migration.metadata.table.symbol.InputSymbolTableMigration" />
+ <dropTable tableName="TMP_SYMBOL" />
+ </changeSet>
+
+ <!--Call Output Symbol Table-->
+ <changeSet dbms="mysql" id="create_tmp_symbol_for_call_outputs" author="tjeandet">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/CreateTmpSymbolTable.sql" />
+ </changeSet>
+ <changeSet dbms="mysql" id="load_tmp_symbol_for_call_outputs" author="tjeandet">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/LoadCallOutputSymbols.sql" />
+ </changeSet>
+ <changeSet dbms="mysql" author="tjeandet" id="call_output_symbol_table_migration">
+ <customChange class="cromwell.database.migration.metadata.table.symbol.CallOutputSymbolTableMigration" />
+ <dropTable tableName="TMP_SYMBOL" />
+ </changeSet>
+
+ <!--Workflow Output Symbol Table-->
+ <changeSet dbms="mysql" id="create_tmp_symbol_for_workflow_outputs" author="tjeandet">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/CreateTmpSymbolTable.sql" />
+ </changeSet>
+ <changeSet dbms="mysql" id="load_tmp_symbol_for_workflow_outputs" author="tjeandet">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/LoadWorkflowOutputSymbols.sql" />
+ </changeSet>
+ <changeSet dbms="mysql" author="tjeandet" id="workflow_output_symbol_table_migration">
+ <customChange class="cromwell.database.migration.metadata.table.symbol.WorkflowOutputSymbolTableMigration" />
+ <dropTable tableName="TMP_SYMBOL" />
+ </changeSet>
+
+ <!--Workflow Execution Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="workflow_execution_table_migration">
+ <customChange class="cromwell.database.migration.metadata.table.workflowexecution.WorkflowExecutionTableMigration" />
+ </changeSet>
+
+ <!--Workflow Execution Aux Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="workflow_execution_aux_table_migration">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/WorkflowExecutionAuxTableMigration.sql" />
+ </changeSet>
+
+ <!--Execution Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="execution_table_migration">
+ <customChange class="cromwell.database.migration.metadata.table.ExecutionTableMigration" />
+ </changeSet>
+
+ <!--Execution Event Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="execution_event_table_migration_start">
+ <customChange class="cromwell.database.migration.metadata.table.executionevent.ExecutionEventTableStartMigration" />
+ </changeSet>
+ <changeSet dbms="mysql" author="tjeandet" id="execution_event_table_migration_end">
+ <customChange class="cromwell.database.migration.metadata.table.executionevent.ExecutionEventTableEndMigration" />
+ </changeSet>
+ <changeSet dbms="mysql" author="tjeandet" id="execution_event_table_migration_description">
+ <customChange class="cromwell.database.migration.metadata.table.executionevent.ExecutionEventTableDescriptionMigration" />
+ </changeSet>
+
+ <!--Execution Info Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="execution_info_table_migration">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/ExecutionInfoTableMigration.sql" />
+ </changeSet>
+
+ <!--Failure Event Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="failure_event_table_migration">
+ <customChange class="cromwell.database.migration.metadata.table.FailureEventTableMigration" />
+ </changeSet>
+
+ <!--Runtime Attributes Table-->
+ <changeSet dbms="mysql" author="tjeandet" id="runtime_attributes_table_migration">
+ <sqlFile relativeToChangelogFile="true" path="migration/metadata/RuntimeAttributesTableMigration.sql" />
+ </changeSet>
+
+ <changeSet dbms="mysql" author="tjeandet" id="cleanup_metadata_migration">
+ <dropTable tableName="TMP_EXECUTION_MIGRATION" />
+ </changeSet>
+</databaseChangeLog>
@@ -0,0 +1,41 @@
+-- Builds TMP_EXECUTION_MIGRATION: a snapshot of the EXECUTION rows the metadata
+-- migration should process. Scatter entries (CALL_FQNs containing '$') and
+-- collector rows (IDX = -1 rows that have sharded siblings) are excluded below.
+CREATE TABLE TMP_EXECUTION_MIGRATION LIKE EXECUTION;
+
+INSERT INTO TMP_EXECUTION_MIGRATION (
+  EXECUTION_ID,
+  WORKFLOW_EXECUTION_ID,
+  RESULTS_CLONED_FROM,
+  CALL_FQN,
+  STATUS,
+  IDX,
+  RC,
+  START_DT,
+  END_DT,
+  ALLOWS_RESULT_REUSE,
+  DOCKER_IMAGE_HASH,
+  EXECUTION_HASH,
+  ATTEMPT,
+  BACKEND_TYPE
+)
+  SELECT
+    EXECUTION_ID,
+    WORKFLOW_EXECUTION_ID,
+    RESULTS_CLONED_FROM,
+    CALL_FQN,
+    STATUS,
+    IDX,
+    RC,
+    START_DT,
+    END_DT,
+    ALLOWS_RESULT_REUSE,
+    DOCKER_IMAGE_HASH,
+    EXECUTION_HASH,
+    ATTEMPT,
+    BACKEND_TYPE
+  FROM EXECUTION e
+  WHERE e.CALL_FQN NOT LIKE '%$%' AND -- filter out scatters
+    NOT (e.IDX = -1 AND EXISTS ( -- filter out collectors
+      SELECT 1 FROM EXECUTION e2 WHERE
+        e2.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID AND
+        e2.CALL_FQN = e.CALL_FQN AND
+        e2.IDX != -1)
+    );
@@ -0,0 +1,11 @@
+-- Scratch table for the symbol migrations: each Load*Symbols.sql script fills it,
+-- the matching *SymbolTableMigration custom change processes it, and the
+-- changelog then drops it (see metadata_migration.xml).
+CREATE TABLE TMP_SYMBOL
+(
+  TMP_SYMBOL_ID INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
+  WORKFLOW_EXECUTION_UUID VARCHAR(100) NOT NULL,
+  SYMBOL_NAME VARCHAR(100),
+  SYMBOL_SCOPE VARCHAR(255),
+  SYMBOL_INDEX INT,
+  SYMBOL_ATTEMPT INT,
+  WDL_VALUE LONGTEXT,
+  WDL_TYPE VARCHAR(100)
+);
@@ -0,0 +1,38 @@
+-- Moves per-execution key/value pairs (EXECUTION_INFO) into the centralized
+-- METADATA_JOURNAL table, renaming well-known backend keys to their metadata names.
+INSERT INTO METADATA_JOURNAL (
+  WORKFLOW_EXECUTION_UUID,
+  METADATA_KEY,
+  CALL_FQN,
+  JOB_SCATTER_INDEX,
+  JOB_RETRY_ATTEMPT,
+  METADATA_VALUE,
+  METADATA_VALUE_TYPE,
+  METADATA_TIMESTAMP
+)
+  SELECT
+    WORKFLOW_EXECUTION_UUID,
+    CASE ei.INFO_KEY
+      WHEN '$log_stdout' THEN 'stdout'
+      WHEN '$log_stderr' THEN 'stderr'
+      WHEN 'JES_RUN_ID' THEN 'jobId'
+      WHEN 'JES_STATUS' THEN 'backendStatus'
+      WHEN 'SGE_JOB_NUMBER' THEN 'jobNumber'
+      WHEN 'LSF_JOB_NUMBER' THEN 'jobNumber'
+      ELSE
+        IF(ei.INFO_KEY LIKE '$log_%', -- backend log
+          CONCAT(
+            'backendLogs:', -- prepend metadata prefix
+            SUBSTRING(ei.INFO_KEY, 6, LENGTH(ei.INFO_KEY) - 5) -- remove $log_ prefix from the key
+          ),
+          ei.INFO_KEY -- Just put the key otherwise
+        ) END,
+    CALL_FQN,
+    IDX,
+    ATTEMPT,
+    ei.INFO_VALUE,
+    'string',
+    NOW()
+  FROM EXECUTION_INFO ei
+  -- NOTE(review): the inner JOIN on WORKFLOW_EXECUTION below references e.WORKFLOW_EXECUTION_ID,
+  -- which discards rows where e is NULL and effectively turns this LEFT JOIN into an
+  -- inner join — confirm whether info rows for filtered-out executions should be kept.
+  LEFT JOIN TMP_EXECUTION_MIGRATION e ON ei.EXECUTION_ID = e.EXECUTION_ID
+  JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID
+  WHERE
+    ei.INFO_VALUE IS NOT NULL;
@@ -0,0 +1,34 @@
+-- Loads call outputs (SYMBOL rows with IO = 'OUTPUT') into TMP_SYMBOL, joined to
+-- the migrated executions and restricted to each call's latest attempt.
+INSERT INTO TMP_SYMBOL (
+  WORKFLOW_EXECUTION_UUID,
+  SYMBOL_NAME,
+  SYMBOL_SCOPE,
+  SYMBOL_INDEX,
+  SYMBOL_ATTEMPT,
+  WDL_VALUE,
+  WDL_TYPE
+)
+  SELECT
+    WORKFLOW_EXECUTION_UUID,
+    s.NAME,
+    e.CALL_FQN,
+    e.IDX,
+    e.ATTEMPT,
+    s.WDL_VALUE,
+    s.WDL_TYPE
+  FROM SYMBOL s
+  JOIN WORKFLOW_EXECUTION we ON
+    we.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+  JOIN TMP_EXECUTION_MIGRATION e ON
+    e.CALL_FQN = s.SCOPE AND
+    e.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID AND
+    s.`INDEX` = e.IDX
+  WHERE
+    s.IO = 'OUTPUT' AND
+    NOT EXISTS ( -- filter out earlier attempts
+      SELECT 1
+      FROM TMP_EXECUTION_MIGRATION e3
+      WHERE
+        e3.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID AND
+        e3.CALL_FQN = e.CALL_FQN AND
+        e3.IDX = e.IDX AND
+        e3.ATTEMPT > e.ATTEMPT);
@@ -0,0 +1,26 @@
+-- Loads call inputs (SYMBOL rows with IO = 'INPUT') into TMP_SYMBOL. Inputs are
+-- deliberately fanned out across every index/attempt of the matching execution
+-- (see the note on the join below).
+INSERT INTO TMP_SYMBOL (
+  WORKFLOW_EXECUTION_UUID,
+  SYMBOL_NAME,
+  SYMBOL_SCOPE,
+  SYMBOL_INDEX,
+  SYMBOL_ATTEMPT,
+  WDL_VALUE,
+  WDL_TYPE
+)
+  SELECT
+    WORKFLOW_EXECUTION_UUID,
+    s.NAME,
+    s.SCOPE,
+    e.IDX,
+    e.ATTEMPT,
+    s.WDL_VALUE,
+    s.WDL_TYPE
+  FROM SYMBOL s
+  JOIN WORKFLOW_EXECUTION we ON
+    we.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+  LEFT JOIN TMP_EXECUTION_MIGRATION e ON
+    e.CALL_FQN = s.SCOPE AND
+    e.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+  -- Don't join on index here because inputs have index null (-1), but we want to duplicate them as many times as there are indices for a given execution
+  WHERE
+    s.IO = 'INPUT';
@@ -0,0 +1,19 @@
+-- Loads workflow-level outputs (SYMBOL rows flagged REPORTABLE_RESULT) into
+-- TMP_SYMBOL; unlike call symbols these carry no scatter index or attempt.
+INSERT INTO TMP_SYMBOL (
+  WORKFLOW_EXECUTION_UUID,
+  SYMBOL_NAME,
+  SYMBOL_SCOPE,
+  WDL_VALUE,
+  WDL_TYPE
+)
+  SELECT
+    WORKFLOW_EXECUTION_UUID,
+    s.NAME,
+    s.SCOPE,
+    s.WDL_VALUE,
+    s.WDL_TYPE
+  FROM SYMBOL s
+  JOIN WORKFLOW_EXECUTION we ON
+    we.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+  WHERE
+    s.IO = 'OUTPUT' AND
+    s.REPORTABLE_RESULT = 1;
Oops, something went wrong.