Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ChatMessageServiceImpl implements ChatMessageService {
private final UserChatRoomRepository userChatRoomRepository;

@Override
@Transactional
// @Transactional
public ChatMsgResponse sendMessage(ChatMsgRequest message, Long userId, Long roomId) {
User findUser = userRepository.findById(userId)
.orElseThrow(() -> new CustomException(ErrorCode.NOT_FOUND_USER));
Expand Down
288 changes: 231 additions & 57 deletions src/main/java/cmf/commitField/global/websocket/ChatWebSocketHandler.java
Original file line number Diff line number Diff line change
@@ -1,113 +1,287 @@
package cmf.commitField.global.websocket;

import cmf.commitField.domain.chat.chatMessage.controller.request.ChatMsgRequest;
import cmf.commitField.domain.chat.chatMessage.controller.response.ChatMsgResponse;
import cmf.commitField.domain.chat.chatMessage.service.ChatMessageService;
import cmf.commitField.domain.user.entity.User;
import cmf.commitField.domain.user.repository.UserRepository;
import cmf.commitField.global.error.ErrorCode;
import cmf.commitField.global.exception.CustomException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;

@Component
@Slf4j
@RequiredArgsConstructor
public class ChatWebSocketHandler implements WebSocketHandler {

private final Map<Long, List<WebSocketSession>> chatRooms = new HashMap<>();
// 방의 키값

private final ObjectMapper objectMapper = new ObjectMapper();
private final ChatMessageService chatMessageService;
private final UserRepository userRepository;

// 연결이 되었을 때
@Override
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
// list.add(session);
Long roomId = extractRoomId(session);
// roomId 가 없을 경우, session list (new ArrayList)
List<WebSocketSession> roomSessions = chatRooms.getOrDefault(roomId, new ArrayList<>());
// 세션 추가
roomSessions.add(session);
// 해당 방의 키값에 session list 추가
chatRooms.put(roomId, roomSessions);
log.info(session + "의 클라이언트 접속");
log.info("클라이언트 접속: {}", session.getId());

// 연결 성공 메시지 전송
Map<String, Object> connectMessage = new HashMap<>();
connectMessage.put("type", "SYSTEM");
connectMessage.put("message", "채팅 서버에 연결되었습니다.");
connectMessage.put("timestamp", LocalDateTime.now().toString());

try {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(connectMessage)));
} catch (Exception e) {
log.error("연결 메시지 전송 실패: {}", e.getMessage());
}
}

// 클라이언트로부터 받은 메시지를 처리하는 로직
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
throws Exception {
// 메시지 처리 로직
Long roomId = extractRoomId(session);
String payload = message.getPayload().toString();
log.info("메시지 수신: {}", payload);

try {
JsonNode jsonNode = objectMapper.readTree(payload);
String messageType = jsonNode.has("type") ? jsonNode.get("type").asText() : "UNKNOWN";

switch (messageType) {
case "SUBSCRIBE":
handleSubscribe(session, jsonNode);
break;
case "UNSUBSCRIBE":
handleUnsubscribe(session, jsonNode);
break;
case "CHAT":
handleChatMessage(session, jsonNode);
break;
default:
log.warn("알 수 없는 메시지 타입: {}", messageType);
sendErrorMessage(session, "지원하지 않는 메시지 타입입니다: " + messageType);
}
} catch (Exception e) {
log.error("메시지 처리 중 오류 발생: {}", e.getMessage(), e);
// 오류 메시지 전송
sendErrorMessage(session, "메시지 처리 중 오류가 발생했습니다: " + e.getMessage());
}
}

// 구독 메시지 처리
private void handleSubscribe(WebSocketSession session, JsonNode jsonNode) {
try {
if (!jsonNode.has("roomId")) {
sendErrorMessage(session, "roomId 필드가 누락되었습니다.");
return;
}

Long roomId = jsonNode.get("roomId").asLong();
log.info("채팅방 구독 요청: roomId={}, sessionId={}", roomId, session.getId());

// 해당 룸의 세션 목록에 추가
List<WebSocketSession> roomSessions = chatRooms.getOrDefault(roomId, new ArrayList<>());

// 이미 등록된 세션인지 확인하여 중복 등록 방지
boolean alreadyRegistered = roomSessions.stream()
.anyMatch(existingSession -> existingSession.getId().equals(session.getId()));

if (!alreadyRegistered) {
roomSessions.add(session);
chatRooms.put(roomId, roomSessions);
log.info("클라이언트 세션 {}가 룸 {}에 구독됨", session.getId(), roomId);

// 구독 확인 메시지 전송
Map<String, Object> subscribeResponse = new HashMap<>();
subscribeResponse.put("type", "SUBSCRIBE_ACK");
subscribeResponse.put("roomId", roomId);
subscribeResponse.put("timestamp", LocalDateTime.now().toString());
subscribeResponse.put("message", "채팅방에 연결되었습니다.");

session.sendMessage(new TextMessage(objectMapper.writeValueAsString(subscribeResponse)));
} else {
log.info("이미 구독 중인 세션: sessionId={}, roomId={}", session.getId(), roomId);
}
} catch (Exception e) {
log.error("구독 처리 중 오류: {}", e.getMessage(), e);
try {
sendErrorMessage(session, "구독 처리 중 오류: " + e.getMessage());
} catch (IOException ex) {
log.error("오류 메시지 전송 실패: {}", ex.getMessage());
}
}
}

