Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for partitioned tables with inherited constraints #11

Open
jeenut27 opened this issue Mar 15, 2021 · 1 comment
Open

Support for partitioned tables with inherited constraints #11

jeenut27 opened this issue Mar 15, 2021 · 1 comment

Comments

@jeenut27
Copy link

jeenut27 commented Mar 15, 2021

In a particular use case, I would like to use this library to COPY data into a child partition of a table. Primary key is defined on the parent table and it is inherited by the child tables.
However, PostgreSQL doesn’t allow dropping inherited primary keys from the child partition. As a result, the code gives an error when primary key is added to the child table after copy, since the child table has inherited constrains which didn't drop in the drop step. Here is the full error traceback :-

InvalidTableDefinition                    Traceback (most recent call last)
~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1245                     self.dialect.do_execute(
-> 1246                         cursor, statement, parameters, context
   1247                     )

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    587     def do_execute(self, cursor, statement, parameters, context=None):
--> 588         cursor.execute(statement, parameters)
    589 

InvalidTableDefinition: multiple primary keys for table "forms_fz" are not allowed


The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
<ipython-input-185-30a0987f282e> in <module>
      3 
      4 with db.engine.connect() as c:
----> 5   DataFrameCopy(forms_df, conn=c, table_obj=table_model).copy()
      6 

~/Documents/MyDev/pandas-to-postgres-0.0.4/pandas_to_postgres/copy_df.py in copy(self, functions)
     51             self.logger.info("All chunks copied ({} rows)".format(self.rows))
     52 
---> 53         self.create_pk()
     54         self.create_fks()
     55         self.analyze()

~/Documents/MyDev/pandas-to-postgres-0.0.4/pandas_to_postgres/_base_copy.py in create_pk(self)
     78         """Create primary key constraints on PostgreSQL table"""
     79         self.logger.info("Creating {} primary key".format(self.sql_table))
---> 80         self.conn.execute(AddConstraint(self.primary_key))
     81 
     82     def drop_fks(self):

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in execute(self, object_, *multiparams, **params)
    980             raise exc.ObjectNotExecutableError(object_)
    981         else:
--> 982             return meth(self, multiparams, params)
    983 
    984     def _execute_function(self, func, multiparams, params):

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/sql/ddl.py in _execute_on_connection(self, connection, multiparams, params)
     70 
     71     def _execute_on_connection(self, connection, multiparams, params):
---> 72         return connection._execute_ddl(self, multiparams, params)
     73 
     74     def execute(self, bind=None, target=None):

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_ddl(self, ddl, multiparams, params)
   1042             compiled,
   1043             None,
-> 1044             compiled,
   1045         )
   1046         if self._has_events or self.engine._has_events:

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1248         except BaseException as e:
   1249             self._handle_dbapi_exception(
-> 1250                 e, statement, parameters, cursor, context
   1251             )
   1252 

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1474                 util.raise_from_cause(newraise, exc_info)
   1475             elif should_wrap:
-> 1476                 util.raise_from_cause(sqlalchemy_exception, exc_info)
   1477             else:
   1478                 util.reraise(*exc_info)

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py in raise_from_cause(exception, exc_info)
    396     exc_type, exc_value, exc_tb = exc_info
    397     cause = exc_value if exc_value is not exception else None
--> 398     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    399 
    400 

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    150             value.__cause__ = cause
    151         if value.__traceback__ is not tb:
--> 152             raise value.with_traceback(tb)
    153         raise value
    154 

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1244                 if not evt_handled:
   1245                     self.dialect.do_execute(
-> 1246                         cursor, statement, parameters, context
   1247                     )
   1248         except BaseException as e:

~/opt/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    586 
    587     def do_execute(self, cursor, statement, parameters, context=None):
--> 588         cursor.execute(statement, parameters)
    589 
    590     def do_execute_no_params(self, cursor, statement, context=None):

ProgrammingError: (psycopg2.errors.InvalidTableDefinition) multiple primary keys for table "forms_fz" are not allowed

[SQL: ALTER TABLE forms_fz ADD CONSTRAINT forms_fz_pkey PRIMARY KEY (foundation, instance_id)]
(Background on this error at: http://sqlalche.me/e/f405)```

@edodge
Copy link

edodge commented Mar 31, 2021

At IDinsight we're using partitioned tables to speed up incremental loads. When data in a partition needs to be updated we drop the whole partition, then use COPY to load the updated data into the partition. To make this easier, files in our data lake are partitioned on the same criteria as the table they're loaded into.

From what I've seen this is a common pattern in many data engineering pipelines and it would be valuable to add support for this approach on partitioned tables in pandas_to_postgres. (And hi @makmanalp 👋 )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants