Skip to content

Commit

Permalink
Migrations are parsing down and up statements already
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Aug 6, 2010
1 parent 4d1f196 commit 7742d57
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 7 deletions.
30 changes: 30 additions & 0 deletions db_migrate/domain/migrations.py
Expand Up @@ -4,6 +4,7 @@
Module that contains the migrations model after they've been parsed from disk.
"""

import string
import codecs
import re
from os.path import split, exists
Expand Down Expand Up @@ -36,6 +37,8 @@ def __init__(self, filepath):
self.version = self.filename.split('_')[0]
self.title = '_'.join(self.filename.split('_')[1:])\
.replace('.migration', '')
self.up_statements = []
self.down_statements = []

@staticmethod
def is_valid_filename(filename):
Expand Down Expand Up @@ -70,3 +73,30 @@ def load(self):
"It should have both SQL_UP and SQL_DOWN variable " + \
"assignments.") % self.filepath
raise InvalidMigrationFileError(msg)

self.up_statements = Migration.parse_sql_statements(SQL_UP)
self.down_statements = Migration.parse_sql_statements(SQL_DOWN)

@classmethod
def parse_sql_statements(cls, sql):
all_statements = []
last_statement = ''

for statement in sql.split(';'):
if len(last_statement) > 0:
curr_statement = '%s;%s' % (last_statement, statement)
else:
curr_statement = statement

single_quotes = string.count(curr_statement, "'")
double_quotes = string.count(curr_statement, '"')
left_parenthesis = string.count(curr_statement, '(')
right_parenthesis = string.count(curr_statement, ')')

if single_quotes % 2 == 0 and double_quotes % 2 == 0 and left_parenthesis == right_parenthesis:
all_statements.append(curr_statement)
last_statement = ''
else:
last_statement = curr_statement

return [s.strip() for s in all_statements if s.strip() != ""]
56 changes: 49 additions & 7 deletions tests/domain/test_migrations.py
Expand Up @@ -80,9 +80,6 @@ def test_load_gets_the_file_contents_and_errors_if_file_has_no_SQL_UP():
def test_load_gets_the_file_contents_and_errors_if_file_has_no_SQL_DOWN():
clear_expectations()

expected_ups = ['up']
expected_downs = ['down']

migration = Migration(filepath=test_path)

fake_file = Fake('file')
Expand All @@ -103,14 +100,59 @@ def test_load_gets_the_file_contents_and_errors_if_file_has_no_SQL_DOWN():
return
assert False, "Should not have gotten this far"

def test_load_gets_the_file_contents_and_parses_ups_and_downs():
assert False

def test_is_filename_valid_for_invalid_file():
assert not Migration.is_valid_filename('config.ini')
assert not Migration.is_valid_filename('20101010101010_some_commands.ini')

def test_is_filename_valid_for_valid_file():
filename = "20101010101010_some_commands.migration"
assert Migration.is_valid_filename(filename)


@with_fakes
@with_patched_object(migr, 'exists', Fake(callable=True))
@with_patched_object(migr, 'codecs', Fake('codecs'))
def test_load_gets_the_file_contents_and_parses_ups_and_downs():

clear_expectations()

migration = Migration(filepath=test_path)

fake_file = Fake('file')
migr.exists.with_args(test_path).returns(True)
migr.codecs.expects('open') \
.with_args(test_path, "rU", "utf-8") \
.returns(fake_file)

fake_file.expects('read').returns('''
SQL_UP = "up command; up command 2;"
SQL_DOWN = "down command; down command 2;"
''')
fake_file.expects('close')

migration.load()

assert migration.up_statements == ['up command', 'up command 2']
assert migration.down_statements == ['down command', 'down command 2']

def test_it_should_parse_sql_statements_handles_extra_semicolons():
sql = 'create table eggs; drop table spam; ; ;'
statements = Migration.parse_sql_statements(sql)

assert len(statements) == 2
assert statements[0] == 'create table eggs'
assert statements[1] == 'drop table spam'

def test_it_should_parse_sql_statements_and_not_split_strings():
sql = 'create table "eggs;other";'
statements = Migration.parse_sql_statements(sql)

assert len(statements) == 1
assert statements[0] == 'create table "eggs;other"'

sql = "create table 'eggs;other';"
statements = Migration.parse_sql_statements(sql)

assert len(statements) == 1
assert statements[0] == "create table 'eggs;other'"

0 comments on commit 7742d57

Please sign in to comment.