<a href="https://colab.research.google.com/github/Saaaaangmin/LLM/blob/main/LLM_LogAnal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Optional
import os
from dotenv import load_dotenv

# 환경 변수 불러오기
load_dotenv()

# 라우터 인스턴스 생성
router = APIRouter()

# 로그 한 줄을 나타내는 정규 표현식
log_pattern = re.compile(
    r"^\[(?P<발생시간>\d{8} \d{2}:\d{2}:\d{2}(?: \d{3})?)\] "  # 발생 시간
    r"\[(?P<Port>\d{6})\] "                                # Com Port
    r"\[(?P<유형>[^\]]+)\] "                                # 유형 또는 함수명
    r"\[(?P<에러메시지>.*)\]$"                               # 메시지
)

# 에러 메시지 매핑 (에러 키워드와 설명을 연결)
error_descriptions = {
    "-1 port open error!": "-1 Port open error = 시리얼의 Com Port가 오픈되지 않은 상태로, 케이블 상태 및 COM Port/Baudrate 값이 올바른지 확인이 필요합니다.",
    "-3 Ack error!": "-3 Ack Error = 시리얼 데이터 ACK 수신 오류이므로, 시리얼 연동에 문제가 없는지 확인이 필요합니다.",
    "-4 lrc, cancel!": "-4 lrc, cancel = 시리얼 통신 중 LRC 체크 에러가 발생하여 응답 데이터를 수신하지 못한 상황으로, 시리얼 통신이 정상적으로 되고 있는지 확인이 필요합니다.",
    "Timeout": "Timeout Error = 응답 시간이 초과되었습니다. 연결 상태를 확인하세요.",
}

# 추적할 함수명 목록
function_names = ["ReqToCat", "ReqStop", "SetBmpFile", "SetLogDir", "GetDecSignData", "GetDllVer"]

In [None]:
# 로그 분석 요청 및 응답 모델
class LogAnalysisRequest(BaseModel):
    log_date: str  # YYYYMMDD 형식으로 날짜를 받음

class LogAnalysisResponse(BaseModel):
    발생시간: str
    Port: str
    함수명: str
    에러메시지: str

# 로그 라인 파싱 함수
def parse_log_line(line: str) -> Optional[Dict[str, str]]:
    match = log_pattern.match(line)
    return match.groupdict() if match else None

# 로그 파일 분석 함수
def analyze_log(log_date: str) -> List[Dict[str, str]]:
    file_path = f"C:/NICElog/{log_date}.log"  # log_date를 사용해 경로 생성
    error_logs = []
    last_function_name = "Unknown"

    try:
        with open(file_path, 'r', encoding='cp949') as file:
            for line in file:
                parsed_line = parse_log_line(line.strip())

                if parsed_line:
                    # function_names 목록에 있는 함수명이 감지될 때 업데이트
                    if parsed_line['유형'].strip() in function_names:
                        last_function_name = parsed_line['유형'].strip()

                    # ERROR 메시지를 찾을 경우
                    if "ERROR" in parsed_line['유형']:
                        error_message = parsed_line['에러메시지']
                        detailed_message = error_descriptions.get(error_message, error_message)

                        error_logs.append({
                            "발생시간": parsed_line['발생시간'],
                            "Port": parsed_line['Port'],
                            "함수명": last_function_name,
                            "에러메시지": detailed_message
                        })

        if not error_logs:
            raise HTTPException(status_code=404, detail="No ERROR logs found")

        return error_logs
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"File not found: {file_path}")
    except PermissionError:
        raise HTTPException(status_code=403, detail=f"Permission denied for file: {file_path}")
    except UnicodeDecodeError:
        raise HTTPException(status_code=422, detail="Encoding error while reading the file")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")

# JSON 형식의 로그 파일 날짜를 받아 로그 분석 결과를 반환하는 API 엔드포인트
@router.post("/analyze_log", response_model=List[LogAnalysisResponse])
async def analyze_log_endpoint(request: LogAnalysisRequest):
    return analyze_log(request.log_date)