In [8]:
import json
# JSON 저장 함수
def save_to_json(data, filename):
    """
    데이터를 JSON 파일로 저장합니다.
    """
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)


# JSON 파일에서 명사 리스트를 읽는 함수
def load_json(file_path):
    """
    JSON 파일에서 데이터를 불러옵니다.

    Args:
        file_path (str): JSON 파일 경로.

    Returns:
        list: JSON 파일의 데이터.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)
    return data

# Call API

In [2]:
from googleapiclient.discovery import build # type: ignore
import isodate  # ISO 8601 형식의 시간을 파싱하는 라이브러리

# YouTube API 설정
API_KEY = "AIzaSyDKlMcyK3peJ-woNyOLuMt5MCW_uZpOL0g"  # 자신의 YouTube API 키를 입력하세요
youtube = build("youtube", "v3", developerKey=API_KEY)

# 카테고리 ID 설정
CATEGORIES = {
    "News & Politics": "25"
}

# 채널 정보 가져오기
def fetch_channel_info(channel_id):
    """
    채널 ID를 사용해 채널 정보를 가져옵니다.
    """
    request = youtube.channels().list(
        part="snippet,statistics",
        id=channel_id
    )
    response = request.execute()
    if response["items"]:
        channel = response["items"][0]
        return {
            "channel_id": channel["id"],
            "channel_title": channel["snippet"]["title"],
            "channel_description": channel["snippet"]["description"],
            "subscriber_count": int(channel["statistics"].get("subscriberCount", 0))
        }
    return {}

# 동영상 데이터 가져오기
def fetch_trending_videos(category_id, region_code="KR", max_results=200):
    """
    특정 카테고리의 인기 급상승 동영상을 가져옵니다.
    """
    videos = []
    next_page_token = None

    # page token을 통해 api 추가 호출 (총 4번) 4점점
    while len(videos) < max_results:
        request = youtube.videos().list(
            part="snippet,statistics,contentDetails",
            chart="mostPopular",
            regionCode=region_code,
            videoCategoryId=category_id,
            maxResults=min(50, max_results - len(videos)),
            pageToken=next_page_token
        )
        response = request.execute()

        # 결과에서 동영상 정보 저장
        for item in response.get("items", []):
            # 동영상 길이 확인
            duration = isodate.parse_duration(item["contentDetails"]["duration"])
            duration_in_seconds = duration.total_seconds()

            if duration_in_seconds >= 90:  # 1분 30초 이상인 경우만 저장
                channel_id = item["snippet"]["channelId"]
                channel_info = fetch_channel_info(channel_id)

                videos.append({
                    "video_id": item["id"],
                    "title": item["snippet"]["title"],
                    "description": item["snippet"]["description"],
                    "tags": item["snippet"].get("tags", []),  # 태그
                    "duration": str(duration),  # 동영상 길이
                    "view_count": int(item["statistics"].get("viewCount", 0)),  # 조회수
                    "like_count": int(item["statistics"].get("likeCount", 0)),  # 좋아요
                    "comment_count": int(item["statistics"].get("commentCount", 0)),    # 댓글 수수
                    "category_id": category_id,     # 카테고리 아이디디
                    "channel_info": channel_info  # 채널 정보 추가
                })

        # 다음 페이지 토큰
        next_page_token = response.get("nextPageToken")
        if not next_page_token:
            break

    return videos



# 실행
all_videos = {}

for category_name, category_id in CATEGORIES.items():
    print(f"Fetching trending videos for category: {category_name}")
    videos = fetch_trending_videos(category_id, region_code="KR", max_results=200)
    all_videos[category_name] = videos
    print(f"Fetched {len(videos)} videos for category: {category_name}")

# 결과를 JSON 파일로 저장
output_file = "trending_videos_with_channels.json"
save_to_json(all_videos, output_file)
print(f"Data saved to '{output_file}'")

# 결과 출력 예시
for category, videos in all_videos.items():
    print(f"\nCategory: {category}")
    for video in videos[:5]:  # 각 카테고리에서 상위 5개 출력
        channel = video["channel_info"]
        print(f" - {video['title']} ({video['video_id']}), Channel: {channel['channel_title']} ({channel['subscriber_count']} subscribers)")


Fetching trending videos for category: News & Politics
Fetched 45 videos for category: News & Politics
Data saved to 'trending_videos_with_channels.json'

Category: News & Politics
 - [윤석열 체포 LIVE] 겸손은힘들다 뉴스공장 + 겸공뉴스특보 1월 15일 (oD_d0a0HOZU), Channel: 김어준의 겸손은힘들다 뉴스공장 (2010000 subscribers)
 - 조직적인 폭도와 배후 총정리! (박은정,김경호,신장식,양지열) | 풀버전 (IzEdXoIigoM), Channel: [팟빵] 매불쇼 (2210000 subscribers)
 - 현직 대통령 첫 체포‥ 계엄 선포 43일 만 - [LIVE] MBC 뉴스특보 2025년 01월 15일 (8XQBluUq3bM), Channel: MBCNEWS (5320000 subscribers)
 - 정청래도 처음 듣는 소식 "영장 판사 방만 의도적 파손" 큰 충격 받은 천대엽 법원행정처장.. (39IL4c5TUgA), Channel: 엠키타카 MKTK (614000 subscribers)
 - 명태균 카톡 담은 '검찰 수사보고서' 전면 공개 - 뉴스타파 (ra2Z7sWAa9E), Channel: 뉴스타파 Newstapa (1530000 subscribers)


# Merge

In [10]:
# 병합 함수 정의
def merge_text_fields(video):
    """
    주어진 동영상 데이터에서 필요한 텍스트 필드를 병합하여 하나의 문자열로 반환.
    """
    # 개별 필드 추출
    title = video.get("title", "")
    description = video.get("description", "")
    tags = " ".join(video.get("tags", []))  # 태그 리스트를 공백으로 연결

    # 병합
    merged_text = " ".join([title, description, tags])
    return merged_text.strip()  # 공백 제거

# JSON 데이터에서 각 동영상의 텍스트 필드를 병합
def preprocess_and_merge(data):
    """
    카테고리별 동영상 데이터를 병합하여 텍스트 필드를 생성.
    """
    merged_texts = []

    for category, videos in data.items():
        for video in videos:
            # 병합된 텍스트 생성
            merged_text = merge_text_fields(video)
            merged_texts.append({"category": category, "merged_text": merged_text})

    return merged_texts

# 병합 결과 JSON 저장 함수
def save_merged_texts(data, output_file):
    """
    병합된 텍스트 데이터를 JSON 파일로 저장.
    """
    merged_texts = preprocess_and_merge(data)

    # 병합된 텍스트를 JSON으로 저장
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(merged_texts, f, ensure_ascii=False, indent=4)

def save_merged_texts_as_list(data, output_file):
    """
    병합된 텍스트 데이터를 문자열 리스트로 저장.
    """
    merged_texts = [merge_text_fields(video) for category, videos in data.items() for video in videos]

    # 문자열 리스트를 JSON으로 저장
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(merged_texts, f, ensure_ascii=False, indent=4)


input_file_path = "test_data/trending_videos_with_channels.json"
output_file_path = "test_data/merged_texts.txt"

# Load, process, and save
data = load_json(input_file_path)
save_merged_texts_as_list(data, output_file_path)

output_file_path


'merged_texts.txt'

# Clean

In [11]:
import re
import emoji # type: ignore
import json


# 기본 텍스트 정제 함수
def preprocess_text(text):
    """
    텍스트 데이터를 전처리하고 문장을 반환합니다.

    Args:
        text (str): 입력 텍스트 데이터.

    Returns:
        str: 전처리된 텍스트.
    """
    if not isinstance(text, str):
        text = str(text)  # 문자열로 변환

    # 1. HTML 태그 제거
    text = re.sub(r"<[^>]+>", "", text)

    # 2. URL 제거
    text = re.sub(r"http\S+|www\S+|https\S+", "", text, flags=re.MULTILINE)

    # 3. 이메일 제거
    text = re.sub(r"\S+@\S+\.\S+", "", text)

    # 4. 숫자 제거 (명확히 숫자만 제거)
    text = re.sub(r"\d+", "", text)

    # 5. 반복된 ㅋ, ㅎ, ㅠ, ㅜ 등 제거
    text = re.sub(r"[ㅋㅎㅠㅜ]+", "", text)

    # 6. 반복된 점(...) 제거
    text = re.sub(r"\.\.+", ".", text)

    # 7. 반복된 문자 축소 (e.g., "와아아" -> "와")
    text = re.sub(r"(.)\1{2,}", r"\1", text)

    # 8. 이모지 제거
    text = emoji.replace_emoji(text, replace="")

    # 9. 특수문자 및 영어 알파벳 제거
    text = re.sub(r"[^\w\s가-힣]", "", text)  # 영어 알파벳 포함 특수문자 제거
    text = re.sub(r"[a-zA-Z]", "", text)     # 영어 알파벳 제거

    # 10. 양쪽 공백 제거
    text = text.strip()

    return text


# 데이터 리스트 처리 함수
def process_text_file(input_file, output_file):
    """
    문자열과 문자열 리스트가 섞인 데이터를 처리하여 결과를 저장합니다.

    Args:
        input_file (str): 입력 파일 경로.
        output_file (str): 출력 파일 경로.
    """
    results = []

    # 파일 로드
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # 데이터 처리
    for item in data:
        if isinstance(item, list):  # 문자열 리스트인 경우
            merged_text = " ".join(item)
            processed_text = preprocess_text(merged_text)
        elif isinstance(item, str):  # 문자열인 경우
            processed_text = preprocess_text(item)
        else:
            continue  # 알 수 없는 형식은 건너뜀

        results.append(processed_text)

    # 결과 저장
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=4)


# 실행
    # 입력 및 출력 파일 경로
input_file = "test_data/merged_texts.txt"  # 문자열과 문자열 리스트가 섞인 txt 파일
output_file = "test_data/cleaned.json"

# 텍스트 파일 처리
process_text_file(input_file, output_file)
print(f"처리된 데이터가 '{output_file}'에 저장되었습니다.")


처리된 데이터가 'test_data/processed_sentences.json'에 저장되었습니다.


# Extract keyword

In [19]:
from sklearn.feature_extraction.text import CountVectorizer
import json


def extract_ngrams(texts, n=2):
    """
    텍스트에서 N-그램 기반 구 단위 키워드 추출.

    Args:
        texts (list of str): 텍스트 리스트.
        n (int): N-그램 크기 (2: bigram, 3: trigram).

    Returns:
        dict: N-그램과 빈도수 딕셔너리.
    """
    vectorizer = CountVectorizer(ngram_range=(n, n))
    ngram_matrix = vectorizer.fit_transform(texts)
    ngram_counts = ngram_matrix.sum(axis=0).A1
    ngram_vocab = vectorizer.get_feature_names_out()

    # numpy.int64 -> int 변환
    return {ngram: int(count) for ngram, count in zip(ngram_vocab, ngram_counts)}


def extract_keyword_from_file(input_file, output_file, n=2):
    """
    문자열과 문자열 리스트가 섞인 데이터를 처리하여 결과를 저장합니다.

    Args:
        input_file (str): 입력 파일 경로.
        output_file (str): 출력 파일 경로.
        n (int): N-그램 크기.
    """
    # 파일 로드
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # 문자열 리스트로 변환 (리스트나 문자열 모두 처리)
    texts = []
    for item in data:
        if isinstance(item, list):  # 문자열 리스트인 경우 병합
            texts.append(" ".join(item))
        elif isinstance(item, str):  # 단일 문자열인 경우 그대로 추가
            texts.append(item)

    # N-그램 키워드 추출
    ngram_keywords = extract_ngrams(texts, n=n)

    # 결과 저장
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(ngram_keywords, f, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    # 입력 및 출력 파일 경로
    input_file = "test_data/cleaned.json"  # 문자열과 문자열 리스트가 섞인 txt 파일
    output_file = "test_data/keyword.json"

    # 텍스트 파일 처리 (2-그램 추출)
    extract_keyword_from_file(input_file, output_file, n=2)
    print(f"처리된 데이터가 '{output_file}'에 저장되었습니다.")


처리된 데이터가 'test_data/keyword.json'에 저장되었습니다.


# Filter

In [21]:
import json
from collections import Counter



def load_stopwords(filepath):
    """
    불용어 리스트를 로드합니다.

    Args:
        filepath (str): 불용어 파일 경로.

    Returns:
        set: 불용어 집합.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        stopwords = {line.strip() for line in f}
    return stopwords


