In [1]:
from bs4 import BeautifulSoup as bs
from datetime import datetime
from pandasql import sqldf
import requests as req
import pandas as pd
import numpy as np
import sqlite3
import json
import time
import os
import re

In [2]:
os.getcwd()

'/home/jovyan/project/DoveNest/informations/src/ipynb/SCRAPING'

In [3]:
NOW     = datetime.now()
Y, M, D = NOW.year, NOW.month, NOW.day

## 미리 API를 통해 받아둔 게임 정보 JSON파일 경로
ROOT_PATH = '/home/jovyan/project/DoveNest'
DATA_PATH = f'{ROOT_PATH}/informations/jsons'
DB_PATH   = f'{ROOT_PATH}/informations/db'

today =  f'{Y}{str(M).zfill(2)}{str(D).zfill(2)}'

## json 파일을 불러와주는 함수
load_json     = lambda json_path: json.loads(open(json_path, 'r').read())

In [4]:
## steam API 관련 클래스 생성
class SteamAPI:
    
    ## api URL들을 저장해주는 딕셔너리
    URLS = {
        'sales'       : 'http://store.steampowered.com/api/featuredcategories/?l=koreana',
        'library'     : 'https://api.steampowered.com/IPlayerService/GetOwnedGames/v0001',
        'steam_page'  : 'https://store.steampowered.com/app',
        'get_summary' : 'https://partner.steam-api.com/ISteamUser/GetPlayerSummaries/v2/',
    }


    #! 게임 정보를 불러와 주는 api 함수 appid (상점에 있는 내용)을 기반으로 데이터를 불러옴.
    ## 원래 api를 호출하여 사용하려고 했으나, 한 번 호출하는데 0.3 ~ 0.6초 가량 걸리고,
    ## 하루 API 허가 호출량이 정해져있어 미리 api로 호출하여 저장해둔 json 데이터를 불러와 사용하는 것으로 변경
    def get_info(appid):
        
        json_path = f'{DATA_PATH}/{appid}/{appid}.json'
        
        if os.path.isfile(json_path): 
            return load_json(json_path)
        
        else:
            # print(f'[ERR.J-0001] <{appid}> json 파일이 존재하지 않습니다.')
            return {}


    ## api key와 steam user_id64를 입력받아서 해당 유저의 라이브러리 정보를 불러오는 함수
    def get_user_library(key, user_id):
        
        library_url = f'{SteamAPI.URLS["library"]}/?key={key}&steamid={user_id}&format=json'
        games = get_api(library_url)['response']['games']
        
        user_datas = []
        for game in games:
            try:
                last_play_time = unix2datetime(game['rtime_last_played'])
                
                info = [game['appid'], game['playtime_forever'], str(last_play_time)]
                user_datas.append(info)
                
            except Exception as e:
                print(traceback.format_exc(), e)
                
        return user_datas


    ## 가장 많이 플레이한 게임 리스트 5개 반환해주는 함수
    def most_played(library, platform = 'steam'):

        library = sorted(library, key = lambda x: x[1], reverse = True)[:5]
        information = []

        for lib in library:
            appid, played_time, last_played = lib
            
            try:
                datas       = load_json(f'{DATA_PATH}/{appid}/{appid}.json')
                played_time = f'{mt.ceil(played_time / 60)} 시간' if played_time / 60 > 1 else f'{int(played_time)} 분'

                info  = {   
                            'image' : datas['header_image'],
                            'name'  : datas['name'],
                            'genre' : ', '.join([data['description'] for data in datas['genres']][:3]),
                            'played_time' : played_time,
                            'last_played' : last_played
                        }
                
                information.append(info)

            except:
                pass 
                # print(f'[WARN.D.A-0001] <{appid}> 현재 그 게임은 {platform}에서 제공 되지 않습니다.')

        return information


    ## 게임 관련 통계 만들어주는 함수
    def get_stats(library, platform = 'steam'):
        
        genres, developers = [], []
        num_games = 0

        for lib in library:
            appid, _, _ = lib
            
            try:
                datas = load_json(f'{DATA_PATH}/{appid}/{appid}.json')
                genres     += [data['description'] for data in datas['genres']]
                developers += [data  for  data  in datas['developers']]

                num_games += 1
            except Exception as e:
                pass
                # print(f'[WARN.D.A-0001] <{appid}> 현재 그 게임은 {platform}에서 제공 되지 않습니다. {e}')

        genres, developers = Counter(genres), Counter(developers)
        return genres, developers, num_games


    ## 게임에서 장르 가져와주는 함수
    def get_genre(appid, platform, datas = None):
        try:
            if datas == None: datas     = SteamAPI.get_info(appid)
            genre     = ', '.join([data['description'] for data in datas['genres']][:3])

    
        except Exception as e:
            genre     = f'{platform}에서 제공하지 않음.'
            # print(f'[WARN.D.A-0001] <{appid}> 현재 그 게임은 {platform}에서 제공 되지 않습니다. {e}')

        return genre


    ## 현재 스팀에서 가장 인기 있는 게임들
    def get_trendy(sales, platform, top_sellers, top_names, nums):
        
        num1, num2 = nums
        for game in sales['top_sellers']['items'][num1 : num2]:

            appid = game['id']
            name  = game['name']

            genre = SteamAPI.get_genre(appid, platform)

            info  = {   
                        'image'      : game['header_image'],
                        'name'       : name,
                        'genre'      : genre,
                        'discounted' : game['discounted'],
                        'steam_page' : f'{SteamAPI.URLS["steam_page"]}/{appid}'
                    }

            if name not in top_names:
                top_sellers.append(info)
                top_names.append(name)

            else:
                print(f'[WARN.D-0001] 중복된 데이터 입니다. {name}')

        return top_sellers, top_names

