In [17]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
import time
from tqdm import tqdm
import random
from sqlalchemy import create_engine, inspect

# 헤더 설정 (기존 headers 변수 사용)
# 대량 크롤링 시 IP 차단당해서 Client Error가 발생할 수 있으니 주기적으로 IP를 변경해주기
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.7151.104 Safari/537.36'
}

In [18]:
!pip install pymysql



In [None]:
user = 'joohyun'
password = 'joohyun0805'
host = '13.125.208.76'
port = '3306'
database = 'kbo_preview_database'

myngine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}')

In [20]:
selectsql="""
select max(s_no) as max_sno
from(
	select c.s_no as s_no, 
	c.`경기 날짜` as play_date,
	c.`원정 선발투수` as away_pitcher,
	c.`홈 선발투수` as home_pitcher,
	d.p_no as home_no,
	c.away_no
	from (
		select a.*, 
			b.p_no as away_no
		from kbo_preview_data a 
		left join kbo_pitcher_data b
		on a.`원정 선발투수` = b.p_name	
	) c
	left join kbo_pitcher_data d
	on c.`홈 선발투수` = d.p_name
)e
"""

In [22]:
masno= pd.read_sql_query(sql=selectsql, con=myngine)
masno.max_sno.values[0]

20250345

In [None]:
start_no = masno.max_sno.values[0] + 1
END_NO = masno.max_sno.values[0] + 5


# Funtion1: 단일 게임 데이터를 크롤링
def crawl_single_game_data(s_no):
    url = f"https://statiz.sporki.com/schedule/?m=preview&s_no={s_no}"
    
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # total_score = crawl_score_from_soup(soup)
        pitchers = crawl_pitcher_from_soup(soup)
        game_date = crawl_date_from_soup(soup)
        
        return {
            's_no': s_no,
            'play_date': game_date,
            # '총득점': total_score,
            'home_pitcher': pitchers[0] if len(pitchers) > 0 else None,
            'away_pitcher': pitchers[1] if len(pitchers) > 1 else None
        }
        
    except Exception as e:
        print(f"s_no {s_no} 크롤링 실패: {e}")
        return None

def crawl_pitcher_from_soup(soup):
    try:
        pitchers_name = [div.get_text(strip=True) for div in soup.find_all('div', style="margin-top:.3rem;")]
        return pitchers_name
    except Exception as e:
        print(f"투수 정보 크롤링 오류: {e}")
        return []

# Function4: 경기 날짜 크롤링 
def crawl_date_from_soup(soup):
    try:
        text = soup.select_one('div.txt').get_text(strip=True)
        date_data = re.search(r'\d{2}-\d{2}', text).group()
        return date_data
    except Exception as e:
        print(f"날짜 크롤링 오류: {e}")
        return None

# 전체 데이터 크롤링 함수 (DataFrame append 방식)
def crawl_all_games():
    # 빈 DataFrame 초기화
    merged_df = pd.DataFrame(columns=['s_no', 'play_date', 'home_pitcher', 'away_pitcher'])
    
    for s_no in tqdm(range(start_no, END_NO + 1), desc="크롤링 진행"):
        game_data = crawl_single_game_data(s_no)
        if game_data:
            # 개별 게임 데이터를 DataFrame으로 변환
            single_game_df = pd.DataFrame([game_data])
            # 전체 DataFrame에 append
            merged_df = pd.concat([merged_df, single_game_df], ignore_index=True)
            print(f"s_no {s_no} 추가 완료 (총 {len(merged_df)}개)")
        
        # 서버 부하 방지를 위한 딜레이
        # 초기에 0.5로 설정하니까 웹사이트 내부에서 비정상적인 행동 패턴으로 파악하고 크롤링을 차단해버림
        time.sleep(random.uniform(1.5, 3.0))
    
    return merged_df

