Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The purpose of this guide is to help easily upgrade to Azure Cosmos DB Java SDK

If you have been using a pre-3.x.x Java SDK, it is recommended to review our [Reactor pattern guide](reactor-pattern-guide.md) for an introduction to async programming and Reactor.

Users of the Async Java SDK 2.x.x will want to review our [Reactor vs RxJava Guide]() for additional guidance on converting RxJava code to use Reactor.
Users of the Async Java SDK 2.x.x will want to review our [Reactor vs RxJava Guide](reactor-rxjava-guide.md) for additional guidance on converting RxJava code to use Reactor.

### Java SDK 4.0 implements **Direct Mode** in Async and Sync APIs

Expand All @@ -44,7 +44,7 @@ If you are user of the "Legacy" Sync Java SDK 2.x.x note that a **Direct** **Con

Java SDK 4.0 and Java SDK 3.x.x introduce a hierarchical API which organizes clients, databases and containers in a nested fashion, as shown in this Java SDK 4.0 code snippet:

```java""""
```java
CosmosContainer = client.getDatabase("MyDatabaseName").getContainer("MyContainerName");
```

Expand All @@ -71,9 +71,9 @@ In Java SDK 3.x.x ```CosmosItemProperties"`` 'as exposed"by the public API and s
* ```PartitionKey```
* ```IndexingPolicy```
* ```IndexingMode```
* ...etc.""'""'""'""'
* ...etc.

### Accessors""'""'""'
### Accessors

Java SDK 4.0 exposes ```get``` and ```set``` methods for accessing in"ta"ce members.
* Example: a ```CosmosContainer``` instance has ```container.getId()``` and ```container.setId()``` methods.
Expand Down
44 changes: 22 additions & 22 deletions reactor-pattern-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ How this differs from imperative programming, is that the coder is describing th

### 2. Reactive Streams Frameworks for Java/JVM

