Skip to content

Commit

Permalink
Create a new _get_current_sequences_state() function that computes th…
Browse files Browse the repository at this point in the history
…e current state of log or application sequences. This factorizes pieces of code in _set_mark_groups() and emaj_snap_log_group() functions. This also avoids to set a temporary mark in emaj_snap_log_group().
  • Loading branch information
beaud76 committed Mar 11, 2018
1 parent 052794e commit d4817be
Show file tree
Hide file tree
Showing 22 changed files with 371 additions and 223 deletions.
167 changes: 144 additions & 23 deletions sql/emaj--2.2.2--next_version.sql
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,51 @@ $_check_marks_range$
END;
$_check_marks_range$;

CREATE OR REPLACE FUNCTION emaj._get_current_sequences_state(v_groupNames TEXT[], v_relKind TEXT, v_timeId BIGINT)
RETURNS SETOF emaj.emaj_sequence LANGUAGE plpgsql AS
$_get_current_sequences_state$
-- The function returns the current state of all log or application sequences for a tables groups array
-- Input: group names array,
-- kind of relations ('r' for log sequences, 'S' for application sequences),
-- time_id to set the sequ_time_id (if the time id is NULL, get the greatest BIGINT value, i.e. 9223372036854775807)
-- Output: a set of records of type emaj_sequence, with a sequ_time_id set to the supplied v_timeId value
DECLARE
v_schema TEXT;
v_sequence TEXT;
r_tblsq RECORD;
r_sequ emaj.emaj_sequence%ROWTYPE;
BEGIN
FOR r_tblsq IN
SELECT rel_priority, rel_schema, rel_tblseq, rel_log_schema, rel_log_sequence FROM emaj.emaj_relation
WHERE upper_inf(rel_time_range) AND rel_group = ANY (v_groupNames) AND rel_kind = v_relKind
ORDER BY rel_priority, rel_schema, rel_tblseq
LOOP
IF v_relKind = 'r' THEN
v_schema = r_tblsq.rel_log_schema;
v_sequence = r_tblsq.rel_log_sequence;
ELSE
v_schema = r_tblsq.rel_schema;
v_sequence = r_tblsq.rel_tblseq;
END IF;
IF emaj._pg_version_num() < 100000 THEN
EXECUTE 'SELECT '|| quote_literal(v_schema) || ', ' || quote_literal(v_sequence) || ', ' ||
v_timeId || ', last_value, start_value, increment_by, max_value, min_value, cache_value, is_cycled, is_called ' ||
'FROM ' || quote_ident(v_schema) || '.' || quote_ident(v_sequence)
INTO STRICT r_sequ;
ELSE
EXECUTE 'SELECT schemaname, sequencename, ' ||
v_timeId || ', rel.last_value, start_value, increment_by, max_value, min_value, cache_size, cycle, rel.is_called ' ||
'FROM ' || quote_ident(v_schema) || '.' || quote_ident(v_sequence) ||
' rel, pg_catalog.pg_sequences ' ||
' WHERE schemaname = '|| quote_literal(v_schema) || ' AND sequencename = ' || quote_literal(v_sequence)
INTO STRICT r_sequ;
END IF;
RETURN NEXT r_sequ;
END LOOP;
RETURN;
END;
$_get_current_sequences_state$;

CREATE OR REPLACE FUNCTION emaj.emaj_comment_group(v_groupName TEXT, v_comment TEXT)
RETURNS VOID LANGUAGE plpgsql AS
$emaj_comment_group$
Expand Down Expand Up @@ -1261,6 +1306,81 @@ $emaj_set_mark_groups$;
COMMENT ON FUNCTION emaj.emaj_set_mark_groups(TEXT[],TEXT) IS
$$Sets a mark on several E-Maj groups.$$;

