Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 139 additions & 28 deletions homeassistant/components/mcp_server/http.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
"""Model Context Protocol transport protocol for Server Sent Events (SSE).
"""Model Context Protocol transport protocol for Streamable HTTP and SSE.

This registers HTTP endpoints that supports SSE as a transport layer
for the Model Context Protocol. There are two HTTP endpoints:
This registers HTTP endpoints that support the Streamable HTTP protocol as
well as the older SSE as a transport layer.

The Streamable HTTP protocol uses a single HTTP endpoint:

- /api/mcp_server: The Streamable HTTP endpoint currently implements the
stateless protocol for simplicity. This receives client requests and
sends them to the MCP server, then waits for a response to send back to
the client.

The older SSE protocol has two HTTP endpoints:

- /mcp_server/sse: The SSE endpoint that is used to establish a session
with the client and glue to the MCP server. This is used to push responses
Expand All @@ -14,20 +23,24 @@
See https://modelcontextprotocol.io/docs/concepts/transports
"""

import asyncio
from dataclasses import dataclass
from http import HTTPStatus
import logging

from aiohttp import web
from aiohttp.web_exceptions import HTTPBadRequest, HTTPNotFound
from aiohttp_sse import sse_response
import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import types
from mcp import JSONRPCRequest, types
from mcp.server import InitializationOptions, Server
from mcp.shared.message import SessionMessage

from homeassistant.components import conversation
from homeassistant.components.http import KEY_HASS, HomeAssistantView
from homeassistant.const import CONF_LLM_HASS_API
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import Context, HomeAssistant, callback
from homeassistant.helpers import llm

from .const import DOMAIN
Expand All @@ -37,6 +50,14 @@

_LOGGER = logging.getLogger(__name__)

# Streamable HTTP endpoint
STREAMABLE_API = f"/api/{DOMAIN}"
TIMEOUT = 60 # Seconds

# Content types
CONTENT_TYPE_JSON = "application/json"

# Legacy SSE endpoint
SSE_API = f"/{DOMAIN}/sse"
MESSAGES_API = f"/{DOMAIN}/messages/{{session_id}}"

Expand All @@ -46,6 +67,7 @@ def async_register(hass: HomeAssistant) -> None:
"""Register the websocket API."""
hass.http.register_view(ModelContextProtocolSSEView())
hass.http.register_view(ModelContextProtocolMessagesView())
hass.http.register_view(ModelContextProtocolStreamableView())


def async_get_config_entry(hass: HomeAssistant) -> MCPServerConfigEntry:
Expand All @@ -66,6 +88,52 @@ def async_get_config_entry(hass: HomeAssistant) -> MCPServerConfigEntry:
return config_entries[0]


@dataclass
class Streams:
"""Pairs of streams for MCP server communication."""

# The MCP server reads from the read stream. The HTTP handler receives
# incoming client messages and writes the to the read_stream_writer.
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]

# The MCP server writes to the write stream. The HTTP handler reads from
# the write stream and sends messages to the client.
write_stream: MemoryObjectSendStream[SessionMessage]
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]


def create_streams() -> Streams:
"""Create a new pair of streams for MCP server communication."""
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
return Streams(
read_stream=read_stream,
read_stream_writer=read_stream_writer,
write_stream=write_stream,
write_stream_reader=write_stream_reader,
)


async def create_mcp_server(
hass: HomeAssistant, context: Context, entry: MCPServerConfigEntry
) -> tuple[Server, InitializationOptions]:
"""Initialize the MCP server to ensure it's ready to handle requests."""
llm_context = llm.LLMContext(
platform=DOMAIN,
context=context,
language="*",
assistant=conversation.DOMAIN,
device_id=None,
)
llm_api_id = entry.data[CONF_LLM_HASS_API]
server = await create_server(hass, llm_api_id, llm_context)
options = await hass.async_add_executor_job(
server.create_initialization_options # Reads package for version info
)
return server, options


class ModelContextProtocolSSEView(HomeAssistantView):
"""Model Context Protocol SSE endpoint."""

