Skip to content

Commit

Permalink
Fix a bug with added padding and not reseting the header on a timeout.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Jun 21, 2016
1 parent 5d2bf5e commit a401ef8
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 74 deletions.
47 changes: 15 additions & 32 deletions src/main/java/net/openhft/chronicle/queue/JDBCComponent.java
@@ -1,13 +1,10 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.util.ThrowingSupplier;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.WireOut;
import org.jetbrains.annotations.NotNull;

import java.sql.*;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;

/**
* Created by peter on 12/04/16.
Expand Down Expand Up @@ -35,40 +32,26 @@ public void executeUpdate(String query, Object... args) {
}

@Override
public void executeQuery(String query, Class<? extends Marshallable> resultType, Object... args) {
public void executeQuery(String query, Object... args) {
try (PreparedStatement ps = connection.prepareStatement(query)) {
for (int i = 0; i < args.length; i++)
ps.setObject(i + 1, args[i]);
ResultSet resultSet = ps.executeQuery();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
result.queryResult(new Iterator<Marshallable>() {
@Override
public boolean hasNext() {
try {
return resultSet.next();
} catch (SQLException e) {
throw Jvm.rethrow(e);
}
List<String> headings = new ArrayList<>(columnCount);
for (int i = 1; i <= columnCount; i++)
headings.add(metaData.getColumnName(i));

List<List<Object>> rows = new ArrayList<>();
while (resultSet.next()) {
List<Object> row = new ArrayList<>(columnCount);
for (int i = 1; i <= columnCount; i++) {
row.add(resultSet.getObject(i));
}

@Override
public Marshallable next() {
return new Marshallable() {
@Override
public void writeMarshallable(@NotNull WireOut wire) {
try {
for (int i = 1; i <= columnCount; i++) {
wire.writeEventName(metaData.getCatalogName(i))
.object(resultSet.getObject(i));
}
} catch (SQLException e) {
throw Jvm.rethrow(e);
}
}
};
}
}, query, args);
rows.add(row);
}
result.queryResult(headings, rows, query, args);

} catch (Throwable t) {
result.queryThrown(t, query, args);
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/net/openhft/chronicle/queue/JDBCResult.java 100644 → 100755
@@ -1,14 +1,12 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.wire.Marshallable;

import java.util.Iterator;
import java.util.List;

/**
* Created by peter on 06/04/16.
*/
public interface JDBCResult {
void queryResult(Iterator<Marshallable> marshallableList, String query, Object... args);
void queryResult(List<String> columns, List<List<Object>> rows, String query, Object... args);

void queryThrown(Throwable t, String query, Object... args);

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/net/openhft/chronicle/queue/JDBCStatement.java 100644 → 100755
@@ -1,12 +1,10 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.wire.Marshallable;

/**
* Created by peter on 06/04/16.
*/
public interface JDBCStatement {
void executeQuery(String query, Class<? extends Marshallable> resultType, Object... args);
void executeQuery(String query, Object... args);

void executeUpdate(String query, Object... args);
}
Expand Up @@ -99,7 +99,13 @@ public long recoverSecondaryAddress(LongArrayValues index2indexArr, int index2,
@Override
public long recoverAndWriteHeader(Wire wire, int length, long timeoutMS) throws UnrecoverableTimeoutException {
while (true) {
Jvm.warn().on(getClass(), "Unable to write a header at index: " + Long.toHexString(wire.headerNumber()) + " position: " + wire.bytes().writePosition());
long offset = wire.bytes().writePosition();
int num = wire.bytes().readInt(offset);
if (Wires.isNotComplete(num) && wire.bytes().compareAndSwapInt(offset, num, 0)) {
Jvm.warn().on(getClass(), "Unable to write a header at index: " + Long.toHexString(wire.headerNumber()) + " position: " + offset + " resetting");
} else {
Jvm.warn().on(getClass(), "Unable to write a header at index: " + Long.toHexString(wire.headerNumber()) + " position: " + offset + " unable to reset.");
}
try {
return wire.writeHeader(length, timeoutMS, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Expand Down
43 changes: 43 additions & 0 deletions src/test/java/net/openhft/chronicle/queue/CountingJDBCResult.java
@@ -0,0 +1,43 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.Jvm;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by Peter on 18/06/2016.
*/
class CountingJDBCResult implements JDBCResult {
private final AtomicLong queries;
private final AtomicLong updates;

public CountingJDBCResult(AtomicLong queries, AtomicLong updates) {
this.queries = queries;
this.updates = updates;
}

@Override
public void queryResult(List<String> columns, List<List<Object>> rows, String query, Object... args) {
System.out.println("query " + query + " returned " + columns);
for (List<Object> row : rows) {
System.out.println("\t" + row);
}
queries.incrementAndGet();
}

@Override
public void queryThrown(Throwable t, String query, Object... args) {
throw Jvm.rethrow(t);
}

@Override
public void updateResult(long count, String update, Object... args) {
updates.incrementAndGet();
}

@Override
public void updateThrown(Throwable t, String update, Object... args) {
throw Jvm.rethrow(t);
}
}
33 changes: 0 additions & 33 deletions src/test/java/net/openhft/chronicle/queue/JDBCServiceTest.java
@@ -1,11 +1,9 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.MethodReader;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -14,15 +12,13 @@
import java.io.IOException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by peter on 08/04/16.
*/
@Ignore("TODO Fix handling of Throwable")
public class JDBCServiceTest {

@Test
public void testCreateTable() throws SQLException, IOException {
doCreateTable(4, 10000);
Expand Down Expand Up @@ -89,33 +85,4 @@ public void doCreateTable(int repeats, int noUpdates) throws SQLException, IOExc
}
}

private static class CountingJDBCResult implements JDBCResult {
private final AtomicLong queries;
private final AtomicLong updates;

public CountingJDBCResult(AtomicLong queries, AtomicLong updates) {
this.queries = queries;
this.updates = updates;
}

@Override
public void queryResult(Iterator<Marshallable> marshallableList, String query, Object... args) {
queries.incrementAndGet();
}

@Override
public void queryThrown(Throwable t, String query, Object... args) {
throw Jvm.rethrow(t);
}

@Override
public void updateResult(long count, String update, Object... args) {
updates.incrementAndGet();
}

@Override
public void updateThrown(Throwable t, String update, Object... args) {
throw Jvm.rethrow(t);
}
}
}
Expand Up @@ -1820,17 +1820,22 @@ public void testLastIndexAppended() {
}
}

@Test(expected = UnsupportedOperationException.class)
@Test
public void testZeroLengthMessage() {
File tmpDir = getTmpDir();
try (ChronicleQueue chronicle = SingleChronicleQueueBuilder.binary(tmpDir)
.rollCycle(RollCycles.TEST_DAILY)
.wireType(this.wireType)
.build()) {

ExcerptAppender appender = chronicle.createAppender();
appender.writeDocument(w -> {
});
System.out.println(chronicle.dump());
ExcerptTailer tailer = chronicle.createTailer();
try (DocumentContext dc = tailer.readingDocument()) {
assertFalse(dc.wire().hasMore());
}
}
}

Expand Down

0 comments on commit a401ef8

Please sign in to comment.