Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spring webflux + serverless #239

Closed
jayasai470 opened this issue Feb 14, 2019 · 31 comments
Closed

spring webflux + serverless #239

jayasai470 opened this issue Feb 14, 2019 · 31 comments

Comments

@jayasai470
Copy link

  • Framework version: 1.3
  • Implementations: Spring boot 2.1.2 + web-react

Scenario

just wanted to check if there is a support for spring webflux without server sent events ?

Expected behavior

Actual behavior

Steps to reproduce

Full log output

@sapessi sapessi added this to the Release 1.4 milestone Feb 14, 2019
@sapessi
Copy link
Collaborator

sapessi commented Feb 14, 2019

I have not tested with WebFlux. I will mark this as a feature request and look into it for the next minor release.

@renecairo
Copy link

renecairo commented Aug 22, 2019

Hi, is there any feedback on this?
I'm trying to deploy a lambda function using Spring Boot 2.x + webflux and I'm having the following issue:
Error loading class com.everymundo.openair.farenet.processor.StreamLambdaHandler: org/springframework/web/servlet/DispatcherServlet: java.lang.NoClassDefFoundError java.lang.NoClassDefFoundError: org/springframework/web/servlet/DispatcherServlet

@sapessi was the webflux support added at some point to any release?

Thanks in advance

@sapessi
Copy link
Collaborator

sapessi commented Aug 22, 2019

Hi @renecairo - I have not worked on Webflux yet. I'm assuming you have tested with the latest version of this library? The error indicates that it cannot load org/springframework/web/servlet/DispatcherServlet, which I'd expected to be bundled in the jar/zip every time. Do you have a repo that I can use to replicate?

@renecairo
Copy link

Hi @sapessi

We are building fully reactive micro-services using Spring Boot 2.x and Webflux with Reactor Netty (non-servlet). Spring web-mvc is the one including the missing class and we can't have both dependencies in the same project.

@khakiout
Copy link

Would like to comment that we've experience the same thing. It seems SpringBootLambdaContainerHandler contains a dispatcherServlet field by default. However, this is not included for Webflux.

@sapessi
Copy link
Collaborator

sapessi commented Aug 29, 2019

I spent some time looking into this. I see two possible path forward:

  1. Spring-spefici: Sounds like this should be a separate SpringWebfluxLambdaContainerHandler that interfaces with an WebHandler object instead of the classic DispatcherServlet. We should also provide implementations of the ServerHttpRequest and response interfaces. This is relatively low effort but cannot be used with other frameworks
  2. Create an implementation of the reactor core APIs and integrate with Spring effectively pretending to be a server such as Netty. This is more generic and will likely scale better to other frameworks. However, it is considerably more effort.

To be clear, AWS Lambda itself enforces that each sandbox only processes one event at a time. So you won't see any performance gain by using the reactor API. Adding support for this in the library simply gives you the ability to port your code between compute platforms.

Do you use any other framework for "reactor" development other than Spring @jayasai470, @renecairo, @khakiout?

@jayasai470
Copy link
Author

@sapessi right now no.

@khakiout
Copy link

khakiout commented Sep 1, 2019

@sapessi At the moment no, we only use 'reactor' with Spring.

To be clear, AWS Lambda itself enforces that each sandbox only processes one event at a time. So you won't see any performance gain by using the reactor API.

We'll take note of this. If Serverless doesn't benefit much from Reactor maybe we can go back to the usual way.

@sapessi
Copy link
Collaborator

sapessi commented Sep 3, 2019

I'd say it's still worth testing @khakiout, especially if your function performs multiple, network-bound operations all at once. I'm hoping to find some time to work on this soon.

We'll take note of this. If Serverless doesn't benefit much from Reactor maybe we can go back to the usual way.

sapessi added a commit that referenced this issue Sep 11, 2019
…port WebFlux and reactive embedded servers for Spring to address #239
sapessi added a commit that referenced this issue Sep 11, 2019
sapessi added a commit that referenced this issue Sep 11, 2019
sapessi added a commit that referenced this issue Sep 11, 2019
@sapessi
Copy link
Collaborator

sapessi commented Sep 11, 2019

Hey @jayasai470, @renecairo, @khakiout - this was a deep rabbit hole and I ended up refactoring most of the servlet context and spring integration. The first experimental version of the changes are in the core branch of this repo. I have added a new module called aws-serverless-java-container-springboot2 that supports both reactive and servlet-based containers. Given the size of the change, I'd like to get as many hands on it as possible to test. Will you be able to assist and test with your applications?

  1. Clone this repo in local
  2. Switch to the core branch (git checkout core)
  3. Run mvn install to install a copy of the 1.4-SNAPSHOT version of the library in your local maven repo
  4. In your app/function take a dependency on aws-serverless-java-container-springboot2 version 1.4-SNAPSHOT
  5. Make sure that in your pom.xml you exclude whatever embedded container Spring is using. In my case this was:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.1.8.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>

@sapessi
Copy link
Collaborator

sapessi commented Sep 23, 2019

Hey @jayasai470, @renecairo, @khakiout did you ever get a chance to test this? I want to push out release 1.4 soon and I'd like to have a few more eyes on the new integration.

