Switch branches/tags
Nothing to show
Find file History
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
..
Failed to load latest commit information.
src/main
README.md
pom.xml

README.md

title keywords date categories
WebSocket - Java & html & JavaScript - 单发 & 群发
websocket
java
java和websocket
2018/07/28 15:40
java

一、背景说明

最近在做app后台相关接口

自建通知中心目前不能很好的支持给APP推送消息

长连接可以保持推送速度,目前app中内嵌了H5,所以考虑使用websocket

之前没有接触过websocket,百度了一堆之后,页面上可以正常使用

但是没有发现可用使用Java后台进行消息的发送,于是乎就琢磨了一上午,解决了这个问题

现在把这个小工程分享给大家,少走点弯路==

ps:很多不能在后台发送消息,是因为缺少java的客户端

二、准备工作

建立一个maven web 工程

添加依赖

        <dependency>
            <groupId>javax.websocket</groupId>
            <artifactId>javax.websocket-api</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>com.neovisionaries</groupId>
            <artifactId>nv-websocket-client</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

三、主要代码

websocket服务端主逻辑

为了实现简单的非群发操作,在连接websocket的时候,加上了一些get参数

例如:ws://localhost:8080/websocket?sendTo=hisen&method=methodSingle&user=hisenyuan

然后在后端判断,根据参数做出不同的动作

demo完整工程:https://github.com/hisenyuan/IDEAPractice/tree/master/websocket-demo

配置完Tomcat,即可使用,在java后台运行测试类(com.hisen.ws.client.ClientApp4Java)可发送消息到页面

package com.hisen.ws.server;

import com.hisen.ws.util.Constants;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 */
@ServerEndpoint("/websocket")
public class WebSocketServer {
    // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    // 实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    // 当前用户
    private String user;

    /**
     * 客户端可以是web页面,也可以是Java后台
     * <p>
     * 通过连接或者message可以控制发送给谁
     *
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
        // 获取url传过来的参数
        Map<String, List<String>> parameterMap = session.getRequestParameterMap();
        // 发送方式
        String method = null;
        // 发送给哪些人
        List<String> receivers = new ArrayList<>();
        // 发送者
        String sernder = null;
        if (parameterMap.containsKey(Constants.METHOD)) {
            method = parameterMap.get(Constants.METHOD).get(0);
        }
        if (parameterMap.containsKey(Constants.SEND_TO)) {
            receivers = parameterMap.get(Constants.SEND_TO);
        }
        if (parameterMap.containsKey(Constants.USER)) {
            sernder = parameterMap.get(Constants.USER).get(0);
        }

        System.out.println("sender:" + sernder + ",receivers:" + receivers.toString() + ",method:" + method);
        if (method == null || method.equals(Constants.METHOD_ALL)) {
            //发送所有
            send2All(message);
        } else {
            //单发
            send2Users(receivers, message);
        }

    }

    /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.user = session.getRequestParameterMap().get(Constants.USER).get(0);
        // 放入map
        webSocketMap.put(user, this);
        //在线数加1
        addOnlineCount();
        System.out.println("有新连接加入!当前在线人数为" + getOnlineCount() + ",session:" + session.getId() + ",user:" + this.user);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        // 移除
        webSocketMap.remove(this.user);
        //在线数减1
        subOnlineCount();
        System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount() + ",user:" + this.user);
    }


    private void send2Users(List<String> receivers, String message) {
        receivers.forEach(e -> {
            System.out.println("single receiver:" + e);
            Optional.ofNullable(webSocketMap.get(e))
                    .filter(webSocketServer -> webSocketServer.session.isOpen())
                    .ifPresent(webSocketServer -> sendOnce(message, e, webSocketServer));
        });
    }

    private void send2All(String message) {
        webSocketMap.forEach((key, value) -> {
            sendOnce(message, key, value);
        });
    }

    private void sendOnce(String message, String e, WebSocketServer webSocketServer) {
        try {
            webSocketServer.sendMessage(message);
        } catch (IOException exp) {
            System.out.println("发送出错,receiver:" + e);
        }
    }

    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误,user:" + this.user);
        error.printStackTrace();
    }

    /**
     * 自定义的方法
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}