In [None]:
# | default_exp cli.token

In [None]:
# | include: false

from airt.testing import activate_by_import

[INFO] airt.testing.activate_by_import: Testing environment activated.


In [None]:
# | export

from typing import *

In [None]:
# | exporti

import logging
import os
import time
from datetime import datetime, timedelta

import typer

from airt.cli import helper
from airt.client import Client, User
from airt.constant import (
    CLIENT_NAME,
    SERVER_URL,
    SERVICE_PASSWORD,
    SERVICE_TOKEN,
    SERVICE_USERNAME,
)
from airt.logger import get_logger, set_level

In [None]:
from contextlib import contextmanager
from random import randrange

from typer.testing import CliRunner

import airt.sanitizer
from airt.constant import (
    SERVICE_PASSWORD,
    SERVICE_SUPER_USER,
    SERVICE_TOKEN,
    SERVICE_USERNAME,
)

In [None]:
# | exporti

SESSION_TIME_LIMIT = 10  # mins

In [None]:
# | exporti

app = typer.Typer()

In [None]:
# | include: false

runner = CliRunner()

In [None]:
# | export

logger = get_logger(__name__)

In [None]:
# | include: false

set_level(logging.WARNING)

In [None]:
# | include: false

# Testing logger settings

display(logger.getEffectiveLevel())
assert logger.getEffectiveLevel() == logging.WARNING

logger.debug("This is a debug message")
logger.info("This is an info")
logger.warning("This is a warning")
logger.error("This is an error")

30

[ERROR] __main__: This is an error


In [None]:
# | exporti


