In [None]:
import time
import csv
import pandas as pd
import re
import requests
import json
import ast
import matplotlib.pyplot as plt

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.action_chains import ActionChains
from tqdm import tqdm
from konlpy.tag import Komoran
from wordcloud import WordCloud

### 크롤링

In [None]:
# 모든 댓글을 수집하기 위해 댓글이 모두 나올때까지 스크롤을 내려주는 함수
def scroll_down_comment(driver):
    last_height = driver.execute_script("return document.documentElement.scrollHeight")
    
    # 화면이 스크롤을 내려도 똑같을 때까지 계속해서 내림
    while True:
        driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
        time.sleep(2)
        new_height = driver.execute_script("return document.documentElement.scrollHeight")
        if new_height == last_height:
            break
        last_height = new_height

# 영상 링크 수집을 위한 스크롤 내리는 함수
def scroll_down(driver):
    last_height = driver.execute_script("return document.documentElement.scrollHeight")
    
    # 유튜브 영상은 계속해서 나오므로 10번만 스크롤을 내려 나오는 영상 링크만 수집하도록 함
    for _ in range(10):
        driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
        time.sleep(2)
        new_height = driver.execute_script("return document.documentElement.scrollHeight")
        
        # 만약 그 전에 스크롤이 멈추게 된다면 반복을 중지
        if new_height == last_height:
            break
        last_height = new_height
        
# 유튜브 영상에 존재하는 '더보기' 버튼을 클릭하여 조회수와 업로드 날짜 등을 확인하기 위한 함수
def click_more_button(driver):
    try:
        more_button = driver.find_element(By.XPATH, '//tp-yt-paper-button[@class="button style-scope ytd-text-inline-expander" and @id="expand"]')
        if more_button:
            more_button.click()
            time.sleep(1)
    except NoSuchElementException:
        print("더보기 버튼을 찾을 수 없습니다.")
        
# int로 변환이 가능한지 아닌지를 판단하여 True or False를 return하는 함수
def is_int_convertible(value):
    try:
        int(value)
        return True
    except ValueError:
        return False
    
# 영상의 채널명, 구독자 수, 조회수, 좋아요 수, 댓글 개수를 추출하는 함수        
def get_video_details(soup):
    channel_name_element = soup.find("a", class_="yt-simple-endpoint style-scope yt-formatted-string")
    channel_name = [i.text for i in channel_name_element][0] if channel_name_element else "Unknown Channel Name"

    subscriber_count_element = soup.find("yt-formatted-string", class_="style-scope ytd-video-owner-renderer")
    subscriber_count = subscriber_count_element.text.strip() if subscriber_count_element else "Unknown Subscriber Count"

    view_count_element = soup.find_all("span", class_="style-scope yt-formatted-string bold")
    view_count = [element.text.strip() for element in view_count_element][0] if view_count_element else "Unknown View Count"
    
    like_count_elements = soup.find_all("span", class_="yt-core-attributed-string yt-core-attributed-string--white-space-no-wrap")
    like_count = [i.text for i in like_count_elements if i.text[-1] in ['천','만'] or is_int_convertible(i.text)][0]

    comment_count_element = soup.find("yt-formatted-string", class_="count-text style-scope ytd-comments-header-renderer")
    comment_count = comment_count_element.text.strip() if comment_count_element else "Unknown Comment Count"

    return channel_name, subscriber_count, view_count, like_count, comment_count
        
# 영상의 제목, 업로드 날짜, 채널명, 구독자 수, 조회수, 좋아요 수, 댓글 개수, 댓글을 추출하는 함수 
def get_comments(driver, video_link):
    driver.get(video_link) # 영상 링크에 접속
    time.sleep(5) # 화면이 나올 때까지 대기
    
    click_more_button(driver) # 더보기 버튼 클릭
    
    scroll_down(driver) # 모든 댓글이 나오도록 스크롤을 끝까지 내리기

    soup = BeautifulSoup(driver.page_source, "html.parser")
    title_element = soup.find("h1", class_="title style-scope ytd-video-primary-info-renderer")
    title = title_element.text.strip() if title_element else "Unknown Title"
    comments = soup.find_all("yt-formatted-string", class_="style-scope ytd-comment-renderer")
    
    upload_date_elements = soup.find_all("span", class_="style-scope yt-formatted-string bold")
    upload_date = [element.text.strip() for element in upload_date_elements][2] if upload_date_elements else "Unknown Upload Date"
    
    channel_name, subscriber_count, view_count, like_count, comment_count = get_video_details(soup) # 앞서 정의한 함수를 통해 정보 추출

    return title, upload_date, channel_name, subscriber_count, view_count, like_count, comment_count, comments

