# IMDb 정보 크롤링

In [None]:
# import
import re
import time
import urllib.parse

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
from tqdm import tqdm_notebook

- 본인이 사용하는 라이브러리가 위 import 목록에 없으면 직접 추가할 것!

## 01. 구글 검색 사이트에서 'IMDb URL' 크롤링

In [None]:
# Read the Excel workbook into `just`, using its first column as the index,
# and preview the first rows.
just = pd.read_excel('../data/justwatch.xlsx', index_col=0)
just.head()

In [None]:
def get_imdb_result(query):
    """Search Google for ``query`` and return the first IMDb hit.

    The literal suffix " TV IMDb" is appended to the query before it is
    URL-encoded and sent to ``https://www.google.com/search``.

    Args:
        query: Search text. Missing values (``None`` / NaN) are not searched.

    Returns:
        ``{"title": ..., "url": ...}`` for the first ``div.g`` result whose
        first link contains "imdb.com" (``title`` is ``None`` when that
        result has no ``<h3>``). ``None`` when ``query`` is missing, the
        request fails, the response status is not 200, or no IMDb link is
        found.
    """
    # Missing queries are not worth a network round trip.
    if pd.isna(query):
        return None

    encoded_query = urllib.parse.quote(query + " TV IMDb")
    search_url = f"https://www.google.com/search?q={encoded_query}"

    # Browser-like User-Agent header sent with the search request.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }

    try:
        # Without a timeout a single stalled connection blocks the whole batch.
        response = requests.get(search_url, headers=headers, timeout=10)
    except requests.RequestException as exc:
        # Best effort: one failed lookup must not abort the caller's loop.
        print(f"Google search request failed for {query!r}: {exc}")
        return None

    if response.status_code != 200:
        return None

    soup = BeautifulSoup(response.text, "html.parser")

    # Walk the result containers in page order and stop at the first IMDb link.
    for result in soup.find_all("div", class_="g"):
        link = result.find("a")
        if link is None or "href" not in link.attrs:
            continue

        href = link["href"]
        if "imdb.com" not in href:
            continue

        # Some result layouts carry no <h3>; keep the URL instead of crashing.
        heading = result.find("h3")
        title = heading.text if heading is not None else None
        return {"title": title, "url": href}

    return None

In [None]:
# Look up every row on Google so the results can be stored as new columns.
def get_imdb_results(df):
    """Run ``get_imdb_result`` for every row of ``df``.

    The search query is built from the ``original_title``, ``title`` and
    ``year`` columns, joined with single spaces. Missing (NaN/None) fields
    are left out instead of being turned into the literal text "nan", and
    integral floats such as ``2008.0`` are written as ``2008``. When all
    three fields are missing the query is ``None``.

    Args:
        df: DataFrame providing the ``original_title``, ``title`` and
            ``year`` columns.

    Returns:
        A ``(imdb_titles, imdb_urls)`` tuple of lists, one entry per row in
        row order; both entries are ``None`` for rows without a result.
    """
    imdb_titles = []
    imdb_urls = []
    for _, row in df.iterrows():
        parts = []
        for column in ("original_title", "title", "year"):
            value = row[column]
            if pd.isna(value):
                continue
            # A year column containing NaN is read as float (2008.0).
            if isinstance(value, float) and value.is_integer():
                value = int(value)
            parts.append(str(value))

        query = " ".join(parts) if parts else None
        result = get_imdb_result(query)
        if result:
            imdb_titles.append(result["title"])
            imdb_urls.append(result["url"])
        else:
            imdb_titles.append(None)
            imdb_urls.append(None)
    return imdb_titles, imdb_urls

# Fetch the IMDb title and URL for every row and add them to `just` as the
# new columns "IMDb_title" and "IMDb_URL".
just["IMDb_title"], just["IMDb_URL"] = get_imdb_results(just)

In [None]:
# Save `just` to a separate workbook.

just.to_excel('../data/justwatch_url.xlsx')

- 하지만 구글 검색은 결과가 개인화되어 있어, 원하는 URL을 모두 정확하게 가져오기는 어려움
- 일단 자동으로 가져온 뒤 수기로 모두 확인하고, 잘못된 URL은 직접 찾아서 교체함

## 02. 드라마 기본 페이지 크롤링(total_rate, user_review 등)

In [None]:
# Read the workbook with the IMDb URLs into `df`, using its first column as
# the index, and preview the first rows.
df = pd.read_excel('../data/justwatch_url.xlsx', index_col=0)
df.head()

