From 265a75542484881e3e8bb2f1e985b0304f98cd95 Mon Sep 17 00:00:00 2001 From: baeksunghyun Date: Wed, 5 Mar 2025 17:32:22 +0900 Subject: [PATCH] =?UTF-8?q?Fix:=20WebSocket=20=EA=B4=80=EB=A0=A8=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/ChatMessageServiceImpl.java | 2 +- .../websocket/ChatWebSocketHandler.java | 288 ++++++++++++++---- .../global/websocket/WebSocketConfig.java | 11 +- 3 files changed, 241 insertions(+), 60 deletions(-) diff --git a/src/main/java/cmf/commitField/domain/chat/chatMessage/service/ChatMessageServiceImpl.java b/src/main/java/cmf/commitField/domain/chat/chatMessage/service/ChatMessageServiceImpl.java index 38a073b..2b8a495 100644 --- a/src/main/java/cmf/commitField/domain/chat/chatMessage/service/ChatMessageServiceImpl.java +++ b/src/main/java/cmf/commitField/domain/chat/chatMessage/service/ChatMessageServiceImpl.java @@ -34,7 +34,7 @@ public class ChatMessageServiceImpl implements ChatMessageService { private final UserChatRoomRepository userChatRoomRepository; @Override - @Transactional +// @Transactional public ChatMsgResponse sendMessage(ChatMsgRequest message, Long userId, Long roomId) { User findUser = userRepository.findById(userId) .orElseThrow(() -> new CustomException(ErrorCode.NOT_FOUND_USER)); diff --git a/src/main/java/cmf/commitField/global/websocket/ChatWebSocketHandler.java b/src/main/java/cmf/commitField/global/websocket/ChatWebSocketHandler.java index 659c622..f67ec31 100644 --- a/src/main/java/cmf/commitField/global/websocket/ChatWebSocketHandler.java +++ b/src/main/java/cmf/commitField/global/websocket/ChatWebSocketHandler.java @@ -1,113 +1,287 @@ package cmf.commitField.global.websocket; +import cmf.commitField.domain.chat.chatMessage.controller.request.ChatMsgRequest; +import cmf.commitField.domain.chat.chatMessage.controller.response.ChatMsgResponse; +import cmf.commitField.domain.chat.chatMessage.service.ChatMessageService; +import cmf.commitField.domain.user.entity.User; +import cmf.commitField.domain.user.repository.UserRepository; import cmf.commitField.global.error.ErrorCode; import cmf.commitField.global.exception.CustomException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import java.io.IOException; +import java.time.LocalDateTime; import java.util.*; @Component @Slf4j +@RequiredArgsConstructor public class ChatWebSocketHandler implements WebSocketHandler { private final Map> chatRooms = new HashMap<>(); - // 방의 키값 - + private final ObjectMapper objectMapper = new ObjectMapper(); + private final ChatMessageService chatMessageService; + private final UserRepository userRepository; // 연결이 되었을 때 @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { -// list.add(session); - Long roomId = extractRoomId(session); - // roomId 가 없을 경우, session list (new ArrayList) - List roomSessions = chatRooms.getOrDefault(roomId, new ArrayList<>()); - // 세션 추가 - roomSessions.add(session); - // 해당 방의 키값에 session list 추가 - chatRooms.put(roomId, roomSessions); - log.info(session + "의 클라이언트 접속"); + log.info("클라이언트 접속: {}", session.getId()); + + // 연결 성공 메시지 전송 + Map connectMessage = new HashMap<>(); + connectMessage.put("type", "SYSTEM"); + connectMessage.put("message", "채팅 서버에 연결되었습니다."); + connectMessage.put("timestamp", LocalDateTime.now().toString()); + + try { + session.sendMessage(new TextMessage(objectMapper.writeValueAsString(connectMessage))); + } catch (Exception e) { + log.error("연결 메시지 전송 실패: {}", e.getMessage()); + } } // 클라이언트로부터 받은 메시지를 처리하는 로직 @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - // 메시지 처리 로직 - Long roomId = extractRoomId(session); + String payload = message.getPayload().toString(); + log.info("메시지 수신: {}", payload); + + try { + JsonNode jsonNode = objectMapper.readTree(payload); + String messageType = jsonNode.has("type") ? jsonNode.get("type").asText() : "UNKNOWN"; + + switch (messageType) { + case "SUBSCRIBE": + handleSubscribe(session, jsonNode); + break; + case "UNSUBSCRIBE": + handleUnsubscribe(session, jsonNode); + break; + case "CHAT": + handleChatMessage(session, jsonNode); + break; + default: + log.warn("알 수 없는 메시지 타입: {}", messageType); + sendErrorMessage(session, "지원하지 않는 메시지 타입입니다: " + messageType); + } + } catch (Exception e) { + log.error("메시지 처리 중 오류 발생: {}", e.getMessage(), e); + // 오류 메시지 전송 + sendErrorMessage(session, "메시지 처리 중 오류가 발생했습니다: " + e.getMessage()); + } + } + + // 구독 메시지 처리 + private void handleSubscribe(WebSocketSession session, JsonNode jsonNode) { + try { + if (!jsonNode.has("roomId")) { + sendErrorMessage(session, "roomId 필드가 누락되었습니다."); + return; + } + + Long roomId = jsonNode.get("roomId").asLong(); + log.info("채팅방 구독 요청: roomId={}, sessionId={}", roomId, session.getId()); + + // 해당 룸의 세션 목록에 추가 + List roomSessions = chatRooms.getOrDefault(roomId, new ArrayList<>()); + + // 이미 등록된 세션인지 확인하여 중복 등록 방지 + boolean alreadyRegistered = roomSessions.stream() + .anyMatch(existingSession -> existingSession.getId().equals(session.getId())); + + if (!alreadyRegistered) { + roomSessions.add(session); + chatRooms.put(roomId, roomSessions); + log.info("클라이언트 세션 {}가 룸 {}에 구독됨", session.getId(), roomId); + + // 구독 확인 메시지 전송 + Map subscribeResponse = new HashMap<>(); + subscribeResponse.put("type", "SUBSCRIBE_ACK"); + subscribeResponse.put("roomId", roomId); + subscribeResponse.put("timestamp", LocalDateTime.now().toString()); + subscribeResponse.put("message", "채팅방에 연결되었습니다."); + + session.sendMessage(new TextMessage(objectMapper.writeValueAsString(subscribeResponse))); + } else { + log.info("이미 구독 중인 세션: sessionId={}, roomId={}", session.getId(), roomId); + } + } catch (Exception e) { + log.error("구독 처리 중 오류: {}", e.getMessage(), e); + try { + sendErrorMessage(session, "구독 처리 중 오류: " + e.getMessage()); + } catch (IOException ex) { + log.error("오류 메시지 전송 실패: {}", ex.getMessage()); + } + } + } + + // 구독 해제 메시지 처리 + private void handleUnsubscribe(WebSocketSession session, JsonNode jsonNode) { + try { + if (!jsonNode.has("roomId")) { + sendErrorMessage(session, "roomId 필드가 누락되었습니다."); + return; + } + + Long roomId = jsonNode.get("roomId").asLong(); + + List roomSessions = chatRooms.get(roomId); + if (roomSessions != null) { + roomSessions.removeIf(existingSession -> existingSession.getId().equals(session.getId())); + log.info("클라이언트 세션 {}가 룸 {}에서 구독 해제됨", session.getId(), roomId); + + // 구독 해제가 성공적으로 처리되었음을 알리는 메시지 전송 + Map unsubscribeResponse = new HashMap<>(); + unsubscribeResponse.put("type", "UNSUBSCRIBE_ACK"); + unsubscribeResponse.put("roomId", roomId); + unsubscribeResponse.put("timestamp", LocalDateTime.now().toString()); + unsubscribeResponse.put("message", "채팅방에서 연결이 해제되었습니다."); + + session.sendMessage(new TextMessage(objectMapper.writeValueAsString(unsubscribeResponse))); + } else { + log.warn("존재하지 않는 채팅방 구독 해제 시도: roomId={}", roomId); + sendErrorMessage(session, "존재하지 않는 채팅방입니다: " + roomId); + } + } catch (Exception e) { + log.error("구독 해제 처리 중 오류: {}", e.getMessage(), e); + try { + sendErrorMessage(session, "구독 해제 처리 중 오류: " + e.getMessage()); + } catch (IOException ex) { + log.error("오류 메시지 전송 실패: {}", ex.getMessage()); + } + } + } + + // 채팅 메시지 처리 + private void handleChatMessage(WebSocketSession session, JsonNode jsonNode) { + try { + // 필수 필드 검증 + if (!jsonNode.has("roomId") || !jsonNode.has("message") || !jsonNode.has("userId")) { + sendErrorMessage(session, "필수 필드가 누락되었습니다. (roomId, message, userId 필요)"); + return; + } + + Long roomId = jsonNode.get("roomId").asLong(); + Long userId = jsonNode.get("userId").asLong(); + String message = jsonNode.get("message").asText(); + + if (message == null || message.trim().isEmpty()) { + sendErrorMessage(session, "메시지 내용이 비어있습니다."); + return; + } + + log.info("채팅 메시지: roomId={}, userId={}, message={}", roomId, userId, message); + + // 사용자 정보 검증 + User user = userRepository.findById(userId).orElse(null); + if (user == null) { + log.warn("존재하지 않는 사용자: userId={}", userId); + sendErrorMessage(session, "존재하지 않는 사용자입니다."); + return; + } + + // 메시지 저장 및 처리 + try { + ChatMsgRequest chatMsgRequest = new ChatMsgRequest(message); + ChatMsgResponse response = chatMessageService.sendMessage(chatMsgRequest, userId, roomId); + + // 메시지 포맷 변환하여 전송 + String messageJson = objectMapper.writeValueAsString(response); + + // 해당 채팅방의 모든 세션에 메시지 브로드캐스트 + broadcastMessageToRoom(roomId, messageJson); + } catch (Exception e) { + log.error("메시지 저장 처리 중 오류: {}", e.getMessage(), e); + sendErrorMessage(session, "메시지 전송 중 오류가 발생했습니다: " + e.getMessage()); + } + + } catch (Exception e) { + log.error("채팅 메시지 처리 중 오류: {}", e.getMessage(), e); + try { + sendErrorMessage(session, "메시지 전송 중 오류가 발생했습니다: " + e.getMessage()); + } catch (IOException ex) { + log.error("오류 메시지 전송 실패: {}", ex.getMessage()); + } + } + } + + // 특정 채팅방에 메시지 브로드캐스트 + private void broadcastMessageToRoom(Long roomId, String message) { List roomSessions = chatRooms.get(roomId); if (roomSessions != null) { - String payload = message.getPayload().toString(); - log.info("전송 메시지: " + payload); + List failedSessions = new ArrayList<>(); - for (WebSocketSession msg : roomSessions) { + for (WebSocketSession session : roomSessions) { try { - msg.sendMessage(message); + if (session.isOpen()) { + session.sendMessage(new TextMessage(message)); + } else { + failedSessions.add(session); + } } catch (IOException e) { - throw new CustomException(ErrorCode.CHAT_ERROR); + log.error("메시지 브로드캐스트 중 오류: {}", e.getMessage()); + failedSessions.add(session); } } + + // 실패한 세션 정리 + if (!failedSessions.isEmpty()) { + log.info("닫힌 세션 정리: {} 개의 세션 제거", failedSessions.size()); + roomSessions.removeAll(failedSessions); + } } else { - log.info("해당 채팅방에 클라이언트가 없습니다."); - throw new CustomException(ErrorCode.NOT_EXIST_CLIENT); + log.warn("존재하지 않는 채팅방에 메시지 전송 시도: roomId={}", roomId); } } - //오류 처리 로직을 구현 (네트워크 오류, 프로토콜 오류, 처리 오류... 생각 중) + // 오류 메시지 전송 + private void sendErrorMessage(WebSocketSession session, String errorMessage) throws IOException { + Map errorResponse = new HashMap<>(); + errorResponse.put("type", "ERROR"); + errorResponse.put("message", errorMessage); + errorResponse.put("timestamp", LocalDateTime.now().toString()); + + session.sendMessage(new TextMessage(objectMapper.writeValueAsString(errorResponse))); + } + + // 오류 처리 로직 @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - log.error(exception.getMessage()); + log.error("WebSocket 통신 오류: {}", exception.getMessage(), exception); } // 연결 종료되었을 때 @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { - Long roomId = extractRoomId(session); // 클라이언트가 속한 채팅방 ID를 추출 + log.info("클라이언트 접속 해제: {}, 상태코드: {}", session.getId(), closeStatus); - List roomSessions = chatRooms.get(roomId); - if (roomSessions != null) { - roomSessions.remove(session); + // 모든 채팅방에서 세션 제거 + for (Map.Entry> entry : chatRooms.entrySet()) { + Long roomId = entry.getKey(); + List roomSessions = entry.getValue(); + + boolean removed = roomSessions.removeIf(existingSession -> + existingSession.getId().equals(session.getId())); + + if (removed) { + log.info("세션이 채팅방 {}에서 제거됨: {}", roomId, session.getId()); + } } - log.info(session + "의 클라이언트 접속 해제"); } - //부분 메시지를 지원하는지 여부를 반환 (아직까지는 필요 없으니 false) - //대용량(사진이나 동영상 등)이 필요한 경우에는 따로 구현할 필요가 있음. + // 부분 메시지 지원 여부 @Override public boolean supportsPartialMessages() { return false; } - - private Long extractRoomId(WebSocketSession session) { - Long roomId = null; - String uri = Objects.requireNonNull(session.getUri()).toString(); - String[] uriParts = uri.split("/"); - // EX_URL) /chat/room/{roomId} 일 때 roomId 추출 - // 늘어난다면 수 변경해주면.. (일단 임시로 설정) -// if (uriParts.length >= 3 && uriParts[2].equals("room")) { -// roomId = Long.valueOf(uriParts[3]); - if (uriParts.length >= 4 && uriParts[2].equals("msg")) { - return Long.valueOf(uriParts[3]); - } - // /chat/room/join/{roomId}, /chat/room/out/{roomId}, /chat/room/delete/{roomId} 일 때 roomId 추출 - if (uriParts.length >= 5 && uriParts[2].equals("room") && - (uriParts[3].equals("join") || uriParts[3].equals("out") || uriParts[3].equals("delete"))) { - roomId = Long.valueOf(uriParts[4]); - } - return roomId; - } - //메세지 전송 - public void sendMessage(String payload) throws Exception { - for (List sessions : chatRooms.values()) { - for (WebSocketSession session : sessions) { - TextMessage msg = new TextMessage(payload); - session.sendMessage(msg); - } - } - } } \ No newline at end of file diff --git a/src/main/java/cmf/commitField/global/websocket/WebSocketConfig.java b/src/main/java/cmf/commitField/global/websocket/WebSocketConfig.java index b2ff398..8c052a0 100644 --- a/src/main/java/cmf/commitField/global/websocket/WebSocketConfig.java +++ b/src/main/java/cmf/commitField/global/websocket/WebSocketConfig.java @@ -12,9 +12,16 @@ public class WebSocketConfig implements WebSocketConfigurer { private final ChatWebSocketHandler chatWebSocketHandler; + private final NotiWebSocketHandler notiWebSocketHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - registry.addHandler(chatWebSocketHandler, "/chat").setAllowedOrigins("*"); + // 채팅 웹소켓 핸들러 등록 + registry.addHandler(chatWebSocketHandler, "/chat-rooms") + .setAllowedOrigins("*"); // CORS 설정, 실제 환경에서는 보안을 위해 제한적으로 설정해야 함 + + // 알림 웹소켓 핸들러 등록 + registry.addHandler(notiWebSocketHandler, "/notifications") + .setAllowedOrigins("*"); } -} +} \ No newline at end of file