This Notebook uses code based on code written by Nik Davis. This code will gather data from the Steam Store API using python. Please check out his amazing blog post! https://nik-davis.github.io/posts/2019/steam-data-collection/

Their version was greatly out dated, so I also use a modified version of Nik's code that was written by Duerkos. https://github.com/Duerkos/steam_analysis/tree/main

That two was also a little outdated, so I added slight modifications to the code in order for it to run in more modern python.

This entire notebook is separated into three smaller notebooks for easier consumption.

In [1]:
# standard library imports
import csv
import datetime as dt
import json
import os
import statistics
import time

# third-party imports
import numpy as np
import pandas as pd
import requests
from requests.exceptions import SSLError

# customisations - ensure tables show all columns
pd.set_option("display.max_columns", 100)

In [2]:
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
def get_request(url,parameters=None, steamspy=False):
    """Return json-formatted response of a get request using optional parameters.
    
    Parameters
    ----------
    url : string
    parameters : {'parameter': 'value'}
        parameters to pass as part of get request
    
    Returns
    -------
    json_data
        json-formatted response (dict-like)
    """
    try:
        response = requests.get(url=url, params=parameters)
    except SSLError as s:
        print('SSL Error:', s)
        
        for i in range(5, 0, -1):
            print('\rWaiting... ({})'.format(i), end='')
            time.sleep(1)
        print('\rRetrying.' + ' '*10)
        
        # recursively try again
        return get_request(url, parameters, steamspy)
    
    if response:
        try:
            return response.json()
        except:
            False
    else:
        # We do not know how many pages steamspy has... and it seems to work well, so we will use no response to stop.
        if steamspy:
            return "stop"
        else :
            # response is none usually means too many requests. Wait and try again 
            print('No response, waiting 10 seconds...')
            time.sleep(10)
            print('Retrying.')
            return get_request(url, parameters, steamspy)

Define Download Data

In [4]:
def get_app_data(app_list, start, stop, parser, pause):
    """Return list of app data generated from parser.
    
    parser : function to handle request
    """
    app_data = []
    
    # iterate through each row of app_list, confined by start and stop
    for index, appid in app_list[start:stop].items():
        print('Current index: {}'.format(index), end='\r')

        # retrive app data for a row, handled by supplied parser, and append to list
        try:
            data = parser(appid)
            app_data.append(data)
        except:
            print("Error with "+str(appid))
        time.sleep(pause) # prevent overloading api with requests
    
    return app_data


def process_batches(parser, app_list, download_path, data_filename, index_filename,
                    columns, begin=0, end=-1, batchsize=100, pause=1):
    """Process app data in batches, writing directly to file.
    
    parser : custom function to format request
    app_list : dataframe of appid and name
    download_path : path to store data
    data_filename : filename to save app data
    index_filename : filename to store highest index written
    columns : column names for file
    
    Keyword arguments:
    
    begin : starting index (get from index_filename, default 0)
    end : index to finish (defaults to end of app_list)
    batchsize : number of apps to write in each batch (default 100)
    pause : time to wait after each api request (defualt 1)
    
    returns: none
    """
    print('Starting at index {}:\n'.format(begin))
    
    # by default, process all apps in app_list
    if end == -1:
        end = len(app_list) + 1
    
    # generate array of batch begin and end points
    batches = np.arange(begin, end, batchsize)
    batches = np.append(batches, end)
    
    apps_written = 0
    batch_times = []
    
    for i in range(len(batches) - 1):
        start_time = time.time()
        
        start = batches[i]
        stop = batches[i+1]
        
        app_data = get_app_data(app_list, start, stop, parser, pause)
        
        rel_path = os.path.join(download_path, data_filename)
        
        # writing app data to file
        with open(rel_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns, extrasaction='ignore')
            
            for j in range(3,0,-1):
                print("\rAbout to write data, don't stop script! ({})".format(j), end='')
                time.sleep(0.5)
            
            writer.writerows(app_data)
            print('\rExported lines {}-{} to {}.'.format(start, stop-1, data_filename), end=' ')
            
        apps_written += len(app_data)
        
        idx_path = os.path.join(download_path, index_filename)
        
        # writing last index to file
        with open(idx_path, 'w') as f:
            index = stop
            print(index, file=f)
            
        # logging time taken
        end_time = time.time()
        time_taken = end_time - start_time
        
        batch_times.append(time_taken)
        mean_time = statistics.mean(batch_times)
        
        est_remaining = (len(batches) - i - 2) * mean_time
        
        remaining_td = dt.timedelta(seconds=round(est_remaining))
        time_td = dt.timedelta(seconds=round(time_taken))
        mean_td = dt.timedelta(seconds=round(mean_time))
        
        print('Batch {} time: {} (avg: {}, remaining: {})'.format(i, time_td, mean_td, remaining_td))
            
    print('\nProcessing batches complete. {} apps written'.format(apps_written))