// 구독 해제 메시지 처리
private void handleUnsubscribe(WebSocketSession session, JsonNode jsonNode) {
try {
if (!jsonNode.has("roomId")) {
sendErrorMessage(session, "roomId 필드가 누락되었습니다.");
return;
}

Long roomId = jsonNode.get("roomId").asLong();

List<WebSocketSession> roomSessions = chatRooms.get(roomId);
if (roomSessions != null) {
roomSessions.removeIf(existingSession -> existingSession.getId().equals(session.getId()));
log.info("클라이언트 세션 {}가 룸 {}에서 구독 해제됨", session.getId(), roomId);

// 구독 해제가 성공적으로 처리되었음을 알리는 메시지 전송
Map<String, Object> unsubscribeResponse = new HashMap<>();
unsubscribeResponse.put("type", "UNSUBSCRIBE_ACK");
unsubscribeResponse.put("roomId", roomId);
unsubscribeResponse.put("timestamp", LocalDateTime.now().toString());
unsubscribeResponse.put("message", "채팅방에서 연결이 해제되었습니다.");

session.sendMessage(new TextMessage(objectMapper.writeValueAsString(unsubscribeResponse)));
} else {
log.warn("존재하지 않는 채팅방 구독 해제 시도: roomId={}", roomId);
sendErrorMessage(session, "존재하지 않는 채팅방입니다: " + roomId);
}
} catch (Exception e) {
log.error("구독 해제 처리 중 오류: {}", e.getMessage(), e);
try {
sendErrorMessage(session, "구독 해제 처리 중 오류: " + e.getMessage());
} catch (IOException ex) {
log.error("오류 메시지 전송 실패: {}", ex.getMessage());
}
}
}

// 채팅 메시지 처리
private void handleChatMessage(WebSocketSession session, JsonNode jsonNode) {
try {
// 필수 필드 검증
if (!jsonNode.has("roomId") || !jsonNode.has("message") || !jsonNode.has("userId")) {
sendErrorMessage(session, "필수 필드가 누락되었습니다. (roomId, message, userId 필요)");
return;
}

Long roomId = jsonNode.get("roomId").asLong();
Long userId = jsonNode.get("userId").asLong();
String message = jsonNode.get("message").asText();

if (message == null || message.trim().isEmpty()) {
sendErrorMessage(session, "메시지 내용이 비어있습니다.");
return;
}

log.info("채팅 메시지: roomId={}, userId={}, message={}", roomId, userId, message);

// 사용자 정보 검증
User user = userRepository.findById(userId).orElse(null);
if (user == null) {
log.warn("존재하지 않는 사용자: userId={}", userId);
sendErrorMessage(session, "존재하지 않는 사용자입니다.");
return;
}

// 메시지 저장 및 처리
try {
ChatMsgRequest chatMsgRequest = new ChatMsgRequest(message);
ChatMsgResponse response = chatMessageService.sendMessage(chatMsgRequest, userId, roomId);

// 메시지 포맷 변환하여 전송
String messageJson = objectMapper.writeValueAsString(response);

// 해당 채팅방의 모든 세션에 메시지 브로드캐스트
broadcastMessageToRoom(roomId, messageJson);
} catch (Exception e) {
log.error("메시지 저장 처리 중 오류: {}", e.getMessage(), e);
sendErrorMessage(session, "메시지 전송 중 오류가 발생했습니다: " + e.getMessage());
}

} catch (Exception e) {
log.error("채팅 메시지 처리 중 오류: {}", e.getMessage(), e);
try {
sendErrorMessage(session, "메시지 전송 중 오류가 발생했습니다: " + e.getMessage());
} catch (IOException ex) {
log.error("오류 메시지 전송 실패: {}", ex.getMessage());
}
}
}

