Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
douglascraigschmidt committed Feb 24, 2021
1 parent 944da42 commit ab4aa73
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 47 deletions.
13 changes: 5 additions & 8 deletions Java8/ex24/src/main/java/utils/NonReentrantSpinLock.java
Expand Up @@ -6,8 +6,6 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/**
* This class emulates a "compare and swap"-style spin-lock with
* non-recursive semantics using the Java VarHandle class.
Expand Down Expand Up @@ -63,7 +61,7 @@ public void lock() {
// cache performance. See the stackoverflow article at
// http://15418.courses.cs.cmu.edu/spring2013/article/31
// and stackoverflow.com/a/4939383/13411862 for details.
while (!(mOwner.get() == null && tryLock()))
while (!(value == 0 && tryLock()))
continue;
}

Expand All @@ -81,7 +79,7 @@ public void lockInterruptibly() throws InterruptedException {
break;
else if (Thread.interrupted())
// Break out of the loop if we were interrupted.
break;
throw new InterruptedException();
}
}

Expand All @@ -99,13 +97,12 @@ public void unlock() {
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new NotImplementedException();
public boolean tryLock(long time, TimeUnit unit) {
throw new UnsupportedOperationException("Timed tryLock() method not implemented");
}

@SuppressWarnings("ConstantConditions")
@Override
public Condition newCondition() {
throw new NotImplementedException();
throw new UnsupportedOperationException("newCondition() method not implemented");
}
}
39 changes: 22 additions & 17 deletions Java8/ex9/src/main/java/ex9.java
Expand Up @@ -48,11 +48,8 @@ public class ex9 {
* Main entry point into the test program.
*/
static public void main(String[] argv) {
// Create an instance to test.
ex9 test = new ex9(argv);

// Run the tests.
test.run();
// Create and run the tests.
new ex9(argv).run();
}

/**
Expand All @@ -68,25 +65,33 @@ static public void main(String[] argv) {
// Parse the command-line arguments.
Options.instance().parseArgs(argv);

// Generate random data for use by the various hashmaps.
mRandomIntegers = generateRandomData();
}

/**
* Generate random data for use by the various hashmaps.
*/
private List<Integer> generateRandomData() {
// Get how many integers we should generate.
int count = Options.instance().count();

// Get the max value for the random numbers.
int maxValue = Options.instance().maxValue();

// Generate a list of random large integers.
mRandomIntegers = new Random()
// Generate "count" random large ints
.ints(count,
// Try to generate duplicates.
maxValue - count,
maxValue)

// Convert each primitive int to Integer.
.boxed()
// Trigger intermediate operations and collect into list.
.collect(toList());
return new Random()
// Generate "count" random large ints
.ints(count,
// Try to generate duplicates.
maxValue - count,
maxValue)

// Convert each primitive int to Integer.
.boxed()

// Trigger intermediate operations and collect into list.
.collect(toList());
}

/**
Expand Down
@@ -0,0 +1,72 @@
package microservices.airports;

import datamodels.AirportInfo;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.function.Function;

/**
* This class serves as a proxy to the asynchronous AirportList
* microservice that uses the RSocket framework to provide a list of
* airport codes and associated airport names.
*/
public class AirportListProxyRSocket {
/**
* The message name that denotes the remote method to obtain the
* list of airport codes/names asynchronously.
*/
private final String mFindAirportListMessage =
"_getAirportList";

/**
* Initialize the RSocketRequestor.
*/
private final Mono<RSocketRequester> rSocketRequester = Mono
.just(RSocketRequester.builder()
.rsocketConnector(rSocketConnector -> rSocketConnector
.reconnect(Retry.fixedDelay(2,
Duration.ofSeconds(2))))
.dataMimeType(MediaType.APPLICATION_CBOR)
.rsocketStrategies(RSocketStrategies.builder()
.encoders(encoders ->
encoders.add(new Jackson2CborEncoder()))
.decoders(decoders ->
decoders.add(new Jackson2CborDecoder()))
.build())
.tcp("localhost", 8090));

/**
* Returns a Flux that emits {@code AirportInfo} objects.
*
* @param scheduler The Scheduler context in which to run the operation
* @return A Flux that emits {@code AirportInfo} objects
*/
public Flux<AirportInfo> findAirportInfo(Scheduler scheduler) {
return Mono
// Return a Flux containing the list of airport
// information.
.fromCallable(() -> rSocketRequester
// Create the data to send to the server.
.map(r -> r
.route(mFindAirportListMessage))

// Get the result back from the server as a
// Flux<AirportInfo>.
.flatMapMany(r -> r.retrieveFlux(AirportInfo.class)))

// Schedule this to run on the given scheduler.
.subscribeOn(scheduler)

// De-nest the result so it's a Flux<AirportInfo>.
.flatMapMany(Function.identity());
}
}
@@ -0,0 +1,60 @@
package microservices.airports.controller;

