Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 132 additions & 87 deletions howlongtobeatpy/howlongtobeatpy/HTMLRequests.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# ---------------------------------------------------------------------
# IMPORTS

import re
import json
import re
import time
from enum import Enum
from bs4 import BeautifulSoup

import aiohttp
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

# ---------------------------------------------------------------------
Expand All @@ -26,62 +28,69 @@ class SearchModifiers(Enum):

class SearchInformations:
    # Search endpoint path scraped from the site's JS bundle (e.g. "api/search");
    # None when extraction failed.
    search_url = None
    # API key scraped from the same JS bundle; None when no known format matched.
    api_key = None

    def __init__(self, script_content: str):
        """
        Parse the given javascript bundle and extract the API key and the
        search URL used by howlongtobeat's search endpoint.
        @param script_content: the raw javascript source scraped from the site
        """
        self.api_key = self.__extract_api_from_script(script_content)
        self.search_url = self.__extract_search_url_script(script_content)
        # BASE_URL already ends with "/", so strip the leading "/" from the
        # scraped path to avoid a double slash when the two are joined.
        if HTMLRequests.BASE_URL.endswith("/") and self.search_url is not None:
            self.search_url = self.search_url.lstrip("/")

def __extract_api_from_script(self, script_content: str):
    """
    Scrape the API key out of the given howlongtobeat javascript bundle.

    The site keeps changing how the key is embedded, so the known formats
    are tried one after the other.
    @param script_content: the raw javascript source to inspect
    @return: the API key string, or None when no known format matched
    """
    # Format 1 - the key is embedded as the user id inside the request json:
    #   users: { id: "KEY" }
    user_id_matches = re.findall(r'users\s*:\s*{\s*id\s*:\s*"([^"]+)"', script_content)
    if user_id_matches:
        return ''.join(user_id_matches)
    # Format 2 - the key is split across .concat() calls:
    #   fetch("/api/<word>/".concat("X").concat("Y")...
    concat_matches = re.findall(r'\/api\/\w+\/"(?:\.concat\("[^"]*"\))*', script_content)
    if concat_matches:
        pieces = str(concat_matches).split('.concat')
        # Drop the leading "/api/<word>/" piece and strip quoting/bracket noise
        cleaned = [re.sub(r'["\(\)\[\]\']', '', piece) for piece in pieces[1:]]
        return ''.join(cleaned)
    # No known format matched
    return None


def __extract_search_url_script(self, script_content: str):
"""
Function that extract the htlb search url to append from the script as /api/search
@return: the search url to append
Function that finds the 'fetch' call using 'method: "POST"',
extracts the base endpoint path, and returns the full '/api/path'
string (e.g., "/api/search").

This avoids relying on the exact string "search" by confirming
the use of the POST method, which identifies the actual search endpoint.

@return: The full API endpoint string (e.g., "/api/search") or None.
"""
# Pattern explanation:
# 1. Capture Group 1: Matches the path suffix (e.g., "search" or "find").
# 2. Ensures the request options contain 'method: "POST"' to filter out the GET init call.
pattern = re.compile(
r'fetch\(\s*["\'](\/api\/[^"\']*)["\']' # Matches the endpoint
r'((?:\s*\.concat\(\s*["\']([^"\']*)["\']\s*\))+)' # Captures concatenated strings
r'\s*,', # Matches up to the comma
re.DOTALL
# Capture Group 1: The path suffix after /api/ (e.g., "search" or "find/v2")
r'fetch\s*\(\s*["\']/api/([a-zA-Z0-9_/]+)[^"\']*["\']\s*,\s*{[^}]*method:\s*["\']POST["\'][^}]*}',
re.DOTALL | re.IGNORECASE
)
matches = pattern.finditer(script_content)
for match in matches:
endpoint = match.group(1)
concat_calls = match.group(2)
# Extract all concatenated strings
concat_strings = re.findall(r'\.concat\(\s*["\']([^"\']*)["\']\s*\)', concat_calls)
concatenated_str = ''.join(concat_strings)
# Check if the concatenated string matches the known string
if concatenated_str == self.api_key:
return endpoint
# Unable to find :(

match = pattern.search(script_content)

if match:
# Example captured string: "search" or "find/v2"
path_suffix = match.group(1)

# Determine the root path (e.g., "search" from "search/v2")
# This ensures we get the base endpoint name even if sub-paths are used.
if '/' in path_suffix:
base_path = path_suffix.split('/')[0]
else:
base_path = path_suffix

if base_path != "find":
full_endpoint = f"/api/{base_path}"

return full_endpoint

return None


