# 라이브러리 불러오기

In [None]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns
import re

# 한글 폰트 설정
import matplotlib.font_manager as fm

font_path = 'C:/Windows/Fonts/malgun.ttf'
font_name = fm.FontProperties(fname=font_path).get_name()
plt.rc('font', family=font_name)


# 경고 문구 미표
import warnings
warnings.filterwarnings('ignore')

# YouTubeSearchTool 
- 유튜브 검색해서 URL 목록을 제공하는 도구
- 검색어: 키워드,개수

In [None]:
from langchain_community.tools import YouTubeSearchTool

tool = YouTubeSearchTool()

In [None]:
results = tool.run("코스트코 상품 리뷰,10")

In [None]:
# 문자열을 파이썬 리스트로 변환
import json
url_list = json.loads(results.replace("'", '"'))

# 결과 출력
print(url_list)

In [None]:
len(url_list)

In [None]:
url_list[0]

In [None]:
# 함수로 정의
def search_youtube(keyword, max_results=20):
    """
    유튜브에서 키워드 검색 결과를 가져오는 함수
    Args:
        keyword: 검색할 키워드
        max_results: 가져올 최대 결과 수
    Returns:
        url_list: 검색 결과의 URL 리스트
    """
    tool = YouTubeSearchTool()
    results = tool.run(f"{keyword},{max_results}")
    url_list = json.loads(results.replace("'", '"'))
    return url_list


# 함수 실행
url_list = search_youtube("코스트코 상품 리뷰")
len(url_list)

In [None]:
url_list2 = search_youtube("코스트코 상품 리뷰")
len(url_list2)

In [None]:
len(set(url_list) - set(url_list2))

# YoutubeLoader
- url을 입력하면 유튜브 자막을 추출하여 문서 객체로 변환
- add_video_info: 비디오 정보를 추가

In [None]:
from langchain_community.document_loaders import YoutubeLoader

# 자막을 포함한 동영상 정보를 가져오기
loader = YoutubeLoader.from_youtube_url(
    url_list[0], 
    add_video_info=True,
    language=["ko", "en"],
    translation="ko",
)

docs = loader.load()
print(len(docs))
print(docs[0])
print(docs[0].metadata)

In [None]:
# 함수를 정의
def get_youtube_video_transcript(url):
    """
    YouTube 동영상의 자막을 가져오는 함수
    Args:
        url (str): YouTube 동영상 URL
    Returns:
        Document: 동영상 자막 정보 객체
    """
    loader = YoutubeLoader.from_youtube_url(
        url,
        add_video_info=True,
        language=["ko", "en"],
        translation="ko",
    )

    docs = loader.load()
    return docs

# 함수 실행
docs = get_youtube_video_transcript(url_list[0])
print(len(docs))
print(docs[0])

# create_extraction_chain 활용하여 요약, 추출

In [None]:
import os
from dotenv import load_dotenv

load_dotenv()

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

In [None]:
docs[0]

In [None]:
from langchain.chains import create_extraction_chain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 스키마 정의
schema = {
    "properties": {
        "상품_이름": {"type": "string"},
        "상품_설명": {"type": "string"},
        "상품_가격": {"type": "string"},
        "상품_평가": {"type": "string"},
    },
    "required": ["상품_이름", "상품_설명", "상품_가격", "상품_평가"],
}


# Chain 생성
prompt_template = """Extract and save the relevant entities mentioned \
in the following passage together with their properties. 
Only extract the properties mentioned in the 'information_extraction' function. \
When no suitable data is present, show the default value 'N/A'.

Passage:
{input}
""" 

prompt = ChatPromptTemplate.from_template(prompt_template)
llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo-0125", api_key=OPENAI_API_KEY)
chain = create_extraction_chain(schema=schema, llm=llm, prompt=prompt)

# Chain 실행
response = chain.invoke(docs[0])

# 결과 확인
response

In [None]:
# 결과를 판다스 데이터프레임으로 변환
df = pd.DataFrame(response['text'])
df.head()

