Reactive software development on JVM
Switch branches/tags
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
doc
gradle/wrapper
slides
src
.env
.gitignore
.travis.yml
Procfile
Procfile.windows
README.md
app.json
build.gradle
gradle.properties
gradlew
gradlew.bat
settings.gradle
system.properties

README.md

RxJava Applied: Concise Examples where It Shines

Build Status

This repo is a home for small presentation about RxJava, which was given on JEEConf 2016 and JavaDay Kyiv 2016. It contains slides, presented code samples, and some useful links. Presentation description is here.

Presentation slides are here, on Slideshare.

Short Content of the Presentation

Simplified schema of modern applications

modern applications

RxJava short history

Date What happended
Nov 17, 2009 Rx for .NET v.1.0 (shipped with .NET 4.0)
Mar 17, 2010 Reactive Extensions for JS released
Aug 15, 2012 Rx for .NET v.2.0
Feb, 2013 Ben Christensen starts library porting to JVM
Nov 18, 2014 RxJava v. 1.0.0
Oct 5, 2016 RxJava v. 1.2.1 (latest at the moment)

Requirements: Stream of Tweets

Having stream of new tweets (based on keywords):
- Track and report most followed tweet author in stream
- Track and report most retweeted tweet of most popular user

Solution mockUp

Used Twitter API

Twitter Stream API (WebSocket alike):

Twitter REST API (Documentation):

  • GET https://api.twitter.com/1.1/users/show.json?screen_name=neposuda
  • GET https://api.twitter.com/1.1/search/tweets.json?q=from:neposuda

Entities used in solution

class Tweet {
    String text;
    int favorite_count;
    String author;
    int author_followers;
}
class Profile {
   String screen_name;
   String name;
   String location;
   int statuses_count;
   int followers_count;
}
class UserWithTweet {
   Profile profile;
   Tweet tweet;
}

Complete solution diagram

Complete solution diagram

Getting user profile synchronously

Profile getUserProfile(String screenName) {       
      ObjectMapper om = new ObjectMapper();
      return (Profile) om.readValue(om.readTree(
            Unirest.get(API_BASE_URL + "users/show.json")
                   .queryString("screen_name", screenName)
                   .header("Authorization", bearerAuth(authToken.get()))
                   .asString()
                   .getBody()),
            Profile.class);
}

Getting user profile asynchronously

Observable<Profile> getUserProfile(String screenName) {
   if (authToken.isPresent()) {
       return Observable.fromCallable(() -> {
           ObjectMapper om = new ObjectMapper();
           return (Profile) om.readValue(om.readTree(
                   Unirest.get(API_BASE_URL + "users/show.json")
                           .queryString("screen_name", screenName)
                           .header("Authorization", bearerAuth(authToken.get()))
                           .asString()
                  .getBody()),
                   Profile.class);
       }).doOnCompleted(() -> log("getUserProfile completed for: " + screenName));
   } else {
       return Observable.error(new RuntimeException("Can not connect to twitter"));
   }
}

Solution diagram for: getUserAndPopularTweet(userName)

getUserAndPopularTweet

Observable<UserWithTweet> getUserAndPopularTweet(String author){
    return Observable.just(author)
    .flatMap(u -> {
        Observable<Profile> profile = client.getUserProfile(u)
            .subscribeOn(Schedulers.io());
        Observable<Tweet> tweet = client.getUserRecentTweets(u)
            .defaultIfEmpty(null)
            .reduce((t1, t2) ->
                t1.retweet_count > t2.retweet_count ? t1 : t2)
            .subscribeOn(Schedulers.io());
        return Observable.zip(profile, tweet, UserWithTweet::new);
    });
}

Tweat stream subscription (most popular user)

streamClient.getStream("RxJava", "JEEConf", "JavaDay" "Java", "Trump")
    .scan((u1, u2) -> u1.author_followers > u2.author_followers ? u1 : u2)
    .distinctUntilChanged()
    .map(p -> p.author)
    .flatMap(name -> getUserAndPopularTweet(name))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.immediate())
    .subscribe(p -> log.info("The most popular tweet of user "
                             + p.profile.name + ": " + p.tweet));

Same solution but with extended method: getUserAndPopularTweet(name)

streamClient.getStream("RxJava", "JEEConf", "JavaDay", "Java", "Trump")
    .scan((u1, u2) -> u1.author_followers > u2.author_followers ? u1 : u2)
    .distinctUntilChanged()
    .map(p -> p.author)
    .flatMap(name -> {
        Observable<Profile> profile = client.getUserProfile(name)
            .subscribeOn(Schedulers.io());
        Observable<Tweet> tweet = client.getUserRecentTweets(name)
            .defaultIfEmpty(null)
            .reduce((t1, t2) ->
                t1.retweet_count > t2.retweet_count ? t1 : t2)
            .subscribeOn(Schedulers.io());
        return Observable.zip(profile, tweet, UserWithTweet::new);
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.immediate())
    .subscribe(p -> log.info("The most popular tweet of user "
                             + p.profile.name + ": " + p.tweet));

Short conclusions

Pitfalls:

  • API is big (150+ methods to remember)Enhancing Java 8 Streams
  • Requires to understand underlying magic
  • Hard to debug
  • Don’t forget about back pressure

Strength:

  • It is functional, it is reactive*
  • Good for integration scenarios
  • Allows to control execution threads
  • Easy to compose workflows
  • Easy to integrate into existing solutions
  • Easy to test

Test code example

@Test public void correctlyJoinsHttpResults() throws Exception {
   String testUser = "testUser";
   Profile profile = new Profile("u1", "Name", "USA", 10, 20, 30);
   Tweet tweet1    = new Tweet("text-1", 10, 20, testUser, 30);
   Tweet tweet2    = new Tweet("text-2", 40, 50, testUser, 30);

   TwitterClient client = mock(TwitterClient.class);
   when(client.getUserProfile(testUser)).thenReturn(Observable.just(profile));
   when(client.getUserRecentTweets(testUser)).thenReturn(Observable.just(tweet1, tweet2));      

   TestSubscriber<UserWithTweet> testSubscriber = new TestSubscriber<>();
   new Solutions().getUserAndPopularTweet(client, testUser).subscribe(testSubscriber);
   testSubscriber.awaitTerminalEvent();
   assertEquals(singletonList(new UserWithTweet(profile, tweet2)),
           testSubscriber.getOnNextEvents());
}

Used libraries

If You Want to Learn More about Reactive Programming & RxJava

Good starting points:

Recommended tutorials and courses:

Recommended videos:

Recommended articles:

Good Presentations:

Reactive programming for Scala:

Reactive programming for other platforms:

Java 8 Stream API related libraries: