33import com .fizzed .rocker .ContentType ;
44import com .fizzed .rocker .RockerOutputFactory ;
55import io .netty .util .concurrent .MultithreadEventExecutorGroup ;
6+ import io .vertx .core .impl .VertxInternal ;
67import io .vertx .pgclient .*;
78import io .vertx .core .*;
89import io .vertx .core .buffer .Buffer ;
@@ -101,15 +102,13 @@ static int getQueries(HttpServerRequest request) {
101102 private static final String SELECT_WORLDS = "SELECT id, randomnumber from WORLD" ;
102103
103104 private HttpServer server ;
104-
105105 private SqlClientInternal client ;
106-
107106 private CharSequence dateString ;
108-
109107 private CharSequence [] plaintextHeaders ;
110108
111109 private final RockerOutputFactory <BufferRockerOutput > factory = BufferRockerOutput .factory (ContentType .RAW );
112110
111+ private Throwable databaseErr ;
113112 private PreparedQuery <RowSet <Row >> SELECT_WORLD_QUERY ;
114113 private PreparedQuery <RowSet <Row >> SELECT_FORTUNE_QUERY ;
115114 private PreparedQuery <RowSet <Row >> UPDATE_WORLD_QUERY ;
@@ -122,8 +121,8 @@ public static CharSequence createDateHeader() {
122121 @ Override
123122 public void start (Promise <Void > startPromise ) throws Exception {
124123 int port = 8080 ;
125- server = vertx .createHttpServer (new HttpServerOptions ());
126- server .requestHandler (App .this ). listen ( port );
124+ server = vertx .createHttpServer (new HttpServerOptions ())
125+ .requestHandler (App .this );
127126 dateString = createDateHeader ();
128127 plaintextHeaders = new CharSequence [] {
129128 HEADER_CONTENT_TYPE , RESPONSE_TYPE_PLAIN ,
@@ -140,50 +139,70 @@ public void start(Promise<Void> startPromise) throws Exception {
140139 options .setPassword (config .getString ("password" , "benchmarkdbpass" ));
141140 options .setCachePreparedStatements (true );
142141 options .setPipeliningLimit (100_000 ); // Large pipelining means less flushing and we use a single connection anyway
143- PgConnection .connect (vertx , options ).flatMap (conn -> {
144- client = (SqlClientInternal )conn ;
145- Future <PreparedStatement > f1 = conn .prepare (SELECT_WORLD );
146- Future <PreparedStatement > f2 = conn .prepare (SELECT_FORTUNE );
147- Future <PreparedStatement > f3 = conn .prepare (UPDATE_WORLD );
148- Future <WorldCache > f4 = conn .preparedQuery (SELECT_WORLDS )
149- .collecting (Collectors .mapping (row -> new CachedWorld (row .getInteger (0 ), row .getInteger (1 )), Collectors .toList ()))
150- .execute ().map (worlds -> new WorldCache (worlds .value ()));
151- f1 .onSuccess (ps -> SELECT_WORLD_QUERY = ps .query ());
152- f2 .onSuccess (ps -> SELECT_FORTUNE_QUERY = ps .query ());
153- f3 .onSuccess (ps -> UPDATE_WORLD_QUERY = ps .query ());
154- f4 .onSuccess (wc -> WORLD_CACHE = wc );
155- return CompositeFuture .all (f1 , f2 , f3 , f4 );
156- }).onComplete (ar -> startPromise .complete ());
142+ PgConnection .connect (vertx , options )
143+ .flatMap (conn -> {
144+ client = (SqlClientInternal ) conn ;
145+ Future <PreparedStatement > f1 = conn .prepare (SELECT_WORLD )
146+ .andThen (onSuccess (ps -> SELECT_WORLD_QUERY = ps .query ()));
147+ Future <PreparedStatement > f2 = conn .prepare (SELECT_FORTUNE )
148+ .andThen (onSuccess (ps -> SELECT_FORTUNE_QUERY = ps .query ()));
149+ Future <PreparedStatement > f3 = conn .prepare (UPDATE_WORLD )
150+ .andThen (onSuccess (ps -> UPDATE_WORLD_QUERY = ps .query ()));
151+ Future <WorldCache > f4 = conn .preparedQuery (SELECT_WORLDS )
152+ .collecting (Collectors .mapping (row -> new CachedWorld (row .getInteger (0 ), row .getInteger (1 )), Collectors .toList ()))
153+ .execute ()
154+ .map (worlds -> new WorldCache (worlds .value ()))
155+ .andThen (onSuccess (wc -> WORLD_CACHE = wc ));
156+ return CompositeFuture .join (f1 , f2 , f3 , f4 );
157+ })
158+ .transform (ar -> {
159+ databaseErr = ar .cause ();
160+ return server .listen (port );
161+ })
162+ .<Void >mapEmpty ()
163+ .onComplete (startPromise );
164+ }
165+
166+ private static <T > Handler <AsyncResult <T >> onSuccess (Handler <T > handler ) {
167+ return ar -> {
168+ if (ar .succeeded ()) {
169+ handler .handle (ar .result ());
170+ }
171+ };
157172 }
158173
159174 @ Override
160175 public void handle (HttpServerRequest request ) {
161- switch (request .path ()) {
162- case PATH_PLAINTEXT :
163- handlePlainText (request );
164- break ;
165- case PATH_JSON :
166- handleJson (request );
167- break ;
168- case PATH_DB :
169- handleDb (request );
170- break ;
171- case PATH_QUERIES :
172- new Queries (request ).handle ();
173- break ;
174- case PATH_UPDATES :
175- new Update (request ).handle ();
176- break ;
177- case PATH_FORTUNES :
178- handleFortunes (request );
179- break ;
180- case PATH_CACHING :
181- handleCaching (request );
182- break ;
183- default :
184- request .response ().setStatusCode (404 );
185- request .response ().end ();
186- break ;
176+ try {
177+ switch (request .path ()) {
178+ case PATH_PLAINTEXT :
179+ handlePlainText (request );
180+ break ;
181+ case PATH_JSON :
182+ handleJson (request );
183+ break ;
184+ case PATH_DB :
185+ handleDb (request );
186+ break ;
187+ case PATH_QUERIES :
188+ new Queries (request ).handle ();
189+ break ;
190+ case PATH_UPDATES :
191+ new Update (request ).handle ();
192+ break ;
193+ case PATH_FORTUNES :
194+ handleFortunes (request );
195+ break ;
196+ case PATH_CACHING :
197+ handleCaching (request );
198+ break ;
199+ default :
200+ request .response ().setStatusCode (404 );
201+ request .response ().end ();
202+ break ;
203+ }
204+ } catch (Exception e ) {
205+ sendError (request , e );
187206 }
188207 }
189208
@@ -192,6 +211,11 @@ public void stop() {
192211 if (server != null ) server .close ();
193212 }
194213
214+ private void sendError (HttpServerRequest req , Throwable cause ) {
215+ logger .error (cause .getMessage (), cause );
216+ req .response ().setStatusCode (500 ).end ();
217+ }
218+
195219 private void handlePlainText (HttpServerRequest request ) {
196220 HttpServerResponse response = request .response ();
197221 MultiMap headers = response .headers ();
@@ -237,13 +261,11 @@ private void handleDb(HttpServerRequest req) {
237261 .putHeader (HttpHeaders .CONTENT_TYPE , RESPONSE_TYPE_JSON )
238262 .end (Json .encode (new World (row .getInteger (0 ), row .getInteger (1 ))), NULL_HANDLER );
239263 } else {
240- logger .error (res .cause ());
241- resp .setStatusCode (500 ).end (res .cause ().getMessage ());
264+ sendError (req , res .cause ());
242265 }
243266 });
244267 }
245268
246-
247269 class Queries implements Handler <AsyncResult <RowSet <Row >>> {
248270
249271 boolean failed ;
@@ -271,7 +293,7 @@ public void handle(AsyncResult<RowSet<Row>> ar) {
271293 if (!failed ) {
272294 if (ar .failed ()) {
273295 failed = true ;
274- resp . setStatusCode ( 500 ). end ( ar .cause (). getMessage ());
296+ sendError ( req , ar .cause ());
275297 return ;
276298 }
277299
@@ -315,7 +337,7 @@ private void handle() {
315337 if (!failed ) {
316338 if (ar2 .failed ()) {
317339 failed = true ;
318- sendError (ar2 .cause ());
340+ sendError (req , ar2 .cause ());
319341 return ;
320342 }
321343 worlds [index ] = new World (ar2 .result ().iterator ().next ().getInteger (0 ), randomWorld ());
@@ -336,7 +358,7 @@ void handleUpdates() {
336358 }
337359 UPDATE_WORLD_QUERY .executeBatch (batch , ar2 -> {
338360 if (ar2 .failed ()) {
339- sendError (ar2 .cause ());
361+ sendError (req , ar2 .cause ());
340362 return ;
341363 }
342364 JsonArray json = new JsonArray ();
@@ -350,11 +372,6 @@ void handleUpdates() {
350372 .end (json .toBuffer (), NULL_HANDLER );
351373 });
352374 }
353-
354- void sendError (Throwable err ) {
355- logger .error ("" , err );
356- req .response ().setStatusCode (500 ).end (err .getMessage ());
357- }
358375 }
359376
360377 private void handleFortunes (HttpServerRequest req ) {
@@ -379,9 +396,7 @@ private void handleFortunes(HttpServerRequest req) {
379396 .putHeader (HttpHeaders .CONTENT_TYPE , RESPONSE_TYPE_HTML )
380397 .end (FortunesTemplate .template (fortunes ).render (factory ).buffer (), NULL_HANDLER );
381398 } else {
382- Throwable err = ar .cause ();
383- logger .error ("" , err );
384- response .setStatusCode (500 ).end (err .getMessage ());
399+ sendError (req , ar .cause ());
385400 }
386401 });
387402 }
@@ -441,6 +456,7 @@ public static void main(String[] args) throws Exception {
441456
442457 private static void printConfig (Vertx vertx ) {
443458 boolean nativeTransport = vertx .isNativeTransportEnabled ();
459+ String transport = ((VertxInternal ) vertx ).transport ().getClass ().getSimpleName ();
444460 String version = "unknown" ;
445461 try {
446462 InputStream in = Vertx .class .getClassLoader ().getResourceAsStream ("META-INF/vertx/vertx-version.txt" );
@@ -463,5 +479,6 @@ private static void printConfig(Vertx vertx) {
463479 logger .info ("Vertx: " + version );
464480 logger .info ("Event Loop Size: " + ((MultithreadEventExecutorGroup )vertx .nettyEventLoopGroup ()).executorCount ());
465481 logger .info ("Native transport : " + nativeTransport );
482+ logger .info ("Transport : " + transport );
466483 }
467484}
0 commit comments