diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java index b127a18670aa0..2b72619a857de 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java @@ -166,7 +166,7 @@ public abstract class CacheStore implements CacheLoader, CacheWriter * * @return Session for current cache operation. */ - @Nullable public CacheStoreSession session() { + public CacheStoreSession session() { return ses; } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java index 3de4ae43de5a5..afdec207fa5c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java @@ -32,7 +32,6 @@ import javax.cache.*; import javax.cache.integration.*; import javax.sql.*; -import java.net.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; @@ -44,11 +43,11 @@ */ public abstract class JdbcCacheStore extends CacheStore { /** - * Query cache by type. + * Entry mapping description. */ - protected static class QueryCache { + protected static class EntryMapping { /** Database dialect. */ - protected final BasicJdbcDialect dialect; + protected final JdbcDialect dialect; /** Select all items query. */ protected final String loadCacheQry; @@ -89,7 +88,7 @@ protected static class QueryCache { /** * @param typeMetadata Type metadata. */ - public QueryCache(BasicJdbcDialect dialect, GridCacheQueryTypeMetadata typeMetadata) { + public EntryMapping(JdbcDialect dialect, GridCacheQueryTypeMetadata typeMetadata) { this.dialect = dialect; this.typeMetadata = typeMetadata; @@ -221,17 +220,14 @@ protected Collection valueDescriptors() { /** Execute. */ protected ExecutorService exec; - /** Paths to xml with type mapping description. */ - protected Collection typeMetadataPaths; - /** Type mapping description. */ protected Collection typeMetadata; /** Cache with query by type. */ - protected Map entryQtyCache; + protected Map typeMeta; /** Database dialect. */ - protected BasicJdbcDialect dialect; + protected JdbcDialect dialect; /** Max workers thread count. These threads are responsible for execute query. */ protected int maxPoolSz = Runtime.getRuntime().availableProcessors(); @@ -239,13 +235,52 @@ protected Collection valueDescriptors() { /** Maximum batch size for put and remove operations. */ protected int batchSz = DFLT_BATCH_SIZE; + /** + * Get field value from object. + * + * @param typeName Type name. + * @param fieldName Field name. + * @param obj Cache object. + * @return Field value from object. + */ + @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj) + throws CacheException; + + /** + * Construct object from query result. + * + * @param Type of result object. + * @param typeName Type name. + * @param fields Fields descriptors. + * @param rs ResultSet. + * @return Constructed object. + */ + protected abstract R buildObject(String typeName, Collection fields, ResultSet rs) + throws CacheLoaderException; + + /** + * Extract type key from object. + * + * @param key Key object. + * @return Type key. + * @throws CacheException If failed to extract type key. + */ + protected abstract Object typeId(Object key) throws CacheException; + + /** + * Build cache for mapped types. + * + * @throws CacheException If failed to initialize. + */ + protected abstract void buildTypeCache() throws CacheException; + /** * Perform dialect resolution. * * @return The resolved dialect. - * @throws IgniteCheckedException Indicates problems accessing the metadata. + * @throws CacheException Indicates problems accessing the metadata. */ - protected BasicJdbcDialect resolveDialect() throws IgniteCheckedException { + protected JdbcDialect resolveDialect() throws CacheException { Connection conn = null; String dbProductName = null; @@ -256,10 +291,10 @@ protected BasicJdbcDialect resolveDialect() throws IgniteCheckedException { dbProductName = conn.getMetaData().getDatabaseProductName(); } catch (SQLException e) { - throw new IgniteCheckedException("Failed access to metadata for detect database dialect.", e); + throw new CacheException("Failed access to metadata for detect database dialect.", e); } finally { - closeConnection(conn); + U.closeQuiet(conn); } if ("H2".equals(dbProductName)) @@ -283,49 +318,23 @@ protected BasicJdbcDialect resolveDialect() throws IgniteCheckedException { /** * Initializes store. * - * @throws IgniteCheckedException If failed to initialize. + * @throws CacheException If failed to initialize. */ - protected void init() throws IgniteCheckedException { + protected void init() throws CacheException { if (initLatch.getCount() > 0) { if (initGuard.compareAndSet(false, true)) { if (log.isDebugEnabled()) log.debug("Initializing cache store."); if (dataSrc == null && F.isEmpty(connUrl)) - throw new IgniteCheckedException("Failed to initialize cache store (connection is not provided)."); + throw new CacheException("Failed to initialize cache store (connection is not provided)."); if (dialect == null) dialect = resolveDialect(); try { - if (typeMetadata == null) { - if (typeMetadataPaths == null) - throw new IgniteCheckedException( - "Failed to initialize cache store (metadata paths is not provided)."); - -// TODO: IGNITE-32 Replace with reading from config. -// GridSpringProcessor spring = SPRING.create(false); - - Collection typeMeta = new ArrayList<>(); - - for (String path : typeMetadataPaths) { - URL url = U.resolveGridGainUrl(path); -// TODO: IGNITE-32 Replace with reading from config. -// if (url != null) { -// Map beans = spring.loadBeans(url, GridCacheQueryTypeMetadata.class). -// get(GridCacheQueryTypeMetadata.class); -// -// if (beans != null) -// for (Object bean : beans.values()) -// if (bean instanceof GridCacheQueryTypeMetadata) -// typeMeta.add((GridCacheQueryTypeMetadata)bean); -// } -// else - log.warning("Failed to resolve metadata path: " + path); - } - - setTypeMetadata(typeMeta); - } + if (typeMetadata == null) + throw new CacheException("Failed to initialize cache store (mappping description is not provided)."); exec = Executors.newFixedThreadPool(maxPoolSz); @@ -338,26 +347,19 @@ protected void init() throws IgniteCheckedException { } } else - U.await(initLatch); + try { + if (initLatch.getCount() > 0) + initLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new CacheException(e); + } } if (!initOk) - throw new IgniteCheckedException("Cache store was not properly initialized."); - } - - /** - * Closes allocated resources depending on transaction status. - * - * @param tx Active transaction, if any. - * @param conn Allocated connection. - * @param st Created statement, - */ - protected void end(@Nullable IgniteTx tx, @Nullable Connection conn, @Nullable Statement st) { - U.closeQuiet(st); - - if (tx == null) - // Close connection right away if there is no transaction. - closeConnection(conn); + throw new CacheException("Cache store was not properly initialized."); } /** @@ -377,29 +379,23 @@ protected Connection openConnection(boolean autocommit) throws SQLException { } /** - * Closes connection. - * - * @param conn Connection to close. - */ - protected void closeConnection(@Nullable Connection conn) { - U.closeQuiet(conn); - } - - /** - * @param tx Cache transaction. * @return Connection. * @throws SQLException In case of error. */ - protected Connection connection(@Nullable IgniteTx tx) throws SQLException { - if (tx != null) { - Connection conn = null;// TODO: IGNITE-32 FIXME tx.meta(ATTR_CONN); + protected Connection connection() throws SQLException { + CacheStoreSession ses = session(); + + if (ses.transaction() != null) { + Map prop = ses.properties(); + + Connection conn = prop.get(ATTR_CONN); if (conn == null) { conn = openConnection(false); - // Store connection in transaction metadata, so it can be accessed - // for other operations on the same transaction. - // TODO: IGNITE-32 FIXME tx.addMeta(ATTR_CONN, conn); + // Store connection in session, so it can be accessed + // for other operations on the same session. + prop.put(ATTR_CONN, conn); } return conn; @@ -409,83 +405,30 @@ protected Connection connection(@Nullable IgniteTx tx) throws SQLException { return openConnection(true); } - /** {@inheritDoc} */ - public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { - init(); - - Connection conn = null; // TODO: IGNITE-32 FIXME tx.removeMeta(ATTR_CONN); - - if (conn != null) { - try { - if (commit) - conn.commit(); - else - conn.rollback(); - } - catch (SQLException e) { - throw new IgniteCheckedException( - "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); - } - finally { - closeConnection(conn); - } - } - - if (log.isDebugEnabled()) - log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); - } - /** - * Extract database column names from {@link GridCacheQueryTypeDescriptor}. - * - * @param dsc collection of {@link GridCacheQueryTypeDescriptor}. - */ - protected static Collection databaseColumns(Collection dsc) { - return F.transform(dsc, new C1() { - /** {@inheritDoc} */ - @Override public String apply(GridCacheQueryTypeDescriptor desc) { - return desc.getDbName(); - } - }); - } - - /** - * Get field value from object. + * Closes connection. * - * @param typeName Type name. - * @param fieldName Field name. - * @param obj Cache object. - * @return Field value from object. + * @param conn Connection to close. */ - @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj) - throws IgniteCheckedException; + protected void closeConnection(@Nullable Connection conn) { + CacheStoreSession ses = session(); - /** - * Construct object from query result. - * - * @param Type of result object. - * @param typeName Type name. - * @param fields Fields descriptors. - * @param rs ResultSet. - * @return Constructed object. - */ - protected abstract R buildObject(String typeName, Collection fields, ResultSet rs) - throws IgniteCheckedException; + // Close connection right away if there is no transaction. + if (ses.transaction() == null) + U.closeQuiet(conn); + } /** - * Extract type key from object. + * Closes allocated resources depending on transaction status. * - * @param key Key object. - * @return Type key. + * @param conn Allocated connection. + * @param st Created statement, */ - protected abstract Object typeKey(K key); + protected void end(@Nullable Connection conn, @Nullable Statement st) { + U.closeQuiet(st); - /** - * Build cache for mapped types. - * - * @throws IgniteCheckedException If failed to initialize. - */ - protected abstract void buildTypeCache() throws IgniteCheckedException; + closeConnection(conn); + } /** {@inheritDoc} */ @Override public void loadCache(final IgniteBiInClosure clo, @Nullable Object... args) @@ -498,40 +441,40 @@ protected abstract R buildObject(String typeName, Collection> futs = new ArrayList<>(); - for (final QueryCache type : entryQtyCache.values()) + for (final EntryMapping type : typeMeta.values()) futs.add(exec.submit(new Callable() { @Override public Void call() throws Exception { - Connection conn = null; + Connection conn = null; - try { - PreparedStatement stmt = null; + try { + PreparedStatement stmt = null; - try { - conn = connection(null); + try { + conn = connection(); - stmt = conn.prepareStatement(type.loadCacheQry); + stmt = conn.prepareStatement(type.loadCacheQry); - ResultSet rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - K key = buildObject(type.keyType(), type.keyDescriptors(), rs); - V val = buildObject(type.valueType(), type.valueDescriptors(), rs); + while (rs.next()) { + K key = buildObject(type.keyType(), type.keyDescriptors(), rs); + V val = buildObject(type.valueType(), type.valueDescriptors(), rs); - clo.apply(key, val); - } - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to load cache", e); - } - finally { - U.closeQuiet(stmt); + clo.apply(key, val); } } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to load cache", e); + } finally { - closeConnection(conn); + U.closeQuiet(stmt); } + } + finally { + U.closeQuiet(conn); + } - return null; + return null; } })); @@ -543,86 +486,57 @@ protected abstract R buildObject(String typeName, Collectionproperties().remove(ATTR_CONN); - /** - * @param stmt Prepare statement. - * @param type Type description. - * @param key Key object. - * @return Next index for parameters. - */ - protected int fillKeyParameters(PreparedStatement stmt, QueryCache type, K key) throws IgniteCheckedException { - return fillKeyParameters(stmt, 1, type, key); - } - /** - * @param stmt Prepare statement. - * @param i Start index for parameters. - * @param type Type description. - * @param val Value object. - * @return Next index for parameters. - */ - protected int fillValueParameters(PreparedStatement stmt, int i, QueryCache type, V val) - throws IgniteCheckedException { - for (GridCacheQueryTypeDescriptor field : type.uniqValFields) { - Object fieldVal = extractField(type.valueType(), field.getJavaName(), val); + if (conn != null) { + assert tx != null; try { - if (fieldVal != null) - stmt.setObject(i++, fieldVal); + if (commit) + conn.commit(); else - stmt.setNull(i++, field.getDbType()); + conn.rollback(); } catch (SQLException e) { - throw new IgniteCheckedException("Failed to set statement parameter name: " + field.getDbName(), e); + throw new CacheWriterException( + "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); + } + finally { + U.closeQuiet(conn); } } - return i; + if (tx != null && log.isDebugEnabled()) + log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); } /** {@inheritDoc} */ - @Nullable public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + @Nullable @Override public V load(K key) throws CacheLoaderException { + assert key != null; + init(); - QueryCache type = entryQtyCache.get(typeKey(key)); + EntryMapping type = typeMeta.get(typeId(key)); if (type == null) - throw new IgniteCheckedException("Failed to find mapping description for type: " + key.getClass()); + throw new CacheLoaderException("Failed to find store mapping description for key: " + key); if (log.isDebugEnabled()) - log.debug("Start load value from db by key: " + key); + log.debug("Start load value from database by key: " + key); Connection conn = null; PreparedStatement stmt = null; try { - conn = connection(tx); + conn = connection(); stmt = conn.prepareStatement(type.loadQrySingle); @@ -634,143 +548,108 @@ protected int fillValueParameters(PreparedStatement stmt, int i, QueryCache type return buildObject(type.valueType(), type.valueDescriptors(), rs); } catch (SQLException e) { - throw new IgniteCheckedException("Failed to load object by key: " + key, e); + throw new CacheLoaderException("Failed to load object by key: " + key, e); } finally { - end(tx, conn, stmt); + end(conn, stmt); } return null; } - /** - * Loads all values for given keys with same type and passes every value to the provided closure. - * - * @param tx Cache transaction, if write-behind is not enabled, null otherwise. - * @param qry Query cache for type. - * @param keys Collection of keys to load. - * @param c Closure to call for every loaded element. - * @throws IgniteCheckedException If load failed. - */ - protected void loadAll(@Nullable IgniteTx tx, QueryCache qry, Collection keys, - IgniteBiInClosure c) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public Map loadAll(Iterable keys) throws CacheLoaderException { + assert keys != null; + init(); Connection conn = null; - - PreparedStatement stmt = null; - try { - conn = connection(tx); - - stmt = conn.prepareStatement(qry.loadQuery(keys.size())); + conn = connection(); - int i = 1; + Map> workers = U.newHashMap(typeMeta.size()); - for (K key : keys) { - for (GridCacheQueryTypeDescriptor field : qry.keyDescriptors()) { - Object fieldVal = extractField(qry.keyType(), field.getJavaName(), key); + Collection>> futs = new ArrayList<>(); - if (fieldVal != null) - stmt.setObject(i++, fieldVal); - else - stmt.setNull(i++, field.getDbType()); - } - } - - ResultSet rs = stmt.executeQuery(); - - while (rs.next()) { - K key = buildObject(qry.keyType(), qry.keyDescriptors(), rs); - V val = buildObject(qry.valueType(), qry.valueDescriptors(), rs); + int cnt = 0; - c.apply(key, val); - } - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to load objects", e); - } - finally { - end(tx, conn, stmt); - } - } + for (K key : keys) { + Object typeId = typeId(key); - /** {@inheritDoc} */ - public void loadAll(@Nullable final IgniteTx tx, Collection keys, - final IgniteBiInClosure c) throws IgniteCheckedException { - assert keys != null; + final EntryMapping m = typeMeta.get(typeId); - init(); + if (m == null) + throw new CacheWriterException("Failed to find store mapping description for key: " + key); - Map> splittedKeys = U.newHashMap(entryQtyCache.size()); + LoadWorker worker = workers.get(typeId); - final Collection> futs = new ArrayList<>(); + if (worker == null) + workers.put(typeId, worker = new LoadWorker<>(conn, m)); - for (K key : keys) { - final QueryCache qry = entryQtyCache.get(typeKey(key)); + worker.keys.add(key); - Collection batch = splittedKeys.get(qry); + if (worker.keys.size() == m.maxKeysPerStmt) + futs.add(exec.submit(workers.remove(typeId))); - if (batch == null) - splittedKeys.put(qry, batch = new ArrayList<>()); + cnt ++; + } - batch.add(key); + for (LoadWorker worker : workers.values()) + futs.add(exec.submit(worker)); - if (batch.size() == qry.maxKeysPerStmt) { - final Collection p = splittedKeys.remove(qry); + Map res = U.newHashMap(cnt); - futs.add(exec.submit(new Callable() { - @Override public Void call() throws Exception { - loadAll(tx, qry, p, c); + for (Future> fut : futs) + res.putAll(U.get(fut)); - return null; - } - })); - } + return res; + } + catch (SQLException e) { + throw new CacheWriterException("Failed to open connection", e); + } + catch (IgniteCheckedException e) { + throw new CacheWriterException("Failed to load entries from database", e); + } + finally { + closeConnection(conn); } - - for (final Map.Entry> entry : splittedKeys.entrySet()) - futs.add(exec.submit(new Callable() { - @Override public Void call() throws Exception { - loadAll(tx, entry.getKey(), entry.getValue(), c); - - return null; - } - })); - - for (Future fut : futs) - U.get(fut); } /** {@inheritDoc} */ - public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException { + @Override public void write(Cache.Entry entry) throws CacheWriterException { + assert entry != null; + init(); - QueryCache type = entryQtyCache.get(typeKey(key)); + K key = entry.getKey(); + + EntryMapping type = typeMeta.get(typeId(key)); if (type == null) - throw new IgniteCheckedException("Failed to find metadata for type: " + key.getClass()); + throw new CacheWriterException("Failed to find store mapping description for entry: " + entry); if (log.isDebugEnabled()) - log.debug("Start put value in db: (" + key + ", " + val); + log.debug("Start write entry to database: " + entry); Connection conn = null; PreparedStatement stmt = null; try { - conn = connection(tx); + conn = connection(); if (dialect.hasMerge()) { stmt = conn.prepareStatement(type.mergeQry); int i = fillKeyParameters(stmt, type, key); - fillValueParameters(stmt, i, type, val); + fillValueParameters(stmt, i, type, entry.getValue()); stmt.executeUpdate(); } else { + V val = entry.getValue(); + stmt = conn.prepareStatement(type.updQry); int i = fillValueParameters(stmt, 1, type, val); @@ -791,122 +670,133 @@ public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedExcepti } } catch (SQLException e) { - throw new IgniteCheckedException("Failed to put object by key: " + key, e); + throw new CacheWriterException("Failed to write entry to database: " + entry, e); } finally { - end(tx, conn, stmt); + end(conn, stmt); } } - /** - * Stores given key value pairs in persistent storage. - * - * @param tx Cache transaction, if write-behind is not enabled, null otherwise. - * @param qry Query cache for type. - * @param map Values to store. - * @throws IgniteCheckedException If store failed. - */ /** {@inheritDoc} */ - protected void putAll(@Nullable IgniteTx tx, QueryCache qry, Iterable> map) - throws IgniteCheckedException { - assert map != null; + @Override public void writeAll(Collection> entries) + throws CacheWriterException { + assert entries != null; init(); Connection conn = null; - PreparedStatement stmt = null; - try { - conn = connection(tx); + conn = connection(); - stmt = conn.prepareStatement(qry.mergeQry); + if (dialect.hasMerge()) { + Map workers = U.newHashMap(typeMeta.size()); - int cnt = 0; + Collection> futs = new ArrayList<>(); - for (Map.Entry entry : map) { - int i = fillKeyParameters(stmt, qry, entry.getKey()); + for (Cache.Entry entry : entries) { + Object typeId = typeId(entry.getKey()); - fillValueParameters(stmt, i, qry, entry.getValue()); + final EntryMapping m = typeMeta.get(typeId); - stmt.addBatch(); + if (m == null) + throw new CacheWriterException("Failed to find store mapping description for key: " + + entry.getKey()); - if (cnt++ % batchSz == 0) - stmt.executeBatch(); + WriteWorker worker = workers.get(typeId); + + if (worker == null) + workers.put(typeId, worker = new WriteWorker(conn, m)); + + worker.entries.add(entry); + + if (worker.entries.size() == batchSz) + futs.add(exec.submit(workers.remove(typeId))); + } + + for (WriteWorker worker : workers.values()) + futs.add(exec.submit(worker)); + + for (Future fut : futs) + U.get(fut); } + else { + Map> stmtByType = U.newHashMap(typeMeta.size()); - if (cnt % batchSz != 0) - stmt.executeBatch(); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to put objects", e); - } - finally { - end(tx, conn, stmt); - } - } + for (Cache.Entry entry : entries) { + Object typeId = typeId(entry.getKey()); - /** {@inheritDoc} */ - public void putAll(@Nullable final IgniteTx tx, Map map) - throws IgniteCheckedException { - assert map != null; + final EntryMapping m = typeMeta.get(typeId); - init(); + if (m == null) + throw new CacheWriterException("Failed to find store mapping description for key: " + + entry.getKey()); - Map>> keyByType = U.newHashMap(entryQtyCache.size()); + T2 stmts = stmtByType.get(typeId); - if (dialect.hasMerge()) { - for (Map.Entry entry : map.entrySet()) { - Object typeKey = typeKey(entry.getKey()); + if (stmts == null) + stmtByType.put(typeId, + stmts = new T2<>(conn.prepareStatement(m.updQry), conn.prepareStatement(m.insQry))); - Collection> batch = keyByType.get(typeKey); + PreparedStatement stmt = stmts.get1(); - if (batch == null) - keyByType.put(typeKey, batch = new ArrayList<>()); + assert stmt != null; - batch.add(entry); - } + int i = fillValueParameters(stmt, 1, m, entry.getValue()); - final Collection> futs = new ArrayList<>(); + fillKeyParameters(stmt, i, m, entry.getKey()); - for (final Map.Entry>> e : keyByType.entrySet()) { - final QueryCache qry = entryQtyCache.get(e.getKey()); + if (stmt.executeUpdate() == 0) { + stmt = stmts.get2(); - futs.add(exec.submit(new Callable() { - @Override public Void call() throws Exception { - putAll(tx, qry, e.getValue()); + assert stmt != null; + + i = fillKeyParameters(stmt, m, entry.getKey()); + + fillValueParameters(stmt, i, m, entry.getValue()); - return null; + stmt.executeUpdate(); } - })); - } + } - for (Future fut : futs) - U.get(fut); + for (T2 stmts : stmtByType.values()) { + U.closeQuiet(stmts.get1()); + + U.closeQuiet(stmts.get2()); + } + } + } + catch (SQLException e) { + throw new CacheWriterException("Failed to open connection", e); + } + catch (IgniteCheckedException e) { + throw new CacheWriterException("Failed to write values into database", e); + } + finally { + closeConnection(conn); } - else - for (Map.Entry e : map.entrySet()) - put(tx, e.getKey(), e.getValue()); } /** {@inheritDoc} */ - public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { + @Override public void delete(Object key) throws CacheWriterException { + assert key != null; + init(); - QueryCache type = entryQtyCache.get(typeKey(key)); + EntryMapping type = typeMeta.get(typeId(key)); if (type == null) - throw new IgniteCheckedException("Failed to find metadata for type: " + key.getClass()); + throw new CacheWriterException("Failed to find store mapping description for key: " + key); if (log.isDebugEnabled()) - log.debug("Start remove value from db by key: " + key); + log.debug("Start remove value from database by key: " + key); Connection conn = null; PreparedStatement stmt = null; try { - conn = connection(tx); + conn = connection(); stmt = conn.prepareStatement(type.remQry); @@ -915,83 +805,136 @@ public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException { stmt.executeUpdate(); } catch (SQLException e) { - throw new IgniteCheckedException("Failed to load object by key: " + key, e); + throw new CacheWriterException("Failed to remove value from database by key: " + key, e); } finally { - end(tx, conn, stmt); + end(conn, stmt); } } - /** - * Removes all vales identified by given keys from persistent storage. - * - * @param tx Cache transaction, if write-behind is not enabled, null otherwise. - * @param qry Query cache for type. - * @param keys Collection of keys to remove. - * @throws IgniteCheckedException If remove failed. - */ - protected void removeAll(@Nullable IgniteTx tx, QueryCache qry, Collection keys) - throws IgniteCheckedException { - assert keys != null && !keys.isEmpty(); + /** {@inheritDoc} */ + @Override public void deleteAll(Collection keys) throws CacheWriterException { + assert keys != null; - init(); + Connection conn = null; - if (log.isDebugEnabled()) - log.debug("Start remove values by keys: " + Arrays.toString(keys.toArray())); + try { + conn = connection(); - Connection conn = null; + Collection> futs = new ArrayList<>(); - PreparedStatement stmt = null; + Map workers = U.newHashMap(typeMeta.size()); - try { - conn = connection(tx); + for (Object key : keys) { + Object typeId = typeId(key); - stmt = conn.prepareStatement(qry.remQry); + final EntryMapping m = typeMeta.get(typeId); - int cnt = 0; + if (m == null) + throw new CacheWriterException("Failed to find store mapping description for key: " + key); - for (K key : keys) { - fillKeyParameters(stmt, qry, key); + DeleteWorker worker = workers.get(typeId); + + if (worker == null) + workers.put(typeId, worker = new DeleteWorker(conn, m)); - stmt.addBatch(); + worker.keys.add(key); - if (cnt++ % batchSz == 0) - stmt.executeBatch(); + if (worker.keys.size() == batchSz) + futs.add(exec.submit(workers.remove(typeId))); } - if (cnt % batchSz != 0) - stmt.executeBatch(); + for (DeleteWorker worker : workers.values()) + futs.add(exec.submit(worker)); + + for (Future fut : futs) + U.get(fut); } catch (SQLException e) { - throw new IgniteCheckedException("Failed to remove values by keys.", e); + throw new CacheWriterException("Failed to open connection", e); + } + catch (IgniteCheckedException e) { + throw new CacheWriterException("Failed to remove values from database", e); } finally { - end(tx, conn, stmt); + closeConnection(conn); } } - /** {@inheritDoc} */ - public void removeAll(@Nullable IgniteTx tx, Collection keys) throws IgniteCheckedException { - assert keys != null; - - Map> keyByType = U.newHashMap(entryQtyCache.size()); + /** + * Extract database column names from {@link GridCacheQueryTypeDescriptor}. + * + * @param dsc collection of {@link GridCacheQueryTypeDescriptor}. + */ + protected static Collection databaseColumns(Collection dsc) { + return F.transform(dsc, new C1() { + /** {@inheritDoc} */ + @Override public String apply(GridCacheQueryTypeDescriptor desc) { + return desc.getDbName(); + } + }); + } - for (K key : keys) { - Object typeKey = typeKey(key); + /** + * @param stmt Prepare statement. + * @param i Start index for parameters. + * @param type Type description. + * @param key Key object. + * @return Next index for parameters. + */ + protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping type, + Object key) throws CacheException { + for (GridCacheQueryTypeDescriptor field : type.keyDescriptors()) { + Object fieldVal = extractField(type.keyType(), field.getJavaName(), key); - Collection batch = keyByType.get(typeKey); + try { + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDbType()); + } + catch (SQLException e) { + throw new CacheException("Failed to set statement parameter name: " + field.getDbName(), e); + } + } - if (batch == null) - keyByType.put(typeKey, batch = new ArrayList<>()); + return i; + } - batch.add(key); - } + /** + * @param stmt Prepare statement. + * @param type Type description. + * @param key Key object. + * @return Next index for parameters. + */ + protected int fillKeyParameters(PreparedStatement stmt, EntryMapping type, Object key) throws CacheException { + return fillKeyParameters(stmt, 1, type, key); + } - for (Map.Entry> entry : keyByType.entrySet()) { - QueryCache qry = entryQtyCache.get(entry.getKey()); + /** + * @param stmt Prepare statement. + * @param i Start index for parameters. + * @param type Type description. + * @param val Value object. + * @return Next index for parameters. + */ + protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping type, Object val) + throws CacheWriterException { + for (GridCacheQueryTypeDescriptor field : type.uniqValFields) { + Object fieldVal = extractField(type.valueType(), field.getJavaName(), val); - removeAll(tx, qry, entry.getValue()); + try { + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDbType()); + } + catch (SQLException e) { + throw new CacheWriterException("Failed to set statement parameter name: " + field.getDbName(), e); + } } + + return i; } /** @@ -1050,22 +993,6 @@ public void setUser(String user) { this.user = user; } - /** - * @return Paths to xml with type mapping description. - */ - public Collection getTypeMetadataPaths() { - return typeMetadataPaths; - } - - /** - * Set paths to xml with type mapping description. - * - * @param typeMetadataPaths Paths to xml. - */ - public void setTypeMetadataPaths(Collection typeMetadataPaths) { - this.typeMetadataPaths = typeMetadataPaths; - } - /** * Set type mapping description. * @@ -1080,7 +1007,7 @@ public void setTypeMetadata(Collection typeMetadata) * * @return Database dialect. */ - public BasicJdbcDialect getDialect() { + public JdbcDialect getDialect() { return dialect; } @@ -1089,7 +1016,7 @@ public BasicJdbcDialect getDialect() { * * @param dialect Database dialect. */ - public void setDialect(BasicJdbcDialect dialect) { + public void setDialect(JdbcDialect dialect) { this.dialect = dialect; } @@ -1129,32 +1056,128 @@ public void setBatchSize(int batchSz) { this.batchSz = batchSz; } - @Override public void txEnd(boolean commit) throws CacheWriterException { - // TODO: SPRINT-32 CODE: implement. - } + private class LoadWorker implements Callable> { + private final Connection conn; - @Override public V load(K k) throws CacheLoaderException { - return null; // TODO: SPRINT-32 CODE: implement. - } + private final Collection keys; - @Override public Map loadAll(Iterable iterable) throws CacheLoaderException { - return null; // TODO: SPRINT-32 CODE: implement. - } + private final EntryMapping m; - @Override public void write(Cache.Entry entry) throws CacheWriterException { - // TODO: SPRINT-32 CODE: implement. - } + private LoadWorker(Connection conn, EntryMapping m) { + this.conn = conn; + keys = new ArrayList<>(batchSz); + this.m = m; + } + + /** {@inheritDoc} */ + @Override public Map call() throws Exception { + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(m.loadQuery(keys.size())); + + int i = 1; - @Override - public void writeAll(Collection> collection) throws CacheWriterException { - // TODO: SPRINT-32 CODE: implement. + for (Object key : keys) + for (GridCacheQueryTypeDescriptor field : m.keyDescriptors()) { + Object fieldVal = extractField(m.keyType(), field.getJavaName(), key); + + if (fieldVal != null) + stmt.setObject(i++, fieldVal); + else + stmt.setNull(i++, field.getDbType()); + } + + ResultSet rs = stmt.executeQuery(); + + Map entries = U.newHashMap(keys.size()); + + while (rs.next()) { + K1 key = buildObject(m.keyType(), m.keyDescriptors(), rs); + V1 val = buildObject(m.valueType(), m.valueDescriptors(), rs); + + entries.put(key, val); + } + + return entries; + } + finally { + U.closeQuiet(stmt); + } + } } - @Override public void delete(Object o) throws CacheWriterException { - // TODO: SPRINT-32 CODE: implement. + private class WriteWorker implements Callable { + private final Connection conn; + + private final Collection> entries; + + private final EntryMapping m; + + private WriteWorker(Connection conn, EntryMapping m) { + this.conn = conn; + entries = new ArrayList<>(batchSz); + this.m = m; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(m.mergeQry); + + for (Cache.Entry entry : entries) { + int i = fillKeyParameters(stmt, m, entry.getKey()); + + fillValueParameters(stmt, i, m, entry.getValue()); + + stmt.addBatch(); + } + + stmt.executeBatch(); + } + finally { + U.closeQuiet(stmt); + } + + return null; + } } - @Override public void deleteAll(Collection collection) throws CacheWriterException { - // TODO: SPRINT-32 CODE: implement. + private class DeleteWorker implements Callable { + private final Connection conn; + + private final Collection keys; + + private final EntryMapping m; + + private DeleteWorker(Connection conn, EntryMapping m) { + this.conn = conn; + keys = new ArrayList<>(batchSz); + this.m = m; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(m.remQry); + + for (Object key : keys) { + fillKeyParameters(stmt, m, key); + + stmt.addBatch(); + } + + stmt.executeBatch(); + } + finally { + U.closeQuiet(stmt); + } + + return null; + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java index 4d9953a2bbe07..16a67d46e4de5 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java @@ -17,12 +17,13 @@ package org.apache.ignite.cache.store.jdbc; -import org.apache.ignite.*; import org.apache.ignite.cache.store.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.*; +import javax.cache.integration.*; import java.lang.reflect.*; import java.sql.*; import java.util.*; @@ -55,8 +56,7 @@ protected static class PojoMethodsCache { * @param clsName Class name. * @param fields Fields. */ - public PojoMethodsCache(String clsName, - Collection fields) throws IgniteCheckedException { + public PojoMethodsCache(String clsName, Collection fields) throws CacheException { try { cls = Class.forName(clsName); @@ -67,10 +67,10 @@ public PojoMethodsCache(String clsName, ctor.setAccessible(true); } catch (ClassNotFoundException e) { - throw new IgniteCheckedException("Failed to find class: " + clsName, e); + throw new CacheException("Failed to find class: " + clsName, e); } catch (NoSuchMethodException e) { - throw new IgniteCheckedException("Failed to find empty constructor for class: " + clsName, e); + throw new CacheException("Failed to find empty constructor for class: " + clsName, e); } setters = U.newHashMap(fields.size()); @@ -88,7 +88,7 @@ public PojoMethodsCache(String clsName, getters.put(field.getJavaName(), cls.getMethod("is" + prop)); } catch (NoSuchMethodException e) { - throw new IgniteCheckedException("Failed to find getter for property " + field.getJavaName() + + throw new CacheException("Failed to find getter for property " + field.getJavaName() + " of class: " + cls.getName(), e); } } @@ -97,7 +97,7 @@ public PojoMethodsCache(String clsName, setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType())); } catch (NoSuchMethodException e) { - throw new IgniteCheckedException("Failed to find setter for property " + field.getJavaName() + + throw new CacheException("Failed to find setter for property " + field.getJavaName() + " of class: " + clsName, e); } } @@ -118,14 +118,14 @@ public PojoMethodsCache(String clsName, * Construct new instance of pojo object. * * @return pojo object. - * @throws IgniteCheckedException If construct new instance failed. + * @throws CacheLoaderException If construct new instance failed. */ - protected Object newInstance() throws IgniteCheckedException { + protected Object newInstance() throws CacheLoaderException { try { return ctor.newInstance(); } catch (Exception e) { - throw new IgniteCheckedException("Failed to create new instance for class: " + cls, e); + throw new CacheLoaderException("Failed to create new instance for class: " + cls, e); } } } @@ -134,8 +134,8 @@ protected Object newInstance() throws IgniteCheckedException { protected Map mtdsCache; /** {@inheritDoc} */ - @Override protected void buildTypeCache() throws IgniteCheckedException { - entryQtyCache = U.newHashMap(typeMetadata.size()); + @Override protected void buildTypeCache() throws CacheException { + typeMeta = U.newHashMap(typeMetadata.size()); mtdsCache = U.newHashMap(typeMetadata.size() * 2); @@ -144,15 +144,19 @@ protected Object newInstance() throws IgniteCheckedException { mtdsCache.put(type.getKeyType(), keyCache); - entryQtyCache.put(keyCache.cls, new QueryCache(dialect, type)); + typeMeta.put(keyCache.cls, new EntryMapping(dialect, type)); mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), type.getValueDescriptors())); } + + typeMeta = Collections.unmodifiableMap(typeMeta); + + mtdsCache = Collections.unmodifiableMap(mtdsCache); } /** {@inheritDoc} */ @Override protected R buildObject(String typeName, Collection fields, - ResultSet rs) throws IgniteCheckedException { + ResultSet rs) throws CacheLoaderException { PojoMethodsCache t = mtdsCache.get(typeName); Object obj = t.newInstance(); @@ -164,25 +168,28 @@ protected Object newInstance() throws IgniteCheckedException { return (R)obj; } catch (Exception e) { - throw new IgniteCheckedException("Failed to read object of class: " + typeName, e); + throw new CacheLoaderException("Failed to read object of class: " + typeName, e); } } /** {@inheritDoc} */ @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj) - throws IgniteCheckedException { + throws CacheException { try { PojoMethodsCache mc = mtdsCache.get(typeName); return mc.getters.get(fieldName).invoke(obj); } catch (Exception e) { - throw new IgniteCheckedException("Failed to read object of class: " + typeName, e); + throw new CacheException("Failed to read object of class: " + typeName, e); } } /** {@inheritDoc} */ - @Override protected Object typeKey(Object key) { - return key.getClass(); + @Override protected Object typeId(Object key) throws CacheException { + if (key != null) + return key.getClass(); + + throw new CacheException(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java index e6b43672559d6..03e6f8b20f459 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java @@ -25,7 +25,7 @@ /** * Represents a dialect of SQL implemented by a particular RDBMS. */ -public class BasicJdbcDialect { +public class BasicJdbcDialect implements JdbcDialect { /** Default max query parameters count. */ protected static final int DFLT_MAX_PARAMS_CNT = 2000; @@ -145,29 +145,13 @@ private static String where(Collection keyCols, int keyCnt) { return sb.toString(); } - /** - * Construct load cache query. - * - * @param schema Database schema name. - * @param tblName Database table name. - * @param uniqCols Database unique value columns. - * @return Load cache query. - */ - public String loadCacheQuery(String schema, String tblName, Iterable uniqCols) { + /** {@inheritDoc} */ + @Override public String loadCacheQuery(String schema, String tblName, Iterable uniqCols) { return String.format("SELECT %s FROM %s.%s", mkString(uniqCols, ","), schema, tblName); } - /** - * Construct load query. - * - * @param schema Database schema name. - * @param tblName Database table name. - * @param keyCols Database key columns. - * @param cols Selected columns. - * @param keyCnt Key count. - * @return Load query. - */ - public String loadQuery(String schema, String tblName, Collection keyCols, Iterable cols, + /** {@inheritDoc} */ + @Override public String loadQuery(String schema, String tblName, Collection keyCols, Iterable cols, int keyCnt) { assert !keyCols.isEmpty(); @@ -176,30 +160,18 @@ public String loadQuery(String schema, String tblName, Collection keyCol return String.format("SELECT %s FROM %s.%s WHERE %s", mkString(cols, ","), schema, tblName, params); } - /** - * Construct insert query. - * - * @param schema Database schema name. - * @param tblName Database table name. - * @param keyCols Database key columns. - * @param valCols Database value columns. - */ - public String insertQuery(String schema, String tblName, Collection keyCols, Collection valCols) { + /** {@inheritDoc} */ + @Override public String insertQuery(String schema, String tblName, Collection keyCols, + Collection valCols) { Collection cols = F.concat(false, keyCols, valCols); return String.format("INSERT INTO %s.%s(%s) VALUES(%s)", schema, tblName, mkString(cols, ","), repeat("?", cols.size(), "", ",", "")); } - /** - * Construct update query. - * - * @param schema Database schema name. - * @param tblName Database table name. - * @param keyCols Database key columns. - * @param valCols Database value columns. - */ - public String updateQuery(String schema, String tblName, Collection keyCols, Iterable valCols) { + /** {@inheritDoc} */ + @Override public String updateQuery(String schema, String tblName, Collection keyCols, + Iterable valCols) { String params = mkString(valCols, new C1() { @Override public String apply(String s) { return s + "=?"; @@ -209,35 +181,19 @@ public String updateQuery(String schema, String tblName, Collection keyC return String.format("UPDATE %s.%s SET %s WHERE %s", schema, tblName, params, where(keyCols, 1)); } - /** - * @return {@code True} if database support merge operation. - */ - public boolean hasMerge() { + /** {@inheritDoc} */ + @Override public boolean hasMerge() { return false; } - /** - * Construct merge query. - * - * @param schema Database schema name. - * @param tblName Database table name. - * @param keyCols Database key columns. - * @param uniqCols Database unique value columns. - * @return Put query. - */ - public String mergeQuery(String schema, String tblName, Collection keyCols, Collection uniqCols) { + /** {@inheritDoc} */ + @Override public String mergeQuery(String schema, String tblName, Collection keyCols, + Collection uniqCols) { return ""; } - /** - * Construct remove query. - * - * @param schema Database schema name. - * @param tblName Database table name. - * @param keyCols Database key columns. - * @return Remove query. - */ - public String removeQuery(String schema, String tblName, Iterable keyCols) { + /** {@inheritDoc} */ + @Override public String removeQuery(String schema, String tblName, Iterable keyCols) { String whereParams = mkString(keyCols, new C1() { @Override public String apply(String s) { return s + "=?"; @@ -247,12 +203,8 @@ public String removeQuery(String schema, String tblName, Iterable keyCol return String.format("DELETE FROM %s.%s WHERE %s", schema, tblName, whereParams); } - /** - * Get max query parameters count. - * - * @return Max query parameters count. - */ - public int getMaxParamsCnt() { + /** {@inheritDoc} */ + @Override public int getMaxParamsCnt() { return maxParamsCnt; } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java new file mode 100644 index 0000000000000..e0d80e9f887b8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc.dialect; + +import java.util.*; + +/** + * Represents a dialect of SQL implemented by a particular RDBMS. + */ +public interface JdbcDialect { + /** + * Construct load cache query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param uniqCols Database unique value columns. + * @return Load cache query. + */ + public String loadCacheQuery(String schema, String tblName, Iterable uniqCols); + + /** + * Construct load query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param cols Selected columns. + * @param keyCnt Key count. + * @return Load query. + */ + public String loadQuery(String schema, String tblName, Collection keyCols, Iterable cols, + int keyCnt); + + /** + * Construct insert query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param valCols Database value columns. + */ + public String insertQuery(String schema, String tblName, Collection keyCols, Collection valCols); + + /** + * Construct update query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param valCols Database value columns. + */ + public String updateQuery(String schema, String tblName, Collection keyCols, Iterable valCols); + + /** + * @return {@code True} if database support merge operation. + */ + public boolean hasMerge(); + + /** + * Construct merge query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @param uniqCols Database unique value columns. + * @return Put query. + */ + public String mergeQuery(String schema, String tblName, Collection keyCols, Collection uniqCols); + + /** + * Construct remove query. + * + * @param schema Database schema name. + * @param tblName Database table name. + * @param keyCols Database key columns. + * @return Remove query. + */ + public String removeQuery(String schema, String tblName, Iterable keyCols); + + /** + * Get max query parameters count. + * + * @return Max query parameters count. + */ + public int getMaxParamsCnt(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java index f2688d3951678..e061d7aa116a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java @@ -29,6 +29,7 @@ import org.springframework.context.support.*; import org.springframework.core.io.*; +import javax.cache.configuration.*; import java.io.*; import java.net.*; import java.util.*; @@ -49,7 +50,7 @@ public class PojoCacheStoreMultitreadedSelfTest extends AbstractCacheStoreMultit UrlResource metaUrl; try { - metaUrl = new UrlResource(new File("modules/core/src/test/config/store/auto/all.xml").toURI().toURL()); + metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/all.xml").toURI().toURL()); } catch (MalformedURLException e) { throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e); @@ -97,7 +98,10 @@ public class PojoCacheStoreMultitreadedSelfTest extends AbstractCacheStoreMultit cc.setSwapEnabled(false); cc.setWriteBehindEnabled(false); - // TODO: IGNITE-32 FIXME cc.setStore(store); + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); c.setCacheConfiguration(cc); diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java index fa77385ff7c03..83e95eada50f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java @@ -18,13 +18,17 @@ package org.apache.ignite.cache.store.jdbc; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cache.store.jdbc.model.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.*; import org.gridgain.testframework.junits.cache.GridAbstractCacheStoreSelfTest.*; +import org.gridgain.testframework.junits.cache.*; import org.gridgain.testframework.junits.common.*; import org.h2.jdbcx.*; import org.jetbrains.annotations.*; @@ -33,6 +37,7 @@ import org.springframework.context.support.*; import org.springframework.core.io.*; +import javax.cache.*; import java.io.*; import java.net.*; import java.sql.*; @@ -53,6 +58,9 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest { /** Person count. */ protected static final int PERSON_CNT = 100000; + /** */ + protected TestThreadLocalCacheSession ses = new TestThreadLocalCacheSession(); + /** */ protected final JdbcPojoCacheStore store; @@ -79,7 +87,7 @@ protected JdbcPojoCacheStore store() throws IgniteCheckedException { UrlResource metaUrl; try { - metaUrl = new UrlResource(new File("modules/core/src/test/config/store/auto/all.xml").toURI().toURL()); + metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/all.xml").toURI().toURL()); } catch (MalformedURLException e) { throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e); @@ -116,6 +124,8 @@ protected JdbcPojoCacheStore store() throws IgniteCheckedException { */ protected void inject(JdbcCacheStore store) throws Exception { getTestResources().inject(store); + + GridTestUtils.setFieldValue(store, CacheStore.class, "ses", ses); } /** @@ -175,8 +185,8 @@ else if (k instanceof PersonKey && v instanceof Person) assertEquals(ORGANIZATION_CNT, orgKeys.size()); assertEquals(PERSON_CNT, prnKeys.size()); - store.removeAll(null, orgKeys); - store.removeAll(null, prnKeys); + store.deleteAll(orgKeys); + store.deleteAll(prnKeys); orgKeys.clear(); prnKeys.clear(); @@ -194,31 +204,40 @@ public void testStore() throws Exception { // Create dummy transaction IgniteTx tx = new DummyTx(); + ses.newSession(tx); + OrganizationKey k1 = new OrganizationKey(1); Organization v1 = new Organization(1, "Name1", "City1"); OrganizationKey k2 = new OrganizationKey(2); Organization v2 = new Organization(2, "Name2", "City2"); - store.put(tx, k1, v1); - store.put(tx, k2, v2); + store.write(new CacheEntryImpl<>(k1, v1)); + store.write(new CacheEntryImpl<>(k2, v2)); + + store.txEnd(true); + + ses.newSession(null); - store.txEnd(tx, true); + assertEquals(v1, store.load(k1)); + assertEquals(v2, store.load(k2)); - assertEquals(v1, store.load(null, k1)); - assertEquals(v2, store.load(null, k2)); + ses.newSession(tx); OrganizationKey k3 = new OrganizationKey(3); - assertNull(store.load(tx, k3)); + assertNull(store.load(k3)); - store.remove(tx, k1); + store.delete(k1); - store.txEnd(tx, true); + store.txEnd(true); - assertNull(store.load(tx, k1)); - assertEquals(v2, store.load(tx, k2)); - assertNull(store.load(null, k3)); + assertNull(store.load(k1)); + assertEquals(v2, store.load(k2)); + + ses.newSession(null); + + assertNull(store.load(k3)); } /** @@ -227,118 +246,140 @@ public void testStore() throws Exception { public void testRollback() throws IgniteCheckedException { IgniteTx tx = new DummyTx(); + ses.newSession(tx); + OrganizationKey k1 = new OrganizationKey(1); Organization v1 = new Organization(1, "Name1", "City1"); // Put. - store.put(tx, k1, v1); + store.write(new CacheEntryImpl<>(k1, v1)); - store.txEnd(tx, false); // Rollback. + store.txEnd(false); // Rollback. tx = new DummyTx(); - assertNull(store.load(tx, k1)); + ses.newSession(tx); + + assertNull(store.load(k1)); OrganizationKey k2 = new OrganizationKey(2); Organization v2 = new Organization(2, "Name2", "City2"); // Put all. - assertNull(store.load(tx, k2)); + assertNull(store.load(k2)); + + Collection> col = new ArrayList<>(); - store.putAll(tx, Collections.singletonMap(k2, v2)); + col.add(new CacheEntryImpl<>(k2, v2)); - store.txEnd(tx, false); // Rollback. + store.writeAll(col); + + store.txEnd(false); // Rollback. tx = new DummyTx(); - assertNull(store.load(tx, k2)); + ses.newSession(tx); + + assertNull(store.load(k2)); OrganizationKey k3 = new OrganizationKey(3); Organization v3 = new Organization(3, "Name3", "City3"); - store.putAll(tx, Collections.singletonMap(k3, v3)); + col = new ArrayList<>(); + + col.add(new CacheEntryImpl<>(k3, v3)); - store.txEnd(tx, true); // Commit. + store.writeAll(col); + + store.txEnd(true); // Commit. tx = new DummyTx(); - assertEquals(v3, store.load(tx, k3)); + ses.newSession(tx); + + assertEquals(v3, store.load(k3)); OrganizationKey k4 = new OrganizationKey(4); Organization v4 = new Organization(4, "Name4", "City4"); - store.put(tx, k4, v4); + store.write(new CacheEntryImpl<>(k4, v4)); - store.txEnd(tx, false); // Rollback. + store.txEnd(false); // Rollback. tx = new DummyTx(); - assertNull(store.load(tx, k4)); + ses.newSession(tx); + + assertNull(store.load(k4)); - assertEquals(v3, store.load(tx, k3)); + assertEquals(v3, store.load(k3)); // Remove. - store.remove(tx, k3); + store.delete(k3); - store.txEnd(tx, false); // Rollback. + store.txEnd(false); // Rollback. tx = new DummyTx(); - assertEquals(v3, store.load(tx, k3)); + ses.newSession(tx); + + assertEquals(v3, store.load(k3)); // Remove all. - store.removeAll(tx, Arrays.asList(k3)); + store.deleteAll(Arrays.asList(k3)); - store.txEnd(tx, false); // Rollback. + store.txEnd(false); // Rollback. tx = new DummyTx(); - assertEquals(v3, store.load(tx, k3)); + ses.newSession(tx); + + assertEquals(v3, store.load(k3)); } /** - * @throws IgniteCheckedException if failed. */ - public void testAllOpsWithTXNoCommit() throws IgniteCheckedException { + public void testAllOpsWithTXNoCommit() { doTestAllOps(new DummyTx(), false); } /** - * @throws IgniteCheckedException if failed. */ - public void testAllOpsWithTXCommit() throws IgniteCheckedException { + public void testAllOpsWithTXCommit() { doTestAllOps(new DummyTx(), true); } /** - * @throws IgniteCheckedException if failed. */ - public void testAllOpsWithoutTX() throws IgniteCheckedException { + public void testAllOpsWithoutTX() { doTestAllOps(null, false); } /** * @param tx Transaction. * @param commit Commit. - * @throws IgniteCheckedException If failed. */ - private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) throws IgniteCheckedException { + private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) { try { + ses.newSession(tx); + final OrganizationKey k1 = new OrganizationKey(1); final Organization v1 = new Organization(1, "Name1", "City1"); - store.put(tx, k1, v1); + store.write(new CacheEntryImpl<>(k1, v1)); if (tx != null && commit) { - store.txEnd(tx, true); + store.txEnd(true); tx = new DummyTx(); + + ses.newSession(tx); } if (tx == null || commit) - assertEquals(v1, store.load(tx, k1)); + assertEquals(v1, store.load(k1)); - Map m = new HashMap<>(); + Collection> col = new ArrayList<>(); final OrganizationKey k2 = new OrganizationKey(2); final Organization v2 = new Organization(2, "Name2", "City2"); @@ -346,15 +387,17 @@ private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) throws IgniteCh final OrganizationKey k3 = new OrganizationKey(3); final Organization v3 = new Organization(3, "Name3", "City3"); - m.put(k2, v2); - m.put(k3, v3); + col.add(new CacheEntryImpl<>(k2, v2)); + col.add(new CacheEntryImpl<>(k3, v3)); - store.putAll(tx, m); + store.writeAll(col); if (tx != null && commit) { - store.txEnd(tx, true); + store.txEnd(true); tx = new DummyTx(); + + ses.newSession(tx); } final AtomicInteger cntr = new AtomicInteger(); @@ -362,55 +405,62 @@ private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) throws IgniteCh final OrganizationKey no_such_key = new OrganizationKey(4); if (tx == null || commit) { - store.loadAll(tx, Arrays.asList(k1, k2, k3, no_such_key), new CI2() { - @Override public void apply(Object o, Object o1) { - if (k1.equals(o)) - assertEquals(v1, o1); + Map loaded = store.loadAll(Arrays.asList(k1, k2, k3, no_such_key)); - if (k2.equals(o)) - assertEquals(v2, o1); + for (Map.Entry e : loaded.entrySet()) { + Object key = e.getKey(); + Object val = e.getValue(); - if (k3.equals(o)) - assertEquals(v3, o1); + if (k1.equals(key)) + assertEquals(v1, val); - if (no_such_key.equals(o)) - fail(); + if (k2.equals(key)) + assertEquals(v2, val); - cntr.incrementAndGet(); - } - }); + if (k3.equals(key)) + assertEquals(v3, val); + + if (no_such_key.equals(key)) + fail(); + + cntr.incrementAndGet(); + } assertEquals(3, cntr.get()); } - store.removeAll(tx, Arrays.asList(k2, k3)); + store.deleteAll(Arrays.asList(k2, k3)); if (tx != null && commit) { - store.txEnd(tx, true); + store.txEnd(true); tx = new DummyTx(); + + ses.newSession(tx); } if (tx == null || commit) { - assertNull(store.load(tx, k2)); - assertNull(store.load(tx, k3)); - assertEquals(v1, store.load(tx, k1)); + assertNull(store.load(k2)); + assertNull(store.load(k3)); + assertEquals(v1, store.load(k1)); } - store.remove(tx, k1); + store.delete(k1); if (tx != null && commit) { - store.txEnd(tx, true); + store.txEnd(true); tx = new DummyTx(); + + ses.newSession(tx); } if (tx == null || commit) - assertNull(store.load(tx, k1)); + assertNull(store.load(k1)); } finally { if (tx != null) - store.txEnd(tx, false); + store.txEnd(false); } } @@ -427,6 +477,8 @@ public void testSimpleMultithreading() throws Exception { for (int i = 0; i < 1000; i++) { IgniteTx tx = rnd.nextBoolean() ? new DummyTx() : null; + ses.newSession(tx); + int op = rnd.nextInt(10); boolean queueEmpty = false; @@ -438,29 +490,22 @@ public void testSimpleMultithreading() throws Exception { queueEmpty = true; else { if (rnd.nextBoolean()) - assertNotNull(store.load(tx, key)); + assertNotNull(store.load(key)); else { - final AtomicInteger cntr = new AtomicInteger(); - - store.loadAll(tx, Collections.singleton(key), new CI2() { - @Override public void apply(Object o, Object o1) { - cntr.incrementAndGet(); + Map loaded = store.loadAll(Collections.singleton(key)); - assertNotNull(o); - assertNotNull(o1); + assertEquals(1, loaded.size()); - OrganizationKey key = (OrganizationKey)o; - Organization val = (Organization)o1; + Map.Entry e = loaded.entrySet().iterator().next(); - assertTrue(key.getId().equals(val.getId())); - } - }); + OrganizationKey k = (OrganizationKey)e.getKey(); + Organization v = (Organization)e.getValue(); - assertEquals(1, cntr.get()); + assertTrue(k.getId().equals(v.getId())); } if (tx != null) - store.txEnd(tx, true); + store.txEnd(true); queue.add(key); } @@ -472,12 +517,12 @@ else if (op < 6) { // Remove. queueEmpty = true; else { if (rnd.nextBoolean()) - store.remove(tx, key); + store.delete(key); else - store.removeAll(tx, Collections.singleton(key)); + store.deleteAll(Collections.singleton(key)); if (tx != null) - store.txEnd(tx, true); + store.txEnd(true); } } else { // Update. @@ -489,13 +534,20 @@ else if (op < 6) { // Remove. Organization val = new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId()); + Cache.Entry entry = new CacheEntryImpl<>(key, val); + if (rnd.nextBoolean()) - store.put(tx, key, val); - else - store.putAll(tx, Collections.singletonMap(key, val)); + store.write(entry); + else { + Collection> col = new ArrayList<>(); + + col.add(entry); + + store.writeAll(col); + } if (tx != null) - store.txEnd(tx, true); + store.txEnd(true); queue.add(key); } @@ -505,13 +557,20 @@ else if (op < 6) { // Remove. OrganizationKey key = new OrganizationKey(rnd.nextInt()); Organization val = new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId()); + Cache.Entry entry = new CacheEntryImpl<>(key, val); + if (rnd.nextBoolean()) - store.put(tx, key, val); - else - store.putAll(tx, Collections.singletonMap(key, val)); + store.write(entry); + else { + Collection> col = new ArrayList<>(); + + col.add(entry); + + store.writeAll(col); + } if (tx != null) - store.txEnd(tx, true); + store.txEnd(true); queue.add(key); }