22
33import dev .gustavoavila .websocketclient .common .Utils ;
44import dev .gustavoavila .websocketclient .exceptions .IllegalSchemeException ;
5+ import dev .gustavoavila .websocketclient .exceptions .InvalidReceivedFrameException ;
56import dev .gustavoavila .websocketclient .exceptions .InvalidServerHandshakeException ;
6- import dev .gustavoavila .websocketclient .exceptions .UnknownOpcodeException ;
77import dev .gustavoavila .websocketclient .model .Payload ;
88
99import java .io .BufferedInputStream ;
1717import java .security .MessageDigest ;
1818import java .security .NoSuchAlgorithmException ;
1919import java .security .SecureRandom ;
20- import java .util .HashMap ;
21- import java .util .LinkedList ;
22- import java .util .Map ;
23- import java .util .Random ;
24- import java .util .Queue ;
20+ import java .util .*;
2521
2622import javax .net .SocketFactory ;
2723import javax .net .ssl .SSLSocketFactory ;
3228 * @author Gustavo Avila
3329 */
3430public abstract class WebSocketClient {
31+ public static final int CLOSE_CODE_NORMAL = 1000 ;
32+
3533 /**
3634 * Max number of response handshake bytes to read before raising an exception
3735 */
@@ -137,6 +135,8 @@ public abstract class WebSocketClient {
137135 */
138136 private SSLSocketFactory sslSocketFactory ;
139137
138+ private volatile Timer closeTimer ;
139+
140140 /**
141141 * Initialize all the variables
142142 *
@@ -199,7 +199,7 @@ public WebSocketClient(URI uri) {
199199 /**
200200 * Called when a close code has been received
201201 */
202- public abstract void onCloseReceived ();
202+ public abstract void onCloseReceived (int reason , String description );
203203
204204 /**
205205 * Adds a new header to the set of headers that will be send into the
@@ -449,14 +449,31 @@ private void notifyOnException(Exception e) {
449449 /**
450450 * If the close method wasn't called, call onCloseReceived() method.
451451 */
452- private void notifyOnCloseReceived () {
452+ private void notifyOnCloseReceived (int reason , String description ) {
453453 synchronized (globalLock ) {
454454 if (isRunning ) {
455- onCloseReceived ();
455+ onCloseReceived (reason , description );
456456 }
457457 }
458458 }
459459
460+ private void forceClose () {
461+ new Thread (new Runnable () {
462+ @ Override
463+ public void run () {
464+ synchronized (globalLock ) {
465+ isRunning = false ;
466+
467+ if (reconnectionThread != null ) {
468+ reconnectionThread .interrupt ();
469+ }
470+
471+ webSocketConnection .closeInternal ();
472+ }
473+ }
474+ }).start ();
475+ }
476+
460477 /**
461478 * Sends a text message If the WebSocket is not connected yet, message will
462479 * be send the next time the connection is opened
@@ -465,7 +482,7 @@ private void notifyOnCloseReceived() {
465482 */
466483 public void send (String message ) {
467484 byte [] data = message .getBytes (Charset .forName ("UTF-8" ));
468- final Payload payload = new Payload (OPCODE_TEXT , data );
485+ final Payload payload = new Payload (OPCODE_TEXT , data , false );
469486
470487 new Thread (new Runnable () {
471488 @ Override
@@ -483,7 +500,7 @@ public void run() {
483500 * @param data Binary data that will be send to the WebSocket server
484501 */
485502 public void send (byte [] data ) {
486- final Payload payload = new Payload (OPCODE_BINARY , data );
503+ final Payload payload = new Payload (OPCODE_BINARY , data , false );
487504
488505 new Thread (new Runnable () {
489506 @ Override
@@ -503,7 +520,7 @@ public void sendPing(byte[] data) {
503520 throw new IllegalArgumentException ("Control frame payload cannot be greater than 125 bytes" );
504521 }
505522
506- final Payload payload = new Payload (OPCODE_PING , data );
523+ final Payload payload = new Payload (OPCODE_PING , data , false );
507524 new Thread (new Runnable () {
508525 @ Override
509526 public void run () {
@@ -522,7 +539,7 @@ public void sendPong(byte[] data) {
522539 throw new IllegalArgumentException ("Control frame payload cannot be greater than 125 bytes" );
523540 }
524541
525- final Payload payload = new Payload (OPCODE_PONG , data );
542+ final Payload payload = new Payload (OPCODE_PONG , data , false );
526543 new Thread (new Runnable () {
527544 @ Override
528545 public void run () {
@@ -534,21 +551,40 @@ public void run() {
534551 /**
535552 * Closes the WebSocket connection
536553 */
537- public void close () {
538- new Thread (new Runnable () {
539- @ Override
540- public void run () {
541- synchronized (globalLock ) {
542- isRunning = false ;
554+ public void close (final int timeout , int code , String reason ) {
555+ if (timeout == 0 ) {
556+ forceClose ();
557+ } else if (code < 0 || code >= 5000 ) {
558+ throw new IllegalArgumentException ("Close frame code must be greater or equal than zero and less than 5000" );
559+ } else {
560+ byte [] internalReason = new byte [0 ];
561+ if (reason != null ) {
562+ internalReason = reason .getBytes (Charset .forName ("UTF-8" ));
563+ if (internalReason .length > 123 ) {
564+ throw new IllegalArgumentException ("Close frame reason is too large" );
565+ }
566+ }
543567
544- if ( reconnectionThread != null ) {
545- reconnectionThread . interrupt ( );
546- }
568+ byte [] codeLength = Utils . to2ByteArray ( code );
569+ byte [] data = Arrays . copyOf ( codeLength , 2 + internalReason . length );
570+ System . arraycopy ( internalReason , 0 , data , codeLength . length , internalReason . length );
547571
548- webSocketConnection .closeInternal ();
572+ final Payload payload = new Payload (OPCODE_CLOSE , data , false );
573+ new Thread (new Runnable () {
574+ @ Override
575+ public void run () {
576+ webSocketConnection .sendInternal (payload );
549577 }
550- }
551- }).start ();
578+ }).start ();
579+
580+ closeTimer = new Timer ();
581+ closeTimer .schedule (new TimerTask () {
582+ @ Override
583+ public void run () {
584+ forceClose ();
585+ }
586+ }, timeout );
587+ }
552588 }
553589
554590 /**
@@ -569,10 +605,15 @@ private class WebSocketConnection {
569605 */
570606 private volatile boolean isClosed ;
571607
608+ /**
609+ * Flag that indicates that a graceful close is in process
610+ */
611+ private volatile boolean isClosing ;
612+
572613 /**
573614 * Data waiting to be read from the writer thread
574615 */
575- private final LinkedList <Payload > outBuffer ;
616+ private final Queue <Payload > queue ;
576617
577618 /**
578619 * This will act as a lock for synchronized statements
@@ -606,7 +647,8 @@ private class WebSocketConnection {
606647 private WebSocketConnection () {
607648 this .pendingMessages = false ;
608649 this .isClosed = false ;
609- this .outBuffer = new LinkedList <Payload >();
650+ this .isClosing = false ;
651+ this .queue = new LinkedList <Payload >();
610652 this .internalLock = new Object ();
611653
612654 this .writerThread = new Thread (new Runnable () {
@@ -627,13 +669,17 @@ public void run() {
627669 if (socket .isClosed ()) {
628670 return ;
629671 } else {
630- while (outBuffer .size () > 0 ) {
631- Payload payload = outBuffer . removeFirst ();
672+ while (queue .size () > 0 ) {
673+ Payload payload = queue . poll ();
632674 int opcode = payload .getOpcode ();
633675 byte [] data = payload .getData ();
634676
635677 try {
636678 send (opcode , data );
679+
680+ if (payload .isCloseEcho ()) {
681+ closeInternalInsecure ();
682+ }
637683 } catch (IOException e ) {
638684 // Reader thread will notify this
639685 // exception
@@ -1125,9 +1171,29 @@ private void read() throws IOException {
11251171 notifyOnBinaryReceived (data );
11261172 break ;
11271173 case OPCODE_CLOSE :
1128- closeInternal ();
1129- notifyOnCloseReceived ();
1130- return ;
1174+ if (data .length > 125 ) {
1175+ closeInternal ();
1176+ Exception e = new InvalidReceivedFrameException ("Close frame payload is too big" );
1177+ notifyOnException (e );
1178+ return ;
1179+ } else {
1180+ int code = getCloseCode (data );
1181+ String reason = getCloseReason (data );
1182+ notifyOnCloseReceived (code , reason );
1183+ }
1184+
1185+ synchronized (internalLock ) {
1186+ if (isClosing ) {
1187+ // This is the echo of a client initiated close so the connection can be closed immediately
1188+ closeInternalInsecure ();
1189+ return ;
1190+ } else {
1191+ // This is a server initiated close so an echo must be sent
1192+ Payload payload = new Payload (OPCODE_CLOSE , data , true );
1193+ sendInternalInsecure (payload );
1194+ break ;
1195+ }
1196+ }
11311197 case OPCODE_PING :
11321198 notifyOnPingReceived (data );
11331199 sendPong (data );
@@ -1137,28 +1203,38 @@ private void read() throws IOException {
11371203 break ;
11381204 default :
11391205 closeInternal ();
1140- Exception e = new UnknownOpcodeException ("Unknown opcode: 0x" + Integer .toHexString (opcode ));
1206+ Exception e = new InvalidReceivedFrameException ("Unknown opcode: 0x" + Integer .toHexString (opcode ));
11411207 notifyOnException (e );
11421208 return ;
11431209 }
11441210 }
11451211
1146- // If there are not more data to be read,
1147- // and if the connection didn't receive a close frame,
1148- // an IOException must be thrown because the connection didn't close
1149- // gracefully
1150- throw new IOException ("Unexpected end of stream" );
1212+ synchronized (internalLock ) {
1213+ // There is no need to notify an exception if the connection is closing
1214+ if (!isClosing ) {
1215+ // An IOException must be thrown because the connection didn't close gracefully
1216+ throw new IOException ("Unexpected end of stream" );
1217+ }
1218+ }
11511219 }
11521220
11531221 /**
1154- * Puts the payload into the out buffer and notifies the writer thread
1155- * that new data is available
1222+ * Puts the payload into the out queue and notifies the writer thread that new data is available
11561223 *
1157- * @param payload Payload to be send to the WebSocket server
1224+ * @param payload Payload to be sent to the WebSocket server
11581225 */
11591226 private void sendInternal (Payload payload ) {
11601227 synchronized (internalLock ) {
1161- outBuffer .addLast (payload );
1228+ sendInternalInsecure (payload );
1229+ }
1230+ }
1231+
1232+ private void sendInternalInsecure (Payload payload ) {
1233+ if (!isClosing ) {
1234+ if (payload .getOpcode () == OPCODE_CLOSE ) {
1235+ isClosing = true ;
1236+ }
1237+ queue .offer (payload );
11621238 pendingMessages = true ;
11631239 internalLock .notify ();
11641240 }
@@ -1169,20 +1245,45 @@ private void sendInternal(Payload payload) {
11691245 * thread and the reconnection thread that they must finish
11701246 */
11711247 private void closeInternal () {
1248+ synchronized (internalLock ) {
1249+ closeInternalInsecure ();
1250+ }
1251+ }
1252+
1253+ private void closeInternalInsecure () {
11721254 try {
1173- synchronized (internalLock ) {
1174- if (!isClosed ) {
1175- isClosed = true ;
1176- if (socket != null ) {
1177- socket .close ();
1178- pendingMessages = true ;
1179- internalLock .notify ();
1180- }
1255+ if (!isClosed ) {
1256+ isClosed = true ;
1257+ if (socket != null ) {
1258+ socket .close ();
1259+ pendingMessages = true ;
1260+ internalLock .notify ();
11811261 }
11821262 }
1263+
1264+ if (closeTimer != null ) {
1265+ closeTimer .cancel ();
1266+ closeTimer = null ;
1267+ }
11831268 } catch (IOException e ) {
11841269 // This should never happen
11851270 }
11861271 }
1272+
1273+ private int getCloseCode (byte [] data ) {
1274+ if (data .length > 1 ) {
1275+ byte [] baseCode = Arrays .copyOfRange (data , 0 , 2 );
1276+ return Utils .fromByteArray (new byte []{0 , 0 , baseCode [0 ], baseCode [1 ]});
1277+ }
1278+ return -1 ;
1279+ }
1280+
1281+ private String getCloseReason (byte [] data ) {
1282+ if (data .length > 2 ) {
1283+ byte [] baseReason = Arrays .copyOfRange (data , 2 , data .length );
1284+ return new String (baseReason , Charset .forName ("UTF-8" ));
1285+ }
1286+ return null ;
1287+ }
11871288 }
11881289}
0 commit comments