Skip to content

Commit

Permalink
# IGNITE-45 - Examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Kulichenko committed Mar 22, 2015
1 parent da36a72 commit 8f26539
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 152 deletions.
Expand Up @@ -26,7 +26,7 @@ public class CacheConfig {
/** /**
* Configure streaming cache for market ticks. * Configure streaming cache for market ticks.
*/ */
public static CacheConfiguration<String, MarketTick> marketTicksCache() { public static CacheConfiguration<String, Double> marketTicksCache() {
return new CacheConfiguration<>("marketTicks"); return new CacheConfiguration<>("marketTicks");
} }


Expand Down
Expand Up @@ -51,17 +51,17 @@ public class Instrument implements Serializable {
} }


/** /**
* Updates this instrument based on the latest market tick. * Updates this instrument based on the latest price.
* *
* @param tick Market tick. * @param price Latest price.
*/ */
public void update(MarketTick tick) { public void update(double price) {
if (open == 0) if (open == 0)
open = tick.price(); open = price;


high = Math.max(high, tick.price()); high = Math.max(high, price);
low = Math.min(low, tick.price()); low = Math.min(low, price);
this.latest = tick.price(); this.latest = price;
} }


/** /**
Expand Down

This file was deleted.

Expand Up @@ -38,9 +38,6 @@ public class StreamMarketData {
/** Random number generator. */ /** Random number generator. */
private static final Random RAND = new Random(); private static final Random RAND = new Random();


/** Count of total numbers to generate. */
private static final int CNT = 10000000;

/** The list of instruments. */ /** The list of instruments. */
private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};


Expand All @@ -56,17 +53,16 @@ public static void main(String[] args) throws Exception {
return; return;


// The cache is configured with sliding window holding 1 second of the streaming data. // The cache is configured with sliding window holding 1 second of the streaming data.
IgniteCache<String, MarketTick> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache());
final IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); final IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache());


try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) { try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) {
// Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
// Instead we update the instruments in the 'instCache'. // Instead we update the instruments in the 'instCache'.
mktStmr.receiver(new StreamVisitor<String, MarketTick>() { mktStmr.receiver(new StreamVisitor<String, Double>() {
@Override @Override public void apply(IgniteCache<String, Double> cache, Map.Entry<String, Double> e) {
public void apply(IgniteCache<String, MarketTick> cache, Map.Entry<String, MarketTick> e) {
String symbol = e.getKey(); String symbol = e.getKey();
MarketTick tick = e.getValue(); Double tick = e.getValue();


Instrument inst = instCache.get(symbol); Instrument inst = instCache.get(symbol);


Expand All @@ -88,9 +84,7 @@ public void apply(IgniteCache<String, MarketTick> cache, Map.Entry<String, Marke
// numbers closer to 0 have higher probability. // numbers closer to 0 have higher probability.
double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian());


MarketTick tick = new MarketTick(INSTRUMENTS[j], price); mktStmr.addData(INSTRUMENTS[j], price);

mktStmr.addData(tick.symbol(), tick);
} }
} }
} }
Expand Down
Expand Up @@ -26,7 +26,7 @@ public class CacheConfig {
/** /**
* Configure streaming cache for market ticks. * Configure streaming cache for market ticks.
*/ */
public static CacheConfiguration<String, MarketTick> marketTicksCache() { public static CacheConfiguration<String, Double> marketTicksCache() {
return new CacheConfiguration<>("marketTicks"); return new CacheConfiguration<>("marketTicks");
} }


Expand Down
Expand Up @@ -51,17 +51,17 @@ public class Instrument implements Serializable {
} }


/** /**
* Updates this instrument based on the latest market tick. * Updates this instrument based on the latest price.
* *
* @param tick Market tick. * @param price Latest price.
*/ */
public void update(MarketTick tick) { public void update(double price) {
if (open == 0) if (open == 0)
open = tick.price(); open = price;


high = Math.max(high, tick.price()); high = Math.max(high, price);
low = Math.min(low, tick.price()); low = Math.min(low, price);
this.latest = tick.price(); this.latest = price;
} }


/** /**
Expand Down

This file was deleted.

Expand Up @@ -53,15 +53,15 @@ public static void main(String[] args) throws Exception {
return; return;


// The cache is configured with sliding window holding 1 second of the streaming data. // The cache is configured with sliding window holding 1 second of the streaming data.
IgniteCache<String, MarketTick> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache());
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache());


try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) { try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) {
// Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
// Instead we update the instruments in the 'instCache'. // Instead we update the instruments in the 'instCache'.
mktStmr.receiver(StreamVisitor.from((cache, e) -> { mktStmr.receiver(StreamVisitor.from((cache, e) -> {
String symbol = e.getKey(); String symbol = e.getKey();
MarketTick tick = e.getValue(); Double tick = e.getValue();


Instrument inst = instCache.get(symbol); Instrument inst = instCache.get(symbol);


Expand All @@ -81,9 +81,7 @@ public static void main(String[] args) throws Exception {
// numbers closer to 0 have higher probability. // numbers closer to 0 have higher probability.
double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian());


MarketTick tick = new MarketTick(INSTRUMENTS[j], price); mktStmr.addData(INSTRUMENTS[j], price);

mktStmr.addData(tick.symbol(), tick);
} }
} }
} }
Expand Down

0 comments on commit 8f26539

Please sign in to comment.