diff --git a/howlongtobeatpy/howlongtobeatpy/HTMLRequests.py b/howlongtobeatpy/howlongtobeatpy/HTMLRequests.py index f8ed086..226a25c 100644 --- a/howlongtobeatpy/howlongtobeatpy/HTMLRequests.py +++ b/howlongtobeatpy/howlongtobeatpy/HTMLRequests.py @@ -1,12 +1,14 @@ # --------------------------------------------------------------------- # IMPORTS -import re import json +import re +import time from enum import Enum -from bs4 import BeautifulSoup + import aiohttp import requests +from bs4 import BeautifulSoup from fake_useragent import UserAgent # --------------------------------------------------------------------- @@ -26,62 +28,69 @@ class SearchModifiers(Enum): class SearchInformations: search_url = None - api_key = None def __init__(self, script_content: str): - self.api_key = self.__extract_api_from_script(script_content) self.search_url = self.__extract_search_url_script(script_content) if HTMLRequests.BASE_URL.endswith("/") and self.search_url is not None: self.search_url = self.search_url.lstrip("/") - - def __extract_api_from_script(self, script_content: str): - """ - Function that extract the htlb code to use in the request from the given script - @return: the string of the api key found - """ - # Try multiple find one after the other as hltb keep changing format - # Test 1 - The API Key is in the user id in the request json - user_id_api_key_pattern = r'users\s*:\s*{\s*id\s*:\s*"([^"]+)"' - matches = re.findall(user_id_api_key_pattern, script_content) - if matches: - key = ''.join(matches) - return key - # Test 2 - The API Key is in format fetch("/api/[word here]/".concat("X").concat("Y")... 
class SearchInformations:
    """Holds the HLTB search endpoint path scraped from the site's JS bundle."""

    # Relative search endpoint (e.g. "api/search"), or None when not found.
    search_url = None

    def __init__(self, script_content: str):
        self.search_url = self.__extract_search_url_script(script_content)
        # BASE_URL already ends with "/", so strip the leading "/" from the
        # extracted "/api/..." path before the two get concatenated.
        if HTMLRequests.BASE_URL.endswith("/") and self.search_url is not None:
            self.search_url = self.search_url.lstrip("/")

    def __extract_search_url_script(self, script_content: str):
        """
        Find the 'fetch' call issued with 'method: "POST"' in the scraped
        script and return the base '/api/<path>' endpoint it targets
        (e.g. "/api/search").

        Matching on the POST method avoids relying on the literal word
        "search", which howlongtobeat keeps changing; the GET init call is
        filtered out by the method check.

        @return: The endpoint string (e.g. "/api/search") or None.
        """
        pattern = re.compile(
            # Group 1: path suffix after /api/ (e.g. "search" or "find/v2");
            # the tail requires the request options object to carry
            # 'method: "POST"'.
            r'fetch\s*\(\s*["\']/api/([a-zA-Z0-9_/]+)[^"\']*["\']\s*,\s*{[^}]*method:\s*["\']POST["\'][^}]*}',
            re.DOTALL | re.IGNORECASE
        )
        match = pattern.search(script_content)
        if match is None:
            # No POST /api/... fetch call found in this script.
            return None
        # Keep only the root path segment ("search" from "search/v2").
        base_path = match.group(1).split('/')[0]
        if base_path == "find":
            # Known non-search POST endpoint - reject it. Returning None here
            # fixes an UnboundLocalError: the previous code only assigned the
            # result variable when base_path != "find" but returned it
            # unconditionally.
            return None
        return f"/api/{base_path}"