In [None]:
# 드라이버 path 설정
driver_path = "C:\\Users\\rltjq\\chromedriver.exe"

# 검색어 지정
search_query = "갤럭시 리뷰"

# 드라이버 생성
driver = webdriver.Chrome(driver_path)

# 유튜브 링크 접속
driver.get("https://www.youtube.com")

# 화면이 나올 때까지 대기
time.sleep(5)

# 검색창에 지정한 검색어 입력하고 검색
search_box = driver.find_element(By.XPATH, "//input[@id='search']")
search_box.send_keys(search_query)
search_box.submit()

# 화면이 나올 때까지 대기
time.sleep(5)

# 스크롤을 내려 수집할 영상 생성
scroll_down(driver)

# beautifulsoup으로 html을 불러옴
soup = BeautifulSoup(driver.page_source, "html.parser")

# html 파일 내에 존재하는 모든 영상 링크를 수집
video_links = set()
for link in soup.find_all("a", id="thumbnail"):
    href = link.get("href")
    
    # short 영상은 다른 영상과 포맷이 달라 short 영상은 제거
    if href is not None and 'shorts' not in href:
        video_links.add("https://www.youtube.com" + href)

# 수집한 정보를 저장하기 위한 csv 파일 생성 후, 영상마다 정보 수집
with open("galaxy_review_comment.csv", "w", encoding="utf-8", newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(["Video Title", "Upload_date", "Channel_name", "Subscriber_count", "View_count", 'Like_count', 'Comment_count',
                         "Comment"])

    for video_link in video_links:
        title, upload_date, channel_name, subscriber_count, view_count, like_count, comment_count, comments = get_comments(driver, video_link)
        for comment in comments:
            csv_writer.writerow([title, upload_date, channel_name, subscriber_count, view_count, like_count, comment_count, comment.text.strip()])
        print(f"Comments from '{title}' have been saved")

print("All comments have been saved to 'comments.csv'")

driver.quit()

In [None]:
# 갤럭시 리뷰와 모두 동일, 검색어만 변경

driver_path = "C:\\Users\\rltjq\\chromedriver.exe"

search_query = "애플 리뷰"

driver = webdriver.Chrome(driver_path)

driver.get("https://www.youtube.com")

time.sleep(5)

search_box = driver.find_element(By.XPATH, "//input[@id='search']")
search_box.send_keys(search_query)
search_box.submit()

time.sleep(5)

scroll_down(driver)

soup = BeautifulSoup(driver.page_source, "html.parser")

video_links = set()
for link in soup.find_all("a", id="thumbnail"):
    href = link.get("href")
    if href is not None and 'shorts' not in href:
        video_links.add("https://www.youtube.com" + href)

with open("apple_review_comment.csv", "w", encoding="utf-8", newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(["Video Title", "Upload_date", "Channel_name", "Subscriber_count", "View_count", 'Like_count', 'Comment_count',
                         "Comment"])

    for video_link in video_links:
        title, upload_date, channel_name, subscriber_count, view_count, like_count, comment_count, comments = get_comments(driver, video_link)
        for comment in comments:
            csv_writer.writerow([title, upload_date, channel_name, subscriber_count, view_count, like_count, comment_count, comment.text.strip()])
        print(f"Comments from '{title}' have been saved")

print("All comments have been saved to 'comments.csv'")

driver.quit()

### 데이터 전처리(갤럭시)

In [None]:
galaxy = pd.read_csv('galaxy_review_comment.csv')

In [None]:
# 영어로만 이루어진 댓글 삭제 + 결측치 삭제
pattern = r'^[a-zA-Z\s\W\d]*$'

galaxy['Comment'] = galaxy['Comment'].apply(lambda x: x if type(x) != str or not re.match(pattern, x) else pd.NA)

galaxy = galaxy.dropna(subset=['Comment'])

In [None]:
# 가디언즈 오브 갤럭시 영화와 관련된 리뷰 영상 삭제
keywords = ['가디언즈', '가오갤', '마블', '그루트']

for keyword in keywords:
    galaxy = galaxy[~galaxy['Video Title'].str.contains(keyword, na=False)]
    
galaxy = pd.DataFrame(galaxy.values, columns=galaxy.columns)

In [None]:
# 영상의 시간을 표시한 부분은 모두 삭제
def remove_time(text):
    return re.sub(r'\b(\d{1,2}):(\d{2})\b', '', text)

galaxy['Comment'] = galaxy['Comment'].apply(remove_time)

### 데이터 전처리(애플)

In [None]:
apple = pd.read_csv('apple_review_comment.csv')

In [None]:
# 영어로만 이루어진 댓글 삭제 + 결측치 삭제
pattern = r'^[a-zA-Z\s\W\d]*$'

apple['Comment'] = apple['Comment'].apply(lambda x: x if type(x) != str or not re.match(pattern, x) else pd.NA)

apple = apple.dropna(subset=['Comment'])

apple = pd.DataFrame(apple.values, columns=apple.columns)

In [None]:
# 애플 TV와 연관되어 있는 영화나 드라마에 대한 리뷰 삭제
def video_title_filter(text):
    if '애플' not in text or 'TV' in text:
        if '아이폰' in text:
            return None
        elif '맥' in text:
            return None
        elif '워치' in text:
            return None
        
        filter_list.append(text)
        return None
    
filter_list = []
apple['Video Title'].apply(video_title_filter)

In [None]:
drop_idx = []
for i in range(len(apple)):
    if apple['Video Title'][i] in filter_list:
        drop_idx.append(i)

apple.drop(drop_idx, axis=0, inplace=True)

In [None]:
# 영상의 시간을 표시한 부분은 모두 삭제
apple['Comment'] = apple['Comment'].apply(remove_time)

### 맞춤법 검사 및 띄어쓰기 처리

In [None]:
def text_replace(text, error, candWord):
    return text.replace(error, candWord)

def spell_correct(text):
    # 1. 텍스트 준비 & 개행문자 처리
    text = text.replace('\n', '\r\n')
    # 2. 맞춤법 검사 요청 (requests)
    response = requests.post('http://164.125.7.61/speller/results', data={'text1': text})
    # 3. 응답에서 필요한 내용 추출 (html 파싱)
    data = response.text.split('data = [', 1)[-1].rsplit('];', 1)[0]
    
    try: # 맞춤법이 틀린 것이 존재하지 않을 수 있어 try-except 문으로 해결
        data = json.loads(data)
    
        for err in data['errInfo']:
            text = text_replace(text, err['orgStr'], err['candWord'].split('|')[0])
        
        text = text.replace('\r\n', '\n')
        
    except:
        text = text.replace('\r\n', '\n')
        
    return text

In [None]:
tqdm.pandas()

galaxy['Comment'] = galaxy['Comment'].progress_apply(spell_correct)
apple['Comment'] = apple['Comment'].progress_apply(spell_correct)

### 데이터 토큰화

In [None]:
tqdm.pandas()
komoran = Komoran()

def tokenizer(text):
    try: # 이모티콘과 같이 토큰화가 불가능한 문장은 건너뛰고 수행
        return komoran.pos(text)
    except:
        return pd.NA
    
galaxy['token'] = galaxy['Comment'].progress_apply(tokenizer)
apple['token'] = apple['Comment'].progress_apply(tokenizer)

galaxy = galaxy.dropna(subset=['token'])
apple = apple.dropna(subset=['token'])

galaxy = pd.DataFrame(galaxy.values, columns=galaxy.columns)
apple = pd.DataFrame(apple.values, columns=apple.columns)

galaxy.to_csv('galaxy_review_token.csv', index=False)
apple.to_csv('apple_review_token.csv', index=False)

### 워드클라우드 생성(갤럭시)

In [None]:
import pandas as pd
galaxy = pd.read_csv('galaxy_review_token.csv')

In [None]:
from tqdm import tqdm
import ast

tqdm.pandas()
galaxy_word_dict = {} # 갤럭시 단어 사전 정의

stopwords = pd.read_csv('koreanStopwords.txt')
stopwords = list(stopwords['0']) # stopwords 불러와서 저장

del_tag = ['JKS','JKC','JKG','JKO','JKB','JKV',
'JKQ','JX','JC','EP','EF','EC','ETN','ETM',
'XPN','XSN','XSV','XSA','XR','SF','SP','SS',
'SE','SO','SH','SW','NF','NV','SN','NA'] # 사용하지 않을 품사 지정


def create_dict(lst):
    str_list = lst
    list_obj = ast.literal_eval(str_list) # 기존에 str 형태로 저장되어 있던 것을 list 형태로 변환
    
    for i in list_obj:
        if i[1] not in del_tag and i[0] not in stopwords: # 기존 사전에 존재하는 단어는 +1을, 없던 단어는 1로 생성
            try:
                galaxy_word_dict[i[0]] += 1
            except:
                galaxy_word_dict[i[0]] = 1

galaxy['token'].progress_apply(create_dict)

In [None]:
import matplotlib.pyplot as plt
from wordcloud import WordCloud

font_path = 'C:\\Users\\rltjq\\anaconda3\\envs\\text\\Lib\\site-packages\\matplotlib\\mpl-data\\fonts\\ttf\\NanumGothic.ttf'

freq = galaxy_word_dict
cloud = WordCloud(font_path = font_path,
                  background_color='white',
                  width=800, height=800)

galaxy_cloud = cloud.generate_from_frequencies(freq)

arr = galaxy_cloud.to_array()

fig = plt.figure(figsize=(10, 10))
plt.imshow(arr)
plt.savefig('galaxy_wordcloud.png')
plt.show()

### 워드클라우드 생성(애플)

In [None]:
apple = pd.read_csv('apple_review_token.csv')

In [None]:
tqdm.pandas()
apple_word_dict = {}

stopwords = pd.read_csv('koreanStopwords.txt')
stopwords = list(stopwords['0'])

del_tag = ['JKS','JKC','JKG','JKO','JKB','JKV',
'JKQ','JX','JC','EP','EF','EC','ETN','ETM',
'XPN','XSN','XSV','XSA','XR','SF','SP','SS',
'SE','SO','SH','SW','NF','NV','SN','NA']


def create_dict(lst):
    str_list = lst
    list_obj = ast.literal_eval(str_list)
    
    for i in list_obj:
        if i[1] not in del_tag and i[0] not in stopwords:
            try:
                apple_word_dict[i[0]] += 1
            except:
                apple_word_dict[i[0]] = 1
                
apple['token'].progress_apply(create_dict)

In [None]:
freq = apple_word_dict
cloud = WordCloud(font_path = font_path,
                  background_color='white',
                  width=800, height=800)

apple_cloud = cloud.generate_from_frequencies(freq)

arr = apple_cloud.to_array()

fig = plt.figure(figsize=(10, 10))
plt.imshow(arr)
plt.savefig('apple_wordcloud.png')
plt.show()

### text classification 데이터 전처리

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

galaxy['label'] = 0
apple['label'] = 1
all_review = pd.concat([galaxy, apple])

all_review = all_review.loc[:, ['token','label']]
all_review = all_review.dropna()
train, test = train_test_split(all_review, test_size=0.2, random_state=42)

In [None]:
from torchtext.legacy import data
import torch

# 입력 받은 텍스트를 단어 별로 분리하고, 첫 번째 요소(vocab)만 반환
def tokenize(text):
    return [i[0] for i in text]

# Field 객체를 정의
# TEXT : sequential=True로 설정하여 시퀀스 데이터임을 명시
#        tokenize 함수는 위에서 정의한 tokenize 함수로 설정
#        batch_first=True로 설정하여 배치 차원을 맨 앞으로 가져옴
#        use_vocab=True로 설정하여 어휘 사전을 사용
# LABEL : sequential=False로 설정하여 시퀀스 데이터가 아님을 명시
#         use_vocab=False로 설정하여 어휘 사전을 사용하지 않음
#         preprocessing 인자를 float로 설정하여 레이블 데이터를 실수로 변환
#         dtype을 torch.float로 설정하여 텐서의 데이터 타입을 실수로 설정
TEXT = data.Field(sequential=True, tokenize=tokenize, batch_first=True, use_vocab=True)
LABEL = data.LabelField(sequential=False, use_vocab=False, preprocessing=float, dtype=torch.float)

train.to_csv('train.csv', index=False)
test.to_csv('test.csv', index=False)

# 저장된 CSV 파일을 이용하여 TabularDataset 객체 생성
# train.csv와 test.csv에 대해 각각 TabularDataset 객체 생성
fields = [('text', TEXT), ('label', LABEL)]
train_data, = data.TabularDataset.splits(
        path = '', # 현재 디렉토리
        train = 'train.csv', # 훈련 데이터 파일 이름
        format = 'csv', # 파일 형식
        fields = fields, # 사용할 필드
        skip_header=True # 헤더 건너뛰기
)
test_data, = data.TabularDataset.splits(
        path = '', # 현재 디렉토리
        train = 'test.csv', # 테스트 데이터 파일 이름
        format = 'csv', # 파일 형식
        fields = fields, # 사용할 필드
        skip_header=True # 헤더 건너뛰기
)

# 어휘 사전을 생성
# 훈련 데이터에 있는 단어를 최대 25000개까지만 사용하도록 제한
TEXT.build_vocab(train_data, max_size=25000)
LABEL.build_vocab(train_data)

## text classifiction 모델 구조

In [None]:
from torchtext.legacy import datasets
from torch import nn, optim

# Iterator 생성
BATCH_SIZE = 8
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, test_data), 
    batch_size = BATCH_SIZE, 
    sort_within_batch = True, # 배치 내에서 데이터를 정렬
    sort_key = lambda x: len(x.text), # 정렬 기준을 지정
    device = device)