@helper.requires_totp_or_otp(
    message_template_name="get_token", requires_auth_token=False
)
def token(
    username: Optional[str] = typer.Option(
        None,
        "--username",
        "-u",
        help="Username for the developer account. If None (default value), then the value from"
        f" **{SERVICE_USERNAME}** environment variable is used.",
    ),
    password: Optional[str] = typer.Option(
        None,
        "--password",
        "-p",
        help="Password for the developer account. If None (default value), then the value from"
        f" **{SERVICE_PASSWORD}** environment variable is used.",
    ),
    server: Optional[str] = typer.Option(
        None,
        "--server",
        "-s",
        help=f"The {CLIENT_NAME} server uri. If None (default value), then the value from **{SERVER_URL}** environment variable"
        " is used. If the variable is not set as well, then the default public server will be used. Please leave this"
        f" setting to default unless you are running the service in your own server (please email us to info@{CLIENT_NAME}.ai"
        " for that possibility).",
    ),
    otp: Optional[str] = typer.Option(
        None,
        "--otp",
        help=f"Dynamically generated six-digit verification code from the authenticator app or the OTP you have received via SMS."
        " Please do not pass this parameter if you haven't enabled the multi-factor authentication for your account.",
    ),
    sso_provider: Optional[str] = typer.Option(
        None,
        "--sso_provider",
        "-sp",
        help=f"Name of the Single sign-on (SSO) provider. At the moment, we only support google and github as SSO providers."
        " Please pass this parameter only if you have successfully enabled SSO for the provider.",
    ),
    quiet: bool = typer.Option(
        False,
        "--quiet",
        "-q",
        help="Output authentication token only.",
    ),
    debug: bool = typer.Option(
        False, "--debug", "-d", help="Set logger level to DEBUG and output everything."
    ),
) -> None:
    """Get application token for airt service from a username/password pair.

    To access the airt service, you must first create a developer account. To obtain one, please contact us at info@airt.ai.

    After successful verification, you will receive an email with the username and password for the developer account.

    Once you have the credentials, use them to get an access token by running **airt token** command. It is necessary to
    get an access token; otherwise, you won't be able to access all of the airt service's APIs. You can either pass the
    username, password, and server address as command line arguments or store them in the environment variables
    **AIRT_SERVICE_USERNAME**, **AIRT_SERVICE_PASSWORD**, and **AIRT_SERVER_URL**.

    If you've already enabled multi-factor authentication (MFA) for your account, you'll need to pass the dynamically
    generated six-digit verification code along with your username and password to generate new tokens.

    If the token is requested using Single sign-on (SSO), an authorization URL will be returned. Please copy and paste
    it into your preferred browser and complete the SSO provider authentication within 10 minutes. Otherwise,
    the SSO login will time out and you will need to re-request the token.

    Single sign-on (SSO) can be enabled for your account in three simple steps:

    1. Enable the SSO for a provider by calling the command `airt user sso enable` with the SSO provider name and an email address.
    At the moment, we only support "google" and "github" as SSO providers. We intend to support additional SSO providers in future releases.

    2. Before you can start generating new tokens with SSO, you must first authenticate with the SSO provider. Call the `airt token` command with
    the same SSO provider you have enabled in the step above to generate an SSO authorization URL. Please copy and paste it into your
    preferred browser and complete the authentication process with the SSO provider.

    3. After successfully authenticating with the SSO provider, an access token will be generated and returned. Please set it in the
    **AIRT_SERVICE_TOKEN** environment variable for accessing the airt service.
    """
    try:
        if debug:
            set_level(logging.DEBUG)
        else:
            set_level(logging.WARNING)

        if sso_provider is None:
            Client.get_token(
                username=username, password=password, server=server, otp=otp
            )

            if quiet:
                typer.echo(Client.auth_token)
            else:
                typer.echo(f"token: {Client.auth_token}")

        else:
            authorization_url = Client.get_token(
                username=username,
                password=password,
                server=server,
                otp=otp,
                sso_provider=sso_provider,
            )

            typer.echo(
                "\nPlease copy and paste the authorization URL below into your preferred browser and complete the SSO provider authentication "
                "within 10 minutes. Otherwise, the SSO login will time out and you will have to re-run the token command."
            )

            typer.echo(f"\n\n{authorization_url}\n")

            typer.echo(
                f"\nAfter successfully authenticating with the SSO provider, an access token will be returned. Please set it in the {SERVICE_TOKEN} "
                f"environment variable for accessing the `{CLIENT_NAME}` service."
            )

            typer.echo(
                "\nIf there are any errors, an error message will be displayed in the terminal and this command will be terminated.\n"
            )

            end_time = datetime.utcnow() + timedelta(minutes=SESSION_TIME_LIMIT)
            while datetime.utcnow() < end_time:
                err = None
                try:
                    Client.set_sso_token()
                    typer.echo(
                        f"\nSSO authentication is successful, please set the below token in the `{SERVICE_TOKEN}` environment variable for accessing the `{CLIENT_NAME}` service.\n"
                    )
                    typer.echo(f"{Client.auth_token}\n")
                    break
                except ValueError as e:
                    err = str(e)
                    if "SSO authentication is not complete" not in str(e):
                        raise ValueError(f"\n{e}")
                time.sleep(1)
            if err is not None:
                raise ValueError(
                    f"\nYour session has expired. Please call the {CLIENT_NAME} token command with the sso provider and try again."
                )

        if not quiet:
            details = User.details()
            status = helper.get_phone_registration_status(details)
            if status is not None:
                typer.echo(status)

    except KeyError as e:
        typer.echo(message=f"Error: {e}", err=True)
        typer.echo(f"\nTry '{CLIENT_NAME} token --help' for help.")
        raise typer.Exit(code=1)

    except Exception as e:
        typer.echo(message=f"Error: {e}", err=True)
        if ("Invalid OTP" in str(e)) or ("OTP is required" in str(e)):
            raise ValueError(e)
        raise typer.Exit(code=1)

In [None]:
# | exporti


app.command()(token)

<function __main__.token(username: Union[str, NoneType] = <typer.models.OptionInfo object>, password: Union[str, NoneType] = <typer.models.OptionInfo object>, server: Union[str, NoneType] = <typer.models.OptionInfo object>, otp: Union[str, NoneType] = <typer.models.OptionInfo object>, sso_provider: Union[str, NoneType] = <typer.models.OptionInfo object>, quiet: bool = <typer.models.OptionInfo object>, debug: bool = <typer.models.OptionInfo object>) -> None>

In [None]:
# | include: false

result = runner.invoke(app, ["--help"])

display(str(result.stdout))

