# This script is the first phase of the Steam Data Pipeline.

Its purpose is to:
1. Fetch a master list of all games from the SteamSpy API.
2. Process the data for each game, performing basic cleaning and feature engineering.
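3. Store the cleaned data in a MongoDB collection, ensuring no duplicates are created.
4. Fetch recent user reviews for each game from the Steam store and upsert them into a separate MongoDB collection.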



In [None]:
import os
import re
from pymongo import MongoClient
from tqdm import tqdm
import requests
import logging
from datetime import datetime, timedelta

# Imports the custom logger
from utility.log_debug import logging
from pymongo.operations import UpdateOne
import time 
from concurrent.futures import ThreadPoolExecutor

# Configuration setup


In [2]:

# --- Static configuration ---
DB_NAME = "steam_games"
GAME_COLLECTION = "game_infos"
REVIEW_COLLECTION = "reviews"

# Thread-pool size passed to ThreadPoolExecutor when reviews are collected.
MAX_WORKERS = 15

STEAMSPY_API_URL = "https://steamspy.com/api.php?request=all"

# --- MongoDB handles ---
# The connection string is read from the MONGO_URI environment variable.
MONGO_URI = os.getenv('MONGO_URI')
client = MongoClient(MONGO_URI)

game_collection = client[DB_NAME][GAME_COLLECTION]
review_collection = client[DB_NAME][REVIEW_COLLECTION]

# Ping the server once so a bad connection string is reported here,
# before any data is fetched. A failure is logged, not raised.
try:
    client.admin.command('ping')
    logging.info("MongoDB connection successful.")
except Exception as e:
    logging.error(f"UNABLE to connect to MongoDB. Please check your MONGO_URI. Error: {e}")

2025-10-19 15:52:00,993 - INFO - MongoDB connection successful.


#  DATA FETCHING FUNCTION 

In [3]:

def get_steamspy_game(STEAMSPY_API_URL, timeout=15):
    """Fetch the complete game dataset from the SteamSpy API.

    Args:
        STEAMSPY_API_URL: URL of the SteamSpy endpoint to query.
        timeout: Seconds to wait on the server before giving up. Without
            a timeout a stalled connection would block the pipeline forever.

    Returns:
        The decoded JSON payload, or None when the request fails, the server
        answers with an error status, or the body is not valid JSON.
    """
    logging.info("Attempting to retrieve data from SteamSpy...")
    try:
        response = requests.get(STEAMSPY_API_URL, timeout=timeout)
        # Raises for 4xx/5xx answers (e.g., 404, 503).
        response.raise_for_status()
        # Decode inside the try so a malformed body is reported as a failure
        # instead of being announced as a success first.
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f"Error! Unable to retrieve data from SteamSpy: {e}")
        return None

    logging.info("Retrieve data from SteamSpy SUCCESSFULLY!")
    return payload
    

    

In [4]:
all_games_data = get_steamspy_game(STEAMSPY_API_URL)

# Compiled once, outside the loop: matches every run of digits in an
# owners string after its thousands separators have been removed.
_OWNER_NUMBER_RE = re.compile(r'\d+')

if all_games_data:
    logging.info(f"Retrieved data for {len(all_games_data)} games. Starting ingestion process...")

    for app_info, info_dict in tqdm(all_games_data.items(), desc="Processing game data"):
        try:
            app_id = int(app_info)

            # Parse the 'owners' string (e.g., "20,000 .. 50,000") into a
            # numeric range; both bounds stay 0 when nothing can be parsed.
            min_owner = 0
            max_owner = 0
            owner_string = info_dict.get("owners")
            if owner_string:
                owner_numbers = _OWNER_NUMBER_RE.findall(owner_string.replace(",", ""))
                if len(owner_numbers) >= 2:
                    min_owner = int(owner_numbers[0])
                    max_owner = int(owner_numbers[1])
                elif len(owner_numbers) == 1:
                    # Only one number provided (e.g., "5,000"): use it for both bounds.
                    min_owner = max_owner = int(owner_numbers[0])

            # --- Create the final document for MongoDB ---
            game_document = {
                "_id": app_id,  # Use the app_id as the unique primary key.
                "name": info_dict.get("name"),
                "developer": info_dict.get("developer"),
                "publisher": info_dict.get("publisher"),
                "score_rank": info_dict.get("score_rank"),
                "positive_reviews": info_dict.get("positive"),
                "negative_reviews": info_dict.get("negative"),
                "user_score": info_dict.get("userscore"),
                "min_owners_estimated": min_owner,  # Cleaned integer value.
                "max_owners_estimated": max_owner,  # Cleaned integer value.
                "avg_playtime_forever": info_dict.get("average_forever"),
                "avg_playtime_2weeks": info_dict.get("average_2weeks"),
                "price_cents": info_dict.get("price"),
                "initial_price_cents": info_dict.get("initialprice"),
                "discount_percent": info_dict.get("discount"),
                "concurrent_users": info_dict.get("ccu"),
            }

            # Upsert keyed on _id so re-running the cell updates existing
            # games instead of inserting duplicates.
            game_collection.update_one({"_id": app_id}, {"$set": game_document}, upsert=True)
        except Exception as e:
            # Report the raw key rather than app_id: if int(app_info) failed,
            # app_id is either unbound or still holds the previous game's id.
            # info_dict may itself be the malformed part, so guard the lookup.
            game_name = info_dict.get('name') if isinstance(info_dict, dict) else None
            logging.error(f"Unable to process {app_info}: {game_name}. Error: {e}")


logging.info("--- COMPLETE LOADING GAME INFO ---")

2025-10-19 15:52:01,016 - INFO - Attempting to retrieve data from SteamSpy...
2025-10-19 15:52:01,378 - INFO - Retrieve data from SteamSpy SUCCESSFULLY!
2025-10-19 15:52:01,384 - INFO - Retrieved data for 1000 games. Starting ingestion process...
Processing game data: 100%|██████████| 1000/1000 [00:19<00:00, 50.58it/s]
2025-10-19 15:52:21,166 - INFO - --- COMPLETE LOADING GAME INFO ---


In [None]:
def fetch_steam_reviews(appid, num_reviews_target=1000, months_to_fetch=3, review_filter="all"):
    """Fetch up to a target number of reviews created in the last N months.

    Pages through the Steam store review endpoint 100 reviews at a time and
    keeps only reviews whose creation time is on or after the cutoff.

    Args:
        appid: Steam application id whose reviews are requested.
        num_reviews_target: Maximum number of reviews to return.
        months_to_fetch: Size of the look-back window; a month is
            approximated as 30 days.
        review_filter: Value sent as the endpoint's ``filter`` parameter.
            Defaults to "all", the value this function has always sent.
            NOTE(review): the early stop below assumes each page ends with
            its oldest review. Steam documents "recent" as the
            creation-time ordering and "all" as helpfulness ordering --
            confirm which one this pipeline intends.

    Returns:
        A list of at most ``num_reviews_target`` review dicts, each tagged
        with an ``app_id`` key. Request or decoding failures are logged and
        end the pagination; whatever was collected so far is returned.
    """
    reviews_collected = []
    cursor = '*'

    # Reviews carry epoch timestamps, so compare in epoch seconds directly.
    cutoff_ts = (datetime.now() - timedelta(days=months_to_fetch * 30)).timestamp()
    url = f"https://store.steampowered.com/appreviews/{appid}"

    while len(reviews_collected) < num_reviews_target:
        # Passing the query as params lets requests URL-encode the cursor,
        # which may contain characters such as '+', '/' and '='.
        params = {
            "json": 1,
            "cursor": cursor,
            "language": "english",
            "filter": review_filter,
            "num_per_page": 100,
        }
        try:
            response = requests.get(url, params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # decode error is not a RequestException.
            logging.error(f"HTTP Error while fetching reviews for AppID {appid}: {e}")
            break

        if not data or data.get("success") != 1 or not data.get("reviews"):
            break

        batch_reviews = data["reviews"]
        for review in batch_reviews:
            if review['timestamp_created'] >= cutoff_ts:
                review['app_id'] = appid
                reviews_collected.append(review)

        # Stop paging once the page's last review falls outside the window.
        if batch_reviews[-1]['timestamp_created'] < cutoff_ts:
            break

        next_cursor = data.get("cursor")
        # A missing or repeated cursor means there is nothing more to page.
        if not next_cursor or next_cursor == cursor:
            break

        cursor = next_cursor
        time.sleep(1)  # Pause between pages to avoid hammering the endpoint.

    # Whole pages are appended, so the last one can overshoot the target.
    return reviews_collected[:num_reviews_target]

def process_review(app_id):
    """Fetch the reviews of one game and upsert them into the review collection.

    Each review is upserted on its ``recommendationid`` so repeated runs
    update existing documents instead of duplicating them. Any failure is
    logged together with the app id and swallowed, so one bad game cannot
    abort the surrounding thread-pool run.

    Args:
        app_id: Steam application id whose reviews should be stored.

    Returns:
        None.
    """
    try:
        reviews = fetch_steam_reviews(app_id)
        if not reviews:
            return

        bulk_op = [
            UpdateOne(
                {"recommendationid": review["recommendationid"]},
                {"$set": review},
                upsert=True,
            )
            for review in reviews
        ]
        # The upserts are independent, so an unordered write lets the
        # remaining operations proceed if one of them fails.
        review_collection.bulk_write(bulk_op, ordered=False)
    except Exception as e:
        # Log the app id only: no per-review variable is guaranteed to exist
        # here (e.g. when the fetch itself failed).
        logging.error(f"ERROR, Unable to load reviews for AppID {app_id}: {e}")
    return



In [6]:
logging.info("--- BEGIN TO COLLECT GAME REVIEWS ---")

# all_games_data is None when the SteamSpy fetch failed; fall back to an
# empty mapping so this cell finishes cleanly instead of raising.
# Iterating the mapping yields its keys, which are the app ids.
app_ids = [int(app_info) for app_info in (all_games_data or {})]

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # executor.map is lazy on the consumer side; list() drains it so the
    # progress bar advances as each game finishes.
    list(tqdm(executor.map(process_review, app_ids), total=len(app_ids), desc="Processing..."))

logging.info("--- COMPLETE LOADING GAME REVIEWS ---")

2025-10-19 15:52:21,190 - INFO - --- BEGIN TO COLLECT GAME REVIEWS ---
Processing...: 100%|██████████| 1000/1000 [09:20<00:00,  1.79it/s]
2025-10-19 16:01:41,400 - INFO - --- COMPLETE LOADING GAME REVIEWS ---