In [None]:
def _text_or(soup, selector, fallback):
    """Return the stripped text of the first node matching ``selector``.

    ``fallback`` is returned unchanged when nothing matches.
    """
    node = soup.select_one(selector)
    return node.text.strip() if node else fallback


# Keys of every collected record, in column order. In the failure
# placeholder the fields listed in _TEXT_FIELDS become 'N/A', the rest 0.
_DETAIL_FIELDS = (
    'title_i', 'original_title_i', 'year_i', 'total_rate', 'total_count',
    'watchlist', 'popularity', 'user_review', 'critic_review', 'age_miss',
)
_TEXT_FIELDS = {'title_i', 'original_title_i', 'year_i', 'age_miss'}

data = []
for url in df['IMDb_URL']:
    # Every URL gets its own Chrome session, which is always shut down in
    # the ``finally`` clause below.
    driver = webdriver.Chrome()
    try:
        driver.get(url)
        time.sleep(0.5)

        # Parse a snapshot of the rendered page; all fields come from it.
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        time.sleep(0.5)

        # Title block: display title, original title, year.
        title_i = _text_or(soup, 'div.sc-491663c0-3.bdjVSf > div.sc-b7c53eda-0.dUpRPQ > h1 > span', 'N/A')
        original_title_i = _text_or(soup, 'div.sc-b7c53eda-0.dUpRPQ > div', 'N/A').replace('Original title: ', '')
        year_i = _text_or(soup, 'div.sc-b7c53eda-0.dUpRPQ > ul > li:nth-child(2) > a', 'N/A')

        # Rating value and rating count (count goes through convert_to_number).
        total_rate = _text_or(soup, 'div.sc-bde20123-2.cdQqzc > span.sc-bde20123-1.cMEQkK', 'N/A')
        rating_count_text = _text_or(soup, 'div.sc-bde20123-0.dLwiNw > div.sc-bde20123-3.gPVQxL', 'N/A')
        total_count = convert_to_number(rating_count_text)

        # Watchlist label: keep only the part before '명의'; a missing label
        # is passed on as the string '0'.
        watchlist_label = _text_or(soup, 'button.ipc-split-button__btn > div > div.sc-b23676b3-3.bRgISf', 0)
        if isinstance(watchlist_label, str):
            watchlist_text = watchlist_label.split('명의')[0]
        else:
            watchlist_text = '0'
        watchlist = convert_to_number(str(watchlist_text))

        popularity = _text_or(soup, 'div.sc-5f7fb5b4-0.brylPD > div.sc-5f7fb5b4-1.fTREEx', 0)

        # User review count: retry with the alternative container class when
        # the first selector finds nothing.
        user_review_text = _text_or(soup, 'div.sc-491663c0-11.cvvyMK > ul > li:nth-child(1) > a > span > span.score', 0)
        if user_review_text == 0:
            user_review_text = _text_or(soup, 'div.sc-491663c0-11.bmRzqx > ul > li > a > span > span.score', 0)
        user_review = convert_to_number(user_review_text)

        critic_review = _text_or(soup, 'div.sc-491663c0-11.cvvyMK > ul > li:nth-child(2) > a > span > span.score', 0)

        # Third entry of the title block's list (stored as 'age_miss').
        age_miss = _text_or(soup, 'div.sc-491663c0-3.bdjVSf > div.sc-b7c53eda-0.dUpRPQ > ul > li:nth-child(3) > a', 'N/A')

        record = {
            'title_i': title_i,
            'original_title_i': original_title_i,
            'year_i': year_i,
            'total_rate': total_rate,
            'total_count': total_count,
            'watchlist': watchlist,
            'popularity': popularity,
            'user_review': user_review,
            'critic_review': critic_review,
            'age_miss': age_miss,
        }
        data.append(record)
        print(f"Scraped: {title_i}, {original_title_i}, {year_i}, {total_rate}, {total_count}, {watchlist}, {popularity}, {user_review}, {critic_review}, {age_miss}")

    except Exception as e:
        # Keep one row per URL even on failure so the output stays aligned
        # with the input order.
        print(f"Error occurred while processing URL {url}: {e}")
        data.append({field: ('N/A' if field in _TEXT_FIELDS else 0) for field in _DETAIL_FIELDS})
    finally:
        driver.quit()

# NOTE(review): `df` is rebound to the scraped records here, and the output
# path is an empty string - confirm the intended file name before running.
df = pd.DataFrame(data)
df.to_excel('')

## 03. 드라마 제작사, 배급사 페이지 크롤링

In [None]:
# Crawl production companies and distributors from each title's
# "companycredits" page and store them in `drama`.

# CSS path of the N-th <section> inside the company-credits grid column.
_COMPANY_SECTION_SELECTOR = (
    '#__next > main > div > section > div > section > div > '
    'div.sc-978e9339-1.ihWZgK.ipc-page-grid__item.ipc-page-grid__item--span-2 > '
    'section:nth-child({position})'
)
# "See more" control that expands a truncated company list.
_MORE_BUTTON_SELECTOR = 'div.sc-f65f65be-0.bBlII > ul > div > span.ipc-see-more'


def _company_names(soup, heading, positions):
    """Return the company names listed under ``heading``.

    The sections at the given ``nth-child`` positions are tried in order;
    the first one containing a ``<span>`` whose text equals ``heading`` wins.

    Returns:
        A de-duplicated list of link texts (possibly empty), or ``None``
        when none of the candidate sections carries the heading.
    """
    for position in positions:
        sections = soup.select(_COMPANY_SECTION_SELECTOR.format(position=position))
        if not sections:
            continue
        section = sections[0]
        if section.find('span', string=heading) is None:
            continue
        links = section.find_all(class_="ipc-metadata-list-item__label--link")
        return list({link.text for link in links})
    return None


def _store_companies(frame, idx, column, names):
    """Write ``names`` into ``frame.at[idx, column]``, or NaN when unavailable."""
    if names is None:
        frame.at[idx, column] = np.nan
        return
    try:
        frame.at[idx, column] = names
    except (ValueError, TypeError) as exc:
        # Storing a list in a cell can be rejected by pandas; fall back to NaN
        # instead of aborting the crawl.
        print(f"Could not store {column} for row {idx}: {exc}")
        frame.at[idx, column] = np.nan


for idx, row in tqdm_notebook(drama.iterrows()):
    production = None
    distributor = None
    driver = None

    try:
        # Fails (and is reported below) when the URL is missing.
        page = row['IMDb_URL'] + 'companycredits'

        driver = webdriver.Chrome()
        driver.get(page)

        # Expand truncated lists: up to 20 attempts, each waiting at most 1 s
        # for a clickable "see more" control.
        for _ in range(20):
            try:
                more_link = WebDriverWait(driver, 1).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, _MORE_BUTTON_SELECTOR))
                )
                more_link.click()
            except Exception:
                # No (more) control to click on this attempt; deliberately
                # best effort. Ctrl-C still propagates.
                pass

        time.sleep(8)

        soup = BeautifulSoup(driver.page_source, 'html.parser')

        # Production companies are looked up in the 2nd section only;
        # distributors in the 3rd, then the 2nd, then the 4th.
        production = _company_names(soup, 'Production Companies', (2,))
        distributor = _company_names(soup, 'Distributors', (3, 2, 4))

    except Exception as exc:
        print(f"Failed to crawl company credits for row {idx}: {exc}")
    finally:
        # Always end the browser session, including on failure.
        if driver is not None:
            driver.quit()

    # Each column is stored independently, so a missing distributor no longer
    # wipes out a production list that was found.
    _store_companies(drama, idx, 'production', production)
    _store_companies(drama, idx, 'distributor', distributor)

## 04. 드라마 감독, 작가, 배우 페이지 크롤링

In [None]:
import numpy as np

# One record per URL: directors, writers and the first ten cast entries
# taken from the title's "fullcredits" page.
data = []

# A single browser session is reused for all URLs.
driver = webdriver.Chrome()

try:
    for url in urls:
        # Resolve the title once, up front, so that a URL without a matching
        # row cannot discard credits that were scraped successfully.
        title_matches = df.loc[df['IMDb_URL'] == url, 'title']
        title = title_matches.values[0] if not title_matches.empty else np.nan

        try:
            # Build the cast & crew URL (fails for a missing/NaN url).
            fullcredits_url = url + 'fullcredits/?ref_=tt_ql_1'
            driver.get(fullcredits_url)

            # Wait up to 20 s for an <h1> to be present.
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'h1'))
            )

            soup = BeautifulSoup(driver.page_source, 'html.parser')

            # Links in the table that directly follows the #director / #writer anchor.
            directors = [a.text.strip() for a in soup.select('#director + .simpleCreditsTable a')]
            writers = [a.text.strip() for a in soup.select('#writer + .simpleCreditsTable a')]

            # First ten cast links, in page order.
            cast_links = soup.select('table.cast_list tr .primary_photo + td a')
            actors = [a.text.strip() for a in cast_links[:10]]

            # Directors and writers are de-duplicated and sorted; actors keep
            # their page order.
            data.append({
                'IMDb_URL': url,
                'title': title,
                'fullcredits_url': fullcredits_url,
                'director': ', '.join(sorted(set(directors))) if directors else np.nan,
                'writer': ', '.join(sorted(set(writers))) if writers else np.nan,
                'actor': ', '.join(actors) if actors else np.nan
            })

        except Exception as e:
            # Keep a placeholder row so every URL appears in the output.
            print(f"Failed to process URL {url}: {e}")
            data.append({
                'IMDb_URL': url,
                'title': title,
                'fullcredits_url': np.nan,
                'director': np.nan,
                'writer': np.nan,
                'actor': np.nan
            })
finally:
    # End the browser session even when the loop is interrupted.
    driver.quit()



## 05. 드라마 시즌별 페이지 크롤링

In [None]:
# Crawl per-episode ratings season by season and store them in `df` as
# columns "season_1", "season_2", ...

_MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
           'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
_YEAR_RE = re.compile(r'(\d\d\d\d)')
_EPISODE_CARD_SELECTOR = (
    '#__next > main > div > section > div > section > div > div > '
    'section:nth-child(2) > section > article'
)
# A season containing an episode from this year stops the crawl of the title.
_CUTOFF_YEAR = '2024'
_NO_RATING = '평점 없음'
_NEEDS_CHECK = '확인 필요'


def _parse_vote_count(label):
    """Convert a vote label such as "(987)", "(45K)" or "(1.2k)" to an int.

    The first and last character (the surrounding brackets) are dropped; a
    trailing k/K multiplies by 1,000 and m/M by 1,000,000.

    Raises:
        ValueError: if the remaining text is not a number.
    """
    text = label.strip()[1:-1].strip()
    multiplier = 1
    if text and text[-1] in 'kK':
        multiplier, text = 1000, text[:-1]
    elif text and text[-1] in 'mM':
        multiplier, text = 1000000, text[:-1]
    return int(round(float(text) * multiplier))


def _parse_episode_card(text):
    """Parse the visible text of one episode card.

    Returns:
        ``(episode_code, air_date, score, votes)``. ``episode_code`` is the
        part before ' ∙ ' of the first line starting with 'S'; ``air_date``
        is the first line with a month abbreviation in its first nine
        characters (``None`` if absent). ``score`` and ``votes`` are ``None``
        when the score line is not numeric.

    Raises:
        ValueError: no episode code line, or an unparsable vote label.
        IndexError: the card has too few lines.
    """
    lines = text.split('\n')

    episode_code = next(
        (line.split(' ∙ ')[0] for line in lines if line.startswith('S')), None
    )
    if episode_code is None:
        raise ValueError('episode code not found')

    air_date = next(
        (line for line in lines if any(month in line[0:9] for month in _MONTHS)),
        None,
    )

    # A trailing 'Watch options' line shifts score and votes up by one line.
    shift = 1 if lines[-1] == 'Watch options' else 0
    score_text = lines[-4 - shift]
    votes_text = lines[-2 - shift]

    try:
        score = float(score_text)
    except ValueError:
        return episode_code, air_date, None, None
    return episode_code, air_date, score, _parse_vote_count(votes_text)


driver = webdriver.Chrome()  # Make sure you have the appropriate WebDriver installed
action = ActionChains(driver)

try:
    for idx, row in tqdm(df.iterrows()):
        base_url = row['imdb_url']
        # A float here means the URL is missing (NaN): nothing to crawl.
        if isinstance(base_url, float):
            continue
        driver.get(base_url + 'episodes')

        # The number of season tabs decides how many pages are visited.
        season_count = len(driver.find_elements(By.CSS_SELECTOR, 'a[data-testid="tab-season-entry"]'))

        for s in range(season_count):
            # Give the season page time to render.
            time.sleep(3)

            cards = driver.find_elements(By.CSS_SELECTOR, _EPISODE_CARD_SELECTOR)
            card_texts = [card.text for card in cards]

            # First four-digit number of every card, used as its year.
            years = [m.group(1) for m in map(_YEAR_RE.search, card_texts) if m]
            if _CUTOFF_YEAR in years:
                break

            score_dict = {}
            no_rating = False
            for text in card_texts:
                try:
                    episode_code, air_date, score, votes = _parse_episode_card(text)
                except (ValueError, IndexError):
                    # Unparsable card: keep what was collected so far.
                    break
                if score is None:
                    no_rating = True
                    break
                score_dict[episode_code] = [score, votes, air_date]

            column = 'season_' + str(s + 1)
            if no_rating:
                # "No rating" takes precedence over any partial result.
                df.at[idx, column] = _NO_RATING
            elif len(score_dict) == 1:
                # A single-episode season is flagged for manual review.
                df.at[idx, column] = _NEEDS_CHECK
            elif score_dict:
                df.at[idx, column] = score_dict

            print(score_dict)

            # Always advance to the next season page, even when nothing was
            # stored; otherwise the same page would be parsed again under the
            # next season number.
            if s < season_count - 1:
                try:
                    next_button = driver.find_element(By.CSS_SELECTOR, '#next-season-btn > svg')
                except NoSuchElementException:
                    break
                action.move_to_element(next_button).perform()
                next_button.click()