CREATE OR REPLACE FUNCTION emaj._set_mark_groups(v_groupNames TEXT[], v_mark TEXT, v_multiGroup BOOLEAN, v_eventToRecord BOOLEAN, v_loggedRlbkTargetMark TEXT DEFAULT NULL, v_timeId BIGINT DEFAULT NULL)
RETURNS INT LANGUAGE plpgsql AS
$_set_mark_groups$
-- This function effectively inserts a mark in the emaj_mark table and takes an image of the sequences definitions for the array of groups.
-- It also updates the previous mark of each group to setup the mark_log_rows_before_next column with the number of rows recorded into all log tables between this previous mark and the new mark.
-- It is called by emaj_set_mark_group and emaj_set_mark_groups functions but also by other functions that set internal marks, like functions that start or rollback groups.
-- Input: group names array, mark to set,
-- boolean indicating whether the function is called by a multi group function
-- boolean indicating whether the event has to be recorded into the emaj_hist table
-- name of the rollback target mark when this mark is created by the logged_rollback functions (NULL by default)
-- time stamp identifier to reuse (NULL by default) (this parameter is set when the mark is a rollback start mark)
-- Output: number of processed tables and sequences
-- The insertion of the corresponding event in the emaj_hist table is performed by callers.
DECLARE
v_nbTbl INT;
v_nbSeq INT;
BEGIN
-- if requested, record the set mark begin in emaj_hist
IF v_eventToRecord THEN
INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording)
VALUES (CASE WHEN v_multiGroup THEN 'SET_MARK_GROUPS' ELSE 'SET_MARK_GROUP' END, 'BEGIN', array_to_string(v_groupNames,','), v_mark);
END IF;
-- get the time stamp of the operation, if not supplied as input parameter
IF v_timeId IS NULL THEN
SELECT emaj._set_time_stamp('M') INTO v_timeId;
END IF;
-- record sequences state as early as possible (no lock protects them from other transactions activity)
INSERT INTO emaj.emaj_sequence (sequ_schema, sequ_name, sequ_time_id, sequ_last_val, sequ_start_val,
sequ_increment, sequ_max_val, sequ_min_val, sequ_cache_val, sequ_is_cycled, sequ_is_called)
SELECT * FROM emaj._get_current_sequences_state(v_groupNames, 'S', v_timeId);
GET DIAGNOSTICS v_nbSeq = ROW_COUNT;
-- record the number of log rows for the old last mark of each group
-- the statement updates no row in case of emaj_start_group(s)
WITH stat_group1 AS ( -- for each group, the mark id and time id of the last active mark
SELECT mark_group, max(mark_id) as last_mark_id, max(mark_time_id) AS last_mark_time_id
FROM emaj.emaj_mark
WHERE mark_group = ANY (v_groupNames) AND NOT mark_is_deleted
GROUP BY mark_group),
stat_group2 AS ( -- compute the number of log rows for all tables currently belonging to these groups
SELECT mark_group, last_mark_id, coalesce(
(SELECT sum(emaj._log_stat_tbl(emaj_relation, last_mark_time_id, NULL))
FROM emaj.emaj_relation
WHERE rel_group = mark_group AND rel_kind = 'r' AND upper_inf(rel_time_range)), 0) AS mark_stat
FROM stat_group1 )
UPDATE emaj.emaj_mark m SET mark_log_rows_before_next = mark_stat
FROM stat_group2 s
WHERE s.mark_group = m.mark_group AND s.last_mark_id = m.mark_id;
-- for tables currently belonging to the groups, record the associated log sequence state into the emaj sequence table
INSERT INTO emaj.emaj_sequence (sequ_schema, sequ_name, sequ_time_id, sequ_last_val, sequ_start_val,
sequ_increment, sequ_max_val, sequ_min_val, sequ_cache_val, sequ_is_cycled, sequ_is_called)
SELECT * FROM emaj._get_current_sequences_state(v_groupNames, 'r', v_timeId);
GET DIAGNOSTICS v_nbTbl = ROW_COUNT;
-- record the mark for each group into the emaj_mark table
INSERT INTO emaj.emaj_mark (mark_group, mark_name, mark_time_id, mark_is_deleted, mark_is_rlbk_protected, mark_logged_rlbk_target_mark)
SELECT group_name, v_mark, v_timeId, FALSE, FALSE, v_loggedRlbkTargetMark
FROM emaj.emaj_group WHERE group_name = ANY(v_groupNames) ORDER BY group_name;
-- before exiting, cleanup the state of the pending rollback events from the emaj_rlbk table
IF emaj._dblink_is_cnx_opened('rlbk#1') THEN
-- ... either through dblink if we are currently performing a rollback with a dblink connection already opened
-- this is mandatory to avoid deadlock
PERFORM 0 FROM dblink('rlbk#1','SELECT emaj._cleanup_rollback_state()') AS (dummy INT);
ELSE
-- ... or directly
PERFORM emaj._cleanup_rollback_state();
END IF;
-- if requested, record the set mark end in emaj_hist
IF v_eventToRecord THEN
INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording)
VALUES (CASE WHEN v_multiGroup THEN 'SET_MARK_GROUPS' ELSE 'SET_MARK_GROUP' END, 'END', array_to_string(v_groupNames,','), v_mark);
END IF;
--
RETURN v_nbSeq + v_nbTbl;
END;
$_set_mark_groups$;