@zorrofox
Copy link

Hi @sapessi ,

I have tested this release in my Demo project for spring-webflux and it worked fine. Thanks a lot for your guys work.

@sapessi
Copy link
Collaborator

sapessi commented Sep 24, 2019

Thanks for the help @zorrofox, really appreciated!

@khakiout
Copy link

Checking on this now.

@jayasai470
Copy link
Author

@sapessi thanks for the work, i have made a sample project here for testing
https://github.com/jayasai470/aws-serverless-java
router function works fine, but the spring cloud function does not return a valid resp, i guess this has to do with the response type

@sapessi
Copy link
Collaborator

sapessi commented Sep 25, 2019

Weird @jayasai470, I cloned your repo and added a new unit test to run a full request through, the response looks correct to me:

@Test
public void testRequest() throws IOException {
	LambdaHandler h = new LambdaHandler();
	AwsProxyRequest req = new AwsProxyRequestBuilder("/api/status", "GET").build();
	ByteArrayOutputStream os = new ByteArrayOutputStream();
	h.handleRequest(
			new ByteArrayInputStream(LambdaContainerHandler.getObjectMapper().writeValueAsBytes(req)),
			os,
			new MockLambdaContext()
	);
	AwsProxyResponse resp = LambdaContainerHandler.getObjectMapper().readValue(os.toByteArray(), AwsProxyResponse.class);

	assertEquals(200, resp.getStatusCode());
	System.out.println("Body: " + resp.getBody());
	assertEquals("status is up", resp.getBody());
}

Are you getting an actual exception?

@jayasai470
Copy link
Author

the router function "/api/status" and "/api/time" works fine both in local and aws but spring cloud function ("/hello") does not send any resp in aws. If we run the app in local and do a curl -x POST http://localhost:8080/hello -d "somestring" i am getting proper response like "SOMESTRING". But when we deploy to aws lambda and api gateway(sls deploy) i am not receiving any output.

@sapessi
Copy link
Collaborator

sapessi commented Sep 25, 2019

Thanks for the clarification. I'll test this today.

@sapessi
Copy link
Collaborator

sapessi commented Sep 26, 2019

I spent some time looking into this @jayasai470, looks like Functions use a fairly different interface from the standard http APIs. I believe it would require building a fully reactive handler for the framework. It is something I plan to look into but not for the 1.4 release - I'll open a separate issue to track this.

In the meanwhile, I would recommend using Spring's own adapter.

@jayasai470
Copy link
Author

@sapessi yup i think spring cloud function with aws adapaters is good enough if we have one function, but if any one wants to have a spring web or webflux integration for http end points then this is good enough

Thanks for the hard work and quick support.

@cesardrk
Copy link

@sapessi i have tried the 1.4 version creating a simple rest api using spring boot 2.1.8 with webflux, but i get the same blank body output mentioned by @jayasai470 from my endpoint. Calling the api using the netty embedded server works fine, retrieving the body data.

@sapessi
Copy link
Collaborator

sapessi commented Nov 11, 2019

@cesardrk you are using the aws-serverless-java-container-springboot2 package?

@cesardrk
Copy link

@sapessi yes, i'm using the springboot2 package.

@cesardrk
Copy link

cesardrk commented Nov 12, 2019

I have created an ElasticSearch client using RestHighLevelClient, with a simple get endpoint to retrieve all nearby places using geolocation.

This is my endpoint:

@RestController
@Import({PlacesRepository.class})
public class RadarController {

    private final PlacesRepository placesRepository;

    @Autowired
    public RadarController(PlacesRepository placesRepository) {
        this.placesRepository = placesRepository;
    }

    @RequestMapping(path = "/", method = RequestMethod.GET)
    public Flux<Place> getPlaces(@RequestParam("lat_pos") Double latPos,
                                 @RequestParam("lon_pos") Double lonPos,
                                 @RequestParam("lat_ref") Optional<Double> latRef,
                                 @RequestParam("lon_ref") Optional<Double> lonRef,
                                 @RequestParam("keywords") Optional<String> keywords,
                                 @RequestParam(value = "radius", defaultValue = "1000") Integer radius,
                                 @RequestParam(value = "offset", defaultValue = "0") Integer offset,
                                 @RequestParam(value = "limit", defaultValue = "25") Integer limit,
                                 Principal principal) {

        return placesRepository.findByLatLonAndKeywords(latPos, lonPos, latRef.orElse(latPos), lonRef.orElse(lonPos),
                keywords.map(isPresent -> Optional.of(isPresent.split(","))).orElse(Optional.empty()),
                radius, offset, limit);
    }

}

and this is my repository class:

@Component
@Import({ElasticSearchConfiguration.class, JacksonConfiguration.class})
public class PlacesRepository {

    private static final String INDEX_NAME = "places";
    private static final String COORDINATES_FIELD = "coordinates";
    private static final String KEYWORDS_FIELD = "keywords";

    private final RestHighLevelClient client;
    private final ObjectMapper objectMapper;

