Skip to content
Permalink
Browse files
MDEV-15364 FOREIGN CASCADE operations in system versioned referenced …
…tables

Make foreign system versioning tables work in CASCADE UPDATE/SET NULL.
In that case basically row update is performed. This patch makes insert
of a historical row performed too.

row_update_versioned_insert(): restores btr_pcur_t, reads row from it, makes
row historical and inserts to table.

row_ins_check_foreign_constraint(): disable constraint check for historical
rows because it has no sense. Also check will fail always, because referenced
table is updated at that point.

row_update_cascade_for_mysql(): insert historical row for system versioning
tables before updating current row.

revert DATA_VERSIONED -> DATA_UNVERSIONED
  • Loading branch information
kevgs committed Mar 27, 2018
1 parent fd73c6d commit 19a182b
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 103 deletions.
@@ -82,6 +82,7 @@ parent_id
select * from child for system_time all;
parent_id
1
1
2
drop table child;
drop table parent;
@@ -160,6 +161,7 @@ NULL
select *, current_row(sys_end) as current_row from child for system_time all order by sys_end;
parent_id current_row
1 0
1 0
NULL 1
delete from child;
insert into parent values(1);
@@ -171,7 +173,9 @@ NULL
select *, current_row(sys_end) as current_row from child for system_time all order by sys_end;
parent_id current_row
1 0
1 0
NULL 0
1 0
NULL 1
drop table child;
drop table parent;
@@ -231,3 +235,58 @@ insert into b(cola, v_cola) values (10,2);
delete from a;
ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails (`test`.`b`, CONSTRAINT `v_cola_fk` FOREIGN KEY (`v_cola`) REFERENCES `a` (`v_cola`))
drop table b, a;
###############################################
# CASCADE UPDATE foreign not system versioned #
###############################################
create or replace table parent (
id smallint unsigned not null auto_increment,
value int unsigned not null,
primary key (id, value)
) engine = innodb;
create or replace table child (
id mediumint unsigned not null auto_increment primary key,
parent_id smallint unsigned not null,
parent_value int unsigned not null,
sys_start SYS_DATATYPE as row start invisible,
sys_end SYS_DATATYPE as row end invisible,
period for system_time(sys_start, sys_end),
constraint `fk_child_parent`
foreign key (parent_id, parent_value) references parent (id, value)
on delete cascade
on update cascade
) engine = innodb with system versioning;
create or replace table subchild (
id int not null auto_increment primary key,
parent_id smallint unsigned not null,
parent_value int unsigned not null,
constraint `fk_subchild_child_parent`
foreign key (parent_id, parent_value) references child (parent_id, parent_value)
on delete cascade
on update cascade
) engine=innodb;
insert into parent (value) values (23);
select id, value from parent into @id, @value;
insert into child values (default, @id, @value);
insert into subchild values (default, @id, @value);
select parent_id from subchild;
parent_id
1
update parent set id = 11, value = value + 1;
select parent_id from subchild;
parent_id
11
select * from child;
id parent_id parent_value
1 11 24
delete from parent;
select count(*) from child;
count(*)
0
select * from child for system_time all;
id parent_id parent_value
1 1 23
1 11 24
select count(*) from subchild;
count(*)
0
drop table subchild, child, parent;
@@ -266,4 +266,55 @@ delete from a;

drop table b, a;

--echo ###############################################
--echo # CASCADE UPDATE foreign not system versioned #
--echo ###############################################
create or replace table parent (
id smallint unsigned not null auto_increment,
value int unsigned not null,
primary key (id, value)
) engine = innodb;

--replace_result $sys_datatype_expl SYS_DATATYPE
eval create or replace table child (
id mediumint unsigned not null auto_increment primary key,
parent_id smallint unsigned not null,
parent_value int unsigned not null,
sys_start $sys_datatype_expl as row start invisible,
sys_end $sys_datatype_expl as row end invisible,
period for system_time(sys_start, sys_end),
constraint `fk_child_parent`
foreign key (parent_id, parent_value) references parent (id, value)
on delete cascade
on update cascade
) engine = innodb with system versioning;

