Permalink
Browse files

Change naming convention and check for anyblok constraint

  • Loading branch information...
jssuzanne committed Jan 12, 2018
1 parent 9ba459f commit 3c3fa5d7d07716d88efcc5ca95a124497fbbed38
Showing with 75 additions and 58 deletions.
  1. +36 −9 anyblok/common.py
  2. +24 −36 anyblok/migration.py
  3. +13 −13 anyblok/tests/test_migration.py
  4. +2 −0 doc/CHANGES.rst
View
@@ -17,9 +17,9 @@
def all_column_name(constraint, table):
"""Define the convention for merge the column"""
if isinstance(constraint, ForeignKeyConstraint):
return '-'.join(constraint.column_keys)
return '_'.join(constraint.column_keys)
else:
return '-'.join(constraint.columns.keys())
return '_'.join(constraint.columns.keys())
def all_referred_column_name(constraint, table):
@@ -34,20 +34,47 @@ def all_referred_column_name(constraint, table):
referred_columns.append(refcol)
return '-'.join(referred_columns)
return '_'.join(referred_columns)
def model_name(constraint, table):
"""return a shortest table name"""
name = table.name.split('_')
if len(name) == 1:
return name[0]
return ''.join(x[0] for x in name[:-1]) + '_' + name[-1]
def referred_model_name(constraint, table):
"""return a shortest referred table name"""
el = constraint.elements[0]
refs = el.target_fullname.split(".")
if len(refs) == 3:
refschema, reftable, refcol = refs
else:
reftable, refcol = refs
name = reftable.split('_')
if len(name) == 1:
return name[0]
return ''.join(x[0] for x in name[:-1]) + '_' + name[-1]
"""table convention for constraint"""
naming_convention = {
"all_column_name": all_column_name,
"all_referred_column_name": all_referred_column_name,
"ix": "ix_%(table_name)s__%(all_column_name)s",
"uq": "uq_%(table_name)s__%(all_column_name)s",
"ck": "ck_%(table_name)s__%(constraint_name)s",
"fk": ("fk_%(table_name)s_%(all_column_name)s_on_"
"%(referred_table_name)s__"
"model_name": model_name,
"referred_model_name": referred_model_name,
"ix": "anyblok_ix_%(model_name)s__%(all_column_name)s",
"uq": "anyblok_uq_%(model_name)s__%(all_column_name)s",
"ck": "anyblok_ck_%(model_name)s__%(constraint_name)s",
"fk": ("anyblok_fk_%(model_name)s_%(all_column_name)s_on_"
"%(referred_model_name)s__"
"%(all_referred_column_name)s"),
"pk": "pk_%(table_name)s"
"pk": "anyblok_pk_%(table_name)s"
}
View
@@ -49,28 +49,11 @@ def init_add_column(self, diff):
_, _, table, column = diff
self.log_names.append('Add %s.%s' % (table, column.name))
def check_if_table_and_columns_exist(self, table, *columns):
try: # check if the table and the column exist
table = self.migration.table(table)
for column in columns:
table.column(column)
return True
except Exception:
return False
def can_remove_constraints(self, name):
unique = "uq_(?P<table>\w+)__(?P<columns>[\w-]+)"
check = "ck_(?P<table>\w+)__(?P<constraint>\w+)"
unique = "anyblok_uq_(?P<table>\w+)__(?P<columns>\w+)"
m = re.search(unique, name)
if m and self.check_if_table_and_columns_exist(
m.group('table'),
*m.group('columns').split('-')
):
return True
m = re.search(check, name)
if m and self.check_if_table_and_columns_exist(m.group('table')):
if m and m.groups():
return True
if self.migration.reinit_constraints:
@@ -82,17 +65,25 @@ def can_remove_constraints(self, name):
return False
def can_remove_fk_constraints(self, name):
fk = ("fk_(?P<table>\w+)__(?P<columns>[\w-]+)_on_"
"(?P<referred_table>\w+)__(?P<referred_columns>[\w-]+)")
fk = ("anyblok_fk_(?P<table>\w+)__(?P<columns>\w+)_on_"
"(?P<referred_table>\w+)__(?P<referred_columns>\w+)")
m = re.search(fk, name)
if m is not None:
local = self.check_if_table_and_columns_exist(
m.group('table'), *m.group('columns').split('-'))
referred = self.check_if_table_and_columns_exist(
m.group('referred_table'),
*m.group('referred_columns').split('-'))
if local and referred:
return True
if m is not None and m.groups():
return True
if self.migration.reinit_constraints:
return True
if self.migration.reinit_all:
return True
return False
def can_remove_check_constraints(self, name):
check = "anyblok_ck_(?P<table>\w+)__(?P<constraint>\w+)"
m = re.search(check, name)
if m is not None and m.groups():
return True
if self.migration.reinit_constraints:
return True
@@ -113,12 +104,9 @@ def init_remove_constraint(self, diff):
return True
def can_remove_index(self, name):
key = "ix_(?P<table>\w+)__(?P<columns>[\w-]+)"
key = "anyblok_ix_(?P<table>\w+)__(?P<columns>\w+)"
m = re.search(key, name)
if m and self.check_if_table_and_columns_exist(
m.group('table'),
*m.group('columns').split('-')
):
if m and m.groups():
return True
if self.migration.reinit_indexes:
@@ -174,7 +162,7 @@ def init_remove_ck(self, diff):
self.log_names.append('Drop check constraint %s on %s' % (
ck['name'], table))
if not self.can_remove_constraints(ck['name']):
if not self.can_remove_check_constraints(ck['name']):
return True
self.raise_if_withoutautomigration()
@@ -723,7 +711,7 @@ def __init__(self, table):
self.name = self.format_name()
def format_name(self, *columns):
return 'pk_%s' % self.table.name
return 'anyblok_pk_%s' % self.table.name
def add(self, *columns):
""" Add the constraint
@@ -283,7 +283,7 @@ def test_constraint_check(self):
Test = self.registry.Test
t.check('test').add(Test.other != 'test')
# particuliar case of check constraint
t.check('ck_test__test').drop()
t.check('anyblok_ck_test__test').drop()
def test_detect_under_noautocommit_flag(self):
with self.cnx() as conn:
@@ -481,14 +481,14 @@ def test_detect_drop_index(self):
def test_detect_drop_anyblok_index(self):
with self.cnx() as conn:
conn.execute(
"""CREATE INDEX ix_test__other ON test (other);""")
"""CREATE INDEX anyblok_ix_test__other ON test (other);""")
report = self.registry.migration.detect_changed()
self.assertTrue(
report.log_has("Drop index ix_test__other on test"))
report.log_has("Drop index anyblok_ix_test__other on test"))
report.apply_change()
report = self.registry.migration.detect_changed()
self.assertFalse(
report.log_has("Drop index ix_test__other on test"))
report.log_has("Drop index anyblok_ix_test__other on test"))
def test_detect_drop_index_with_reinit_indexes(self):
with self.cnx() as conn:
@@ -580,7 +580,7 @@ def test_detect_drop_anyblok_foreign_key(self):
"""CREATE TABLE test(
integer INT PRIMARY KEY NOT NULL,
other CHAR(64)
CONSTRAINT fk_test__other_on_system_blok__name
CONSTRAINT anyblok_fk_test__other_on_system_blok__name
references system_blok(name)
);""")
report = self.registry.migration.detect_changed()
@@ -680,15 +680,15 @@ def test_detect_drop_unique_anyblok_constraint(self):
conn.execute(
"""CREATE TABLE test(
integer INT PRIMARY KEY NOT NULL,
other CHAR(64) CONSTRAINT uq_test__other UNIQUE
other CHAR(64) CONSTRAINT anyblok_uq_test__other UNIQUE
);""")
report = self.registry.migration.detect_changed()
self.assertTrue(
report.log_has("Drop constraint uq_test__other on test"))
report.log_has("Drop constraint anyblok_uq_test__other on test"))
report.apply_change()
report = self.registry.migration.detect_changed()
self.assertFalse(
report.log_has("Drop constraint uq_test__other on test"))
report.log_has("Drop constraint anyblok_uq_test__other on test"))
def test_detect_drop_unique_constraint_with_reinit_constraints(self):
with self.cnx() as conn:
@@ -733,11 +733,11 @@ def test_detect_add_check_constraint(self):
);""")
report = self.registry.migration.detect_changed()
self.assertTrue(report.log_has(
"Add check constraint ck_testcheck__test on testcheck"))
"Add check constraint anyblok_ck_testcheck__test on testcheck"))
report.apply_change()
report = self.registry.migration.detect_changed()
self.assertFalse(report.log_has(
"Add check constraint ck_testcheck__test on testcheck"))
"Add check constraint anyblok_ck_testcheck__test on testcheck"))
def test_detect_drop_check_constraint(self):
with self.cnx() as conn:
@@ -761,16 +761,16 @@ def test_detect_drop_check_anyblok_constraint(self):
conn.execute(
"""CREATE TABLE test(
integer INT PRIMARY KEY NOT NULL,
other CHAR(64) CONSTRAINT ck_test__check
other CHAR(64) CONSTRAINT anyblok_ck__test__check
CHECK (other != 'test')
);""")
report = self.registry.migration.detect_changed()
self.assertTrue(report.log_has(
"Drop check constraint ck_test__check on test"))
"Drop check constraint anyblok_ck__test__check on test"))
report.apply_change()
report = self.registry.migration.detect_changed()
self.assertFalse(report.log_has(
"Drop check constraint ck_test__check on test"))
"Drop check constraint anyblok_ck__test__check on test"))
def test_detect_drop_check_constraint_with_reinit_constraint(self):
with self.cnx() as conn:
View
@@ -16,6 +16,8 @@ CHANGELOG
------
* [REF] change log level, the instalation become less verbose
* [REF] change namimg convention, dont check in function of
table and column name to know if it is an AnyBlok constraint
0.13.0 (2018-01-09)
-------------------

0 comments on commit 3c3fa5d

Please sign in to comment.