From 060d4688d06020eb508f01a2a00878d5bfb71791 Mon Sep 17 00:00:00 2001 From: seongahjo Date: Fri, 21 Feb 2020 18:13:07 +0900 Subject: [PATCH 1/2] add reactor subscription example --- example-graphql-subscription/build.gradle | 2 +- .../StockTickerReactorPublisher.java | 122 ++++++++++++++++++ ...isher.java => StockTickerRxPublisher.java} | 6 +- .../graphql/boot/resolvers/Subscription.java | 6 +- 4 files changed, 129 insertions(+), 7 deletions(-) create mode 100644 example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerReactorPublisher.java rename example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/{StockTickerPublisher.java => StockTickerRxPublisher.java} (97%) diff --git a/example-graphql-subscription/build.gradle b/example-graphql-subscription/build.gradle index ab210942..b2ea698e 100644 --- a/example-graphql-subscription/build.gradle +++ b/example-graphql-subscription/build.gradle @@ -16,7 +16,7 @@ dependencies { compile "com.graphql-java-kickstart:graphql-java-tools:$LIB_GRAPHQL_JAVA_TOOLS_VER" compile "io.reactivex.rxjava2:rxjava:2.1.5" - + compile "io.projectreactor:reactor-core:3.3.2.RELEASE" compile("org.springframework.boot:spring-boot-starter-web:$LIB_SPRING_BOOT_VER") testCompile "org.springframework.boot:spring-boot-starter-test:$LIB_SPRING_BOOT_VER" diff --git a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerReactorPublisher.java b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerReactorPublisher.java new file mode 100644 index 00000000..d5c353e0 --- /dev/null +++ b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerReactorPublisher.java @@ -0,0 +1,122 @@ +package com.oembedler.moon.graphql.boot.publishers; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import com.oembedler.moon.graphql.boot.resolvers.StockPriceUpdate; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +@Component +public class StockTickerReactorPublisher { + private static final Logger LOG = LoggerFactory.getLogger(StockTickerRxPublisher.class); + + private final Flux publisher; + + public StockTickerReactorPublisher() { + Flux stockPriceUpdateFlux = Flux.create(emitter -> { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS); + }, FluxSink.OverflowStrategy.BUFFER); + ConnectableFlux connectableFlux = stockPriceUpdateFlux.share().publish(); + connectableFlux.connect(); + + publisher = Flux.from(connectableFlux); + } + + private Runnable newStockTick(FluxSink emitter) { + return () -> { + List stockPriceUpdates = getUpdates(rollDice(0, 5)); + if (stockPriceUpdates != null) { + emitStocks(emitter, stockPriceUpdates); + } + }; + } + + private void emitStocks(FluxSink emitter, List stockPriceUpdates) { + for (StockPriceUpdate stockPriceUpdate : stockPriceUpdates) { + try { + emitter.next(stockPriceUpdate); + } catch (RuntimeException e) { + LOG.error("Cannot send StockUpdate", e); + } + } + } + + public Flux getPublisher() { + return publisher; + } + + public Flux getPublisher(List stockCodes) { + if (stockCodes != null) { + return publisher.filter(stockPriceUpdate -> stockCodes.contains(stockPriceUpdate.getStockCode())); + } + return publisher; + } + + private List getUpdates(int number) { + List updates = new ArrayList<>(); + for (int i = 0; i < number; i++) { + updates.add(rollUpdate()); + } + return updates; + } + + private final static Map CURRENT_STOCK_PRICES = new ConcurrentHashMap<>(); + + static { + CURRENT_STOCK_PRICES.put("TEAM", dollars(39, 64)); + CURRENT_STOCK_PRICES.put("IBM", dollars(147, 10)); + CURRENT_STOCK_PRICES.put("AMZN", dollars(1002, 94)); + CURRENT_STOCK_PRICES.put("MSFT", dollars(77, 49)); + CURRENT_STOCK_PRICES.put("GOOGL", dollars(1007, 87)); + } + + private StockPriceUpdate rollUpdate() { + ArrayList STOCK_CODES = new ArrayList<>(CURRENT_STOCK_PRICES.keySet()); + + String stockCode = STOCK_CODES.get(rollDice(0, STOCK_CODES.size() - 1)); + BigDecimal currentPrice = CURRENT_STOCK_PRICES.get(stockCode); + + BigDecimal incrementDollars = dollars(rollDice(0, 1), rollDice(0, 99)); + if (rollDice(0, 10) > 7) { + // 0.3 of the time go down + incrementDollars = incrementDollars.negate(); + } + BigDecimal newPrice = currentPrice.add(incrementDollars); + + CURRENT_STOCK_PRICES.put(stockCode, newPrice); + return new StockPriceUpdate(stockCode, LocalDateTime.now(), newPrice, incrementDollars); + } + + private static BigDecimal dollars(int dollars, int cents) { + return truncate("" + dollars + "." + cents); + } + + private static BigDecimal truncate(final String text) { + BigDecimal bigDecimal = new BigDecimal(text); + if (bigDecimal.scale() > 2) + bigDecimal = new BigDecimal(text).setScale(2, RoundingMode.HALF_UP); + return bigDecimal.stripTrailingZeros(); + } + + private final static Random rand = new Random(); + + private static int rollDice(int min, int max) { + return rand.nextInt((max - min) + 1) + min; + } + +} \ No newline at end of file diff --git a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerPublisher.java b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerRxPublisher.java similarity index 97% rename from example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerPublisher.java rename to example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerRxPublisher.java index 7eeb3de9..33b8fa13 100644 --- a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerPublisher.java +++ b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/publishers/StockTickerRxPublisher.java @@ -23,13 +23,13 @@ import java.util.concurrent.TimeUnit; @Component -public class StockTickerPublisher { +public class StockTickerRxPublisher { - private static final Logger LOG = LoggerFactory.getLogger(StockTickerPublisher.class); + private static final Logger LOG = LoggerFactory.getLogger(StockTickerRxPublisher.class); private final Flowable publisher; - public StockTickerPublisher() { + public StockTickerRxPublisher() { Observable stockPriceUpdateObservable = Observable.create(emitter -> { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); diff --git a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java index bc9cc746..7c199443 100644 --- a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java +++ b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java @@ -1,7 +1,7 @@ package com.oembedler.moon.graphql.boot.resolvers; import com.coxautodev.graphql.tools.GraphQLSubscriptionResolver; -import com.oembedler.moon.graphql.boot.publishers.StockTickerPublisher; +import com.oembedler.moon.graphql.boot.publishers.StockTickerRxPublisher; import org.reactivestreams.Publisher; import org.springframework.stereotype.Component; @@ -10,9 +10,9 @@ @Component class Subscription implements GraphQLSubscriptionResolver { - private StockTickerPublisher stockTickerPublisher; + private StockTickerRxPublisher stockTickerPublisher; - Subscription(StockTickerPublisher stockTickerPublisher) { + Subscription(StockTickerRxPublisher stockTickerPublisher) { this.stockTickerPublisher = stockTickerPublisher; } From 2059c4fd5e57272153235fd45d8b31242b3303f0 Mon Sep 17 00:00:00 2001 From: seongahjo Date: Mon, 9 Mar 2020 22:33:54 +0900 Subject: [PATCH 2/2] Fix subscription import path --- .../com/oembedler/moon/graphql/boot/resolvers/Subscription.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java index 90aa175d..05c2d68d 100644 --- a/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java +++ b/example-graphql-subscription/src/main/java/com/oembedler/moon/graphql/boot/resolvers/Subscription.java @@ -1,7 +1,7 @@ package com.oembedler.moon.graphql.boot.resolvers; +import com.oembedler.moon.graphql.boot.publishers.StockTickerRxPublisher; import graphql.kickstart.tools.GraphQLSubscriptionResolver; -import com.oembedler.moon.graphql.boot.publishers.StockTickerPublisher; import org.reactivestreams.Publisher; import org.springframework.stereotype.Component;