# DB Activities 시간순 정렬 스크립트

## 목적
- `any_approval.documents` 테이블의 `activities` 컬럼을 `actionDate` 기준 오름차순 정렬
- **원본 문자열 그대로 보존** (공백, 키 순서 등 변경 없음)
- 순서만 변경, 내용은 절대 변경하지 않음

## 핵심 원칙
- `json.dumps()` 사용하지 않음 (공백/키순서 변경 방지)
- 각 activity 객체의 원본 문자열을 추출하여 재배열

In [None]:
import json
import pymysql
from datetime import datetime, timezone, timedelta
from typing import List, Tuple, Optional

## 0. DB 연결 설정

In [None]:
# ========== DB 연결 설정 ==========
DB_CONFIG = {
    'host': 'localhost',
    'user': 'your_user',
    'password': 'your_password',
    'database': 'any_approval',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

# 배치 설정
BATCH_SIZE = 1000

# 한국 시간대 (로그 출력용)
KST = timezone(timedelta(hours=9))

print("설정 완료")
print(f"  - 배치 크기: {BATCH_SIZE}")

## 1. 원본 문자열 보존 파싱 함수

**핵심**: `json.dumps()` 없이 원본 문자열 그대로 추출하여 재배열

In [None]:
def extract_activity_strings(activities_str: str) -> List[Tuple[str, int]]:
    """
    activities JSON 배열에서 각 객체의 원본 문자열과 actionDate 추출
    
    json.dumps() 사용하지 않고 문자열 파싱으로 원본 그대로 추출
    
    Returns: [(원본문자열, actionDate), ...]
    """
    activities_str = activities_str.strip()
    
    # 빈 배열 체크
    if activities_str == '[]':
        return []
    
    # 바깥 대괄호 제거
    if not (activities_str.startswith('[') and activities_str.endswith(']')):
        raise ValueError("Invalid JSON array format")
    
    inner = activities_str[1:-1].strip()
    if not inner:
        return []
    
    # 각 객체의 시작/끝 위치 찾기
    results = []
    i = 0
    
    while i < len(inner):
        # 공백 스킵
        while i < len(inner) and inner[i] in ' \t\n\r':
            i += 1
        
        if i >= len(inner):
            break
        
        # 콤마 스킵
        if inner[i] == ',':
            i += 1
            continue
        
        # 객체 시작
        if inner[i] != '{':
            i += 1
            continue
        
        obj_start = i
        
        # 매칭되는 '}' 찾기 (중괄호 깊이 추적)
        depth = 1
        i += 1
        in_string = False
        escape = False
        
        while i < len(inner) and depth > 0:
            char = inner[i]
            
            if escape:
                escape = False
            elif char == '\\':
                escape = True
            elif char == '"' and not escape:
                in_string = not in_string
            elif not in_string:
                if char == '{':
                    depth += 1
                elif char == '}':
                    depth -= 1
            
            i += 1
        
        if depth == 0:
            obj_str = inner[obj_start:i]
            
            # actionDate 추출 (파싱용으로만 json.loads 사용)
            obj = json.loads(obj_str)
            action_date = obj.get('actionDate', 0)
            
            results.append((obj_str, action_date))
    
    return results


def sort_activities_preserve_original(activities_str: str) -> Tuple[str, bool, List[int], List[int]]:
    """
    activities를 actionDate 기준 오름차순 정렬 (원본 문자열 보존)
    
    Returns: (정렬된_문자열, 순서변경여부, 원본순서, 새순서)
    """
    if not activities_str or activities_str.strip() in ('', '[]', 'null', 'NULL'):
        return activities_str, False, [], []
    
    try:
        items = extract_activity_strings(activities_str)
    except Exception as e:
        # 파싱 실패 시 원본 반환
        return activities_str, False, [], []
    
    if len(items) <= 1:
        # 0개 또는 1개면 정렬 불필요
        return activities_str, False, [], []
    
    # 원본 순서 (actionDate 리스트)
    original_order = [item[1] for item in items]
    
    # actionDate 기준 오름차순 정렬
    sorted_items = sorted(items, key=lambda x: x[1])
    
    # 새 순서
    new_order = [item[1] for item in sorted_items]
    
    # 순서가 바뀌었는지 확인
    if original_order == new_order:
        return activities_str, False, original_order, new_order
    
    # 원본 문자열들을 새 순서로 조합 (json.dumps 사용 안함!)
    sorted_str = '[' + ','.join(item[0] for item in sorted_items) + ']'
    
    return sorted_str, True, original_order, new_order


print("함수 정의 완료")

## 2. 검증 함수

In [None]:
def verify_activities_integrity(original_str: str, sorted_str: str) -> Tuple[bool, str]:
    """
    정렬 전후 activities 무결성 검증
    
    검증 항목:
    1. 개수 동일
    2. 각 activity 객체 내용 동일 (순서만 다름)
    
    Returns: (검증통과여부, 오류메시지)
    """
    try:
        orig_list = json.loads(original_str) if original_str else []
        sorted_list = json.loads(sorted_str) if sorted_str else []
    except json.JSONDecodeError as e:
        return False, f"JSON 파싱 오류: {e}"
    
    # 1. 개수 검증
    if len(orig_list) != len(sorted_list):
        return False, f"개수 불일치: 원본 {len(orig_list)} vs 정렬 {len(sorted_list)}"
    
    # 2. 내용 검증 (set 비교 - 순서 무관)
    # 각 객체를 정렬된 JSON 문자열로 변환하여 비교
    orig_set = set(json.dumps(a, sort_keys=True, ensure_ascii=False) for a in orig_list)
    sorted_set = set(json.dumps(a, sort_keys=True, ensure_ascii=False) for a in sorted_list)
    
    if orig_set != sorted_set:
        return False, "내용 불일치: 순서 외 다른 변경 발생"
    
    # 3. 정렬 확인 (actionDate 오름차순)
    dates = [a.get('actionDate', 0) for a in sorted_list]
    if dates != sorted(dates):
        return False, "정렬 오류: actionDate 오름차순이 아님"
    
    return True, "OK"


print("검증 함수 정의 완료")

## 3. 테스트 (예시 데이터)

In [None]:
# 제공해주신 예시 데이터로 테스트
test_data = '[{"positionName": "과장", "deptName": "S/W개발팀", "actionLogType": "DRAFT", "name": "이진근", "emailId": "temp246", "type": "DRAFT", "actionDate": 1276488864000, "deptCode": "", "actionComment": "출장비품의 요청합니다."}, {"positionName": "상무", "deptName": "DX사업본부", "actionLogType": "APPROVAL", "name": "박정균", "emailId": "jkpark", "type": "APPROVAL", "actionDate": 1276559862000, "deptCode": "DB00", "actionComment": "이진근D는 필요한 증빙은 정주연D에게 별도로 제공하기 바랍니다. \\n\\n정주연D는 출장비품의 처리기간을 검토해 주기 바랍니다. \\n제가 보기에 출장품의는 출장전 또는 완료 후 1주일내에는 품의 정산/완료되어야 한다고 봅니다. \\n\\n이렇게 아무때나 품의/정산요청하는것은 바람직하지 않다고 봅니다. (품의기간내 특별한 사유없이 미품의한 것은  정산안해준다든지.. 하는 뭔가 기한에 대한 정책과 패널티가 필요할 것 같습니다.)"}, {"positionName": "이사", "deptName": "경영기획팀", "actionLogType": "AGREEMENT", "name": "정주연", "emailId": "jyjung", "type": "AGREEMENT", "actionDate": 1276584680000, "deptCode": "AB30", "actionComment": "출장비 품의/정산 기한에 대하여 프로세스 가이드에 관련 내용을 추가하여 공지하도록 하겠습니다."}]'

print("=" * 60)
print("테스트: 예시 데이터")
print("=" * 60)

# 정렬 수행
sorted_str, was_sorted, orig_order, new_order = sort_activities_preserve_original(test_data)

print(f"\n순서 변경 여부: {was_sorted}")

def ts_to_kst(ts):
    """타임스탬프를 한국시간 문자열로 변환"""
    dt = datetime.fromtimestamp(ts / 1000, tz=KST)
    return dt.strftime('%Y-%m-%d %H:%M:%S')

if orig_order:
    print(f"원본 순서: {[ts_to_kst(ts) for ts in orig_order]}")
    print(f"정렬 순서: {[ts_to_kst(ts) for ts in new_order]}")

# 검증
is_valid, msg = verify_activities_integrity(test_data, sorted_str)
print(f"\n검증 결과: {'✅ 통과' if is_valid else '❌ 실패'} - {msg}")

# 이 데이터는 이미 정렬되어 있으므로 was_sorted=False 예상

In [None]:
# 정렬이 필요한 테스트 데이터
test_unsorted = '[{"name":"B","actionDate":300},{"name":"A","actionDate":100},{"name":"C","actionDate":200}]'

print("=" * 60)
print("테스트: 정렬 필요한 데이터")
print("=" * 60)

print(f"\n원본: {test_unsorted}")

sorted_str, was_sorted, orig_order, new_order = sort_activities_preserve_original(test_unsorted)

print(f"정렬: {sorted_str}")
print(f"\n순서 변경됨: {was_sorted}")
print(f"원본 actionDate: {orig_order}")
print(f"정렬 actionDate: {new_order}")

# 검증
is_valid, msg = verify_activities_integrity(test_unsorted, sorted_str)
print(f"\n검증 결과: {'✅ 통과' if is_valid else '❌ 실패'} - {msg}")

In [None]:
# 공백 보존 테스트
test_with_spaces = '[{"name": "B", "actionDate": 300}, {"name": "A", "actionDate": 100}]'

print("=" * 60)
print("테스트: 공백 보존 확인")
print("=" * 60)

print(f"\n원본: {test_with_spaces}")
print(f"       ^-- 콜론 뒤 공백 있음")

sorted_str, was_sorted, _, _ = sort_activities_preserve_original(test_with_spaces)

print(f"\n정렬: {sorted_str}")

# 각 객체 내부의 공백이 보존되었는지 확인
if '"name": "A"' in sorted_str and '"name": "B"' in sorted_str:
    print("\n✅ 공백 보존됨!")
else:
    print("\n❌ 공백 변경됨!")

## 4. DB 처리 함수

In [None]:
def process_batch(connection, offset: int, batch_size: int, stats: dict, dry_run: bool = False):
    """
    배치 단위로 처리
    
    Args:
        connection: DB 연결
        offset: 시작 위치
        batch_size: 배치 크기
        stats: 통계 딕셔너리
        dry_run: True면 UPDATE 실행 안함
    
    Returns: 처리된 row 수
    """
    with connection.cursor() as cursor:
        # SELECT
        cursor.execute("""
            SELECT id, activities 
            FROM documents 
            ORDER BY id 
            LIMIT %s OFFSET %s
        """, (batch_size, offset))
        
        rows = cursor.fetchall()
        
        if not rows:
            return 0
        
        updates = []  # (id, new_activities)
        
        for row in rows:
            doc_id = row['id']
            activities = row['activities']
            
            stats['total'] += 1
            
            # activities가 없거나 비어있으면 스킵
            if not activities or activities.strip() in ('', '[]', 'null', 'NULL'):
                stats['skipped_empty'] += 1
                continue
            
            # 정렬 수행
            sorted_str, was_sorted, orig_order, new_order = sort_activities_preserve_original(activities)
            
            if not was_sorted:
                stats['already_sorted'] += 1
                continue
            
            # 검증
            is_valid, error_msg = verify_activities_integrity(activities, sorted_str)
            
            if not is_valid:
                stats['verification_failed'].append({
                    'id': doc_id,
                    'error': error_msg
                })
                continue
            
            # UPDATE 대상에 추가
            updates.append((doc_id, sorted_str))
            stats['to_update'] += 1
            
            # 샘플 저장 (최대 10개)
            if len(stats['samples']) < 10:
                stats['samples'].append({
                    'id': doc_id,
                    'original_order': orig_order,
                    'new_order': new_order
                })
        
        # UPDATE 실행
        if updates and not dry_run:
            for doc_id, new_activities in updates:
                cursor.execute("""
                    UPDATE documents 
                    SET activities = %s 
                    WHERE id = %s
                """, (new_activities, doc_id))
            
            connection.commit()
            stats['updated'] += len(updates)
        
        return len(rows)


print("DB 처리 함수 정의 완료")

## 5. 메인 실행 (Dry Run - 미리보기)

In [None]:
# ========== DRY RUN (미리보기) ==========
# 실제 UPDATE 없이 몇 건이 변경 대상인지 확인

print("=" * 60)
print("DRY RUN 모드 - 실제 UPDATE 없이 분석만 수행")
print("=" * 60)

stats = {
    'total': 0,
    'skipped_empty': 0,
    'already_sorted': 0,
    'to_update': 0,
    'updated': 0,
    'verification_failed': [],
    'samples': []
}

try:
    connection = pymysql.connect(**DB_CONFIG)
    print("\nDB 연결 성공")
    
    offset = 0
    while True:
        processed = process_batch(connection, offset, BATCH_SIZE, stats, dry_run=True)
        
        if processed == 0:
            break
        
        offset += BATCH_SIZE
        
        # 진행상황 출력 (5000건마다)
        if stats['total'] % 5000 == 0:
            print(f"  처리 중... {stats['total']:,}건")
    
    print(f"\n처리 완료: 총 {stats['total']:,}건")
    
except Exception as e:
    print(f"오류 발생: {e}")
    raise
finally:
    if 'connection' in dir() and connection:
        connection.close()
        print("DB 연결 종료")

In [None]:
# DRY RUN 결과 리포트
print("=" * 60)
print("DRY RUN 결과 리포트")
print("=" * 60)

print(f"\n[처리 통계]")
print(f"  총 row 수: {stats['total']:,}건")
print(f"  activities 없음/비어있음: {stats['skipped_empty']:,}건")
print(f"  이미 정렬됨: {stats['already_sorted']:,}건")
print(f"  ⭐ 정렬 필요 (UPDATE 대상): {stats['to_update']:,}건")

print(f"\n[검증 결과]")
if stats['verification_failed']:
    print(f"  ❌ 검증 실패: {len(stats['verification_failed'])}건")
    for item in stats['verification_failed'][:5]:
        print(f"     - id={item['id']}: {item['error']}")
else:
    print(f"  ✅ 모든 검증 통과")

print(f"\n[정렬 예시] (최대 10건)")
for sample in stats['samples']:
    print(f"\n  id={sample['id']}")
    print(f"    변경 전: {[ts_to_kst(ts) for ts in sample['original_order']]}")
    print(f"    변경 후: {[ts_to_kst(ts) for ts in sample['new_order']]}")

## 6. 실제 UPDATE 실행

⚠️ **주의**: 아래 셀은 실제로 DB를 수정합니다!

In [None]:
# ========== 실제 UPDATE 실행 ==========
# ⚠️ 이 셀을 실행하면 DB가 실제로 수정됩니다!

CONFIRM = False  # True로 변경하면 실행됨

if not CONFIRM:
    print("⚠️ CONFIRM = True 로 변경 후 실행하세요.")
    print("   실제 DB UPDATE가 수행됩니다.")
else:
    print("=" * 60)
    print("실제 UPDATE 실행")
    print("=" * 60)
    
    stats = {
        'total': 0,
        'skipped_empty': 0,
        'already_sorted': 0,
        'to_update': 0,
        'updated': 0,
        'verification_failed': [],
        'samples': []
    }
    
    try:
        connection = pymysql.connect(**DB_CONFIG)
        print("\nDB 연결 성공")
        
        offset = 0
        while True:
            processed = process_batch(connection, offset, BATCH_SIZE, stats, dry_run=False)
            
            if processed == 0:
                break
            
            offset += BATCH_SIZE
            
            # 진행상황 출력
            if stats['total'] % 5000 == 0:
                print(f"  처리 중... {stats['total']:,}건 (업데이트: {stats['updated']:,}건)")
        
        print(f"\n✅ 완료!")
        print(f"   총 처리: {stats['total']:,}건")
        print(f"   실제 업데이트: {stats['updated']:,}건")
        
    except Exception as e:
        print(f"❌ 오류 발생: {e}")
        raise
    finally:
        if 'connection' in dir() and connection:
            connection.close()
            print("DB 연결 종료")

## 7. 최종 검증 (UPDATE 후)

In [None]:
# UPDATE 후 랜덤 샘플 검증
import random

print("=" * 60)
print("최종 검증: 랜덤 샘플 activities 정렬 확인")
print("=" * 60)

try:
    connection = pymysql.connect(**DB_CONFIG)
    
    with connection.cursor() as cursor:
        # 랜덤 10개 선택
        cursor.execute("""
            SELECT id, activities 
            FROM documents 
            WHERE activities IS NOT NULL 
              AND activities != '' 
              AND activities != '[]'
            ORDER BY RAND() 
            LIMIT 10
        """)
        
        rows = cursor.fetchall()
        
        all_sorted = True
        for row in rows:
            doc_id = row['id']
            activities = row['activities']
            
            try:
                act_list = json.loads(activities)
                dates = [a.get('actionDate', 0) for a in act_list]
                is_sorted = dates == sorted(dates)
                
                status = "✅" if is_sorted else "❌"
                if not is_sorted:
                    all_sorted = False
                
                print(f"\n{status} id={doc_id}")
                print(f"   actionDates: {[ts_to_kst(d) for d in dates]}")
                
            except Exception as e:
                print(f"\n⚠️ id={doc_id}: 파싱 오류 - {e}")
        
        print("\n" + "=" * 60)
        if all_sorted:
            print("✅ 모든 샘플이 정상적으로 정렬되어 있습니다!")
        else:
            print("❌ 정렬되지 않은 데이터가 있습니다.")

except Exception as e:
    print(f"오류: {e}")
finally:
    if 'connection' in dir() and connection:
        connection.close()

## 완료!

### 실행 순서
1. **DB 설정** (셀 2) - 접속 정보 수정
2. **함수 정의** (셀 3~5) - 실행
3. **테스트** (셀 6~8) - 로직 확인
4. **DRY RUN** (셀 9~10) - 변경 대상 확인
5. **실제 UPDATE** (셀 11) - `CONFIRM = True` 후 실행
6. **최종 검증** (셀 12) - 결과 확인

### 핵심 보장
- ✅ 원본 문자열 그대로 보존 (공백, 키 순서 등)
- ✅ 순서만 변경, 내용 변경 없음
- ✅ 검증 통과한 건만 UPDATE
- ✅ 배치 처리로 대용량 처리 가능