Permalink
Browse files

Implement --load-from-stdin

  • Loading branch information...
1 parent 38c8a07 commit cbd167c0e841c16fad6fdfcb907741cedd88f1e0 @dimitri committed Apr 15, 2010
Showing with 130 additions and 15 deletions.
  1. +1 −0 debian/changelog
  2. +5 −0 examples/pgloader.conf
  3. +8 −0 examples/stdin/stdin.data
  4. +4 −0 examples/stdin/stdin.sql
  5. +18 −0 pgloader.1.txt
  6. +73 −10 pgloader.py
  7. +3 −0 pgloader/options.py
  8. +18 −5 pgloader/reader.py
View
@@ -8,6 +8,7 @@ pgloader (2.3.3~dev2-1) unstable; urgency=low
* Have --debug show a traceback
* Fix a bug where pgloader would freeze on early error (no such file)
* Implement an option to set csv field size limit
+ * Implement --load-from-stdin
-- Dimitri Fontaine <dim@tapoueh.org> Sun, 4 Apr 2010 19:34:39 +0200
@@ -120,6 +120,11 @@ columns = *
fixed_specs = a:0:10, b:10:8, c:18:8, d:26:17
reformat = c:pgtime:time
+[stdin]
+table = stdin
+format = csv
+columns = *
+
[csv]
table = csv
format = csv
@@ -0,0 +1,8 @@
+1|first entry
+2|second one
+3|another
+4|still running
+5|well, some more
+6|antepenultima
+7|next to last
+8|hey, it's the last!
@@ -0,0 +1,4 @@
+CREATE TABLE stdin (
+ a integer,
+ b text
+);
View
@@ -216,6 +216,24 @@ Example: -o standard_conforming_strings=on -o client_encoding=utf8
Force +pgloader+ to use given version of psycopg, either +1+ or
+2+.
+== INTERNAL USAGE OPTIONS ==
+
+Those have been developped for internal +pgloader+ usage only, but still
+need to be documented. Also, they are maintained and you could find an usage
+for them.
+
+--load-from-stdin::
+
+ Consider standard input as the data file. When using this function,
+ either give a section name from which to apply all the setup except for
+ the +filename+ to load from, or use +--load-to-table+.
+
+--load-to-table::
+
+ This option's argument must be the name of the PostgreSQL table you're
+ loading the data to, it's useful when you want to load from +stdin+ and
+ avoid editing a full configuration section.
+
== GLOBAL CONFIGURATION SECTION ==
The configuration file has a +.ini+ file syntax, its first section has
View
@@ -143,6 +143,19 @@ def parse_options():
default = None,
help = "Force usage of given version of psycopg")
+ parser.add_option("--load-from-stdin", action = "store_true",
+ dest = "stdin",
+ default = False,
+ help = "Load standard input data into given table name")
+
+ parser.add_option("--load-to-table", dest = "table",
+ default = None,
+ help = "Load to given table when --load-from-stdin")
+
+ parser.add_option("--boundaries", dest = "boundaries",
+ default = None,
+ help = "Load only in given boundaries, Start..End")
+
(opts, args) = parser.parse_args()
if opts.version:
@@ -214,6 +227,9 @@ def parse_options():
pgloader.options.SECTION_THREADS = opts.section_threads
pgloader.options.MAX_PARALLEL_SECTIONS = opts.parallel
+ pgloader.options.LOAD_FROM_STDIN = opts.stdin
+ pgloader.options.LOAD_TO_TABLE = opts.table
+
if pgloader.options.MAX_PARALLEL_SECTIONS is None:
from pgloader.options import DEFAULT_MAX_PARALLEL_SECTIONS
pgloader.options.MAX_PARALLEL_SECTIONS = DEFAULT_MAX_PARALLEL_SECTIONS
@@ -279,6 +295,15 @@ def parse_options():
else:
pgloader.options.PSYCOPG_VERSION = opts.psycopg_version
+ if opts.boundaries:
+ try:
+ start, end = [int(x) for x in opts.boundaries.split("..")]
+ pgloader.options.FILE_BOUNDARIES = (start, end)
+ except ValueError, e:
+ print >>sys.stderr, \
+ "Error: boundaries should be an integer range written X..Y"
+ sys.exit(1)
+
return opts.config, args
def parse_config(conffile):
@@ -548,14 +573,43 @@ def load_data():
from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY
from pgloader.options import DRY_RUN, PEDANTIC, VACUUM
from pgloader.options import MAX_PARALLEL_SECTIONS
+ from pgloader.options import LOAD_FROM_STDIN, LOAD_TO_TABLE
+ from pgloader.options import FILE_BOUNDARIES
from pgloader.pgloader import PGLoader
from pgloader.tools import PGLoader_Error
sections = []
summary = {}
- # args are meant to be configuration sections, or filenames
- if len(args) > 0:
+ # args are meant to be configuration sections, or filenames, or stdin
+ if LOAD_FROM_STDIN:
+ if FILE_BOUNDARIES is not None:
+ log.warning("Can't use --boundaries on stdin")
+
+ if len(args) == 0:
+ s = '<stdin>'
+ config.add_section(s)
+ config.set(s, 'table', LOAD_TO_TABLE)
+ config.set(s, 'filename', 'sys.stdin')
+ config.set(s, 'columns', '*')
+ config.set(s, 'format', 'csv')
+ sections.append(s)
+
+ elif len(args) == 1:
+ if config.has_section(args[0]):
+ # apply given section parameters, then load from stdin
+ config.set(args[0], 'filename', 'sys.stdin')
+ sections.append(args[0])
+ else:
+ print >>sys.stderr, \
+ "Error: Please provide a [%s] section" % args[0]
+ sys.exit(5)
+ else:
+ print >>sys.stderr, \
+ "Error: can't read several sections all from stdin"
+ sys.exit(5)
+
+ elif len(args) > 0:
for s in args:
if config.has_section(s):
sections.append(s)
@@ -577,14 +631,21 @@ def load_data():
sections.append(s)
else:
- log.debug("No argument on CLI, will consider all sections")
- for s in config.sections():
- if s != 'pgsql':
- sections.append(s)
-
- # we run through sorted section list, unless we got the section list
- # from command line
- sections.sort()
+ if not LOAD_FROM_STDIN:
+ # don't load all sections first when asked to load stdin
+ log.debug("No argument on CLI, will consider all sections")
+ for s in config.sections():
+ if s != 'pgsql':
+ sections.append(s)
+
+ # we run through sorted section list, unless we got the section list
+ # from command line
+ sections.sort()
+
+ if FILE_BOUNDARIES is not None and len(sections) > 1:
+ print >>sys.stderr, \
+ "Error: will not apply boundaries on more than one file"
+ sys.exit(5)
log.info('Will consider following sections:')
for line in myprint(sections):
@@ -634,6 +695,8 @@ def load_data():
if loader:
if not loader.template:
+ if FILE_BOUNDARIES is not None and len(sections) == 1:
+ loader.reader.set_boundaries(FILE_BOUNDARIES)
filename = loader.filename
input_encoding = loader.input_encoding
threads[s] = loader
View
@@ -53,3 +53,6 @@
REJECT_LOG_FILE = '%s.rej.log'
REJECT_DATA_FILE = '%s.rej'
+
+LOAD_FROM_STDIN = None
+LOAD_TO_TABLE = None
View
@@ -2,6 +2,7 @@
#
# pgloader data reader interface and defaults
+import sys
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
@@ -192,9 +193,13 @@ def __init__(self, filename, log,
if self.encoding is not None:
try:
import codecs
- self.fd = codecs.open(self.filename,
- encoding = self.encoding,
- buffering = self.bufsize)
+ if self.filename == 'sys.stdin':
+ f = sys.stdin
+ else:
+ f = open(self.filename, self.mode, self.bufsize)
+
+ self.log.warning('PHOQUE "%s"', f)
+ self.fd = codecs.getreader(self.encoding)(f)
self.log.info("Opened '%s' with encoding '%s'" \
% (self.filename, self.encoding))
except LookupError, e:
@@ -206,7 +211,10 @@ def __init__(self, filename, log,
else:
try:
- self.fd = open(self.filename, mode, self.bufsize)
+ if self.filename == 'sys.stdin':
+ self.fd = sys.stdin
+ else:
+ self.fd = open(self.filename, mode, self.bufsize)
except IOError, error:
raise PGLoader_Error, error
@@ -240,7 +248,12 @@ def __iter__(self):
while line != '':
line = self.fd.readline()
self.line_nb += 1
- self.position = self.fd.tell()
+ try:
+ self.position = self.fd.tell()
+ except IOError, error:
+ #IOError: [Errno 29] Illegal seek --- when stdin reaches EOF
+ self.log.info(error)
+ return
##
# if -F is used, count lines to skip, and skip them

0 comments on commit cbd167c

Please sign in to comment.