In [6]:
!pip install python-steam-api

Collecting python-steam-api
  Downloading python_steam_api-1.2.2-py3-none-any.whl.metadata (18 kB)
Downloading python_steam_api-1.2.2-py3-none-any.whl (11 kB)
Installing collected packages: python-steam-api
Successfully installed python-steam-api-1.2.2


In [5]:
from steam import Steam
from decouple import config
import requests
import json
from pathlib import Path
from pymongo import MongoClient
import pandas as pd
from IPython.display import display, HTML
import matplotlib.pyplot as plt
import numpy as np
import time
from tqdm import tqdm
from multiprocessing import Process
import multiprocessing 
from pymongo import UpdateOne
from pymongo import InsertOne
from concurrent.futures import ThreadPoolExecutor, as_completed
from IPython.display import clear_output

ImportError: cannot import name 'Steam' from 'steam' (C:\Users\senti\anaconda3\Lib\site-packages\steam\__init__.py)

## API calls list

List of all games (appid and name): http://api.steampowered.com/ISteamApps/GetAppList/v0002/?key={key}&format=json

Get concurrent players: http://api.steampowered.com/ISteamUserStats/GetNumberOfCurrentPlayers/v1/?appid={app_id}

Get review stats: https://store.steampowered.com/appreviews/{app_id}?json=1&language=all&l=english

Get game details: https://store.steampowered.com/api/appdetails?appids={app_id}

Get concurrent players (with API key): https://api.steampowered.com/ISteamUserStats/GetNumberOfCurrentPlayers/v1/?appid={app_id}&key={key}


In [9]:
# Open the local MongoDB instance, select the project's Games collection and
# snapshot every document of it into a DataFrame for the cells below.
client = MongoClient('mongodb://localhost:27017/')
db = client['DM_Project']
collection = db['Games']
games_documents = list(collection.find())
df = pd.DataFrame(games_documents)

In [10]:
file_key_path = 'key.txt'
file_gamelist_path = 'Data/games_appid_name.json'
session = requests.Session()  # shared HTTP session used by every get_* helper
key = ''

# Read the Steam Web API key. Surrounding whitespace is stripped so that a
# trailing newline in key.txt does not end up inside the request URLs that
# interpolate the key.
with open(file_key_path, 'r') as file:
    key = file.read().strip()

# NOTE(review): the recorded run of the import cell failed with ImportError on
# `from steam import Steam`; confirm which module the installed
# python-steam-api exposes `Steam` from before relying on this client.
steam = Steam(key)



In [11]:


def get_game_list(file_path):
    """Return the Steam app list, preferring a cached JSON file over the API.

    If ``file_path`` exists, its JSON content is loaded; otherwise the
    ISteamApps/GetAppList endpoint is queried through the module-level
    ``session`` using the module-level ``key``.

    In both cases a payload wrapped as ``{"applist": {"apps": [...]}}`` is
    unwrapped to the inner list, so callers that iterate over game dicts
    (``game.get('appid')``) get the same shape from either source. Any other
    payload is returned unchanged.

    Args:
        file_path: Path (str or Path) of the cached app-list JSON file.

    Returns:
        The list of app entries, or ``{}`` when the file is absent and the
        API answers with a non-200 status.
    """
    def _unwrap(payload):
        # Only unwrap the envelope when it is actually present; a cached file
        # that already holds the bare list passes through untouched.
        if isinstance(payload, dict) and 'applist' in payload:
            return (payload['applist'] or {}).get('apps', [])
        return payload

    file_path = Path(file_path)
    if file_path.exists():
        print(f"File found {file_path}")
        with open(file_path, 'r') as json_file:
            return _unwrap(json.load(json_file))

    print(f"File not found {file_path}")
    url = f"http://api.steampowered.com/ISteamApps/GetAppList/v0002/?key={key}&format=json"
    response = session.get(url)
    if response.status_code == 200:
        return _unwrap(response.json())
    return {}
        