# 에러 처리 및 재시도 기능 포함 크롤링 (DataFrame append 방식)
def crawl_with_retry(start_no=start_no, end_no=END_NO, max_retries=3):
    # 빈 DataFrame 초기화
    merged_df = pd.DataFrame(columns=['s_no', 'play_date', 'home_pitcher', 'away_pitcher'])
    failed_s_nos = []
    
    for s_no in tqdm(range(start_no, end_no + 1), desc="크롤링 진행"):
        success = False
        for attempt in range(max_retries):
            try:
                game_data = crawl_single_game_data(s_no)
                if game_data:
                    # 개별 게임 데이터를 DataFrame으로 변환
                    single_game_df = pd.DataFrame([game_data])
                    # 전체 DataFrame에 append
                    merged_df = pd.concat([merged_df, single_game_df], ignore_index=True)
                    print(f"s_no {s_no} 추가 완료 (총 {len(merged_df)}개)")
                    success = True
                    break
                else:
                    time.sleep(1)  # 실패 시 잠시 대기
            except Exception as e:
                if attempt == max_retries - 1:
                    print(f"s_no {s_no} 최종 실패: {e}")
                    failed_s_nos.append(s_no)
                else:
                    time.sleep(2)  # 재시도 전 더 긴 대기
        if success:
            time.sleep(0.5)  # 성공 시 기본 대기
    if failed_s_nos:
        print(f"실패한 s_no 목록: {failed_s_nos}")
    
    return merged_df, failed_s_nos

# 중간 저장 기능 포함 크롤링 (DataFrame append 방식, 추천)
def crawl_with_checkpoint(start_no=start_no, end_no=END_NO, checkpoint_interval=50):
    # 빈 DataFrame 초기화
    merged_df = pd.DataFrame(columns=['s_no', 'play_date', 'home_pitcher', 'away_pitcher'])
    
    for i in range(start_no, end_no + 1, checkpoint_interval):
        batch_end = min(i + checkpoint_interval - 1, end_no)
        for s_no in tqdm(range(i, batch_end + 1), desc=f"배치 {i}-{batch_end}"):
            game_data = crawl_single_game_data(s_no)
            if game_data:
                # 개별 게임 데이터를 DataFrame으로 변환
                single_game_df = pd.DataFrame([game_data])
                # 전체 DataFrame에 append
                merged_df = pd.concat([merged_df, single_game_df], ignore_index=True)
            time.sleep(0.5)
        # 중간 저장 (백업용)
        checkpoint_file = f'checkpoint_baseball_data_{start_no}_games.csv'
        merged_df.to_csv(checkpoint_file, index=False, encoding='utf-8-sig')
    
    return merged_df

# 실패한 s_no들에 대해서만 재크롤링 (DataFrame append 방식)
def retry_failed_games(failed_s_nos, max_retries=5):
    if not failed_s_nos:
        print("재크롤링할 데이터가 없습니다.")
        return pd.DataFrame(), []
    # 빈 DataFrame 초기화
    retry_df = pd.DataFrame(columns=['s_no', 'play_date', 'home_pitcher', 'away_pitcher'])
    still_failed = []
    
    for s_no in tqdm(failed_s_nos, desc="재크롤링"):
        success = False
        for attempt in range(max_retries):
            try:
                game_data = crawl_single_game_data(s_no)
                if game_data:
                    # 개별 게임 데이터를 DataFrame으로 변환
                    single_game_df = pd.DataFrame([game_data])
                    # 재시도 DataFrame에 append
                    retry_df = pd.concat([retry_df, single_game_df], ignore_index=True)
                    success = True
                    break
                else:
                    time.sleep(2)
            except Exception as e:
                time.sleep(3)
        if not success:
            still_failed.append(s_no)
    if still_failed:
        print(f"여전히 실패한 s_no: {still_failed}")
    
    return retry_df, still_failed


merged_df = crawl_with_checkpoint(start_no, END_NO, checkpoint_interval=50)
merged_df


In [None]:
pitchers_df = pd.read_csv('pitcher_no.csv')
pitchers_df = pd.DataFrame(pitchers_df)
pitchers_df

In [25]:
test_data_df = pd.merge(
  merged_df,
  pitchers_df,
  left_on='home_pitcher',
  right_on='p_name',
  how='left'
).rename(columns={'p_no' : 'home_pitcher_no'}).drop(columns=['p_name'])

test_data_df = pd.merge(
  test_data_df,
  pitchers_df,
  left_on='away_pitcher',
  right_on='p_name',
  how='left'
).rename(columns={'p_no': 'away_pitcher_no'}).drop(columns=['p_name'])

