In [1]:
import pymysql
import pandas as pd
import sqlalchemy
import logging
import re
from typing import Dict, List, Optional
from sqlalchemy import create_engine, types
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

In [2]:
host= 
port =  
user = 
password = 
database = 'policy'


In [3]:
ST = datetime.now()
print(ST)

2024-11-12 23:49:30.823777


In [64]:
class PolicyDataProcessor:
    def __init__(self, db_config: Dict):
        """
        데이터베이스 설정 및 로깅 초기화
        """
        self.db_config = db_config
        self.engine = create_engine(
            f"mysql+pymysql://{db_config['user']}:{db_config['password']}@"
            f"{db_config['host']}:{db_config.get('port', 3306)}/{db_config['database']}"
        )
        
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        self.keywords = {
            'target': [
                '지원대상[\\s]*:?[\\s]*',
                '대상[\\s]*:?[\\s]*',
                '신청대상[\\s]*:?[\\s]*',
                '참여대상[\\s]*:?[\\s]*',
                '모집대상[\\s]*:?[\\s]*',
                '구직[\\s]*:?[\\s]*',
                '출산[\\s]*:?[\\s]*'
            ],
            'condition': ['자격', '요건', '조건', '기준', '신청자격'],
            'support': ['지원내용', '지원금액', '지원한도', '지원규모', '사업내용', '지원사항'],
            'apply_method': ['신청방법', '접수방법', '지원방법', '신청절차', '신청기간'],
            'contact': ['문의처', '담당자', '연락처', '문의', '전화'],
            'url': ['http', 'www']
        }

    def connect_db(self):
        """데이터베이스 연결"""
        try:
            conn = pymysql.connect(**self.db_config)
            logging.info("Successfully connected to database")
            return conn
        except Exception as e:
            logging.error(f"Database connection error: {e}")
            raise

    def fetch_data(self) -> pd.DataFrame:
        """RDS에서 데이터 가져오기"""
        try:
            query = "SELECT * FROM policy_detail"
            df = pd.read_sql(query, self.engine)
            logging.info(f"Successfully fetched {len(df)} records from database")
            return df
        except Exception as e:
            logging.error(f"Error fetching data: {e}")
            raise

    def clean_text(self, text: Optional[str]) -> str:
        """
        텍스트 클리닝
        """
        if pd.isna(text):
            return ""
        
        text = re.sub(r'<[^>]+>', '', str(text))
        text = re.sub(r'[■▶▪●★※◦○●:*ㅁ★▶◀\-\*o□❍]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def extract_field(self, text: str, keywords: List[str]) -> str:
        """
        주어진 키워드를 기반으로 텍스트에서 관련 정보 추출
        """
        text = self.clean_text(text)
        
        for keyword in keywords:
            pattern = f"{keyword}(.*?)(?=신청방법|제외대상|$)"
            matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE)
            if matches:
                extracted_text = matches[0].strip()
                if len(extracted_text) > 0:
                    cleaned_text = self.clean_text(extracted_text)
                    return cleaned_text.lstrip(',').strip()  # 맨 앞의 콤마 제거
        return ""

    def parse_policy(self, row: Dict) -> Dict:
        """
        정책 데이터 파싱
        빈 문자열도 clean_text 처리
        """
        logging.info(f"Processing row idx: {row.get('idx')}")
        logging.info(f"Original target: {row.get('target')}")
        
        combined_text = "\n".join([
            str(row.get(field, "")) 
            for field in ['subject', 'condition', 'way']
        ])
        
        # target 정보 추출 시도
        target_info = self.extract_field(combined_text, self.keywords['target'])
        logging.info(f"Initially extracted target: {target_info}")
        
        # target 정보가 유효한지 철저히 체크
        if not target_info or target_info.isspace() or target_info == "" or len(target_info.strip()) == 0:
            original_target = row.get('target', '')
            logging.info(f"Using original target: {original_target}")
            
            if not pd.isna(original_target):  # None이 아닌 모든 값에 대해 clean_text 적용
                target_info = self.clean_text(original_target)
                logging.info(f"After cleaning original target: {target_info}")
        
        # 여전히 target_info가 없으면 subject 사용
        if not target_info or target_info.isspace() or target_info == "" or len(target_info.strip()) == 0:
            subject = row.get('subject', '')
            if not pd.isna(subject):  # None이 아닌 모든 값에 대해 clean_text 적용
                target_info = self.clean_text(subject)  # subject도 clean_text 처리
                logging.info(f"Using cleaned subject as target: {target_info}")
        
        logging.info(f"Final target value: {target_info}")
        
        return {
            'idx': row.get('idx', None),
            'target': target_info,
            'condition': self.extract_field(combined_text, self.keywords['condition']) or row.get('condition', ''),
            'content': row.get('content', ''),
            'scale': self.clean_text(row.get('scale', '')),
            'enquiry': self.clean_text(row.get('enquiry', '')),
            'url': row.get('document', row.get('url', ''))
        }

    def process_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        데이터프레임 처리
        """
        results = []
        for _, row in df.iterrows():
            parsed_data = self.parse_policy(row.to_dict())
            results.append(parsed_data)
        
        result_df = pd.DataFrame(results)
        logging.info(f"Successfully processed {len(result_df)} records")
        return result_df

    def save_to_db(self, df: pd.DataFrame, table_name='policy_detail_cleaned') -> None:
        """
        처리된 데이터를 RDS에 저장
        """
        try:
            df['idx'] = df['idx'].astype(int)
            df.to_sql(
                name=table_name,
                con=self.engine,
                index=False,
                if_exists='replace',
                chunksize=1000,
                dtype={'idx': types.BigInteger}
            )
            logging.info(f"Successfully saved {len(df)} records to {table_name}")
        except Exception as e:
            logging.error(f"Error saving data: {e}")
            raise

def main():
    db_config = {
        'host': host,
        'user': user,
        'password': password,
        'database': database
    }
    
    try:
        processor = PolicyDataProcessor(db_config)
        
        df = processor.fetch_data()
        
        processed_df = processor.process_data(df)
        
        processor.save_to_db(processed_df)
        
        logging.info("Data processing completed successfully")
        
    except Exception as e:
        logging.error(f"Error in main process: {e}")
        raise

if __name__ == "__main__":
    main()

2024-11-13 02:01:47,356 - root - INFO - Successfully fetched 111 records from database
2024-11-13 02:01:47,357 - root - INFO - Processing row idx: 1
2024-11-13 02:01:47,358 - root - INFO - Original target: None
2024-11-13 02:01:47,359 - root - INFO - Initially extracted target: 근로활동을 하는 19세 ~ 39세 청년 도내 중소·중견기업 면접자, 도내 중소·중견기업 생애 최초 취업자, 도내 중소·중견기업 1년 이상 근속자 gbw rk.kr 또는 청년애꿈수당.kr
2024-11-13 02:01:47,360 - root - INFO - Final target value: 근로활동을 하는 19세 ~ 39세 청년 도내 중소·중견기업 면접자, 도내 중소·중견기업 생애 최초 취업자, 도내 중소·중견기업 1년 이상 근속자 gbw rk.kr 또는 청년애꿈수당.kr
2024-11-13 02:01:47,362 - root - INFO - Processing row idx: 3
2024-11-13 02:01:47,362 - root - INFO - Original target: None
2024-11-13 02:01:47,363 - root - INFO - Initially extracted target: 19세 ~ 49세 울진 청년 누구나 모집인원 10명(선착순) 강의장소 울진청년일자리센터 1층(울진읍 읍내로 57 6) 기타사항 10명 선착순이나, 취소 등에 대비하여 예비 7명까지 추가 선발합니다. 사전 취소없이 당일 불참시 다음 강의 후순위로 배정됩니다. 원데이 클래스 054 789 6473
2024-11-13 02:01:47,365 - root - INFO - Final target value: 19세 ~ 49세 울진 청년 누구나 모집인원 10명(선착순) 강의

In [65]:
ET = datetime.now()
print(ET-ST)

2:12:19.629581
