# 1. 필요한 패키지 설치

In [1]:
!pip install kiwipiepy==0.16.0
!pip install youtube-transcript-api
!pip install googletrans==3.1.0a0
!pip install pyvis
!pip install xlsxwriter

!sudo apt-get install -y fonts-nanum
!sudo fc-cache -fv
!rm ~/.cache/matplotlib -rf

from googleapiclient.discovery import build
from collections import Counter
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from google.protobuf.timestamp_pb2 import Timestamp
from googletrans import Translator
from kiwipiepy import Kiwi

from kiwipiepy.utils import Stopwords

from youtube_transcript_api import YouTubeTranscriptApi #https://pypi.org/project/youtube-transcript-api/
from wordcloud import WordCloud

import networkx as nx
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyvis.network as net

import ast


translator = Translator()
Stopwords = Stopwords()
kiwi = Kiwi(model_type = 'sbg')

def extract_script(videoID): #videoID로부터 스크립트 추출
    try:
        script = YouTubeTranscriptApi.get_transcript(videoID,languages=['ko'])
        return ["["+str(timedelta(seconds=comment['start'])).split('.')[0]+"] "+ comment['text'] for comment in script]
    except :
        pass

def extract_kw(sentence, keyword):
  result = [token.form for token in kiwi.tokenize(sentence, normalize_coda = True, stopwords=Stopwords) if token.tag in ['NNG', 'NNP', 'NNB', 'NR', 'NP']]

  for w in keyword.split(','):
    kw = w.strip()
    if kw in sentence:
      result.append(kw)
  return list(set(result))

def extract_comments(videoID): #videoID로부터 댓글 추출
    comments=[]
    try:
        comment_thread_response = api_obj.commentThreads().list(part="snippet,replies", videoId=videoID, maxResults=100).execute()
    except :
        return ['댓글이 사용중지되었습니다.'] #댓글이 사용중지되어 댓글이 없는 경우와 댓글을 달 수 있지만 사람들이 달지 않아 댓글이 0개인 상황을 구분
    if len(comment_thread_response['items'])>0:
        while comment_thread_response: #100개씩 밖에 크롤링할 수 없기 때문에 100개를 크롤링 한 후에 다음 페이지가 있으면 다음 페이지로 넘어감
            for comment in comment_thread_response['items']:
                name=comment['snippet']['topLevelComment']['snippet']['authorDisplayName']
                text=comment['snippet']['topLevelComment']['snippet']['textOriginal']
                #if all(keyword in text for keyword in keyword.split()): #모든 키워드가 포함되어야만
                #if any(keyword.replace(',','') in text for keyword in keyword.split()): #하나의 키워드라도 포함된 댓글
                if True:
                  comments.append([name,text])
                try: #답글의 개수가 0개 보다 많을 때 크롤링하도록 하였는데 종종 답글이 1개 이상인데 답글이 사라져 있는 경우가 있어서 예외 처리
                    if comment['snippet']['totalReplyCount']>0:
                        comment_response = api_obj.comments().list(part="snippet", parentId=comment['id'], maxResults=100).execute()
                        while comment_response: #100개씩 밖에 크롤링할 수 없기 때문에 100개를 크롤링 한 후에 다음 페이지가 있으면 다음 페이지로 넘어감
                            for reply in comment_response['items']:
                                name=reply['snippet']['authorDisplayName']
                                text=reply['snippet']['textOriginal']
                                #if all(keyword in text for keyword in keyword.split()): #모든 키워드가 포함되어야만
                                #if any(keyword.replace(',','') in text for keyword in keyword.split()): #하나의 키워드라도 포함된 댓글
                                if True:
                                    comments.append([name,text])
                            if 'nextPageToken' in comment_response:
                                comment_response = api_obj.comments().list(part='snippet', parentId=comment['id'], pageToken=comment_response['nextPageToken'], maxResults=100).execute()
                            else:
                                break
                except KeyError:
                    pass
            if 'nextPageToken' in comment_thread_response:
                comment_thread_response = api_obj.commentThreads().list(part='snippet,replies', videoId=videoID, pageToken=comment_thread_response['nextPageToken'], maxResults=100).execute()
            else:
                break
    return comments