class SearchAuthToken:
    """Helper that holds the init endpoint and extracts the x-auth-token from its response."""

    # Endpoint path (relative to BASE_URL) that returns the auth token
    search_url = "api/search/init"
    # Last token extracted; None until one has been parsed
    auth_token = None

    def extract_auth_token_from_response(self, response_content: requests.Response):
        """
        Extract the auth token from an HTTP response of the init endpoint.
        @param response_content: the response object returned by the init request
        @return: the token found in the response json (also stored on self), or None
        """
        return self.extract_auth_token_from_json(response_content.json())

    def extract_auth_token_from_json(self, json_content):
        """
        Extract the auth token from an already-parsed json payload.
        @param json_content: dict parsed from the init endpoint response body
        @return: the value of the 'token' key (also stored on self), or None
        """
        token = json_content.get('token')
        self.auth_token = token
        return token

class HTMLRequests:
BASE_URL = 'https://howlongtobeat.com/'
REFERER_HEADER = BASE_URL
Expand All @@ -90,7 +99,7 @@ class HTMLRequests:
SEARCH_URL = BASE_URL + "api/s/"

@staticmethod
def get_search_request_headers():
def get_search_request_headers(auth_token = None):
"""
Generate the headers for the search request
@return: The headers object for the request
Expand All @@ -102,10 +111,14 @@ def get_search_request_headers():
'User-Agent': ua.random.strip(),
'referer': HTMLRequests.REFERER_HEADER
}

if auth_token is not None:
headers['x-auth-token'] = str(auth_token)

return headers

@staticmethod
def get_search_request_data(game_name: str, search_modifiers: SearchModifiers, page: int, search_info: SearchInformations):
def get_search_request_data(game_name: str, search_modifiers: SearchModifiers, page: int):
"""
Generate the data payload for the search request
@param game_name: The name of the game to search
Expand Down Expand Up @@ -154,10 +167,6 @@ def get_search_request_data(game_name: str, search_modifiers: SearchModifiers, p
'useCache': True
}

# If api_key is passed add it to the dict
if search_info is not None and search_info.api_key is not None:
payload['searchOptions']['users']['id'] = search_info.api_key

return json.dumps(payload)

@staticmethod
Expand All @@ -170,23 +179,16 @@ def send_web_request(game_name: str, search_modifiers: SearchModifiers = SearchM
@param page: The page to explore of the research, unknown if this is actually used
@return: The HTML code of the research if the request returned 200(OK), None otherwise
"""
headers = HTMLRequests.get_search_request_headers()
auth_token = HTMLRequests.send_website_get_auth_token()
headers = HTMLRequests.get_search_request_headers(auth_token)
search_info_data = HTMLRequests.send_website_request_getcode(False)
if search_info_data is None or search_info_data.api_key is None:
if search_info_data is None or search_info_data.search_url is None:
search_info_data = HTMLRequests.send_website_request_getcode(True)
# Make the request
if search_info_data.search_url is not None:
if search_info_data is not None and search_info_data.search_url is not None:
HTMLRequests.SEARCH_URL = HTMLRequests.BASE_URL + search_info_data.search_url
# The main method currently is the call to the API search URL
search_url_with_key = HTMLRequests.SEARCH_URL + search_info_data.api_key
payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, None)
resp = requests.post(search_url_with_key, headers=headers, data=payload, timeout=60)
if resp.status_code == 200:
return resp.text
# Try to call with the standard url adding the api key to the user
search_url = HTMLRequests.SEARCH_URL
payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, search_info_data)
resp = requests.post(search_url, headers=headers, data=payload, timeout=60)
payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page)
resp = requests.post(HTMLRequests.SEARCH_URL, headers=headers, data=payload, timeout=60)
if resp.status_code == 200:
return resp.text
return None
Expand All @@ -201,27 +203,22 @@ async def send_async_web_request(game_name: str, search_modifiers: SearchModifie
@param page: The page to explore of the research, unknown if this is actually used
@return: The HTML code of the research if the request returned 200(OK), None otherwise
"""
headers = HTMLRequests.get_search_request_headers()
auth_token = await HTMLRequests.async_send_website_get_auth_token()
headers = HTMLRequests.get_search_request_headers(auth_token)
search_info_data = HTMLRequests.send_website_request_getcode(False)
if search_info_data is None or search_info_data.api_key is None:
if search_info_data is None or search_info_data.search_url is None:
search_info_data = HTMLRequests.send_website_request_getcode(True)
# Make the request
if search_info_data.search_url is not None:
if search_info_data is not None and search_info_data.search_url is not None:
HTMLRequests.SEARCH_URL = HTMLRequests.BASE_URL + search_info_data.search_url
# The main method currently is the call to the API search URL
search_url_with_key = HTMLRequests.SEARCH_URL + search_info_data.api_key
payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, None)
payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page)
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession() as session:
async with session.post(search_url_with_key, headers=headers, data=payload) as resp_with_key:
async with session.post(HTMLRequests.SEARCH_URL, headers=headers, data=payload, timeout=timeout) as resp_with_key:
if resp_with_key is not None and resp_with_key.status == 200:
return await resp_with_key.text()
else:
search_url = HTMLRequests.SEARCH_URL
payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, search_info_data)
async with session.post(search_url, headers=headers, data=payload) as resp_user_id:
if resp_user_id is not None and resp_user_id.status == 200:
return await resp_user_id.text()
return None
return None

