Merge pull request #70 from davidw/migrations

Migrations
2 parents ad171d9 + 9bc9b42, commit d68b333abf754d2ad30354af0f0430b635ef25f6, committed by @evanmiller on Mar 22, 2013
Showing with 169 additions and 30 deletions.
  1. +3 −3 rebar.config
  2. +41 −0 src/boss_db.erl
  3. +18 −0 src/boss_db_controller.erl
  4. +35 −26 src/db_adapters/boss_db_adapter_mysql.erl
  5. +72 −1 src/db_adapters/boss_db_adapter_pgsql.erl
rebar.config
@@ -1,12 +1,12 @@
{erl_opts, [debug_info]}.
{deps, [
{aleppo, ".*", {git, "git://github.com/evanmiller/aleppo.git", {tag, "bef139e4c7"}}},
- {bson, ".*", {git, "git://github.com/mongodb/bson-erlang.git", {tag, "6d3cc910ea"}}},
+ {bson, ".*", {git, "git://github.com/mongodb/bson-erlang.git", {tag, "adce0e94ab"}}},
{ddb, ".*", {git, "https://github.com/Concurix/ddb.git", {tag, "HEAD"}}},
- {epgsql, ".*", {git, "git://github.com/wg/epgsql.git", {tag, "1.4"}}},
+ {epgsql, ".*", {git, "git://github.com/wg/epgsql.git", "3318bd5d64"}},
{erlmc, ".*", {git, "git://github.com/bipthelin/erlmc.git", {tag, "HEAD"}}},
{medici, ".*", {git, "git://github.com/evanmiller/medici.git", {branch, "rebarify"}}},
- {mongodb, ".*", {git, "git://github.com/evanmiller/mongodb-erlang.git", {tag, "c582c911cd"}}},
+ {mongodb, ".*", {git, "git://github.com/evanmiller/mongodb-erlang.git", {tag, "613f157c66"}}},
{mysql, ".*", {git, "git://github.com/dizzyd/erlang-mysql-driver.git", {tag, "16cae84b5e"}}},
{poolboy, ".*", {git, "git://github.com/devinus/poolboy.git", {tag, "855802e0cc"}}},
{riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", {tag, "1.3.0"}}},
src/boss_db.erl
@@ -5,6 +5,8 @@
-export([start/1, stop/0]).
-export([
+ migrate/1,
+ migrate/2,
find/1,
find/2,
find/3,
@@ -21,6 +23,8 @@
save_record/1,
push/0,
pop/0,
+ create_table/2,
+ table_exists/1,
depth/0,
dump/0,
execute/1,
@@ -64,6 +68,35 @@ db_call(Msg) ->
Reply
end.
+%% @doc Apply migrations from list [{Tag, Fun}]
+%% currently runs all migrations 'up'
+migrate(Migrations) when is_list(Migrations) ->
+ %% 1. Do we have a migrations table? If not, create it.
+ case table_exists(schema_migrations) of
+ false ->
+ ok = create_table(schema_migrations, [{id, auto_increment, []},
+ {version, string, [not_null]},
+ {migrated_at, datetime, []}]);
+ _ ->
+ noop
+ end,
+ %% 2. Get all the current migrations from it.
+ DoneMigrations = db_call({get_migrations_table}),
+ DoneMigrationTags = [list_to_atom(binary_to_list(Tag)) ||
+ {_Id, Tag, _MigratedAt} <- DoneMigrations],
+ %% 3. Run the ones that are not in this list.
+ transaction(fun() ->
+ [migrate({Tag, Fun}, up) ||
+ {Tag, Fun} <- Migrations,
+ not lists:member(Tag, DoneMigrationTags)]
+ end).
+
+%% @doc Run database migration {Tag, Fun} in Direction
+migrate({Tag, Fun}, Direction) ->
+ io:format("Running migration: ~p ~p~n", [Tag, Direction]),
+ Fun(Direction),
+ db_call({migration_done, Tag, Direction}).
+
%% @spec find(Id::string()) -> Value | {error, Reason}
%% @doc Find a BossRecord with the specified `Id' (e.g. "employee-42") or a value described
%% by a dot-separated path (e.g. "employee-42.manager.name").
@@ -185,6 +218,14 @@ depth() ->
dump() ->
db_call(dump).
+%% @spec create_table ( TableName::string(), TableDefinition ) -> ok | {error, Reason}
+%% @doc Create a table based on TableDefinition
+create_table(TableName, TableDefinition) ->
+ db_call({create_table, TableName, TableDefinition}).
+
+table_exists(TableName) ->
+ db_call({table_exists, TableName}).
+
%% @spec execute( Commands::iolist() ) -> RetVal
%% @doc Execute raw database commands on SQL databases
execute(Commands) ->
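
The new entry points can be driven from application code roughly as shown below. This is a minimal sketch: the module name my_app_migrations, the migration tags, and the SQL strings are invented for illustration; only the {Tag, Fun} shape and the boss_db:migrate/1 and boss_db:execute/1 calls come from the patch above.

-module(my_app_migrations).
-export([run/0]).

run() ->
    %% Hypothetical migrations list; each Fun receives the direction atom,
    %% although migrate/1 currently only ever calls it with 'up'.
    Migrations =
        [{create_greetings,
          fun(up)   -> boss_db:execute("CREATE TABLE greetings (id SERIAL PRIMARY KEY, message VARCHAR)");
             (down) -> boss_db:execute("DROP TABLE greetings")
          end},
         {add_greeting_language,
          fun(up)   -> boss_db:execute("ALTER TABLE greetings ADD COLUMN language VARCHAR");
             (down) -> boss_db:execute("ALTER TABLE greetings DROP COLUMN language")
          end}],
    %% Creates schema_migrations if it does not exist, then runs, inside a
    %% transaction, every migration whose Tag is not already recorded there.
    boss_db:migrate(Migrations).
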
src/boss_db_controller.erl
@@ -113,6 +113,14 @@ handle_call({find, Type, Conditions, Max, Skip, Sort, SortOrder, _}, _From, #sta
{Adapter, Conn} = db_for_type(Type, State),
{reply, Adapter:find(Conn, Type, Conditions, Max, Skip, Sort, SortOrder), State};
+handle_call({get_migrations_table}, _From, #state{ cache_enable = false } = State) ->
+ {Adapter, Conn} = {State#state.adapter, State#state.connection},
+ {reply, Adapter:get_migrations_table(Conn), State};
+
+handle_call({migration_done, Tag, Direction}, _From, #state{ cache_enable = false } = State) ->
+ {Adapter, Conn} = {State#state.adapter, State#state.connection},
+ {reply, Adapter:migration_done(Conn, Tag, Direction), State};
+
handle_call({count, Type}, _From, State) ->
{Adapter, Conn} = db_for_type(Type, State),
{reply, Adapter:count(Conn, Type), State};
@@ -161,6 +169,16 @@ handle_call(dump, _From, State) ->
Conn = State#state.connection,
{reply, Adapter:dump(Conn), State};
+handle_call({create_table, TableName, TableDefinition}, _From, State) ->
+ Adapter = State#state.adapter,
+ Conn = State#state.connection,
+ {reply, Adapter:create_table(Conn, TableName, TableDefinition), State};
+
+handle_call({table_exists, TableName}, _From, State) ->
+ Adapter = State#state.adapter,
+ Conn = State#state.connection,
+ {reply, Adapter:table_exists(Conn, TableName), State};
+
handle_call({execute, Commands}, _From, State) ->
Adapter = State#state.adapter,
Conn = State#state.connection,
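
The controller clauses above simply forward to the active adapter, so an adapter has to export matching callbacks to support migrations. The sketch below is a hypothetical do-nothing adapter module showing that callback surface; the names and arities are taken from the MySQL and PostgreSQL adapter diffs that follow, but the module itself and its placeholder bodies are not part of this commit.

-module(boss_db_adapter_mock).
-export([get_migrations_table/1, migration_done/3, create_table/3, table_exists/2]).

%% Placeholder implementations; a real adapter would talk to its database here.
get_migrations_table(_Conn) ->
    [].                                   % rows of {Id, Version, MigratedAt}

migration_done(_Conn, _Tag, up) ->
    ok;                                   % record Tag in schema_migrations
migration_done(_Conn, _Tag, down) ->
    ok.                                   % remove Tag from schema_migrations

create_table(_Conn, _TableName, _TableDefinition) ->
    ok.                                   % ok | {error, Reason}

table_exists(_Conn, _TableName) ->
    false.                                % boolean() | {error, Reason}
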
src/db_adapters/boss_db_adapter_mysql.erl
@@ -3,6 +3,7 @@
-export([init/1, terminate/1, start/1, stop/0, find/2, find/7]).
-export([count/3, counter/2, incr/3, delete/2, save_record/2]).
-export([push/2, pop/2, dump/1, execute/2, transaction/2]).
+-export([get_migrations_table/1, migration_done/3]).
start(_) ->
ok.
@@ -171,39 +172,47 @@ transaction(Pid, TransactionFun) when is_function(TransactionFun) ->
do_transaction(Pid, TransactionFun) when is_function(TransactionFun) ->
case do_begin(Pid, self()) of
- {error, _} = Err ->
- {aborted, Err};
- {updated,{mysql_result,[],[],0,0,[]}} ->
- case catch TransactionFun() of
- error = Err ->
- do_rollback(Pid, self()),
- {aborted, Err};
- {error, _} = Err ->
- do_rollback(Pid, self()),
- {aborted, Err};
- {'EXIT', _} = Err ->
- do_rollback(Pid, self()),
- {aborted, Err};
- Res ->
- case do_commit(Pid, self()) of
- {error, _} = Err ->
- do_rollback(Pid, self()),
- {aborted, Err};
- _ ->
- {atomic, Res}
- end
- end
- end.
+ {error, _} = Err ->
+ {aborted, Err};
+ {updated,{mysql_result,[],[],0,0,[]}} ->
+ case catch TransactionFun() of
+ error = Err ->
+ do_rollback(Pid, self()),
+ {aborted, Err};
+ {error, _} = Err ->
+ do_rollback(Pid, self()),
+ {aborted, Err};
+ {'EXIT', _} = Err ->
+ do_rollback(Pid, self()),
+ {aborted, Err};
+ Res ->
+ case do_commit(Pid, self()) of
+ {error, _} = Err ->
+ do_rollback(Pid, self()),
+ {aborted, Err};
+ _ ->
+ {atomic, Res}
+ end
+ end
+ end.
do_begin(Pid,_)->
- fetch(Pid, ["BEGIN"]).
+ fetch(Pid, ["BEGIN"]).
do_commit(Pid,_)->
- fetch(Pid, ["COMMIT"]).
+ fetch(Pid, ["COMMIT"]).
do_rollback(Pid,_)->
- fetch(Pid, ["ROLLBACK"]).
+ fetch(Pid, ["ROLLBACK"]).
+
+get_migrations_table(Pid) ->
+ fetch(Pid, "SELECT * FROM schema_migrations").
+migration_done(Pid, Tag, up) ->
+ fetch(Pid, ["INSERT INTO schema_migrations (version, migrated_at) values (",
+ atom_to_list(Tag), ", NOW())"]);
+migration_done(Pid, Tag, down) ->
+ fetch(Pid, ["DELETE FROM schema_migrations WHERE version = ", atom_to_list(Tag)]).
% internal
src/db_adapters/boss_db_adapter_pgsql.erl
@@ -2,7 +2,8 @@
-behaviour(boss_db_adapter).
-export([init/1, terminate/1, start/1, stop/0, find/2, find/7]).
-export([count/3, counter/2, incr/3, delete/2, save_record/2]).
--export([push/2, pop/2, dump/1, execute/2, execute/3, transaction/2]).
+-export([push/2, pop/2, dump/1, execute/2, execute/3, transaction/2, create_table/3, table_exists/2]).
+-export([get_migrations_table/1, migration_done/3]).
start(_) ->
ok.
@@ -142,6 +143,33 @@ transaction(Conn, TransactionFun) ->
Other -> {atomic, Other}
end.
+get_migrations_table(Conn) ->
+ Res = pgsql:equery(Conn, "SELECT * FROM schema_migrations", []),
+ case Res of
+ {ok, _Columns, ResultRows} ->
+ ResultRows;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+migration_done(Conn, Tag, up) ->
+ Res = pgsql:equery(Conn, "INSERT INTO schema_migrations (version, migrated_at) values ($1, current_timestamp)",
+ [atom_to_list(Tag)]),
+ case Res of
+ {ok, _ResultRows} ->
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+migration_done(Conn, Tag, down) ->
+ Res = pgsql:equery(Conn, "DELETE FROM schema_migrations WHERE version = $1", [atom_to_list(Tag)]),
+ case Res of
+ {ok, _Result} ->
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
% internal
id_value_to_string(Id) when is_atom(Id) -> atom_to_list(Id);
@@ -376,3 +404,46 @@ pack_value(true) ->
"TRUE";
pack_value(false) ->
"FALSE".
+
+table_exists(Conn, TableName) when is_atom(TableName) ->
+ Res = pgsql:squery(Conn, ["SELECT COUNT(tablename) FROM PG_TABLES WHERE SCHEMANAME='public' AND TABLENAME = '", atom_to_list(TableName), "'"]),
+ case Res of
+ {ok, _, [{Count}]} ->
+ list_to_integer(binary_to_list(Count)) > 0;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+create_table(Conn, TableName, TableDefinition) when is_atom(TableName) ->
+ Res = pgsql:squery(Conn, ["CREATE TABLE ", atom_to_list(TableName), " ( ", tabledefinition_to_sql(TableDefinition), " )"]),
+ case Res of
+ {ok, [], []} ->
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+%% Turns a table definition into an SQL string.
+
+tabledefinition_to_sql(TableDefinition) ->
+ string:join(
+ [atom_to_list(ColumnName) ++ " " ++ column_type_to_sql(ColumnType) ++ " " ++
+ column_options_to_sql(Options) ||
+ {ColumnName, ColumnType, Options} <- TableDefinition], ", ").
+
+column_type_to_sql(auto_increment) ->
+ "SERIAL";
+column_type_to_sql(string) ->
+ "VARCHAR";
+column_type_to_sql(integer) ->
+ "INTEGER";
+column_type_to_sql(datetime) ->
+ "TIMESTAMP".
+
+column_options_to_sql(Options) ->
+ [option_to_sql({Option, Args}) || {Option, Args} <- proplists:unfold(Options)].
+
+option_to_sql({not_null, true}) ->
+ "NOT NULL";
+option_to_sql({primary_key, true}) ->
+ "PRIMARY KEY".
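
As a worked example of the helpers above (assuming a live connection Conn), the table definition that boss_db:migrate/1 uses for its bookkeeping table maps to SQL roughly as follows; the exact whitespace in the generated statement may differ.

%% Definition taken from boss_db:migrate/1 earlier in this commit.
Definition = [{id, auto_increment, []},
              {version, string, [not_null]},
              {migrated_at, datetime, []}].
%% create_table(Conn, schema_migrations, Definition) then issues approximately:
%%   CREATE TABLE schema_migrations
%%     ( id SERIAL, version VARCHAR NOT NULL, migrated_at TIMESTAMP )
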