def check_keyword(keywords, target_list):
  if '|' in keywords:
    keyword_list = [string.strip().replace('"','') for string in keywords.split('|')]
    for keyword in keyword_list:
      if keyword in target_list:
        return True
    return False
  else:
    if keyword in target_list:
      return True
    else:
      return False

def extract_text(comment_list):
  text = [comment[1] for comment in comment_list]
  return '. '.join(text)

def get_viewCount(VideoID):
  try:
    return int(api_obj.videos().list(part="id, snippet, statistics, status, topicDetails", id = VideoID).execute()['items'][0]['statistics']['viewCount'])
  except KeyError:
    return 0

def get_subCount(ChannelID):
  try:
    return int(api_obj.channels().list(part="id, snippet, brandingSettings, contentDetails, statistics, topicDetails", id = ChannelID).execute()['items'][0]['statistics']['subscriberCount'])
  except KeyError:
    return 0



'sudo'은(는) 내부 또는 외부 명령, 실행할 수 있는 프로그램, 또는
배치 파일이 아닙니다.
'sudo'은(는) 내부 또는 외부 명령, 실행할 수 있는 프로그램, 또는
배치 파일이 아닙니다.


In [None]:
key_list = [] #API key 입력
api_obj = build('youtube', 'v3', developerKey=key_list[0])

In [None]:
keyword_list = [] #키워드 입력
stopwords = '' #금지어 입력

def get_df(keyword_list, stopwords, time):
  temp = pd.DataFrame()
  for keyword in keyword_list:
    query = keyword.replace(",", "|") +' '+ ' '.join(["-" + word for word in stopwords.split(', ')])

    time = time # relative 상대적인 시간 absolute 절대적인 시간

    if time == 'relative':
      publishedBefore = datetime.utcnow().isoformat().split(".")[0] + 'Z'
      publishedAfter = (datetime.utcnow() - relativedelta(months=1)).isoformat().split(".")[0] + 'Z'
    elif time == 'absolute':
      publishedBefore = datetime(2023, 9, 18, 0).isoformat().split(".")[0] + 'Z'
      publishedAfter = datetime(2023, 8, 1, 0).isoformat().split(".")[0] + 'Z'
    print('수집 기간은 ' + publishedAfter + ' ~ ' + publishedBefore +'입니다.')

    videos = []

    while True:
        search_response = api_obj.search().list(part='id, snippet', maxResults='50', order='date', publishedBefore=publishedBefore,
                                                publishedAfter=publishedAfter, q=query, regionCode='KR', safeSearch='none', type='video').execute()
        for search_result in search_response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":
                videos.append([search_result["id"]["videoId"], search_result['snippet']['publishedAt'], search_result['snippet']['channelTitle'], search_result['snippet']['channelId'],
                              search_result["snippet"]["title"], search_result["snippet"]["description"]])
                publishedBefore=search_result['snippet']['publishedAt']
        if len(videos)>=50:
          if (videos[-1] == videos[-2]) and (videos[-2] == videos[-3]):
            break
        else:
            break

    #https://developers.google.com/youtube/v3/docs/search/list?hl=ko#python,-%EC%98%88-1

    data = pd.DataFrame(videos, columns =['VideoID', 'Date', 'ChannelTitle', 'ChannelID', 'Title', 'Description'])
    data = data.drop_duplicates(['VideoID']).reset_index(drop=True)

    #영상에 대한 정보
    data['URL']= data['VideoID'].map(lambda x: 'https://www.youtube.com/watch?v='+x)
    #data['Script']=data['VideoID'].map(lambda x: extract_script(x))
    data['viewCount'] = data['VideoID'].map(get_viewCount) #KeyError

    #채널에 대한 정보
    data['subscriberCount'] = data['ChannelID'].map(get_subCount)

    data = data[data['viewCount'] < 10000] #일정 조회 수 이하의 영상만 추출
    data = data.loc[data['subscriberCount'] < 1000] #일정 구독자 수 이하의 영상만 추출

    data['source'] = [str(row1) + ' ' + str(row2) for row1, row2 in zip(data['Title'], data['Description'])]
    data['keyword'] = data['source'].map(lambda x : extract_kw(x, keyword))

    data['keyword_en'] = data['keyword'].map(lambda x : [i.text for i in translator.translate(x)])

    #data = data.loc[data['keyword'].apply(lambda x: check_keyword(keyword, x))] #영상의 키워드 중 검색한 키워드와 완벽하게 일치하는 단어가 있는 영상만 추출
    data = data.reset_index(drop=True)


    print('수집한 영상들의 조회 수의 총합은 '+ str(sum(data['viewCount'])) + '입니다.')

    data['Comments']=data['VideoID'].map(lambda x: extract_comments(x))
    #data['Ad'] = 'Y' 미구현
    print(data)

    data['commentsKeyword'] = data['Comments'].map(lambda x: extract_text(x))

    data['commentsKeyword'] = data['commentsKeyword'].map(lambda x : list(set(extract_kw(x, keyword))))
    data['commentsKeyword_en'] = data['commentsKeyword'].map(lambda x : [i.text for i in translator.translate(x)])

    data.to_excel(str(datetime.now()).split()[0]+"_"+keyword+'.xlsx') #20XX-00-00_keyword.xlsx 형태로 저장
    #data.to_excel('파일명.xlsx')를 통해 원하는 이름으로 저장 가능

    for w in keyword.split(','):
      # 특정 컬럼에서 특정 단어를 포함하는 행을 찾음
      mask = data['keyword'].apply(lambda x: w.strip() in x)

      # mask를 사용하여 DataFrame에서 해당 행을 선택함
      result = data[mask]

      if len(result) > 0:
        result.to_excel(str(datetime.now()).split()[0]+"_"+w.strip()+'.xlsx')

    if len(temp) == 0:
      temp = data.copy()
    else:
      temp = pd.concat([temp, data], axis=0)

  return temp.reset_index(drop=True)

