Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-6853: Cassandra cache store does not clean prepared statements… #3088

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions modules/cassandra/pom.xml
Expand Up @@ -42,6 +42,12 @@
<groupId>org.apache.ignite</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
5 changes: 5 additions & 0 deletions modules/cassandra/store/pom.xml
Expand Up @@ -140,6 +140,11 @@
<scope>test</scope>
</dependency>

<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mockito dependency should be specified in the "dependencyManagement" section of parent POM. Such way you don't need to specify tag in all the modules which are going to reuse the dependecy - it will be automatically resolved from parent POM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added dependencyManagement to the parent POM, such that the child POM doesn't need to specify the version.

However, unless we add it to the parent as a 'dependency', then the child pom still needs to add it as a dependency.

<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Expand Up @@ -250,6 +250,8 @@ else if (CassandraHelper.isHostsAvailabilityError(e)) {
else if (CassandraHelper.isPreparedStatementClusterError(e)) {
prepStatEx = e;
handlePreparedStatementClusterError(e);
preparedSt = prepareStatement(assistant.getTable(), assistant.getStatement(),
assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
}
else
unknownEx = e;
Expand Down
@@ -0,0 +1,177 @@
package org.apache.ignite.tests;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
import org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant;
import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
import org.junit.Test;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedId;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.InvalidQueryException;

public class CassandraSessionImplTest {

private PreparedStatement preparedStatement1 = mockPreparedStatement();
private PreparedStatement preparedStatement2 = mockPreparedStatement();

private MyBoundStatement1 boundStatement1 = new MyBoundStatement1(preparedStatement1);
private MyBoundStatement2 boundStatement2 = new MyBoundStatement2(preparedStatement2);

@SuppressWarnings("unchecked")
@Test
public void executeFailureTest() {
Session session1 = mock(Session.class);
Session session2 = mock(Session.class);
when(session1.prepare(any(String.class))).thenReturn(preparedStatement1);
when(session2.prepare(any(String.class))).thenReturn(preparedStatement2);

ResultSetFuture rsFuture = mock(ResultSetFuture.class);
ResultSet rs = mock(ResultSet.class);
Iterator it = mock(Iterator.class);
when(it.hasNext()).thenReturn(true);
when(it.next()).thenReturn(mock(Row.class));
when(rs.iterator()).thenReturn(it);
when(rsFuture.getUninterruptibly()).thenReturn(rs);
/* @formatter:off */
when(session1.executeAsync(any(Statement.class)))
.thenThrow(new InvalidQueryException("You may have used a PreparedStatement that was created with another Cluster instance"))
.thenThrow(new RuntimeException("this session should be refreshed / recreated"));
when(session2.executeAsync(boundStatement1))
.thenThrow(new InvalidQueryException("You may have used a PreparedStatement that was created with another Cluster instance"));
when(session2.executeAsync(boundStatement2)).thenReturn(rsFuture);
/* @formatter:on */

Cluster cluster = mock(Cluster.class);
when(cluster.connect()).thenReturn(session1).thenReturn(session2);
when(session1.getCluster()).thenReturn(cluster);
when(session2.getCluster()).thenReturn(cluster);

Cluster.Builder builder = mock(Cluster.Builder.class);
when(builder.build()).thenReturn(cluster);

CassandraSessionImpl cassandraSession = new CassandraSessionImpl(builder, null,
ConsistencyLevel.ONE, ConsistencyLevel.ONE, 0, mock(IgniteLogger.class));

BatchExecutionAssistant<String, String> batchExecutionAssistant = new MyBatchExecutionAssistant();
ArrayList<String> data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(String.valueOf(i));
}
cassandraSession.execute(batchExecutionAssistant, data);

verify(cluster, times(2)).connect();
verify(session1, times(1)).prepare(any(String.class));
verify(session2, times(1)).prepare(any(String.class));
assertEquals(9, batchExecutionAssistant.processedCount());
}

private static PreparedStatement mockPreparedStatement() {
PreparedStatement ps = mock(PreparedStatement.class);
when(ps.getVariables()).thenReturn(mock(ColumnDefinitions.class));
when(ps.getPreparedId()).thenReturn(mock(PreparedId.class));
when(ps.getQueryString()).thenReturn("insert into xxx");
return ps;
}

private class MyBatchExecutionAssistant implements BatchExecutionAssistant {

private Set<Integer> processed = new HashSet<>();

@Override
public void process(Row row, int seqNum) {
if (processed.contains(seqNum))
return;

processed.add(seqNum);
}

@Override
public boolean alreadyProcessed(int seqNum) {
return processed.contains(seqNum);
}

@Override
public int processedCount() {
return processed.size();
}

@Override
public boolean tableExistenceRequired() {
return false;
}

@Override
public String getTable() {
return null;
}

@Override
public String getStatement() {
return null;
}

@Override
public BoundStatement bindStatement(PreparedStatement statement, Object obj) {
if (statement == preparedStatement1) {
return boundStatement1;
} else if (statement == preparedStatement2) {
return boundStatement2;
}
throw new RuntimeException("unexpected");
}

@Override
public KeyValuePersistenceSettings getPersistenceSettings() {
return null;
}

@Override
public String operationName() {
return null;
}

@Override
public Object processedData() {
return null;
}

}

private static class MyBoundStatement1 extends BoundStatement {

MyBoundStatement1(PreparedStatement ps) {
super(ps);
}

}

private static class MyBoundStatement2 extends BoundStatement {

MyBoundStatement2(PreparedStatement ps) {
super(ps);
}
}

}
1 change: 1 addition & 0 deletions parent/pom.xml
Expand Up @@ -95,6 +95,7 @@
<lucene.bundle.version>5.5.2_1</lucene.bundle.version>
<lucene.version>5.5.2</lucene.version>
<maven.bundle.plugin.version>2.5.4</maven.bundle.plugin.version>
<mockito.version>1.10.19</mockito.version>
<mysql.connector.version>5.1.39</mysql.connector.version>
<netlibjava.version>1.1.2</netlibjava.version>
<oro.bundle.version>2.0.8_6</oro.bundle.version>
Expand Down