In [1]:
pip install pymysql --break-system-packages

Note: you may need to restart the kernel to use updated packages.


In [4]:
"""
 [기능]
 approval_data_2010 테이블의 결재일시를 읽어서
 documents 테이블의 activities JSON 내 actionDate 필드 업데이트

 [테이블 연동]
 - approval_data_2010: 결재 상세 데이터 
   - document_id, post_title, status, approver, approval_date
 - documents: 기존 문서 테이블
   - source_id, title, activities (JSON)

 [매칭 로직]
 1. source_id로 두 테이블 연결
 2. 제목(title vs post_title) 일치 여부 검증
 3. activities 내 각 activity에서:
    - type과 status 매칭 (기안→DRAFT, 승인→APPROVAL, 합의→AGREEMENT)
    - name과 approver 매칭
 4. 매칭 성공 시 actionDate를 approval_date로 업데이트

 [날짜 변환]
 - approval_date → Unix timestamp (밀리초)
 - datetime 객체와 문자열 모두 처리

 [검증 및 로그]
 - 제목 불일치 시 업데이트 스킵
 - 불일치 목록을 title_mismatch.log 파일로 저장
 - 처리 진행 상황 1000건마다 출력

 [동작 순서]
 1. approval_data_2010 전체 로드 → document_id별 딕셔너리 구성
 2. documents 테이블 전체 조회
 3. 각 문서별로:
    - source_id로 approval_data 조회
    - 제목 일치 검증
    - activities JSON 파싱 → actionDate 업데이트
 4. 변경된 activities를 DB에 UPDATE
 5. 제목 불일치 로그 파일 저장

 [출력]
 - MariaDB: documents 테이블의 activities 컬럼 UPDATE
 - 로그 파일: title_mismatch.log (제목 불일치 목록)

 [의존성]
 - pymysql
  -> pip install pymysql
"""
import json
import pymysql
import re
from datetime import datetime

# 상태 매핑 딕셔너리
STATUS_MAP = {
    '기안': 'DRAFT',
    '승인': 'APPROVAL',
    '합의': 'AGREEMENT'
}

def parse_approval_date(date_value):
    """날짜를 유닉스 타임스탬프 밀리초로 변환"""
    # datetime 객체인 경우 (DB에서 가져온 경우)
    if isinstance(date_value, datetime):
        return int(date_value.timestamp() * 1000)
    # 문자열인 경우
    else:
        dt = datetime.strptime(date_value, '%Y-%m-%d %H:%M:%S')
        return int(dt.timestamp() * 1000)

def process_documents():
    #여기를 수정하세요
    # DB 연결 설정 (실제 환경에 맞게 수정하세요)
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='1234',
        database='any_approval',
        charset='utf8mb4'
    )

    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        
        # 1. approval_data_2020 조회 및 dictionary 구조화
        print('approval_data_2020 로딩 중...')
        cursor.execute("""
            SELECT document_id, post_title, status, approver, approval_date
            FROM approval_data_2020
        """)
        
        approval_data = {}
        for row in cursor.fetchall():
            doc_id = row['document_id']
            if doc_id not in approval_data:
                approval_data[doc_id] = []
            approval_data[doc_id].append(row)
        
        print(f'approval_data 로딩 완료: {len(approval_data)}개 문서')
        
        # 2. documents 테이블 조회
        print('documents 테이블 조회 중...')
        cursor.execute("""
            SELECT id, source_id, title, activities
            FROM documents
        """)
        documents = cursor.fetchall()
        print(f'documents 조회 완료: {len(documents)}건')
        
        # 3. 처리 변수
        mismatch_list = []
        processed_count = 0
        updated_count = 0
        error_count = 0
        
        # 4. documents 처리
        for doc in documents:
            processed_count += 1
            
            # 진행 상황 표시 (1000건마다)
            if processed_count % 1000 == 0:
                print(f'처리 중... {processed_count}/{len(documents)}건 (업데이트: {updated_count}건)')
            
            doc_id = doc['id']
            source_id = doc['source_id']
            title = doc['title']
            activities_str = doc['activities']
            
            # source_id로 approval_data에서 찾기
            if source_id not in approval_data:
                continue
            
            approval_rows = approval_data[source_id]
            
            # 제목 비교 (공백 정규화)
            csv_title = approval_rows[0]['post_title']
            normalized_db_title = re.sub(r'\s+', ' ', title)
            normalized_csv_title = re.sub(r'\s+', ' ', csv_title)
            
            if normalized_db_title != normalized_csv_title:
                # 제목 불일치 - 리스트에 추가하고 스킵
                mismatch_list.append({
                    'source_id': source_id,
                    'db_title': title,
                    'csv_title': csv_title
                })
                continue
            
            # activities JSON 파싱
            try:
                activities = json.loads(activities_str)
            except json.JSONDecodeError as e:
                error_count += 1
                print(f'\n⚠️ JSON 파싱 에러 (id={doc_id}, source_id={source_id}): {str(e)}')
                continue
            
            # activities 업데이트
            updated = False
            for activity in activities:
                activity_type = activity.get('type', '')
                activity_name = activity.get('name', '')
                
                # approval_data에서 매칭되는 행 찾기
                for approval_row in approval_rows:
                    csv_status = STATUS_MAP.get(approval_row['status'], '')
                    csv_approver = approval_row['approver']
                    
                    # type과 name이 모두 일치하는지 확인
                    if (activity_type == csv_status and 
                        activity_name in csv_approver):
                        
                        # actionDate 업데이트
                        approval_date = approval_row['approval_date']
                        activity['actionDate'] = parse_approval_date(approval_date)
                        updated = True
                        break
            
            # DB 업데이트 (변경사항이 있을 때만)
            if updated:
                try:
                    updated_activities_str = json.dumps(activities, ensure_ascii=False, separators=(',', ':'))
                    cursor.execute("""
                        UPDATE documents 
                        SET activities = %s 
                        WHERE id = %s
                    """, (updated_activities_str, doc_id))
                    conn.commit()
                    updated_count += 1
                except Exception as e:
                    error_count += 1
                    print(f'\n⚠️ DB 업데이트 에러 (id={doc_id}): {str(e)}')
                    conn.rollback()
        
        # 처리 완료 후 로그 출력
        print(f'\n=== 처리 완료 ===')
        print(f'총 처리: {processed_count}건')
        print(f'업데이트: {updated_count}건')
        if error_count > 0:
            print(f'⚠️ 에러: {error_count}건')
        
        # 제목 불일치 로그 저장
        if len(mismatch_list) > 0:
            with open('title_mismatch.log', 'w', encoding='utf-8') as log:
                for item in mismatch_list:
                    log.write(f"Source ID: {item['source_id']}\n")
                    log.write(f"DB title: {item['db_title']}\n")
                    log.write(f"CSV post_title: {item['csv_title']}\n\n")
            print(f'⚠️ 제목 불일치: {len(mismatch_list)}건 (title_mismatch.log 파일 확인)')
        else:
            print('✓ 모든 제목 일치')
    
    finally:
        cursor.close()
        conn.close()

# 실행
if __name__ == '__main__':
    process_documents()
    print('모든 작업 완료!')

approval_data_2020 로딩 중...
approval_data 로딩 완료: 7704개 문서
documents 테이블 조회 중...
documents 조회 완료: 23306건
처리 중... 1000/23306건 (업데이트: 0건)
처리 중... 2000/23306건 (업데이트: 0건)
처리 중... 3000/23306건 (업데이트: 0건)
처리 중... 4000/23306건 (업데이트: 0건)
처리 중... 5000/23306건 (업데이트: 0건)
처리 중... 6000/23306건 (업데이트: 0건)
처리 중... 7000/23306건 (업데이트: 0건)
처리 중... 8000/23306건 (업데이트: 0건)
처리 중... 9000/23306건 (업데이트: 824건)
처리 중... 10000/23306건 (업데이트: 1824건)
처리 중... 11000/23306건 (업데이트: 2824건)
처리 중... 12000/23306건 (업데이트: 3824건)
처리 중... 13000/23306건 (업데이트: 4824건)
처리 중... 14000/23306건 (업데이트: 5824건)
처리 중... 15000/23306건 (업데이트: 6824건)
처리 중... 16000/23306건 (업데이트: 7704건)
처리 중... 17000/23306건 (업데이트: 7704건)
처리 중... 18000/23306건 (업데이트: 7704건)
처리 중... 19000/23306건 (업데이트: 7704건)
처리 중... 20000/23306건 (업데이트: 7704건)
처리 중... 21000/23306건 (업데이트: 7704건)
처리 중... 22000/23306건 (업데이트: 7704건)
처리 중... 23000/23306건 (업데이트: 7704건)

=== 처리 완료 ===
총 처리: 23306건
업데이트: 7704건
✓ 모든 제목 일치
모든 작업 완료!