In [None]:
data = get_df(keyword_list, stopwords, time = 'absolute')

In [None]:
#data = data[~data['source'].str.contains('산행')]

In [None]:
data_1 = data.copy()

In [None]:
data.subscriberCount.mean()

In [None]:
def filter_non_korean_comments(comments_list):
    filtered_list = []
    print(len(comments_list))
    if type(comments_list) == str:
      return comments_list

    for comment in comments_list:
        author, content = comment
        if all(ord(c) < 128 for c in content):
            # 댓글 내용이 한글이 아닌 경우 (ASCII 범위를 벗어남)
            continue
        filtered_list.append(comment)

    return filtered_list

filtered_list = filter_non_korean_comments(comments_list)
print(filtered_list)

In [None]:
for keyword in keyword_list:
  for w in keyword.split(','):
        a = w.strip().replace('"','')
        # 특정 컬럼에서 특정 단어를 포함하는 행을 찾음
        mask = data['keyword'].apply(lambda x: a in x)
        print(a)

        # mask를 사용하여 DataFrame에서 해당 행을 선택함
        result = data[mask]

        if len(result) > 0:
          result.to_excel(str(datetime.now()).split()[0]+"_"+a+'.xlsx')

In [None]:
print(len(data))

# 특정 단어 수 확인

In [None]:
# 특정 단어 리스트
word_list = ['']

# 특정 컬럼(column_name)에 특정 단어가 포함되어 있는지 확인하는 조건식
condition = data['source'].map(str).str.contains('|'.join(word_list), case=False, na=False)

# 조건을 만족하는 행들을 필터링하여 새로운 데이터프레임 생성
filtered_df = data[condition]
print(len(filtered_df))

