Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

QuoteProviderImp

QuoteEngine
  • Loading branch information...
commit 0d76ece3150f7594bc71eef64618da24a2ae16c9 1 parent 2473b65
@Alagert authored
View
5 pom.xml
@@ -16,6 +16,11 @@
<version>4.8.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.2.8</version>
+ </dependency>
</dependencies>
<build>
View
38 src/main/java/com/alagert/java/tradebar/Main.java
@@ -0,0 +1,38 @@
+package com.alagert.java.tradebar;
+
+import com.alagert.java.tradebar.model.Quote;
+import com.alagert.java.tradebar.service.impl.QuoteEngineImpl;
+import com.alagert.java.tradebar.service.impl.QuoteProviderImpl;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * @author Andrey Tsvetkov
+ */
+public class Main {
+
+
+ public static void main(String[] args) throws Exception {
+
+
+ BlockingQueue<Quote> quotes = new LinkedBlockingQueue<Quote>();
+
+ QuoteProviderImpl quoteProvider = new QuoteProviderImpl(quotes);
+ QuoteEngineImpl quoteEngine = new QuoteEngineImpl(quotes);
+
+ Thread providerThread = new Thread(quoteProvider);
+ Thread engineThread = new Thread(quoteEngine);
+
+ synchronized (Main.class) {
+ providerThread.start();
+ engineThread.start();
+
+ }
+
+ Thread.sleep(200000);
+
+ quoteProvider.flag = true;
+
+ }
+}
View
40 src/main/java/com/alagert/java/tradebar/model/Quote.java
@@ -0,0 +1,40 @@
+package com.alagert.java.tradebar.model;
+
+import java.math.BigDecimal;
+
+/**
+ * @author Andrey Tsvetkov
+ */
+public class Quote {
+ private final Symbol symbol;
+ private final double price;
+ private final long timestamp;
+
+ public Quote(Symbol symbol, double price, long timestamp) {
+ this.symbol = symbol;
+ this.price = price;
+ this.timestamp = timestamp;
+ }
+
+ public Symbol getSymbol() {
+ return symbol;
+ }
+
+ public double getPrice() {
+ return price;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+
+ @Override
+ public String toString() {
+ return "Quote{" +
+ "symbol=" + symbol +
+ ", price=" + price +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
View
7 src/main/java/com/alagert/java/tradebar/service/QuoteProvider.java
@@ -1,7 +1,14 @@
package com.alagert.java.tradebar.service;
+import com.alagert.java.tradebar.model.Quote;
+
/**
* @author Andrey Tsvetkov
*/
public interface QuoteProvider {
+
+ Quote getQuote();
+
+ void generateQuotes() throws InterruptedException;
+
}
View
65 src/main/java/com/alagert/java/tradebar/service/impl/QuoteEngineImpl.java
@@ -0,0 +1,65 @@
+package com.alagert.java.tradebar.service.impl;
+
+import com.alagert.java.tradebar.model.Quote;
+
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Andrey Tsvetkov
+ */
+public class QuoteEngineImpl implements Runnable {
+ private final BlockingQueue<Quote> quotes;
+ private final AtomicInteger counter;
+
+ private final TreeSet<Double> minuteSet = new TreeSet<Double>();
+
+
+ public QuoteEngineImpl(BlockingQueue<Quote> quotes) {
+ this.quotes = quotes;
+ counter = new AtomicInteger(0);
+ ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
+ service.scheduleWithFixedDelay(new TrendBarMaintainer(), 5, 5, TimeUnit.SECONDS);
+ }
+
+ public void consumeQuotes() throws InterruptedException {
+ Quote quoteFromQueue = null;
+
+ while ((quoteFromQueue = quotes.poll(200, TimeUnit.MILLISECONDS)) != null) {
+
+ synchronized (minuteSet) {
+ minuteSet.add(quoteFromQueue.getPrice());
+ }
+
+ System.out.println("Counter=" + counter.getAndIncrement() + " " + quoteFromQueue.toString());
+ }
+
+ }
+
+ @Override
+ public void run() {
+ try {
+ consumeQuotes();
+ } catch (InterruptedException e) {
+ System.out.println("Consumer interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private class TrendBarMaintainer implements Runnable {
+
+
+
+ @Override
+ public void run() {
+ synchronized (minuteSet) {
+ System.out.println("minPrice = " + minuteSet.first() + " highPrice" + minuteSet.last());
+ minuteSet.clear();
+ }
+ }
+ }
+}
View
52 src/main/java/com/alagert/java/tradebar/service/impl/QuoteProviderImpl.java
@@ -0,0 +1,52 @@
+package com.alagert.java.tradebar.service.impl;
+
+import com.alagert.java.tradebar.model.Quote;
+import com.alagert.java.tradebar.model.Symbol;
+import com.alagert.java.tradebar.service.QuoteProvider;
+
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * @author Andrey Tsvetkov
+ */
+public class QuoteProviderImpl implements QuoteProvider, Runnable {
+
+ private final BlockingQueue<Quote> quotes;
+
+ private final Random rnd;
+
+ public volatile boolean flag;
+
+ public QuoteProviderImpl(BlockingQueue<Quote> quotes) {
+ this.quotes = quotes;
+ this.rnd = new Random(12l);
+ flag = false;
+ }
+
+ @Override
+ public Quote getQuote() {
+ return null;
+ }
+
+ @Override
+ public void generateQuotes() throws InterruptedException {
+ Quote q = new Quote(Symbol.EURUSD, rnd.nextDouble() * 100, System.currentTimeMillis());
+ quotes.put(q);
+ }
+
+ @Override
+ public void run() {
+ int i = 0;
+ while (!flag) {
+ try {
+ generateQuotes();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ System.out.println("QuoteProducer was interrupted");
+ Thread.currentThread().interrupt();
+ }
+ i++;
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.