# In [2]:
import sqlite3
import requests
import time
import logging
from bs4 import BeautifulSoup
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Logging configuration: INFO and above are written to populate_aug_steam_games.log
logging.basicConfig(filename='populate_aug_steam_games.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

def get_game_details(steam_game_id, timeout=10):
    """Fetch the metadata of one app from the Steam ``appdetails`` API.

    Args:
        steam_game_id: Steam app id to look up.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The ``data`` dict of the API entry when the request returns HTTP 200
        and the entry reports ``success``; otherwise ``None``.  Failures are
        logged and never raised, so one bad app cannot stop a batch run.
    """
    url = f"https://store.steampowered.com/api/appdetails?appids={steam_game_id}"
    try:
        # Without a timeout a single stalled connection blocks the whole run.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # Make throttling / server errors visible instead of skipping silently.
            logging.warning(f"Unexpected status {response.status_code} fetching game details for {steam_game_id}")
            return None
        data = response.json()
        entry = data[str(steam_game_id)]
        if entry['success']:
            return entry['data']
    except Exception as e:
        # Best-effort boundary: network, JSON and shape errors all end up here.
        logging.error(f"Error fetching game details for {steam_game_id}: {str(e)}")
    return None

def get_steam_page_info(app_id, timeout=10):
    """Scrape an app's Steam store page for its AI disclosure and tags.

    Args:
        app_id: Steam app id whose store page is fetched.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A dict with keys ``ai_generated`` (bool), ``ai_content`` (text of the
        first ``<i>`` element after the disclosure heading, or ``None``) and
        ``tags`` (list of stripped ``a.app_tag`` texts).  Returns ``None``
        when the page cannot be fetched or parsed; the error is logged.
    """
    url = f"https://store.steampowered.com/app/{app_id}/"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        # An HTTP error page would otherwise be parsed like a store page and
        # silently reported as "no AI content, no tags".
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Case-insensitive search of any text node for the disclosure title.
        ai_disclosure = soup.find(string=lambda text: "AI GENERATED CONTENT DISCLOSURE" in text.upper() if text else False)
        ai_generated = bool(ai_disclosure)
        ai_content = None
        if ai_generated:
            ai_section = soup.find('h2', string='AI Generated Content Disclosure')
            if ai_section:
                ai_paragraph = ai_section.find_next('i')
                if ai_paragraph:
                    ai_content = ai_paragraph.text.strip()

        tags = [tag.text.strip() for tag in soup.find_all('a', class_='app_tag')]

        return {
            'ai_generated': ai_generated,
            'ai_content': ai_content,
            'tags': tags
        }
    except Exception as e:
        logging.error(f"Error scraping Steam page for {app_id}: {str(e)}")
    return None

def insert_aug_steam_game(cursor, game_data, steam_page_info):
    """Insert or replace one row of ``aug_steam_games`` for a game.

    Args:
        cursor: DB-API cursor on the database holding ``aug_steam_games``.
        game_data: ``data`` dict from the appdetails API; must contain
            ``steam_appid``.  Every other field is optional and may be null.
        steam_page_info: Dict with ``ai_generated``, ``ai_content`` and
            ``tags`` as returned by the page scraper, or ``None`` when the
            scrape failed.

    Raises:
        KeyError: If ``game_data`` has no ``steam_appid``.

    The caller is responsible for committing.
    """
    page = steam_page_info or {}

    game_id = game_data['steam_appid']
    add_date = int(time.time())
    # ``or []`` / ``or {}``: dict.get's default only covers a *missing* key; a
    # key present with a null value would otherwise make join()/get() raise.
    dev = ', '.join(game_data.get('developers') or [])
    publisher = ', '.join(game_data.get('publishers') or [])
    tags = ', '.join(page.get('tags') or [])
    release_date = (game_data.get('release_date') or {}).get('date', '')
    description = game_data.get('short_description', '')
    has_ai = bool(page.get('ai_generated'))
    ai_generated = 'Yes' if has_ai else 'No'
    ai_content = page.get('ai_content') if has_ai else None
    content_descriptors_ids = (game_data.get('content_descriptors') or {}).get('ids') or []
    content_descriptors_str = ', '.join(map(str, content_descriptors_ids))
    supported_languages = game_data.get('supported_languages', '')
    free = 'Yes' if game_data.get('is_free', False) else 'No'
    dlc = 'Yes' if game_data.get('type', '') == 'dlc' else 'No'

    cursor.execute('''
    INSERT OR REPLACE INTO aug_steam_games
    (game_id, add_date, dev, publisher, tags, release_date, description, ai_generated, ai_content, 
    content_descriptors, supported_languages, free, dlc)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (game_id, add_date, dev, publisher, tags, release_date, description, ai_generated, ai_content,
          content_descriptors_str, supported_languages, free, dlc))

def populate_aug_steam_games():
    """Enrich every not-yet-processed game of steam_games.db into aug_steam_games.db.

    Reads ``steam_game_id`` from the ``games`` table of ``steam_games.db``,
    skips ids already present in ``aug_steam_games`` (stored in
    ``aug_steam_games.db``), and for each remaining id fetches the API
    details and the store-page info and stores one row.  Work is committed
    every 100 processed games and once more at the end.  Any exception is
    logged rather than raised; both connections are always closed.
    """
    conn_steam = sqlite3.connect('steam_games.db')
    conn_aug = sqlite3.connect('aug_steam_games.db')
    cursor_steam = conn_steam.cursor()
    cursor_aug = conn_aug.cursor()

    try:
        # Ensure the aug_steam_games table exists
        cursor_aug.execute('''
        CREATE TABLE IF NOT EXISTS aug_steam_games (
            game_id INTEGER PRIMARY KEY,
            add_date INTEGER,
            dev TEXT,
            publisher TEXT,
            tags TEXT,
            release_date TEXT,
            description TEXT,
            ai_generated TEXT,
            ai_content TEXT,
            content_descriptors TEXT,
            supported_languages TEXT,
            free TEXT,
            dlc TEXT
        )
        ''')

        # ``games`` and ``aug_steam_games`` live in different database files,
        # so the "not processed yet" filter cannot be a subquery on a single
        # connection; read the known ids here and filter in Python instead.
        # Ids are compared as strings in case the two schemas store them with
        # different types.
        cursor_aug.execute('SELECT game_id FROM aug_steam_games')
        already_added = {str(row[0]) for row in cursor_aug.fetchall()}

        cursor_steam.execute('SELECT steam_game_id FROM games')
        games_to_add = [
            game_id for (game_id,) in cursor_steam.fetchall()
            if game_id is not None and str(game_id) not in already_added
        ]
        total_games = len(games_to_add)
        logging.info(f"Total games to process: {total_games}")

        for index, game_id in enumerate(games_to_add, 1):
            if index % 100 == 0:
                logging.info(f"Processing game {index} of {total_games}")

            game_data = get_game_details(game_id)
            if game_data:
                steam_page_info = get_steam_page_info(game_id)
                insert_aug_steam_game(cursor_aug, game_data, steam_page_info)

            # Checkpoint on every 100th game even when that particular lookup
            # failed, so a failure never postpones the commit by another 100.
            if index % 100 == 0:
                conn_aug.commit()
                logging.info(f"Committed 100 games to aug_steam_games.db")

            time.sleep(1)  # To avoid overwhelming the Steam API

        conn_aug.commit()
        logging.info("Population of aug_steam_games completed successfully")

    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
    finally:
        conn_steam.close()
        conn_aug.close()

# Run the population job only when this file is executed as a script.
if __name__ == "__main__":
    populate_aug_steam_games()

# In [3]:
import sqlite3
import requests
import time
import logging
from bs4 import BeautifulSoup
import os
from dotenv import load_dotenv
import tempfile

# Load environment variables
load_dotenv()

# Logging configuration: INFO and above are written to populate_aug_steam_games.log
logging.basicConfig(filename='populate_aug_steam_games.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration
# GITHUB_REPO and DB_FILE_PATH are interpolated into the raw.githubusercontent.com
# URL from which populate_aug_steam_games() downloads the source database.
GITHUB_REPO = 'steampage-creation-date'
DB_FILE_PATH = 'steam_games.db'

def download_db(url, local_path, timeout=30):
    """Download *url* to *local_path* in 8 KiB chunks.

    Args:
        url: Address of the file to download.
        local_path: Destination path; opened for binary writing.
        timeout: Seconds to wait for the connection and for each read.

    Returns:
        ``True`` when the server answered 200 and the body was written,
        ``False`` on any other status or on a network error (both logged).
    """
    try:
        # The context manager releases the streamed connection on every path,
        # including the non-200 one where the body is never consumed.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                logging.error(f"Échec du téléchargement de la base de données. Code de statut: {response.status_code}")
                return False
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return True
    except requests.RequestException as e:
        # Report network failures the same way as a bad status: log + False.
        logging.error(f"Database download failed: {str(e)}")
        return False

def get_game_details(steam_game_id, timeout=10):
    """Fetch the metadata of one app from the Steam ``appdetails`` API.

    Args:
        steam_game_id: Steam app id to look up.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The ``data`` dict of the API entry when the request returns HTTP 200
        and the entry reports ``success``; otherwise ``None``.  Failures are
        logged and never raised, so one bad app cannot stop a batch run.
    """
    url = f"https://store.steampowered.com/api/appdetails?appids={steam_game_id}"
    try:
        # Without a timeout a single stalled connection blocks the whole run.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # Make throttling / server errors visible instead of skipping silently.
            logging.warning(f"Unexpected status {response.status_code} fetching game details for {steam_game_id}")
            return None
        data = response.json()
        entry = data[str(steam_game_id)]
        if entry['success']:
            return entry['data']
    except Exception as e:
        # Best-effort boundary: network, JSON and shape errors all end up here.
        logging.error(f"Error fetching game details for {steam_game_id}: {str(e)}")
    return None

def get_steam_page_info(app_id, timeout=10):
    """Scrape an app's Steam store page for its AI disclosure and tags.

    Args:
        app_id: Steam app id whose store page is fetched.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A dict with keys ``ai_generated`` (bool), ``ai_content`` (text of the
        first ``<i>`` element after the disclosure heading, or ``None``) and
        ``tags`` (list of stripped ``a.app_tag`` texts).  Returns ``None``
        when the page cannot be fetched or parsed; the error is logged.
    """
    url = f"https://store.steampowered.com/app/{app_id}/"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        # An HTTP error page would otherwise be parsed like a store page and
        # silently reported as "no AI content, no tags".
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Case-insensitive search of any text node for the disclosure title.
        ai_disclosure = soup.find(string=lambda text: "AI GENERATED CONTENT DISCLOSURE" in text.upper() if text else False)
        ai_generated = bool(ai_disclosure)
        ai_content = None
        if ai_generated:
            ai_section = soup.find('h2', string='AI Generated Content Disclosure')
            if ai_section:
                ai_paragraph = ai_section.find_next('i')
                if ai_paragraph:
                    ai_content = ai_paragraph.text.strip()

        tags = [tag.text.strip() for tag in soup.find_all('a', class_='app_tag')]

        return {
            'ai_generated': ai_generated,
            'ai_content': ai_content,
            'tags': tags
        }
    except Exception as e:
        logging.error(f"Error scraping Steam page for {app_id}: {str(e)}")
    return None

def insert_aug_steam_game(cursor, game_data, steam_page_info):
    """Insert or replace one row of ``aug_steam_games`` for a game.

    Args:
        cursor: DB-API cursor on the database holding ``aug_steam_games``.
        game_data: ``data`` dict from the appdetails API; must contain
            ``steam_appid``.  Every other field is optional and may be null.
        steam_page_info: Dict with ``ai_generated``, ``ai_content`` and
            ``tags`` as returned by the page scraper, or ``None`` when the
            scrape failed.

    Raises:
        KeyError: If ``game_data`` has no ``steam_appid``.

    The caller is responsible for committing.
    """
    page = steam_page_info or {}

    game_id = game_data['steam_appid']
    add_date = int(time.time())
    # ``or []`` / ``or {}``: dict.get's default only covers a *missing* key; a
    # key present with a null value would otherwise make join()/get() raise.
    dev = ', '.join(game_data.get('developers') or [])
    publisher = ', '.join(game_data.get('publishers') or [])
    tags = ', '.join(page.get('tags') or [])
    release_date = (game_data.get('release_date') or {}).get('date', '')
    description = game_data.get('short_description', '')
    has_ai = bool(page.get('ai_generated'))
    ai_generated = 'Yes' if has_ai else 'No'
    ai_content = page.get('ai_content') if has_ai else None
    content_descriptors_ids = (game_data.get('content_descriptors') or {}).get('ids') or []
    content_descriptors_str = ', '.join(map(str, content_descriptors_ids))
    supported_languages = game_data.get('supported_languages', '')
    free = 'Yes' if game_data.get('is_free', False) else 'No'
    dlc = 'Yes' if game_data.get('type', '') == 'dlc' else 'No'

    cursor.execute('''
    INSERT OR REPLACE INTO aug_steam_games
    (game_id, add_date, dev, publisher, tags, release_date, description, ai_generated, ai_content, 
    content_descriptors, supported_languages, free, dlc)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (game_id, add_date, dev, publisher, tags, release_date, description, ai_generated, ai_content,
          content_descriptors_str, supported_languages, free, dlc))

def populate_aug_steam_games():
    """Download the source database and enrich its unprocessed games.

    Builds the raw GitHub URL of the source database from the
    ``PAT_GITHUB_USERNAME`` environment variable, ``GITHUB_REPO`` and
    ``DB_FILE_PATH``, downloads it to a temporary file, and for every
    ``steam_game_id`` of its ``games`` table that is not yet in the local
    ``aug_steam_games`` table fetches the API details and store-page info
    and stores one row in ``aug_steam_games.db``.  Work is committed every
    100 processed games and once more at the end.  Any exception is logged
    rather than raised; connections are closed and the temporary file is
    removed on every path.
    """
    username = os.getenv('PAT_GITHUB_USERNAME')
    if not username:
        # Otherwise the URL would literally contain "None".
        logging.error("PAT_GITHUB_USERNAME is not set; cannot build the database URL")
        return

    db_url = f"https://raw.githubusercontent.com/{username}/{GITHUB_REPO}/main/{DB_FILE_PATH}"
    logging.info(f"URL de la base de données : {db_url}")

    conn_aug = sqlite3.connect('aug_steam_games.db')
    cursor_aug = conn_aug.cursor()
    conn_steam = None
    temp_db_path = None

    try:
        # Ensure the aug_steam_games table exists
        cursor_aug.execute('''
        CREATE TABLE IF NOT EXISTS aug_steam_games (
            game_id INTEGER PRIMARY KEY,
            add_date INTEGER,
            dev TEXT,
            publisher TEXT,
            tags TEXT,
            release_date TEXT,
            description TEXT,
            ai_generated TEXT,
            ai_content TEXT,
            content_descriptors TEXT,
            supported_languages TEXT,
            free TEXT,
            dlc TEXT
        )
        ''')

        # Only reserve a path here and close the handle straight away:
        # download_db reopens the file by name, which is not possible on
        # Windows while the NamedTemporaryFile handle is still open.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as temp_db:
            temp_db_path = temp_db.name

        if not download_db(db_url, temp_db_path):
            logging.error("Failed to download the database")
            return

        logging.info(f"Base de données téléchargée avec succès : {temp_db_path}")
        conn_steam = sqlite3.connect(temp_db_path)
        cursor_steam = conn_steam.cursor()

        # ``games`` (downloaded file) and ``aug_steam_games`` (local file) are
        # in different databases, so the "not processed yet" filter cannot be
        # a subquery on a single connection; filter in Python instead.  Ids
        # are compared as strings in case the schemas store them differently.
        cursor_aug.execute('SELECT game_id FROM aug_steam_games')
        already_added = {str(row[0]) for row in cursor_aug.fetchall()}

        cursor_steam.execute('SELECT steam_game_id FROM games')
        games_to_add = [
            game_id for (game_id,) in cursor_steam.fetchall()
            if game_id is not None and str(game_id) not in already_added
        ]
        total_games = len(games_to_add)
        logging.info(f"Total games to process: {total_games}")

        for index, game_id in enumerate(games_to_add, 1):
            if index % 100 == 0:
                logging.info(f"Processing game {index} of {total_games}")

            game_data = get_game_details(game_id)
            if game_data:
                steam_page_info = get_steam_page_info(game_id)
                insert_aug_steam_game(cursor_aug, game_data, steam_page_info)

            # Checkpoint on every 100th game even when that lookup failed.
            if index % 100 == 0:
                conn_aug.commit()
                logging.info(f"Committed 100 games to aug_steam_games.db")

            time.sleep(1)  # To avoid overwhelming the Steam API

        conn_aug.commit()
        logging.info("Population of aug_steam_games completed successfully")

    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
    finally:
        # Close before unlinking: an open SQLite handle blocks deletion on Windows.
        if conn_steam is not None:
            conn_steam.close()
        conn_aug.close()
        if temp_db_path is not None:
            try:
                os.unlink(temp_db_path)
                logging.info("Temporary database file deleted")
            except OSError as e:
                logging.error(f"Could not delete temporary database file {temp_db_path}: {str(e)}")

# Run the population job only when this file is executed as a script.
if __name__ == "__main__":
    populate_aug_steam_games()

# In [4]:
import sqlite3
import requests
import time
import logging
from bs4 import BeautifulSoup
import os
from dotenv import load_dotenv
import tempfile

# Load environment variables
load_dotenv()

# Logging configuration: INFO and above are written to populate_aug_steam_games.log
logging.basicConfig(filename='populate_aug_steam_games.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration
# GITHUB_REPO and DB_FILE_PATH are interpolated into the raw.githubusercontent.com
# URL from which populate_aug_steam_games() downloads the source database.
GITHUB_REPO = 'steampage-creation-date'
DB_FILE_PATH = 'steam_games.db'

def download_db(url, local_path, timeout=30):
    """Download *url* to *local_path* in 8 KiB chunks.

    Args:
        url: Address of the file to download.
        local_path: Destination path; opened for binary writing.
        timeout: Seconds to wait for the connection and for each read.

    Returns:
        ``True`` when the server answered 200 and the body was written,
        ``False`` on any other status or on a network error (both logged).
    """
    try:
        # The context manager releases the streamed connection on every path,
        # including the non-200 one where the body is never consumed.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                logging.error(f"Échec du téléchargement de la base de données. Code de statut: {response.status_code}")
                return False
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return True
    except requests.RequestException as e:
        # Report network failures the same way as a bad status: log + False.
        logging.error(f"Database download failed: {str(e)}")
        return False

def get_game_details(steam_game_id, timeout=10):
    """Fetch the metadata of one app from the Steam ``appdetails`` API.

    Args:
        steam_game_id: Steam app id to look up.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The ``data`` dict of the API entry when the request returns HTTP 200
        and the entry reports ``success``; otherwise ``None``.  Failures are
        logged and never raised, so one bad app cannot stop a batch run.
    """
    url = f"https://store.steampowered.com/api/appdetails?appids={steam_game_id}"
    try:
        # Without a timeout a single stalled connection blocks the whole run.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # Make throttling / server errors visible instead of skipping silently.
            logging.warning(f"Unexpected status {response.status_code} fetching game details for {steam_game_id}")
            return None
        data = response.json()
        entry = data[str(steam_game_id)]
        if entry['success']:
            return entry['data']
    except Exception as e:
        # Best-effort boundary: network, JSON and shape errors all end up here.
        logging.error(f"Error fetching game details for {steam_game_id}: {str(e)}")
    return None

def get_steam_page_info(app_id, timeout=10):
    """Scrape an app's Steam store page for its AI disclosure and tags.

    Args:
        app_id: Steam app id whose store page is fetched.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A dict with keys ``ai_generated`` (bool), ``ai_content`` (text of the
        first ``<i>`` element after the disclosure heading, or ``None``) and
        ``tags`` (list of stripped ``a.app_tag`` texts).  Returns ``None``
        when the page cannot be fetched or parsed; the error is logged.
    """
    url = f"https://store.steampowered.com/app/{app_id}/"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        # An HTTP error page would otherwise be parsed like a store page and
        # silently reported as "no AI content, no tags".
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Case-insensitive search of any text node for the disclosure title.
        ai_disclosure = soup.find(string=lambda text: "AI GENERATED CONTENT DISCLOSURE" in text.upper() if text else False)
        ai_generated = bool(ai_disclosure)
        ai_content = None
        if ai_generated:
            ai_section = soup.find('h2', string='AI Generated Content Disclosure')
            if ai_section:
                ai_paragraph = ai_section.find_next('i')
                if ai_paragraph:
                    ai_content = ai_paragraph.text.strip()

        tags = [tag.text.strip() for tag in soup.find_all('a', class_='app_tag')]

        return {
            'ai_generated': ai_generated,
            'ai_content': ai_content,
            'tags': tags
        }
    except Exception as e:
        logging.error(f"Error scraping Steam page for {app_id}: {str(e)}")
    return None

def create_aug_steam_games_table(conn):
    """Create the ``aug_steam_games`` table on *conn* if missing, then commit.

    Args:
        conn: Open sqlite3 connection to the database that should hold the
            table.
    """
    ddl = '''
    CREATE TABLE IF NOT EXISTS aug_steam_games (
        game_id INTEGER PRIMARY KEY,
        add_date INTEGER,
        dev TEXT,
        publisher TEXT,
        tags TEXT,
        release_date TEXT,
        description TEXT,
        ai_generated TEXT,
        ai_content TEXT,
        content_descriptors TEXT,
        supported_languages TEXT,
        free TEXT,
        dlc TEXT
    )
    '''
    # IF NOT EXISTS makes the statement safe to run on every start-up.
    conn.cursor().execute(ddl)
    conn.commit()

def insert_aug_steam_game(cursor, game_data, steam_page_info):
    """Insert or replace one row of ``aug_steam_games`` for a game.

    Args:
        cursor: DB-API cursor on the database holding ``aug_steam_games``.
        game_data: ``data`` dict from the appdetails API; must contain
            ``steam_appid``.  Every other field is optional and may be null.
        steam_page_info: Dict with ``ai_generated``, ``ai_content`` and
            ``tags`` as returned by the page scraper, or ``None`` when the
            scrape failed.

    Raises:
        KeyError: If ``game_data`` has no ``steam_appid``.

    The caller is responsible for committing.
    """
    page = steam_page_info or {}

    game_id = game_data['steam_appid']
    add_date = int(time.time())
    # ``or []`` / ``or {}``: dict.get's default only covers a *missing* key; a
    # key present with a null value would otherwise make join()/get() raise.
    dev = ', '.join(game_data.get('developers') or [])
    publisher = ', '.join(game_data.get('publishers') or [])
    tags = ', '.join(page.get('tags') or [])
    release_date = (game_data.get('release_date') or {}).get('date', '')
    description = game_data.get('short_description', '')
    has_ai = bool(page.get('ai_generated'))
    ai_generated = 'Yes' if has_ai else 'No'
    ai_content = page.get('ai_content') if has_ai else None
    content_descriptors_ids = (game_data.get('content_descriptors') or {}).get('ids') or []
    content_descriptors_str = ', '.join(map(str, content_descriptors_ids))
    supported_languages = game_data.get('supported_languages', '')
    free = 'Yes' if game_data.get('is_free', False) else 'No'
    dlc = 'Yes' if game_data.get('type', '') == 'dlc' else 'No'

    cursor.execute('''
    INSERT OR REPLACE INTO aug_steam_games
    (game_id, add_date, dev, publisher, tags, release_date, description, ai_generated, ai_content, 
    content_descriptors, supported_languages, free, dlc)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (game_id, add_date, dev, publisher, tags, release_date, description, ai_generated, ai_content,
          content_descriptors_str, supported_languages, free, dlc))

def populate_aug_steam_games():
    """Download the source database and enrich its unprocessed games.

    Builds the raw GitHub URL of the source database from the
    ``PAT_GITHUB_USERNAME`` environment variable, ``GITHUB_REPO`` and
    ``DB_FILE_PATH``, downloads it to a temporary file, and for every
    ``steam_game_id`` of its ``games`` table that is not yet in the local
    ``aug_steam_games`` table fetches the API details and store-page info
    and stores one row in ``aug_steam_games.db``.  Work is committed every
    100 processed games and once more at the end.  Any exception is logged
    rather than raised; connections are closed and the temporary file is
    removed on every path.
    """
    username = os.getenv('PAT_GITHUB_USERNAME')
    if not username:
        # Otherwise the URL would literally contain "None".
        logging.error("PAT_GITHUB_USERNAME is not set; cannot build the database URL")
        return

    db_url = f"https://raw.githubusercontent.com/{username}/{GITHUB_REPO}/main/{DB_FILE_PATH}"
    logging.info(f"URL de la base de données : {db_url}")

    conn_aug = sqlite3.connect('aug_steam_games.db')
    conn_steam = None
    temp_db_path = None

    try:
        # Create the aug_steam_games table if it doesn't exist
        create_aug_steam_games_table(conn_aug)
        cursor_aug = conn_aug.cursor()

        # Only reserve a path here and close the handle straight away:
        # download_db reopens the file by name, which is not possible on
        # Windows while the NamedTemporaryFile handle is still open.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as temp_db:
            temp_db_path = temp_db.name

        if not download_db(db_url, temp_db_path):
            logging.error("Failed to download the database")
            return

        logging.info(f"Base de données téléchargée avec succès : {temp_db_path}")
        conn_steam = sqlite3.connect(temp_db_path)
        cursor_steam = conn_steam.cursor()

        # ``games`` (downloaded file) and ``aug_steam_games`` (local file) are
        # in different databases, so the "not processed yet" filter cannot be
        # a subquery on a single connection; filter in Python instead.  Ids
        # are compared as strings in case the schemas store them differently.
        cursor_aug.execute('SELECT game_id FROM aug_steam_games')
        already_added = {str(row[0]) for row in cursor_aug.fetchall()}

        cursor_steam.execute('SELECT steam_game_id FROM games')
        games_to_add = [
            game_id for (game_id,) in cursor_steam.fetchall()
            if game_id is not None and str(game_id) not in already_added
        ]
        total_games = len(games_to_add)
        logging.info(f"Total games to process: {total_games}")

        for index, game_id in enumerate(games_to_add, 1):
            if index % 100 == 0:
                logging.info(f"Processing game {index} of {total_games}")

            game_data = get_game_details(game_id)
            if game_data:
                steam_page_info = get_steam_page_info(game_id)
                insert_aug_steam_game(cursor_aug, game_data, steam_page_info)

            # Checkpoint on every 100th game even when that lookup failed.
            if index % 100 == 0:
                conn_aug.commit()
                logging.info(f"Committed 100 games to aug_steam_games.db")

            time.sleep(1)  # To avoid overwhelming the Steam API

        conn_aug.commit()
        logging.info("Population of aug_steam_games completed successfully")

    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
    finally:
        # Close before unlinking: an open SQLite handle blocks deletion on Windows.
        if conn_steam is not None:
            conn_steam.close()
        conn_aug.close()
        if temp_db_path is not None:
            try:
                os.unlink(temp_db_path)
                logging.info("Temporary database file deleted")
            except OSError as e:
                logging.error(f"Could not delete temporary database file {temp_db_path}: {str(e)}")

# Run the population job only when this file is executed as a script.
if __name__ == "__main__":
    populate_aug_steam_games()