assert "**AIRT_SERVICE_USERNAME**" in str(result.stdout)
assert "**AIRT_SERVICE_PASSWORD**" in str(result.stdout)
assert "The airt server uri." in str(result.stdout)
assert "**AIRT_SERVER_URL**" in str(result.stdout)
assert "info@airt.ai" in str(result.stdout)

'Usage: token [OPTIONS]\n\n  Get application token for airt service from a username/password pair.\n\n  To access the airt service, you must first create a developer account. To\n  obtain one, please contact us at info@airt.ai.\n\n  After successful verification, you will receive an email with the username and\n  password for the developer account.\n\n  Once you have the credentials, use them to get an access token by running\n  **airt token** command. It is necessary to  get an access token; otherwise,\n  you won\'t be able to access all of the airt service\'s APIs. You can either\n  pass the  username, password, and server address as command line arguments or\n  store them in the environment variables  **AIRT_SERVICE_USERNAME**,\n  **AIRT_SERVICE_PASSWORD**, and **AIRT_SERVER_URL**.\n\n  If you\'ve already enabled multi-factor authentication (MFA) for your account,\n  you\'ll need to pass the dynamically  generated six-digit verification code\n  along with your username and password 

In [None]:
# Testing SSO Flow
# Negative case: Generating token using SSO without calling the enable SSO command
result = runner.invoke(
    app,
    [
        "-u",
        os.environ[SERVICE_USERNAME],
        "-p",
        os.environ[SERVICE_PASSWORD],
        "--sso_provider",
        "google",
    ],
)

assert result.exit_code == 1
assert "SSO is not enabled " in result.stdout
result.stdout

'Error: SSO is not enabled for the provider.\n'

In [None]:
# Testing SSO Flow
# Negative case: Generating token without enabling SSO
result = runner.invoke(
    app,
    [
        "-u",
        os.environ[SERVICE_USERNAME],
        "-p",
        os.environ[SERVICE_PASSWORD],
        "--sso_provider",
        "github",
    ],
)

assert result.exit_code == 1
assert "SSO is not enabled " in result.stdout
result.stdout

'Error: SSO is not enabled for the provider.\n'

In [None]:
# Helper context manager for testing


@contextmanager
def new_user():
    # login as super user
    username = os.environ[SERVICE_SUPER_USER]
    password = os.environ[SERVICE_PASSWORD]

    Client.get_token(username=username, password=password)
    # create new user
    _user_name = f"random_user_{randrange(10000)}_{randrange(10000)}"
    _email = f"random_user_{randrange(10000)}_{randrange(10000)}@email.com"
    _password = "random_password"

    req_data = dict(
        username=_user_name,
        first_name="random_first_name",
        last_name="random_last_name",
        email=_email,
        password=_password,
        super_user=False,
        subscription_type="test",
    )

    response = Client._post_data(relative_url=f"/user/", data=req_data)

    Client.get_token(username=_user_name, password=_password)
    user_token = Client.auth_token

    try:
        os.environ[SERVICE_TOKEN] = user_token
        yield _user_name, _password
    finally:
        del os.environ[SERVICE_TOKEN]

In [None]:
# | include: false


def mask(s: str) -> str:
    return "*" * len(s)

In [None]:
# | include: false

assert mask("davor") == "*****"

In [None]:
# | include: false

# Tests for token

# Testing positive scenario without quiet

result = runner.invoke(
    app,
    ["-u", os.environ[SERVICE_USERNAME], "-p", os.environ[SERVICE_PASSWORD]],
)

auth_token = result.stdout[:-1]
masked_token = mask(auth_token.split(": ")[1])
display(f"auth_token: {masked_token}")


assert result.exit_code == 0
assert len(auth_token) >= 127  # maybe

# Testing positive scenario with quiet

result = runner.invoke(
    app,
    [
        "-u",
        os.environ[SERVICE_USERNAME],
        "-p",
        os.environ[SERVICE_PASSWORD],
        "-s",
        os.environ[SERVER_URL],
        "-q",
    ],
)

auth_token = result.stdout[:-1]
display(f"{mask(auth_token)}")

assert result.exit_code == 0
assert len(auth_token) >= 127  # maybe