def split_filter_and_count_keywords(keywords, stopwords):
    """
    키워드를 단어로 나누고, 불용어를 제거하며 1번만 등장한 키워드를 필터링합니다.

    Args:
        keywords (dict): 키워드와 점수로 이루어진 딕셔너리.
        stopwords (set): 불용어 집합.

    Returns:
        dict: 불용어 제거 및 1번만 등장한 키워드 제거 후 결과.
    """
    filtered_keywords = {}

    for key, value in keywords.items():
        # 1. 키워드를 공백으로 분리하여 단어 리스트 생성
        words = key.split()

        # 2. 단어들 중 불용어가 포함되어 있는지 확인
        if any(word in stopwords for word in words):
            continue  # 불용어가 포함된 경우 제거
        
    
    # 단어 분리 및 출현 횟수 계산
    word_counts = Counter()
    for phrase, count in keywords.items():
    words = phrase.split()  # 띄어쓰기 기준으로 단어 분리
    word_counts.update({word: count for word in words})
       
    return filtered_keywords


def process_keywords(input_file, stopwords_file, output_file):
    """
    키워드 파일에서 불용어를 제거하고 1번 등장한 키워드를 제외한 결과를 저장합니다.

    Args:
        input_file (str): 입력 키워드 파일 경로.
        stopwords_file (str): 불용어 파일 경로.
        output_file (str): 결과 저장 경로.
    """
    # 불용어 로드
    stopwords = load_stopwords(stopwords_file)

    # 키워드 파일 로드
    with open(input_file, 'r', encoding='utf-8') as f:
        keywords = json.load(f)

    # 불용어 제거 및 1번 등장한 키워드 필터링
    filtered_keywords = split_filter_and_count_keywords(keywords, stopwords)

    # 결과 저장
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(filtered_keywords, f, ensure_ascii=False, indent=4)

    print(f"불용어 제거 및 1번 등장 키워드 제외 후 결과가 '{output_file}'에 저장되었습니다.")


# 실행
if __name__ == "__main__":
    # 파일 경로 설정
    input_file = "test_data/keyword.json"  # 키워드 추출 결과 파일
    stopwords_file = "test_data/stopwords-ko.txt"  # 불용어 리스트 파일
    output_file = "test_data/filtered_keywords.json"  # 결과 저장 파일

    # 키워드 필터링 수행
    process_keywords(input_file, stopwords_file, output_file)


불용어 제거 및 1번 등장 키워드 제외 후 결과가 'test_data/filtered_keywords.json'에 저장되었습니다.
