In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import bs4
import re
import time
import logging
import json
import requests
import concurrent.futures

from undetected_chromedriver import Chrome, ChromeOptions
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By

# Preconfig

In [3]:
# Configure the ETL logger so that re-running this cell leaves exactly one file handler attached.
logger = logging.getLogger("etl_logger")
# FileHandler opens the file right away, so the Logs/ directory must already exist.
handler1 = logging.FileHandler('Logs/logger1.log')
formatter1 = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler1.setFormatter(formatter1)
# Detach AND close handlers left over from a previous run of this cell; clearing the list
# without closing them would leak one open file handle per re-run.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()
logger.addHandler(handler1)
logger.setLevel(logging.DEBUG)

# Installing Packages

In [39]:
# Installs the browser-automation packages that the import cell relies on (selenium, undetected-chromedriver).
!pip install selenium undetected-chromedriver

Defaulting to user installation because normal site-packages is not writeable


# USD Exchange rates and Games id retrieval

## Extraction

Note: wait times are added so that the next steps do not run before the webpage has finished loading, and to avoid DDoS-like behaviour.

In [9]:
def extract_list_videogames(browser, list_number=1):

    """Loads the SteamDB charts page and returns its parsed HTML
        Parameters:
        browser: driver selenium object; it is always quit before this function returns
        list_number: currently unused, kept so existing calls keep working

        Returns:
        A soup object that belongs to the beautifulsoup4 library, or None when the page
        could not be loaded or parsed (the error is written to the log) """

    try:
        browser.get('https://steamdb.info/charts/')
        time.sleep(10) # wait for the page to load before reading its source
        html = browser.page_source
        return bs4.BeautifulSoup(html, "html.parser")
    except Exception as e:
        # the exception is passed as a logging argument, so the message needs a placeholder;
        # without it the logging module fails to format the record and the message is lost
        logger.debug("Hubo un problema: %s", e)
        return None
    finally:
        # release the driver on both the success and the failure path
        browser.quit()


In [10]:
def get_game_prices(game_url,browser,keep_browser):

    """Opens a game's steamdb page, selects the US currency and returns the parsed page
        Parameters:
        game_url: a string with the game's path on the steamdb site (it is appended to https://steamdb.info)
        browser: driver selenium object
        keep_browser: boolean; when True the page is opened in a new tab and the driver switches to it,
                      otherwise the current tab navigates to the page

        Returns:
        A soup object that belongs to the beautifulsoup4 library, built from the page source
        after the currency has been selected"""

    if not keep_browser:
        browser.get('https://steamdb.info'+game_url)
    else:
        browser.execute_script(f"window.open('https://steamdb.info{game_url}', '_blank');")
        newest_tab = browser.window_handles[-1]
        browser.switch_to.window(newest_tab)

    # random pause of 30 to 90 seconds before interacting with the page
    pause_seconds = 30+np.random.random()*60
    time.sleep(pause_seconds)

    browser.add_cookie({"name":"__Host-cc", "value":"us"})
    browser.refresh()

    # click the currency selector first and the "us" entry second, pausing 5 seconds after each click
    click_sequence = (
        (By.ID,'js-currency-selector'),
        (By.CSS_SELECTOR,'button[data-cc="us"]'),
    )
    for locator in click_sequence:
        browser.find_element(*locator).click()
        time.sleep(5)

    return bs4.BeautifulSoup(browser.page_source, "html.parser")

## Transform