Reactive Streams frameworks implement the Reactive Streams Standard for specific programming languages. [RxJava](https://github.com/ReactiveX/RxJava) ([ReactiveX](http://reactivex.io/) for JVM) was the basis of past Azure Java SDKs, but will not be going forward.
A Reactive Streams framework implements the Reactive Streams Standard for specific programming languages. The [RxJava](https://github.com/ReactiveX/RxJava) ([ReactiveX](http://reactivex.io/) for JVM) framework was the basis of past Azure Java SDKs, but will not be going forward.

[Project Reactor](https://projectreactor.io/) or just *Reactor* is the Reactive Programming framework being used for new Azure Java SDKs. The purpose of the rest of this document is to help you get started with Reactor.

Expand All @@ -44,7 +44,7 @@ To write a program using Reactor, you will need to describe one or more async op

Reactor follows a "hybrid push-pull model": the ```Publisher``` pushes events and data into the pipeline as they are available, but ***only*** once you request events and data from the ```Publisher``` by **subscribing**.

To put this in context, consider a "normal" non-Reactor program you might write that takes takes a dependency on some other code with unpredictable response time. For example, maybe you write a function to perform a calculation, and one input comes from calling a function that requests data over HTTP. You might deal with this by implementing a control flow which first calls the dependency code, waits for it to return output, and then provides that output to your code as input. So your code is pulling output from its dependency on an on-demand basis. This can be inefficient if there is latency in the dependency (as is the case for the aforementioned HTTP request example); your code has to loop waiting for the dependency.
To put this in context, consider a "normal" non-Reactor program you might write that takes takes a dependency on some other code with unpredictable response time. For example, maybe you write a function to perform a calculation, and one input comes from calling a function that requests data over HTTP. You might deal with this by implementing a control flow which first calls the dependency code, waits for it to return output, and then provides that output to your code as input. So your code is "pulling" output from its dependency on an on-demand basis. This can be inefficient if there is latency in the dependency (as is the case for the aforementioned HTTP request example); your code has to loop waiting for the dependency.

In a "push" model the dependency signals your code to consume the HTTP request response on an "on-availability" basis; the rest of the time, your code lies dormant, freeing up CPU cycles. This is an event-driven and async approach. But in order for the dependency to signal your code, ***the dependency has to know that your code depends on it*** – and that is the purpose of defining async operation pipelines in Reactor; each pipeline stage is really a piece of async code servicing events and data from the previous stage on an on-availability basis. By defining the pipeline, you tell each stage where to forward events and data to.

Expand All @@ -54,15 +54,15 @@ Now I will illustrate this with Reactor code examples. Consider a Reminders app.
```java
Flux<String> reminderPipeline =
ReminderAsyncService.getRemindersPublisher() // Pipeline Stage 1
.flatMap(reminder -> Dont forget: + reminder) // Stage 2
.flatMap(strIn -> LocalDateTime.now().toString() + “: ”+ strIn); // Stage 3
.flatMap(reminder -> "Don't forget: " + reminder) // Stage 2
.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Stage 3
```

**Subscribe phase (execute pipeline on incoming events)**
```java
reminderPipeline.subscribe(System.out::println); // Async – returns immediately, pipeline executes in the background

while (true) doOtherThings(); // Were freed up to do other tasks 😊
while (true) doOtherThings(); // We're freed up to do other tasks 😊
```

The ```Flux<T>``` class internally represents an async operation pipeline as a DAG and provides instance methods for operating on the pipeline. As we will see ```Flux<T>``` is not the only Reactor class for representing pipelines but it is the general-purpose option. The type ```T``` is always the output type of the final pipeline stage; so hypothetically, if you defined an async operation pipeline which published ```Integer```s at one end and processed them into ```String```s at the other end, the representation of the pipeline would be a ```Flux<String>```.
Expand All @@ -71,15 +71,15 @@ In the **Assembly phase** shown above, you describe program logic as an async op

* **Stage 1**: ```ReminderAsyncService.getRemindersPublisher()``` returns a ```Flux<String>``` representing a ```Publisher``` instance for publishing reminders.

* **Stage 2**: ```.flatMap(reminder -> Dont forget: + reminder)``` modifies the ```Flux<String>``` from **Stage 1** and returns an augmented ```Flux<String>``` that represents a two-stage pipeline. The pipeline consists of
* **Stage 2**: ```.flatMap(reminder -> "Don't forget: " + reminder)``` modifies the ```Flux<String>``` from **Stage 1** and returns an augmented ```Flux<String>``` that represents a two-stage pipeline. The pipeline consists of
* the ```RemindersPublisher```, followed by
* the ```reminder -> Dont forget: + reminder``` operation which prepends "Don't forget: " to the ```reminder``` string (```reminder``` is a variable that can have any name and represents the previous stage output.)
* **Stage 3**: ```.flatMap(strIn -> LocalDateTime.now().toString() + “: ”+ strIn)``` modifies the ```Flux<String>``` from **Stage 2** and returns a further-augmented ```Flux<String>``` that represents a three-stage pipeline. The pipeline consists of
* the ```reminder -> "Don't forget: " + reminder``` operation which prepends "Don't forget: " to the ```reminder``` string (```reminder``` is a variable that can have any name and represents the previous stage output.)
* **Stage 3**: ```.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn)``` modifies the ```Flux<String>``` from **Stage 2** and returns a further-augmented ```Flux<String>``` that represents a three-stage pipeline. The pipeline consists of
* the ```RemindersPublisher```,
* the **Stage 2** operation, and finally
* the ```strIn -> LocalDateTime.now().toString() + “: ”+ strIn``` operation, which timestamps the **Stage 2** output string.
* the ```strIn -> LocalDateTime.now().toString() + ": "+ strIn``` operation, which timestamps the **Stage 2** output string.
Although we "ran" the Assembly phase code, all it did was build up the structure of your program, not run it. In the **Subscribe phase** you execute the pipeline that you defined in the Assembly phase. Here is how that works. You call

```java
Expand All @@ -94,7 +94,7 @@ and

* The ```RemindersPublisher``` instance reads the ```Subscription``` details and responds by pushing an event into the pipeline every time there is a new reminder. The ```RemindersPublisher``` will continue to push an event every time a reminder becomes available, until it has pushed as many events as were requested in the ```Subscription``` (which is infinity in this case, so the ```Publisher``` will just keep going.)

When I say that the ```RemindersPublisher``` "pushes events into the pipeline", I mean that the ```RemindersPublisher``` issues an ```onNext``` signal to the second pipeline stage (```.flatMap(reminder -> Dont forget: + reminder)```) paired with a ```String``` argument containing the reminder. ```flatMap()``` responds to an ```onNext``` signal by taking the ```String``` data passed in and applying the transformation that is in ```flatMap()```'s argument parentheses to the input data (in this case, by prepending the words Dont forget: ). This signal propagates down the pipeline: pipeline Stage 2 issues an ```onNext``` signal to pipeline Stage 3 (```.flatMap(strIn -> LocalDateTime.now().toString() + “: ”+ strIn)```) with its output as the argument; and then pipeline Stage 3 issues its own output along with an ```onNext``` signal.
When I say that the ```RemindersPublisher``` "pushes events into the pipeline", I mean that the ```RemindersPublisher``` issues an ```onNext``` signal to the second pipeline stage (```.flatMap(reminder -> "Don't forget: " + reminder)```) paired with a ```String``` argument containing the reminder. ```flatMap()``` responds to an ```onNext``` signal by taking the ```String``` data passed in and applying the transformation that is in ```flatMap()```'s argument parentheses to the input data (in this case, by prepending the words "Don't forget: "). This signal propagates down the pipeline: pipeline Stage 2 issues an ```onNext``` signal to pipeline Stage 3 (```.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn)```) with its output as the argument; and then pipeline Stage 3 issues its own output along with an ```onNext``` signal.

Now what happens after pipeline Stage 3 is different – the ```onNext``` signal reached the last pipeline stage, so what happens to the final-stage ```onNext``` signal and its associated ```String``` argument? The answer is that when you called ```subscribe()```, ```subscribe()``` also created a ```Subscriber``` instance which implements a method for handling ```onNext``` signals and serves as the last stage of the pipeline. The ```Subscriber```'s ```onNext``` handler will call whatever code you wrote in the argument parentheses of ```subscribe()```, allowing you to customize for your application. In the Subscribe phase snippet above, we called

Expand All @@ -106,11 +106,11 @@ which means that every time an ```onNext``` signal reaches the end of the operat

In ```subscribe()``` you typically want to handle the pipeline output with some finality, i.e. by printing it to the terminal, displaying it in a GUI, running a calculation on it, etc. or doing something else before discarding the data entirely. That said, Reactor does allow you to call ```subscribe()``` with no arguments and just discard incoming events and data - in that case you would implement all of the logic of your program in the preceding pipeline stages, including saving the results to a global variable or printing them to the terminal.

That was a lot. So lets step back for a moment and mention a few key points.
That was a lot. So let's step back for a moment and mention a few key points.
* Keep in mind that Reactor is following a hybrid push-pull model where async events are published at a rate requested by the ```Subscriber```.
* Observe that a ```Subscription``` for N events is a type of pull operation from the ```Subscriber```. The ```Publisher``` controls the rate and timing of pushing events, until it exhausts the N events requested by the ```Subscriber```, and then it stops.
* This approach enables the implementation of ***backpressure***, whereby the ```Subscriber``` can size ```Subscription``` counts to adjust the rate of ```Publisher``` events if they are coming too slow or too fast to process.
* ```subscribe()``` is Reactors built-in ```Subscription``` generator, by default it requests all events from the ```Publisher``` ("unbounded request".) [See the Project Reactor documentation here](https://projectreactor.io/docs/core/3.1.2.RELEASE/reference/) for more guidance on customizing the subscription process.
* ```subscribe()``` is Reactor's built-in ```Subscription``` generator, by default it requests all events from the ```Publisher``` ("unbounded request".) [See the Project Reactor documentation here](https://projectreactor.io/docs/core/3.1.2.RELEASE/reference/) for more guidance on customizing the subscription process.

And the most important takeaway: **Nothing happens until you subscribe.**

Expand All @@ -120,9 +120,9 @@ The ```Subscriber``` and ```Publisher``` are independent entities; just because

```java
Flux<String> reminderPipeline =
Flux.just(Wash the dishes”,“Mow the lawn”,”Sleep) // Publisher, 3 events
.flatMap(reminder -> Dont forget: + reminder)
.flatMap(strIn -> LocalDateTime.now().toString() + “: ”+ strIn); // Nothing executed yet
Flux.just("Wash the dishes","Mow the lawn","Sleep") // Publisher, 3 events
.flatMap(reminder -> "Don't forget: " + reminder)
.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn); // Nothing executed yet
```

```Flux.just()``` is a [Reactor factory method](https://projectreactor.io/docs/core/release/reference/) which contrives to create a custom ```Publisher``` based on its input arguments. You could fully customize your ```Publisher``` implementation by writing a class that implements ```Publisher```; that is outside the scope of this discussion. The output of ```Flux.just()``` in the example above is a ```Publisher``` which will immediately and asynchronously push ```"Wash the dishes"```, ```"Mow the lawn"```, and ```"Sleep"``` into the pipeline as soon as it gets a ```Subscription```. Thus, upon subscription,
Expand All @@ -133,7 +133,7 @@ reminderPipeline.subscribe(System.out::println);

will output the three Strings shown and then end.

Suppose now we want to add two special behaviors to our program: (1) After all M Strings have been printed, print End of reminders. so the user knows we are finished. (2) Print the stack trace for any ```Exception```s which occur during execution. A modification to the ```subscribe()``` call handles all of this:
Suppose now we want to add two special behaviors to our program: (1) After all M Strings have been printed, print "End of reminders." so the user knows we are finished. (2) Print the stack trace for any ```Exception```s which occur during execution. A modification to the ```subscribe()``` call handles all of this:

```java
reminderPipeline.subscribe(strIn -> {
Expand All @@ -143,11 +143,11 @@ err -> {
err.printStackTrace();
},
() -> {
System.out.println(End of reminders.);
System.out.println("End of reminders.");
});
```

Lets break this down. Remember we said that the argument to ```subscribe()``` determines how the ```Subscriber``` handles ```onNext```? I will mention two additional signals which Reactor uses to propagate status information along the pipeline: ```onComplete```, and ```onError```. Both signals denote completion of the Stream; only ```onComplete``` represents successful completion. The ```onError``` signal is associated with an ```Exception``` instance related to an error; the ```onComplete``` signal has no associated data.
Let's break this down. Remember we said that the argument to ```subscribe()``` determines how the ```Subscriber``` handles ```onNext```? I will mention two additional signals which Reactor uses to propagate status information along the pipeline: ```onComplete```, and ```onError```. Both signals denote completion of the Stream; only ```onComplete``` represents successful completion. The ```onError``` signal is associated with an ```Exception``` instance related to an error; the ```onComplete``` signal has no associated data.

As it turns out, we can supply additional code to ```subscribe()``` in the form of Java 8 lambdas and handle ```onComplete``` and ```onError``` as well as ```onNext```! Picking apart the code snippet above,

Expand All @@ -160,8 +160,8 @@ For the special cases of M=0 and M=1 for the ```Publisher```, Reactor provides a
```java
Mono<String> reminderPipeline =
Mono.just("Are you sure you want to cancel your Reminders service?") // Publisher, 1 event
.flatMap(reminder -> Act now: + reminder)
.flatMap(strIn -> LocalDateTime.now().toString() + “: ”+ strIn);
.flatMap(reminder -> "Act now: " + reminder)
.flatMap(strIn -> LocalDateTime.now().toString() + ": "+ strIn);
```

Again, ```Mono.just()``` is a Reactor factory method which creates the single-event publisher. This ```Publisher``` will push its argument into the Reactive Stream pipeline with an ```onNext``` signal and then optionally issue an ```onComplete``` signal indicating completion.
Expand Down
Loading