Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of empty typed columns in Alter Table MySQL query and sanitize data-dictionary field names #3844

Merged
merged 17 commits into from
Nov 1, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ public function generateTableSpec(array $columns): array {
foreach ($columns as $column) {
// Sanitize the supplied table header to generate a unique column name;
// null-coalesce potentially NULL column names to empty strings.
$name = $this->sanitizeHeader($column ?? '');
$name = self::sanitizeHeader($column ?? '');

// Truncate the generated table column name, if necessary, to fit the max
// column length.
$name = $this->truncateHeader($name);
$name = self::truncateHeader($name);

// Generate unique numeric suffix for the header if a header already
// exists with the same name.
Expand All @@ -244,7 +244,7 @@ public function generateTableSpec(array $columns): array {

$spec[$name] = [
'type' => "text",
'description' => $this->sanitizeDescription($column ?? ''),
'description' => self::sanitizeDescription($column ?? ''),
];
}

Expand All @@ -260,7 +260,7 @@ public function generateTableSpec(array $columns): array {
* @return string
* Column name on single line.
*/
public function sanitizeDescription(string $column) {
public static function sanitizeDescription(string $column) {
$trimmed = array_filter(array_map('trim', explode("\n", $column)));
return implode(" ", $trimmed);
}
Expand All @@ -274,7 +274,7 @@ public function sanitizeDescription(string $column) {
* @returns string
* Sanitized column name.
*/
protected function sanitizeHeader(string $column): string {
public static function sanitizeHeader(string $column): string {
// Replace all spaces and newline characters with underscores since they are
// not supported.
$column = preg_replace('/(?: |\r\n|\r|\n)/', '_', $column);
Expand Down Expand Up @@ -304,7 +304,7 @@ protected function sanitizeHeader(string $column): string {
* @returns string
* Truncated column name.
*/
protected function truncateHeader(string $column): string {
protected static function truncateHeader(string $column): string {
// If the supplied table column name is longer than the max column length,
// truncate the column name to 5 characters under the max length and
// substitute the truncated characters with a unique hash.
Expand Down
220 changes: 181 additions & 39 deletions modules/datastore/src/DataDictionary/AlterTableQuery/MySQLQuery.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Drupal\Core\Database\StatementInterface;

use Drupal\datastore_mysql_import\Service\MysqlImport;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module is not enabled by default.

Copy link
Member

@janette janette Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the sanitizing and truncating to common/src/Storage/AbstractDatabaseTable.php datastore/src/Storage/DatabaseTable.php

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OR maybe the sanitizing/truncating of things should be in Drupal\datastore\Plugin\QueueWorker\ImportJob.php

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved sanitizing/truncating methods to ImportJob.

use Drupal\datastore\DataDictionary\AlterTableQueryBase;
use Drupal\datastore\DataDictionary\AlterTableQueryInterface;
use Drupal\datastore\DataDictionary\IncompatibleTypeException;
Expand All @@ -13,6 +14,17 @@
*/
class MySQLQuery extends AlterTableQueryBase implements AlterTableQueryInterface {

/**
* Date time specific types.
*
* @var string[]
*/
protected const DATE_TIME_TYPES = [
'DATE',
'TIME',
'DATETIME',
];

/**
* Default type to use for fields if no data-dictionary type is specified.
*
Expand Down Expand Up @@ -60,7 +72,14 @@ class MySQLQuery extends AlterTableQueryBase implements AlterTableQueryInterface
* {@inheritdoc}
*/
public function doExecute(): void {
// Sanitize field names to match database field names.
$this->fields = $this->sanitizeFields($this->fields);
// Filter out fields which are not present in the database table.
$this->fields = $this->mergeFields($this->fields, $this->table);

// Sanitize index field names to match database field names.
$this->indexes = $this->sanitizeIndexes($this->indexes);
// Filter out indexes with fields which are not present in the table.
$this->indexes = $this->mergeIndexes($this->indexes, $this->table);

// Build and execute SQL commands to prepare for table alter.
Expand All @@ -72,6 +91,54 @@ public function doExecute(): void {
$command->execute();
}

/**
* Sanitize field names.
*
* @param array $fields
* Query fields.
*
* @return array
* Query fields list with sanitized field names.
*/
protected function sanitizeFields(array $fields): array {
// Iterate through field list...
foreach (array_keys($fields) as $key) {
// Create reference to index field name.
$field_name = &$fields[$key]['name'];
// Sanitize field name.
$field_name = MysqlImport::sanitizeHeader($field_name);
}

return $fields;
}

/**
* Sanitize index field names.
*
* @param array $indexes
* Query indexes.
*
* @return array
* Query indexes list with sanitized index field names.
*/
protected function sanitizeIndexes(array $indexes): array {
// Iterate through index list...
foreach (array_keys($indexes) as $index_key) {
// Create reference to index field list.
$index_fields = &$indexes[$index_key]['fields'];

// Iterate through index field list...
foreach (array_keys($index_fields) as $field_key) {
// Create reference to index field name.
$field_name = &$index_fields[$field_key]['name'];
// Sanitize field name.
$field_name = MysqlImport::sanitizeHeader($field_name);
}
}

return $indexes;
}

/**
* Remove query fields not found in the given table and copy over names.
*
Expand Down Expand Up @@ -137,7 +204,7 @@ protected function getTableColsAndComments(string $table): array {
}

/**
* Get MySQL equivalent of the given Frictionless "Table Schema" type.
* Build full MySQL equivalent of the given Frictionless "Table Schema" type.
*
* @param string $frictionless_type
* Frictionless "Table Schema" data type.
Expand All @@ -147,59 +214,76 @@ protected function getTableColsAndComments(string $table): array {
* MySQL table to get type for.
*
* @return string
* MySQL data type.
* Full MySQL data type.
*/
protected function getFieldType(string $frictionless_type, string $column, string $table): string {
// Determine MySQL base type.
$base_mysql_type = $this->getBaseType($frictionless_type);
// Build the MySQL type argument list.
$args = $this->buildTypeArgs($frictionless_type, $column, $table);
$args = $this->buildTypeArgs($base_mysql_type, $column, $table);
$args_str = !empty($args) ? '(' . implode(', ', $args) . ')' : '';

// Build full MySQL type.
return $base_mysql_type . $args_str;
}

/**
* Get base MySQL equivalent of the given Frictionless "Table Schema" type.
*
* @param string $frictionless_type
* Frictionless "Table Schema" data type.
*
* @return string
* Base MySQL data type.
*/
protected function getBaseType(string $frictionless_type): string {
return ([
'string' => (fn () => 'TEXT'),
'number' => (fn ($args) => "DECIMAL({$args['size']}, {$args['decimal']})"),
'integer' => (fn () => 'INT'),
'date' => (fn () => 'DATE'),
'time' => (fn () => 'TIME'),
'datetime' => (fn () => 'DATETIME'),
'year' => (fn () => 'YEAR'),
'yearmonth' => (fn () => 'TINYTEXT'),
'boolean' => (fn () => 'BOOL'),
'object' => (fn () => 'TEXT'),
'geopoint' => (fn () => 'TEXT'),
'geojson' => (fn () => 'TEXT'),
'array' => (fn () => 'TEXT'),
'duration' => (fn () => 'TINYTEXT'),
])[$frictionless_type]($args);
'string' => 'TEXT',
'number' => 'DECIMAL',
'integer' => 'INT',
'date' => 'DATE',
'time' => 'TIME',
'datetime' => 'DATETIME',
'year' => 'YEAR',
'yearmonth' => 'TINYTEXT',
'boolean' => 'BOOL',
'object' => 'TEXT',
'geopoint' => 'TEXT',
'geojson' => 'TEXT',
'array' => 'TEXT',
'duration' => 'TINYTEXT',
])[$frictionless_type];
}

/**
* Build MySQL type arg list for the given Frictionless "Table Schema" type.
* Build MySQL type arg list for MySQL type.
*
* @param string $type
* Frictionless "Table Schema" data type.
* MySQL data type.
* @param string $column
* Column name.
* @param string $table
* Table name.
*
* @return array
* MySQL type arguments.
*
* @throws Drupal\datastore\DataDictionary\IncompatibleTypeException
* When incompatible data is found in the table for the specified type.
*/
protected function buildTypeArgs(string $type, string $column, string $table): array {
$args = [];

// If this field is a number field, build decimal and size arguments for
// MySQL type.
if ($type === 'number') {
// If this field is a DECIMAL field, build decimal and size arguments.
if ($type === 'DECIMAL') {
$non_decimals = $this->connection->query("SELECT MAX(LENGTH(TRIM(LEADING '-' FROM SUBSTRING_INDEX({$column}, '.', 1)))) FROM {{$table}};")->fetchField();
$args['decimal'] = $this->connection->query("SELECT MAX(LENGTH(SUBSTRING_INDEX({$column}, '.', -1))) FROM {{$table}};")->fetchField();
$args['size'] = $non_decimals + $args['decimal'];
if ($args['size'] > self::DECIMAL_MAX_SIZE || $args['decimal'] > self::DECIMAL_MAX_DECIMAL) {
$decimal = $this->connection->query("SELECT MAX(LENGTH(SUBSTRING_INDEX({$column}, '.', -1))) FROM {{$table}};")->fetchField();
$size = $non_decimals + $decimal;
if ($size > self::DECIMAL_MAX_SIZE || $decimal > self::DECIMAL_MAX_DECIMAL) {
throw new IncompatibleTypeException("Decimal values found in column too large for DECIMAL type; please use type 'string' for column '{$column}'");
}
return [$size, $decimal];
}

return $args;
return [];
}

/**
Expand All @@ -218,22 +302,80 @@ protected function buildPreAlterCommands(array $query_fields, string $table): ar

// Build pre-alter commands for each query field.
foreach ($query_fields as ['name' => $col, 'type' => $type, 'format' => $format]) {
// If this field is a date field, and a valid format is provided; update
// the format of the date fields to ISO-8601 before importing into MySQL.
if ($type === 'date' && !empty($format) && $format !== 'default') {
$mysql_date_format = $this->dateFormatConverter->convert($format);
// Replace empty date strings with NULL to prevent STR_TO_DATE errors.
// Determine base MySQL type for Frictionless column type.
$base_type = $this->getBaseType($type);

// Replace empty strings with NULL for non-text columns to prevent
// misc. errors (i.e. STR_TO_DATE function related and "Incorrect
// `type` value" errors).
if (!in_array($base_type, self::STRING_FIELD_TYPES, TRUE)) {
$pre_alter_cmds[] = $this->connection->update($table)->condition($col, '')->expression($col, 'NULL');
// Convert date formats for date column.
$pre_alter_cmds[] = $this->connection->update($table)->expression($col, "STR_TO_DATE({$col}, :date_format)", [
':date_format' => $mysql_date_format,
]);
}

// Build pre-alter commands for date fields.
if (in_array($base_type, self::DATE_TIME_TYPES, TRUE)) {
$pre_alter_cmds = array_merge($pre_alter_cmds, $this->buildDatePreAlterCommands($table, $col, $format));
}

// Build pre-alter commands for boolean fields.
if ($base_type === 'BOOL') {
$pre_alter_cmds = array_merge($pre_alter_cmds, $this->buildBoolPreAlterCommands($table, $col));
}
}

return $pre_alter_cmds;
}

/**
* Build pre-alter commands for date fields.
*
* Update format of the date fields to ISO-8601 before importing into MySQL.
*
* @param string $table
* Table name.
* @param string $column
* Table column.
* @param string $format
* Field frictionless date format.
*
* @return \Drupal\Core\Database\Query\Update[]
* Pre-alter update DB queries.
*/
protected function buildDatePreAlterCommands(string $table, string $column, string $format): array {
$pre_alter_cmds = [];

// If a valid format is provided...
if (!empty($format) && $format !== 'default') {
$mysql_date_format = $this->dateFormatConverter->convert($format);
// Convert date formats for date column.
$pre_alter_cmds[] = $this->connection->update($table)->expression($column, "STR_TO_DATE({$column}, :date_format)", [
':date_format' => $mysql_date_format,
]);
}

return $pre_alter_cmds;
}

/**
* Build pre-alter commands for boolean fields.
*
* Convert strings 'true' and 'false' to '1' and '0' for boolean fields.
*
* @param string $table
* Table name.
* @param string $column
* Table column.
*
* @return \Drupal\Core\Database\Query\Update[]
* Pre-alter update DB queries.
*/
protected function buildBoolPreAlterCommands(string $table, string $column): array {
return [
$this->connection->update($table)->where("UPPER({$column}) = :value", [':value' => 'FALSE'])->expression($column, '0'),
$this->connection->update($table)->where("UPPER({$column}) = :value", [':value' => 'TRUE'])->expression($column, '1'),
];
}

/**
* Build alter command to modify table column data types.
*
Expand Down Expand Up @@ -379,7 +521,7 @@ protected function getIndexType(string $frictionless_type): string {
* Formatted index field option string.
*/
protected function buildIndexFieldOption(string $name, ?int $length, string $table, string $type): string {
// Extract base type from full MySQL type.
// Extract base type from full MySQL type ("DECIMAL(12, 3)" -> "DECIMAL").
$base_type = strtok($type, '(');
// If this field is a string type, determine what it's length should be...
if (in_array($base_type, self::STRING_FIELD_TYPES)) {
Expand Down
1 change: 1 addition & 0 deletions modules/datastore/tests/data/data-dict.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
a,b,c,d
1,02/23/1978,9.07,efghijk
,,,
2,09/12/2009,2.37,abcde
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public function testDictionaryEnforcement(): void {
// Build dataset.
$dataset_id = $this->uuid->generate();
$dataset = $this->validMetadataFactory->get($this->getDataset($dataset_id, 'Test ' . $dataset_id, [$this->resourceUrl], TRUE), 'dataset');
// Create datset.
// Create dataset.
$this->metastore->post('dataset', $dataset);
$this->metastore->publish('dataset', $dataset_id);

Expand Down Expand Up @@ -228,7 +228,7 @@ public function testDictionaryEnforcement(): void {
'd',
],
],
'numOfRows' => 2,
'numOfRows' => 3,
], $result);
}

Expand Down