In [11]:
def process_country_prices(soup):

    """extracts the raw value and country tags and processes them into clean values

        Parameters:
        soup: soup object that belongs to the beautifulsoup4 library; it must contain a div
              with class "table-responsive"

        Returns:
        A list of tuples with the country iso code, currency name, game's price in local currency
        and game's price in USD. Prices that cannot be parsed are returned as NaN.

        Raises:
        ValueError: when the page has no div with class "table-responsive" """

    # look the table up once instead of once per query
    price_table = soup.find("div",class_="table-responsive")
    if price_table is None:
        raise ValueError('no div with class "table-responsive" was found in the page')

    country_tags_raw = price_table.find_all("td",attrs={"data-cc":True})
    countries_row_data_raw = price_table.find_all("tr")
    price_tags_raw = price_table.find_all("td",class_="table-prices-converted")

    usd_prices_processed = []
    for tag in price_tags_raw:
        if "%" in tag.text:
            # cells containing a percent sign are skipped
            # (presumably the price-difference column - confirm against the page)
            continue
        # any text without digits ("N/A", with or without surrounding whitespace) becomes NaN
        # instead of raising IndexError, so the list keeps one entry per price cell
        price_match = re.findall(r"[0-9]+\.?[0-9]*",tag.text)
        usd_prices_processed.append(float(price_match[0]) if price_match else np.nan)

    countries_local_price_processed = []
    for row in countries_row_data_raw:
        cells = row.find_all("td")
        if not cells:
            continue # rows without td cells carry no price
        # the local price is read from the second cell; a comma is treated as the decimal separator
        price_match = re.findall(r"[0-9]+[,\.]?[0-9]*",cells[1].text)
        if price_match:
            countries_local_price_processed.append(float(price_match[0].replace(",",".")))
        else:
            countries_local_price_processed.append(np.nan)

    # strip() removes any mix of surrounding spaces and newlines
    iso_country_code_processed = [tag["data-cc"].strip() for tag in country_tags_raw]
    countries_name_processed = [tag.text.strip() for tag in country_tags_raw]
    countries_prices_processed = list(zip(iso_country_code_processed, countries_name_processed, countries_local_price_processed, usd_prices_processed))
    return countries_prices_processed

Let's obtain the games list for reference

In [12]:
options = ChromeOptions()
options.add_argument("--disable-popup-blocking") # disables popup blocking for being able to open a new tab
browser = Chrome(options=options)
videogame_list_soup = extract_list_videogames(browser)
if videogame_list_soup is None:
    # the extraction returns None when the page could not be loaded
    raise RuntimeError("the games list page could not be retrieved")
videogame_urls_tags = videogame_list_soup.find_all("a", href=lambda href: href and href.startswith("/app/"))
# Take the numeric id out of hrefs such as /app/<id>/charts/ with a pattern.
# str.rstrip/str.lstrip remove *sets of characters*, not substrings, so they only worked by accident.
app_id_pattern = re.compile(r"/app/(\d+)")
app_id_matches = (app_id_pattern.match(tag["href"].strip()) for tag in videogame_urls_tags)
videogame_urls_processed = list({match.group(1) for match in app_id_matches if match}) # unique game ids as strings

Now, we use the game ``/app/221100`` (DayZ) to obtain the different prices for all currencies

In [13]:
games_dict = {}
options = ChromeOptions()
options.add_argument("--disable-popup-blocking") # disables popup blocking for being able to open a new tab
browser = Chrome(use_subprocess=True,options=options) # use_subprocess is necessary for bypassing anti-scraping measures
try:
    for i,url in enumerate(["/app/221100"]):
        # reset on every iteration so a failed request never reuses the previous page (or hits a NameError)
        videogame_website_soup = None
        try:
            # the first game uses the current tab; subsequent games open a new tab
            # so as not to be asked for Cloudflare human validation
            videogame_website_soup = get_game_prices(url,browser,keep_browser=(i != 0))
        except NoSuchElementException as e: # raised for a free videogame (no currency selector); restart the browser
            # use the configured ETL logger; the root `logging` module is not set up to record DEBUG messages
            logger.debug(e)
            logger.debug("restarting webbrowser...")

            browser.quit()
            options = ChromeOptions()
            options.add_argument("--disable-popup-blocking")
            browser = Chrome(use_subprocess=True,options=options)
        if videogame_website_soup is not None: # if the videogame is not free (there's an available price), process it
            price_data = process_country_prices(videogame_website_soup)
finally:
    # quit() ends the whole driver session; close() would only close the current window
    browser.quit()
        

## Save the data (checkpoint)

In [14]:
# save the data to prevent having to look for it again
with open("../Data/usd_rates_reference.txt","w") as f:
    json.dump(price_data,f)
    
with open("../Data/videogames_id_reference.txt","w") as f:
    json.dump(videogame_urls_processed,f)

## Load the data (checkpoint)

We can use the saved data to prevent sending requests to the Internet every time we want to process the same games

In [4]:
# Read back the two reference files; both names start as None until their file has been read.
price_data, videogame_urls_processed = None, None

with open("../Data/usd_rates_reference.txt","r") as rates_file:
    price_data = json.loads(rates_file.read())

with open("../Data/videogames_id_reference.txt","r") as ids_file:
    videogame_urls_processed = json.loads(ids_file.read())

(price_data, videogame_urls_processed)