In [5]:
def reset_index(download_path, index_filename):
    """Reset index in file to 0."""
    rel_path = os.path.join(download_path, index_filename)
    
    f= open(rel_path, 'w')
    f.write("0")
        

def get_index(download_path, index_filename):
    """Retrieve index from file, returning 0 if file not found."""
    try:
        rel_path = os.path.join(download_path, index_filename)

        with open(rel_path, 'r') as f:
            index = int(f.readline())
            #This just reads the initial line
    
    except FileNotFoundError:
        index = 0
        
    return index


def prepare_data_file(download_path, filename, index, columns):
    """Create file and write headers if index is 0."""
    if index == 0:
        rel_path = os.path.join(download_path, filename)

        with open(rel_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=columns)
            writer.writeheader()

Download Steam Data

In [6]:
def getAppListBatch(url, parameters):
    json_data = get_request(url, parameters=parameters)
    steam_id = pd.DataFrame.from_dict(json_data["response"]["apps"])
    try:
        more_results = json_data["response"]["have_more_results"]
        last_appid =  json_data["response"]["last_appid"]
    except:
        more_results = False
        last_appid = False
    return more_results, steam_id, last_appid

def get_update_ids_old(updatedlist, oldlist):
    updatedlist['key1'] = 1
    oldlist['key2'] = 1
    updatedlist = pd.merge(updatedlist, oldlist, right_on=['steam_appid','name'],left_on=['appid','name'], how = 'outer')
    updatedlist = updatedlist[~(updatedlist.key2 == updatedlist.key1)]
    updatedlist = updatedlist.drop(['key1','key2','steam_appid'], axis=1)
    return updatedlist

def get_update_ids(idList, oldFullList):
    #We are going to forget about names and only care about IDs.
    idList = idList["appid"]
    oldFullList = oldFullList["steam_appid"]
    oldFullList.columns = ["appid"]
    updatedList = pd.concat([idList, oldFullList])
    updatedList = updatedList.drop_duplicates(keep=False)
    updatedList = updatedList.reset_index(drop=True)
    return updatedList

In [7]:
def getAppList():
    with open('../data/steam_key.txt') as f:
        key = f.read()

    url = "https://api.steampowered.com/IStoreService/GetAppList/v1/?"
    parameters = {"key": key}
    more_results = True
    begin = True
    # from the request we get the more_results flag and also the last_appid, so we use them for the next requests.
    while (more_results):
        more_results, steam_ids, last_appid = getAppListBatch(url, parameters)
        parameters["last_appid"] = last_appid
        if (begin):
            steam_allids = steam_ids
            begin = False
        else:
            steam_allids = pd.concat([steam_allids, steam_ids])
    return steam_allids
# request 'all' from steam spy and parse into dataframe

In [8]:
def parse_steam_request(appid):
    """Unique parser to handle data from Steam Store API.
    
    Returns : json formatted data (dict-like)
    """
    with open('../data/steam_key.txt') as f:
        key = f.read()
        
    url = "http://store.steampowered.com/api/appdetails/"
    parameters = {"appids": appid, "key": key}
    
    json_data = get_request(url, parameters=parameters)
    json_app_data = json_data[str(appid)]
    
    if json_app_data['success']:
        data = json_app_data['data']
    else:
        data = {'steam_appid': appid}
        
    return data


# Set file parameters
download_path = '../data/download/'
steam_app_data = 'steam_app_data.csv'
steam_app_data_delta = 'steam_app_data_delta.csv'
steam_index = 'steam_index.txt'

steam_columns = [
    'type', 'name', 'steam_appid', 'required_age', 'is_free', 'controller_support',
    'dlc', 'detailed_description', 'about_the_game', 'short_description', 'fullgame',
    'supported_languages', 'header_image', 'website', 'pc_requirements', 'mac_requirements',
    'linux_requirements', 'legal_notice', 'drm_notice', 'ext_user_account_notice',
    'developers', 'publishers', 'demos', 'price_overview', 'packages', 'package_groups',
    'platforms', 'metacritic', 'reviews', 'categories', 'genres', 'screenshots',
    'movies', 'recommendations', 'achievements', 'release_date', 'support_info',
    'background', 'content_descriptors'
]