In [None]:
filtered_df

# 영상 키워드 네트워크 시각화

In [None]:
#%% class & function definition
class Make_Network:
    def __init__(self, data):
        self.data = data

    def document_keywords(self):
        """wikipedia redirection 완료된 keyphrase를 대문자 -> 소문자로 전처리 후 문서에 할당 """
        document_keywords = []
        for keyword_list in self.data:
            a = []
            for keyword in keyword_list:
              if len(keyword) >1: #임시
                a.append(keyword.lower())
            document_keywords.append(a)
        return document_keywords

    def keyword_corpus(self, frequency):
        """ 전체 키워드 corpus 및 테이블 작성"""
        document_keywords = self.document_keywords()

        result = [] # 전체 keyphrase를 하나의 리스트로 저장
        for keyword_list in document_keywords:
            for keyword in keyword_list:
                result.append(keyword)

        b= Counter(result)
        c = sorted(b.items(), key=lambda x: x[1], reverse=True) # value값으로 내림차순
        d = pd.DataFrame(c,columns = ["KEYWORD","FREQUENCY"])
        keyword_corpus = d[d["FREQUENCY"] >= frequency] #빈도수 k 이상의 keyphrase만 사용
        return keyword_corpus

    def keyword_adj_matrix(self, frequency):
        """ Document keyword matrix 구성 후 내적을 통해 keyword adjacency matrix를 구축 """
        document_keywords = self.document_keywords()
        keyword_corpus = self.keyword_corpus(frequency)["KEYWORD"]

        total_result = []
        for document in document_keywords:
            result = []
            for keyword in keyword_corpus:
                if keyword in document: #document_keywords의 keyword_corpus에 해당되는 keyphrase가 있으면 1 아니면 0
                    result.append(1)
                else:
                    result.append(0)
            total_result.append(result)
        matrix = pd.DataFrame(total_result,columns= keyword_corpus).to_numpy()
        keyword_matrix = np.dot(matrix.T,matrix)
        np.fill_diagonal(keyword_matrix,0,wrap =True) # 대각 성분 0으로 변환
        keyword_adj_matrix = pd.DataFrame(keyword_matrix, columns = keyword_corpus, index = keyword_corpus)
        return keyword_adj_matrix

In [None]:
graph = nx.Graph(Make_Network(data['keyword']).keyword_adj_matrix(20))

layout = nx.kamada_kawai_layout(graph)
#layout = nx.random_layout(graph)
#layout = nx.spectral_layout(graph)
#layout = nx.spring_layout(graph)
#layout = nx.planar_layout(graph)
#layout = nx.shell_layout(graph)

plt.figure(figsize=(8, 5))

nx.draw_networkx_nodes(graph, layout, alpha = 0.7)
nx.draw_networkx_labels(graph, layout, font_family='NanumBarunGothic', font_size = 14)
nx.draw_networkx_edges(graph, layout, alpha = 0.3)

In [None]:
graph = nx.Graph(Make_Network(data['keyword_en']).keyword_adj_matrix(20))

layout = nx.kamada_kawai_layout(graph)
#layout = nx.random_layout(graph)
#layout = nx.spectral_layout(graph)
#layout = nx.spring_layout(graph)
#layout = nx.planar_layout(graph)
#layout = nx.shell_layout(graph)

plt.figure(figsize=(8, 5))

nx.draw_networkx_nodes(graph, layout, alpha = 0.7)
nx.draw_networkx_labels(graph, layout, font_family='NanumBarunGothic', font_size = 14)
nx.draw_networkx_edges(graph, layout, alpha = 0.3)

In [None]:
nx.write_gexf(graph, str(datetime.now()).split()[0]+"_"+keyword+'.gexf')

# 댓글 키워드 네트워크 시각화

In [None]:
graph = nx.Graph(Make_Network(data['commentsKeyword']).keyword_adj_matrix(10))