Expand All @@ -86,38 +154,20 @@ async def get(self, request: web.Request) -> web.StreamResponse:
entry = async_get_config_entry(hass)
session_manager = entry.runtime_data

context = llm.LLMContext(
platform=DOMAIN,
context=self.context(request),
language="*",
assistant=conversation.DOMAIN,
device_id=None,
)
llm_api_id = entry.data[CONF_LLM_HASS_API]
server = await create_server(hass, llm_api_id, context)
options = await hass.async_add_executor_job(
server.create_initialization_options # Reads package for version info
)

read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)

write_stream: MemoryObjectSendStream[SessionMessage]
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
server, options = await create_mcp_server(hass, self.context(request), entry)
streams = create_streams()

async with (
sse_response(request) as response,
session_manager.create(Session(read_stream_writer)) as session_id,
session_manager.create(Session(streams.read_stream_writer)) as session_id,
):
session_uri = MESSAGES_API.format(session_id=session_id)
_LOGGER.debug("Sending SSE endpoint: %s", session_uri)
await response.send(session_uri, event="endpoint")

async def sse_reader() -> None:
"""Forward MCP server responses to the client."""
async for session_message in write_stream_reader:
async for session_message in streams.write_stream_reader:
_LOGGER.debug("Sending SSE message: %s", session_message)
await response.send(
session_message.message.model_dump_json(
Expand All @@ -128,7 +178,7 @@ async def sse_reader() -> None:

async with anyio.create_task_group() as tg:
tg.start_soon(sse_reader)
await server.run(read_stream, write_stream, options)
await server.run(streams.read_stream, streams.write_stream, options)

return response

Expand Down Expand Up @@ -168,3 +218,64 @@ async def post(
_LOGGER.debug("Received client message: %s", message)
await session.read_stream_writer.send(SessionMessage(message))
return web.Response(status=200)


class ModelContextProtocolStreamableView(HomeAssistantView):
"""Model Context Protocol Streamable HTTP endpoint."""

name = f"{DOMAIN}:streamable"
url = STREAMABLE_API

async def get(self, request: web.Request) -> web.StreamResponse:
"""Handle unsupported methods."""
return web.Response(
status=HTTPStatus.METHOD_NOT_ALLOWED, text="Only POST method is supported"
)

async def post(self, request: web.Request) -> web.StreamResponse:
"""Process JSON-RPC messages for the Model Context Protocol."""
hass = request.app[KEY_HASS]
entry = async_get_config_entry(hass)

# The request must include a JSON-RPC message
if CONTENT_TYPE_JSON not in request.headers.get("accept", ""):
raise HTTPBadRequest(text=f"Client must accept {CONTENT_TYPE_JSON}")
if request.content_type != CONTENT_TYPE_JSON:
raise HTTPBadRequest(text=f"Content-Type must be {CONTENT_TYPE_JSON}")
try:
json_data = await request.json()
message = types.JSONRPCMessage.model_validate(json_data)
except ValueError as err:
_LOGGER.debug("Failed to parse message as JSON-RPC message: %s", err)
raise HTTPBadRequest(text="Request must be a JSON-RPC message") from err

_LOGGER.debug("Received client message: %s", message)

# For notifications and responses only, return 202 Accepted
if not isinstance(message.root, JSONRPCRequest):
_LOGGER.debug("Notification or response received, returning 202")
return web.Response(status=HTTPStatus.ACCEPTED)

# The MCP server runs as a background task for the duration of the
# request. We open a buffered stream pair to communicate with it. The
# request is sent to the MCP server and we wait for a single response
# then shut down the server.
server, options = await create_mcp_server(hass, self.context(request), entry)
streams = create_streams()

async def run_server() -> None:
await server.run(
streams.read_stream, streams.write_stream, options, stateless=True
)

async with asyncio.timeout(TIMEOUT), anyio.create_task_group() as tg:
tg.start_soon(run_server)

await streams.read_stream_writer.send(SessionMessage(message))
session_message = await anext(streams.write_stream_reader)
tg.cancel_scope.cancel()

_LOGGER.debug("Sending response: %s", session_message)
return web.json_response(
data=session_message.message.model_dump(by_alias=True, exclude_none=True),
)
6 changes: 5 additions & 1 deletion homeassistant/components/ollama/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def _convert_content(
return ollama.Message(
role=MessageRole.ASSISTANT.value,
content=chat_content.content,
thinking=chat_content.thinking_content,
tool_calls=[
ollama.Message.ToolCall(
function=ollama.Message.ToolCall.Function(
Expand All @@ -103,7 +104,8 @@ def _convert_content(
)
)
for tool_call in chat_content.tool_calls or ()
],
]
or None,
)
if isinstance(chat_content, conversation.UserContent):
images: list[ollama.Image] = []
Expand Down Expand Up @@ -162,6 +164,8 @@ async def _transform_stream(
]
if (content := response_message.get("content")) is not None:
chunk["content"] = content
if (thinking := response_message.get("thinking")) is not None:
chunk["thinking_content"] = thinking
if response_message.get("done"):
new_msg = True
yield chunk
Expand Down
2 changes: 1 addition & 1 deletion homeassistant/components/system_bridge/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ def partition_usage(
key="power_usage",
translation_key="power_usage",
state_class=SensorStateClass.MEASUREMENT,
device_class=SensorDeviceClass.POWER,
native_unit_of_measurement=UnitOfPower.WATT,
suggested_display_precision=2,
icon="mdi:power-plug",
Expand Down Expand Up @@ -577,7 +578,6 @@ async def async_setup_entry(
key=f"gpu_{gpu.id}_power_usage",
name=f"{gpu.name} power usage",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
value=lambda data, k=index: gpu_power_usage(data, k),
Expand Down
63 changes: 32 additions & 31 deletions homeassistant/components/zwave_js/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,41 +1501,42 @@ async def async_step_esphome(
if not is_hassio(self.hass):
return self.async_abort(reason="not_hassio")

if (
discovery_info.zwave_home_id
and (
current_config_entries := self._async_current_entries(
include_ignore=False
if discovery_info.zwave_home_id:
if (
(
current_config_entries := self._async_current_entries(
include_ignore=False
)
)
)
and (home_id := str(discovery_info.zwave_home_id))
and (
existing_entry := next(
(
entry
for entry in current_config_entries
if entry.unique_id == home_id
),
None,
and (home_id := str(discovery_info.zwave_home_id))
and (
existing_entry := next(
(
entry
for entry in current_config_entries
if entry.unique_id == home_id
),
None,
)
)
# Only update existing entries that are configured via sockets
and existing_entry.data.get(CONF_SOCKET_PATH)
# And use the add-on
and existing_entry.data.get(CONF_USE_ADDON)
):
await self._async_set_addon_config(
{CONF_ADDON_SOCKET: discovery_info.socket_path}
)
# Reloading will sync add-on options to config entry data
self.hass.config_entries.async_schedule_reload(existing_entry.entry_id)
return self.async_abort(reason="already_configured")

# We are not aborting if home ID configured here, we just want to make sure that it's set
# We will update a USB based config entry automatically in `async_step_finish_addon_setup_user`
await self.async_set_unique_id(
str(discovery_info.zwave_home_id), raise_on_progress=False
)
# Only update existing entries that are configured via sockets
and existing_entry.data.get(CONF_SOCKET_PATH)
# And use the add-on
and existing_entry.data.get(CONF_USE_ADDON)
):
await self._async_set_addon_config(
{CONF_ADDON_SOCKET: discovery_info.socket_path}
)
# Reloading will sync add-on options to config entry data
self.hass.config_entries.async_schedule_reload(existing_entry.entry_id)
return self.async_abort(reason="already_configured")

# We are not aborting if home ID configured here, we just want to make sure that it's set
# We will update a USB based config entry automatically in `async_step_finish_addon_setup_user`
await self.async_set_unique_id(
str(discovery_info.zwave_home_id), raise_on_progress=False
)
self.socket_path = discovery_info.socket_path
self.context["title_placeholders"] = {
CONF_NAME: f"{discovery_info.name} via ESPHome"
Expand Down
Loading
Loading