finally:
    # End the browser session even when the crawl is interrupted.
    driver.quit()

In [None]:
# Print the index of every row that has a season cell flagged '확인 필요'
# (once per flagged cell).

season_columns = df.loc[:, 'season_1':'season_17']
for idx, season_cells in season_columns.iterrows():
    flagged = [cell for cell in season_cells if cell == '확인 필요']
    for _ in flagged:
        print(idx)

In [None]:
# Print the index of every row in which a season column holds episode keys
# whose season prefix does not match the column position
# (column N is expected to contain keys starting with 'S<N>.').

for idx, row in df.loc[:, 'season_1':'season_17'].iterrows():
    # enumerate keeps the expected season number in step with the column
    # position even when a cell is skipped.
    for season_no, cell in enumerate(row, start=1):
        # A float cell stops the scan for this row (presumably NaN, i.e. no
        # further seasons - verify against the data).
        if isinstance(cell, float):
            break
        # Only dict cells carry episode keys; None and marker strings are skipped.
        if not isinstance(cell, dict):
            continue
        expected_prefix = 'S' + str(season_no)
        if any(key.split('.')[0] != expected_prefix for key in cell):
            print(idx)

In [None]:
# Print the index of a row once for every episode whose stored score is a
# string longer than three characters (i.e. not a value like "8.5").

for idx, row in df.loc[:, 'season_1':'season_17'].iterrows():
    for cell in row:
        # A float cell stops the scan for this row (presumably NaN - verify).
        if isinstance(cell, float):
            break
        # Only dict cells hold per-episode entries; skip None and marker strings.
        if not isinstance(cell, dict):
            continue
        for entry in cell.values():
            score = entry[0]
            # Numeric scores have no length and can never be "too long".
            if isinstance(score, str) and len(score) > 3:
                print(idx)

## 06. 드라마 수상, 노미네이션 경력 크롤링

In [None]:
# Crawl the number of wins and nominations shown on each title's IMDb page.

# List of IMDb URLs to visit.
urls = df['imdb_url'].tolist()

# Location of the span(s) whose text is searched for "<n> win" / "<n> nomination".
_AWARDS_XPATH = '//*[@id="__next"]/main/div/section[1]/div/section/div/div[1]/section[1]/div/ul/li/div/ul/li/span'
_WIN_RE = re.compile(r'(\d+)\s+win')
_NOMINATION_RE = re.compile(r'(\d+)\s+nomination')

# A single browser session is reused for all URLs.
driver = webdriver.Chrome()

# One record per URL: {'imdb_url', 'wins', 'nominations'}.
data = []

try:
    for url in urls:
        try:
            driver.get(url)

            # Wait up to 20 s for the awards span to be present.
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, _AWARDS_XPATH))
            )

            wins = 0
            nominations = 0

            try:
                for element in driver.find_elements(By.XPATH, _AWARDS_XPATH):
                    text = element.text
                    win_match = _WIN_RE.search(text)
                    nom_match = _NOMINATION_RE.search(text)
                    if win_match:
                        wins = int(win_match.group(1))
                    if nom_match:
                        nominations = int(nom_match.group(1))
            except Exception as e:
                # Keep whatever was parsed before the failure.
                print(f"Failed to extract wins and nominations: {e}")

            data.append({
                'imdb_url': url,
                'wins': wins,
                'nominations': nominations
            })

        except Exception as e:
            # Page failed to load or shows no awards span: record zeros.
            print(f"Failed to extract wins and nominations for URL {url}: {e}")
            data.append({
                'imdb_url': url,
                'wins': 0,
                'nominations': 0
            })
finally:
    # End the browser session even when the crawl is interrupted.
    driver.quit()
