Permalink
Browse files

Merge branch 'trunk' of github.com:impetus-opensource/Kundera into trunk

  • Loading branch information...
2 parents 4876541 + 7cd1eed commit b923cf5f0aee7c2b61ae93c0ee855e1e4e5658f1 @kkmishra kkmishra committed Jan 11, 2013
Showing with 1,813 additions and 257 deletions.
  1. +13 −0 kundera-cassandra/src/main/java/com/impetus/client/cassandra/CassandraClientBase.java
  2. +7 −3 kundera-cassandra/src/test/java/com/impetus/client/crud/EntityTransactionTest.java
  3. +2 −0 kundera-core/src/main/java/com/impetus/kundera/PersistenceProperties.java
  4. +2 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/AbstractEntityReader.java
  5. +110 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/Coordinator.java
  6. +151 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/DefaultTransactionResource.java
  7. +17 −57 kundera-core/src/main/java/com/impetus/kundera/persistence/EntityManagerImpl.java
  8. +147 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/KunderaEntityTransaction.java
  9. +64 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/KunderaTransactionException.java
  10. +130 −39 kundera-core/src/main/java/com/impetus/kundera/persistence/PersistenceDelegator.java
  11. +37 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/TransactionBinder.java
  12. +70 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/TransactionResource.java
  13. +5 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/api/Batcher.java
  14. +23 −16 kundera-core/src/main/java/com/impetus/kundera/persistence/context/FlushManager.java
  15. +5 −0 kundera-core/src/main/java/com/impetus/kundera/persistence/jta/KunderaJTAUserTransaction.java
  16. +14 −0 kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClient.java
  17. +14 −0 kundera-mongo/src/main/java/com/impetus/client/mongodb/MongoDBClient.java
  18. +49 −21 kundera-neo4j/src/main/java/com/impetus/client/neo4j/GraphEntityMapper.java
  19. +25 −0 kundera-neo4j/src/main/java/com/impetus/client/neo4j/config/IndexMode.java
  20. +48 −0 kundera-neo4j/src/main/java/com/impetus/client/neo4j/index/AutoIndexing.java
  21. +417 −93 kundera-redis/src/main/java/com/impetus/client/redis/RedisClient.java
  22. +18 −24 kundera-redis/src/main/java/com/impetus/client/redis/RedisClientFactory.java
  23. +128 −0 kundera-redis/src/main/java/com/impetus/client/redis/RedisTransaction.java
  24. +312 −0 kundera-redis/src/test/java/com/impetus/client/RedisTransactionTest.java
  25. +1 −0 kundera-redis/src/test/resources/META-INF/persistence.xml
  26. +4 −4 pom.xml
