In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install kiwipiepy

Collecting kiwipiepy
  Downloading kiwipiepy-0.20.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Collecting kiwipiepy_model<0.21,>=0.20 (from kiwipiepy)
  Downloading kiwipiepy_model-0.20.0.tar.gz (34.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.7/34.7 MB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading kiwipiepy-0.20.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.6/3.6 MB[0m [31m40.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: kiwipiepy_model
  Building wheel for kiwipiepy_model (setup.py) ... [?25l[?25hdone
  Created wheel for kiwipiepy_model: filename=kiwipiepy_model-0.20.0-py3-none-any.whl size=34818026 sha256=ac6216685989972d6773e67ef91b8d8e510bd97e1f1feae5e4008da73b32599e
  Stored in directory: /root/.cache/pip/wheels/ca/c8/52/3a539d6e9

In [3]:
import pandas as pd
from kiwipiepy import Kiwi
import os
import json
from io import BytesIO
import pyarrow as pa
import pyarrow.parquet as pq

In [4]:
def process_data_for_local(input_parquet_path, output_path, stopwords_path=None):
    """
    로컬 Parquet 파일을 처리하여 로컬 디렉토리에 저장

    Parameters:
    - input_parquet_path: 입력 Parquet 파일 경로
    - output_path: 결과 파일을 저장할 로컬 디렉토리 경로
    - stopwords_path: 불용어 파일 경로 (선택사항)
    """
    # 1. 데이터 로드
    print("데이터 로드 중...")
    table = pq.read_table(input_parquet_path)
    df = table.to_pandas()

    # 2. 불용어 로드 (제공된 경우)
    FILTER_WORDS = []
    if stopwords_path and os.path.exists(stopwords_path):
        with open(stopwords_path, 'r', encoding='utf-8') as f:
            FILTER_WORDS = [line.strip() for line in f.readlines()]

    # 3. 날짜 변환 (raw_date 형식 'YYYY.MM.DD. HH:MM')
    print("날짜 변환 중...")
    df['post_date'] = pd.to_datetime(df['raw_date'].str.split('.').str[:3].str.join('-'), format='%Y-%m-%d', errors='coerce')
    df['post_month'] = df['post_date'].dt.month
    df['post_year'] = df['post_date'].dt.year

    # 4. 토큰 추출
    print("토큰 추출 중...")
    TERM_POSES = ["NNP", "NNG", "XR"]
    kiwi = Kiwi()

    # 각 텍스트에서 토큰과 품사 정보 추출
    def extract_tokens_with_pos(text):
        analyzed = kiwi.analyze(text)
        tokens_with_pos = []
        for sent in analyzed:
            for word_info in sent[0]:
                token, pos = word_info[0], word_info[1]
                is_stopword = token in FILTER_WORDS
                if pos in TERM_POSES:
                    tokens_with_pos.append({
                        'token': token,
                        'pos': pos,
                        'is_stopword': is_stopword
                    })
        return tokens_with_pos

    # 토큰 정보 추출
    print("각 게시글에서 토큰 추출 중...")
    df['tokens_with_pos'] = df['question_text'].apply(extract_tokens_with_pos)

    # 5. 키워드 그룹 정의 및 추출
    print("키워드 그룹 정보 추출 중...")
    keyword_groups = {
        'hospital': ['병원', '소아과'],
        'symptom': ['증상'],
        'skin': ['피부'],
        'fever': ['열'],
        'cough': ['기침'],
        'runny_nose': ['콧물'],
        'stool': ['변', '똥'],
        'hand_foot': ['수족'],
        'diarrhea': ['설사'],
        'allergy': ['알러지', '알레르기'],
        'atopy': ['아토피'],
        'urticaria': ['두드러기'],
        'vomit': ['토', '구토'],
        'blood': ['피'],
        'heat_rash': ['땀띠'],
        'seborrheic': ['태열'],
        'flu': ['독감'],
        'pneumonia': ['폐렴'],
        'phlegm': ['가래'],
        'rash': ['발진'],
        'blister': ['수포'],
        'otitis': ['중이염']
    }

    display_name_mapping = {
        'hospital': '병원/소아과',
        'symptom': '증상',
        'skin': '피부',
        'fever': '열',
        'cough': '기침',
        'runny_nose': '콧물',
        'stool': '변/똥',
        'hand_foot': '수족',
        'diarrhea': '설사',
        'allergy': '알러지/알레르기',
        'atopy': '아토피',
        'urticaria': '두드러기',
        'vomit': '토/구토',
        'blood': '피',
        'heat_rash': '땀띠',
        'seborrheic': '태열',
        'flu': '독감',
        'pneumonia': '폐렴',
        'phlegm': '가래',
        'rash': '발진',
        'blister': '수포',
        'otitis': '중이염'
    }

    # 키워드 그룹 차원 테이블 생성
    keyword_groups_df = pd.DataFrame([
        {'group_id': i+1, 'group_name': group_name, 'display_name': display_name_mapping.get(group_name, group_name), 'description': ''}
        for i, group_name in enumerate(keyword_groups.keys())
    ])

    # 키워드 사전 테이블 생성
    keywords_list = []
    for group_id, (group_name, keywords) in enumerate(keyword_groups.items(), 1):
        for keyword in keywords:
            keywords_list.append({
                'keyword': keyword,
                'group_id': group_id,
                'is_stopword': keyword in FILTER_WORDS
            })
    keywords_df = pd.DataFrame(keywords_list)

    # 게시글-키워드 그룹 연결 테이블 생성
    def has_keywords(text, keywords):
        return any(keyword in text for keyword in keywords)

    post_keyword_groups_list = []
    for post_id in df['post_id']:
        post_text = df[df['post_id'] == post_id]['question_text'].iloc[0]
        for group_id, (group_name, keywords) in enumerate(keyword_groups.items(), 1):
            has_keyword = has_keywords(post_text, keywords)
            post_keyword_groups_list.append({
                'post_id': post_id,
                'group_id': group_id,
                'has_keyword': has_keyword
            })
    post_keyword_groups_df = pd.DataFrame(post_keyword_groups_list)

    # 6. 토큰 분석 테이블 생성
    print("토큰 분석 테이블 생성 중...")
    tokens_fact_list = []
    token_id = 1

    for index, row in df.iterrows():
        post_id = row['post_id']
        token_position = 0

        # 각 토큰 정보 처리
        token_counts = {}  # 같은 토큰이 여러 번 나오는 경우를 위한 카운팅

        for token_info in row['tokens_with_pos']:
            token = token_info['token']
            pos = token_info['pos']
            is_stopword = token_info['is_stopword']

            # 토큰 카운트 증가
            if token in token_counts:
                token_counts[token] += 1
            else:
                token_counts[token] = 1

            tokens_fact_list.append({
                'token_id': token_id,
                'post_id': post_id,
                'token': token,
                'token_pos': pos,
                'token_count': token_counts[token],
                'token_position': token_position,
                'is_stopword': is_stopword
            })

            token_id += 1
            token_position += 1

    tokens_fact_df = pd.DataFrame(tokens_fact_list)

    # 7. 게시글 기본 정보 테이블 준비
    print("게시글 기본 정보 테이블 준비 중...")
    posts_dim_df = df[['post_id', 'url', 'raw_date', 'post_date', 'post_month', 'post_year', 'author', 'question_text']]

    # 8. 출력 디렉토리 생성
    print("로컬에 데이터 저장 중...")
    os.makedirs(os.path.join(output_path, 'babycareai/posts_dim'), exist_ok=True)
    os.makedirs(os.path.join(output_path, 'babycareai/tokens_fact'), exist_ok=True)
    os.makedirs(os.path.join(output_path, 'babycareai/keyword_groups_dim'), exist_ok=True)
    os.makedirs(os.path.join(output_path, 'babycareai/keywords_dictionary'), exist_ok=True)
    os.makedirs(os.path.join(output_path, 'babycareai/post_keyword_groups'), exist_ok=True)

    # posts_dim 저장
    save_df_to_parquet(posts_dim_df, os.path.join(output_path, 'babycareai/posts_dim/posts_dim.parquet'))

    # tokens_fact 저장 (큰 데이터는 청크로 나누어 저장)
    chunk_size = 100000  # 조정 가능
    for i, chunk_df in enumerate(chunk_generator(tokens_fact_df, chunk_size)):
        save_df_to_parquet(chunk_df, os.path.join(output_path, f'babycareai/tokens_fact/tokens_fact_part_{i:03d}.parquet'))

    # keyword_groups_dim 저장
    save_df_to_parquet(keyword_groups_df, os.path.join(output_path, 'babycareai/keyword_groups_dim/keyword_groups_dim.parquet'))

    # keywords_dictionary 저장
    save_df_to_parquet(keywords_df, os.path.join(output_path, 'babycareai/keywords_dictionary/keywords_dictionary.parquet'))

    # post_keyword_groups 저장 (큰 데이터는 청크로 나누어 저장)
    for i, chunk_df in enumerate(chunk_generator(post_keyword_groups_df, chunk_size)):
        save_df_to_parquet(chunk_df, os.path.join(output_path, f'babycareai/post_keyword_groups/post_keyword_groups_part_{i:03d}.parquet'))

    print("데이터 처리 및 저장 완료!")

In [5]:
def chunk_generator(df, chunk_size):
    """데이터프레임을 청크로 나누어 생성하는 제너레이터"""
    for i in range(0, len(df), chunk_size):
        yield df.iloc[i:i + chunk_size]

In [6]:
def save_df_to_parquet(df, file_path):
    """DataFrame을 Parquet 형식으로 로컬에 저장"""
    # DataFrame을 Parquet으로 변환
    table = pa.Table.from_pandas(df)
    pq.write_table(table, file_path)
    print(f"'{file_path}'에 파일 저장 완료")

In [7]:
# 사용 예시
if __name__ == "__main__":
    # 실행 파라미터
    input_parquet_path = '/content/drive/MyDrive/babycareai/part-00000-0d16091a-7fa1-464f-9f1c-b2c0bf27fe0a-c000.snappy.parquet'
    output_path = '/content/drive/MyDrive/babycareai'
    stopwords_path = '/content/drive/MyDrive/babycareai/stopwords.txt'

    # 처리 실행
    process_data_for_local(input_parquet_path, output_path, stopwords_path)

데이터 로드 중...
날짜 변환 중...
토큰 추출 중...
각 게시글에서 토큰 추출 중...
키워드 그룹 정보 추출 중...
토큰 분석 테이블 생성 중...
게시글 기본 정보 테이블 준비 중...
로컬에 데이터 저장 중...
'/content/drive/MyDrive/babycareai/babycareai/posts_dim/posts_dim.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_000.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_001.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_002.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_003.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_004.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_005.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_006.parquet'에 파일 저장 완료
'/content/drive/MyDrive/babycareai/babycareai/tokens_fact/tokens_fact_part_007.parquet'에 파일 저장 완료
'/co