In [None]:
# 전체 동영상에 대해 자막을 추출하고 엔티티를 추출
def extract_entities_from_youtube_videos(url_list):
    """
    YouTube 동영상의 자막을 가져와 엔티티를 추출하는 함수
    Args:
        url_list (list): YouTube 동영상 URL 리스트
    Returns:
        pd.DataFrame: 엔티티 추출 결과
    """
    # 스키마 정의
    schema = {
        "properties": {
            "상품_이름": {"type": "string"},
            "상품_설명": {"type": "string"},
            "상품_가격": {"type": "string"},
            "상품_평가": {"type": "string"},
        },
        "required": ["상품_이름", "상품_설명", "상품_가격", "상품_평가"],
    }

    # Chain 생성
    prompt_template = """Extract and save the relevant entities mentioned \
    in the following passage together with their properties. 
    Only extract the properties mentioned in the 'information_extraction' function. \
    When no suitable data is present, show the default value 'N/A'.

    Passage:
    {input}
    """ 

    prompt = ChatPromptTemplate.from_template(prompt_template)
    llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo-0125", api_key=OPENAI_API_KEY)
    chain = create_extraction_chain(schema=schema, llm=llm, prompt=prompt)

    # 결과를 저장할 데이터프레임 생성
    df = pd.DataFrame()

    for url in url_list:
        # 동영상 자막 가져오기
        docs = get_youtube_video_transcript(url)

        # Chain 실행
        response = chain.invoke(docs[0])

        # 결과를 데이터프레임으로 변환
        df = pd.concat([df, pd.DataFrame(response['text'])])

    return df

In [None]:
df = extract_entities_from_youtube_videos(url_list[:2])
df

# 유튜브 동영상 댓글 분석
삼성 SDS 채널 구독 이벤트   
https://www.youtube.com/watch?v=j_EciRV3V1k&t=15s

## 댓글 데이터 크롤링으로 수집

In [None]:
# 삼성 SDS 채널 구독 이벤트 

url = "https://www.youtube.com/watch?v=j_EciRV3V1k&t=15s"

In [None]:
# Seleium 드라이버 생성
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By

from bs4 import BeautifulSoup
import time
import random

# Chrome 옵션 설정
options = webdriver.ChromeOptions()
# options.add_argument('--headless')  

# 드라이버 서비스 생성
service = Service(ChromeDriverManager().install())

# 웹 드라이버 초기화
driver = webdriver.Chrome(service=service, options=options)

# 윈도우 크기 설정
driver.set_window_size(800, 600)

# URL 접속하여 실행
driver.implicitly_wait(5)
driver.get(url)

# 팝업의 '아니요' 버튼을 클릭하여 닫기
try:
    dismiss_button = driver.find_element(By.CSS_SELECTOR, "#dismiss-button button")
    dismiss_button.click()
except:
    pass 

In [None]:
# 현재 페이지 높이
current_page_height = driver.execute_script("return document.documentElement.scrollHeight") 
print(current_page_height)


# 스크롤 다운
while current_page_height < 100000: 
    
    driver.execute_script(f"window.scrollTo({current_page_height}, {current_page_height + 10000});") 
    time.sleep(random.randint(1, 4))
    body = driver.find_element(By.CSS_SELECTOR, 'body')
    body.send_keys(Keys.PAGE_DOWN)
    time.sleep(random.randint(1, 4))
    
    scrolled_page_height = driver.execute_script("return document.documentElement.scrollHeight")  
    if scrolled_page_height == current_page_height: 
        break
    current_page_height = driver.execute_script("return document.documentElement.scrollHeight")     


In [None]:
# HTML 소스 파싱 
html = driver.page_source
soup = BeautifulSoup(html, 'html.parser')
soup.find(name="div", attrs={"id":"contents"})

# 댓글 요소를 찾기
reviews = soup.find_all(name='ytd-comment-thread-renderer', attrs={'class':'style-scope ytd-item-section-renderer'})

print(len(reviews))
print(reviews[2])

In [None]:
### 사용자 이름
#author-text > span
reviews[2].select("#author-text > span")[0].text.strip()

In [None]:
### 댓글 텍스트
#content-text
reviews[2].select("#content-text")[0].text

In [None]:
### 날짜
#published-time-text
reviews[2].select("#published-time-text")[0].text.strip()

In [None]:
### 리뷰 정보(이름, 댓글, 날짜)를 추출하는 함수 정의
def get_review_info(review):
    """
    댓글 요소에서 사용자 이름, 댓글, 날짜를 추출하는 함수
    Args:
        review: 댓글 요소
    Returns:
        dict: 사용자 이름, 댓글, 날짜 정보를 담은 딕셔너리
    """
    author = review.select("#author-text > span")[0].text.strip()
    comment = review.select("#content-text")[0].text
    date = review.select("#published-time-text")[0].text.strip()
    return {"author": author, "comment": comment, "date": date}


