HiveTap is not concurrency safe. #14

Closed
stephanh opened this issue Nov 13, 2014 · 7 comments

@stephanh
Contributor

As part of our testing we write to two HivePartitionTaps in parallel locally. This fails with the stack trace below because both taps try to create the same database at the same time.

Our current fix is to catch that particular exception around the createDatabase call. We also get task failures (https://github.com/CommBank/ebenezer/issues/34) caused by a similar issue. I don't really have a good fix for this problem.

We are probably going to wrap the createDatabase call in a try/catch along the following lines so our tests can pass (the surrounding try/catch is a sketch; client and database stand in for the real variables inside HiveTap):

    try
        {
        client.createDatabase( database ); // hypothetical call site in HiveTap.createHiveTable()
        }
    catch( MetaException ex )
        {
        // swallow only Derby's duplicate-key error caused by a concurrent createDatabase
        if( !( ex.getCause() != null &&
            ex.getCause().getCause() != null &&
            ex.getCause().getCause().getMessage().startsWith( "The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE'" ) ) )
            throw ex;
        }

The stack trace:

java.lang.Exception: cascading.CascadingException: java.io.IOException: MetaException(message:javax.jdo.JDODataStoreException: Exception thrown flushing changes to datastore
NestedThrowables:
java.sql.BatchUpdateException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: cascading.CascadingException: java.io.IOException: MetaException(message:javax.jdo.JDODataStoreException: Exception thrown flushing changes to datastore
NestedThrowables:
java.sql.BatchUpdateException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.)
    at cascading.tap.hive.HivePartitionTap$HivePartitionCollector.closeCollector(HivePartitionTap.java:88)
    at cascading.tap.partition.BasePartitionTap$PartitionCollector.close(BasePartitionTap.java:186)
    at cascading.flow.stream.SinkStage.cleanup(SinkStage.java:120)
    at cascading.flow.stream.StreamGraph.cleanup(StreamGraph.java:176)
    at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:155)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
Caused by: java.io.IOException: MetaException(message:javax.jdo.JDODataStoreException: Exception thrown flushing changes to datastore
NestedThrowables:
java.sql.BatchUpdateException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.)
    at cascading.tap.hive.HiveTap.createHiveTable(HiveTap.java:169)
    at cascading.tap.hive.HiveTap.registerPartition(HiveTap.java:322)
    at cascading.tap.hive.HivePartitionTap$HivePartitionCollector.closeCollector(HivePartitionTap.java:84)
    ... 13 more
Caused by: MetaException(message:javax.jdo.JDODataStoreException: Exception thrown flushing changes to datastore
NestedThrowables:
java.sql.BatchUpdateException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:602)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
    at com.sun.proxy.$Proxy8.create_database(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:414)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
    at com.sun.proxy.$Proxy9.createDatabase(Unknown Source)
    at cascading.tap.hive.HiveTap.createHiveTable(HiveTap.java:159)
    ... 15 more
Caused by: javax.jdo.JDODataStoreException: Exception thrown flushing changes to datastore
NestedThrowables:
java.sql.BatchUpdateException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.
    at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451)
    at org.datanucleus.api.jdo.JDOTransaction.commit(JDOTransaction.java:165)
    at org.apache.hadoop.hive.metastore.ObjectStore.commitTransaction(ObjectStore.java:345)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hive.metastore.RetryingRawStore.invoke(RetryingRawStore.java:111)
    at com.sun.proxy.$Proxy7.commitTransaction(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database_core(HiveMetaStore.java:563)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:591)
    ... 29 more
Caused by: java.sql.BatchUpdateException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.
    at org.apache.derby.impl.jdbc.EmbedStatement.executeBatch(Unknown Source)
    at org.apache.commons.dbcp.DelegatingStatement.executeBatch(DelegatingStatement.java:297)
    at org.apache.commons.dbcp.DelegatingStatement.executeBatch(DelegatingStatement.java:297)
    at org.datanucleus.store.rdbms.ParamLoggingPreparedStatement.executeBatch(ParamLoggingPreparedStatement.java:372)
    at org.datanucleus.store.rdbms.SQLController.processConnectionStatement(SQLController.java:628)
    at org.datanucleus.store.rdbms.SQLController.processStatementsForConnection(SQLController.java:596)
    at org.datanucleus.store.rdbms.SQLController$1.transactionFlushed(SQLController.java:683)
    at org.datanucleus.store.connection.AbstractManagedConnection.transactionFlushed(AbstractManagedConnection.java:86)
    at org.datanucleus.store.connection.ConnectionManagerImpl$2.transactionFlushed(ConnectionManagerImpl.java:454)
    at org.datanucleus.TransactionImpl.flush(TransactionImpl.java:199)
    at org.datanucleus.TransactionImpl.commit(TransactionImpl.java:263)
    at org.datanucleus.api.jdo.JDOTransaction.commit(JDOTransaction.java:98)
    ... 38 more
Caused by: java.sql.SQLIntegrityConstraintViolationException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.
    at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
    at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown Source)
    ... 50 more
Caused by: java.sql.SQLException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
    ... 58 more
Caused by: ERROR 23505: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUE_DATABASE' defined on 'DBS'.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.sql.execute.IndexChanger.insertAndCheckDups(Unknown Source)
    at org.apache.derby.impl.sql.execute.IndexChanger.doInsert(Unknown Source)
    at org.apache.derby.impl.sql.execute.IndexChanger.insert(Unknown Source)
    at org.apache.derby.impl.sql.execute.IndexSetChanger.insert(Unknown Source)
    at org.apache.derby.impl.sql.execute.RowChangerImpl.insertRow(Unknown Source)
    at org.apache.derby.impl.sql.execute.InsertResultSet.normalInsertCore(Unknown Source)
    at org.apache.derby.impl.sql.execute.InsertResultSet.open(Unknown Source)
    at org.apache.derby.impl.sql.GenericPreparedStatement.executeStmt(Unknown Source)
    at org.apache.derby.impl.sql.GenericPreparedStatement.execute(Unknown Source)
    ... 52 more
stephanh changed the title from "HiveTap is not thread safe." to "HiveTap is not concurrency safe." on Nov 13, 2014
@fs111
Contributor

fs111 commented Nov 14, 2014

Checking the SQL error message will not work, since the MetaStore can use a different DB backend such as MySQL: https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin#AdminManualMetastoreAdmin-RemoteMetastoreDatabase

If your tables do not exist, you can call createResource() on the HivePartitionTap before running the flow. That won't work if SinkMode == REPLACE, though, since the table gets deleted again before the flow starts. A structural way to solve it would be to inject a FlowListener that reacts to onStarting and makes sure the DB is registered in the MetaStore before anything else happens. I have to experiment with that a bit.
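
As a minimal sketch of that idea (EnsureDatabaseListener and the database name are illustrative, not part of cascading-hive, and it assumes a HiveConf is at hand):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
    import org.apache.hadoop.hive.metastore.api.Database;
    import cascading.flow.Flow;
    import cascading.flow.FlowListener;

    public class EnsureDatabaseListener implements FlowListener
      {
      private final HiveConf conf;
      private final String databaseName;

      public EnsureDatabaseListener( HiveConf conf, String databaseName )
        {
        this.conf = conf;
        this.databaseName = databaseName;
        }

      @Override
      public void onStarting( Flow flow )
        {
        try
          {
          HiveMetaStoreClient client = new HiveMetaStoreClient( conf );
          try
            {
            // register the database before any mapper touches the MetaStore;
            // null location/params should fall back to the MetaStore defaults
            client.createDatabase( new Database( databaseName, null, null, null ) );
            }
          catch( AlreadyExistsException ignored )
            {
            // already registered, which is exactly what we want
            }
          finally
            {
            client.close();
            }
          }
        catch( Exception exception )
          {
          throw new RuntimeException( "could not pre-create database " + databaseName, exception );
          }
        }

      @Override
      public void onStopping( Flow flow ) { }

      @Override
      public void onCompleted( Flow flow ) { }

      @Override
      public boolean onThrowable( Flow flow, Throwable throwable ) { return false; }
      }

Registering it before the flow runs would then be a one-liner: flow.addListener( new EnsureDatabaseListener( hiveConf, "mydb" ) );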

@stephanh
Contributor Author

Agreed, this "fix" leaves a lot to be desired. In our particular case the issue comes up in unit tests, where we run jobs locally against Hive's in-memory Derby DB. Hive has the ability to retry metastore commands/connections; however, at least when running locally with our version of Hive (0.10-cdh4.6), that trips another bug, a mismatch between started and closed transactions, which causes a different failure.

This issue will also occur when running on the cluster, but in that case I think the combination of Hive retrying connections and MR restarting tasks causes the overall job to succeed. On the cluster we mostly get failed tasks around https://github.com/CommBank/ebenezer/issues/34, which I think is caused by trying to register the same partition at the same time.

Do you think you will be able to address both issues, creating the same database only once across multiple taps and creating each partition only once across multiple MR tasks, without changes to cascading?

@fs111
Contributor

fs111 commented Nov 19, 2014

What we could do is simply catch the exception and retry. That is somewhat shaky, but if we want to support Hive all the way back to 0.10, we will have to live with some sub-optimal solutions. I am going to implement that and then you can give it a few spins in your environment.
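
Roughly along these lines (names are illustrative, and an "already exists" answer counts as success):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
    import org.apache.hadoop.hive.metastore.api.Database;
    import org.apache.thrift.TException;

    final class MetaStoreRetry
      {
      static void createDatabaseWithRetry( IMetaStoreClient client, Database database, int maxAttempts )
        throws TException
        {
        for( int attempt = 1; ; attempt++ )
          {
          try
            {
            client.createDatabase( database );
            return;
            }
          catch( AlreadyExistsException exception )
            {
            return; // another task won the race, nothing left to do
            }
          catch( TException exception )
            {
            if( attempt >= maxAttempts )
              throw exception;
            try
              {
              Thread.sleep( 1000L * attempt ); // simple linear backoff between attempts
              }
            catch( InterruptedException interrupted )
              {
              Thread.currentThread().interrupt();
              throw exception;
              }
            }
          }
        }
      }

HiveTap could funnel its createDatabase (and partition registration) calls through something like this.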

@fs111
Contributor

fs111 commented Nov 19, 2014

I did some more research, and since 0.13 the Hive metastore has had a locking API, which would solve all of these problems: https://issues.apache.org/jira/browse/HIVE-5843
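
A very rough sketch of how that could look (the builder classes ship with the hive-metastore module since 0.13; usage here is illustrative, not tested):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.LockComponentBuilder;
    import org.apache.hadoop.hive.metastore.LockRequestBuilder;
    import org.apache.hadoop.hive.metastore.api.LockRequest;
    import org.apache.hadoop.hive.metastore.api.LockResponse;
    import org.apache.hadoop.hive.metastore.api.LockState;
    import org.apache.thrift.TException;

    final class MetaStoreLock
      {
      static void withDatabaseLock( IMetaStoreClient client, String dbName, Runnable work )
        throws TException
        {
        // take an exclusive lock on the database before touching it
        LockRequest request = new LockRequestBuilder()
          .addLockComponent( new LockComponentBuilder()
            .setDbName( dbName )
            .setExclusive()
            .build() )
          .setUser( System.getProperty( "user.name" ) )
          .build();

        LockResponse response = client.lock( request );

        // a WAITING response would need polling via client.checkLock( response.getLockid() )
        if( response.getState() != LockState.ACQUIRED )
          throw new TException( "could not acquire lock on " + dbName + ": " + response.getState() );

        try
          {
          work.run(); // createDatabase()/partition registration are race-free under the lock
          }
        finally
          {
          client.unlock( response.getLockid() );
          }
        }
      }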

Since you guys are still on 0.10 and are big users of cascading-hive, I don't want to drop support for it just yet, but we should consider moving away from such old versions to get the new lock support and make this less brittle.

@stephanh
Contributor Author

Agreed. We are hoping to move to CDH 5.2 late this year or early next year, which ships with Hive 0.13.

@fs111
Contributor

fs111 commented Mar 9, 2015

I'd like to open this discussion again. Now that Hive 1.0.0 is out, we should start thinking about dropping support for the ancient Hive versions w/o lock support. Would that work for you guys?

@fs111
Contributor

fs111 commented Nov 30, 2015

Stale issue; closing. If this is still a problem, please re-open.

fs111 closed this as completed Nov 30, 2015