    @Autowired
    public PlacesRepository(RestHighLevelClient client, ObjectMapper objectMapper) {
        this.client = client;
        this.objectMapper = objectMapper;
    }

    public Flux<Place> findByLatLonAndKeywords(Double latPos, Double lonPos, Double latRef, Double lonRef,
                                               Optional<String[]> keywords, Integer radius, Integer offset,
                                               Integer limit) {
        return Mono
                .<SearchResponse>create(sink -> {
                    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

                    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

                    if (keywords.isPresent()) {
                        Arrays.asList(keywords.get()).forEach(keyword -> boolQueryBuilder.must(
                                QueryBuilders.matchQuery(KEYWORDS_FIELD, keyword))
                        );
                    } else {
                        boolQueryBuilder.must(QueryBuilders.matchAllQuery());
                    }

                    QueryBuilder geoDistanceQueryBuilder = QueryBuilders
                            .geoDistanceQuery(COORDINATES_FIELD)
                            .point(latPos, lonPos)
                            .distance(radius, DistanceUnit.METERS);

                    QueryBuilder finalQuery = boolQueryBuilder.filter(geoDistanceQueryBuilder);

                    sourceBuilder.query(finalQuery).from(offset).size(limit);

                    SearchRequest searchRequest = new SearchRequest(INDEX_NAME)
                            .source(sourceBuilder.sort(SortBuilders.geoDistanceSort(COORDINATES_FIELD, latRef, lonRef)
                                    .order(SortOrder.ASC)
                                    .unit(DistanceUnit.METERS)));

                    client.searchAsync(searchRequest, RequestOptions.DEFAULT, listenerToSink(sink));
                })
                .map(SearchResponse::getHits)
                .flatMapMany(hits -> Flux.fromArray(hits.getHits()))
                .map(hit -> objectMapper.convertValue(hit.getSourceAsMap(), Place.class)
                );
    }

    private <T> ActionListener<T> listenerToSink(MonoSink<T> sink) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                sink.success(response);
            }

            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        };
    }
}

And my test class which fails:

@Test
public void testValidRequestToLambda() throws IOException {
    StreamLambdaHandler h = new StreamLambdaHandler();
    AwsProxyRequest req = new AwsProxyRequestBuilder("/?lat_pos=-23.570000&lon_pos=-46.692000&radius=3000", "GET").build();
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    h.handleRequest(
            new ByteArrayInputStream(LambdaContainerHandler.getObjectMapper().writeValueAsBytes(req)),
            os,
            new MockLambdaContext()
    );
    AwsProxyResponse resp = LambdaContainerHandler.getObjectMapper().readValue(os.toByteArray(), AwsProxyResponse.class);

    assertEquals(200, resp.getStatusCode());
    System.out.println("Body: " + resp.getBody());
    assertThat(resp.getBody(), not(isEmptyString()));
}

@cesardrk
Copy link

@sapessi should i open a new issue with the above example?

@sapessi
Copy link
Collaborator

sapessi commented Nov 13, 2019

@cesardrk Let me have a play with the sample code to see if I can root-cause the issue. I'll open a separate issue if it's needed. Is your LambdaHandler class based on our samples?

@cesardrk
Copy link

@sapessi yes, i have used your sample as a starting point. The handler actually is almost identical, i just removed the cognito filter.

@sapessi
Copy link
Collaborator

sapessi commented Nov 13, 2019

Thanks @cesardrk - I hope to test with this code tomorrow. Any chance you can also share the execution log from CloudWatch? Just in case something jumps out at me.

@cesardrk
Copy link

cesardrk commented Nov 13, 2019

@sapessi i did not see any useful information from the logs. But i could upload then if u want. Anyway, i just uploaded an example project in github, replacing the elasticsearch client with an mocked completable future call (with a 5s sleep), that reproduces the issue im having: https://github.com/cesardrk/aws-serverless-java-container-webflux

@sapessi
Copy link
Collaborator

sapessi commented Nov 14, 2019

Quick update @cesardrk. I've managed to replicate the issue in my tests. Looks like when you use a future object either the Flux does not wait for the completion or the flush method on our response object is called prematurely. What has me confused is that we do not create the subscriber manually in this library, instead we rely on Spring's own ServletHttpHandlerAdapter. Running the method as a purely reactive implementation is works fine:

return Flux.create((sink) -> {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    sink.next(MESSAGE);
});

However, when I test your method with SpringBoot's embedded container it seems to work fine. With suggests there is an issue in the response flushing logic on our side. Let me dig deeper into this today.

@sapessi
Copy link
Collaborator

sapessi commented Nov 14, 2019

Another update. It does indeed look like it has something to do in how we release the latch in the response object. At the moment, we release it when the flush method is called on the output stream. This decision was driven by the fact that, from Lambda, we need to return the full output of the response at once - we don't get to "stream" response data.

I'll open a separate issue to track this and investigate how I we can find a good middle-ground.

steliospaps added a commit to steliospaps/spike-simple-full-stack that referenced this issue May 21, 2020
I made the lambda start, but the application context is not created correctly becasue it instanciates a webflux bean.
It looks like netty does not play well with aws lambda. hmm relevant links:
aws/serverless-java-container#239
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants