In [None]:
from dotenv import load_dotenv
import time

load_dotenv()

time.sleep(2)  # 환경 변수 로드 후 대기


In [None]:
from zf_parser import ZipFitParser
from crawl.lh import LH
from database.repository import AnncLhRepository, AnncQrRepository, AnncAllRepository, AnncFileRepository, DocChunkRepository


lh_repo = AnncLhRepository()
qr_repo = AnncQrRepository()
all_repo = AnncAllRepository()
file_repo = AnncFileRepository()
dc_repo = DocChunkRepository()


lh_crwaler = LH()

# ZipFitParser 인스턴스 생성
zfp = ZipFitParser()

In [None]:
CRWALING = True         # 크롤링
DB_BULK_INSERT = True   # 크롤링 데이터 DB INSERT
TEST = True


# batch_id가 있으면 크롤링, 크롤링 데이터 insert 진행 안함
# batch_id = ''
batch_id = '6df7b622-2b10-4737-9ab6-2fba5f4214d1'

In [None]:
if batch_id:
    CRWALING = False
    DB_BULK_INSERT = False

In [None]:

if CRWALING:
    df_all_annc = lh_crwaler.crawl_lh_notices_all_data('접수중')
    df_all_annc += lh_crwaler.crawl_lh_notices_all_data('공고중')
    # df_all_annc += lh_crwaler.crawl_lh_notices_all_data('정정공고중')
else:
    df_all_annc = []
    print("크롤링 x")

In [None]:
if DB_BULK_INSERT: 
    batch_id = lh_repo.bulk_insert_announcements(df_all_annc)
else:
    print("크롤링 데이터 삽입 x")


print(batch_id)


In [None]:
if not batch_id:
    raise "batch_id가 없으면 진행 불가."

In [None]:
annc_list_lh = qr_repo.get_announcements_merge_target(batch_id, annc_status=['공고중','접수중'])

print(f'{len(annc_list_lh)}건 조회 완료')

In [None]:
len(annc_list_lh)

In [None]:
if TEST:
    annc_list_lh = annc_list_lh[:5]

In [None]:
from openai import OpenAI
client = OpenAI()


def get_embedding(text):
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

In [None]:
# 타임랩스 필요

import json
import time
from datetime import datetime
import os

error_cnt = 0

# 시작 시간 기록
start_time = time.time()

def get_elapsed_time():
    """경과 시간을 [00분00초 경과] 형식으로 반환"""
    elapsed = int(time.time() - start_time)
    minutes = elapsed // 60
    seconds = elapsed % 60
    return f"[{minutes:02d}분{seconds:02d}초 경과]"

def get_task_time(task_start):
    """작업 소요 시간을 (00분00초 소요) 형식으로 반환"""
    elapsed = int(time.time() - task_start)
    minutes = elapsed // 60
    seconds = elapsed % 60
    return f"({minutes:02d}분{seconds:02d}초 소요)"

# print(f"시작 {len(annc_list_lh)}")

for idx_row, row_lh in enumerate(annc_list_lh):
    try:
        start_time = time.time()

        batch_msg = f"배치({idx_row+1}/{len(annc_list_lh)})"

        # 1. 임시 테이블 상태 변경 -> 시작
        print("-"*77)
        lh_repo.update_announcements('START', row_lh['batch_id'], row_lh['batch_seq'])
        print(f"{get_elapsed_time()} {batch_msg} - 시작!")
        


        # 2. 공고 테이블에 넣을 데이터 준비
        row_lh['corp_cd'] = 'LH'
        row_lh['service_status'] = 'CLOSE'

        merge_result = all_repo.merge_announcements([row_lh,]) # 원래 다건을 위한것
        print(f"{get_elapsed_time()} {batch_msg} - 공고 닫기 처리")
        
        if not merge_result:
            raise Exception("머지된 행 없음")
        # 1행만 쓰겠음
        merge_result = merge_result[0]
        annc_id = merge_result['annc_id']

        # 3. 파일 조회
        file_list = lh_crwaler.get_file_list(row_lh)

        if not file_list:
            raise Exception("파일 없음")
        
        
        # 파일 데이터 초기화
        dc_repo.delete_chunks_by_annc_id(annc_id)
        file_repo.delete_files_by_annc_id(annc_id)
        print(f"{get_elapsed_time()} {batch_msg} - 파일 초기화")
        

        # 4. 파일 등록
        for idx_file, file_info in enumerate(file_list):
            if len(file_list) > 1:
                file_msg = f"파일({idx_file+1}/{len(file_list)})"
            else:
                file_msg = f"파일(단건)"
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} 시작!")


            annc_file = {}

            annc_file['annc_id'] = annc_id
            annc_file['file_name'] = file_info['cmnAhflNm']
            annc_file['file_type'] = file_info['slPanAhflDsCdNm']
            annc_file['file_ext'] = 'pdf'

            # 파일 다운
            task_start = time.time()
            file_path, annc_file = lh_crwaler.down_file(file_info['cmnAhflSn'], annc_file)
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} 다운 완료! ✅ {get_task_time(task_start)}")

            # 파일 등록
            task_start = time.time()
            inserted_file_info = file_repo.bulk_insert_files([annc_file])[0]
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} DB등록 완료! ✅ {get_task_time(task_start)}")

            file_id, file_name = inserted_file_info['file_id'], inserted_file_info['file_name']

            # pdf -> markdown
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} markdown 변환 시작!")
            task_start = time.time()
            elements = zfp.get_llama_parsed_docs(file_path)
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} markdown 변환 완료! ✅ {get_task_time(task_start)}")

            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} 임베딩 시작!")
            task_start = time.time()
            chunk_dto = [{
                'file_id': file_id,
                'annc_id': annc_id,
                'chunk_type': el.get('element_type','text'),
                'chunk_text': el.get('origin_content',''),
                'page_num': el.get('page_number', 0),
                'embedding': get_embedding(el.get('content', '')),
                'metadata': json.dumps(el.get('metadata', {}))  # dict를 JSON 문자열로 변환
            } for el in elements]
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} 임베딩 완료! ✅ {get_task_time(task_start)}")

            # 청크 삽입
            task_start = time.time()
            dc_repo.bulk_insert_chunks(chunk_dto)
            print(f"{get_elapsed_time()} {batch_msg} - {file_msg} 청크 DB 등록 완료! ✅ {get_task_time(task_start)}")

            # 사용한 파일 삭제
            if os.path.exists(file_path):
                os.remove(file_path)
                print(f"{get_elapsed_time()} {batch_msg} - {file_msg} 파일 삭제 완료! ✅")

        lh_repo.update_announcements('COMPLETE', row_lh['batch_id'], row_lh['batch_seq'])
        print(f"{get_elapsed_time()} {batch_msg} 완료! ✅")

        
        row_lh['service_status'] = 'OPEN'
        merge_result = all_repo.merge_announcements([row_lh,]) # 원래 다건을 위한것
        print(f"{get_elapsed_time()} {batch_msg} - 공고 열기 처리")
            

    except Exception as e:
        error_cnt += 1
        
        lh_repo.update_announcements('ERROR', row_lh['batch_id'], row_lh['batch_seq'])
        print(f"{get_elapsed_time()} {batch_msg} 오류! ❌ - {e}")

        if error_cnt < 5:
            continue

        print(f"{get_elapsed_time()} {batch_msg} 프로세스 강제 종료 - 오류 여러건 발견")
        raise e