def get_CCU(app_id, key_id, timeout=30):
    """Query GetNumberOfCurrentPlayers for one app.

    Args:
        app_id: Steam application id interpolated into the request URL.
        key_id: Steam Web API key sent as the ``key`` query parameter.
        timeout: Seconds passed to ``session.get``; without it the request
            can block indefinitely and the Timeout handler below never fires.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``{}`` (non-200 status,
        timeout, invalid JSON or any other error; errors are printed).
    """
    try:
        # Use the key that was passed in rather than the module-level one.
        url = f"https://api.steampowered.com/ISteamUserStats/GetNumberOfCurrentPlayers/v1/?appid={app_id}&key={key_id}"
        response = session.get(url, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {}
    except requests.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return {}
    except requests.exceptions.Timeout:
        print("The request timed out")
        return {}
    except ValueError:
        print("Response content is not valid JSON")
        return {}
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}

    
def get_game_details(app_id, timeout=30):
    """Fetch the store ``appdetails`` payload for one app.

    Args:
        app_id: Steam application id interpolated into the request URL.
        timeout: Seconds passed to ``session.get``; without it the request
            can block indefinitely and the Timeout handler below never fires.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``{}`` (non-200 status,
        timeout, invalid JSON or any other error; errors are printed).
    """
    try:
        url = f"https://store.steampowered.com/api/appdetails?appids={app_id}&l=english"
        response = session.get(url, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {}
    except requests.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return {}
    except requests.exceptions.Timeout:
        print("The request timed out")
        return {}
    except ValueError:
        print("Response content is not valid JSON")
        return {}
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}

def get_game_reviews(app_id, timeout=30):
    """Fetch the store ``appreviews`` payload for one app.

    Args:
        app_id: Steam application id interpolated into the request URL.
        timeout: Seconds passed to ``session.get``; without it the request
            can block indefinitely and the Timeout handler below never fires.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``{}`` (non-200 status,
        timeout, invalid JSON or any other error; errors are printed).
    """
    try:
        url = f"https://store.steampowered.com/appreviews/{app_id}?json=1&language=all&l=english"
        response = session.get(url, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        return {}
    except requests.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return {}
    except requests.exceptions.Timeout:
        print("The request timed out")
        return {}
    except ValueError:
        print("Response content is not valid JSON")
        return {}
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}

def get_game_tags(app_id, timeout=30):
    """Fetch the genres and categories of one app from ``appdetails``.

    Args:
        app_id: Steam application id interpolated into the request URL.
        timeout: Seconds passed to ``session.get``; without it the request
            can block indefinitely and the Timeout handler below never fires.

    Returns:
        A 2-tuple ``(genres, categories)`` taken from
        ``data[str(app_id)]['data']``. Every failure path (non-200 status,
        timeout, invalid JSON, missing keys, any other error) returns
        ``({}, {})`` so callers can always unpack two values.
    """
    try:
        # https, matching the URL used by get_game_details.
        url = f"https://store.steampowered.com/api/appdetails?appids={app_id}&l=english"
        response = session.get(url, timeout=timeout)
        if response.status_code != 200:
            return {}, {}
        game_data = response.json()[str(app_id)]['data']
        return game_data['genres'], game_data['categories']
    except requests.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return {}, {}
    except requests.exceptions.Timeout:
        print("The request timed out")
        return {}, {}
    except ValueError:
        print("Response content is not valid JSON")
        return {}, {}
    except Exception as err:
        # Also reached when the payload lacks 'data', 'genres' or 'categories'.
        print(f"An error occurred: {err}")
        return {}, {}



In [12]:
def update_CCU(key_id):
    """Refresh 'Peak CCU' for every document whose stored value is 0.

    Reads the matching documents from the module-level ``collection``, asks
    ``get_CCU`` for each AppID and writes the returned ``player_count`` back
    in batches of ``bulk_size`` updates, pausing 5 seconds after each batch.

    Args:
        key_id: Steam Web API key forwarded to ``get_CCU``.
    """
    query = {'Peak CCU': 0}
    zero_ccu_df = pd.DataFrame(list(collection.find(query)))
    bulk_ops = []
    bulk_size = 100
    try:
        for _, row in tqdm(zero_ccu_df.iterrows(), total=len(zero_ccu_df), desc="Updating CCU"):
            appid = row['AppID']
            # Forward the caller's key instead of the module-level one.
            response_data = get_CCU(appid, key_id).get('response', {})
            player_count = response_data.get('player_count', 0)
            bulk_ops.append(UpdateOne({'AppID': appid}, {'$set': {'Peak CCU': player_count}}))
            if len(bulk_ops) >= bulk_size:
                collection.bulk_write(bulk_ops)
                bulk_ops = []
                time.sleep(5)
    except Exception as e:
        print(f"An error occurred, processed CCU will be writed: {e}")
    finally:
        # Flush the remainder exactly once, and only when there is something
        # to write: bulk_write rejects an empty operation list.
        if bulk_ops:
            collection.bulk_write(bulk_ops)
        
        