'auth_token: **************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************'

'*******************************************************************************************************************************'

In [None]:
# | include: false

# Tests for getting auth_token
# Testing negative scenarios

# Wrong username and password combination
result = runner.invoke(app, ["-u", "random_name", "-p", os.environ[SERVICE_PASSWORD]])
display(result.stdout)

assert result.exit_code == 1, f"{result.exit_code=}"

'Error: Incorrect username or password. Please try again.\n'

In [None]:
# | include: false

# Tests for getting auth_token
# Testing negative scenario

# Passing wrong server address
result = runner.invoke(
    app,
    [
        "-u",
        os.environ[SERVICE_USERNAME],
        "-p",
        os.environ[SERVICE_PASSWORD],
        "-s",
        "https://my-fake-url:8000",
    ],
)
display(result.stdout)

assert result.exit_code == 1, f"{result.exit_code=}"

"Error: HTTPSConnectionPool(host='my-fake-url', port=8000): Max retries exceeded with url: /token (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object>: Failed to establish a new connection: [Errno -2] Name or service not known'))\n"

In [None]:
# | include: false

# Tests for getting auth_token
# Testing negative scenarios

# Username and password is not passed in argument nor set in the environment variables

# Assign env vars to temp variables and delete the env vars
if os.environ.get(SERVICE_USERNAME):
    airt_service_username = os.environ.get(SERVICE_USERNAME)
    del os.environ[SERVICE_USERNAME]

if os.environ.get(SERVICE_PASSWORD):
    airt_service_password = os.environ.get(SERVICE_PASSWORD)
    del os.environ[SERVICE_PASSWORD]

result = runner.invoke(app)
display(result.stdout)

assert result.exit_code == 1, f"{result.exit_code=}"
assert (
    f"Error: 'The username and password are neither passed as parameters nor set in the environment variables `{SERVICE_USERNAME}` and `{SERVICE_PASSWORD}`.'\n"
    in result.stdout
)
assert f"{CLIENT_NAME} token --help" in result.stdout

"Error: 'The username and password are neither passed as parameters nor set in the environment variables `AIRT_SERVICE_USERNAME` and `AIRT_SERVICE_PASSWORD`.'\n\nTry 'airt token --help' for help.\n"

In [None]:
# | include: false

# Tests for get_token
# Testing Negative scenarion.

# Only Username is set in environment variable and password is not passed in argument nor set in the environment variables

# Assign only username in env var
os.environ[SERVICE_USERNAME] = airt_service_username

result = runner.invoke(app)

display(result.stdout)

assert result.exit_code == 1
assert (
    f"Error: 'The password is neither passed as parameter nor set in the environment variable {SERVICE_PASSWORD}.'\n"
    in result.stdout
)
assert f"{CLIENT_NAME} token --help" in result.stdout

"Error: 'The password is neither passed as parameter nor set in the environment variable AIRT_SERVICE_PASSWORD.'\n\nTry 'airt token --help' for help.\n"

In [None]:
# | include: false

# Tests for get_token
# Testing Positive scenario.

# setting the password in environment variable
os.environ[SERVICE_PASSWORD] = airt_service_password

result = runner.invoke(app, ["-q"])

display(f"auth_token: {mask(result.stdout)}")

assert result.exit_code == 0

display(f"{mask(airt_service_username)=}, {mask(airt_service_password)=}")

assert os.environ[SERVICE_USERNAME] == airt_service_username
assert os.environ[SERVICE_PASSWORD] == airt_service_password

'auth_token: ********************************************************************************************************************************'

"mask(airt_service_username)='*******', mask(airt_service_password)='********************************'"

In [None]:
# Testing negative scenario: Non-mfa user send otp param

random_otp = 123456
result = runner.invoke(
    app,
    [
        "-u",
        os.environ[SERVICE_USERNAME],
        "-p",
        os.environ[SERVICE_PASSWORD],
        "--otp",
        random_otp,
    ],
)

assert result.exit_code == 1
assert "Incorrect username or password" in str(result.stdout)
str(result.stdout)

'Error: Incorrect username or password. Please try again.\n'