Skip to content

Commit

Permalink
Fix formatting, add documentation and comments
Browse files Browse the repository at this point in the history
Based on review discussions.
  • Loading branch information
jasonmp85 committed Mar 10, 2015
1 parent 9a652f2 commit df73ddf
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 43 deletions.
78 changes: 53 additions & 25 deletions pg_shard--1.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -143,48 +143,76 @@ COMMENT ON FUNCTION sync_table_metadata_to_citus(text)

-- Prepares a specified distributed table to work with COPY operations by
-- creating a temporary table and trigger to handle rows coming from a COPY.
-- After each INSERT, a provided sequence is incremented to track the total -- number of copied rows.
CREATE FUNCTION prepare_distributed_table_for_copy(relation regclass, sequence regclass) RETURNS void
AS $pdtfc$
-- In essence, rows added by a COPY are turned into a series of INSERTS, which
-- are routed to the distributed table. Since pg_shard does not currently allow
-- COPY directly to the distributed table, this workaround permits COPY-based
-- workflows until full COPY support is added.
CREATE FUNCTION prepare_distributed_table_for_copy(relation regclass, sequence regclass)
RETURNS VOID
AS $prepare_distributed_table_for_copy$
DECLARE
table_name text;
temp_table_name text;
attr_names text[];
attr_list text;
param_list text;
using_list text;
insert_command text;
func_tmpl CONSTANT text := $$ CREATE FUNCTION pg_temp.copy_to_insert() RETURNS trigger
AS $cti$
BEGIN
EXECUTE %L USING %s;
PERFORM nextval(%L);
RETURN NULL;
END;
$cti$ LANGUAGE plpgsql VOLATILE;
$$;
-- templates to create dynamic functions, tables, and triggers
func_tmpl CONSTANT text := $$CREATE FUNCTION pg_temp.copy_to_insert()
RETURNS trigger
AS $copy_to_insert$
BEGIN
EXECUTE %L USING %s;
PERFORM nextval(%L);
RETURN NULL;
END;
$copy_to_insert$ LANGUAGE plpgsql;$$;
table_tmpl CONSTANT text := 'CREATE TEMPORARY TABLE %I (LIKE %s)';
trg_tmpl CONSTANT text := $$CREATE TRIGGER copy_to_insert
BEFORE INSERT ON %s
FOR EACH ROW EXECUTE PROCEDURE pg_temp.copy_to_insert()$$;
BEFORE INSERT ON %s FOR EACH ROW
EXECUTE PROCEDURE pg_temp.copy_to_insert()$$;
BEGIN
SELECT relname INTO STRICT table_name FROM pg_class WHERE oid = relation;
temp_table_name = format('%s_copy_facade', table_name);

SELECT array_agg(attname) INTO STRICT attr_names FROM pg_attribute WHERE attrelid = relation AND
attnum > 0 AND NOT attisdropped;
-- create name of temporary table using unqualified input table name
SELECT format('%s_copy_facade', relname)
INTO STRICT temp_table_name
FROM pg_class
WHERE oid = relation;

-- get list of all attributes in table, we'll need shortly
SELECT array_agg(attname)
INTO STRICT attr_names
FROM pg_attribute
WHERE attrelid = relation AND
attnum > 0 AND
NOT attisdropped;

-- build fully specified column list and USING clause from attr. names
SELECT string_agg(quote_ident(attr_name), ','),
string_agg(format('NEW.%I', attr_name), ',')
INTO STRICT attr_list, using_list FROM unnest(attr_names) AS attr_name;
SELECT string_agg('$' || param_num, ',') INTO param_list
FROM generate_series(1, array_length(attr_names, 1)) AS param_num;
INTO STRICT attr_list,
using_list
FROM unnest(attr_names) AS attr_name;

-- build ($1, $2, $3)-style VALUE list to bind parameters
SELECT string_agg('$' || param_num, ',')
INTO STRICT param_list
FROM generate_series(1, array_length(attr_names, 1)) AS param_num;

insert_command = format('INSERT INTO %s (%s) VALUES (%s)', relation, attr_list, param_list);
-- finally, use the above lists to generate appropriate INSERT command
insert_command = format('INSERT INTO %s (%s) VALUES (%s)', relation, attr_list,
param_list);

-- use the command to make one-off trigger targeting specified relation
EXECUTE format(func_tmpl, insert_command, using_list, sequence);

-- create a temporary table exactly like the target relation...
EXECUTE format(table_tmpl, temp_table_name, relation);

-- ... and install the trigger on that temporary table
EXECUTE format(trg_tmpl, temp_table_name::regclass);
END;
$pdtfc$ LANGUAGE plpgsql SET search_path = 'pg_catalog';
$prepare_distributed_table_for_copy$ LANGUAGE plpgsql SET search_path = 'pg_catalog';

