Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add secret_token support to telegram_bot component #100869

Merged
merged 14 commits into from
Oct 2, 2023
34 changes: 29 additions & 5 deletions homeassistant/components/telegram_bot/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from http import HTTPStatus
from ipaddress import ip_address
import logging
import secrets
import string

from telegram import Update
from telegram.error import TimedOut
Expand All @@ -18,11 +20,17 @@

TELEGRAM_WEBHOOK_URL = "/api/telegram_webhooks"
REMOVE_WEBHOOK_URL = ""
SECRET_TOKEN_LENGTH = 32


async def async_setup_platform(hass, bot, config):
"""Set up the Telegram webhooks platform."""
pushbot = PushBot(hass, bot, config)

# Generate an ephemeral secret token
alphabet = string.ascii_letters + string.digits + "-_"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use f-strings instead of string concatenation.

secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH))

pushbot = PushBot(hass, bot, config, secret_token)

if not pushbot.webhook_url.startswith("https"):
_LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url)
Expand All @@ -34,18 +42,25 @@ async def async_setup_platform(hass, bot, config):

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook)
hass.http.register_view(
PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS])
PushBotView(
hass,
bot,
pushbot.dispatcher,
config[CONF_TRUSTED_NETWORKS],
secret_token,
)
)
return True


class PushBot(BaseTelegramBotEntity):
"""Handles all the push/webhook logic and passes telegram updates to `self.handle_update`."""

def __init__(self, hass, bot, config):
def __init__(self, hass, bot, config, secret_token):
"""Create Dispatcher before calling super()."""
self.bot = bot
self.trusted_networks = config[CONF_TRUSTED_NETWORKS]
self.secret_token = secret_token
# Dumb dispatcher that just gets our updates to our handler callback (self.handle_update)
self.dispatcher = Dispatcher(bot, None)
self.dispatcher.add_handler(TypeHandler(Update, self.handle_update))
Expand All @@ -61,7 +76,11 @@ def _try_to_set_webhook(self):
retry_num = 0
while retry_num < 3:
try:
return self.bot.set_webhook(self.webhook_url, timeout=5)
return self.bot.set_webhook(
self.webhook_url,
api_kwargs={"secret_token": self.secret_token},
timeout=5,
)
except TimedOut:
retry_num += 1
_LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num)
Expand Down Expand Up @@ -108,19 +127,24 @@ class PushBotView(HomeAssistantView):
url = TELEGRAM_WEBHOOK_URL
name = "telegram_webhooks"

def __init__(self, hass, bot, dispatcher, trusted_networks):
def __init__(self, hass, bot, dispatcher, trusted_networks, secret_token):
"""Initialize by storing stuff needed for setting up our webhook endpoint."""
self.hass = hass
self.bot = bot
self.dispatcher = dispatcher
self.trusted_networks = trusted_networks
self.secret_token = secret_token

async def post(self, request):
"""Accept the POST from telegram."""
real_ip = ip_address(request.remote)
if not any(real_ip in net for net in self.trusted_networks):
_LOGGER.warning("Access denied from %s", real_ip)
return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED)
secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if secret_token_header is None or self.secret_token != secret_token_header:
_LOGGER.warning("Invalid secret token from %s", real_ip)
return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED)

try:
update_data = await request.json()
Expand Down
21 changes: 20 additions & 1 deletion tests/components/telegram_bot/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,23 @@ def mock_register_webhook():
yield


@pytest.fixture
def mock_generate_secret_token():
"""Mock secret token generated for webhook."""
mock_secret_token = "DEADBEEF12345678DEADBEEF87654321"
with patch(
"homeassistant.components.telegram_bot.webhooks.secrets.choice",
side_effect=mock_secret_token,
):
yield mock_secret_token


@pytest.fixture
def incorrect_secret_token():
"""Mock incorrect secret token."""
return "AAAABBBBCCCCDDDDEEEEFFFF00009999"