create or replace table subchild (
id int not null auto_increment primary key,
parent_id smallint unsigned not null,
parent_value int unsigned not null,
constraint `fk_subchild_child_parent`
foreign key (parent_id, parent_value) references child (parent_id, parent_value)
on delete cascade
on update cascade
) engine=innodb;

insert into parent (value) values (23);
select id, value from parent into @id, @value;
insert into child values (default, @id, @value);
insert into subchild values (default, @id, @value);

select parent_id from subchild;
update parent set id = 11, value = value + 1;
select parent_id from subchild;
select * from child;

delete from parent;
select count(*) from child;
select * from child for system_time all;
select count(*) from subchild;

drop table subchild, child, parent;


--source suite/versioning/common_finish.inc
@@ -312,15 +312,14 @@ dict_mem_table_add_col(

dict_mem_fill_column_struct(col, i, mtype, prtype, len);

if ((prtype & DATA_UNVERSIONED) != DATA_UNVERSIONED) {
if (prtype & DATA_VERS_START) {
ut_ad(!table->vers_start);
table->vers_start = i;
}
if (prtype & DATA_VERS_END) {
ut_ad(!table->vers_end);
table->vers_end = i;
}
switch (prtype & DATA_VERSIONED) {
case DATA_VERS_START:
ut_ad(!table->vers_start);
table->vers_start = i;
break;
case DATA_VERS_END:
ut_ad(!table->vers_end);
table->vers_end = i;
}
}

@@ -11265,9 +11265,9 @@ create_table_info_t::create_table_def()
vers_row = DATA_VERS_START;
} else if (i == m_form->s->row_end_field) {
vers_row = DATA_VERS_END;
} else if (field->flags
& VERS_UPDATE_UNVERSIONED_FLAG) {
vers_row = DATA_UNVERSIONED;
} else if (!(field->flags
& VERS_UPDATE_UNVERSIONED_FLAG)) {
vers_row = DATA_VERSIONED;
}
}

@@ -5009,9 +5009,9 @@ prepare_inplace_alter_table_dict(
} else if (i ==
altered_table->s->row_end_field) {
field_type |= DATA_VERS_END;
} else if (field->flags
& VERS_UPDATE_UNVERSIONED_FLAG) {
field_type |= DATA_UNVERSIONED;
} else if (!(field->flags
& VERS_UPDATE_UNVERSIONED_FLAG)) {
field_type |= DATA_VERSIONED;
}
}

@@ -192,7 +192,8 @@ be less than 256 */
/** System Versioning */
#define DATA_VERS_START 16384U /* start system field */
#define DATA_VERS_END 32768U /* end system field */
#define DATA_UNVERSIONED (DATA_VERS_START|DATA_VERS_END) /* unversioned user field */
/** system-versioned user data column */
#define DATA_VERSIONED (DATA_VERS_START|DATA_VERS_END)

/** Check whether locking is disabled (never). */
#define dict_table_is_locking_disabled(table) false
@@ -541,22 +542,17 @@ struct dtype_t{
unsigned mbmaxlen:3; /*!< maximum length of a character,
in bytes */

/** @return whether this is system field */
bool vers_sys_field() const
{
return vers_sys_start() || vers_sys_end();
}
/** @return whether this is system versioned user field */
bool is_versioned() const { return (prtype & DATA_UNVERSIONED) == 0; }
bool is_versioned() const { return !(~prtype & DATA_VERSIONED); }
/** @return whether this is the system field start */
bool vers_sys_start() const
{
return (prtype & DATA_UNVERSIONED) == DATA_VERS_START;
return (prtype & DATA_VERSIONED) == DATA_VERS_START;
}
/** @return whether this is the system field end */
bool vers_sys_end() const
{
return (prtype & DATA_UNVERSIONED) == DATA_VERS_END;
return (prtype & DATA_VERSIONED) == DATA_VERS_END;
}
};