# 리뷰 정보 추출
review_info = get_review_info(reviews[2])
print(review_info)

In [None]:
from datetime import datetime
from dateutil.relativedelta import relativedelta

def convert_time_strings(time_str, current_time=None):
    """
    시간 문자열을 datetime 객체로 변환하는 함수
    Args:
        time_str: 시간 문자열
        current_time: 기준 시간
    Returns:
        datetime: 변환된 datetime 객체
    """
    if current_time is None:
        current_time = datetime.now()

    time_str = time_str.replace(" ", "").strip()

    if "분전" in time_str:
        minutes = time_str.replace("분전", "")
        minutes = re.sub("[^0-9]", "", minutes)
        minutes = int(minutes)
        return (current_time - relativedelta(minutes=minutes)).strftime("%Y-%m-%d %H:%M:%S")
        
    elif "시간전" in time_str:
        hours = time_str.replace("시간전", "")
        hours = re.sub("[^0-9]", "", hours)
        hours = int(hours)
        return (current_time - relativedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
    
    elif "일전" in time_str:
        days = time_str.replace("일전", "")
        days = re.sub("[^0-9]", "", days)
        days = int(days)
        return (current_time - relativedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
    elif "주전" in time_str:
        weeks = time_str.replace("주전", "")
        weeks = re.sub("[^0-9]", "", weeks)
        weeks = int(weeks)
        return (current_time - relativedelta(weeks=weeks)).strftime("%Y-%m-%d %H:%M:%S")
    elif "개월전" in time_str:
        months = time_str.replace("개월전", "")
        months = re.sub("[^0-9]", "", months)
        months = int(months)
        return (current_time - relativedelta(months=months)).strftime("%Y-%m-%d %H:%M:%S")
    elif "년전" in time_str:
        years = time_str.replace("년전", "")
        years = re.sub("[^0-9]", "", years)
        years = int(years)
        return (current_time - relativedelta(years=years)).strftime("%Y-%m-%d %H:%M:%S")
    else:
        try:
            time_str = (time_str).strftime("%Y-%m-%d %H:%M:%S")
        except:
            time_str = np.nan
        return time_str
    
# 시간 문자열 변환
time_str = "1시간전"
converted_time = convert_time_strings(time_str)
print(converted_time)

In [None]:
### 리뷰 정보(이름, 댓글, 좋아요, 날짜)를 추출하는 함수 정의
def get_review_info(review):
    """
    댓글 요소에서 사용자 이름, 댓글, 날짜를 추출하는 함수
    Args:
        review: 댓글 요소
    Returns:
        dict: 사용자 이름, 댓글, 날짜 정보를 담은 딕셔너리
    """
    author = review.select("#author-text > span")[0].text.strip()
    comment = review.select("#content-text")[0].text
    date = review.select("#published-time-text")[0].text.strip()
    try:
        date = convert_time_strings(date)
    except:
        pass
    return {"author": author, "comment": comment, "date": date}


# 리뷰 정보 추출
review_info = get_review_info(reviews[2])
print(review_info)

In [None]:
### 한 페이지의 모든 리뷰를 반복문으로 추출하는 함수 정의

def get_youtube_reviews(url):
    """
    한 페이지의 모든 리뷰를 추출하는 함수
    Args:
        url: YouTube 동영상 URL
    Returns:
        list: 리뷰 정보 딕셔너리를 담은 리스트
    """
    # driver 설정
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    driver.set_window_size(800, 600)
    driver.get(url)
    driver.implicitly_wait(5)
    time.sleep(5)

    # 팝업 닫기
    try:
        dismiss_button = driver.find_element(By.CSS_SELECTOR, "#dismiss-button button")
        dismiss_button.click()
    except:
        pass

    driver.implicitly_wait(5)
    time.sleep(5)
   
    current_page_height = driver.execute_script("return document.documentElement.scrollHeight") 
    while current_page_height < 100000: 
        driver.execute_script(f"window.scrollTo({current_page_height}, {current_page_height + 10000});") 
        time.sleep(random.randint(1, 4))
        body = driver.find_element(By.CSS_SELECTOR, 'body')
        body.send_keys(Keys.PAGE_DOWN)
        time.sleep(random.randint(1, 4))
        scrolled_page_height = driver.execute_script("return document.documentElement.scrollHeight")  
        if scrolled_page_height == current_page_height: 
            break
        current_page_height = driver.execute_script("return document.documentElement.scrollHeight")  

    # HTML 추출하고 driver 닫기
    driver.implicitly_wait(5)
    html = driver.page_source
    driver.close()
    
    # HTML 파싱
    soup = BeautifulSoup(html, 'html.parser')
    reviews = soup.find_all(name='ytd-comment-thread-renderer', attrs={'class':'style-scope ytd-item-section-renderer'})
    review_list = []
    for review in reviews:
        review_info = get_review_info(review)
        review_list.append(review_info)
    return review_list


# 함수 실행
reviews = get_youtube_reviews(url)
print(len(reviews))
print(reviews[0])

In [None]:
# 판다스 데이터프레임으로 변환
df = pd.DataFrame(reviews)
df.head()

In [None]:
# 수집한 데이터를 저장
df.to_csv("youtube_reviews.csv", index=False)

In [None]:
# 저장해둔 데이터를 불러오는 코드
df = pd.read_csv("youtube_reviews.csv")

In [None]:
pd.to_datetime(df['date']).describe()

## 시간에 따른 리뷰 수의 변화를 그래프로 시각화

In [None]:
# 날짜 데이터를 datetime 객체로 변환
df['date'] = pd.to_datetime(df['date'])

# 날짜별로 데이터를 그룹화하고 리뷰 수를 계산
review_counts = df.groupby(df['date'].dt.date).size()

# 그래프로 시각화
plt.figure(figsize=(10, 4))
review_counts.plot(kind='line', marker='o', linestyle='-')
plt.title('Daily Review Counts Over Time')
plt.xlabel('Date')
plt.ylabel('Number of Reviews')
plt.grid(True)
plt.show()


## Huggingface Pipleline 으로 감성 분석

In [None]:
from transformers import pipeline
sentiment_pipeline = pipeline(model="dudcjs2779/sentiment-analysis-with-klue-bert-base")
data = ["너무 좋아요", "조금 아쉬웠어요", "좋은지 나쁜지 모르겠어요", "최악이에요", "최고에요"]
sentiment_pipeline(data)

In [None]:
df['comment'].str.len().describe()

In [None]:
df[df['comment'].str.len() < 10]

In [None]:
# 리뷰에 대한 감정 분석 결과를 긍정, 부정으로 예측 
def analyze_sentiment_classification(review):
    """
    리뷰에 대한 감정 분석 결과를 중립(LABEL_0), 긍정(LABEL_1), 부정(LABEL_2)로 분류
    Args:
        review: 리뷰 텍스트
    Returns:
        float: 감정 분석 결과
    """
    sentiment_pipeline = pipeline(model="dudcjs2779/sentiment-analysis-with-klue-bert-base")
    sentiment = sentiment_pipeline([str(review)[:100]])[0]
    
    return sentiment['label']

# 함수 실행
sentiment_labels = analyze_sentiment_classification(data[0])
sentiment_labels

In [None]:
data[0]

In [None]:
df['comment'].head()

In [None]:
# HTML 태그 제거
df['comment'] = df['comment'].str.replace('[^A-Za-z0-9가-힣ㄱ-ㅎ\ ]', '', regex=True)
df['comment'].head()

In [None]:
# 리뷰에 대한 감정 분석 결과를 데이터프레임에 추가
df['sentiment_class'] = df['comment'].apply(analyze_sentiment_classification)
df.head()

In [None]:
# df.to_csv("youtube_reviews_classification.csv", index=False)
df = pd.read_csv("youtube_reviews_classification.csv")
df.head()

In [None]:
# 감성 분석 결과를 막대 그래프로 비교
plt.figure(figsize=(10, 3))
sns.countplot(x='sentiment_class', data=df)
plt.show()

In [None]:
# 부정 감성을 갖고 있는 데이터를 확인 
df[df['sentiment_class']=="LABEL_2"]

In [None]:
### 시간의 변화에 따른 감성 분석 변화

# 'date' 열을 datetime 형태로 변환
df['date'] = pd.to_datetime(df['date'])

# 월별로 감성 클래스의 빈도 계산
df['month'] = df['date'].dt.month  
grouped_data = df.groupby(['month', 'sentiment_class']).size().reset_index(name='counts')

# Pivot Table 생성
pivot_data = grouped_data.pivot(index='month', columns='sentiment_class', values='counts').fillna(0)

# 시각화
pivot_data.plot(kind='bar', stacked=True, figsize=(10, 5))
plt.title('월별 감성 분석 결과')
plt.xlabel('날짜')
plt.ylabel('리뷰 수')
plt.show()

## 랭체인을 활용한 LLM 감성분석 (분류)

In [None]:
from langchain_core.prompts.few_shot import FewShotPromptTemplate
from langchain_core.prompts.prompt import PromptTemplate

# 리뷰 예시와 해당 리뷰의 감성(긍정, 부정, 중립)을 정의
examples = [
    {
        "review": "매우 유익했습니다!",
        "sentiment": "긍정",
    },
    {
        "review": "매우 실망스러웠어요.",
        "sentiment": "부정",
    },
    {
        "review": "만족스러웠습니다.",
        "sentiment": "긍정",
    },
    {
        "review": "사용이 간편해서 좋았습니다.",
        "sentiment": "긍정",
    },
    {
        "review": "내용이 조금 지루했지만, 정보는 유용했습니다.",
        "sentiment": "중립",
    },
    {
        "review": "예상보다 참석자가 많아서 질문을 충분히 하지 못했습니다.",
        "sentiment": "부정",
    },
]


# Few-shot 학습을 위한 템플릿 정의
example_prompt = PromptTemplate(
    input_variables=["review", "sentiment"], template="review: {review}\n{sentiment}"
)

print(example_prompt.format(**examples[0]))

In [None]:
prompt = FewShotPromptTemplate(
    examples=examples,
    example_prompt=example_prompt,
    suffix="review: {input}",
    input_variables=["input"],
)

# 새 리뷰에 대한 감성 분석 예제 생성
new_review = "이벤트 내용은 좋은데 기간이 너무 짧았어요."
print(prompt.format(input=new_review))

In [None]:
# LLM
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125",
                 api_key=OPENAI_API_KEY)

chain = prompt | llm

response = chain.invoke(new_review)

response

In [None]:
sentiment = response.content
sentiment

In [None]:
# 5개의 샘플 데이터
test_df = df.head()
test_df

In [None]:
# 모든 리뷰에 대해서 감성 분석을 적용해서 새로운 열에 추가하는 함수
def analyze_sentiment_llm(review):
    response = chain.invoke(review)
    return response.content

test_df['sentiment_llm'] = test_df['comment'].apply(analyze_sentiment_llm)
test_df

## 랭체인을 활용한 LLM 감성분석 (평점 예측)

In [None]:
from langchain_core.prompts.prompt import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser


llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125",
                 api_key=OPENAI_API_KEY)


