Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added command_line auth provider that validates credentials by calling a command #19985

Merged
merged 15 commits into from Feb 7, 2019
Merged
149 changes: 149 additions & 0 deletions homeassistant/auth/providers/command_line.py
@@ -0,0 +1,149 @@
"""Auth provider that validates credentials via an external script."""

import typing as T
bob1de marked this conversation as resolved.
Show resolved Hide resolved

import asyncio.subprocess
import collections
import logging

import voluptuous as vol

from homeassistant.exceptions import HomeAssistantError

from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow
from ..models import Credentials, UserMeta


CONF_PROGRAM = "program"
CONF_ARGS = "args"

CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend({
vol.Required(CONF_PROGRAM): str,
vol.Optional(CONF_ARGS, default=None): vol.Any(vol.DefaultTo(list), [str]),
}, extra=vol.PREVENT_EXTRA)

_LOGGER = logging.getLogger(__name__)


class InvalidAuthError(HomeAssistantError):
"""Raised when authentication with given credentials fails."""


@AUTH_PROVIDERS.register("command_line")
class CommandLineAuthProvider(AuthProvider):
"""Auth provider validating credentials by calling a command."""

DEFAULT_TITLE = "Command Line Authentication"

# which keys to accept from a program's stdout
ALLOWED_META_KEYS = ("name",)

def __init__(self, *args: T.Any, **kwargs: T.Any) -> None:
awarecan marked this conversation as resolved.
Show resolved Hide resolved
"""Extend parent's __init__.

Adds self._user_meta dictionary to hold the user-specific
attributes provided by external programs.
"""
super().__init__(*args, **kwargs)
self._user_meta = {} # type: T.Dict[str, T.Dict[str, T.Any]]

async def async_login_flow(self, context: T.Optional[T.Dict]) -> LoginFlow:
"""Return a flow to login."""
return CommandLineLoginFlow(self)

async def async_validate_login(self, username: str, password: str) -> None:
"""Validate a username and password."""
env = {
"username": username,
balloob marked this conversation as resolved.
Show resolved Hide resolved
"password": password,
}
try:
process = await asyncio.subprocess.create_subprocess_exec(
bob1de marked this conversation as resolved.
Show resolved Hide resolved
self.config[CONF_PROGRAM], *self.config[CONF_ARGS],
env=env,
stdout=asyncio.subprocess.PIPE,
)
stdout = (await process.communicate())[0]
bob1de marked this conversation as resolved.
Show resolved Hide resolved
except OSError as err:
_LOGGER.error("Error while authenticating '%s': %s",
username, err)
raise InvalidAuthError

if process.returncode != 0:
raise InvalidAuthError
bob1de marked this conversation as resolved.
Show resolved Hide resolved

meta = {} # type: T.Dict[str, str]
for _line in stdout.splitlines():
try:
line = _line.decode().lstrip()
if line.startswith("#"):
continue
key, value = line.split("=", 1)
bob1de marked this conversation as resolved.
Show resolved Hide resolved
except ValueError:
# malformed line
continue
key = key.strip()
value = value.strip()
if key in self.ALLOWED_META_KEYS:
meta[key] = value
self._user_meta[username] = meta

async def async_get_or_create_credentials(
self, flow_result: T.Dict[str, str]
) -> Credentials:
"""Get credentials based on the flow result."""
username = flow_result["username"]
balloob marked this conversation as resolved.
Show resolved Hide resolved
for credential in await self.async_credentials():
if credential.data["username"] == username:
return credential

# Create new credentials.
return self.async_create_credentials({
"username": username,
})

async def async_user_meta_for_credentials(
self, credentials: Credentials
) -> UserMeta:
"""Return extra user metadata for credentials.

Currently, only name is supported.
"""
meta = self._user_meta.get(credentials.data["username"], {})
return UserMeta(
name=meta.get("name"),
is_active=True,
)


class CommandLineLoginFlow(LoginFlow):
"""Handler for the login flow."""

async def async_step_init(
self, user_input: T.Optional[T.Dict[str, str]] = None
) -> T.Dict[str, T.Any]:
"""Handle the step of the form."""
errors = {}

if user_input is not None:
try:
await T.cast(CommandLineAuthProvider, self._auth_provider) \
.async_validate_login(
user_input["username"], user_input["password"]
)
except InvalidAuthError:
errors["base"] = "invalid_auth"

if not errors:
user_input.pop("password")
return await self.async_finish(user_input)

schema = collections.OrderedDict() # type: T.Dict[str, type]
schema["username"] = str
schema["password"] = str

return self.async_show_form(
step_id="init",
data_schema=vol.Schema(schema),
errors=errors,
)