Skip to content

Commit

Permalink
sprint-2 - Added isWithinTransaction() method to session.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitiry Setrakyan committed Feb 28, 2015
1 parent 6097e7b commit 16105ec
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 73 deletions.
Expand Up @@ -22,7 +22,6 @@
import org.apache.ignite.examples.datagrid.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;

import javax.cache.*;
Expand Down Expand Up @@ -72,8 +71,6 @@ private void prepareDb() throws IgniteException {

/** {@inheritDoc} */
@Override public void txEnd(boolean commit) {
Transaction tx = transaction();

Map<String, Connection> props = ses.properties();

try (Connection conn = props.remove(ATTR_NAME)) {
Expand All @@ -84,99 +81,94 @@ private void prepareDb() throws IgniteException {
conn.rollback();
}

System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
System.out.println(">>> Transaction ended [commit=" + commit + ']');
}
catch (SQLException e) {
throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
throw new CacheWriterException("Failed to end transaction: " + ses.transaction(), e);
}
}

/** {@inheritDoc} */
@Override public Person load(Long key) {
Transaction tx = transaction();

System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
System.out.println(">>> Loading key: " + key);

Connection conn = null;

try {
conn = connection(tx);
conn = connection();

try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
st.setString(1, key.toString());

ResultSet rs = st.executeQuery();

if (rs.next())
return person(rs.getLong(1), rs.getString(2), rs.getString(3));
return new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load object: " + key, e);
}
finally {
end(tx, conn);
end(conn);
}

return null;
}

/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) {
Transaction tx = transaction();

Long key = entry.getKey();

Person val = entry.getValue();

System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');
System.out.println(">>> Putting [key=" + key + ", val=" + val + ']');

Connection conn = null;

try {
conn = connection(tx);
conn = connection();

int updated;
int updated;

try (PreparedStatement st = conn.prepareStatement(
"update PERSONS set firstName=?, lastName=? where id=?")) {
st.setString(1, val.getFirstName());
st.setString(2, val.getLastName());
st.setLong(3, val.getId());
// Try update first.
try (PreparedStatement st = conn.prepareStatement(
"update PERSONS set firstName=?, lastName=? where id=?")) {
st.setString(1, val.getFirstName());
st.setString(2, val.getLastName());
st.setLong(3, val.getId());

updated = st.executeUpdate();
}
updated = st.executeUpdate();
}

// If update failed, try to insert.
if (updated == 0) {
try (PreparedStatement st = conn.prepareStatement(
"insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) {
st.setLong(1, val.getId());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());
// If update failed, try to insert.
if (updated == 0) {
try (PreparedStatement st = conn.prepareStatement(
"insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) {
st.setLong(1, val.getId());
st.setString(2, val.getFirstName());
st.setString(3, val.getLastName());

st.executeUpdate();
st.executeUpdate();
}
}
}
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e);
}
finally {
end(tx, conn);
end(conn);
}
}

/** {@inheritDoc} */
@Override public void delete(Object key) {
Transaction tx = transaction();

System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
System.out.println(">>> Removing key: " + key);

Connection conn = null;

try {
conn = connection(tx);
conn = connection();

try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
st.setLong(1, (Long)key);
Expand All @@ -188,7 +180,7 @@ private void prepareDb() throws IgniteException {
throw new CacheWriterException("Failed to remove object: " + key, e);
}
finally {
end(tx, conn);
end(conn);
}
}

Expand All @@ -199,17 +191,13 @@ private void prepareDb() throws IgniteException {

final int entryCnt = (Integer)args[0];

Connection conn = null;

try {
conn = connection(null);

try (Connection conn = connection()) {
try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
try (ResultSet rs = st.executeQuery()) {
int cnt = 0;

while (cnt < entryCnt && rs.next()) {
Person person = person(rs.getLong(1), rs.getString(2), rs.getString(3));
Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

clo.apply(person.getId(), person);

Expand All @@ -223,18 +211,16 @@ private void prepareDb() throws IgniteException {
catch (SQLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
finally {
end(null, conn);
}
}

/**
* @param tx Cache transaction.
* @return Connection.
* @throws SQLException In case of error.
*/
private Connection connection(@Nullable Transaction tx) throws SQLException {
if (tx != null) {
private Connection connection() throws SQLException {
// If there is an ongoing transaction,
// we must reuse the same connection.
if (ses.isWithinTransaction()) {
Map<Object, Object> props = ses.properties();

Connection conn = (Connection)props.get(ATTR_NAME);
Expand All @@ -257,11 +243,10 @@ private Connection connection(@Nullable Transaction tx) throws SQLException {
/**
* Closes allocated resources depending on transaction status.
*
* @param tx Active transaction, if any.
* @param conn Allocated connection.
*/
private void end(@Nullable Transaction tx, @Nullable Connection conn) {
if (tx == null && conn != null) {
private void end(@Nullable Connection conn) {
if (!ses.isWithinTransaction() && conn != null) {
// Close connection right away if there is no transaction.
try {
conn.close();
Expand All @@ -286,23 +271,4 @@ private Connection openConnection(boolean autocommit) throws SQLException {

return conn;
}

/**
* Builds person object out of provided values.
*
* @param id ID.
* @param firstName First name.
* @param lastName Last name.
* @return Person.
*/
private Person person(Long id, String firstName, String lastName) {
return new Person(id, firstName, lastName);
}

/**
* @return Current transaction.
*/
private Transaction transaction() {
return ses != null ? ses.transaction() : null;
}
}
Expand Up @@ -42,6 +42,15 @@ public interface CacheStoreSession {
*/
public Transaction transaction();

/**
* Returns {@code true} if performing store operation within a transaction,
* {@code false} otherwise. Analogous to calling {@code transaction() != null}.
*
* @return {@code True} if performing store operation within a transaction,
* {@code false} otherwise.
*/
public boolean isWithinTransaction();

/**
* Gets current session properties. You can add properties directly to the
* returned map.
Expand Down
Expand Up @@ -36,7 +36,6 @@

import javax.cache.*;
import javax.cache.integration.*;
import java.lang.reflect.*;
import java.util.*;

/**
Expand Down Expand Up @@ -912,6 +911,11 @@ private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
return ses0 != null ? ses0.transaction() : null;
}

/** {@inheritDoc} */
@Override public boolean isWithinTransaction() {
return transaction() != null;
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K1, V1> Map<K1, V1> properties() {
Expand Down
Expand Up @@ -49,6 +49,11 @@ public void newSession(@Nullable Transaction tx) {
return tx;
}

/** {@inheritDoc} */
@Override public boolean isWithinTransaction() {
return transaction() != null;
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
Expand Down
Expand Up @@ -48,6 +48,11 @@ public void newSession(@Nullable Transaction tx) {
return ses != null ? ses.transaction() : null;
}

/** {@inheritDoc} */
@Override public boolean isWithinTransaction() {
return transaction() != null;
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> Map<K, V> properties() {
Expand Down

0 comments on commit 16105ec

Please sign in to comment.