def parallel_requests(appid, key):
    """Run the details, reviews and CCU requests for one app concurrently.

    Args:
        appid: Steam application id passed to each request helper.
        key: API key passed to ``get_CCU``.

    Returns:
        Dict with the keys 'details', 'reviews' and 'CCU' mapped to the
        corresponding helper's result. A key is absent when its future
        raised; the exception is printed instead.
    """
    results = {}
    with ThreadPoolExecutor() as pool:
        # Map each submitted future to the label its result is stored under.
        labels = {}
        labels[pool.submit(get_game_details, appid)] = 'details'
        labels[pool.submit(get_game_reviews, appid)] = 'reviews'
        labels[pool.submit(get_CCU, appid, key)] = 'CCU'

        for finished in as_completed(labels):
            request_type = labels[finished]
            try:
                results[request_type] = finished.result()
            except Exception as exc:
                print(f'{request_type} request generated an exception: {exc}')

    return results


def retry_function(func, collection, df_r, delay_seconds=20):
    """Call ``func`` until it succeeds, as long as some row is not yet updated.

    Before each attempt the frame is refreshed from ``collection.find()``;
    if the attempt raises, the error is printed, the function sleeps
    ``delay_seconds`` and the loop condition is re-evaluated on the refreshed
    frame.

    Args:
        func: Zero-argument callable to run.
        collection: Object whose ``find()`` returns the documents used to
            rebuild the frame.
        df_r: DataFrame whose 'updated' column decides whether work remains.
        delay_seconds: Pause between attempts after a failure.

    Returns:
        The value returned by the first successful ``func()`` call, or
        ``None`` when every row is already marked as updated.
    """
    def _all_updated(frame):
        # A missing column or a missing value (NaN for documents without the
        # field) means "not updated yet"; a plain .all() would skip NaN and
        # report such rows as done.
        if 'updated' not in frame.columns:
            return False
        return bool(frame['updated'].eq(True).all())

    while not _all_updated(df_r):
        try:
            df_r = pd.DataFrame(list(collection.find()))
            return func()
        except Exception as e:
            print(f"Function failed with error: {e}. Retrying in {delay_seconds} seconds...")
            time.sleep(delay_seconds)
    # Nothing left to do. There is no active exception at this point, so a
    # bare `raise` here would itself fail with RuntimeError.
    return None

def update_games(df, key, resume_after_index=14110):
    """Refresh name, tags, review counts and CCU of the games listed in ``df``.

    For every row the details, reviews and CCU helpers are called and an
    ``UpdateOne`` keyed on 'AppID' is queued against ``db['Games_updated']``.
    Updates are written in batches of ``bulk_size`` followed by a 30 second
    pause; each failed details or reviews request adds a 5 second pause and
    marks the row with ``update_successful = False``.

    Args:
        df: DataFrame with the columns 'AppID', 'Name', 'Genres',
            'Categories', 'Positive', 'Negative' and 'Peak CCU'.
        key: Steam Web API key forwarded to ``get_CCU``.
        resume_after_index: Rows whose index is <= this value are skipped.
            The default keeps the previously hard-coded resume point; pass
            ``None`` to process every row.
    """
    # Same collection name as the cell that builds the frame passed in here;
    # updates sent to a differently named collection would match nothing.
    collection = db['Games_updated']
    bulk_ops = []
    bulk_size = 100
    try:
        for index, row in tqdm(df.iterrows(), total=len(df), desc="Updating games"):
            if resume_after_index is not None and index <= resume_after_index:
                continue

            # Start from the stored values so that a failed request keeps
            # this row's data instead of reusing the previous game's values.
            appid = row['AppID']
            game_name = row['Name']
            game_genres = row['Genres']
            game_categories = row['Categories']
            CCU = row['Peak CCU']
            review_positive = row['Positive']
            review_negative = row['Negative']
            update_success = True

            details_json = get_game_details(appid)
            reviews_json = get_game_reviews(appid)
            CCU_json = get_CCU(appid, key)

            # Only fill in a CCU when none is stored yet, and actually keep
            # the fetched value.
            if CCU == 0 and CCU_json:
                CCU = CCU_json.get('response', {}).get('player_count', 0)

            app_details = details_json.get(str(appid)) or {}
            if app_details.get('success', False):
                game_data = app_details.get('data') or {}
                game_name = game_data.get('name', row['Name'])
                game_genres = [genre['description'] for genre in game_data.get('genres', [])]
                game_categories = [category['description'] for category in game_data.get('categories', [])]
            else:
                update_success = False
                time.sleep(5)

            if reviews_json.get('success', False):
                review_data = reviews_json.get('query_summary', {})
                review_positive = review_data.get('total_positive', row['Positive'])
                review_negative = review_data.get('total_negative', row['Negative'])
            else:
                update_success = False
                time.sleep(5)

            update = {'$set': {
                'updated': True,
                'update_successful': update_success,
                'Name': game_name,
                'Genres': game_genres,
                'Categories': game_categories,
                'Positive': review_positive,
                'Negative': review_negative,
                'Peak CCU': CCU,
            }}
            bulk_ops.append(UpdateOne({'AppID': appid}, update))

            if len(bulk_ops) >= bulk_size:
                collection.bulk_write(bulk_ops)
                bulk_ops = []
                # Pause after each batch to limit the request rate.
                time.sleep(30)
    except Exception as e:
        print(f"An error occurred, processed games will be writed: {e}")
    finally:
        # Flush the remainder exactly once, and only when there is something
        # to write: bulk_write rejects an empty operation list.
        if bulk_ops:
            collection.bulk_write(bulk_ops)

        