([['us', 'U.S. Dollar', 44.99, 44.99],
  ['ar', 'Argentine Peso', 2499.99, 11.71],
  ['tr', 'Turkish Lira', 259.99, 13.42],
  ['ru', 'Russian Ruble', 1199.0, 14.7],
  ['br', 'Brazilian Real', 119.99, 24.43],
  ['ua', 'Ukrainian Hryvnia', 999.0, 27.02],
  ['no', 'Norwegian Krone', 299.0, 28.78],
  ['cn', 'Chinese Yuan', 199.0, 28.96],
  ['co', 'Colombian Peso', 129900.0, 29.57],
  ['za', 'South African Rand', 599.0, 33.09],
  ['kz', 'Kazakhstani Tenge', 14999.0, 33.21],
  ['mx', 'Mexican Peso', 599.0, 33.25],
  ['in', 'Indian Rupee', 2799.0, 34.19],
  ['uy', 'Uruguayan Peso', 1349.0, 34.79],
  ['cl', 'Chilean Peso', 28000.0, 35.11],
  ['ph', 'Philippine Peso', 1999.0, 36.12],
  ['my', 'Malaysian Ringgit', 159.99, 36.34],
  ['pe', 'Peruvian Sol', 139.95, 37.09],
  ['id', 'Indonesian Rupiah', 550000.0, 37.2],
  ['nz', 'New Zealand Dollar', 59.99, 37.27],
  ['sg', 'Singapore Dollar', 49.99, 37.58],
  ['hk', 'Hong Kong Dollar', 299.99, 38.21],
  ['kr', 'South Korean Won', 49900.0, 38.25],
 

Let's obtain the USD exchange rate that Steam uses for each currency, so that we can estimate prices in USD

In [5]:
# get the usd exchange rates for each currency
usd_rates = {}
for country_data in price_data:
    country_iso_code = country_data[0] 
    local_price = country_data[-2]
    usd_price = country_data[-1]
    usd_ex_rate = local_price/usd_price
    usd_rates[country_iso_code] = round(usd_ex_rate,2)

# Game data retrieval

Now that we've got the games' ids and the usd exchange rate for all currencies, we can proceed to retrieve the prices for all games in USD

## Checkpoint Load

In case we interrupt the game's data extraction process, we can pick it up where we left off using this

In [32]:
checkpoint_data = {}
try:
    with open("../Data/game_processing_checkpoint_data.txt","r") as f:
        checkpoint_data = json.load(f)
except FileNotFoundError:
    logger.debug("checkpoint file hasn't been created...")
except json.JSONDecodeError as e:
    # an interrupted write leaves an empty or truncated file; start over instead of crashing
    logger.debug(f"checkpoint file could not be parsed, starting from the beginning... {e}")

if not isinstance(checkpoint_data, dict):
    # anything other than a JSON object cannot hold the checkpoint keys
    checkpoint_data = {}

# missing (or null) keys fall back to 0, which means "start from the first game / first batch"
checkpoint_game_id = checkpoint_data.get("game_id") or 0
checkpoint_index = checkpoint_data.get("index") or 0

Now, let's define the functions that will help us retrieve the game's data

In [30]:
def send_request(params, timeout=30):

    """ sends a request to steam store API with some params and sends back the retrieved data
        Parameters:
        params => dict-like object; it must contain the "appids" key used to read the response
        timeout => seconds to wait for the server before giving up (passed to requests.get)
        Returns: dict-like object containing game's data, or an empty dict when the request fails,
                 the response cannot be parsed or the API does not report success"""

    try:
        # the request itself is inside the try block so connection errors and timeouts are
        # logged and turned into an empty result instead of propagating to the caller
        req = requests.get("https://store.steampowered.com/api/appdetails/", params=params, timeout=timeout)
        json_data = json.loads(req.text)
        if json_data[params["appids"]]["success"]: # not met when the API reports a failure (e.g. regional restrictions)
            return json_data[params["appids"]]["data"]
    except Exception as e: # in case the get request fails or the response has an unexpected shape
        logger.debug(f"request to {params.get('appids')} failed... {e}")
    return {}
        

In [31]:
def get_country_prices(params):

    """ processes the data from the steam store API to obtain a game's price per country
        Parameters:
        params => dict-like object with the "appids" and "cc" (country code) keys
        Returns:
        A tuple containing the country code as first item, and the game's usd price for that country
        in the second position (NaN when the price is missing or cannot be parsed)"""

    json_data = send_request(params=params)
    try:
        if json_data:
            price_formatted = re.findall(r"[0-9]+[,\. ]?[0-9]*",json_data.get("price_overview").get("final_formatted"))
            if price_formatted:
                raw_price = price_formatted[0]
                usd_rate = usd_rates[params["cc"]]
                # first attempt: treat "." as a thousands separator and "," as the decimal separator
                usd_price = float(raw_price.replace(" ","").replace(".","").replace(",",".")) / usd_rate
                if usd_price > 100 or usd_price < 0.1:
                    # implausible result: retry treating "." as the decimal separator and "," as thousands
                    usd_price = float(raw_price.replace(" ","").replace(",","")) / usd_rate
                return (params["cc"],usd_price)
    except Exception as e:
        # include the error itself, otherwise the cause of the failure cannot be diagnosed from the log
        logger.debug(f"execution for {params['cc']} and {params['appids']} failed. Sending NaN... {e}")
    return (params["cc"],np.nan) # if the price couldn't be parsed, then send NaN

In [34]:
game_data = {}
start = 0
country_iso_codes = list(usd_rates.keys())
end = len(country_iso_codes) # exclusive bound for range(); using len - 1 could leave the last countries out
batch = 10 # number of countries requested concurrently
wait_time = 10 # seconds to pause between batches
videogame_urls_processed = sorted([int(game_id) for game_id in videogame_urls_processed]) # for reproducibility purposes when loading the checkpoint

if checkpoint_game_id:
    # Resuming: reload the results saved by the interrupted run, otherwise they would be
    # overwritten by the first checkpoint save. JSON keys are strings, so turn them back into ints.
    try:
        with open("../Data/game_processing_game_data.txt","r") as f:
            game_data = {int(saved_id): saved_prices for saved_id, saved_prices in json.load(f).items()}
    except (FileNotFoundError, ValueError, AttributeError) as e:
        logger.debug(f"previous game data could not be restored... {e}")
        game_data = {}

for game_id in videogame_urls_processed:
    if int(game_id) < int(checkpoint_game_id):
        continue # if game_id has already been processed, then ignore
    is_checkpoint_game = int(game_id) == int(checkpoint_game_id)
    params = {
                    "appids":f"{game_id}",
                    "cc":"us"
            }
    json_data = send_request(params)
    if json_data.get("is_free"):
        continue # free games have no regional prices to retrieve

    # keep what was already retrieved for the game the previous run stopped at
    previous_results = game_data.get(game_id, []) if is_checkpoint_game else []
    if isinstance(previous_results, dict):
        previous_results = previous_results.items()
    game_data[game_id] = [tuple(pair) for pair in previous_results]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        for i in range(start, end, batch):
            if is_checkpoint_game and i < checkpoint_index:
                continue # this batch was already completed before the interruption

            # a fresh list per batch, so earlier batches are not collected (and stored) again
            futures = []
            # slicing stops at the end of the list, so a short final batch cannot raise IndexError
            for country_code in country_iso_codes[i:i+batch]:
                params = {
                    "appids":f"{game_id}",
                    "cc":country_code
                }
                logger.debug(f"Country {country_code} for game {game_id} and batch {i} started processing...")
                futures.append(executor.submit(get_country_prices,params=params))

            for future in concurrent.futures.as_completed(futures):
                game_data[game_id].append(future.result())

            # checkpoint save, once per completed batch; serialize before opening the file
            # because open(..., "w") truncates it immediately
            game_data_json = json.dumps(game_data)
            with open("../Data/game_processing_game_data.txt","w") as f:
                f.write(game_data_json)

            with open("../Data/game_processing_checkpoint_data.txt","w") as f:
                json.dump({
                    "index":i+batch, # first batch that still has to be processed for this game
                    "game_id":game_id
                },f)

            time.sleep(wait_time)
    logger.debug(f"game {game_id} has been processed...")
    game_data[game_id] = dict(game_data[game_id]) # country code -> usd price (duplicates collapse here)

# Load

In [47]:
# Build the table straight from the dict: one row per game id, one column per country code.
# (Passing a literal JSON string to pd.read_json is deprecated in recent pandas versions.)
steam_prices_df = pd.DataFrame.from_dict(game_data, orient="index")
steam_prices_df.to_excel("../Data/list_1_processed.xlsx")