Skip to content

Commit

Permalink
#65 replace DELETE by UPDATE for PostgresInsertOutput
Browse files Browse the repository at this point in the history
  • Loading branch information
justb4 committed Jan 22, 2018
1 parent 7dab85b commit 07dc49b
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions stetl/outputs/dboutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,17 @@ def write(self, packet):

class PostgresInsertOutput(PostgresDbOutput):
"""
Output by inserting single record into Postgres database.
Input is a record (Python dic structure) or a Python list of dicts (records).
Output by inserting a single record in a Postgres database table.
Input is a Stetl record (Python dict structure) or a list of records.
Creates an INSERT for Postgres to insert each single record.
When the "replace" parameter is True, any existing record keyed by "key" is
attempted to be deleted first.
attempted to be UPDATEd first.
NB a constraint is that each record needs to contain all values as
an INSERT query is built once for the columns in the first record.
NB a constraint is that the first and each subsequent each record needs to contain
all values as an INSERT and UPDATE query template is built once for the columns
in the first record.
consumes=FORMAT.record
consumes=[FORMAT.record_array, FORMAT.record]
"""

# Start attribute config meta
Expand Down Expand Up @@ -127,6 +128,7 @@ def key(self):
def __init__(self, configdict, section, consumes=FORMAT.record):
DbOutput.__init__(self, configdict, section, consumes=[FORMAT.record_array, FORMAT.record])
self.query = None
self.update_query = None
self.db = None

def init(self):
Expand All @@ -149,14 +151,31 @@ def create_query(self, record):
log.info('query is %s', query)
return query

def create_update_query(self, record):
# We assume that all records do the same UPDATE key/values
# https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/6527838#6527838
# e.g. UPDATE table SET field='C', field2='Z' WHERE id=3;
query = "UPDATE %s SET (%s) = (%s) WHERE %s = %s" % (
self.cfg.get('table'), ",".join(['%s ' % k for k in record]), ",".join(["%s", ] * len(record.keys())), self.key, "%s")
log.info('update query is %s', query)
return query

def insert(self, record):
res = 0
if self.replace and self.key and self.key in record:
# Try to delete (replace option)
del_query = "DELETE FROM %s WHERE %s = '%s'" % (self.cfg.get('table'), self.key, record[self.key])
self.db.execute(del_query)

# Do insert with values from the record dict
self.db.execute(self.query, record.values())
# Replace option: try UPDATE if existing
# https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/6527838#6527838
values = record.values()
values.append(record[self.key])
res = self.db.execute(self.update_query, values)
# del_query = "DELETE FROM %s WHERE %s = '%s'" % (self.cfg.get('table'), self.key, record[self.key])
# res = self.db.execute(del_query)

if res < 1:
# Do insert with values from the record dict
# only if we did not do an UPDATE (res==0) on existing record.
self.db.execute(self.query, record.values())
self.db.commit(close=False)

def write(self, packet):
Expand All @@ -174,10 +193,13 @@ def write(self, packet):
if type(record) is list and len(record) > 0:
first_record = record[0]

# Create query once
# Create INSERT and optional UPDATE query-templates once
if self.query is None:
self.query = self.create_query(first_record)

if self.replace and self.key and not self.update_query:
self.update_query = self.create_update_query(first_record)

# Check if record is single (dict) or array (list of dict)
if type(record) is dict:
# Do insert with values from the single record
Expand Down

0 comments on commit 07dc49b

Please sign in to comment.