Skip to content

Commit

Permalink
CHRON-93 tutorial examples
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Feb 21, 2015
1 parent c9afcb5 commit 57d938d
Showing 1 changed file with 96 additions and 52 deletions.
148 changes: 96 additions & 52 deletions chronicle-demo/src/main/java/vanilla/java/tutorial/DirectAccess.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import net.openhft.chronicle.ExcerptTailer; import net.openhft.chronicle.ExcerptTailer;
import net.openhft.lang.model.Byteable; import net.openhft.lang.model.Byteable;


import java.io.IOException; import java.util.Random;
import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand All @@ -38,41 +37,92 @@ public class DirectAccess {
// //
// ************************************************************************* // *************************************************************************


public static void main(String[] ignored) { public static void main(String[] ignored) throws Exception {
final int items = 1000; final int items = 100;
final String path = System.getProperty("java.io.tmpdir") + "/direct-access";
//final StockPrice price = newDirectReference(StockPrice.class);
//appender.position(price.maxSize());


final StockPrice price = newDirectInstance(StockPrice.class); appendWithDirectInstance(items);
price.bytes().position(price.maxSize()); appendWithDirectReference(items);
}

// *************************************************************************
//
// *************************************************************************

private static void appendWithDirectInstance(final int items) throws Exception {
final int readers = items / 10;

final String path = System.getProperty("java.io.tmpdir") + "/direct-instance";
final Event event = newDirectInstance(Event.class);


try (Chronicle chronicle = ChronicleQueueBuilder.vanilla(path).build()) { try (Chronicle chronicle = ChronicleQueueBuilder.vanilla(path).build()) {
chronicle.clear(); chronicle.clear();


ExcerptAppender appender = chronicle.createAppender(); ExcerptAppender appender = chronicle.createAppender();
for(int i=0;i<items;i++) { for(int i=0; i<items; i++) {
price.bytes(appender, 0); event.bytes(appender, 0);
price.setStockId(i / 10); event.setOwner(0);
price.setTransactionTime(System.currentTimeMillis()); event.setType(i / 10);
price.setPrice(i); event.setTimestamp(System.currentTimeMillis());
price.setQuantity(i); event.setId(i);


appender.startExcerpt(price.maxSize()); appender.startExcerpt(event.maxSize());
appender.write(price.bytes(),0, price.maxSize()); appender.write(event.bytes(), 0, event.maxSize());
appender.finish(); appender.finish();
} }


appender.close(); appender.close();


ExecutorService ex = Executors.newFixedThreadPool(5); process(chronicle, items);
for(int i=0;i<(items / 100);i++) { }
ex.execute(new Reader(chronicle, i)); }

private static void appendWithDirectReference(final int items) throws Exception {
final String path = System.getProperty("java.io.tmpdir") + "/direct-instance";
final Event event = newDirectReference(Event.class);

try (Chronicle chronicle = ChronicleQueueBuilder.vanilla(path).build()) {
chronicle.clear();

ExcerptAppender appender = chronicle.createAppender();
for(int i=0; i<items; i++) {
appender.startExcerpt(event.maxSize());

event.bytes(appender, 0);
event.setOwner(0);
event.setType(i / 10);
event.setTimestamp(System.currentTimeMillis());
event.setId(i);

appender.position(event.maxSize());
appender.finish();
} }


ex.awaitTermination(5, TimeUnit.MINUTES); appender.close();
} catch (Exception e) {
e.printStackTrace(); process(chronicle, items);
}
}

private static void process(Chronicle chronicle, int items) throws Exception {
final int readers = items / 10;
ExecutorService ex = Executors.newFixedThreadPool(readers * 2);
for (int i = 0; i < readers * 2; i++) {
ex.execute(new Reader(chronicle, i, i / 2));
}

ex.shutdown();
ex.awaitTermination(1, TimeUnit.MINUTES);

try (ExcerptTailer tailer = chronicle.createTailer()) {
final Event evt = newDirectReference(Event.class);
for (int i = 0; i < items; ) {
if (tailer.nextIndex()) {
evt.bytes(tailer, 0);
System.out.printf("read : %s\n", evt);
tailer.finish();
i++;
}
}
} }
} }


Expand All @@ -82,36 +132,33 @@ public static void main(String[] ignored) {


public static class Reader implements Runnable { public static class Reader implements Runnable {
private final Chronicle chronicle; private final Chronicle chronicle;
private final Random random;
private final int id; private final int id;
private final int type;


public Reader(final Chronicle chronicle, int id) { public Reader(final Chronicle chronicle, int id, int type) {
this.chronicle = chronicle; this.chronicle = chronicle;
this.random = new Random();
this.id = id; this.id = id;

this.type = type;
} }


@Override @Override
public void run() { public void run() {
System.out.println("Start reader id=" + this.id);
try (ExcerptTailer tailer = chronicle.createTailer()) { try (ExcerptTailer tailer = chronicle.createTailer()) {
final StockPrice price = newDirectReference(StockPrice.class); final Event event = newDirectReference(Event.class);
while(tailer.nextIndex()) { while(tailer.nextIndex()) {
price.bytes(tailer, 0); event.bytes(tailer, 0);
if(price.getStockId() == this.id) { if(event.getType() == this.type) {
if (price.compareAndSwapMeta(0, this.id)) { if (event.compareAndSwapOwner(0, this.id * 100)) {
System.out.printf("%d : %s - sotock-%d %f@%f\n", event.compareAndSwapOwner(this.id * 100, this.id);
this.id, Thread.sleep(this.random.nextInt(250));
new Date(price.getTransactionTime()),
price.getStockId(),
price.getQuantity(),
price.getPrice()
);
} }
} }


tailer.finish(); tailer.finish();
} }
} catch(IOException e) { } catch(Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
Expand All @@ -121,21 +168,18 @@ public void run() {
// //
// ************************************************************************* // *************************************************************************


public static interface StockPrice extends Byteable { public static interface Event extends Byteable {
boolean compareAndSwapMeta(int expected, int value); boolean compareAndSwapOwner(int expected, int value);
int getMeta(); int getOwner();
void setMeta(int meta); void setOwner(int meta);

void setStockId(long id);
long getStockId();


void setTransactionTime(long timestamp); void setId(long id);
long getTransactionTime(); long getId();


void setPrice(double price); void setType(long id);
double getPrice(); long getType();


void setQuantity(double quantity); void setTimestamp(long timestamp);
double getQuantity(); long getTimestamp();
} }
} }

0 comments on commit 57d938d

Please sign in to comment.