@staticmethod
def __cut_game_title(page_source: str):
Expand Down Expand Up @@ -296,8 +293,9 @@ async def async_get_game_title(game_id: int):
headers = HTMLRequests.get_title_request_headers()

# Request and extract title
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession() as session:
async with session.post(HTMLRequests.GAME_URL, params=params, headers=headers) as resp:
async with session.post(HTMLRequests.GAME_URL, params=params, headers=headers, timeout=timeout) as resp:
if resp is not None and resp.status == 200:
text = await resp.text()
return HTMLRequests.__cut_game_title(text)
Expand All @@ -306,8 +304,8 @@ async def async_get_game_title(game_id: int):
@staticmethod
def send_website_request_getcode(parse_all_scripts: bool):
"""
Function that send a request to howlongtobeat to scrape the API key
@return: The string key to use
Function that send a request to howlongtobeat to scrape the correct search url
@return: The search informations to use in the request
"""
# Make the post request and return the result if is valid
headers = HTMLRequests.get_title_request_headers()
Expand All @@ -326,21 +324,21 @@ def send_website_request_getcode(parse_all_scripts: bool):
script_resp = requests.get(script_url, headers=headers, timeout=60)
if script_resp.status_code == 200 and script_resp.text is not None:
search_info = SearchInformations(script_resp.text)
if search_info.api_key is not None:
# The api key is necessary
if search_info.search_url is not None:
return search_info
return None

@staticmethod
async def async_send_website_request_getcode(parse_all_scripts: bool):
"""
Function that send a request to howlongtobeat to scrape the key used in the search URL
@return: The string key to use
Function that send a request to howlongtobeat to scrape the correct search url
@return: The search informations to use in the request
"""
# Make the post request and return the result if is valid
headers = HTMLRequests.get_title_request_headers()
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession() as session:
async with session.get(HTMLRequests.BASE_URL, headers=headers) as resp:
async with session.get(HTMLRequests.BASE_URL, headers=headers, timeout=timeout) as resp:
if resp is not None and resp.status == 200:
resp_text = await resp.text()
# Parse the HTML content using BeautifulSoup
Expand All @@ -354,14 +352,61 @@ async def async_send_website_request_getcode(parse_all_scripts: bool):
for script_url in matching_scripts:
script_url = HTMLRequests.BASE_URL + script_url
async with aiohttp.ClientSession() as session:
async with session.get(script_url, headers=headers) as script_resp:
async with session.get(script_url, headers=headers, timeout=timeout) as script_resp:
if script_resp is not None and resp.status == 200:
script_resp_text = await script_resp.text()
search_info = SearchInformations(script_resp_text)
if search_info.api_key is not None:
if search_info.search_url is not None:
# The api key is necessary
return search_info
else:
return None
else:
return None

@staticmethod
def get_auth_token_request_params():
"""
Generate the params for the auth token request
@return: The params object for the request
"""
timestamp = int(time.time() * 1000)
params = {
't': timestamp
}
return params

@staticmethod
def send_website_get_auth_token():
    """
    Request the x-auth-token from howlongtobeat's search init endpoint.
    @return: the auth token string to send in the search request, or None
             when the request did not succeed
    """
    headers = HTMLRequests.get_title_request_headers()
    query_params = HTMLRequests.get_auth_token_request_params()
    token_helper = SearchAuthToken()
    url = HTMLRequests.BASE_URL + token_helper.search_url
    response = requests.get(url, params=query_params, headers=headers, timeout=60)
    # Only parse the body on a successful response with content
    if response.status_code == 200 and response.text is not None:
        return token_helper.extract_auth_token_from_response(response)
    return None

@staticmethod
async def async_send_website_get_auth_token():
    """
    Request the x-auth-token from howlongtobeat's search init endpoint (async).
    @return: the auth token string to send in the search request, or None
             when the request did not succeed
    """
    headers = HTMLRequests.get_title_request_headers()
    query_params = HTMLRequests.get_auth_token_request_params()
    token_helper = SearchAuthToken()
    url = HTMLRequests.BASE_URL + token_helper.search_url
    client_timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=query_params, headers=headers, timeout=client_timeout) as resp:
            # Only parse the body on a successful response
            if resp is not None and resp.status == 200:
                payload = await resp.json()
                return token_helper.extract_auth_token_from_json(payload)
    return None