In [27]:
emptyDf1=pd.DataFrame()
emptyDf2=pd.DataFrame()

In [None]:
url_nodata ='https://statiz.sporki.com/stats/?m=team&m2=pitching'
def crawl_statiz_pitcher_data(url):
    """
    스탯티즈에서 선발투수 등판 기록을 크롤링하는 함수
    """
    tempValue = ''     #rowspan이 2로 행이 묶여 정보가 누락되는 문제 해결을 위한 변수들
    onRowapply1 = False #한 행의 열데이터 들을 수집하면서 rowspan==2를 감지하면 True
    onRowapply2 = False #한 행의 열데이터를 모두 수집하였을때 onRowapply1이 True면 True
    j=0
    try:
        
        # HTTP 요청 헤더 설정 (봇 차단 방지)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.7151.104 Safari/537.36'
        }
        
        # 웹페이지 요청
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        
        # BeautifulSoup 객체 생성
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 테이블 찾기 (일반적으로 통계 데이터는 table 태그 안에 있음)
        table = soup.find('table')
        if not table:
            print("테이블을 찾을 수 없습니다.")
            return None
        
        # 헤더 추출 (th 태그 또는 첫 번째 tr의 td 태그)
        headers = []
        header_row = table.find('tr')
        if header_row:
            header_cells = header_row.find_all(['th', 'td'])
            headers = [cell.get_text(strip=True) for cell in header_cells]
        
        # 데이터 행 추출
        data_rows = []
        rows = table.find_all('tr')[1:]  # 첫 번째 행(헤더) 제외
        for i in range (len(list(rows))-1): #마지막 통산 기록 부분 제외를 위해 -1
            row = rows[i]
            cells = row.find_all('td')
            if cells:  # td 태그가 있는 행만 처리
                row_data = []
                
                while j <= (len(list(cells))-1):
                    cell = cells[j]
                    if onRowapply2 ==True:
                        text= tempValue
                        onRowapply1 = False
                        onRowapply2 = False
                        j-=1 #rowspan==2로 인해 누락된 데이터를 입력하느라 원래 데이터를 누락하지 않도록 함
                    else:
                        # 셀 내용 추출 (줄바꿈 제거, 공백 정리)
                        text = cell.get_text(separator=' ', strip=True)

                    # 추가 줄바꿈 문자 제거
                    text = text.replace('\n', ' ').replace('\r', '').strip()
                    # 연속된 공백을 하나로 통일
                    text = ' '.join(text.split())
                    row_data.append(text)
                    # 첫번째 컬럼의 td row span 속성값이 2이상일 경우 값을 저장하고 onRowapply1을 True로 변경
                    if i==1 and j==0:
                        tmpRow = cell.attrs["rowspan"]
                        if int( cell["rowspan"] ) >= 2:
                            tempValue = text
                            onRowapply1 = True #rowspan==2를 감지하면 True로 변경
                    j+=1
                j=0
                
                if row_data:  # 빈 행이 아닌 경우만 추가
                    data_rows.append(row_data)
            if onRowapply1== True: #누락된 데이터를 바로 다음 열에 입력하지 않도록 onRowapply2를 이용
                onRowapply2= True
        
        return headers, data_rows
        
    except requests.RequestException as e:
        print(f"웹페이지 요청 중 오류 발생: {e}")
        return None
    except Exception as e:
        print(f"데이터 추출 중 오류 발생: {e}")
        return None
def crawl_statiz_pitcher_nodata(url_nodata):
    """
    결측치 처리를 위한 함수
    스탯티즈에서 선발투수 등판 기록이 없을때 리그 평균 스탯을 크롤링하는 함수
    """
    try:
        headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.7151.104 Safari/537.36'
        }

        # 웹페이지 요청
        response = requests.get(url_nodata, headers=headers)
        response.raise_for_status()

        # BeautifulSoup 객체 생성
        soup = BeautifulSoup(response.text, 'html.parser')

        # 테이블 찾기 (일반적으로 통계 데이터는 table 태그 안에 있음)
        table = soup.find('table')
        if not table:
            print("테이블을 찾을 수 없습니다.")
            return None
        #colum데이터 추출
        headers1 = []
        header_row1 = table.find('tr')
        if header_row1:
            header_cells1 = header_row1.find_all([ 'th','td'])
            headers1 = [cell.get_text(strip=True) for cell in header_cells1]
        #league_colum_DF = pd.DataFrame(headers1)
        # 리그 시즌기록 데이터 추출 (tbody태그 내부에 데이터 존재)
        headers2 = []
        header_row2 = table.find('tbody')
        if header_row2:
            header_cells2 = header_row2.find_all([ 'th','td'])
            headers2 = [cell.get_text(strip=True) for cell in header_cells2]
        #nostat_pitcher_DF = nostat_pitcher_DF.transpose()


        return headers1,headers2

    except requests.RequestException as e:
        print(f"웹페이지 요청 중 오류 발생: {e}")
        return None
    except Exception as e:
        print(f"데이터 추출 중 오류 발생: {e}")
        return None