class SearchAuthToken:
    """Helper for fetching the x-auth-token required by the search API."""

    # Relative endpoint that hands out the auth token.
    search_url = "api/search/init"
    # Last token extracted, or None.
    auth_token = None

    def extract_auth_token_from_response(self, response_content: "requests.Response"):
        """
        Extract the auth token from an HTTP response.

        @param response_content: Response whose body is the init-call JSON.
        @return: The token from the response JSON (also stored on self.auth_token).
        """
        data = response_content.json()
        return self.extract_auth_token_from_json(data)

    def extract_auth_token_from_json(self, json_content):
        """
        Extract the auth token from an already-decoded JSON payload.

        @return: The 'token' value, or None when absent; also stored on
                 self.auth_token.
        """
        self.auth_token = json_content.get('token')
        return self.auth_token
send_web_request(game_name: str, search_modifiers: SearchModifiers = SearchM @param page: The page to explore of the research, unknown if this is actually used @return: The HTML code of the research if the request returned 200(OK), None otherwise """ - headers = HTMLRequests.get_search_request_headers() + auth_token = HTMLRequests.send_website_get_auth_token() + headers = HTMLRequests.get_search_request_headers(auth_token) search_info_data = HTMLRequests.send_website_request_getcode(False) - if search_info_data is None or search_info_data.api_key is None: + if search_info_data is None or search_info_data.search_url is None: search_info_data = HTMLRequests.send_website_request_getcode(True) # Make the request - if search_info_data.search_url is not None: + if search_info_data is not None and search_info_data.search_url is not None: HTMLRequests.SEARCH_URL = HTMLRequests.BASE_URL + search_info_data.search_url - # The main method currently is the call to the API search URL - search_url_with_key = HTMLRequests.SEARCH_URL + search_info_data.api_key - payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, None) - resp = requests.post(search_url_with_key, headers=headers, data=payload, timeout=60) - if resp.status_code == 200: - return resp.text - # Try to call with the standard url adding the api key to the user - search_url = HTMLRequests.SEARCH_URL - payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, search_info_data) - resp = requests.post(search_url, headers=headers, data=payload, timeout=60) + payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page) + resp = requests.post(HTMLRequests.SEARCH_URL, headers=headers, data=payload, timeout=60) if resp.status_code == 200: return resp.text return None @@ -201,27 +203,22 @@ async def send_async_web_request(game_name: str, search_modifiers: SearchModifie @param page: The page to explore of the research, unknown if this is actually used 
@return: The HTML code of the research if the request returned 200(OK), None otherwise """ - headers = HTMLRequests.get_search_request_headers() + auth_token = await HTMLRequests.async_send_website_get_auth_token() + headers = HTMLRequests.get_search_request_headers(auth_token) search_info_data = HTMLRequests.send_website_request_getcode(False) - if search_info_data is None or search_info_data.api_key is None: + if search_info_data is None or search_info_data.search_url is None: search_info_data = HTMLRequests.send_website_request_getcode(True) # Make the request - if search_info_data.search_url is not None: + if search_info_data is not None and search_info_data.search_url is not None: HTMLRequests.SEARCH_URL = HTMLRequests.BASE_URL + search_info_data.search_url - # The main method currently is the call to the API search URL - search_url_with_key = HTMLRequests.SEARCH_URL + search_info_data.api_key - payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, None) + payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page) + timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession() as session: - async with session.post(search_url_with_key, headers=headers, data=payload) as resp_with_key: + async with session.post(HTMLRequests.SEARCH_URL, headers=headers, data=payload, timeout=timeout) as resp_with_key: if resp_with_key is not None and resp_with_key.status == 200: return await resp_with_key.text() else: - search_url = HTMLRequests.SEARCH_URL - payload = HTMLRequests.get_search_request_data(game_name, search_modifiers, page, search_info_data) - async with session.post(search_url, headers=headers, data=payload) as resp_user_id: - if resp_user_id is not None and resp_user_id.status == 200: - return await resp_user_id.text() - return None + return None @staticmethod def __cut_game_title(page_source: str): @@ -296,8 +293,9 @@ async def async_get_game_title(game_id: int): headers = 
HTMLRequests.get_title_request_headers() # Request and extract title + timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession() as session: - async with session.post(HTMLRequests.GAME_URL, params=params, headers=headers) as resp: + async with session.post(HTMLRequests.GAME_URL, params=params, headers=headers, timeout=timeout) as resp: if resp is not None and resp.status == 200: text = await resp.text() return HTMLRequests.__cut_game_title(text) @@ -306,8 +304,8 @@ async def async_get_game_title(game_id: int): @staticmethod def send_website_request_getcode(parse_all_scripts: bool): """ - Function that send a request to howlongtobeat to scrape the API key - @return: The string key to use + Function that send a request to howlongtobeat to scrape the correct search url + @return: The search informations to use in the request """ # Make the post request and return the result if is valid headers = HTMLRequests.get_title_request_headers() @@ -326,21 +324,21 @@ def send_website_request_getcode(parse_all_scripts: bool): script_resp = requests.get(script_url, headers=headers, timeout=60) if script_resp.status_code == 200 and script_resp.text is not None: search_info = SearchInformations(script_resp.text) - if search_info.api_key is not None: - # The api key is necessary + if search_info.search_url is not None: return search_info return None @staticmethod async def async_send_website_request_getcode(parse_all_scripts: bool): """ - Function that send a request to howlongtobeat to scrape the key used in the search URL - @return: The string key to use + Function that send a request to howlongtobeat to scrape the correct search url + @return: The search informations to use in the request """ # Make the post request and return the result if is valid headers = HTMLRequests.get_title_request_headers() + timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession() as session: - async with session.get(HTMLRequests.BASE_URL, headers=headers) as resp: 
+ async with session.get(HTMLRequests.BASE_URL, headers=headers, timeout=timeout) as resp: if resp is not None and resp.status == 200: resp_text = await resp.text() # Parse the HTML content using BeautifulSoup @@ -354,14 +352,61 @@ async def async_send_website_request_getcode(parse_all_scripts: bool): for script_url in matching_scripts: script_url = HTMLRequests.BASE_URL + script_url async with aiohttp.ClientSession() as session: - async with session.get(script_url, headers=headers) as script_resp: + async with session.get(script_url, headers=headers, timeout=timeout) as script_resp: if script_resp is not None and resp.status == 200: script_resp_text = await script_resp.text() search_info = SearchInformations(script_resp_text) - if search_info.api_key is not None: + if search_info.search_url is not None: # The api key is necessary return search_info else: return None else: return None + + @staticmethod + def get_auth_token_request_params(): + """ + Generate the params for the auth token request + @return: The params object for the request + """ + timestamp = int(time.time() * 1000) + params = { + 't': timestamp + } + return params + + @staticmethod + def send_website_get_auth_token(): + """ + Function that send a request to howlongtobeat to get the x-auth-token to get in the request + @return: The auth token to use + """ + # Make the post request and return the result if is valid + headers = HTMLRequests.get_title_request_headers() + params = HTMLRequests.get_auth_token_request_params() + auth_token = SearchAuthToken() + auth_token_url = HTMLRequests.BASE_URL + auth_token.search_url + resp = requests.get(auth_token_url, params=params, headers=headers, timeout=60) + if resp.status_code == 200 and resp.text is not None: + return auth_token.extract_auth_token_from_response(resp) + return None + + @staticmethod + async def async_send_website_get_auth_token(): + """ + Function that send a request to howlongtobeat to get the x-auth-token to get in the request + @return: 
The auth token to use + """ + # Make the post request and return the result if is valid + headers = HTMLRequests.get_title_request_headers() + params = HTMLRequests.get_auth_token_request_params() + auth_token = SearchAuthToken() + auth_token_url = HTMLRequests.BASE_URL + auth_token.search_url + timeout = aiohttp.ClientTimeout(total=60) + async with aiohttp.ClientSession() as session: + async with session.get(auth_token_url, params=params, headers=headers, timeout=timeout) as resp: + if resp is not None and resp.status == 200: + json_data = await resp.json() + return auth_token.extract_auth_token_from_json(json_data) + return None