import datamodels.AirportInfo;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import utils.DataFactory;

import java.util.List;

/**
* This Spring controller demonstrates how WebFlux can be used to
* handle RSocket requests via reactive programming. These requests
* are mapped to method(s) that convert between various currencies
* asynchronously.
*
* In Spring's approach to building RSocket services, message
* requests are handled by a controller that defines the
* endpoints/routes for each supported operation, i.e.,
* {@code @MessageMapping}. These components are
* identified by the @Controller annotation below.
*
* WebFlux uses the {@code @MessageMapping} annotation to map RSocket
* requests onto methods in the {@code ExchangeRateControllerRSocket}.
*/
@Controller
public class AirportListControllerRSocket {
/**
* The list of AirportInfo objects.
*/
private final List<AirportInfo> mAirportList;

/**
* Constructor initializes the field.
*/
AirportListControllerRSocket() {
mAirportList = DataFactory
// Initialize the list of AirportInfo objects from the
// AirportList.txt file.
.getAirportInfoList("airport-list.txt");
}

/**
* This method finds information about all the airports
* asynchronously.
*
* WebFlux maps RSocket requests sent to the _getAirportList
* endpoint to this method.
*
* @return A Flux that emits all {@code AirportInfo} objects
*/
@MessageMapping("_getAirportList")
private Flux<AirportInfo> getAirportInfo() {
return Flux
// Convert the list of AirportInfo objects into a Flux
// stream.
.fromIterable(mAirportList);
}
}
Expand Up @@ -26,11 +26,11 @@
*/
public class APIGatewayProxyRSocket {
/**
* The URI that denotes a remote method to find information about
* The message name that denotes a remote method to find information about
* all the airports asynchronously.
*/
private final String mFindAirportsURIAsync =
"/microservices/APIGatewayAsync/_getAirportList";
private final String mFindAirportsMessage =
"_getAirportList";

/**
* The message name that denotes a remote method to find the best
Expand Down Expand Up @@ -70,31 +70,23 @@ public class APIGatewayProxyRSocket {
* @return A Flux that emits {@code AirportInfo} objects
*/
public Flux<AirportInfo> findAirportInfo(Scheduler scheduler) {
/*
return Mono
// Return a Flux containing the list of airport
// information.
.fromCallable(() -> mAPIGateway
// Create an HTTP POST request.
.get()
// Add the uri to the baseUrl.
.uri(mFindAirportsURIAsync)
// Retrieve the response.
.retrieve()
.fromCallable(() -> rSocketRequester
// Create the data to send to the server.
.map(r -> r
.route(mFindAirportsMessage))

// Convert it to a Flux of AirportInfo
// objects.
.bodyToFlux(AirportInfo.class))
// Get the result back from the server as a
// Flux<TripResponse>.
.flatMapMany(r -> r.retrieveFlux(AirportInfo.class)))

// Schedule this to run on the given scheduler.
.subscribeOn(scheduler)

// De-nest the result so it's a Flux<AirportInfo>.
.flatMapMany(Function.identity());
*/
return null;
}

/**
Expand Down
Expand Up @@ -3,6 +3,7 @@
import datamodels.AirportInfo;
import datamodels.TripResponse;
import microservices.airports.AirportListProxyAsync;
import microservices.airports.AirportListProxyRSocket;
import microservices.apigateway.FlightRequest;
import microservices.exchangerate.ExchangeRateProxyAsync;
import microservices.exchangerate.ExchangeRateProxyRSocket;
Expand Down Expand Up @@ -50,8 +51,8 @@ public class APIGatewayControllerRSocket {
/**
* An async proxy to the AirportList microservice
*/
private final AirportListProxyAsync mAirportListProxy =
new AirportListProxyAsync();
private final AirportListProxyRSocket mAirportListRSocket =
new AirportListProxyRSocket();

/**
* This method finds information about all the airports
Expand All @@ -64,7 +65,7 @@ public class APIGatewayControllerRSocket {
*/
@GetMapping("_getAirportList")
public Flux<AirportInfo> getAirportInfo() {
return mAirportListProxy.findAirportInfo(Schedulers.parallel());
return mAirportListRSocket.findAirportInfo(Schedulers.parallel());
}

/**
Expand Down
2 changes: 1 addition & 1 deletion Reactor/ex5/src/main/java/utils/ReactorUtils.java
Expand Up @@ -216,6 +216,6 @@ public static void randomDelay() {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit ab4aa73

Please sign in to comment.