# 모델 정의
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
                 bidirectional, dropout):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           dropout=dropout,
                           batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))
        _, (hidden, _) = self.rnn(embedded)
        if self.rnn.bidirectional:
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]
        output = self.fc(self.dropout(hidden))
        return output

## Optimizer 및 손실함수 정의

In [None]:
# 모델 파라미터 설정
vocab_size = len(TEXT.vocab)
embedding_dim = 100
hidden_dim = 64
output_dim = 1
n_layers = 2
bidirectional = True
dropout = 0.5

# 모델 인스턴스 생성
model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
                       bidirectional, dropout).to(device)

# 옵티마이저와 손실 함수 설정
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()

def binary_accuracy(preds, y):
    # 반올림한 예측값과 실제 값이 같은지 확인
    correct = (torch.round(torch.sigmoid(preds)) == y).float() 
    acc = correct.sum() / len(correct)
    return acc

## 학습 & 평가 함수 정의

In [None]:
# 학습 함수 정의
def train(model, iterator, optimizer, criterion):
    model.train()
    
    for i, batch in enumerate(iterator):
        optimizer.zero_grad()
        
        predictions = model(batch.text).squeeze(1)
        
        loss = criterion(predictions, batch.label)
        acc = binary_accuracy(predictions, batch.label)
        
        loss.backward()
        optimizer.step()

