In [0]:
from datetime import datetime
import requests, json
import pandas as pd
from pyspark.sql import Row
import psutil
import mlflow
from mlflow.tracking import MlflowClient
import requests
import json
from datetime import datetime, timezone, timedelta
from pyspark.sql import Row
import pytz

## 테이블 생성

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType

# Column layout of the monitoring log table. Every column is nullable.
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("endpoint", StringType(), True),
    StructField("model_version", StringType(), True),
    StructField("test_case", StringType(), True),
    StructField("status_code", LongType(), True),
    StructField("latency_ms", DoubleType(), True),
    StructField("success", LongType(), True),
    StructField("error_category", StringType(), True),
    StructField("current_state", StringType(), True),
    StructField("usage_count", LongType(), True),
    StructField("cpu_usage_percent", DoubleType(), True),
    StructField("success_rate", DoubleType(), True),
    StructField("error_rate", DoubleType(), True)
])

# Write an empty DataFrame so the Delta table exists with this schema.
# "append" is used instead of "overwrite": it still creates the table when it
# is missing, but re-running this cell no longer erases the log rows that
# have already been collected.
spark.createDataFrame([], schema) \
    .write.format("delta") \
    .mode("append") \
    .saveAsTable("monitoring.endpoint_logs")


## API 호출

In [0]:

model_name = "realtime-realScore-model"

def get_latest_model_version(model_name="realtime-realScore-model"):
    """Return the highest version number registered for a model, as a string.

    Versions are compared numerically, so "10" ranks above "9".

    Args:
        model_name: Name of the model in the MLflow Model Registry.

    Returns:
        The largest version number as a string, or "unknown" when the model
        has no versions or any step of the lookup raises.
    """
    try:
        registry = MlflowClient()
        # The returned model object is not used.
        # NOTE(review): presumably kept so that an unregistered model raises
        # here and is reported by the handler below -- confirm.
        registry.get_registered_model(model_name)
        found = registry.search_model_versions(f"name='{model_name}'")
        if found:
            newest = max(int(mv.version) for mv in found)
            return str(newest)
    except Exception as e:
        # Best effort: report the problem and fall through to the default.
        print(f"최신 버전을 가져올 수 없음: {e}")

    return "unknown"

In [0]:
def run_test_case(df_case, case_name="manual_test", version=None, model_name="realtime-realScore-model",
                  token=None, timeout=60):
    """Send one scoring request to the serving endpoint and log the outcome.

    The pandas DataFrame is posted in ``dataframe_split`` format to the
    ``realtime-realscore-inference`` endpoint, and one row describing the call
    is appended to the ``monitoring.endpoint_logs`` Delta table.

    Args:
        df_case: pandas DataFrame holding the rows to score.
        case_name: Label stored in the ``test_case`` column of the log.
        version: Model version label stored in the log. It is only recorded;
            it is not sent to the endpoint. When ``None`` the value returned
            by ``get_latest_model_version(model_name)`` is used.
        model_name: Registered model name used for that lookup.
        token: Bearer token for the endpoint. When ``None`` it is read from
            the ``DATABRICKS_TOKEN`` environment variable.
        timeout: Seconds passed to ``requests.post`` as its timeout, so an
            unresponsive endpoint cannot block the notebook indefinitely.

    Returns:
        The decoded JSON body when the endpoint answers 200; otherwise
        ``{"error": <status code>, "message": <response text>}``. If the
        request or the logging raises,
        ``{"error": "exception", "message": <exception text>}``.

    Raises:
        RuntimeError: If no token is passed and ``DATABRICKS_TOKEN`` is unset.
    """
    import os  # local import: only needed here to read the credential

    # Credentials must not live in the notebook source. Fail before any
    # network call when none is configured.
    if token is None:
        token = os.environ.get("DATABRICKS_TOKEN")
    if not token:
        raise RuntimeError(
            "Databricks 토큰이 없습니다. token 인자를 전달하거나 DATABRICKS_TOKEN 환경 변수를 설정하세요."
        )

    if version is None:
        version = get_latest_model_version(model_name)
        print(f"최신 버전 사용: {version}")
    else:
        print(f"지정된 버전 사용: {version}")

    endpoint_url = "https://adb-692732728996904.4.azuredatabricks.net/serving-endpoints/realtime-realscore-inference/invocations"
    endpoint_name = "realtime-realscore-inference"

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    # The pandas DataFrame is sent as-is; PySpark is used only for logging.
    payload = {
        "dataframe_split": {
            "columns": df_case.columns.tolist(),
            "data": df_case.values.tolist()
        }
    }

    try:
        response = requests.post(endpoint_url, headers=headers, data=json.dumps(payload), timeout=timeout)

        # CPU utilisation averaged over the 0.4 s following the request, as
        # reported by psutil on the machine running this code.
        cpu_percent = psutil.cpu_percent(interval=0.4)

        latency = response.elapsed.total_seconds() * 1000
        status = response.status_code
        succeeded = status == 200

        # Korean Standard Time, truncated to the minute and stored without tzinfo.
        kst = pytz.timezone('Asia/Seoul')
        kst_time = datetime.now(kst).replace(second=0, microsecond=0, tzinfo=None)

        # Only the status code itself is recorded, and only for failures.
        error_value = "" if succeeded else str(status)

        # Success / error rate describe this single request. They used to be
        # the 0/1 request flag divided by the number of input rows, which
        # logged e.g. 0.33 / 0.67 for a successful 3-row call.
        total_count = len(df_case)
        success_count = 1 if succeeded else 0
        success_rate = float(success_count)
        error_rate = 1.0 - success_rate

        # Operating state label derived from the status code.
        if status == 200:
            current_state = "정상"
        elif status == 500:
            current_state = "오류"
        else:
            current_state = "정지"

        log_data = [{
            "timestamp": kst_time,
            "endpoint": endpoint_name,
            "model_version": version,
            "test_case": case_name,
            "status_code": int(status),
            "latency_ms": float(latency),
            "success": success_count,
            "error_category": error_value,
            "current_state": current_state,
            "usage_count": total_count,
            "cpu_usage_percent": float(cpu_percent),
            "success_rate": success_rate,
            "error_rate": error_rate
        }]

        # Append the row to the Delta table.
        spark.createDataFrame(log_data).write.format("delta").mode("append").saveAsTable("monitoring.endpoint_logs")

        if succeeded:
            return response.json()
        return {"error": status, "message": response.text}

    except Exception as e:
        # Transport and logging failures are reported to the caller as a
        # dict instead of being raised.
        return {"error": "exception", "message": str(e)}

