Skip to content

Commit

Permalink
feat: Login flow v2 (#255)
Browse files Browse the repository at this point in the history
Changes proposed in this pull request:

 * Implement Login flow v2 for standard Nextcloud clients.
 * Allow user to initialize standard Nextcloud clients without providing credentials so that the user can log in using Login flow v2.
  • Loading branch information
blvdek committed May 22, 2024
1 parent ddce6bf commit 5d41bf9
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 5 deletions.
18 changes: 18 additions & 0 deletions docs/reference/LoginFlowV2.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
.. py:currentmodule:: nc_py_api.loginflow_v2
LoginFlow V2
============

Login flow v2 is an authorization process for the standard Nextcloud client that allows each client to have their own set of credentials.

.. autoclass:: _LoginFlowV2API
:inherited-members:
:members:

.. autoclass:: Credentials
:inherited-members:
:members:

.. autoclass:: LoginFlow
:inherited-members:
:members:
3 changes: 3 additions & 0 deletions docs/reference/Session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ Internal
.. autoclass:: NcSessionApp
:members:
:inherited-members:

.. autoclass:: NcSession
:members:
1 change: 1 addition & 0 deletions docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Reference
ActivityApp
Notes
Session
LoginFlowV2
5 changes: 4 additions & 1 deletion nc_py_api/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ class Config(BasicConfig):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.auth = (self._get_config_value("nc_auth_user", **kwargs), self._get_config_value("nc_auth_pass", **kwargs))
nc_auth_user = self._get_config_value("nc_auth_user", raise_not_found=False, **kwargs)
nc_auth_pass = self._get_config_value("nc_auth_pass", raise_not_found=False, **kwargs)
if nc_auth_user and nc_auth_pass:
self.auth = (nc_auth_user, nc_auth_pass)


@dataclass
Expand Down
161 changes: 161 additions & 0 deletions nc_py_api/loginflow_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""Login flow v2 API wrapper."""

import asyncio
import json
import time
from dataclasses import dataclass

import httpx

from ._exceptions import check_error
from ._session import AsyncNcSession, NcSession

MAX_TIMEOUT = 60 * 20


@dataclass
class LoginFlow:
"""The Nextcloud Login flow v2 initialization response representation."""

def __init__(self, raw_data: dict) -> None:
self.raw_data = raw_data

@property
def login(self) -> str:
"""The URL for user authorization.
Should be opened by the user in the default browser to authorize in Nextcloud.
"""
return self.raw_data["login"]

@property
def token(self) -> str:
"""Token for a polling for confirmation of user authorization."""
return self.raw_data["poll"]["token"]

@property
def endpoint(self) -> str:
"""Endpoint for polling."""
return self.raw_data["poll"]["endpoint"]

def __repr__(self) -> str:
return f"<{self.__class__.__name__} login_url={self.login}>"


@dataclass
class Credentials:
"""The Nextcloud Login flow v2 response with app credentials representation."""

def __init__(self, raw_data: dict) -> None:
self.raw_data = raw_data

@property
def server(self) -> str:
"""The address of Nextcloud to connect to.
The server may specify a protocol (http or https). If no protocol is specified https will be used.
"""
return self.raw_data["server"]

@property
def login_name(self) -> str:
"""The username for authenticating with Nextcloud."""
return self.raw_data["loginName"]

@property
def app_password(self) -> str:
"""The application password generated for authenticating with Nextcloud."""
return self.raw_data["appPassword"]

def __repr__(self) -> str:
return f"<{self.__class__.__name__} login={self.login_name} app_password={self.app_password}>"


class _LoginFlowV2API:
"""Class implementing Nextcloud Login flow v2."""

_ep_init: str = "/index.php/login/v2"
_ep_poll: str = "/index.php/login/v2/poll"

def __init__(self, session: NcSession) -> None:
self._session = session

def init(self, user_agent: str = "nc_py_api") -> LoginFlow:
"""Init a Login flow v2.
:param user_agent: Application name. Application password will be associated with this name.
"""
r = self._session.adapter.post(self._ep_init, headers={"user-agent": user_agent})
return LoginFlow(_res_to_json(r))

def poll(self, token: str, timeout: int = MAX_TIMEOUT, step: int = 1, overwrite_auth: bool = True) -> Credentials:
"""Poll the Login flow v2 credentials.
:param token: Token for a polling for confirmation of user authorization.
:param timeout: Maximum time to wait for polling in seconds, defaults to MAX_TIMEOUT.
:param step: Interval for polling in seconds, defaults to 1.
:param overwrite_auth: If True current session will be overwritten with new credentials, defaults to True.
:raises ValueError: If timeout more than 20 minutes.
"""
if timeout > MAX_TIMEOUT:
msg = "Timeout can't be more than 20 minutes."
raise ValueError(msg)
for _ in range(timeout // step):
r = self._session.adapter.post(self._ep_poll, data={"token": token})
if r.status_code == 200:
break
time.sleep(step)
r_model = Credentials(_res_to_json(r))
if overwrite_auth:
self._session.cfg.auth = (r_model.login_name, r_model.app_password)
self._session.init_adapter(restart=True)
self._session.init_adapter_dav(restart=True)
return r_model


class _AsyncLoginFlowV2API:
"""Class implementing Async Nextcloud Login flow v2."""

_ep_init: str = "/index.php/login/v2"
_ep_poll: str = "/index.php/login/v2/poll"

def __init__(self, session: AsyncNcSession) -> None:
self._session = session

async def init(self, user_agent: str = "nc_py_api") -> LoginFlow:
"""Init a Login flow v2.
:param user_agent: Application name. Application password will be associated with this name.
"""
r = await self._session.adapter.post(self._ep_init, headers={"user-agent": user_agent})
return LoginFlow(_res_to_json(r))

async def poll(
self, token: str, timeout: int = MAX_TIMEOUT, step: int = 1, overwrite_auth: bool = True
) -> Credentials:
"""Poll the Login flow v2 credentials.
:param token: Token for a polling for confirmation of user authorization.
:param timeout: Maximum time to wait for polling in seconds, defaults to MAX_TIMEOUT.
:param step: Interval for polling in seconds, defaults to 1.
:param overwrite_auth: If True current session will be overwritten with new credentials, defaults to True.
:raises ValueError: If timeout more than 20 minutes.
"""
if timeout > MAX_TIMEOUT:
raise ValueError("Timeout can't be more than 20 minutes.")
for _ in range(timeout // step):
r = await self._session.adapter.post(self._ep_poll, data={"token": token})
if r.status_code == 200:
break
await asyncio.sleep(step)
r_model = Credentials(_res_to_json(r))
if overwrite_auth:
self._session.cfg.auth = (r_model.login_name, r_model.app_password)
self._session.init_adapter(restart=True)
self._session.init_adapter_dav(restart=True)
return r_model


def _res_to_json(response: httpx.Response) -> dict:
check_error(response)
return json.loads(response.text)
15 changes: 11 additions & 4 deletions nc_py_api/nextcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .ex_app.providers.providers import AsyncProvidersApi, ProvidersApi
from .ex_app.ui.ui import AsyncUiApi, UiApi
from .files.files import AsyncFilesAPI, FilesAPI
from .loginflow_v2 import _AsyncLoginFlowV2API, _LoginFlowV2API
from .notes import _AsyncNotesAPI, _NotesAPI
from .notifications import _AsyncNotificationsAPI, _NotificationsAPI
from .user_status import _AsyncUserStatusAPI, _UserStatusAPI
Expand Down Expand Up @@ -246,15 +247,18 @@ class Nextcloud(_NextcloudBasic):
"""

_session: NcSession
loginflow_v2: _LoginFlowV2API
"""Nextcloud Login flow v2."""

def __init__(self, **kwargs):
"""If the parameters are not specified, they will be taken from the environment.
:param nextcloud_url: url of the nextcloud instance.
:param nc_auth_user: login username.
:param nc_auth_pass: password or app-password for the username.
:param nc_auth_user: login username. Optional.
:param nc_auth_pass: password or app-password for the username. Optional.
"""
self._session = NcSession(**kwargs)
self.loginflow_v2 = _LoginFlowV2API(self._session)
super().__init__(self._session)

@property
Expand All @@ -270,15 +274,18 @@ class AsyncNextcloud(_AsyncNextcloudBasic):
"""

_session: AsyncNcSession
loginflow_v2: _AsyncLoginFlowV2API
"""Nextcloud Login flow v2."""

def __init__(self, **kwargs):
"""If the parameters are not specified, they will be taken from the environment.
:param nextcloud_url: url of the nextcloud instance.
:param nc_auth_user: login username.
:param nc_auth_pass: password or app-password for the username.
:param nc_auth_user: login username. Optional.
:param nc_auth_pass: password or app-password for the username. Optional.
"""
self._session = AsyncNcSession(**kwargs)
self.loginflow_v2 = _AsyncLoginFlowV2API(self._session)
super().__init__(self._session)

@property
Expand Down
24 changes: 24 additions & 0 deletions tests/actual_tests/loginflow_v2_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest

from nc_py_api import NextcloudException


def test_init_poll(nc_client):
lf = nc_client.loginflow_v2.init()
assert isinstance(lf.endpoint, str)
assert isinstance(lf.login, str)
assert isinstance(lf.token, str)
with pytest.raises(NextcloudException) as exc_info:
nc_client.loginflow_v2.poll(lf.token, 1)
assert exc_info.value.status_code == 404


@pytest.mark.asyncio(scope="session")
async def test_init_poll_async(anc_client):
lf = await anc_client.loginflow_v2.init()
assert isinstance(lf.endpoint, str)
assert isinstance(lf.login, str)
assert isinstance(lf.token, str)
with pytest.raises(NextcloudException) as exc_info:
await anc_client.loginflow_v2.poll(lf.token, 1)
assert exc_info.value.status_code == 404

0 comments on commit 5d41bf9

Please sign in to comment.