In [5]:
def preprop(string, dtype = 'discount'): 
    
    if dtype == 'discount': string = string.replace(' ', '').split('₩')[-1]
    string = string.replace(',', '').replace('₩', '')
    
    return int(string)
    

In [6]:
## DB에 있는 데이터를 조회하는 함수 고급 쿼리 부분은 좀 더 구현해야 한다.
def search_table(cursor, table_name, columns = '*', **kwargs):

    ## keyword argument 값에서 데이터가 없는 경우 기본값 지정

    ## 이부분도 너무 킹받게 짜졌다,, 안되는거 그냥 덕지덕지 수정했더니...
    sorting_col = kwargs['sorting_col'] if 'sorting_col' in kwargs.keys() else columns
    sorting_col = sorting_col if sorting_col != '*' else  \
                ('appid' if type(sorting_col) != list else sorting_col[0])

    conditions  = kwargs['conditions']  if 'conditions' in kwargs.keys() else None
    how_many    = kwargs['how_many']    if 'how_many' in kwargs.keys() else 1
    reverse     = kwargs['desc']        if 'desc' in kwargs.keys() else False


    col_indexes = {k : v for v, k in enumerate(['id', 'appid', 'percent', 'original', 
                                                'discounted', 'date'])}

    ## 코드가 너무 킹받게 짜졌다..
    col_indexes = col_indexes if columns == '*' else \
                ({k : v for v, k in enumerate(columns)} \
                if type(columns) == list else {columns : 0})

    col_keys = [col for col in col_indexes.keys()]
    try:
        assert sorting_col in col_keys, f'''\n[ERR.DB.Co-0001] 선택하신 조건에 맞는 컬럼이 존재하지 않아 선택 하신 옵션으로 정렬 할 수 없었습니다. \
                                            {col_keys}에서 골라 주십시오.'''

        columns = ', '.join(columns) if type(columns) == list else columns


        ## 데이터 조회할 때 그 어떤 조건도 없는 경우 그냥 테이블에서 컬럼만 받아 사용
        if conditions == None:
            query = f"""
                        SELECT DISTINCT {columns} FROM {table_name}
                    """

        else:
            ## 고급 쿼리에 사용할 거
            conditions = np.array(conditions)

            ## WHERE 문으로 찾을 column, 조건, 데이터를 받아 조회해준다.
            if len(conditions) == 3:
                col, cond, data = conditions

                symbols = ['>', '<', '!=', '=', '>=', '<=', 'IN']
                assert cond in symbols, f'\n[ERR.DB.Co-0002] 조건이 올바르지 않습니다. {symbols}에서 선택해 넣어주십시오.'


                query = f"""
                            SELECT DISTINCT {columns} FROM {table_name}
                            WHERE {col} {cond} {data};
                        """

            ## 데이터를 찾을 column과 찾을 data만 있는 경우
            ## 기본값으로 동일한 데이터만 찾도록 지정해주었다.
            elif len(conditions) == 2:
                col, data = conditions
                query = f"""
                            SELECT DISTINCT {columns} FROM {table_name}
                            WHERE {col}={data};
                        """

        cursor.execute(query)

        ## 데이터 정렬에 사용할 인덱스 값들.
        col_index = col_indexes[sorting_col]

    except Exception as e:
        print(f'[ERR.DB.Q-0001] 쿼리에 문제가 발생하였습니다. 확인 후 수정바랍니다. {e}')
        query     = f'SELECT * FROM {table_name}'
        col_index = 0

    ## how_many가 0을 포함한 음의 정수가 된다면 모든 데이터를 조회해준다.
    return sorted(cursor.fetchmany(how_many), 
                  key = lambda x: x[col_index], reverse = reverse) 