// 특정 채팅방에 메시지 브로드캐스트
private void broadcastMessageToRoom(Long roomId, String message) {
List<WebSocketSession> roomSessions = chatRooms.get(roomId);
if (roomSessions != null) {
String payload = message.getPayload().toString();
log.info("전송 메시지: " + payload);
List<WebSocketSession> failedSessions = new ArrayList<>();

for (WebSocketSession msg : roomSessions) {
for (WebSocketSession session : roomSessions) {
try {
msg.sendMessage(message);
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
} else {
failedSessions.add(session);
}
} catch (IOException e) {
throw new CustomException(ErrorCode.CHAT_ERROR);
log.error("메시지 브로드캐스트 중 오류: {}", e.getMessage());
failedSessions.add(session);
}
}

// 실패한 세션 정리
if (!failedSessions.isEmpty()) {
log.info("닫힌 세션 정리: {} 개의 세션 제거", failedSessions.size());
roomSessions.removeAll(failedSessions);
}
} else {
log.info("해당 채팅방에 클라이언트가 없습니다.");
throw new CustomException(ErrorCode.NOT_EXIST_CLIENT);
log.warn("존재하지 않는 채팅방에 메시지 전송 시도: roomId={}", roomId);
}
}

//오류 처리 로직을 구현 (네트워크 오류, 프로토콜 오류, 처리 오류... 생각 중)
// 오류 메시지 전송
private void sendErrorMessage(WebSocketSession session, String errorMessage) throws IOException {
Map<String, Object> errorResponse = new HashMap<>();
errorResponse.put("type", "ERROR");
errorResponse.put("message", errorMessage);
errorResponse.put("timestamp", LocalDateTime.now().toString());

session.sendMessage(new TextMessage(objectMapper.writeValueAsString(errorResponse)));
}

// 오류 처리 로직
@Override
public void handleTransportError(WebSocketSession session, Throwable exception)
throws Exception {
log.error(exception.getMessage());
log.error("WebSocket 통신 오류: {}", exception.getMessage(), exception);
}

// 연결 종료되었을 때
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus)
throws Exception {
Long roomId = extractRoomId(session); // 클라이언트가 속한 채팅방 ID를 추출
log.info("클라이언트 접속 해제: {}, 상태코드: {}", session.getId(), closeStatus);

List<WebSocketSession> roomSessions = chatRooms.get(roomId);
if (roomSessions != null) {
roomSessions.remove(session);
// 모든 채팅방에서 세션 제거
for (Map.Entry<Long, List<WebSocketSession>> entry : chatRooms.entrySet()) {
Long roomId = entry.getKey();
List<WebSocketSession> roomSessions = entry.getValue();

boolean removed = roomSessions.removeIf(existingSession ->
existingSession.getId().equals(session.getId()));

if (removed) {
log.info("세션이 채팅방 {}에서 제거됨: {}", roomId, session.getId());
}
}
log.info(session + "의 클라이언트 접속 해제");
}

//부분 메시지를 지원하는지 여부를 반환 (아직까지는 필요 없으니 false)
//대용량(사진이나 동영상 등)이 필요한 경우에는 따로 구현할 필요가 있음.
// 부분 메시지 지원 여부
@Override
public boolean supportsPartialMessages() {
return false;
}

private Long extractRoomId(WebSocketSession session) {
Long roomId = null;
String uri = Objects.requireNonNull(session.getUri()).toString();
String[] uriParts = uri.split("/");
// EX_URL) /chat/room/{roomId} 일 때 roomId 추출
// 늘어난다면 수 변경해주면.. (일단 임시로 설정)
// if (uriParts.length >= 3 && uriParts[2].equals("room")) {
// roomId = Long.valueOf(uriParts[3]);
if (uriParts.length >= 4 && uriParts[2].equals("msg")) {
return Long.valueOf(uriParts[3]);
}
// /chat/room/join/{roomId}, /chat/room/out/{roomId}, /chat/room/delete/{roomId} 일 때 roomId 추출
if (uriParts.length >= 5 && uriParts[2].equals("room") &&
(uriParts[3].equals("join") || uriParts[3].equals("out") || uriParts[3].equals("delete"))) {
roomId = Long.valueOf(uriParts[4]);
}
return roomId;
}
//메세지 전송
public void sendMessage(String payload) throws Exception {
for (List<WebSocketSession> sessions : chatRooms.values()) {
for (WebSocketSession session : sessions) {
TextMessage msg = new TextMessage(payload);
session.sendMessage(msg);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@
public class WebSocketConfig implements WebSocketConfigurer {

private final ChatWebSocketHandler chatWebSocketHandler;
private final NotiWebSocketHandler notiWebSocketHandler;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatWebSocketHandler, "/chat").setAllowedOrigins("*");
// 채팅 웹소켓 핸들러 등록
registry.addHandler(chatWebSocketHandler, "/chat-rooms")
.setAllowedOrigins("*"); // CORS 설정, 실제 환경에서는 보안을 위해 제한적으로 설정해야 함

// 알림 웹소켓 핸들러 등록
registry.addHandler(notiWebSocketHandler, "/notifications")
.setAllowedOrigins("*");
}
}
}