View
13 kundera-cassandra/src/main/java/com/impetus/client/cassandra/CassandraClientBase.java
@@ -1211,6 +1211,19 @@ public int getBatchSize()
return batchSize;
}
+ /* (non-Javadoc)
+ * @see com.impetus.kundera.persistence.api.Batcher#clear()
+ */
+ public void clear()
+ {
+ if(nodes != null)
+ {
+ nodes.clear();
+ nodes=null;
+ nodes = new ArrayList<Node>();
+ }
+ }
+
/*
* (non-Javadoc)
*
View
10 kundera-cassandra/src/test/java/com/impetus/client/crud/EntityTransactionTest.java
@@ -96,9 +96,8 @@ public void setUp() throws Exception
public void onRollback() throws IOException, TException, InvalidRequestException, UnavailableException,
TimedOutException, SchemaDisagreementException
{
-// em.setFlushMode(FlushModeType.COMMIT);
- em.getTransaction().begin();
+ em.getTransaction().begin();
Object p1 = prepareData("1", 10);
Object p2 = prepareData("2", 20);
Object p3 = prepareData("3", 15);
@@ -108,6 +107,9 @@ public void onRollback() throws IOException, TException, InvalidRequestException
// roll back.
em.getTransaction().rollback();
+
+ em.getTransaction().begin();
+
PersonCassandra p = findById(PersonCassandra.class, "1", em);
Assert.assertNull(p);
@@ -139,7 +141,6 @@ public void onRollback() throws IOException, TException, InvalidRequestException
public void onCommit() throws IOException, TException, InvalidRequestException, UnavailableException,
TimedOutException, SchemaDisagreementException
{
-// em.setFlushMode(FlushModeType.COMMIT);
em.getTransaction().begin();
@@ -156,11 +157,14 @@ public void onCommit() throws IOException, TException, InvalidRequestException,
PersonCassandra p = findById(PersonCassandra.class, "1", em);
Assert.assertNotNull(p);
+ em.getTransaction().begin();
+
((PersonCassandra) p2).setPersonName("rollback");
em.merge(p2);
// roll back, should roll back person name for p2!
em.getTransaction().rollback();
+
p = findById(PersonCassandra.class, "1", em);
Assert.assertNotNull(p);
View
2 kundera-core/src/main/java/com/impetus/kundera/PersistenceProperties.java
@@ -87,4 +87,6 @@
/** Location where datastore file is stored */
public static final String KUNDERA_DATASTORE_FILE_PATH = "kundera.datastore.file.path";
+
+ public static final String KUNDERA_TRANSACTION_RESOURCE="kundera.transaction.resource.class";
}
View
2 kundera-core/src/main/java/com/impetus/kundera/persistence/AbstractEntityReader.java
@@ -157,6 +157,8 @@ public Object recursivelyFindEntities(Object entity, Map<String, Object> relatio
PersistenceCacheManager.addEntityToPersistenceCache(entity, pd, entityId);
//For M-M relationship of Collection type, relationship entities are always fetched from Join Table.
+
+
if(relation.getPropertyType().isAssignableFrom(Collection.class) || relation.getPropertyType().isAssignableFrom(Set.class))
{
associationBuilder.populateRelationFromJoinTable(entity, m, pd, relation);
View
110 kundera-core/src/main/java/com/impetus/kundera/persistence/Coordinator.java
@@ -0,0 +1,110 @@
+/*******************************************************************************
+ * * Copyright 2012 Impetus Infotech.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ ******************************************************************************/
+
+package com.impetus.kundera.persistence;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.impetus.kundera.persistence.KunderaEntityTransaction.TxAction;
+import com.impetus.kundera.persistence.TransactionResource.Response;
+
+/**
+ * @author vivek
+ *
+ */
+class Coordinator
+{
+
+// private List<TransactionResource> txResources = new ArrayList<TransactionResource>();
+
+ private Map<String,TransactionResource> txResources = new HashMap<String,TransactionResource>();
+
+
+ public Coordinator()
+ {
+
+ }
+
+ void addResource(TransactionResource resource, final String pu)
+ {
+ txResources.put(pu, resource);
+ }
+
+ TransactionResource getResource (final String pu)
+ {
+ return txResources.get(pu);
+ }
+
+ Response coordinate(TxAction action)
+ {
+ Response response = Response.YES;
+ switch (action)
+ {
+ case BEGIN:
+ for (TransactionResource res : txResources.values())
+ {
+ res.onBegin();
+ }
+ break;
+
+ case PREPARE:
+
+ // TODO:: need to handle case of two phase commit, in case of
+ // polyglot persistence.
+
+ for (TransactionResource res : txResources.values())
+ {
+ res.prepare();
+ }
+ break;
+
+ case COMMIT:
+
+ for (TransactionResource res : txResources.values())
+ {
+ res.onCommit();
+ }
+ break;
+
+ case ROLLBACK:
+ for (TransactionResource res : txResources.values())
+ {
+ res.onRollback();
+ }
+
+ break;
+
+ default:
+ throw new IllegalArgumentException("Invalid transaction action : " + action);
+ }
+
+ return response;
+ }
+
+ boolean isTransactionActive()
+ {
+ for (TransactionResource res : txResources.values())
+ {
+ if(res.isActive())
+ {
+ return true;
+ }
+
+ }
+ return false;
+ }
+}
View
151 kundera-core/src/main/java/com/impetus/kundera/persistence/DefaultTransactionResource.java
@@ -0,0 +1,151 @@
+/*******************************************************************************
+ * * Copyright 2012 Impetus Infotech.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ ******************************************************************************/
+
+package com.impetus.kundera.persistence;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.impetus.kundera.client.Client;
+import com.impetus.kundera.graph.Node;
+import com.impetus.kundera.persistence.api.Batcher;
+
+/**
+ * Default transaction implementation for databases who does not support
+ * transactions. This can only ensure ATOMICITY out of ACID properties.
+ *
+ * @author vivek.mishra
+ */
+public class DefaultTransactionResource implements TransactionResource
+{
+
+ private boolean isActive;
+
+ private Client client;
+
+ /** The Constant log. */
+ private static final Log log = LogFactory.getLog(DefaultTransactionResource.class);
+
+
+ private List<Node> nodes = new ArrayList<Node>();
+
+
+ public DefaultTransactionResource(Client client)
+ {
+ this.client = client;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.impetus.kundera.persistence.TransactionResource#onBegin()
+ */
+ @Override
+ public void onBegin()
+ {
+ isActive = true;
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.impetus.kundera.persistence.TransactionResource#onCommit()
+ */
+ @Override
+ public void onCommit()
+ {
+ onFlush();
+ isActive = false;
+ }
+
+ /* (non-Javadoc)
+ * @see com.impetus.kundera.persistence.TransactionResource#onFlush()
+ */
+ public void onFlush()
+ {
+ for(Node node : nodes)
+ {
+ node.flush();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.impetus.kundera.persistence.TransactionResource#onRollback()
+ */
+ @Override
+ public void onRollback()
+ {
+ onBatchRollBack();
+
+ nodes.clear();
+ nodes=null;
+ nodes = new ArrayList<Node>();
+ isActive = false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.impetus.kundera.persistence.TransactionResource#prepare()
+ */
+ @Override
+ public Response prepare()
+ {
+ return Response.YES;
+ }
+
+ /**
+ *
+ * @param node
+ * @param events
+ */
+ void syncNode(Node node)
+ {
+ nodes.add(node);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.impetus.kundera.persistence.TransactionResource#isActive()
+ */
+ @Override
+ public boolean isActive()
+ {
+ return isActive;
+ }
+
+
+ /**
+ * In case of rollback, clear added batch, if any.
+ */
+ private void onBatchRollBack()
+ {
+ if(client instanceof Batcher)
+ {
+ ((Batcher) client).clear();
+ }
+
+ }
+
+}
View
74 kundera-core/src/main/java/com/impetus/kundera/persistence/EntityManagerImpl.java
@@ -47,7 +47,6 @@
import com.impetus.kundera.cache.Cache;
import com.impetus.kundera.metadata.model.ApplicationMetadata;
import com.impetus.kundera.metadata.model.KunderaMetadata;
-import com.impetus.kundera.persistence.context.FlushManager;
import com.impetus.kundera.persistence.context.PersistenceCache;
import com.impetus.kundera.persistence.jta.KunderaJTAUserTransaction;
import com.impetus.kundera.query.KunderaTypedQuery;
@@ -58,7 +57,7 @@
*
* @author animesh.kumar
*/
-public class EntityManagerImpl implements EntityManager, EntityTransaction, ResourceManager
+public class EntityManagerImpl implements EntityManager, ResourceManager
{
/** The Constant log. */
@@ -90,10 +89,10 @@
private PersistenceCache persistenceCache;
- FlushManager flushStackManager;
-
- UserTransaction utx;
-
+ private UserTransaction utx;
+
+ private EntityTransaction entityTransaction;
+
/**
* Instantiates a new entity manager impl.
*
@@ -117,7 +116,8 @@ public EntityManagerImpl(EntityManagerFactory factory, PersistenceUnitTransactio
}
this.persistenceContextType = persistenceContextType;
this.transactionType = transactionType;
-
+ this.entityTransaction = new KunderaEntityTransaction(this);
+
logger.debug("Created EntityManager for persistence unit : " + getPersistenceUnit());
}
@@ -144,10 +144,16 @@ private void onLookUp(PersistenceUnitTransactionType transactionType)
throw new KunderaException("Please bind [" + KunderaJTAUserTransaction.class.getName()
+ "] for :{java:comp/UserTransaction} lookup" + utx.getClass());
+ }
+
+ if(((KunderaJTAUserTransaction)utx).isTransactionInProgress())
+ {
+ entityTransaction.begin();
}
this.setFlushMode(FlushModeType.COMMIT);
((KunderaJTAUserTransaction) utx).setImplementor(this);
+
}
catch (NamingException e)
{
@@ -547,7 +553,7 @@ public final EntityTransaction getTransaction()
{
throw new IllegalStateException("A JTA EntityManager cannot use getTransaction()");
}
- return this;
+ return this.entityTransaction;
}
/*
@@ -901,7 +907,7 @@ private EntityManagerSession getSession()
*
* @return the persistence delegator
*/
- private PersistenceDelegator getPersistenceDelegator()
+ PersistenceDelegator getPersistenceDelegator()
{
return persistenceDelegator;
}
@@ -922,10 +928,8 @@ public PersistenceContextType getPersistenceContextType()
@Override
public void doCommit()
{
-
checkClosed();
- persistenceDelegator.commit();
-
+ this.entityTransaction.commit();
}
/*
@@ -937,51 +941,7 @@ public void doCommit()
public void doRollback()
{
checkClosed();
- persistenceDelegator.rollback();
- }
-
- // ///////////////////////////////////////////////////////////////////////
- /** Methods from {@link EntityTransaction} interface */
- // ///////////////////////////////////////////////////////////////////////
-
- @Override
- public void begin()
- {
- persistenceDelegator.begin();
- }
-
- @Override
- public void commit()
- {
- doCommit();
- }
-
- @Override
- public boolean getRollbackOnly()
- {
- if (!isActive())
- {
- throw new IllegalStateException("No active transaction found");
- }
- return persistenceDelegator.getRollbackOnly();
- }
-
- @Override
- public void setRollbackOnly()
- {
- persistenceDelegator.setRollbackOnly();
- }
-
- @Override
- public boolean isActive()
- {
- return isOpen() && persistenceDelegator.isActive();
- }
-
- @Override
- public void rollback()
- {
- doRollback();
+ this.entityTransaction.rollback();
}
/**
View
147 kundera-core/src/main/java/com/impetus/kundera/persistence/KunderaEntityTransaction.java
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ * * Copyright 2012 Impetus Infotech.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ ******************************************************************************/
+
+package com.impetus.kundera.persistence;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityTransaction;
+
+/**
+ * Class implements <code>EntityTransaction </code> interface. It implements
+ * begin/commit/roll back and other methods.
+ *
+ * @author vivek.mishra
+ *
+ */
+public class KunderaEntityTransaction implements EntityTransaction
+{
+ private EntityManager entityManager;
+
+ private Coordinator coordinator;
+
+ private Boolean rollbackOnly;
+
+ enum TxAction
+ {
+ BEGIN, COMMIT, ROLLBACK, PREPARE;
+ }
+
+ public KunderaEntityTransaction(EntityManager entityManager)
+ {
+ this.entityManager = entityManager;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.persistence.EntityTransaction#begin()
+ */
+ @Override
+ public void begin()
+ {
+
+ if (isActive())
+ {
+ throw new IllegalStateException("Transaction is already active");
+ }
+ else
+ {
+ this.coordinator = ((EntityManagerImpl) entityManager).getPersistenceDelegator().getCoordinator();
+ ((EntityManagerImpl) entityManager).getPersistenceDelegator().begin(); // transaction de-marcation.
+ this.coordinator.coordinate(TxAction.BEGIN);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.persistence.EntityTransaction#commit()
+ */
+ @Override
+ public void commit()
+ {
+ if (!getRollbackOnly())
+ {
+ onTransaction(TxAction.COMMIT);
+ ((EntityManagerImpl) entityManager).getPersistenceDelegator().commit();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.persistence.EntityTransaction#getRollbackOnly()
+ */
+ @Override
+ public boolean getRollbackOnly()
+ {
+ if (isActive())
+ {
+ return rollbackOnly != null ? rollbackOnly : false;
+ }
+ else
+ {
+ throw new IllegalStateException("No transaction in progress");
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.persistence.EntityTransaction#isActive()
+ */
+ @Override
+ public boolean isActive()
+ {
+ return (coordinator != null && coordinator.isTransactionActive());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.persistence.EntityTransaction#rollback()
+ */
+ @Override
+ public void rollback()
+ {
+ onTransaction(TxAction.ROLLBACK);
+ ((EntityManagerImpl) entityManager).getPersistenceDelegator().rollback();
+ }
+
+ private void onTransaction(TxAction action)
+ {
+ if (isActive())
+ {
+ coordinator.coordinate(action);
+ }
+ else
+ {
+ throw new IllegalStateException("No transaction in progress");
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.persistence.EntityTransaction#setRollbackOnly()
+ */
+ @Override
+ public void setRollbackOnly()
+ {
+ this.rollbackOnly =true;
+ }
+
+}
View
64 kundera-core/src/main/java/com/impetus/kundera/persistence/KunderaTransactionException.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * * Copyright 2012 Impetus Infotech.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ ******************************************************************************/
+package com.impetus.kundera.persistence;
+
+import com.impetus.kundera.KunderaException;
+
+/**
+ * @author amresh
+ *
+ */
+public class KunderaTransactionException extends KunderaException
+{
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -7672192031279208722L;
+
+ /**
+ *
+ */
+ public KunderaTransactionException()
+ {
+ }
+
+ /**
+ * @param arg0
+ */
+ public KunderaTransactionException(String arg0)
+ {
+ super(arg0);
+ }
+
+ /**
+ * @param arg0
+ */
+ public KunderaTransactionException(Throwable arg0)
+ {
+ super(arg0);
+ }
+
+ /**
+ * @param arg0
+ * @param arg1
+ */
+ public KunderaTransactionException(String arg0, Throwable arg1)
+ {
+ super(arg0, arg1);
+ }
+
+}
View
169 kundera-core/src/main/java/com/impetus/kundera/persistence/PersistenceDelegator.java
@@ -38,6 +38,7 @@
import org.apache.commons.logging.LogFactory;
import com.impetus.kundera.KunderaException;
+import com.impetus.kundera.PersistenceProperties;
import com.impetus.kundera.client.Client;
import com.impetus.kundera.client.ClientPropertiesSetter;
import com.impetus.kundera.client.ClientResolver;
@@ -53,6 +54,7 @@
import com.impetus.kundera.lifecycle.states.TransientState;
import com.impetus.kundera.metadata.KunderaMetadataManager;
import com.impetus.kundera.metadata.model.EntityMetadata;
+import com.impetus.kundera.metadata.model.PersistenceUnitMetadata;
import com.impetus.kundera.metadata.model.attributes.AbstractAttribute;
import com.impetus.kundera.persistence.api.Batcher;
import com.impetus.kundera.persistence.context.CacheBase;
@@ -110,6 +112,7 @@
private boolean enableFlush;
+ private Coordinator coordinator;
/**
* Instantiates a new persistence delegator.
*
@@ -161,10 +164,8 @@ public void persist(Object e)
{
// build flush stack.
- flushManager.buildFlushStack(node, EventType.INSERT);
+ flushManager.buildFlushStack(node, com.impetus.kundera.persistence.context.EventLog.EventType.INSERT);
- // TODO : push into action queue.
- // Action/ExecutionQueue/ActivityQueue :-> id, name, EndPoint,
flush();
@@ -377,8 +378,8 @@ public void remove(Object e)
public void flush()
{
// Get flush stack from Flush Manager
- if (applyFlush())
- {
+// if (applyFlush())
+// {
FlushStack fs = flushManager.getFlushStack();
// Flush each node in flush stack from top to bottom unit it's empty
@@ -408,7 +409,13 @@ public void flush()
}
else if (flushMode.equals(FlushModeType.AUTO) || enableFlush)
{
- node.flush();
+ if(isTransactionInProgress && defaultTransactionSupported(metadata.getPersistenceUnit()))
+ {
+ onSynchronization(node, metadata);
+ } else
+ {
+ node.flush();
+ }
}
// Update Link value for all nodes attached to this one
@@ -443,11 +450,10 @@ else if (flushMode.equals(FlushModeType.AUTO) || enableFlush)
flushJoinTableData();
// performed,
}
- // clear it.
}
- }
+// }
}
public <E> E merge(E e)
@@ -807,21 +813,6 @@ public void rollback()
getPersistenceCache().clean();
}
- public boolean getRollbackOnly()
- {
- return false;
- }
-
- public void setRollbackOnly()
- {
-
- }
-
- public boolean isActive()
- {
- return isTransactionInProgress;
- }
-
/**
* Populates client specific properties.
*
@@ -903,38 +894,138 @@ private void execute()
*/
private void flushJoinTableData()
{
- Map<String, JoinTableData> joinTableDataMap = flushManager.getJoinTableDataMap();
- for (JoinTableData jtData : joinTableDataMap.values())
+ if (applyFlush())
{
- EntityMetadata m = KunderaMetadataManager.getEntityMetadata(jtData.getEntityClass());
- Client client = getClient(m);
-
- if (OPERATION.INSERT.equals(jtData.getOperation()))
+// Map<String, JoinTableData> joinTableDataMap = flushManager.getJoinTableData();
+ for (JoinTableData jtData : flushManager.getJoinTableData())
{
- client.persistJoinTable(jtData);
- }
- else if (OPERATION.DELETE.equals(jtData.getOperation()))
- {
- for (Object pk : jtData.getJoinTableRecords().keySet())
+ if (!jtData.isProcessed())
{
- client.deleteByColumn(jtData.getSchemaName(), jtData.getJoinTableName(),
- ((AbstractAttribute) m.getIdAttribute()).getJPAColumnName(), pk);
+ EntityMetadata m = KunderaMetadataManager.getEntityMetadata(jtData.getEntityClass());
+ Client client = getClient(m);
+ /*
+ * if (isTransactionInProgress &&
+ * defaultTransactionSupported(m.getPersistenceUnit())) {
+ * DefaultTransactionResource resource =
+ * (DefaultTransactionResource) coordinator.getResource(m
+ * .getPersistenceUnit()); resource.syncJoinTable(jtData); }
+ * else
+ */if (OPERATION.INSERT.equals(jtData.getOperation()))
+ {
+ client.persistJoinTable(jtData);
+ jtData.setProcessed(true);
+ }
+ else if (OPERATION.DELETE.equals(jtData.getOperation()))
+ {
+ for (Object pk : jtData.getJoinTableRecords().keySet())
+ {
+ client.deleteByColumn(jtData.getSchemaName(), jtData.getJoinTableName(),
+ ((AbstractAttribute) m.getIdAttribute()).getJPAColumnName(), pk);
+ }
+ jtData.setProcessed(true);
+ }
}
+
}
- jtData.setProcessed(true);
}
- joinTableDataMap.clear(); // All Join table operation
}
- /**
+/**
* Returns true, if flush mode is AUTO and not running within transaction ||
* running within transaction and commit is invoked.
*
* @return boolean value.
*/
private boolean applyFlush()
{
+// return true;
return (!isTransactionInProgress && flushMode.equals(FlushModeType.AUTO)) || enableFlush;
}
+ /**
+ * Returns transaction coordinator.
+ * @return
+ */
+ Coordinator getCoordinator()
+ {
+ coordinator = new Coordinator();
+ try
+ {
+ for (String pu : clientMap.keySet())
+ {
+ PersistenceUnitMetadata puMetadata = KunderaMetadataManager.getPersistenceUnitMetadata(pu);
+
+ String txResource = puMetadata.getProperty(PersistenceProperties.KUNDERA_TRANSACTION_RESOURCE);
+
+ if (txResource != null)
+ {
+
+ TransactionResource resource = (TransactionResource) Class.forName(txResource).newInstance();
+ coordinator.addResource(resource, pu);
+ Client client = clientMap.get(pu);
+
+ if(! (client instanceof TransactionBinder))
+ {
+ throw new KunderaTransactionException(
+ "Client : "
+ + client.getClass()
+ + " must implement TransactionBinder interface, if {kundera.transaction.resource.class} property provided!");
+ } else
+ {
+ ((TransactionBinder)client).bind(resource);
+ }
+ }
+ else
+ {
+ coordinator.addResource(new DefaultTransactionResource(clientMap.get(pu)), pu);
+ }
+ }
+ }
+ catch (InstantiationException e)
+ {
+ log.error("Error while initializing Transaction Resource:", e);
+ throw new KunderaTransactionException(e);
+ }
+ catch (IllegalAccessException e)
+ {
+ log.error("Error while initializing Transaction Resource:", e);
+ throw new KunderaTransactionException(e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ log.error("Error while initializing Transaction Resource:", e);
+ throw new KunderaTransactionException(e);
+ }
+
+ return coordinator;
+ }
+
+
+ /**
+ * If transaction is in progress and user explicitly invokes em.flush()!
+ *
+ * @param node data node
+ * @param metadata entity metadata.
+ */
+ private void onSynchronization(Node node, EntityMetadata metadata)
+ {
+ DefaultTransactionResource resource = (DefaultTransactionResource) coordinator.getResource(metadata.getPersistenceUnit());
+ if(enableFlush)
+ {
+ resource.onFlush();
+ } else
+ {
+ resource.syncNode(node/*, flushManager.getEvents(node.getNodeId())*/);
+ }
+ }
+
+
+ public boolean defaultTransactionSupported(final String persistenceUnit)
+ {
+ PersistenceUnitMetadata puMetadata = KunderaMetadataManager.getPersistenceUnitMetadata(persistenceUnit);
+
+ String txResource = puMetadata.getProperty(PersistenceProperties.KUNDERA_TRANSACTION_RESOURCE);
+
+ return txResource == null;
+ }
}
View
37 kundera-core/src/main/java/com/impetus/kundera/persistence/TransactionBinder.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * * Copyright 2012 Impetus Infotech.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ ******************************************************************************/
+
+package com.impetus.kundera.persistence;
+
+/**
+ * TransactionBinder interface. If underlying database provides in-built transaction support, client has to implement this interface and
+ * bind transaction resource with client.
+ *
+ * @author vivek.mishra
+ *
+ */
+public interface TransactionBinder
+{
+
+ /**
+ * Binds a transaction resource with client instance. Any subsequent CRUD calls will use this transaction resource to mark bind within
+ * already running transaction boundary. TransactionResource is responsible to bind and provide connection instance with for subsequent
+ * commit/rollback.
+ *
+ * @param resource transactional resource
+ */
+ void bind(TransactionResource resource);
+}
View
70 kundera-core/src/main/java/com/impetus/kundera/persistence/TransactionResource.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * * Copyright 2012 Impetus Infotech.
+ * *
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ ******************************************************************************/
+package com.impetus.kundera.persistence;
+
+/**
+ * TransactionResource interface will delegate begin/commit/rollback to client and each client will implement this interface provided transaction
+ * support is required for corresponding database.
+ * If underlying database supports transaction then it will
+ *
+ * @author vivek.mishra
+ *
+ */
+public interface TransactionResource
+{
+
+ /**
+ * On begin transactions.
+ */
+ void onBegin();
+ /**
+ * On commit transactions.
+ */
+ void onCommit();
+ /**
+ * On rollback transactions.
+ */
+ void onRollback();
+
+ /**
+ * On intermediate flush, when explicitly flush is invoked by em.flush()!
+ */
+ void onFlush();
+
+ /**
+ * On prepare for two phase commit.
+ *
+ * @return response, returns YES if it is ready for commit
+ */
+ Response prepare();
+
+ /**
+ * Returns true if transaction is active else false.
+ *
+ * @return boolean true.
+ */
+ boolean isActive();
+
+ /**
+ *
+ * Response enum
+ *
+ */
+ enum Response
+ {
+ YES,NO;
+ }
+}
View
5 kundera-core/src/main/java/com/impetus/kundera/persistence/api/Batcher.java
@@ -47,5 +47,10 @@
* @return batch size as integer
*/
int getBatchSize();
+
+ /**
+ * In case user asked for
+ */
+ void clear();
}
View
39 kundera-core/src/main/java/com/impetus/kundera/persistence/context/FlushManager.java
@@ -15,10 +15,12 @@
*/
package com.impetus.kundera.persistence.context;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -59,7 +61,7 @@
* Key -> Name of Join Table Value -> records to be persisted in the join
* table
*/
- private Map<String, JoinTableData> joinTableDataMap;
+ private List<JoinTableData> joinTableDataCollection = new ArrayList<JoinTableData>();
/** The event log queue. */
private EventLogQueue eventLogQueue = new EventLogQueue();
@@ -73,7 +75,7 @@
public FlushManager()
{
flushStack = new FlushStack();
- joinTableDataMap = new HashMap<String, JoinTableData>();
+// joinTableDataMap = new HashMap<String, JoinTableData>();
}
/**
@@ -207,7 +209,7 @@ else if (node.getCurrentNodeState().getClass().equals(RemovedState.class))
operation = OPERATION.DELETE;
}
- addJoinTableDataIntoMap(operation, jtmd.getJoinTableSchema(), jtmd.getJoinTableName(),
+ addJoinTableData(operation, jtmd.getJoinTableSchema(), jtmd.getJoinTableName(),
joinColumnName, inverseJoinColumnName, node.getDataClass(), entityId, childValues);
}
}
@@ -325,9 +327,9 @@ public void setFlushStack(FlushStack flushStack)
*
* @return the joinTableDataMap
*/
- public Map<String, JoinTableData> getJoinTableDataMap()
+ public List<JoinTableData> getJoinTableData()
{
- return joinTableDataMap;
+ return joinTableDataCollection;
}
/**
@@ -341,9 +343,9 @@ public void clearFlushStack()
flushStack.clear();
// flushStack = null;
}
- if (joinTableDataMap != null && !joinTableDataMap.isEmpty())
+ if (joinTableDataCollection != null && !joinTableDataCollection.isEmpty())
{
- joinTableDataMap.clear();
+ joinTableDataCollection.clear();
// joinTableDataMap = null;
}
@@ -446,7 +448,8 @@ private void onRollBack(PersistenceDelegator delegator, Map<Object, EventLog> ev
Class clazz = node.getDataClass();
EntityMetadata metadata = KunderaMetadataManager.getEntityMetadata(clazz);
Client client = delegator.getClient(metadata);
- if (node.isProcessed())
+
+ if (node.isProcessed() /*&& delegator.defaultTransactionSupported(metadata.getPersistenceUnit())*/)
{
if (node.getOriginalNode() == null)
{
@@ -493,23 +496,23 @@ private void onRollBack(PersistenceDelegator delegator, Map<Object, EventLog> ev
* @param invJoinColumnValues
* the inv join column values
*/
- private void addJoinTableDataIntoMap(OPERATION operation, String schemaName, String joinTableName,
+ private void addJoinTableData(OPERATION operation, String schemaName, String joinTableName,
String joinColumnName, String invJoinColumnName, Class<?> entityClass, Object joinColumnValue,
Set<Object> invJoinColumnValues)
{
- JoinTableData joinTableData = joinTableDataMap.get(joinTableName);
+/* JoinTableData joinTableData = joinTableDataCollection.get(joinTableName);
if (joinTableData == null)
{
- joinTableData = new JoinTableData(operation, schemaName, joinTableName, joinColumnName, invJoinColumnName,
+*/ JoinTableData joinTableData = new JoinTableData(operation, schemaName, joinTableName, joinColumnName, invJoinColumnName,
entityClass);
joinTableData.addJoinTableRecord(joinColumnValue, invJoinColumnValues);
- joinTableDataMap.put(joinTableName, joinTableData);
- }
+ joinTableDataCollection.add(joinTableData);
+/* }
else
{
joinTableData.addJoinTableRecord(joinColumnValue, invJoinColumnValues);
}
-
+*/
}
/**
@@ -530,8 +533,8 @@ private void logEvent(Node node, EventType eventType)
private void rollbackJoinTableData(PersistenceDelegator delegator)
{
// on deleting join table data.
- Map<String, JoinTableData> joinTableDataMap = getJoinTableDataMap();
- for (JoinTableData jtData : joinTableDataMap.values())
+// Map<String, JoinTableData> joinTableDataMap = getJoinTableDataMap();
+ for (JoinTableData jtData : joinTableDataCollection)
{
if (jtData.isProcessed())
{
@@ -554,6 +557,10 @@ else if (OPERATION.DELETE.equals(jtData.getOperation()))
}
}
}
+ joinTableDataCollection.clear();
+ joinTableDataCollection = null;
+ joinTableDataCollection = new ArrayList<JoinTableData>();
+
}
}
View
5 ...era-core/src/main/java/com/impetus/kundera/persistence/jta/KunderaJTAUserTransaction.java
@@ -276,4 +276,9 @@ public Reference getReference() throws NamingException
{
return UserTransactionFactory.getReference(this);
}
+
+ public boolean isTransactionInProgress()
+ {
+ return isTransactionInProgress;
+ }
}
View
14 kundera-hbase/src/main/java/com/impetus/client/hbase/HBaseClient.java
@@ -623,6 +623,20 @@ public int getBatchSize()
return batchSize;
}
+ /* (non-Javadoc)
+ * @see com.impetus.kundera.persistence.api.Batcher#clear()
+ */
+ @Override
+ public void clear()
+ {
+ if(nodes != null)
+ {
+ nodes.clear();
+ nodes=null;
+ nodes = new ArrayList<Node>();
+ }
+ }
+
/*
* (non-Javadoc)
*
View
14 kundera-mongo/src/main/java/com/impetus/client/mongodb/MongoDBClient.java
@@ -543,6 +543,20 @@ public int getBatchSize()
return batchSize;
}
+ /* (non-Javadoc)
+ * @see com.impetus.kundera.persistence.api.Batcher#clear()
+ */
+ @Override
+ public void clear()
+ {
+ if(nodes != null)
+ {
+ nodes.clear();
+ nodes=null;
+ nodes = new ArrayList<Node>();
+ }
+ }
+
/*
* (non-Javadoc)
*
View
70 kundera-neo4j/src/main/java/com/impetus/client/neo4j/GraphEntityMapper.java
@@ -36,18 +36,19 @@
import org.neo4j.graphdb.index.UniqueFactory;
import com.impetus.client.neo4j.index.AutoIndexing;
+import com.impetus.kundera.PersistenceProperties;
import com.impetus.kundera.db.RelationHolder;
import com.impetus.kundera.metadata.KunderaMetadataManager;
import com.impetus.kundera.metadata.model.EntityMetadata;
import com.impetus.kundera.metadata.model.KunderaMetadata;
import com.impetus.kundera.metadata.model.MetamodelImpl;
+import com.impetus.kundera.metadata.model.PersistenceUnitMetadata;
import com.impetus.kundera.metadata.model.attributes.AbstractAttribute;
-import com.impetus.kundera.persistence.AbstractEntityReader;
import com.impetus.kundera.persistence.EntityReaderException;
import com.impetus.kundera.property.PropertyAccessorHelper;
/**
- * Responsible for converting Neo4J graph (nodes+relationships) to JPA entities and vice versa
+ * Responsible for converting Neo4J graph (nodes+relationships) into JPA entities and vice versa
* @author amresh.singh
*/
public class GraphEntityMapper
@@ -67,7 +68,7 @@ public Node fromEntity(Object entity, List<RelationHolder> relations, GraphDatab
m.getPersistenceUnit());
EntityType entityType = metaModel.entity(m.getEntityClazz());
- //Iterate over, entity attributes
+ //Iterate over entity attributes
Set<Attribute> attributes = entityType.getAttributes();
for(Attribute attribute : attributes)
{
@@ -98,33 +99,47 @@ else if(attribute.isCollection() && attribute.isAssociation())
Class<?> valueClass = mapAttribute.getBindableJavaType(); //Class for adjoining node object
Class<?> keyClass = mapAttribute.getKeyJavaType(); //Class for relationship object
+ DynamicRelationshipType relType = DynamicRelationshipType.withName(mapKeyJoinColumnAnn.name());
+
+ EntityMetadata childMetadata = KunderaMetadataManager.getEntityMetadata(valueClass);
+
for(Object key : mapObject.keySet())
{
- //Construct Adjoining Node
- Object value = mapObject.get(key);
- EntityMetadata childMetadata = KunderaMetadataManager.getEntityMetadata(valueClass);
- Node childNode = fromEntity(value, null, graphDb, childMetadata);
- //Connect adjoining node through relationships
- DynamicRelationshipType relType = DynamicRelationshipType.withName(mapKeyJoinColumnAnn.name());
- Relationship relationship = node.createRelationshipTo(childNode, relType);
-
- //Set relations's own attributes into it
- for(Field f : keyClass.getDeclaredFields())
+ //If child entities are meant for Neo4J, Create "Real" nodes.
+ if(isEntityForNeo4J(childMetadata))
{
- if(! f.getType().equals(valueClass) && !f.getType().equals(m.getEntityClazz()))
+ //Construct Adjoining Node
+ Object value = mapObject.get(key);
+ Node childNode = fromEntity(value, null, graphDb, childMetadata);
+
+ //Connect adjoining node through relationships
+ Relationship relationship = node.createRelationshipTo(childNode, relType);
+
+ //Set relations's own attributes into it
+ for(Field f : keyClass.getDeclaredFields())
+ {
+ if(! f.getType().equals(valueClass) && !f.getType().equals(m.getEntityClazz()))
+ {
+ String relPropertyName = f.getAnnotation(Column.class) != null
+ ? f.getAnnotation(Column.class).name() : f.getName();
+ relationship.setProperty(relPropertyName, PropertyAccessorHelper.getObject(key, f));
+ }
+ }
+
+ //TODO: If relationship auto-indexing is disabled, manually index this relationship
+ if(! autoIndexing.isRelationshipAutoIndexingEnabled(graphDb))
{
- String relPropertyName = f.getAnnotation(Column.class) != null
- ? f.getAnnotation(Column.class).name() : f.getName();
- relationship.setProperty(relPropertyName, PropertyAccessorHelper.getObject(key, f));
+
}
+
}
- //TODO: If relationship auto-indexing is disabled, manually index this relationship
- if(! autoIndexing.isRelationshipAutoIndexingEnabled(graphDb))
+ //Otherwise, create "Proxy" nodes
+ else
{
-
- }
+ //TODO: Write code for create proxy nodes, as part of Polyglot implementation
+ }
}
}
@@ -234,5 +249,18 @@ protected void initialize(Relationship relationship, Map<String,Object> properti
return factory.getOrCreate(idFieldName, id);
}
+
+ private boolean isEntityForNeo4J(EntityMetadata entityMetadata)
+ {
+ String persistenceUnit = entityMetadata.getPersistenceUnit();
+ PersistenceUnitMetadata puMetadata = KunderaMetadataManager.getPersistenceUnitMetadata(persistenceUnit);
+ String clientFactory = puMetadata.getProperty(PersistenceProperties.KUNDERA_CLIENT_FACTORY);
+ if(clientFactory.indexOf("com.impetus.client.neo4j") > 0)
+ {
+ return true;
+ }
+ return false;
+ }
+
}
View
25 kundera-neo4j/src/main/java/com/impetus/client/neo4j/config/IndexMode.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright 2012 Impetus Infotech.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.impetus.client.neo4j.config;
+
+/**
+ * Different index modes available in Neo4J
+ * @author amresh.singh
+ */
+public enum IndexMode
+{
+ MANUAL, AUTO
+}
View
48 kundera-neo4j/src/main/java/com/impetus/client/neo4j/index/AutoIndexing.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2012 Impetus Infotech.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.impetus.client.neo4j.index;
+
+import java.util.Set;
+
+import org.neo4j.graphdb.GraphDatabaseService;
+
+/**
+ * Provides functionalities specific to Auto-indexing
+ * @author amresh.singh
+ */
+public class AutoIndexing
+{
+ public boolean isNodeAutoIndexingEnabled(GraphDatabaseService graphDb)
+ {
+ return graphDb.index().getNodeAutoIndexer().isEnabled();
+ }
+
+ public boolean isRelationshipAutoIndexingEnabled(GraphDatabaseService graphDb)
+ {
+ return graphDb.index().getRelationshipAutoIndexer().isEnabled();
+ }
+
+ public Set<String> getNodeIndexedProperties(GraphDatabaseService graphDb)
+ {
+ return graphDb.index().getNodeAutoIndexer().getAutoIndexedProperties();
+ }
+
+ public Set<String> getRelationsipIndexedProperties(GraphDatabaseService graphDb)
+ {
+ return graphDb.index().getRelationshipAutoIndexer().getAutoIndexedProperties();
+ }
+
+}
View
510 kundera-redis/src/main/java/com/impetus/client/redis/RedisClient.java
@@ -19,6 +19,7 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,8 +36,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.BinaryJedis;
+import redis.clients.jedis.BinaryTransaction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
+import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisConnectionException;
import com.impetus.client.redis.RedisQueryInterpreter.Clause;
@@ -56,6 +61,9 @@
import com.impetus.kundera.metadata.model.PersistenceUnitMetadata;
import com.impetus.kundera.metadata.model.attributes.AbstractAttribute;
import com.impetus.kundera.persistence.EntityReader;
+import com.impetus.kundera.persistence.KunderaTransactionException;
+import com.impetus.kundera.persistence.TransactionBinder;
+import com.impetus.kundera.persistence.TransactionResource;
import com.impetus.kundera.persistence.api.Batcher;
import com.impetus.kundera.persistence.context.jointable.JoinTableData;
import com.impetus.kundera.property.PropertyAccessorFactory;
@@ -66,7 +74,7 @@
*
* @author vivek.mishra
*/
-public class RedisClient extends ClientBase implements Client<RedisQuery>, Batcher, ClientPropertiesSetter
+public class RedisClient extends ClientBase implements Client<RedisQuery>, Batcher, ClientPropertiesSetter, TransactionBinder
{
/**
* Reference to redis client factory.
@@ -79,6 +87,8 @@
/** list of nodes for batch processing. */
private List<Node> nodes = new ArrayList<Node>();
+
+ private TransactionResource resource;
/** batch size. */
private int batchSize;
@@ -108,9 +118,15 @@
@Override
protected void onPersist(EntityMetadata entityMetadata, Object entity, Object id, List<RelationHolder> rlHolders)
{
- Jedis connection = getConnection();
+ Object connection = getConnection();
// Create a hashset and populate data into it
- Pipeline pipeLine = connection.pipelined();
+
+ Pipeline pipeLine = null;
+ if(resource == null)
+ {
+ pipeLine = ((Jedis) connection).pipelined();
+ }
+
try
{
@@ -119,8 +135,11 @@ protected void onPersist(EntityMetadata entityMetadata, Object entity, Object id
finally
{
//
- pipeLine.sync(); // send I/O.. as persist call. so no need to read
- // response?
+ if(pipeLine != null)
+ {
+ pipeLine.sync(); // send I/O.. as persist call. so no need to read
+ } // response?
+
onCleanup(connection);
}
@@ -136,7 +155,7 @@ private double getDouble(String valueAsStr)
public Object find(Class entityClass, Object key)
{
Object result = null;
- Jedis connection = getConnection();
+ Object connection = getConnection();
try
{
result = fetch(entityClass, key, connection, null);
@@ -177,7 +196,7 @@ public Object find(Class entityClass, Object key)
* @throws IllegalAccessException
* throws in case of runtime exception
*/
- private Object fetch(Class clazz, Object key, Jedis connection, byte[][] fields) throws InstantiationException,
+ private Object fetch(Class clazz, Object key, Object connection, byte[][] fields) throws InstantiationException,
IllegalAccessException
{
Object result;
@@ -206,7 +225,18 @@ private Object fetch(Class clazz, Object key, Jedis connection, byte[][] fields)
// IF it is for selective columns
if (fields != null)
{
- List<byte[]> fieldValues = connection.hmget(getEncodedBytes(hashKey), fields);
+ List<byte[]> fieldValues = null;
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction)connection).hmget(getEncodedBytes(hashKey), fields);
+ ((Transaction) connection).exec();
+
+ fieldValues = (List<byte[]>) response.get();
+
+ } else
+ {
+ fieldValues = ((Jedis)connection).hmget(getEncodedBytes(hashKey), fields);
+ }
if (fieldValues != null && !fieldValues.isEmpty())
{
@@ -218,7 +248,7 @@ private Object fetch(Class clazz, Object key, Jedis connection, byte[][] fields)
}
else
{
- columns = connection.hgetAll(getEncodedBytes(hashKey));
+ columns = getColumns(connection, hashKey, columns);
}
// Map<byte[], byte[]>
result = unwrap(entityMetadata, columns, key);
@@ -233,6 +263,29 @@ private Object fetch(Class clazz, Object key, Jedis connection, byte[][] fields)
return result;
}
+ private Map<byte[], byte[]> getColumns(Object connection, String hashKey, Map<byte[], byte[]> columns)
+ {
+ if(resource != null && resource.isActive())
+ {
+ // Why transaction API returns response in byte[] format/?
+ Response response = ((Transaction) connection).hgetAll(getEncodedBytes(hashKey));
+ ((Transaction) connection).exec();
+ Map<String,String> cols = (Map<String, String>) response.get();
+
+ if (cols != null)
+ {
+ for (String name : cols.keySet())
+ {
+ columns.put(getEncodedBytes(name), getEncodedBytes(cols.get(name)));
+ }
+ }
+ } else
+ {
+ columns = ((Jedis) connection).hgetAll(getEncodedBytes(hashKey));
+ }
+ return columns;
+ }
+
/*
* (non-Javadoc)
*
@@ -242,8 +295,7 @@ private Object fetch(Class clazz, Object key, Jedis connection, byte[][] fields)
@Override
public <E> List<E> findAll(Class<E> entityClass, Object... keys)
{
- Jedis connection = getConnection();
- connection.pipelined();
+ Object connection = getConnection();
List results = new ArrayList();
try
{
@@ -295,18 +347,24 @@ public void close()
@Override
public void delete(Object entity, Object pKey)
{
- Jedis connection = null;
- connection = getConnection();
- Pipeline pipeLine = connection.pipelined();
-
+ Object connection =getConnection();
+ Pipeline pipeLine = null;
+ if(resource == null)
+ {
+ pipeLine = ((Jedis) connection).pipelined();
+ }
try
{
onDelete(entity, pKey, connection);
}
finally
{
- pipeLine.sync();
+ if(pipeLine != null)
+ {
+ pipeLine.sync();
+ }
+
onCleanup(connection);
}
}
@@ -321,15 +379,23 @@ public void delete(Object entity, Object pKey)
* @param rowKey
* row key.
*/
- private void deleteRelation(Jedis connection, EntityMetadata entityMetadata, String rowKey)
+ private void deleteRelation(Object connection, EntityMetadata entityMetadata, String rowKey)
{
List<String> relations = entityMetadata.getRelationNames();
if (relations != null)
{
for (String relation : relations)
{
- connection.hdel(getHashKey(entityMetadata.getTableName(), rowKey), relation);
+ if(resource != null && resource.isActive())
+ {
+ ((Transaction) connection).hdel(getHashKey(entityMetadata.getTableName(), rowKey), relation);
+
+ } else
+ {
+ ((Jedis) connection).hdel(getHashKey(entityMetadata.getTableName(), rowKey), relation);
+
+ }
}
}
@@ -343,7 +409,8 @@ public void persistJoinTable(JoinTableData joinTableData)
String joinColumn = joinTableData.getJoinColumnName();
Map<Object, Set<Object>> joinTableRecords = joinTableData.getJoinTableRecords();
- Jedis connection = null;
+ Object connection = null;
+ Pipeline pipeline = null;
/**
* Example: join table : PERSON_ADDRESS join column : PERSON_ID (1_p)
* inverse join column : ADDRESS_ID (1_a) store in REDIS:
@@ -353,7 +420,10 @@ public void persistJoinTable(JoinTableData joinTableData)
try
{
connection = getConnection();
- Pipeline pipeline = connection.pipelined();
+ if(resource == null)
+ {
+ pipeline = ((Jedis) connection).pipelined();
+ }
Set<Object> joinKeys = joinTableRecords.keySet();
for (Object joinKey : joinKeys)
{
@@ -377,19 +447,36 @@ public void persistJoinTable(JoinTableData joinTableData)
// field
// add to hash table.
- connection.hmset(getEncodedBytes(redisKey), redisFields);
- // add index
- connection.zadd(getHashKey(tableName, inverseJoinKeyAsStr), getDouble(inverseJoinKeyAsStr),
- redisKey);
- connection.zadd(getHashKey(tableName, joinKeyAsStr), getDouble(joinKeyAsStr), redisKey);
+
+ if(resource != null && resource.isActive())
+ {
+ ((Transaction) connection).hmset(getEncodedBytes(redisKey), redisFields);
+ // add index
+ ((Transaction) connection).zadd(getHashKey(tableName, inverseJoinKeyAsStr), getDouble(inverseJoinKeyAsStr),
+ redisKey);
+ ((Transaction) connection).zadd(getHashKey(tableName, joinKeyAsStr), getDouble(joinKeyAsStr), redisKey);
+
+ } else
+ {
+ ((Jedis) connection).hmset(getEncodedBytes(redisKey), redisFields);
+ // add index
+ ((Jedis) connection).zadd(getHashKey(tableName, inverseJoinKeyAsStr), getDouble(inverseJoinKeyAsStr),
+ redisKey);
+ ((Jedis) connection).zadd(getHashKey(tableName, joinKeyAsStr), getDouble(joinKeyAsStr), redisKey);
+
+ }
redisFields.clear();
}
}
- pipeline.sync();
+
}
finally
{
+ if(pipeline != null)
+ {
+ pipeline.sync();
+ }
onCleanup(connection);
}
@@ -404,25 +491,46 @@ public void persistJoinTable(JoinTableData joinTableData)
public <E> List<E> getColumnsById(String schemaName, String tableName, String pKeyColumnName, String columnName,
Object pKeyColumnValue)
{
- Jedis connection = null;
+ Object connection = null;
List results = new ArrayList();
try
{
connection = getConnection();
- Pipeline pipeLine = connection.pipelined();
String valueAsStr = PropertyAccessorHelper.getString(pKeyColumnValue);
Double score = getDouble(valueAsStr);
- Set<String> resultKeys = connection.zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+
+ Set<String> resultKeys = null;
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+ ((Transaction) connection).exec();
+// ((Transaction) connection).zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+ resultKeys = (Set<String>) response.get();
+
+ } else
+ {
+
+ resultKeys = ((Jedis) connection).zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+ }
for (String hashKey : resultKeys)
{
- List columnValues = connection.hmget(hashKey, columnName);
+ List columnValues = null;
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).hmget(hashKey, columnName);
+ ((Transaction) connection).exec();
+
+ columnValues = (List) response.get();
+ } else
+ {
+ columnValues = ((Jedis) connection).hmget(hashKey, columnName);
+ }
- pipeLine.syncAndReturnAll();
if (columnValues != null && !columnValues.isEmpty())
{
results.addAll(columnValues); // Currently returning list of
@@ -446,15 +554,28 @@ public void persistJoinTable(JoinTableData joinTableData)
public Object[] findIdsByColumn(String schemaName, String tableName, String pKeyName, String columnName,
Object columnValue, Class entityClazz)
{
- Jedis connection = null;
+ Object connection = null;
try
{
connection = getConnection();
String valueAsStr = PropertyAccessorHelper.getString(columnValue);
- Set<String> results = connection.zrangeByScore(getHashKey(tableName, columnName), getDouble(valueAsStr),
- getDouble(valueAsStr));
+ Set<String> results = null;
+
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).zrangeByScore(getHashKey(tableName, columnName), getDouble(valueAsStr),
+ getDouble(valueAsStr));
+ ((Transaction) connection).exec();
+
+ results = (Set<String>) response.get();
+ } else
+ {
+ results = ((Jedis) connection).zrangeByScore(getHashKey(tableName, columnName), getDouble(valueAsStr),
+ getDouble(valueAsStr));
+
+ }
if (results != null)
{
return results.toArray(new Object[0]);
@@ -472,42 +593,79 @@ public void persistJoinTable(JoinTableData joinTableData)
@Override
public void deleteByColumn(String schemaName, String tableName, String columnName, Object columnValue)
{
- Jedis connection = null;
+ Object connection = null;
+ Pipeline pipeLine = null;
try
{
+
connection = getConnection();
- Pipeline pipeLine = connection.pipelined();
+
+ if(resource == null)
+ {
+ pipeLine = ((Jedis) connection).pipelined();
+ }
+
String valueAsStr = PropertyAccessorHelper.getString(columnValue);
Double score = getDouble(valueAsStr);
- Set<String> results = connection.zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+ Set<String> results = null;
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+ ((Transaction) connection).exec();
+
+ results = (Set<String>) response.get();
+ } else
+ {
+ results = ((Jedis) connection).zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
+ }
+// Set<String> results = connection.zrangeByScore(getHashKey(tableName, valueAsStr), score, score);
if (results != null)
{
for (String rowKey : results)
{
// byte[] hashKey = getEncodedBytes(getHashKey(tableName,
// rowKey));
- Map<byte[], byte[]> columns = connection.hgetAll(getEncodedBytes(rowKey));
+
+ Map<byte[], byte[]> columns = null;
+ columns = getColumns(connection, rowKey, columns);
for (byte[] column : columns.keySet()) // delete each
// column(e.g.
// field)
{
// connection.get(key)
- connection.hdel(getEncodedBytes(rowKey), column); // delete
- // record
String colName = PropertyAccessorFactory.STRING.fromBytes(String.class, columns.get(column));
- connection.zrem(getHashKey(tableName, colName), rowKey); // delete
- // inverted
- // index.
+
+ if (resource != null && resource.isActive())
+ {
+ ((Transaction) connection).hdel(getEncodedBytes(rowKey), column); // delete
+ // record
+ ((Transaction) connection).zrem(getHashKey(tableName, colName), rowKey); // delete
+ // inverted
+ // index.
+
+ }
+ else
+ {
+ ((Jedis) connection).hdel(getEncodedBytes(rowKey), column); // delete
+ // record
+ ((Jedis) connection).zrem(getHashKey(tableName, colName), rowKey); // delete
+ // inverted
+ // index.
+
+ }
}
}
}
- pipeLine.sync();
}
finally
{
+ if(pipeLine != null)
+ {
+ pipeLine.sync();
+ }
onCleanup(connection);
}
}
@@ -581,9 +739,13 @@ public void addBatch(Node node)
@Override
public int executeBatch()
{
- Jedis connection = getConnection();
+ Object connection = getConnection();
// Create a hashset and populate data into it
- Pipeline pipeLine = connection.pipelined();
+ Pipeline pipeLine = null;
+ if(resource == null)
+ {
+ pipeLine = ((Jedis) connection).pipelined();
+ }
try
{
for (Node node : nodes)
@@ -609,8 +771,11 @@ public int executeBatch()
finally
{
//
- pipeLine.sync(); // send I/O.. as persist call. so no need to read
+ if(pipeLine != null)
+ {
+ pipeLine.sync(); // send I/O.. as persist call. so no need to read
// response?
+ }
onCleanup(connection);
}
@@ -628,6 +793,20 @@ public int getBatchSize()
return batchSize;
}
+ /* (non-Javadoc)
+ * @see com.impetus.kundera.persistence.api.Batcher#clear()
+ */
+ @Override
+ public void clear()
+ {
+ if(nodes != null)
+ {
+ nodes.clear();
+ nodes=null;
+ nodes = new ArrayList<Node>();
+ }
+ }
+
/**
* Check on batch limit.
*/
@@ -745,12 +924,20 @@ private String getHashKey(final String tableName, final String rowKey)
* @param rowKey
* row key to be stor
*/
- private void addIndex(final Jedis connection, final AttributeWrapper wrapper, final String rowKey)
+ private void addIndex(final Object connection, final AttributeWrapper wrapper, final String rowKey)
{
Set<String> indexKeys = wrapper.getIndexes().keySet();
for (String idx_Name : indexKeys)
{
- connection.zadd(idx_Name, wrapper.getIndexes().get(idx_Name), rowKey);
+ if(resource != null && resource.isActive())
+ {
+ ((Transaction)connection).zadd(idx_Name, wrapper.getIndexes().get(idx_Name), rowKey);
+
+ } else
+ {
+ ((Jedis)connection).zadd(idx_Name, wrapper.getIndexes().get(idx_Name), rowKey);
+
+ }
}
}
@@ -765,12 +952,20 @@ private void addIndex(final Jedis connection, final AttributeWrapper wrapper, fi
* @param member
* sorted set member name.
*/
- private void unIndex(final Jedis connection, final AttributeWrapper wrapper, final String member)
+ private void unIndex(final Object connection, final AttributeWrapper wrapper, final String member)
{
Set<String> keys = wrapper.getIndexes().keySet();
for (String key : keys)
{
- connection.zrem(key, member);
+ if(resource != null && resource.isActive())
+ {
+ ((Transaction) connection).zrem(key, member);
+
+ } else
+ {
+ ((Jedis) connection).zrem(key, member);
+
+ }
}
}
@@ -780,15 +975,15 @@ private void unIndex(final Jedis connection, final AttributeWrapper wrapper, fin
* @param connection
* redis connection instance.
*/
- private void onCleanup(Jedis connection)
+ private void onCleanup(Object connection)
{
- if (connection != null)
+ if ( resource == null && connection != null)
{
if (settings != null)
{
- connection.configResetStat();
+ ((Jedis) connection).configResetStat();
}
- factory.releaseConnection(connection);
+ factory.releaseConnection((Jedis)connection);
}
}
@@ -1062,7 +1257,7 @@ List onExecuteQuery(RedisQueryInterpreter queryParameter, Class entityClazz)
/**
* Find a list of id's and then call findById for each!
*/
- Jedis connection = null;
+ Object connection = null;
List<Object> results = new ArrayList<Object>();
try
{
@@ -1086,17 +1281,44 @@ List onExecuteQuery(RedisQueryInterpreter queryParameter, Class entityClazz)
if (queryParameter.getClause().equals(Clause.INTERSECT))
{
- connection.zinterstore(destStore, keySets.toArray(new String[] {}));
+ if(resource != null && resource.isActive())
+ {
+ ((Transaction) connection).zinterstore(destStore, keySets.toArray(new String[] {}));
+
+ } else
+ {
+ ((Jedis) connection).zinterstore(destStore, keySets.toArray(new String[] {}));
+ }
}
else
{
- connection.zunionstore(destStore, keySets.toArray(new String[] {}));
+ if(resource != null && resource.isActive())
+ {
+ ((Transaction) connection).zunionstore(destStore, keySets.toArray(new String[] {}));
+ } else
+ {
+ ((Jedis) connection).zunionstore(destStore, keySets.toArray(new String[] {}));
+ }
+
}
- rowKeys = connection.zrange(destStore, 0, -1);
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).zrange(destStore, 0, -1);
+ ((Transaction) connection).exec();
+
+ rowKeys = (Set<String>) response.get();
+//
+// ((Transaction) connection).del(destStore);
+
+ } else
+ {
+
+ rowKeys = ((Jedis) connection).zrange(destStore, 0, -1);
+ ((Jedis) connection).del(destStore);
+ }
// delete intermediate store after find.
- connection.del(destStore);
//
// means it is a query over sorted set.
}
@@ -1108,8 +1330,20 @@ else if (queryParameter.isByRange())
String column = minimum.keySet().iterator().next();
- rowKeys = connection.zrangeByScore(getHashKey(entityMetadata.getTableName(), column),
- minimum.get(column), maximum.get(column));
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).zrangeByScore(getHashKey(entityMetadata.getTableName(), column),
+ minimum.get(column), maximum.get(column));
+ ((Transaction) connection).exec();
+
+ rowKeys = (Set<String>) response.get();
+
+ } else
+ {
+ rowKeys = ((Jedis) connection).zrangeByScore(getHashKey(entityMetadata.getTableName(), column),
+ minimum.get(column), maximum.get(column));
+
+ }
}
else if (queryParameter.isById())
@@ -1129,16 +1363,37 @@ else if (queryParameter.getFields() != null)
// ideally it will always be 1 value in map, else it will go
// it queryParameter.getClause() will not be null!
Double value = getDouble(PropertyAccessorHelper.getString(queryParameter.getFields().get(column)));
- rowKeys = connection.zrangeByScore(getHashKey(entityMetadata.getTableName(), column), value, value);
+ if(resource != null && resource.isActive())
+ {
+ Response response = ((Transaction) connection).zrangeByScore(getHashKey(entityMetadata.getTableName(), column), value, value);
+ ((Transaction) connection).exec();
+ rowKeys = (Set<String>) response.get();
+
+ } else
+ {
+ rowKeys = ((Jedis) connection).zrangeByScore(getHashKey(entityMetadata.getTableName(), column), value, value);