# Overwrites last index for demonstration (would usually store highest index so can continue across sessions)
if (os.path.isfile(download_path+steam_app_data_delta) == False):
    reset_index(download_path, steam_index)

# Retrieve last index downloaded from file
index = get_index(download_path, steam_index)

# Wipe or create data file and write headers if no previous  data
if (os.path.isfile(download_path+steam_app_data) == False):
    prepare_data_file(download_path, steam_app_data, index, steam_columns)
    
# Wipe or create data file delta and write headers if index is 0
if (os.path.isfile(download_path+steam_app_data_delta) == False):
    prepare_data_file(download_path, steam_app_data_delta, index, steam_columns)
    
    
# Here we get the list of appids from steam
full_steam_ids = getAppList()

# Here we get the real list of ids not yet in our dataframe. If this is the first time we are downloading the data, we can skip
# This step and instead use the full app_list.
try:
    oldlist = pd.read_csv('../data/download/steam_app_data.csv', usecols = ['name','steam_appid'])
    steam_ids = get_update_ids(full_steam_ids, oldlist)
except FileNotFoundError:
    print("Pre-existing file not found. First time downloading full app data from steam. This will take a while.\n")
    steam_ids = full_steam_ids

In [9]:
print("New IDs detected: "+str(len(steam_ids)))

New IDs detected: 1143


In [None]:
# I separated the long process to be able to debug it better.
# Set end and chunksize for demonstration - remove to run through entire app list
# Here by default we passed "app_list" that contained all the information and saved it, now we will modify it a bit
# And add pre-processing and post-processing
print("Adding "+str(len(steam_ids))+" new ids.\n")
process_batches(
    parser=parse_steam_request,
    app_list=steam_ids,
    download_path=download_path,
    data_filename=steam_app_data_delta,
    index_filename=steam_index,
    columns=steam_columns,
    begin=index,
    #end=10,
    #batchsize=5
)

try:
    oldlist = pd.read_csv('../data/download/steam_app_data.csv')
    # We change the old file to backup, so remove any backup named this way before...
    os.rename('../data/download/steam_app_data.csv', '../data/download/steam_app_data_backup.csv')
    newlist = pd.read_csv('../data/download/steam_app_data_delta.csv')
    oldlist = oldlist.append(newlist, ignore_index=True)
    oldlist.to_csv('../data/download/steam_app_data.csv', index=False)
except FileNotFoundError:
    os.rename('../data/download/steam_app_data_delta.csv', '../data/download/steam_app_data.csv')

In [None]:
os.rename('../data/download/steam_app_data_delta.csv', '../data/download/steam_app_data.csv')

Checking Data for Cleaning

In [10]:
steam_app_data = pd.read_csv('../data/download/steam_app_data.csv')

  steam_app_data = pd.read_csv('../data/download/steam_app_data.csv')


In [None]:
steam_app_data.head()

In [None]:
steam_app_data[steam_app_data.duplicated(subset="steam_appid")]

In [None]:
steam_app_data[steam_app_data["steam_appid"] == 34330]

In [None]:
len(steam_app_data)-len(full_steam_ids)

In [None]:
diff_ids = get_update_ids(full_steam_ids, steam_app_data)

In [None]:
len(diff_ids)

There are 135 new items added to steam since initially running this on 2/6/2024. Ran the second time on 2/8/2024.

Before getting rid of dups I will only focus on the apps with names.

In [None]:
steam_app_data = steam_app_data[~steam_app_data["name"].isna()]
steam_app_data = steam_app_data.drop_duplicates(subset="steam_appid", keep="last")
steam_app_data.to_csv("../data/download/steam_app_data.csv", index=False)

In [None]:
# inspect downloaded data
steam_app_data.info()

In [None]:
steam_app_data[steam_app_data.duplicated(subset="steam_appid")]

In [10]:
full_steam_ids.duplicated(subset="appid").sum()

0

In [11]:
full_steam_ids

Unnamed: 0,appid,name,last_modified,price_change_number
0,10,Counter-Strike,1666823513,21760363
1,20,Team Fortress Classic,1579634708,21760363
2,30,Day of Defeat,1512413490,21319050
3,40,Deathmatch Classic,1568752159,21760363
4,50,Half-Life: Opposing Force,1579628243,21760363
...,...,...,...,...
9218,2845120,Exoracer,1708450746,0
9219,2845400,Hidden Cats In Breeze Village,1708397843,0
9220,2845790,Life as a Lich,1708450623,0
9221,2846340,Color Splash: Predators,1708467377,0


Steam Spy API

