Skip to content
This repository has been archived by the owner on Aug 19, 2023. It is now read-only.

Commit

Permalink
Merge ec23afe into 45db73d
Browse files Browse the repository at this point in the history
  • Loading branch information
bossjones committed Jul 6, 2020
2 parents 45db73d + ec23afe commit 478884f
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 1 deletion.
39 changes: 39 additions & 0 deletions tasks/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,45 @@ def black(ctx, loc="local", check=True, debug=False, verbose=0):
ctx.run(_cmd)


# # TODO: Enable this version of the black task. from https://github.com/bossjones/fake-medium-fastapi/blob/master/tasks/ci.py
# @task(
# pre=[call(clean, loc="local"),], incrementable=["verbose"],
# )
# def black(ctx, loc="local", check=False, debug=False, verbose=0, tests=False):
# """
# Run black code formatter
# Usage: inv ci.black
# """
# env = get_compose_env(ctx, loc=loc)

# # Only display result
# ctx.config["run"]["echo"] = True

# # Override run commands env variables one key at a time
# for k, v in env.items():
# ctx.config["run"]["env"][k] = v

# _cmd = "poetry run black "

# if check:
# _cmd += "--check "

# if verbose >= 3:
# _cmd += "--verbose "

# if tests:
# _cmd += "tests tasks "

# _cmd += "app"

# if verbose >= 1:
# msg = "[black] bout to run command: \n"
# click.secho(msg, fg="green")
# click.secho(_cmd, fg="green")

# ctx.run(_cmd)


@task(incrementable=["verbose"])
def isort(
ctx, loc="local", check=False, dry_run=False, verbose=0, apply=False, diff=False
Expand Down
Empty file.
110 changes: 110 additions & 0 deletions tests/api/repositories/test_repo_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# from ultron8.api.repositories.repo_user import CRUDUser

from fastapi.encoders import jsonable_encoder
from sqlalchemy import inspect
from sqlalchemy.orm import Session

from ultron8.api.core.security import get_password_hash, verify_password
from ultron8.api.db_models.user import User
from ultron8.api.factories.users import _MakeRandomNormalUserFactory
from ultron8.api.models.user import UserCreate, UserUpdate
from ultron8.api.repositories.repo_user import user as repository_user

from tests.utils.utils import get_server_api, random_email, random_lower_string


def test_create_user(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user_in = UserCreate(email=email, password=password)
user = repository_user.create(db, obj_in=user_in)
assert user.email == email
assert hasattr(user, "hashed_password")


def test_authenticate_user(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user_in = UserCreate(email=email, password=password)
user = repository_user.create(db, obj_in=user_in)
authenticated_user = repository_user.authenticate(
db, email=email, password=password
)
assert authenticated_user
assert user.email == authenticated_user.email


def test_not_authenticate_user(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user = repository_user.authenticate(db, email=email, password=password)
assert user is None


def test_check_if_user_is_active(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user_in = UserCreate(email=email, password=password)
user = repository_user.create(db, obj_in=user_in)
is_active = repository_user.is_active(user)
assert is_active is True


def test_check_if_user_is_active_inactive(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user_in = UserCreate(email=email, password=password, disabled=True)
user = repository_user.create(db, obj_in=user_in)
is_active = repository_user.is_active(user)
assert is_active


def test_check_if_user_is_superuser(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = repository_user.create(db, obj_in=user_in)
is_superuser = repository_user.is_superuser(user)
assert is_superuser is True


def test_check_if_user_is_superuser_normal_user(db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = repository_user.create(db, obj_in=user_in)
is_superuser = repository_user.is_superuser(user)
assert is_superuser is False


def test_get_user(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
username = data.email
password = data.password
user_in = UserCreate(email=username, password=password, is_superuser=True)
user = repository_user.create(db, obj_in=user_in)
user_2 = repository_user.get(db, id=user.id)
assert user_2
assert user.email == user_2.email
assert jsonable_encoder(user) == jsonable_encoder(user_2)


def test_update_user(db: Session) -> None:
data = _MakeRandomNormalUserFactory()
email = data.email
password = data.password
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = repository_user.create(db, obj_in=user_in)
new_password = random_lower_string()
user_in_update = UserUpdate(password=new_password, is_superuser=True)
repository_user.update(db, db_obj=user, obj_in=user_in_update)
user_2 = repository_user.get(db, id=user.id)
assert user_2
assert user.email == user_2.email
assert verify_password(new_password, user_2.hashed_password)
2 changes: 1 addition & 1 deletion tests/utils/test_ip_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from ultron8.utils.ip_utils import split_host_port, is_ipv6, is_ipv4
from ultron8.utils.ip_utils import is_ipv4, is_ipv6, split_host_port


class TestIPUtilsTests:
Expand Down
Empty file.
75 changes: 75 additions & 0 deletions ultron8/api/repositories/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# fake-medium-fastapi

from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union

from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session

from ultron8.api.db.u_sqlite.base_class import CustomBase

ModelType = TypeVar("ModelType", bound=CustomBase)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)


class BaseRepository:
def __init__(self, conn: Session) -> None:
self._conn = conn

@property
def connection(self) -> Session:
return self._conn


class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model

def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()

def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()

def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data) # type: ignore
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj

def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj

def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj
57 changes: 57 additions & 0 deletions ultron8/api/repositories/repo_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Any, Dict, Optional, Union

# from fastapi.encoders import jsonable_encoder
# from sqlalchemy import inspect
from sqlalchemy.orm import Session

from ultron8.api.core.security import get_password_hash, verify_password
from ultron8.api.db_models.user import User
from ultron8.api.models.user import UserCreate, UserUpdate
from ultron8.api.repositories.base import CRUDBase


class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()

def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj

def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data["password"]:
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)

def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user

def is_active(self, user: User) -> bool:
return user.is_active

def is_superuser(self, user: User) -> bool:
return user.is_superuser


user = CRUDUser(User)

0 comments on commit 478884f

Please sign in to comment.