# 테스트 함수 정의
def evaluate(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0

    model.eval()

    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch.text).squeeze(1)

            loss = criterion(predictions, batch.label)
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

## 학습 진행

In [None]:
# 학습 및 테스트 진행
n_epochs = 50
loss_list = []
acc_list = []

for epoch in tqdm(range(n_epochs)):
    train(model, train_iterator, optimizer, criterion)
    val_loss, val_acc = evaluate(model, test_iterator, criterion)
    
    loss_list.append(val_loss)
    acc_list.append(val_acc)

## loss & acc 결과 그래프

In [None]:
import matplotlib.pyplot as plt
import numpy as np

x = np.linspace(1,51, 50)
plt.plot(x, loss_list)
plt.title('LSTM Loss Graph')
plt.grid()
plt.show()
plt.savefig('loss_graph.png')

In [None]:
x = np.linspace(1,51, 50)
plt.plot(x, acc_list)
plt.title("LSTM Accuracy Graph")
plt.grid()
plt.show()
plt.savefig('acc_graph.png')

## 모델 저장 및 Inference 코드

In [None]:
# 모델 파라미터 저장
torch.save(model.state_dict(), 'model.pt')

In [None]:
# 모델 파라미터 불러오기
model.load_state_dict(torch.load('model.pt'))
model = model.to(device)

In [None]:
# Inference Code
from konlpy.tag import Komoran

def predict_sentiment(model, sentence):
    model.eval()
    komoran = Komoran()
    tokenized = komoran.morphs(sentence)  # 문장을 토큰화
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]  # 각 토큰을 정수로 변환
    length = [len(indexed)]  # 한 문장의 길이
    tensor = torch.LongTensor(indexed).to(device)  # 변환된 정수를 텐서로 변환
    tensor = tensor.unsqueeze(1).T  # 배치 차원을 추가
    length_tensor = torch.LongTensor(length)  # 문장 길이를 텐서로 변환
    prediction = torch.sigmoid(model(tensor))  # 예측
    return prediction.item()  # 예측 결과를 반환

print(predict_sentiment(model, "아이폰 14 사고싶다"))