COMMENT ON FUNCTION prepare_distributed_table_for_copy(relation regclass,
sequence regclass)
IS 'set up session to allow distributed table to ingest rows from COPY commands';
82 changes: 64 additions & 18 deletions scripts/copy_to_distributed_table.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,43 @@
#!/bin/sh

# default values for certain options
format='text'
options='OIDS false, FREEZE false'
schema='public'

while getopts ':BCc:d:e:Hh:n:q:T' o; do
# we'll append to this string to build options list
options='OIDS false, FREEZE false'

# outputs usage message on specified device before exiting with provided status
usage() {
cat << 'E_O_USAGE' >&"$2"
usage: copy_to_distributed_table [-BCTHh] [-c encoding] [-d delimiter]
[-e escape] [-n null] [-q quote] [-s schema] filename tablename
B : use binary format for input
C : use CSV format for input
T : use text format for input
H : specifies file contains header line to be ignored
h : print this help message
c : specifies file is encoded using `encoding`
Default: the current client encoding
d : specifies the character used to separate columns
Default: a tab character in text format, a comma in CSV format
e : specifies the character used to escape quotes
Default: the same as the `quote` value (quotes within data are doubled)
n : specifies the string that represents a null value
Default: \\N in text format, an unquoted empty string in CSV format
q : specifies the quoting character to be used when a data value is quoted
Default: double-quote
s : specifies the schema in which the target table resides
Default: 'public'
E_O_USAGE

exit $1;
}

# process flags
while getopts ':BCc:d:e:Hhn:q:T' o; do
case "${o}" in
B)
format='binary'
Expand All @@ -28,7 +61,7 @@ while getopts ':BCc:d:e:Hh:n:q:T' o; do
options="${options}, HEADER true"
;;
h)
usage
usage 0 1 # normal status, STDOUT
;;
n)
null=`echo ${OPTARG} | sed s/\'/\'\'/g`
Expand All @@ -44,52 +77,65 @@ while getopts ':BCc:d:e:Hh:n:q:T' o; do
T)
format='text'
;;
*)
usage
;;
esac
*)
echo "$0: illegal option -- ${OPTARG}" >&2
usage 64 2 # EX_USAGE, STDERR
;;
esac
done
shift $((OPTIND-1))

# append format to options and extract file/table names
options="${options}, FORMAT ${format}"
filename=$1
tablename=$2

# exit if filename or tablename are missing
if [ -z "${filename}" ] || [ -z "${tablename}" ]; then
usage
echo "$0: filename and tablename are required" >&2
usage 64 2 # EX_USAGE, STDERR
fi

# TODO: check whether filename exists

# escape single quotes in file/table name and build facade name
filename=`echo ${filename} | sed s/\'/\'\'/g`
facadename=`echo "${tablename}_copy_facade" | sed s/\"/\"\"/g`
tablename=`echo ${tablename} | sed s/\'/\'\'/g`

# invoke psql, ignoring .psqlrc and passing the following heredoc
psql -X << E_O_SQL
-- only print values, left-align them, and don't rollback or stop on error
\set QUIET on
\set ON_ERROR_ROLLBACK off
\pset format unaligned
\pset tuples_only on
\set ON_ERROR_STOP on
-- squelch all output until COPY completes
\o /dev/null
/*
* Use a session-bound counter to keep track of the number of rows inserted: we
* can't roll back so we need to tell the user how many rows were inserted. Due
* to the trigger implementation, the COPY will report zero rows, so we count
* them manually for a better experience.
*/
CREATE TEMPORARY SEQUENCE rows_inserted CACHE 100000;
-- Use a session-bound counter to keep track of the number of rows inserted: we
-- can't roll back so we need to tell the user how many rows were inserted. Due
-- to the trigger implementation, the COPY will report zero rows, so we count
-- them manually for a better experience.
CREATE TEMPORARY SEQUENCE rows_inserted MINVALUE 0 CACHE 100000;
-- initialize counter to zero
SELECT nextval('rows_inserted');
-- prepare the distributed table for copy
SELECT prepare_distributed_table_for_copy('${schema}.${tablename}', 'rows_inserted');
/* Don't stop if copy errors out: continue to print file name and row count*/
-- don't stop if copy errors out: continue to print file name and row count
\set ON_ERROR_STOP off
\copy pg_temp.${facadename} from ${filename} with ($options)
\copy pg_temp.${facadename} from stdin with ($options)
-- reconnect STDOUT to display row count
\o
SELECT currval('rows_inserted');
E_O_SQL

0 comments on commit df73ddf

Please sign in to comment.