template = """
Classify the given sentiment of the given text into positive or negative classes and provide a relevant score.
The sentiment score should be between 0.0 and 1.0, where 0.0 indicates a negative sentiment and 1.0 indicates a positive sentiment.

Input: {input}

Output: sentiment_label (Score: sentiment_score)
"""

prompt = PromptTemplate.from_template(template)
output_parser = StrOutputParser()

# LCEL chaining
chain = prompt | llm | output_parser

# chain 호출
response = chain.invoke({"input": "최고예요"})

# 결과 확인
response

In [None]:
re.search(r"\d+\.\d+", response).group()

In [None]:
# 감정 분석 결과를 반환하는 함수 정의
def analyze_sentiment_score_llm(review):
    """
    리뷰에 대한 감정 분석 결과를 0~1 사이의 값으로 변환하는 함수
    Args:
        review: 리뷰 텍스트
    Returns:
        float: 감정 분석 결과
    """
    template = """
    Classify the given sentiment of the given text into positive or negative classes and provide a relevant score.
    The sentiment score should be between 0.0 and 1.0, where 0.0 indicates a negative sentiment and 1.0 indicates a positive sentiment.

    Input: {input}

    Output: sentiment_label (Score: sentiment_score)
    """

    prompt = PromptTemplate.from_template(template)
    output_parser = StrOutputParser()

    llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125",
                     api_key=OPENAI_API_KEY)

    chain = prompt | llm | output_parser

    response = chain.invoke({"input": review})
    
    sentiment_score = re.search(r"\d+\.\d+", response).group()
    
    return sentiment_score


# 함수 실행
sentiment_score = analyze_sentiment_score_llm("최고예요")
sentiment_score

In [None]:
# 함수 실행
sentiment_score = analyze_sentiment_score_llm("별로예요")
sentiment_score

In [None]:
# 함수 실행
sentiment_score = analyze_sentiment_score_llm("보통 수준이에요")
sentiment_score

In [None]:
test_df['sentiment_llm_socre'] = test_df['comment'].apply(analyze_sentiment_score_llm)
test_df