<a href="https://colab.research.google.com/github/JaehnK/GoogleCSE_Webometrics/blob/main/googleCSE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### 분석에 필요한 패키지 로드

In [None]:
def install_and_import(package):
    import importlib
    try:
        importlib.import_module(package)
    except ImportError:
        import subprocess
        print(f"{package} Not Installed... Start Install")
        subprocess.check_call(["pip", "install", package])
        print(f"{package} Sucessfully Installed.")
    finally:
        globals()[package] = importlib.import_module(package)

In [None]:
install_and_import("requests")
install_and_import("pandas")
install_and_import("tldextract")
install_and_import("matplotlib")
install_and_import("seaborn")

from collections import Counter
import csv
import os
import requests
import time
from typing import List
from urllib.parse import urlparse

import pandas as pd
import tldextract
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline
FILE_PATH = "/content/drive/MyDrive"

### GoogleCSE 객체 선언

In [None]:
class googlecse():

  def __init__(self):
    self._API_KEY = None
    self._SEARCH_ENGINE_ID = None
    self._query = None
    self._response_list = None
    self._tlds = None
    self._stlds = None
    self._domains = None
    self._count_tlds = None
    self._count_stlds = None
    self._count_domains = None

  def __str__(self):
    return f"API_KEY: {self._API_KEY}\nSEARCH_ENGINE_ID: {self._SEARCH_ENGINE_ID}\nquery: {self._query}"

  @property
  def API_KEY(self):
      return self._API_KEY

  @API_KEY.setter
  def API_KEY(self, value: str):
      if not value or not isinstance(value, str):
          raise ValueError("API_KEY must be a non-empty string")
      if len(value) < 10:
          raise ValueError("API_KEY is too short. It should be at least 10 characters long")
      self._API_KEY = value

  @property
  def SEARCH_ENGINE_ID(self):
      return self._SEARCH_ENGINE_ID

  @SEARCH_ENGINE_ID.setter
  def SEARCH_ENGINE_ID(self, value: str):
      if not value or not isinstance(value, str):
          raise ValueError("SEARCH_ENGINE_ID must be a non-empty string")
      self._SEARCH_ENGINE_ID = value

  @property
  def query(self):
      return self._query

  @query.setter
  def query(self, value: str):
      if not value or not isinstance(value, str):
          raise ValueError("Query must be a non-empty string")
      if len(value) < 2:
          raise ValueError("Query is too short. It should be at least 2 characters long")
      self._query = value


  def setup_search_parameters(self):
      """
      사용자 입력을 통해 Google Custom Search API에 필요한 주요 매개변수를 설정합니다.

      이 메서드는 API_KEY, SEARCH_ENGINE_ID, 검색 쿼리를 사용자로부터 입력받아 설정합니다.
      각 입력값은 setter 메서드를 통해 유효성 검사를 거친 후 설정됩니다.

      Returns:
          None: 오류 발생 시 None을 반환, 성공 시 반환값 없음

      """

      try:
          self.API_KEY = input("API_KEY: ")
          self.SEARCH_ENGINE_ID = input("SEARCH_ENGINE_ID: ")
          self.query = input("Query: ")
      except ValueError as e:
          print(f"Error: {e}")
          return

  def get_default_keys(self):
      self._API_KEY = "AIzaSyDhZmbnXs-i1Bi97X9S50_UwW09ZjyiZXU"
      self._SEARCH_ENGINE_ID = "3178b564dfd2e4dac"

  def response_oneline(self):
    """
    Google Custom Search API를 사용하여 단일 페이지의 검색 결과를 가져옵니다.

    API를 호출하여 첫 번째 페이지의 검색 결과를 가져온 후, 두 번째 결과 항목의
    모든 키-값 쌍을 출력합니다. 주로 API 응답 구조를 확인하는 용도로 사용됩니다.

    필수 전제조건: API_KEY, SEARCH_ENGINE_ID, query가 설정되어 있어야 합니다.

    Returns:
        dict: API 응답 전체를 JSON 형태로 반환
        None: 검색 결과가 없거나 오류 발생 시 None 반환

    """

    if (self._API_KEY is None or self._SEARCH_ENGINE_ID is None or self._query is None):
      raise ValueError("API_KEY, SEARCH_ENGINE_ID, and query must be set")

    idx = 1
    url = f"https://www.googleapis.com/customsearch/v1?key={self._API_KEY}&cx={self._SEARCH_ENGINE_ID}&q={self._query}&start={idx}"
    response = requests.get(url).json()
    if ('items' not in response.keys()):
        print("Error: Items not in response")
        return (None)

    for key in response['items'][1].keys():
      print(f"{key:<15}: {response['items'][1][key]}")
      print("\u2501" * 80)

    return response

  def response(self)-> List[str]:
    """
    Google Custom Search API를 사용하여 모든 검색 결과의 URL을 수집합니다.

    페이지네이션을 활용하여 API의 모든 결과 페이지를 순회하면서
    각 검색 결과의 링크(URL)를 수집하여 리스트로 저장합니다.
    API 호출 사이에는 0.5초의 지연을 두어 API 사용량 제한을 고려합니다.

    필수 전제조건: API_KEY, SEARCH_ENGINE_ID, query가 설정되어 있어야 합니다.

    Returns:
        List[str]: 수집된 모든 URL의 리스트

    """

    if (self._API_KEY is None or self._SEARCH_ENGINE_ID is None or self._query is None):
      raise ValueError("API_KEY, SEARCH_ENGINE_ID, and query must be set")

    self._response_list = []
    idx = 1

    while (True):
      url = f"https://www.googleapis.com/customsearch/v1?key={self._API_KEY}&cx={self._SEARCH_ENGINE_ID}&q={self._query}&start={idx}"
      response = requests.get(url).json()

      if 'items' not in response.keys():
          print("Items not in response")
          break;

      len_items = len(response['items'])

      if (len_items == 0):
          print("No more results")
          break;

      for i in range(len_items):
          self._response_list.append(response['items'][i]['link'])

      print(f"{len(self._response_list)} items collected")

      idx += len_items
      time.sleep(.5)

    print(f"GoogleCSE: {len(self._response_list)} links are collected.")
    return self._response_list

  def extract_tld(self):
    """
    수집된 URL 목록에서 최상위 도메인(TLD)만 추출합니다.

    각 URL에서 tldextract 라이브러리를 사용하여 최상위 도메인(예: com, org, net)을
    추출합니다. 복합 TLD(예: co.uk)의 경우 마지막 부분(uk)만 추출합니다.

    필수 전제조건: response() 메서드가 먼저 호출되어 self._response_list가 채워져 있어야 합니다.

    Returns:
        List[str]: 추출된 모든 TLD의 리스트

    Raises:
        ValueError: response_list가 설정되지 않은 경우
    """
    if (self._response_list is None):
      raise ValueError("response_list must be set")

    self._tlds = []
    for url in self._response_list:
        tld = tldextract.extract(url).suffix
        if (len(tld.split('.')) > 1):
            self._tlds.append(tld.split('.')[-1])
        else:
            self._tlds.append(tld)
    return self._tlds

  def extract_stlds(self):
    """
    수집된 URL 목록에서 2단계 최상위 도메인(STLD) 전체를 추출합니다.

    각 URL에서 tldextract 라이브러리를 사용하여 전체 최상위 도메인(예: com, co.uk)을
    추출합니다. TLD와 달리 복합 도메인(co.uk, com.au 등)을 그대로 유지합니다.

    필수 전제조건: response() 메서드가 먼저 호출되어 self._response_list가 채워져 있어야 합니다.

    Returns:
        List[str]: 추출된 모든 STLD의 리스트

    Raises:
        ValueError: response_list가 설정되지 않은 경우
    """
    if (self._response_list is None):
      raise ValueError("response_list must be set")

    self._stlds = [tldextract.extract(url).suffix for url in self._response_list]
    return self._stlds

  def extract_domains(self):
    """
    수집된 URL 목록에서 전체 도메인 이름을 추출합니다.

    각 URL에서 tldextract 라이브러리를 사용하여 도메인과 TLD를 결합한
    전체 도메인 이름(예: google.com, bbc.co.uk)을 추출합니다.
    서브도메인은 제외됩니다.

    필수 전제조건: response() 메서드가 먼저 호출되어 self._response_list가 채워져 있어야 합니다.

    Returns:
        List[str]: 추출된 모든 도메인 이름의 리스트
    """
    self._domains = []
    for url in self._response_list:
        d = f"{tldextract.extract(url).domain}.{tldextract.extract(url).suffix}"
        self._domains.append(d)
    return self._domains

  def count_tlds(self):
    """
    추출된 최상위 도메인(TLD)의 출현 빈도를 계산하고 시각화합니다.

    TLD 목록에서 각 TLD의 등장 횟수를 계산하여 pandas DataFrame으로 변환하고,
    빈도수가 높은 상위 20개 TLD를 막대 그래프로 시각화합니다.
    각 막대 위에 정확한 빈도수를 텍스트로 표시합니다.

    필수 전제조건: extract_tld() 메서드가 먼저 호출되어 self._tlds가 채워져 있어야 합니다.

    Returns:
        pandas.DataFrame: TLD와 해당 빈도수를 포함하는 DataFrame
    """

    self._count_tlds = pd.DataFrame({"tld" : dict(Counter(self._tlds)).keys(),
                                   "value": dict(Counter(self._tlds)).values()})

    plt.figure(figsize=(10, 6))
    tld_df = self._count_tlds.sort_values(by='value', ascending=False).head(20)
    ax = sns.barplot(x='tld', y='value', data=tld_df, palette='viridis')
    plt.title('Top 20 Top-Level Domains (TLDs)')
    plt.xlabel('TLD')
    plt.ylabel('Count')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    for i, v in enumerate(tld_df['value']):
        ax.text(i, v+0.1, str(v), ha='center')
    plt.show()

    return self._count_tlds

  def count_stlds(self):
    """
    추출된 2단계 최상위 도메인(STLD)의 출현 빈도를 계산하고 시각화합니다.

    STLD 목록에서 각 STLD의 등장 횟수를 계산하여 pandas DataFrame으로 변환하고,
    빈도수가 높은 상위 20개 STLD를 막대 그래프로 시각화합니다.
    각 막대 위에 정확한 빈도수를 텍스트로 표시합니다.

    필수 전제조건: extract_stlds() 메서드가 먼저 호출되어 self._stlds가 채워져 있어야 합니다.

    Returns:
        pandas.DataFrame: STLD와 해당 빈도수를 포함하는 DataFrame
    """
    self._count_stlds = pd.DataFrame({"stld": dict(Counter(self._stlds)).keys(),
                                   "value": dict(Counter(self._stlds)).values()})

    plt.figure(figsize=(10, 6))
    stld_df = self._count_stlds.sort_values(by='value', ascending=False).head(20)
    ax = sns.barplot(x='stld', y='value', data=stld_df, palette='coolwarm')
    plt.title('Top 20 Second-Level Domains (STLDs)')
    plt.xlabel('STLD')
    plt.ylabel('Count')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    for i, v in enumerate(stld_df['value']):
        ax.text(i, v+0.1, str(v), ha='center')
    plt.show()

    return self._count_stlds

  def count_domains(self):
    """
    추출된 도메인의 출현 빈도를 계산하고 시각화합니다.

    도메인 목록에서 각 도메인의 등장 횟수를 계산하여 pandas DataFrame으로 변환하고,
    빈도수가 높은 상위 20개 도메인을 막대 그래프로 시각화합니다.
    각 막대 위에 정확한 빈도수를 텍스트로 표시합니다.

    필수 전제조건: extract_domains() 메서드가 먼저 호출되어 self._domains가 채워져 있어야 합니다.

    Returns:
        pandas.DataFrame: 도메인과 해당 빈도수를 포함하는 DataFrame
    """
    self._count_domains = pd.DataFrame({"domains": dict(Counter(self._domains)).keys(),
                                   "value": dict(Counter(self._domains)).values()})

    plt.figure(figsize=(10, 6))
    domain_df = self._count_domains.sort_values(by='value', ascending=False).head(20)
    ax = sns.barplot(x='domains', y='value', data=domain_df, palette='coolwarm')
    plt.title('Top 20 Domains')
    plt.xlabel('Domain')
    plt.ylabel('Count')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    for i, v in enumerate(domain_df['value']):
        ax.text(i, v+0.1, str(v), ha='center')
    plt.show()

    return self._count_domains

  def save_to_csv(self):
    """
    수집 및 분석된 모든 데이터를 CSV 파일로 저장합니다.

    다음 네 가지 파일을 생성합니다:
    1. response_list.csv: 수집된 모든 URL 목록
    2. count_tlds.csv: TLD 빈도 분석 결과
    3. count_stlds.csv: STLD 빈도 분석 결과
    4. count_domains.csv: 도메인 빈도 분석 결과

    각 저장 작업은 독립적으로 수행되며, 한 파일 저장에 실패해도
    다른 파일 저장 작업은 계속 진행됩니다.

    필수 전제조건: 해당 메서드 호출 전에 관련 데이터가 생성되어 있어야 합니다.

    Returns:
        None
    """
    try:
      print("Saving raw link list ...")
      with open('response_list.csv', 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(self._response_list)
    except Exception as e:
      print(f"Error: {e}")

    try:
      print("Saving tlds count ...")
      self._count_tlds.to_csv('count_tlds.csv', index=False)
    except Exception as e:
      print(f"Error: {e}")

    try:
      print("Saving stlds count ...")
      self._count_stlds.to_csv('count_stlds.csv', index=False)
    except Exception as e:
      print(f"Error: {e}")

    try:
      print("Saving domains count ...")
      self._count_domains.to_csv('count_domains.csv', index=False)
    except Exception as e:
      print(f"Error: {e}")


### 실습

In [None]:
cse = googlecse()
#cse.setup_search_parameters()

cse.API_KEY = "PUT_API_KEY_HERE"
cse.SEARCH_ENGINE_ID = "PUT_SEARCH_ENGINE_ID_HERE"
cse.query = "PUT_QUERY_HERE"
print(cse)

In [None]:
responese = cse.response_oneline()

In [None]:
response = cse.response()
print("\u2501" * 80)
response

In [None]:
print(cse.extract_tld())
print(cse.extract_stlds())
print(cse.extract_domains())

In [None]:
cse.count_tlds()

In [None]:
cse.count_stlds()

In [None]:
cse.count_domains()

In [None]:
cse.save_to_csv()