layout = nx.kamada_kawai_layout(graph)
#layout = nx.random_layout(graph)
#layout = nx.spectral_layout(graph)
#layout = nx.spring_layout(graph)
#layout = nx.planar_layout(graph)
#layout = nx.shell_layout(graph)

plt.figure(figsize=(8, 5))

nx.draw_networkx_nodes(graph, layout, alpha = 0.7)
nx.draw_networkx_labels(graph, layout, font_family='NanumBarunGothic', font_size = 14)
nx.draw_networkx_edges(graph, layout, alpha = 0.3)

In [None]:
graph = nx.Graph(Make_Network(data['commentsKeyword_en']).keyword_adj_matrix(10))

layout = nx.kamada_kawai_layout(graph)
#layout = nx.random_layout(graph)
#layout = nx.spectral_layout(graph)
#layout = nx.spring_layout(graph)
#layout = nx.planar_layout(graph)
#layout = nx.shell_layout(graph)

plt.figure(figsize=(8, 5))

nx.draw_networkx_nodes(graph, layout, alpha = 0.7)
nx.draw_networkx_labels(graph, layout, font_family='NanumBarunGothic', font_size = 14)
nx.draw_networkx_edges(graph, layout, alpha = 0.3)

In [None]:
nx.write_gexf(graph, str(datetime.now()).split()[0]+"_"+keyword+'.gexf')

# 워드 클라우드 시각화



In [None]:
corpus =  ' '.join(sum(data['keyword'],[]))
wordcloud = WordCloud(font_path = 'NanumBarunGothic', width=1130, height=800, background_color='white').generate(corpus)

# 워드 클라우드 이미지 생성 및 출력
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(wordcloud)
plt.axis("off")
plt.tight_layout(pad=0)
plt.show()

In [None]:
corpus =  ' '.join(sum(data['keyword_en'],[]))
wordcloud = WordCloud(font_path = 'NanumBarunGothic', width=1130, height=800, background_color='white').generate(corpus)

# 워드 클라우드 이미지 생성 및 출력
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(wordcloud)
plt.axis("off")
plt.tight_layout(pad=0)
plt.show()

In [None]:
corpus =  ' '.join(sum(data['commentsKeyword'],[]))
wordcloud = WordCloud(font_path = 'NanumBarunGothic', width=1130, height=800, background_color='white').generate(corpus)

# 워드 클라우드 이미지 생성 및 출력
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(wordcloud)
plt.axis("off")
plt.tight_layout(pad=0)
plt.show()

In [None]:
corpus =  ' '.join(sum(data['commentsKeyword_en'],[]))
wordcloud = WordCloud(font_path = 'NanumBarunGothic', width=1130, height=800, background_color='white').generate(corpus)

# 워드 클라우드 이미지 생성 및 출력
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(wordcloud)
plt.axis("off")
plt.tight_layout(pad=0)
plt.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

plt.rcParams['font.family'] = 'NanumGothic'

df_expanded = data.copy()
df_expanded['Date'] = pd.to_datetime(df_expanded['Date'])


# 키워드 리스트를 펼쳐서 각 키워드를 행 단위로 유지하고 날짜와 키워드를 열로 가지는 데이터프레임 생성
df_expanded = df_expanded.explode('keyword')

# 'date' 열을 인덱스로 설정
df_expanded.set_index('Date', inplace=True)

# 월별 키워드 빈도 계산
monthly_counts = df_expanded.groupby([pd.Grouper(freq='M'), 'keyword']).size().unstack()

# 꺾은선 그래프 그리기
for i in ['']:# 원하는 키워드 선택
  desired_keyword = i

  # 원하는 키워드의 꺾은선 그래프 그리기
  monthly_counts[desired_keyword].plot(kind='line', marker='o')

# 그래프 스타일 및 레이블 설정
plt.xlabel('Month')
plt.ylabel('Frequency')
plt.title('Keyword Frequency by Month')
plt.legend(title='Keyword')

# 그래프 출력
plt.show()