def add_games_from_list(game_list, app_ids, collection):
    """Insert games from ``game_list`` that are not already known.

    A game is inserted only when its details request succeeds, its reviews
    request succeeds and it has at least one review. Requests are made in
    that order, so a game rejected early costs no further calls. Inserts are
    written in batches of ``bulk_size`` followed by a 5 second pause.

    The CCU request uses the module-level ``key``.

    Args:
        game_list: Iterable of dicts with the keys 'appid' and 'name'.
        app_ids: AppIDs to skip (already stored).
        collection: Target collection receiving the ``InsertOne`` batches.
    """
    known_app_ids = set(app_ids)  # constant-time membership test per game
    bulk_ops = []
    bulk_size = 100
    try:
        # The description is evaluated once, so it carries no live counter.
        for g in tqdm(game_list, desc='Processing games'):
            appid = g.get('appid')
            if appid in known_app_ids:
                continue

            app_details = get_game_details(appid).get(str(appid)) or {}
            if not app_details.get('success', False):
                continue
            # `or {}` keeps one payload without 'data' from aborting the run.
            game_data = app_details.get('data') or {}
            game_genres = [genre['description'] for genre in game_data.get('genres', [])]
            game_categories = [category['description'] for category in game_data.get('categories', [])]

            reviews_json = get_game_reviews(appid)
            if not reviews_json.get('success', False):
                continue
            review_data = reviews_json.get('query_summary', {})
            review_positive = review_data.get('total_positive', 0)
            review_negative = review_data.get('total_negative', 0)
            if (review_positive + review_negative) == 0:
                continue

            # Store the fetched player count rather than a constant 0.
            CCU_json = get_CCU(appid, key) or {}
            CCU = CCU_json.get('response', {}).get('player_count', 0)

            insert = {
                'AppID': appid,
                'Name': g.get('name'),
                'Genres': game_genres,
                'Categories': game_categories,
                'Positive': review_positive,
                'Negative': review_negative,
                'Peak CCU': CCU,
            }
            bulk_ops.append(InsertOne(insert))
            if len(bulk_ops) >= bulk_size:
                collection.bulk_write(bulk_ops)
                bulk_ops = []
                time.sleep(5)
    except Exception as e:
        print(f"An error occurred, processed games will be writed: {e}")
    finally:
        # Flush the remainder exactly once: writing in both the handler and
        # here would insert the pending documents twice, and bulk_write
        # rejects an empty operation list.
        if bulk_ops:
            collection.bulk_write(bulk_ops)

In [None]:
# Load every document of Games_updated into a frame and refresh those games.
collection_updated = db['Games_updated']
updated_documents = list(collection_updated.find())
df_updated = pd.DataFrame(updated_documents)
update_games(df=df_updated, key=key)

Updating games:  39%|███▉      | 30658/78774 [6:35:28<18:27:08,  1.38s/it] 

In [6]:

gl = get_game_list(file_gamelist_path)
app_ids = collection.distinct('AppID')

