From 2b11e20c5eda30c2fb833ed50afa287810cc72b3 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Sat, 23 Sep 2023 22:36:58 -0400 Subject: [PATCH 01/14] Support secret_token for setWebHook api --- homeassistant/components/telegram_bot/__init__.py | 3 +++ homeassistant/components/telegram_bot/webhooks.py | 13 +++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/homeassistant/components/telegram_bot/__init__.py b/homeassistant/components/telegram_bot/__init__.py index 3d56cc7ed33265..5d6c6d473e6ee6 100644 --- a/homeassistant/components/telegram_bot/__init__.py +++ b/homeassistant/components/telegram_bot/__init__.py @@ -94,6 +94,7 @@ CONF_PROXY_URL = "proxy_url" CONF_PROXY_PARAMS = "proxy_params" CONF_TRUSTED_NETWORKS = "trusted_networks" +CONF_SECRET_TOKEN = "secret_token" DOMAIN = "telegram_bot" @@ -122,6 +123,7 @@ PARSER_MD = "markdown" DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")] +DEFAULT_SECRET_TOKEN = "insecure_default_secret_token" CONFIG_SCHEMA = vol.Schema( { @@ -145,6 +147,7 @@ vol.Optional( CONF_TRUSTED_NETWORKS, default=DEFAULT_TRUSTED_NETWORKS ): vol.All(cv.ensure_list, [ip_network]), + vol.Optional(CONF_SECRET_TOKEN, default=DEFAULT_SECRET_TOKEN): cv.string, } ) ], diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 8b94cb66496614..652d3548239b82 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -12,7 +12,7 @@ from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.helpers.network import get_url -from . import CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity +from . import CONF_TRUSTED_NETWORKS, CONF_SECRET_TOKEN, CONF_URL, BaseTelegramBotEntity _LOGGER = logging.getLogger(__name__) @@ -34,7 +34,7 @@ async def async_setup_platform(hass, bot, config): hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook) hass.http.register_view( - PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS]) + PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS], config[CONF_SECRET_TOKEN]) ) return True @@ -46,6 +46,7 @@ def __init__(self, hass, bot, config): """Create Dispatcher before calling super().""" self.bot = bot self.trusted_networks = config[CONF_TRUSTED_NETWORKS] + self.secret_token = config[CONF_SECRET_TOKEN] # Dumb dispatcher that just gets our updates to our handler callback (self.handle_update) self.dispatcher = Dispatcher(bot, None) self.dispatcher.add_handler(TypeHandler(Update, self.handle_update)) @@ -61,7 +62,7 @@ def _try_to_set_webhook(self): retry_num = 0 while retry_num < 3: try: - return self.bot.set_webhook(self.webhook_url, timeout=5) + return self.bot.set_webhook(self.webhook_url, api_kwargs={'secret_token': self.secret_token}, timeout=5) except TimedOut: retry_num += 1 _LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num) @@ -108,12 +109,13 @@ class PushBotView(HomeAssistantView): url = TELEGRAM_WEBHOOK_URL name = "telegram_webhooks" - def __init__(self, hass, bot, dispatcher, trusted_networks): + def __init__(self, hass, bot, dispatcher, trusted_networks, secret_token): """Initialize by storing stuff needed for setting up our webhook endpoint.""" self.hass = hass self.bot = bot self.dispatcher = dispatcher self.trusted_networks = trusted_networks + self.secret_token = secret_token async def post(self, request): """Accept the POST from telegram.""" @@ -121,6 +123,9 @@ async def post(self, request): if not any(real_ip in net for net in self.trusted_networks): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) + if not self.secret_token == request.headers['X-Telegram-Bot-Api-Secret-Token']: + _LOGGER.warning("Invalid secret token from %s", real_ip) + return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) try: update_data = await request.json() From b7dfcf6c7970eceae5f633bcf883ea93c9d15621 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Mon, 25 Sep 2023 13:20:43 -0400 Subject: [PATCH 02/14] Revert configuration YAML changes; generate and store secret token instead --- .../components/telegram_bot/__init__.py | 7 +++--- .../components/telegram_bot/webhooks.py | 24 +++++++++++++++---- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/homeassistant/components/telegram_bot/__init__.py b/homeassistant/components/telegram_bot/__init__.py index 5d6c6d473e6ee6..826d7cd5f802cb 100644 --- a/homeassistant/components/telegram_bot/__init__.py +++ b/homeassistant/components/telegram_bot/__init__.py @@ -94,7 +94,6 @@ CONF_PROXY_URL = "proxy_url" CONF_PROXY_PARAMS = "proxy_params" CONF_TRUSTED_NETWORKS = "trusted_networks" -CONF_SECRET_TOKEN = "secret_token" DOMAIN = "telegram_bot" @@ -114,6 +113,10 @@ SERVICE_DELETE_MESSAGE = "delete_message" SERVICE_LEAVE_CHAT = "leave_chat" +STORE_VERSION = 1 +STORE_SECRET_TOKEN = "secret_token" +SECRET_TOKEN_LENGTH = 32 + EVENT_TELEGRAM_CALLBACK = "telegram_callback" EVENT_TELEGRAM_COMMAND = "telegram_command" EVENT_TELEGRAM_TEXT = "telegram_text" @@ -123,7 +126,6 @@ PARSER_MD = "markdown" DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")] -DEFAULT_SECRET_TOKEN = "insecure_default_secret_token" CONFIG_SCHEMA = vol.Schema( { @@ -147,7 +149,6 @@ vol.Optional( CONF_TRUSTED_NETWORKS, default=DEFAULT_TRUSTED_NETWORKS ): vol.All(cv.ensure_list, [ip_network]), - vol.Optional(CONF_SECRET_TOKEN, default=DEFAULT_SECRET_TOKEN): cv.string, } ) ], diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 652d3548239b82..77f69e765226ae 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -3,6 +3,8 @@ from http import HTTPStatus from ipaddress import ip_address import logging +import secrets +import string from telegram import Update from telegram.error import TimedOut @@ -11,8 +13,9 @@ from homeassistant.components.http import HomeAssistantView from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.helpers.network import get_url +from homeassistant.helpers.storage import Store -from . import CONF_TRUSTED_NETWORKS, CONF_SECRET_TOKEN, CONF_URL, BaseTelegramBotEntity +from . import CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity, DOMAIN, STORE_VERSION, STORE_SECRET_TOKEN, SECRET_TOKEN_LENGTH _LOGGER = logging.getLogger(__name__) @@ -22,7 +25,18 @@ async def async_setup_platform(hass, bot, config): """Set up the Telegram webhooks platform.""" - pushbot = PushBot(hass, bot, config) + + # Obtain secret token from storage, or create a new one if it does not exist + store = Store(hass, STORE_VERSION, DOMAIN) + if (data := await store.async_load()) is None: + alphabet = string.ascii_letters + string.digits + '-_' + secret_token = ''.join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) + data = { + STORE_SECRET_TOKEN: secret_token, + } + await store.async_save(data) + + pushbot = PushBot(hass, bot, config, data[STORE_SECRET_TOKEN]) if not pushbot.webhook_url.startswith("https"): _LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url) @@ -34,7 +48,7 @@ async def async_setup_platform(hass, bot, config): hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook) hass.http.register_view( - PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS], config[CONF_SECRET_TOKEN]) + PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS], data[STORE_SECRET_TOKEN]) ) return True @@ -42,11 +56,11 @@ async def async_setup_platform(hass, bot, config): class PushBot(BaseTelegramBotEntity): """Handles all the push/webhook logic and passes telegram updates to `self.handle_update`.""" - def __init__(self, hass, bot, config): + def __init__(self, hass, bot, config, secret_token): """Create Dispatcher before calling super().""" self.bot = bot self.trusted_networks = config[CONF_TRUSTED_NETWORKS] - self.secret_token = config[CONF_SECRET_TOKEN] + self.secret_token = secret_token # Dumb dispatcher that just gets our updates to our handler callback (self.handle_update) self.dispatcher = Dispatcher(bot, None) self.dispatcher.add_handler(TypeHandler(Update, self.handle_update)) From dd99cd072adc04aba45a82f3a29e56b68031e1c9 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Wed, 27 Sep 2023 17:20:15 -0400 Subject: [PATCH 03/14] Reformat codes --- .../components/telegram_bot/webhooks.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 77f69e765226ae..5558f7fa00fc6f 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -15,7 +15,15 @@ from homeassistant.helpers.network import get_url from homeassistant.helpers.storage import Store -from . import CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity, DOMAIN, STORE_VERSION, STORE_SECRET_TOKEN, SECRET_TOKEN_LENGTH +from . import ( + CONF_TRUSTED_NETWORKS, + CONF_URL, + BaseTelegramBotEntity, + DOMAIN, + STORE_VERSION, + STORE_SECRET_TOKEN, + SECRET_TOKEN_LENGTH, +) _LOGGER = logging.getLogger(__name__) @@ -29,8 +37,10 @@ async def async_setup_platform(hass, bot, config): # Obtain secret token from storage, or create a new one if it does not exist store = Store(hass, STORE_VERSION, DOMAIN) if (data := await store.async_load()) is None: - alphabet = string.ascii_letters + string.digits + '-_' - secret_token = ''.join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) + alphabet = string.ascii_letters + string.digits + "-_" + secret_token = "".join( + secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH) + ) data = { STORE_SECRET_TOKEN: secret_token, } @@ -48,7 +58,13 @@ async def async_setup_platform(hass, bot, config): hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook) hass.http.register_view( - PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS], data[STORE_SECRET_TOKEN]) + PushBotView( + hass, + bot, + pushbot.dispatcher, + config[CONF_TRUSTED_NETWORKS], + data[STORE_SECRET_TOKEN], + ) ) return True @@ -76,7 +92,11 @@ def _try_to_set_webhook(self): retry_num = 0 while retry_num < 3: try: - return self.bot.set_webhook(self.webhook_url, api_kwargs={'secret_token': self.secret_token}, timeout=5) + return self.bot.set_webhook( + self.webhook_url, + api_kwargs={"secret_token": self.secret_token}, + timeout=5, + ) except TimedOut: retry_num += 1 _LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num) @@ -137,7 +157,7 @@ async def post(self, request): if not any(real_ip in net for net in self.trusted_networks): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) - if not self.secret_token == request.headers['X-Telegram-Bot-Api-Secret-Token']: + if self.secret_token != request.headers["X-Telegram-Bot-Api-Secret-Token"]: _LOGGER.warning("Invalid secret token from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) From 38d2f2e8fbd75640b88326f5f904a8202e3a22ed Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Wed, 27 Sep 2023 17:41:41 -0400 Subject: [PATCH 04/14] Revert storage of secret token; use ephemeral secret token instead --- .../components/telegram_bot/__init__.py | 4 --- .../components/telegram_bot/webhooks.py | 26 ++++++------------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/homeassistant/components/telegram_bot/__init__.py b/homeassistant/components/telegram_bot/__init__.py index 826d7cd5f802cb..3d56cc7ed33265 100644 --- a/homeassistant/components/telegram_bot/__init__.py +++ b/homeassistant/components/telegram_bot/__init__.py @@ -113,10 +113,6 @@ SERVICE_DELETE_MESSAGE = "delete_message" SERVICE_LEAVE_CHAT = "leave_chat" -STORE_VERSION = 1 -STORE_SECRET_TOKEN = "secret_token" -SECRET_TOKEN_LENGTH = 32 - EVENT_TELEGRAM_CALLBACK = "telegram_callback" EVENT_TELEGRAM_COMMAND = "telegram_command" EVENT_TELEGRAM_TEXT = "telegram_text" diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 5558f7fa00fc6f..ed9c3e35ed34fb 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -13,40 +13,30 @@ from homeassistant.components.http import HomeAssistantView from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.helpers.network import get_url -from homeassistant.helpers.storage import Store from . import ( CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity, - DOMAIN, - STORE_VERSION, - STORE_SECRET_TOKEN, - SECRET_TOKEN_LENGTH, ) _LOGGER = logging.getLogger(__name__) TELEGRAM_WEBHOOK_URL = "/api/telegram_webhooks" REMOVE_WEBHOOK_URL = "" +SECRET_TOKEN_LENGTH = 32 async def async_setup_platform(hass, bot, config): """Set up the Telegram webhooks platform.""" - # Obtain secret token from storage, or create a new one if it does not exist - store = Store(hass, STORE_VERSION, DOMAIN) - if (data := await store.async_load()) is None: - alphabet = string.ascii_letters + string.digits + "-_" - secret_token = "".join( - secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH) - ) - data = { - STORE_SECRET_TOKEN: secret_token, - } - await store.async_save(data) + # Generate a ephemeral secret token + alphabet = string.ascii_letters + string.digits + "-_" + secret_token = "".join( + secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH) + ) - pushbot = PushBot(hass, bot, config, data[STORE_SECRET_TOKEN]) + pushbot = PushBot(hass, bot, config, secret_token) if not pushbot.webhook_url.startswith("https"): _LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url) @@ -63,7 +53,7 @@ async def async_setup_platform(hass, bot, config): bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS], - data[STORE_SECRET_TOKEN], + secret_token, ) ) return True From 64f9443ff5110ab067e7e067c026650e5dc3d17f Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Wed, 27 Sep 2023 17:58:23 -0400 Subject: [PATCH 05/14] Reformat --- homeassistant/components/telegram_bot/webhooks.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index ed9c3e35ed34fb..b9c0545b7b4f55 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -14,11 +14,7 @@ from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.helpers.network import get_url -from . import ( - CONF_TRUSTED_NETWORKS, - CONF_URL, - BaseTelegramBotEntity, -) +from . import CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity _LOGGER = logging.getLogger(__name__) @@ -32,9 +28,7 @@ async def async_setup_platform(hass, bot, config): # Generate a ephemeral secret token alphabet = string.ascii_letters + string.digits + "-_" - secret_token = "".join( - secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH) - ) + secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) pushbot = PushBot(hass, bot, config, secret_token) From c2d9ae51e4ffad45ec270ec5f7fe431a73c08335 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Thu, 28 Sep 2023 10:19:21 +0200 Subject: [PATCH 06/14] Update homeassistant/components/telegram_bot/webhooks.py --- homeassistant/components/telegram_bot/webhooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index b9c0545b7b4f55..2bd9ca9f893bc4 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -26,7 +26,7 @@ async def async_setup_platform(hass, bot, config): """Set up the Telegram webhooks platform.""" - # Generate a ephemeral secret token + # Generate an ephemeral secret token alphabet = string.ascii_letters + string.digits + "-_" secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) From e285fc1c5ad07e19ff4bfab820964deb7197951a Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 09:56:50 -0400 Subject: [PATCH 07/14] Fix when header is not present --- homeassistant/components/telegram_bot/webhooks.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 2bd9ca9f893bc4..10b1c8ccc9c52c 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -141,9 +141,12 @@ async def post(self, request): if not any(real_ip in net for net in self.trusted_networks): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) - if self.secret_token != request.headers["X-Telegram-Bot-Api-Secret-Token"]: - _LOGGER.warning("Invalid secret token from %s", real_ip) - return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) + # If a secret token is expected, verify the header + if self.secret_token: + secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") + if secret_token_header is None or self.secret_token != secret_token_header: + _LOGGER.warning("Invalid secret token from %s", real_ip) + return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) try: update_data = await request.json() From c1d75717dd1a0a353780edf740b59d6c2fc1e214 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 10:02:19 -0400 Subject: [PATCH 08/14] Check for non-empty token --- homeassistant/components/telegram_bot/webhooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 10b1c8ccc9c52c..75b637f52e5629 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -142,7 +142,7 @@ async def post(self, request): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) # If a secret token is expected, verify the header - if self.secret_token: + if self.secret_token and bool(self.secret_token): secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") if secret_token_header is None or self.secret_token != secret_token_header: _LOGGER.warning("Invalid secret token from %s", real_ip) From f2c6c367d4fb73607b91e1cc9da6d4a2a5609274 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 12:41:15 -0400 Subject: [PATCH 09/14] Fix tests to support secret token --- .../components/telegram_bot/webhooks.py | 8 ++++-- tests/components/telegram_bot/conftest.py | 16 ++++++++++-- .../telegram_bot/test_telegram_bot.py | 26 ++++++++++++++++--- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 75b637f52e5629..e794564406dc21 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -23,12 +23,16 @@ SECRET_TOKEN_LENGTH = 32 +def generate_secret_token(length): + alphabet = string.ascii_letters + string.digits + "-_" + return "".join(secrets.choice(alphabet) for _ in range(length)) + + async def async_setup_platform(hass, bot, config): """Set up the Telegram webhooks platform.""" # Generate an ephemeral secret token - alphabet = string.ascii_letters + string.digits + "-_" - secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) + secret_token = generate_secret_token(SECRET_TOKEN_LENGTH) pushbot = PushBot(hass, bot, config, secret_token) diff --git a/tests/components/telegram_bot/conftest.py b/tests/components/telegram_bot/conftest.py index d8d445fbb86b2f..e90d941a8f6ef8 100644 --- a/tests/components/telegram_bot/conftest.py +++ b/tests/components/telegram_bot/conftest.py @@ -1,5 +1,5 @@ """Tests for the telegram_bot integration.""" -from unittest.mock import patch +from unittest.mock import patch, MagicMock import pytest @@ -65,6 +65,18 @@ def mock_register_webhook(): yield +@pytest.fixture +def mock_generate_secret_token(): + """Mock secret token generated for webhook""" + mock_secret_token = "DEADBEEF12345678DEADBEEF87654321" + with patch( + "homeassistant.components.telegram_bot.webhooks.generate_secret_token", + return_value=mock_secret_token, + ): + yield mock_secret_token + + + @pytest.fixture def update_message_command(): """Fixture for mocking an incoming update of type message/command.""" @@ -156,7 +168,7 @@ def update_callback_query(): @pytest.fixture -async def webhook_platform(hass, config_webhooks, mock_register_webhook): +async def webhook_platform(hass, config_webhooks, mock_register_webhook, mock_generate_secret_token): """Fixture for setting up the webhooks platform using appropriate config and mocks.""" await async_setup_component( hass, diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index b87f15b3ed3678..cfb385f57867f4 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -35,12 +35,17 @@ async def test_webhook_endpoint_generates_telegram_text_event( webhook_platform, hass_client: ClientSessionGenerator, update_message_text, + mock_generate_secret_token, ) -> None: """POST to the configured webhook endpoint and assert fired `telegram_text` event.""" client = await hass_client() events = async_capture_events(hass, "telegram_text") - response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_text) + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -56,12 +61,17 @@ async def test_webhook_endpoint_generates_telegram_command_event( webhook_platform, hass_client: ClientSessionGenerator, update_message_command, + mock_generate_secret_token, ) -> None: """POST to the configured webhook endpoint and assert fired `telegram_command` event.""" client = await hass_client() events = async_capture_events(hass, "telegram_command") - response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_command) + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_command, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -77,12 +87,17 @@ async def test_webhook_endpoint_generates_telegram_callback_event( webhook_platform, hass_client: ClientSessionGenerator, update_callback_query, + mock_generate_secret_token, ) -> None: """POST to the configured webhook endpoint and assert fired `telegram_callback` event.""" client = await hass_client() events = async_capture_events(hass, "telegram_callback") - response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_callback_query) + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_callback_query, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -119,13 +134,16 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex webhook_platform, hass_client: ClientSessionGenerator, unauthorized_update_message_text, + mock_generate_secret_token, ) -> None: """Update with unauthorized user/chat should not trigger event.""" client = await hass_client() events = async_capture_events(hass, "telegram_text") response = await client.post( - TELEGRAM_WEBHOOK_URL, json=unauthorized_update_message_text + TELEGRAM_WEBHOOK_URL, + json=unauthorized_update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" From 122159c4505d88a241d7befa5feb5d2c7a57bbed Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 13:07:53 -0400 Subject: [PATCH 10/14] Add tests for invalid secret token --- .../components/telegram_bot/webhooks.py | 9 +++-- tests/components/telegram_bot/conftest.py | 11 ++++-- .../telegram_bot/test_telegram_bot.py | 36 +++++++++++++++++++ 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index e794564406dc21..4a1b7d846dc7e6 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -146,11 +146,10 @@ async def post(self, request): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) # If a secret token is expected, verify the header - if self.secret_token and bool(self.secret_token): - secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") - if secret_token_header is None or self.secret_token != secret_token_header: - _LOGGER.warning("Invalid secret token from %s", real_ip) - return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) + secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") + if secret_token_header is None or self.secret_token != secret_token_header: + _LOGGER.warning("Invalid secret token from %s", real_ip) + return self.json_message("Access denied", HTTPStatus.FORBIDDEN) try: update_data = await request.json() diff --git a/tests/components/telegram_bot/conftest.py b/tests/components/telegram_bot/conftest.py index e90d941a8f6ef8..837578d986da22 100644 --- a/tests/components/telegram_bot/conftest.py +++ b/tests/components/telegram_bot/conftest.py @@ -1,5 +1,5 @@ """Tests for the telegram_bot integration.""" -from unittest.mock import patch, MagicMock +from unittest.mock import patch import pytest @@ -76,6 +76,11 @@ def mock_generate_secret_token(): yield mock_secret_token +@pytest.fixture +def incorrect_secret_token(): + """Mock incorrect secret token""" + return "AAAABBBBCCCCDDDDEEEEFFFF00009999" + @pytest.fixture def update_message_command(): @@ -168,7 +173,9 @@ def update_callback_query(): @pytest.fixture -async def webhook_platform(hass, config_webhooks, mock_register_webhook, mock_generate_secret_token): +async def webhook_platform( + hass, config_webhooks, mock_register_webhook, mock_generate_secret_token +): """Fixture for setting up the webhooks platform using appropriate config and mocks.""" await async_setup_component( hass, diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index cfb385f57867f4..3a47217af84767 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -152,3 +152,39 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex await hass.async_block_till_done() assert len(events) == 0 + + +async def test_webhook_endpoint_without_secret_token_is_denied( + hass: HomeAssistant, + webhook_platform, + hass_client: ClientSessionGenerator, + update_message_text, +) -> None: + """Request without a secret token header should be denied""" + client = await hass_client() + events = async_capture_events(hass, "telegram_text") + + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_text, + ) + assert response.status == 403 + + +async def test_webhook_endpoint_invalid_secret_token_is_denied( + hass: HomeAssistant, + webhook_platform, + hass_client: ClientSessionGenerator, + update_message_text, + incorrect_secret_token, +) -> None: + """Request with an invalid secret token header should be denied""" + client = await hass_client() + events = async_capture_events(hass, "telegram_text") + + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token}, + ) + assert response.status == 403 From 47a4fe0051463864efed0688ab4ea113a75b9e47 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 13:09:27 -0400 Subject: [PATCH 11/14] Minor: remove comment --- homeassistant/components/telegram_bot/webhooks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 4a1b7d846dc7e6..f45c505adff4a2 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -145,7 +145,6 @@ async def post(self, request): if not any(real_ip in net for net in self.trusted_networks): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) - # If a secret token is expected, verify the header secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") if secret_token_header is None or self.secret_token != secret_token_header: _LOGGER.warning("Invalid secret token from %s", real_ip) From 7fdd4ffbb015f05a7965ed81481f7d0a36d5ada2 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 13:30:39 -0400 Subject: [PATCH 12/14] Revert back to 401 --- homeassistant/components/telegram_bot/webhooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index f45c505adff4a2..50286a8b229bd0 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -148,7 +148,7 @@ async def post(self, request): secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") if secret_token_header is None or self.secret_token != secret_token_header: _LOGGER.warning("Invalid secret token from %s", real_ip) - return self.json_message("Access denied", HTTPStatus.FORBIDDEN) + return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) try: update_data = await request.json() From 00597616eb0eb4c7ff7ba4f098faaf4839956613 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Thu, 28 Sep 2023 13:31:33 -0400 Subject: [PATCH 13/14] ... and for tests --- tests/components/telegram_bot/test_telegram_bot.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index 3a47217af84767..36b4f10845f569 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -168,7 +168,7 @@ async def test_webhook_endpoint_without_secret_token_is_denied( TELEGRAM_WEBHOOK_URL, json=update_message_text, ) - assert response.status == 403 + assert response.status == 401 async def test_webhook_endpoint_invalid_secret_token_is_denied( @@ -187,4 +187,4 @@ async def test_webhook_endpoint_invalid_secret_token_is_denied( json=update_message_text, headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token}, ) - assert response.status == 403 + assert response.status == 401 From 6a3d5f2c416bca165a6e2fdf95ba446304ec4905 Mon Sep 17 00:00:00 2001 From: Zehuan Li <22104836+zehuanli@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:34:28 -0400 Subject: [PATCH 14/14] Change patching method for the generation of secret tokens --- homeassistant/components/telegram_bot/webhooks.py | 8 ++------ tests/components/telegram_bot/conftest.py | 8 ++++---- tests/components/telegram_bot/test_telegram_bot.py | 8 ++++---- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 50286a8b229bd0..c21cffa84b1064 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -23,16 +23,12 @@ SECRET_TOKEN_LENGTH = 32 -def generate_secret_token(length): - alphabet = string.ascii_letters + string.digits + "-_" - return "".join(secrets.choice(alphabet) for _ in range(length)) - - async def async_setup_platform(hass, bot, config): """Set up the Telegram webhooks platform.""" # Generate an ephemeral secret token - secret_token = generate_secret_token(SECRET_TOKEN_LENGTH) + alphabet = string.ascii_letters + string.digits + "-_" + secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) pushbot = PushBot(hass, bot, config, secret_token) diff --git a/tests/components/telegram_bot/conftest.py b/tests/components/telegram_bot/conftest.py index 837578d986da22..af23efc1afc682 100644 --- a/tests/components/telegram_bot/conftest.py +++ b/tests/components/telegram_bot/conftest.py @@ -67,18 +67,18 @@ def mock_register_webhook(): @pytest.fixture def mock_generate_secret_token(): - """Mock secret token generated for webhook""" + """Mock secret token generated for webhook.""" mock_secret_token = "DEADBEEF12345678DEADBEEF87654321" with patch( - "homeassistant.components.telegram_bot.webhooks.generate_secret_token", - return_value=mock_secret_token, + "homeassistant.components.telegram_bot.webhooks.secrets.choice", + side_effect=mock_secret_token, ): yield mock_secret_token @pytest.fixture def incorrect_secret_token(): - """Mock incorrect secret token""" + """Mock incorrect secret token.""" return "AAAABBBBCCCCDDDDEEEEFFFF00009999" diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index 36b4f10845f569..be28f7be63668c 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -160,9 +160,9 @@ async def test_webhook_endpoint_without_secret_token_is_denied( hass_client: ClientSessionGenerator, update_message_text, ) -> None: - """Request without a secret token header should be denied""" + """Request without a secret token header should be denied.""" client = await hass_client() - events = async_capture_events(hass, "telegram_text") + async_capture_events(hass, "telegram_text") response = await client.post( TELEGRAM_WEBHOOK_URL, @@ -178,9 +178,9 @@ async def test_webhook_endpoint_invalid_secret_token_is_denied( update_message_text, incorrect_secret_token, ) -> None: - """Request with an invalid secret token header should be denied""" + """Request with an invalid secret token header should be denied.""" client = await hass_client() - events = async_capture_events(hass, "telegram_text") + async_capture_events(hass, "telegram_text") response = await client.post( TELEGRAM_WEBHOOK_URL,