In [14]:
def parse_steamspy_request(appid):
    """Parser to handle SteamSpy API data."""
    url = "https://steamspy.com/api.php"
    parameters = {"request": "appdetails", "appid": appid}
    
    json_data = get_request(url, parameters)
    return json_data


# set files and columns
download_path = '../data/download'
steamspy_data = 'steamspy_data.csv'
steamspy_index = 'steamspy_index.txt'

steamspy_columns = [
    'appid', 'name', 'developer', 'publisher', 'score_rank', 'positive',
    'negative', 'userscore', 'owners', 'average_forever', 'average_2weeks',
    'median_forever', 'median_2weeks', 'price', 'initialprice', 'discount',
    'languages', 'genre', 'ccu', 'tags'
]

reset_index(download_path, steamspy_index)
index = get_index(download_path, steamspy_index)

# Wipe data file if index is 0
prepare_data_file(download_path, steamspy_data, index, steamspy_columns)

process_batches(
    parser=parse_steamspy_request,
    app_list=full_steam_ids["appid"],
    download_path=download_path, 
    data_filename=steamspy_data,
    index_filename=steamspy_index,
    columns=steamspy_columns,
    begin=index,
    end=len(full_steam_ids),
    batchsize=300,
    pause=0.1
)

Starting at index 0:

Exported lines 0-299 to steamspy_data.csv. Batch 0 time: 0:02:32 (avg: 0:02:32, remaining: 13:55:07)
Exported lines 300-599 to steamspy_data.csv. Batch 1 time: 0:02:34 (avg: 0:02:33, remaining: 13:57:16)
Exported lines 600-899 to steamspy_data.csv. Batch 2 time: 0:02:31 (avg: 0:02:32, remaining: 13:51:14)
Exported lines 900-1199 to steamspy_data.csv. Batch 3 time: 0:02:32 (avg: 0:02:32, remaining: 13:49:11)
Exported lines 1200-1499 to steamspy_data.csv. Batch 4 time: 0:02:31 (avg: 0:02:32, remaining: 13:45:45)
Exported lines 1500-1799 to steamspy_data.csv. Batch 5 time: 0:02:31 (avg: 0:02:32, remaining: 13:42:04)
Exported lines 1800-2099 to steamspy_data.csv. Batch 6 time: 0:02:31 (avg: 0:02:32, remaining: 13:38:43)
Exported lines 2100-2399 to steamspy_data.csv. Batch 7 time: 0:02:31 (avg: 0:02:32, remaining: 13:35:45)
Exported lines 2400-2699 to steamspy_data.csv. Batch 8 time: 0:02:32 (avg: 0:02:32, remaining: 13:33:29)
Exported lines 2700-2999 to steamspy_data.

AttributeError: 'NoneType' object has no attribute 'get'

In [None]:
steam_spy_data = pd.read_csv('../data/download/steamspy_data.csv')

In [None]:
steam_spy_data.info()

In [None]:
steam_spy_data.duplicated(subset="appid").sum()

Steam Reviews

In [None]:
def parse_steamreviews_request(appid):
    """Parser to handle SteamSpy API data."""
    url = "https://store.steampowered.com/appreviews/" + str(appid)
    parameters = {"json": 1, "num_per_page": "0", "language": "all", "purchase_type": "all"}
    json_data = get_request(url, parameters)
    json_data = json_data['query_summary']
    json_data["appid"]=appid
    return json_data


# set files and columns
download_path = '../data/download'
steamreviews_data = 'steamreviews_data.csv'
steamreviews_index = 'steamreviews_index.txt'

steamreviews_columns = [
    'appid', 'review_score', 'review_score_desc', 'total_positive', 'total_negative', 'total_reviews'
]

reset_index(download_path, steamreviews_index)
index = get_index(download_path, steamreviews_index)

# Wipe data file if index is 0
prepare_data_file(download_path, steamreviews_data, index, steamreviews_columns)

full_steam_ids=pd.read_csv("../data/download/steam_app_data.csv")

process_batches(
    parser=parse_steamreviews_request,
    app_list=full_steam_ids["steam_appid"],
    download_path=download_path, 
    data_filename=steamreviews_data,
    index_filename=steamreviews_index,
    columns=steamreviews_columns,
    begin=index,
    end=len(full_steam_ids),
    batchsize=300,
    pause=0
)

In [None]:
steamreviews=pd.read_csv("../data/download/steamreviews_data.csv", index_col="appid")

In [None]:
steamreviews.info()

In [None]:
steamreviews["total_reviews"].value_counts()

In [None]:
steamreviews["review_score_desc"].value_counts()

In [None]:
steamreviews["review_score"].value_counts()