## 정상 데이터

In [0]:
# Well-formed input: two item responses from one learner on one test,
# written column by column.
df_success = pd.DataFrame({
    "learnerID": ["A070000001", "A070000001"],
    "testID": ["T001", "T001"],
    "assessmentItemID": ["Q001", "Q002"],
    "grade": [7, 7],
    "gender": ["F", "F"],
    "discriminationLevel": [1.2, 0.8],
    "difficultyLevel": [0.5, -0.2],
    "guessLevel": [0.1, 0.15],
    "is_correct": [1, 0],
})

# Logged under case name "success_case" with version label "7".
result_success = run_test_case(
    df_success,
    case_name="success_case",
    version="7",
)

In [0]:
# Three item responses from a single learner on a single test.
df_input = pd.DataFrame({
    "learnerID": ["A070000014"] * 3,
    "testID": ["A070000001"] * 3,
    "assessmentItemID": ["A070001001", "A070001002", "A070001003"],
    "grade": [7] * 3,
    "gender": ["F"] * 3,
    "discriminationLevel": [1.2, 0.8, 1.5],
    "difficultyLevel": [0.5, -0.2, 1.1],
    "guessLevel": [0.1, 0.15, 0.2],
    "is_correct": [1, 0, 1],
})

# Run the test. No version is passed, so run_test_case looks up the latest one.
result_success = run_test_case(df_input, case_name="success_case")

## 400 오류

컬럼 누락

In [0]:
import pandas as pd

# Input with missing columns: 'is_correct', 'discriminationLevel' and
# 'guessLevel' are deliberately left out.
df_missing_cols = pd.DataFrame([
    dict(
        learnerID="A070000002",
        testID="T002",
        assessmentItemID="Q003",
        grade=6,
        gender="M",
        difficultyLevel=0.3,
    )
])

# Logged under case name "fail_400"; the version label is looked up.
result_fail = run_test_case(df_missing_cols, case_name="fail_400")

In [0]:
# Type-error input: 'grade' holds a string instead of a number.
df_type_error = pd.DataFrame({
    "learnerID": ["A070000003"],
    "testID": ["T003"],
    "assessmentItemID": ["Q004"],
    "grade": ["seven"],  # string instead of a number
    "gender": ["F"],
    "discriminationLevel": [1.0],
    "difficultyLevel": [0.5],
    "guessLevel": [0.1],
    "is_correct": [1],
})

# Logged under case name "fail_422" with version label "7".
result_fail = run_test_case(
    df_type_error,
    case_name="fail_422",
    version="7",
)

In [0]:
# 'grade' should be numeric but is given as a string; 'guessLevel' is a
# string as well.
df_fail_500 = pd.DataFrame([
    dict(
        learnerID="A070000100",
        testID="A070000002",
        assessmentItemID="A070001010",
        grade="일곱",
        gender="F",
        discriminationLevel=1.2,
        difficultyLevel=0.3,
        guessLevel="A",
        is_correct=1,
    )
])

# Logged under case name "fail_500" with version label "7".
result_fail2 = run_test_case(
    df_fail_500,
    case_name="fail_500",
    version="7",
)

In [0]:
# All three item parameters (discrimination, difficulty, guess) are zero.
df_internal_error = pd.DataFrame({
    "learnerID": ["A070000004"],
    "testID": ["T004"],
    "assessmentItemID": ["Q005"],
    "grade": [7],
    "gender": ["F"],
    "discriminationLevel": [0],
    "difficultyLevel": [0],
    "guessLevel": [0],
    "is_correct": [1],
})

# Logged under case name "fail_500" with version label "8".
result_fail3 = run_test_case(
    df_internal_error,
    case_name="fail_500",
    version="8",
)

## 논리적 오류

In [0]:
# difficultyLevel is assumed to lie in the range -1.0 to +1.0; an abnormally
# large value is supplied here.
df_fail_logic = pd.DataFrame([
    dict(
        learnerID="A070000101",
        testID="A070000003",
        assessmentItemID="A070001011",
        grade=8,
        gender="M",
        discriminationLevel=0.9,
        difficultyLevel=999,
        guessLevel=0.5,
        is_correct=0,
    )
])

# Logged under case name "fail_logic" with version label "6".
result_fail3 = run_test_case(
    df_fail_logic,
    case_name="fail_logic",
    version="6",
)

## 잘못된 ID 포맷

In [0]:
# learnerID / testID do not follow the expected ID format.
df_fail_id = pd.DataFrame({
    "learnerID": ["INVALID_ID"],
    "testID": ["123"],
    "assessmentItemID": ["??"],
    "grade": [6],
    "gender": ["F"],
    "discriminationLevel": [0.7],
    "difficultyLevel": [0.2],
    "guessLevel": [0.1],
    "is_correct": [1],
})

# Logged under case name "fail_400" with version label "8".
result_fail4 = run_test_case(
    df_fail_id,
    case_name="fail_400",
    version="8",
)