# Membership is tested once per listed game; a set keeps each test
# constant-time instead of scanning the whole AppID list.
known_app_ids = set(app_ids)
filtered_gl = [
    game
    for game in tqdm(gl, desc="Filtering games")
    if game.get('appid') not in known_app_ids
]
filtered_gl = sorted(filtered_gl, key=lambda game: game['appid'])

File found Data\games_appid_name.json


Filtering games: 100%|██████████| 189176/189176 [02:08<00:00, 1477.84it/s]


In [22]:
# Drop repeated appids from filtered_gl, keeping the first entry of each and
# preserving the existing order.
seen_appids = set()
unique_games = []
for game in filtered_gl:
    appid = game['appid']
    if appid in seen_appids:
        continue
    unique_games.append(game)
    seen_appids.add(appid)
filtered_gl = unique_games

In [25]:
# Scrape the filtered games and store the accepted ones in New_games.
db = client['DM_Project']
collection_new = db['New_games']
add_games_from_list(
    game_list=filtered_gl,
    app_ids=app_ids,
    collection=collection_new,
)

Processing games, curr bulk size:0:  35%|███▌      | 30811/87882 [6:46:13<13:31:38,  1.17it/s] 

Response content is not valid JSON


Processing games, curr bulk size:0:  44%|████▍     | 38609/87882 [8:35:59<11:12:06,  1.22it/s]

Response content is not valid JSON


Processing games, curr bulk size:0:  53%|█████▎    | 47015/87882 [10:31:30<8:33:28,  1.33it/s] 

Response content is not valid JSON


Processing games, curr bulk size:0: 100%|██████████| 87882/87882 [19:09:19<00:00,  1.27it/s]   


Updating games:  17%|█▋        | 13092/78774 [01:08<31:50, 34.39it/s]   

Sleeping 30 seconds after bulk write to not overload Steam with requests


Updating games:  17%|█▋        | 13092/78774 [01:26<31:50, 34.39it/s]

In [35]:
%%script false
collection = db['Games']
df = pd.DataFrame(list(collection.find()))

bulk_ops = []
collection_updated = db['Games_updated']
for index, row in tqdm(df.iterrows(), total=len(df), desc="Adding games to new db"):
    if row['Positive']+row['Negative']> 0:
            insert =  {'AppID':row['AppID'],'Name':row['Name'],'Genres':row['Genres'],'Categories':row['Categories'],'Positive':row['Positive'],'Negative':row['Negative'],'Peak CCU':row['Peak CCU']}
            bulk_ops.append(InsertOne(insert))
    if len(bulk_ops)> 100:
        collection_updated.bulk_write(bulk_ops)
        bulk_ops = []
collection_updated.bulk_write(bulk_ops)
bulk_ops = []

Adding games to new db:   0%|          | 158/83560 [00:00<01:33, 895.62it/s]






In [42]:
%%script false
collection_new = db['New_games']
df_new = pd.DataFrame(list(collection_new.find()))

app_ids = collection_updated.distinct('AppID')
bulk_ops = []
for index, row in tqdm(df_new.iterrows(), total=len(df_new), desc="Adding games to new db"):
    if row['AppID'] not in app_ids:
            insert =  {'AppID':row['AppID'],'Name':row['Name'],'Genres':row['Genres'],'Categories':row['Categories'],'Positive':row['Positive'],'Negative':row['Negative'],'Peak CCU':row['Peak CCU']}
            bulk_ops.append(InsertOne(insert))
    if len(bulk_ops)> 100:
        collection_updated.bulk_write(bulk_ops)
        bulk_ops = []
collection_updated.bulk_write(bulk_ops)
bulk_ops = []

Adding games to new db: 100%|██████████| 12445/12445 [00:12<00:00, 1016.35it/s]


In [13]:
from pymongo import MongoClient

collection_updated = db['Games_updated']

# Fetch only the AppID from all documents. The cursor gets its own name so
# that the `app_ids` list built by other cells is not replaced by a one-shot
# cursor that is empty after the join below.
app_id_cursor = collection_updated.find({}, {'AppID': 1})

# Extract AppIDs and convert them to a comma-separated string
app_ids_str = ','.join(str(doc['AppID']) for doc in app_id_cursor)

# Save the string to a text file
with open('app_ids.txt', 'w') as file:
    file.write(app_ids_str)

print('AppIDs have been saved to app_ids.txt.')


AppIDs have been saved to app_ids.txt.


[]