@pytest.fixture
def update_message_command():
"""Fixture for mocking an incoming update of type message/command."""
Expand Down Expand Up @@ -156,7 +173,9 @@ def update_callback_query():


@pytest.fixture
async def webhook_platform(hass, config_webhooks, mock_register_webhook):
async def webhook_platform(
hass, config_webhooks, mock_register_webhook, mock_generate_secret_token
):
"""Fixture for setting up the webhooks platform using appropriate config and mocks."""
await async_setup_component(
hass,
Expand Down
62 changes: 58 additions & 4 deletions tests/components/telegram_bot/test_telegram_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ async def test_webhook_endpoint_generates_telegram_text_event(
webhook_platform,
hass_client: ClientSessionGenerator,
update_message_text,
mock_generate_secret_token,
) -> None:
"""POST to the configured webhook endpoint and assert fired `telegram_text` event."""
client = await hass_client()
events = async_capture_events(hass, "telegram_text")

response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_text)
response = await client.post(
TELEGRAM_WEBHOOK_URL,
json=update_message_text,
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
)
assert response.status == 200
assert (await response.read()).decode("utf-8") == ""

Expand All @@ -56,12 +61,17 @@ async def test_webhook_endpoint_generates_telegram_command_event(
webhook_platform,
hass_client: ClientSessionGenerator,
update_message_command,
mock_generate_secret_token,
) -> None:
"""POST to the configured webhook endpoint and assert fired `telegram_command` event."""
client = await hass_client()
events = async_capture_events(hass, "telegram_command")

response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_command)
response = await client.post(
TELEGRAM_WEBHOOK_URL,
json=update_message_command,
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
)
assert response.status == 200
assert (await response.read()).decode("utf-8") == ""

Expand All @@ -77,12 +87,17 @@ async def test_webhook_endpoint_generates_telegram_callback_event(
webhook_platform,
hass_client: ClientSessionGenerator,
update_callback_query,
mock_generate_secret_token,
) -> None:
"""POST to the configured webhook endpoint and assert fired `telegram_callback` event."""
client = await hass_client()
events = async_capture_events(hass, "telegram_callback")

response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_callback_query)
response = await client.post(
TELEGRAM_WEBHOOK_URL,
json=update_callback_query,
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
)
assert response.status == 200
assert (await response.read()).decode("utf-8") == ""

Expand Down Expand Up @@ -119,13 +134,16 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex
webhook_platform,
hass_client: ClientSessionGenerator,
unauthorized_update_message_text,
mock_generate_secret_token,
) -> None:
"""Update with unauthorized user/chat should not trigger event."""
client = await hass_client()
events = async_capture_events(hass, "telegram_text")

response = await client.post(
TELEGRAM_WEBHOOK_URL, json=unauthorized_update_message_text
TELEGRAM_WEBHOOK_URL,
json=unauthorized_update_message_text,
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
)
assert response.status == 200
assert (await response.read()).decode("utf-8") == ""
Expand All @@ -134,3 +152,39 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex
await hass.async_block_till_done()

assert len(events) == 0


async def test_webhook_endpoint_without_secret_token_is_denied(
hass: HomeAssistant,
webhook_platform,
hass_client: ClientSessionGenerator,
update_message_text,
) -> None:
"""Request without a secret token header should be denied."""
client = await hass_client()
async_capture_events(hass, "telegram_text")

response = await client.post(
TELEGRAM_WEBHOOK_URL,
json=update_message_text,
)
assert response.status == 401


async def test_webhook_endpoint_invalid_secret_token_is_denied(
hass: HomeAssistant,
webhook_platform,
hass_client: ClientSessionGenerator,
update_message_text,
incorrect_secret_token,
) -> None:
"""Request with an invalid secret token header should be denied."""
client = await hass_client()
async_capture_events(hass, "telegram_text")

response = await client.post(
TELEGRAM_WEBHOOK_URL,
json=update_message_text,
headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token},
)
assert response.status == 401