Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Jan 15, 2015
1 parent 806ce6a commit 4b8ec5f
Show file tree
Hide file tree
Showing 81 changed files with 1,267 additions and 1,099 deletions.
Expand Up @@ -18,12 +18,12 @@
package org.gridgain.examples.datagrid.store.dummy; package org.gridgain.examples.datagrid.store.dummy;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*; import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*; import org.apache.ignite.transactions.*;
import org.gridgain.examples.datagrid.store.*; import org.gridgain.examples.datagrid.store.*;
import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.store.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import java.util.*; import java.util.*;
Expand All @@ -32,41 +32,47 @@
/** /**
* Dummy cache store implementation. * Dummy cache store implementation.
*/ */
public class CacheDummyPersonStore extends GridCacheStoreAdapter<Long, Person> { public class CacheDummyPersonStore extends CacheStoreAdapter<Long, Person> {
/** Auto-inject grid instance. */ /** Auto-inject grid instance. */
@IgniteInstanceResource @IgniteInstanceResource
private Ignite ignite; private Ignite ignite;


/** Auto-inject cache name. */ /** Auto-inject cache name. */
@GridCacheName @IgniteCacheNameResource
private String cacheName; private String cacheName;


/** Dummy database. */ /** Dummy database. */
private Map<Long, Person> dummyDB = new ConcurrentHashMap<>(); private Map<Long, Person> dummyDB = new ConcurrentHashMap<>();


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { @Override public Person load(Long key) {
IgniteTx tx = transaction();

System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');


return dummyDB.get(key); return dummyDB.get(key);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void put(@Nullable IgniteTx tx, Long key, Person val) throws IgniteCheckedException { @Override public void put(Long key, Person val) {
IgniteTx tx = transaction();

System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');


dummyDB.put(key, val); dummyDB.put(key, val);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { @Override public void remove(Long key) {
IgniteTx tx = transaction();

System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');


dummyDB.remove(key); dummyDB.remove(key);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException { @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
int cnt = (Integer)args[0]; int cnt = (Integer)args[0];


System.out.println(">>> Store loadCache for entry count: " + cnt); System.out.println(">>> Store loadCache for entry count: " + cnt);
Expand Down
Expand Up @@ -17,22 +17,22 @@


package org.gridgain.examples.datagrid.store.hibernate; package org.gridgain.examples.datagrid.store.hibernate;


import org.apache.ignite.*; import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*; import org.apache.ignite.transactions.*;
import org.gridgain.examples.datagrid.store.*; import org.gridgain.examples.datagrid.store.*;
import org.gridgain.grid.cache.store.*;
import org.hibernate.*; import org.hibernate.*;
import org.hibernate.cfg.*; import org.hibernate.cfg.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import javax.cache.integration.*;
import java.util.*; import java.util.*;


/** /**
* Example of {@link GridCacheStore} implementation that uses Hibernate * Example of {@link CacheStore} implementation that uses Hibernate
* and deals with maps {@link UUID} to {@link Person}. * and deals with maps {@link UUID} to {@link Person}.
*/ */
public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Person> { public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
/** Default hibernate configuration resource path. */ /** Default hibernate configuration resource path. */
private static final String DFLT_HIBERNATE_CFG = "/org/gridgain/examples/datagrid/store/hibernate/hibernate.cfg.xml"; private static final String DFLT_HIBERNATE_CFG = "/org/gridgain/examples/datagrid/store/hibernate/hibernate.cfg.xml";


Expand All @@ -50,7 +50,9 @@ public CacheHibernatePersonStore() {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { @Override public Person load(Long key) {
IgniteTx tx = transaction();

System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');


Session ses = session(tx); Session ses = session(tx);
Expand All @@ -61,20 +63,21 @@ public CacheHibernatePersonStore() {
catch (HibernateException e) { catch (HibernateException e) {
rollback(ses, tx); rollback(ses, tx);


throw new IgniteCheckedException("Failed to load value from cache store with key: " + key, e); throw new CacheLoaderException("Failed to load value from cache store with key: " + key, e);
} }
finally { finally {
end(ses, tx); end(ses, tx);
} }
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void put(@Nullable IgniteTx tx, Long key, @Nullable Person val) @Override public void put(Long key, @Nullable Person val) {
throws IgniteCheckedException { IgniteTx tx = transaction();

System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');


if (val == null) { if (val == null) {
remove(tx, key); remove(key);


return; return;
} }
Expand All @@ -87,7 +90,7 @@ public CacheHibernatePersonStore() {
catch (HibernateException e) { catch (HibernateException e) {
rollback(ses, tx); rollback(ses, tx);


throw new IgniteCheckedException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e); throw new CacheWriterException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e);
} }
finally { finally {
end(ses, tx); end(ses, tx);
Expand All @@ -96,7 +99,9 @@ public CacheHibernatePersonStore() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@SuppressWarnings({"JpaQueryApiInspection"}) @SuppressWarnings({"JpaQueryApiInspection"})
@Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { @Override public void remove(Long key) {
IgniteTx tx = transaction();

System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');


Session ses = session(tx); Session ses = session(tx);
Expand All @@ -108,17 +113,17 @@ public CacheHibernatePersonStore() {
catch (HibernateException e) { catch (HibernateException e) {
rollback(ses, tx); rollback(ses, tx);


throw new IgniteCheckedException("Failed to remove value from cache store with key: " + key, e); throw new CacheWriterException("Failed to remove value from cache store with key: " + key, e);
} }
finally { finally {
end(ses, tx); end(ses, tx);
} }
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException { @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null) if (args == null || args.length == 0 || args[0] == null)
throw new IgniteCheckedException("Expected entry count parameter is not provided."); throw new CacheLoaderException("Expected entry count parameter is not provided.");


final int entryCnt = (Integer)args[0]; final int entryCnt = (Integer)args[0];


Expand All @@ -144,7 +149,7 @@ public CacheHibernatePersonStore() {
System.out.println(">>> Loaded " + cnt + " values into cache."); System.out.println(">>> Loaded " + cnt + " values into cache.");
} }
catch (HibernateException e) { catch (HibernateException e) {
throw new IgniteCheckedException("Failed to load values from cache store.", e); throw new CacheLoaderException("Failed to load values from cache store.", e);
} }
finally { finally {
end(ses, null); end(ses, null);
Expand Down Expand Up @@ -188,8 +193,14 @@ private void end(Session ses, @Nullable IgniteTx tx) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { @Override public void txEnd(boolean commit) {
Session ses = tx.removeMeta(ATTR_SES); CacheStoreSession storeSes = session();

IgniteTx tx = storeSes.transaction();

Map<Object, Object> props = storeSes.properties();

Session ses = (Session)props.remove(ATTR_SES);


if (ses != null) { if (ses != null) {
Transaction hTx = ses.getTransaction(); Transaction hTx = ses.getTransaction();
Expand All @@ -207,7 +218,7 @@ private void end(Session ses, @Nullable IgniteTx tx) {
System.out.println("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); System.out.println("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
} }
catch (HibernateException e) { catch (HibernateException e) {
throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() + throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() +
", commit=" + commit + ']', e); ", commit=" + commit + ']', e);
} }
finally { finally {
Expand All @@ -227,16 +238,18 @@ private Session session(@Nullable IgniteTx tx) {
Session ses; Session ses;


if (tx != null) { if (tx != null) {
ses = tx.meta(ATTR_SES); Map<Object, Object> props = session().properties();

ses = (Session)props.get(ATTR_SES);


if (ses == null) { if (ses == null) {
ses = sesFactory.openSession(); ses = sesFactory.openSession();


ses.beginTransaction(); ses.beginTransaction();


// Store session in transaction metadata, so it can be accessed // Store session in session properties, so it can be accessed
// for other operations on the same transaction. // for other operations on the same transaction.
tx.addMeta(ATTR_SES, ses); props.put(ATTR_SES, ses);


System.out.println("Hibernate session open [ses=" + ses + ", tx=" + tx.xid() + "]"); System.out.println("Hibernate session open [ses=" + ses + ", tx=" + tx.xid() + "]");
} }
Expand Down
Expand Up @@ -18,21 +18,22 @@
package org.gridgain.examples.datagrid.store.jdbc; package org.gridgain.examples.datagrid.store.jdbc;


import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*; import org.apache.ignite.transactions.*;
import org.gridgain.examples.datagrid.store.*; import org.gridgain.examples.datagrid.store.*;
import org.gridgain.grid.cache.store.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import javax.cache.integration.*;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.*;


/** /**
* Example of {@link GridCacheStore} implementation that uses JDBC * Example of {@link CacheStore} implementation that uses JDBC
* transaction with cache transactions and maps {@link UUID} to {@link Person}. * transaction with cache transactions and maps {@link UUID} to {@link Person}.
* *
*/ */
public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> { public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** Transaction metadata attribute name. */ /** Transaction metadata attribute name. */
private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION"; private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION";


Expand Down Expand Up @@ -64,8 +65,12 @@ private void prepareDb() throws IgniteCheckedException {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { @Override public void txEnd(boolean commit) {
try (Connection conn = tx.removeMeta(ATTR_NAME)) { IgniteTx tx = transaction();

Map<Object, Object> props = session().properties();

try (Connection conn = (Connection)props.remove(ATTR_NAME)) {
if (conn != null) { if (conn != null) {
if (commit) if (commit)
conn.commit(); conn.commit();
Expand All @@ -76,12 +81,14 @@ private void prepareDb() throws IgniteCheckedException {
System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
} }
catch (SQLException e) { catch (SQLException e) {
throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
} }
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { @Nullable @Override public Person load(Long key) {
IgniteTx tx = transaction();

System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');


Connection conn = null; Connection conn = null;
Expand All @@ -99,7 +106,7 @@ private void prepareDb() throws IgniteCheckedException {
} }
} }
catch (SQLException e) { catch (SQLException e) {
throw new IgniteCheckedException("Failed to load object: " + key, e); throw new CacheLoaderException("Failed to load object: " + key, e);
} }
finally { finally {
end(tx, conn); end(tx, conn);
Expand All @@ -109,8 +116,9 @@ private void prepareDb() throws IgniteCheckedException {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void put(@Nullable IgniteTx tx, Long key, Person val) @Override public void put(Long key, Person val) {
throws IgniteCheckedException { IgniteTx tx = transaction();

System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');


Connection conn = null; Connection conn = null;
Expand Down Expand Up @@ -142,15 +150,17 @@ private void prepareDb() throws IgniteCheckedException {
} }
} }
catch (SQLException e) { catch (SQLException e) {
throw new IgniteCheckedException("Failed to put object [key=" + key + ", val=" + val + ']', e); throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e);
} }
finally { finally {
end(tx, conn); end(tx, conn);
} }
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException { @Override public void remove(Long key) {
IgniteTx tx = transaction();

System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');


Connection conn = null; Connection conn = null;
Expand All @@ -165,17 +175,17 @@ private void prepareDb() throws IgniteCheckedException {
} }
} }
catch (SQLException e) { catch (SQLException e) {
throw new IgniteCheckedException("Failed to remove object: " + key, e); throw new CacheLoaderException("Failed to remove object: " + key, e);
} }
finally { finally {
end(tx, conn); end(tx, conn);
} }
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException { @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null) if (args == null || args.length == 0 || args[0] == null)
throw new IgniteCheckedException("Expected entry count parameter is not provided."); throw new CacheLoaderException("Expected entry count parameter is not provided.");


final int entryCnt = (Integer)args[0]; final int entryCnt = (Integer)args[0];


Expand All @@ -201,7 +211,7 @@ private void prepareDb() throws IgniteCheckedException {
} }
} }
catch (SQLException e) { catch (SQLException e) {
throw new IgniteCheckedException("Failed to load values from cache store.", e); throw new CacheLoaderException("Failed to load values from cache store.", e);
} }
finally { finally {
end(null, conn); end(null, conn);
Expand All @@ -215,14 +225,16 @@ private void prepareDb() throws IgniteCheckedException {
*/ */
private Connection connection(@Nullable IgniteTx tx) throws SQLException { private Connection connection(@Nullable IgniteTx tx) throws SQLException {
if (tx != null) { if (tx != null) {
Connection conn = tx.meta(ATTR_NAME); Map<Object, Object> props = session().properties();

Connection conn = (Connection)props.get(ATTR_NAME);


if (conn == null) { if (conn == null) {
conn = openConnection(false); conn = openConnection(false);


// Store connection in transaction metadata, so it can be accessed // Store connection in session properties, so it can be accessed
// for other operations on the same transaction. // for other operations on the same transaction.
tx.addMeta(ATTR_NAME, conn); props.put(ATTR_NAME, conn);
} }


return conn; return conn;
Expand Down

0 comments on commit 4b8ec5f

Please sign in to comment.