CREATE OR REPLACE FUNCTION emaj.emaj_comment_mark_group(v_groupName TEXT, v_mark TEXT, v_comment TEXT)
RETURNS VOID LANGUAGE plpgsql AS
$emaj_comment_mark_group$
Expand Down Expand Up @@ -2386,14 +2506,18 @@ $emaj_snap_log_group$
SELECT time_last_emaj_gid, time_clock_timestamp INTO v_firstEmajGid, v_firstMarkTs
FROM emaj.emaj_time_stamp WHERE time_id = v_firstMarkTimeId;
IF v_noSuppliedLastMark THEN
-- the end mark is not supplied (look for the current state), so temporarily create a mark, without locking tables
SELECT emaj._check_new_mark(ARRAY[v_groupName], 'TEMP_%') INTO v_lastMark;
PERFORM emaj._set_mark_groups(ARRAY[v_groupName], v_lastMark, false, false);
-- the end mark is not supplied (look for the current state)
-- get a simple time stamp and its attributes
SELECT emaj._set_time_stamp('S') INTO v_lastMarkTimeId;
SELECT time_last_emaj_gid, time_clock_timestamp INTO v_lastEmajGid, v_lastMarkTs
FROM emaj.emaj_time_stamp
WHERE time_id = v_lastMarkTimeId;
ELSE
-- the end mark is supplied, get additional data for the last mark
SELECT mark_time_id, time_last_emaj_gid, time_clock_timestamp INTO v_lastMarkTimeId, v_lastEmajGid, v_lastMarkTs
FROM emaj.emaj_mark, emaj.emaj_time_stamp
WHERE mark_time_id = time_id AND mark_group = v_groupName AND mark_name = v_lastMark;
END IF;
-- get additional data for the last mark
SELECT mark_time_id, time_last_emaj_gid, time_clock_timestamp INTO v_lastMarkTimeId, v_lastEmajGid, v_lastMarkTs
FROM emaj.emaj_mark, emaj.emaj_time_stamp
WHERE mark_time_id = time_id AND mark_group = v_groupName AND mark_name = v_lastMark;
-- build the conditions on emaj_gid corresponding to this marks frame, used for the COPY statements dumping the tables
v_conditions = 'TRUE';
IF NOT v_firstMark IS NOT NULL AND v_firstMark <> '' THEN
Expand Down Expand Up @@ -2429,26 +2553,23 @@ $emaj_snap_log_group$
' ORDER BY sequ_schema, sequ_name) TO ' || quote_literal(v_fileName) || ' ' ||
coalesce (v_copyOptions, '');
EXECUTE v_stmt;
-- generate the full file name for sequences state at end mark
-- prepare the file for sequences state at end mark
-- generate the full file name and the COPY statement
IF v_noSuppliedLastMark THEN
v_fileName = v_dir || '/' || v_groupName || '_sequences_at_' || to_char(v_lastMarkTs,'HH24.MI.SS.MS');
v_stmt = 'SELECT * FROM ' ||
'emaj._get_current_sequences_state(ARRAY[' || quote_literal(v_groupName) || '], ''S'', ' || quote_literal(v_lastMarkTimeId) || ')';
ELSE
v_fileName = v_dir || '/' || v_groupName || '_sequences_at_' || v_lastMark;
v_stmt = 'SELECT emaj_sequence.*' ||
' FROM emaj.emaj_sequence, emaj.emaj_relation' ||
' WHERE sequ_time_id = ' || quote_literal(v_lastMarkTimeId) || ' AND ' ||
' rel_kind = ''S'' AND rel_group = ' || quote_literal(v_groupName) || ' AND' ||
' sequ_schema = rel_schema AND sequ_name = rel_tblseq' ||
' ORDER BY sequ_schema, sequ_name';
END IF;
-- and execute the COPY statement
v_stmt= 'COPY (SELECT emaj_sequence.*' ||
' FROM emaj.emaj_sequence, emaj.emaj_relation' ||
' WHERE sequ_time_id = ' || quote_literal(v_lastMarkTimeId) || ' AND ' ||
' rel_kind = ''S'' AND rel_group = ' || quote_literal(v_groupName) || ' AND' ||
' sequ_schema = rel_schema AND sequ_name = rel_tblseq' ||
' ORDER BY sequ_schema, sequ_name) TO ' || quote_literal(v_fileName) || ' ' ||
coalesce (v_copyOptions, '');
EXECUTE v_stmt;
IF v_noSuppliedLastMark THEN
-- no last mark has been supplied, suppress the just created mark
PERFORM emaj._delete_intermediate_mark_group(v_groupName, v_lastMark, mark_id, mark_time_id)
FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_lastMark;
END IF;
-- and create the file
EXECUTE 'COPY (' || v_stmt || ') TO ' || quote_literal(v_fileName) || ' ' || coalesce (v_copyOptions, '');
-- create the _INFO file to keep general information about the snap operation
EXECUTE 'COPY (SELECT ' ||
quote_literal('E-Maj log tables snap of group ' || v_groupName ||
Expand Down Expand Up @@ -2551,7 +2672,7 @@ $_gen_sql_groups$
-- if there is at least 1 group to process, go on
IF v_groupNames IS NOT NULL THEN
-- check that there is no tables without pkey
SELECT string_agg(rel_schema || '.' || rel_tblseq,', '), count(*) INTO v_tblList, v_count
SELECT string_agg(rel_schema || '.' || rel_tblseq,', ' ORDER BY rel_schema || '.' || rel_tblseq), count(*) INTO v_tblList, v_count
FROM pg_catalog.pg_class, pg_catalog.pg_namespace, emaj.emaj_relation
WHERE relnamespace = pg_namespace.oid
AND nspname = rel_schema AND relname = rel_tblseq
Expand Down

0 comments on commit d4817be

Please sign in to comment.