diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 8b94cb66496614..c21cffa84b1064 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -3,6 +3,8 @@ from http import HTTPStatus from ipaddress import ip_address import logging +import secrets +import string from telegram import Update from telegram.error import TimedOut @@ -18,11 +20,17 @@ TELEGRAM_WEBHOOK_URL = "/api/telegram_webhooks" REMOVE_WEBHOOK_URL = "" +SECRET_TOKEN_LENGTH = 32 async def async_setup_platform(hass, bot, config): """Set up the Telegram webhooks platform.""" - pushbot = PushBot(hass, bot, config) + + # Generate an ephemeral secret token + alphabet = string.ascii_letters + string.digits + "-_" + secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH)) + + pushbot = PushBot(hass, bot, config, secret_token) if not pushbot.webhook_url.startswith("https"): _LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url) @@ -34,7 +42,13 @@ async def async_setup_platform(hass, bot, config): hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook) hass.http.register_view( - PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS]) + PushBotView( + hass, + bot, + pushbot.dispatcher, + config[CONF_TRUSTED_NETWORKS], + secret_token, + ) ) return True @@ -42,10 +56,11 @@ async def async_setup_platform(hass, bot, config): class PushBot(BaseTelegramBotEntity): """Handles all the push/webhook logic and passes telegram updates to `self.handle_update`.""" - def __init__(self, hass, bot, config): + def __init__(self, hass, bot, config, secret_token): """Create Dispatcher before calling super().""" self.bot = bot self.trusted_networks = config[CONF_TRUSTED_NETWORKS] + self.secret_token = secret_token # Dumb dispatcher that just gets our updates to our handler callback (self.handle_update) self.dispatcher = Dispatcher(bot, None) self.dispatcher.add_handler(TypeHandler(Update, self.handle_update)) @@ -61,7 +76,11 @@ def _try_to_set_webhook(self): retry_num = 0 while retry_num < 3: try: - return self.bot.set_webhook(self.webhook_url, timeout=5) + return self.bot.set_webhook( + self.webhook_url, + api_kwargs={"secret_token": self.secret_token}, + timeout=5, + ) except TimedOut: retry_num += 1 _LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num) @@ -108,12 +127,13 @@ class PushBotView(HomeAssistantView): url = TELEGRAM_WEBHOOK_URL name = "telegram_webhooks" - def __init__(self, hass, bot, dispatcher, trusted_networks): + def __init__(self, hass, bot, dispatcher, trusted_networks, secret_token): """Initialize by storing stuff needed for setting up our webhook endpoint.""" self.hass = hass self.bot = bot self.dispatcher = dispatcher self.trusted_networks = trusted_networks + self.secret_token = secret_token async def post(self, request): """Accept the POST from telegram.""" @@ -121,6 +141,10 @@ async def post(self, request): if not any(real_ip in net for net in self.trusted_networks): _LOGGER.warning("Access denied from %s", real_ip) return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) + secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") + if secret_token_header is None or self.secret_token != secret_token_header: + _LOGGER.warning("Invalid secret token from %s", real_ip) + return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED) try: update_data = await request.json() diff --git a/tests/components/telegram_bot/conftest.py b/tests/components/telegram_bot/conftest.py index d8d445fbb86b2f..af23efc1afc682 100644 --- a/tests/components/telegram_bot/conftest.py +++ b/tests/components/telegram_bot/conftest.py @@ -65,6 +65,23 @@ def mock_register_webhook(): yield +@pytest.fixture +def mock_generate_secret_token(): + """Mock secret token generated for webhook.""" + mock_secret_token = "DEADBEEF12345678DEADBEEF87654321" + with patch( + "homeassistant.components.telegram_bot.webhooks.secrets.choice", + side_effect=mock_secret_token, + ): + yield mock_secret_token + + +@pytest.fixture +def incorrect_secret_token(): + """Mock incorrect secret token.""" + return "AAAABBBBCCCCDDDDEEEEFFFF00009999" + + @pytest.fixture def update_message_command(): """Fixture for mocking an incoming update of type message/command.""" @@ -156,7 +173,9 @@ def update_callback_query(): @pytest.fixture -async def webhook_platform(hass, config_webhooks, mock_register_webhook): +async def webhook_platform( + hass, config_webhooks, mock_register_webhook, mock_generate_secret_token +): """Fixture for setting up the webhooks platform using appropriate config and mocks.""" await async_setup_component( hass, diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index b87f15b3ed3678..be28f7be63668c 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -35,12 +35,17 @@ async def test_webhook_endpoint_generates_telegram_text_event( webhook_platform, hass_client: ClientSessionGenerator, update_message_text, + mock_generate_secret_token, ) -> None: """POST to the configured webhook endpoint and assert fired `telegram_text` event.""" client = await hass_client() events = async_capture_events(hass, "telegram_text") - response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_text) + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -56,12 +61,17 @@ async def test_webhook_endpoint_generates_telegram_command_event( webhook_platform, hass_client: ClientSessionGenerator, update_message_command, + mock_generate_secret_token, ) -> None: """POST to the configured webhook endpoint and assert fired `telegram_command` event.""" client = await hass_client() events = async_capture_events(hass, "telegram_command") - response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_command) + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_command, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -77,12 +87,17 @@ async def test_webhook_endpoint_generates_telegram_callback_event( webhook_platform, hass_client: ClientSessionGenerator, update_callback_query, + mock_generate_secret_token, ) -> None: """POST to the configured webhook endpoint and assert fired `telegram_callback` event.""" client = await hass_client() events = async_capture_events(hass, "telegram_callback") - response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_callback_query) + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_callback_query, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, + ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -119,13 +134,16 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex webhook_platform, hass_client: ClientSessionGenerator, unauthorized_update_message_text, + mock_generate_secret_token, ) -> None: """Update with unauthorized user/chat should not trigger event.""" client = await hass_client() events = async_capture_events(hass, "telegram_text") response = await client.post( - TELEGRAM_WEBHOOK_URL, json=unauthorized_update_message_text + TELEGRAM_WEBHOOK_URL, + json=unauthorized_update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token}, ) assert response.status == 200 assert (await response.read()).decode("utf-8") == "" @@ -134,3 +152,39 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex await hass.async_block_till_done() assert len(events) == 0 + + +async def test_webhook_endpoint_without_secret_token_is_denied( + hass: HomeAssistant, + webhook_platform, + hass_client: ClientSessionGenerator, + update_message_text, +) -> None: + """Request without a secret token header should be denied.""" + client = await hass_client() + async_capture_events(hass, "telegram_text") + + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_text, + ) + assert response.status == 401 + + +async def test_webhook_endpoint_invalid_secret_token_is_denied( + hass: HomeAssistant, + webhook_platform, + hass_client: ClientSessionGenerator, + update_message_text, + incorrect_secret_token, +) -> None: + """Request with an invalid secret token header should be denied.""" + client = await hass_client() + async_capture_events(hass, "telegram_text") + + response = await client.post( + TELEGRAM_WEBHOOK_URL, + json=update_message_text, + headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token}, + ) + assert response.status == 401