@@ -652,29 +652,24 @@ struct dict_col_t{
/** @return whether NULL is an allowed value for this column */
bool is_nullable() const { return !(prtype & DATA_NOT_NULL); }

/** @return whether this is system field */
bool vers_sys_field() const
{
return vers_sys_start() || vers_sys_end();
}
/** @return whether table of this system field is TRX_ID-based */
bool vers_native() const
{
ut_ad(vers_sys_field());
ut_ad(vers_sys_start() || vers_sys_end());
ut_ad(mtype == DATA_INT || mtype == DATA_FIXBINARY);
return mtype == DATA_INT;
}
/** @return whether this is system versioned */
bool is_versioned() const { return (prtype & DATA_UNVERSIONED) == 0; }
bool is_versioned() const { return !(~prtype & DATA_VERSIONED); }
/** @return whether this is the system version start */
bool vers_sys_start() const
{
return (prtype & DATA_UNVERSIONED) == DATA_VERS_START;
return (prtype & DATA_VERSIONED) == DATA_VERS_START;
}
/** @return whether this is the system version end */
bool vers_sys_end() const
{
return (prtype & DATA_UNVERSIONED) == DATA_VERS_END;
return (prtype & DATA_VERSIONED) == DATA_VERS_END;
}

/** @return whether this is an instantly-added column */
@@ -568,6 +568,8 @@ struct upd_node_t{
dtuple_t* row; /*!< NULL, or a copy (also fields copied to
heap) of the row to update; this must be reset
to NULL after a successful update */
dtuple_t* historical_row; /*!< historical row used in
CASCADE UPDATE/SET NULL */
row_ext_t* ext; /*!< NULL, or prefixes of the externally
stored columns in the old row */
dtuple_t* upd_row;/* NULL, or a copy of the updated row */
@@ -582,9 +584,22 @@ struct upd_node_t{
/* column assignment list */
ulint magic_n;

/** System Versioning: modify update vector to set row_start
* (or row_end in case of DELETE) to current trx_id. */
void vers_set_fields(const trx_t* trx);
/** Also set row_start = CURRENT_TIMESTAMP/trx->id
@param[in] trx transaction */
void make_versioned_update(const trx_t* trx);
/** Only set row_end = CURRENT_TIMESTAMP/trx->id.
Do not touch other fields at all.
@param[in] trx transaction */
void make_versioned_delete(const trx_t* trx);

private:
/** Appends row_start or row_end field to update vector and sets a
CURRENT_TIMESTAMP/trx->id value to it.
Supposed to be called only by make_versioned_update() and
make_versioned_delete().
@param[in] trx transaction
@param[in] vers_sys_idx table->row_start or table->row_end */
void make_versioned_helper(const trx_t* trx, ulint idx);
};

#define UPD_NODE_MAGIC_N 1579975
@@ -553,6 +553,8 @@ row_ins_cascade_calc_update_vec(
ufield->exp = NULL;

ufield->new_val = parent_ufield->new_val;
dfield_get_type(&ufield->new_val)->prtype |=
col->prtype & DATA_VERSIONED;
ufield_len = dfield_get_len(&ufield->new_val);

/* Clear the "external storage" flag */
@@ -1391,6 +1393,13 @@ row_ins_foreign_check_on_constraint(
}
}

if (table->versioned() && cascade->is_delete != PLAIN_DELETE
&& cascade->update->affects_versioned()) {
cascade->historical_row =
row_build(ROW_COPY_DATA, clust_index, clust_rec, NULL,
table, NULL, NULL, NULL, thr->prebuilt->heap);
}

/* Store pcur position and initialize or store the cascade node
pcur stored position */

@@ -1613,6 +1622,19 @@ row_ins_check_foreign_constraint(
}
}

if (que_node_get_type(thr->run_node) == QUE_NODE_INSERT) {
ins_node_t* insert_node =
static_cast<ins_node_t*>(thr->run_node);
dict_table_t* table = insert_node->index->table;
if (table->versioned()) {
dfield_t* row_end = dtuple_get_nth_field(
insert_node->row, table->vers_end);
if (row_end->vers_history_row()) {
goto exit_func;
}
}
}

if (check_ref) {
check_table = foreign->referenced_table;
check_index = foreign->referenced_index;

0 comments on commit 19a182b

Please sign in to comment.