columlist = ['ERA', 'FIP', 'WHIP']
columlist1 = ['ERA1', 'FIP1', 'WHIP1']
columlist2 = ['ERA2', 'FIP2', 'WHIP2']

picher_stat10_list = []

for i in range(5):
    picher_stat2_list=[]
    #원정 투수 스탯
    
    player_id=test_data_df.home_pitcher_no.values[i]
    url = f"https://statiz.sporki.com/player/?m=year&p_no={player_id}"
    picher_data=crawl_statiz_pitcher_data(url)

    if picher_data[1]==[]:
        picher_nodata=crawl_statiz_pitcher_nodata(url_nodata)
        picher_nodata_DF=pd.DataFrame(picher_nodata[1])
        picher_nodata_DF = picher_nodata_DF.set_axis(picher_nodata[0], axis=0).transpose()
        picher_stat_DF = picher_nodata_DF

    else:
        picherdata_DF=pd.DataFrame(picher_data[1])
        picher_data_DF = picherdata_DF.set_axis(picher_data[0], axis=1)
        picher_stat_DF = picher_data_DF
    
    picher_stat_DF.iloc[-1]
    picher_stat_DF = pd.DataFrame(picher_stat_DF.iloc[-1])
    picher_stat_DF = picher_stat_DF.transpose()

    picher_stat_DF = picher_stat_DF[columlist]
    picher_stat_DF.columns = columlist1

    emptyDf1 = pd.concat( [emptyDf1, picher_stat_DF] )

    player_id=test_data_df.away_pitcher_no.values[i]
    url = f"https://statiz.sporki.com/player/?m=year&p_no={player_id}"
    picher_data=crawl_statiz_pitcher_data(url)

    if picher_data[1]==[]:
        picher_nodata=crawl_statiz_pitcher_nodata(url_nodata)
        picher_nodata_DF=pd.DataFrame(picher_nodata[1])
        picher_nodata_DF = picher_nodata_DF.set_axis(picher_nodata[0], axis=0).transpose()
        picher_stat_DF = picher_nodata_DF

    else:
        picherdata_DF=pd.DataFrame(picher_data[1])
        picher_data_DF = picherdata_DF.set_axis(picher_data[0], axis=1)
        picher_stat_DF = picher_data_DF
    
    picher_stat_DF.iloc[-1]
    picher_stat_DF = pd.DataFrame(picher_stat_DF.iloc[-1])
    picher_stat_DF = picher_stat_DF.transpose()

    picher_stat_DF = picher_stat_DF[columlist]
    picher_stat_DF.columns = columlist2

    
    emptyDf2 = pd.concat( [emptyDf2, picher_stat_DF], axis=0 )
    
    # picher_stat2_list.append(picher_stat_DF)
    # print(picher_stat2_list)
    
    # picher_stat10_list.append(picher_stat2_list)
    # print(picher_stat10_list)
emptyDf1.reset_index(drop=True, inplace=True)
emptyDf2.reset_index(drop=True, inplace=True)



In [29]:
picher10_stat_DF=pd.concat( [emptyDf1, emptyDf2], axis=1)


In [30]:
test_column_list = ["s_no", "ERA1", "FIP1", "WHIP1", "ERA2", "FIP2", "WHIP2"]

In [31]:
tempDF = pd.concat([test_data_df, picher10_stat_DF], axis=1)
test_DF=tempDF[test_column_list]

In [32]:
test_DF.to_csv('testdata.csv')