In [7]:
whole_page = ""

start_time = time.time()

for idx in range(1):
    url = f'https://store.steampowered.com/search/?specials=1&filter=topsellers&page={idx}'
    res = req.get(url)
    whole_page += res.text

time.time() - start_time

0.3834528923034668

In [8]:
start_time = time.time()
soup       = bs(whole_page, 'html.parser')
sales      = soup.select('div#search_resultsRows > a') 
time.time() - start_time

1.6462464332580566

In [9]:
sales[0].select('span.title')[0].text

'Tom Clancy’s The Division® 2'

In [13]:
dbpath = '../../../db/test.db'
conn   = sqlite3.connect(dbpath)
cursor = conn.cursor()

In [14]:
query = """
CREATE TABLE IF NOT EXISTS steamsale(
id INTEGER PRIMARY KEY AUTOINCREMENT,
appid INTEGER NOT NULL,
percent INTEGER NOT NULL,
original INTEGER NOT NULL,
discounted INTEGER NOT NULL,
date TEXT NOT NULL);"""

cursor.execute(query)

<sqlite3.Cursor at 0x7f6afabb5b90>

In [15]:
sale_list = []
for idx, sale in enumerate(sales):
    try:
        appid      = sale['data-ds-appid']

        percent    = sale.select('.search_discount > span')[0].text
        percent    = int(percent.replace('-', '').replace('%', ''))

        original   = preprop(sale.select('strike')[0].text)
        discounted = preprop(sale.select(".discounted")[0].text)
        
        name  = sale.select('span.title')[0].text
        print(name)
        
        today = f'{Y}{str(M).zfill(2)}{str(D).zfill(2)}'
        query = f"""
                INSERT INTO steamsale
                (appid, percent, original, discounted, date)
                VALUES({appid}, {percent}, {original}, {discounted}, {today});
                """

        cursor.execute(query)
              
    except Exception as e: print(e)

Tom Clancy’s The Division® 2
Sid Meier’s Civilization® VI
Cyberpunk 2077
Skul: The Hero Slayer
Forza Horizon 5
NBA 2K23
Human: Fall Flat
Project Hospital
Borderlands 3
Muse Dash
Muse Dash - Just as planned
Bright Memory: Infinite
Forza Horizon 4
A Dance of Fire and Ice
The Callisto Protocol™
Ruined King: A League of Legends Story™
Unrailed!
SANABI
Tiny Tina's Wonderlands
Green Hell
DEATH STRANDING DIRECTOR'S CUT
WW2 Rebuilder
Command & Conquer™ Remastered Collection
DYNASTY WARRIORS 9 Empires
Airborne Kingdom
Touhou Mystia's Izakaya
list index out of range
list index out of range
DYNASTY WARRIORS 8: Xtreme Legends Complete Edition
Warhammer 40,000: Chaos Gate - Daemonhunters
Sid Meier’s Civilization® VI: Rise and Fall
The Division 2 - Warlords of New York - Expansion
Sid Meier's Civilization® VI: Gathering Storm
Endzone - A World Apart
Biped
Ship of Fools
list index out of range
My Time at Sandrock
list index out of range
list index out of range
Chrono Ark
list index out of range
Proje

In [None]:
with conn:
    with open('../db/test_backup.sql', 'w') as f:
        for line in conn.iterdump():
            f.write('%s\n' % line)
        print('Completed')

In [None]:
db_datas = search_table(cursor, 'steamsale', columns = '*', 
            sorting_col = 'id', desc = False, how_many = 0, 
             conditions = ['date', today])

In [None]:
for idx, db_data in enumerate(db_datas[:100], 1):
    _, appid, percent, original, discounted, date = db_data

    platform = 'steam'
    json_data = SteamAPI.get_info(appid)
    genre = SteamAPI.get_genre(appid, platform, datas = json_data)
    
    try: print(idx, appid, json_data['name'], genre)
    except: pass